# Bulk Operations

In this example, we will look at bulk operations using the `simple-salesforce` library.

## Imports

In [None]:
import pandas as pd
from simple_salesforce import Salesforce
import os
import io
from dotenv import load_dotenv
import ipywidgets as widgets
from IPython.display import display

# Load variables from a local .env file (python-dotenv) so the credentials can be read with os.getenv later on.
load_dotenv()


## Authenticate

Always store your credentials in environment variables, and always use a service account.

The `simple-salesforce` library supports multiple authentication methods:

- Username, password, and security token  
- Session ID and instance URL  
- OAuth 2.0 (JWT, web flow, or refresh token)  
- Connected App credentials (via external libraries) 

In [160]:

# Read the connection settings from environment variables (never hard-code them).
sf_domain, sf_username, sf_password, sf_token = (
    os.getenv(name)
    for name in ('sf_domain', 'sf_username', 'sf_password', 'sf_token')
)

# Authenticate with username + password + security token against the given domain.
sf = Salesforce(
    domain=sf_domain,
    username=sf_username,
    password=sf_password,
    security_token=sf_token,
)

## Prepare Data

### Load Data

In [194]:
# Read the parts dataset from disk and preview five random rows.
df = pd.read_csv(filepath_or_buffer='../datasets/parts.csv')
df.sample(n=5)

Unnamed: 0,Part Number,Manufacturer,Description,External Id
227,4326302,TBA,STRAP,19548
6067,66-12687-2,SPIRIT,PIN,25388
4380,113W3002-15,Boeing,SHIM,23701
5245,01652-009,AIRBUS SAS,TANK WASTE,24566
2692,116W2126-1,None Specified,KEY,22013


###  Clean Data

In [195]:
# Keep only the first row for each part number; later duplicates are discarded.
df = df.drop_duplicates(subset=['Part Number'], keep='first')

In [196]:
# Translate the CSV headers into the matching Salesforce field names.
df = df.rename(
    mapper={
        'Part Number': 'Name',
        'Manufacturer': 'Manufacturer_Name__c',
        'Description': 'inscor__Keyword__c',
        'External Id': 'BD_ExternalId__c',
    },
    axis='columns',
)
df.sample(n=3)

Unnamed: 0,Name,Manufacturer_Name__c,inscor__Keyword__c,BD_ExternalId__c
6846,3101768-1,None Specified,GASKET,26167
1640,8034-205,TBA,SEAL,20961
1992,451W4110-36,TBA,RAIL AY,21313


## Bulk API Operations

### Common Functions


In [164]:
def get_bulk2_results(result, sobject='Product2', client=None):
    """Collect the per-record outcomes of one or more Bulk API 2.0 jobs.

    Args:
        result: Iterable of job summaries as returned by a bulk2 operation;
            each item must expose the job identifier under ``'job_id'``.
        sobject: API name of the object the jobs ran against. It is looked up
            dynamically on ``client.bulk2`` (defaults to ``'Product2'``).
        client: Connection exposing ``.bulk2``. When omitted, the
            notebook-level ``sf`` connection is used.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(combined_success, combined_failed)``
        covering every job in ``result``, each with an extra ``job_id`` column.
        Both frames are empty when ``result`` contains no jobs.
    """
    if client is None:
        client = sf  # fall back to the connection created in the Authenticate cell
    handler = getattr(client.bulk2, sobject)

    success_frames = []
    failed_frames = []
    for job in result:
        job_id = job['job_id']

        # The API hands the records back as CSV text, so parse them into DataFrames.
        failed = pd.read_csv(io.StringIO(handler.get_failed_records(job_id)))
        success = pd.read_csv(io.StringIO(handler.get_successful_records(job_id)))

        # Tag every row with the job it came from so results stay traceable.
        failed['job_id'] = job_id
        success['job_id'] = job_id

        failed_frames.append(failed)
        success_frames.append(success)

    # Concatenate once, after the loop, so that every job is included
    # (returning from inside the loop would stop after the first job).
    combined_success = (
        pd.concat(success_frames, ignore_index=True) if success_frames else pd.DataFrame()
    )
    combined_failed = (
        pd.concat(failed_frames, ignore_index=True) if failed_frames else pd.DataFrame()
    )
    return combined_success, combined_failed

### Insert

In [165]:
# Turn each DataFrame row into a dict and submit them all as a Bulk API 2.0 insert.
records = df.to_dict(orient='records')
result = sf.bulk2.Product2.insert(concurrency=100, records=records)
print(result)

# Fetch the per-record outcomes and show a short summary of each.
success, fail = get_bulk2_results(result)
print(f"Total records inserted: {len(success)}")
display(success.head(n=3))
print(f"Total records failed: {len(fail)}")
display(fail.head(n=3))

[{'numberRecordsFailed': 0, 'numberRecordsProcessed': 7069, 'numberRecordsTotal': 7069, 'job_id': '750ep000003EIL7AAO'}]
Total records inserted: 7069


Unnamed: 0,sf__Id,sf__Created,BD_ExternalId__c,Manufacturer_Name__c,Name,inscor__Keyword__c,job_id
0,01tep000002Neq1AAC,True,19321.0,None Specified,0FR1100A01G02,SURROUND,750ep000003EIL7AAO
1,01tep000002Neq2AAC,True,19322.0,Jamco America,0FR1100A03G01,SURROUND,750ep000003EIL7AAO
2,01tep000002Neq3AAC,True,19323.0,Jamco America,0FR1100A07G05,SURROUND,750ep000003EIL7AAO


Total records failed: 0


Unnamed: 0,sf__Id,sf__Error,BD_ExternalId__c,Manufacturer_Name__c,Name,inscor__Keyword__c,job_id


### Upsert

In [197]:
# Update the dataset so that every 'TBA' manufacturer becomes 'Boeing'.
df['Manufacturer_Name__c'] = df['Manufacturer_Name__c'].replace({'TBA': 'Boeing'})
# Equivalent alternative using a boolean mask with .loc:
# df.loc[df['Manufacturer_Name__c'] == 'TBA', 'Manufacturer_Name__c'] = 'Boeing'

# Add two new rows in the same format as the existing ones.
new_rows = pd.DataFrame({
    'Name': ['ABC123', 'XYZ789'],
    'Manufacturer_Name__c': ['JAMCO', 'JAMCO'],
    'inscor__Keyword__c': ['WIDGET', 'GADGET'],
    'BD_ExternalId__c': [99999, 88888],
})

df = pd.concat([df, new_rows], ignore_index=True)

In [198]:
# Convert the DataFrame to a list of dictionaries (one per record) and upsert,
# matching existing records on the external id field.
records = df.to_dict(orient='records')
result = sf.bulk2.Product2.upsert(records=records, external_id_field='BD_ExternalId__c')
print(result)

# Fetch the per-record outcomes. The success frame holds both created and
# updated records (see the sf__Created column), hence "upserted".
success, fail = get_bulk2_results(result)
print(f"Total records upserted: {len(success)}")
display(success.head(3))
print(f"Total records failed: {len(fail)}")
display(fail.head(3))

# Only for the update demonstration that follows: save the success results.
success.to_csv('../datasets/success_parts.csv', index=False)

[{'numberRecordsFailed': 0, 'numberRecordsProcessed': 7071, 'numberRecordsTotal': 7071, 'job_id': '750ep000003EJyjAAG'}]
Total records inserted: 7071


Unnamed: 0,sf__Id,sf__Created,BD_ExternalId__c,Manufacturer_Name__c,Name,inscor__Keyword__c,job_id
0,01tep000002Neq1AAC,False,19321.0,None Specified,0FR1100A01G02,SURROUND,750ep000003EJyjAAG
1,01tep000002Neq2AAC,False,19322.0,Jamco America,0FR1100A03G01,SURROUND,750ep000003EJyjAAG
2,01tep000002Neq3AAC,False,19323.0,Jamco America,0FR1100A07G05,SURROUND,750ep000003EJyjAAG


Total records failed: 0


Unnamed: 0,sf__Id,sf__Error,BD_ExternalId__c,Manufacturer_Name__c,Name,inscor__Keyword__c,job_id


### Update

In [168]:
# Reload the saved success results and take a quick look at them.
df = pd.read_csv(filepath_or_buffer='../datasets/success_parts.csv')
display(df.head(n=1))
print(f"Total records in frame: {len(df)}")


Unnamed: 0,sf__Id,sf__Created,BD_ExternalId__c,Manufacturer_Name__c,Name,inscor__Keyword__c,job_id
0,01tep000002Neq1AAC,False,19321.0,None Specified,0FR1100A01G02,SURROUND,750ep000003EIWPAA4


Total records in frame: 7071


In [169]:
# Trim the dataset: keep only rows whose manufacturer is 'None Specified',
# and only the record id (renamed to 'Id') plus the manufacturer column.
df = (
    df.loc[
        df['Manufacturer_Name__c'] == 'None Specified',
        ['sf__Id', 'Manufacturer_Name__c'],
    ]
    .rename(columns={'sf__Id': 'Id'})
)
display(df.head(n=1))
print(f"Total records in frame: {len(df)}")

# Set every remaining manufacturer to 'Salesforce Aviation'.
df = df.assign(Manufacturer_Name__c='Salesforce Aviation')
display(df.head(n=1))

Unnamed: 0,Id,Manufacturer_Name__c
0,01tep000002Neq1AAC,None Specified


Total records in frame: 1405


Unnamed: 0,Id,Manufacturer_Name__c
0,01tep000002Neq1AAC,Salesforce Aviation


In [None]:
# Convert the DataFrame to a list of dictionaries (one per record).
# Each record carries the Salesforce Id, which the update uses to find it.
records = df.to_dict(orient='records')
result = sf.bulk2.Product2.update(records=records)
print(result)

# Fetch the per-record outcomes; this is an update, so report "updated".
success, fail = get_bulk2_results(result)
print(f"Total records updated: {len(success)}")
display(success.head(3))
print(f"Total records failed: {len(fail)}")
display(fail.head(3))

[{'numberRecordsFailed': 0, 'numberRecordsProcessed': 1405, 'numberRecordsTotal': 1405, 'job_id': '750ep000003EIZdAAO'}]
Total records inserted: 1405


Unnamed: 0,sf__Id,sf__Created,Manufacturer_Name__c,Id,job_id
0,01tep000002Neq1AAC,False,Salesforce Aviation,01tep000002Neq1AAC,750ep000003EIZdAAO
1,01tep000002NeqXAAS,False,Salesforce Aviation,01tep000002NeqXAAS,750ep000003EIZdAAO
2,01tep000002NeqZAAS,False,Salesforce Aviation,01tep000002NeqZAAS,750ep000003EIZdAAO


Total records failed: 0


Unnamed: 0,sf__Id,sf__Error,Manufacturer_Name__c,Id,job_id


## Get Data


### Download

In [187]:
# SOQL selecting the parts that were relabelled in the update step.
soql_query = """
SELECT Name, Manufacturer_Name__c, inscor__Keyword__c, BD_ExternalId__c
FROM Product2
WHERE Manufacturer_Name__c = 'Salesforce Aviation'
"""
# There is another option to download this into an in-memory object, but writing
# the result files to a directory is easier.
# The bulk2 handle uses Product2 so it matches the object named in the SOQL.
sf.bulk2.Product2.download(
    soql_query, path='../datasets', max_records=200000,
)


[{'locator': '',
  'number_of_records': 1405,
  'file': 'c:\\Users\\JackMcHugh\\Documents\\BuckeyeDreaminDemo\\datasets\\tmp5bni7sqq.csv'}]

### Query

In [None]:
# NOTE (from the notebook author): `query` has a known bug upstream and a PR is
# planned; until then the download method above is the recommended route.

# "COMMA" is the delimiter name the Bulk API expects ("COMM" is not a valid value);
# Product2 matches the object referenced in soql_query.
results = sf.bulk2.Product2.query(
    soql_query, max_records=50000, column_delimiter="COMMA", line_ending="LF"
)

# Each item yielded by `results` is written as CSV text to its own numbered file
# (query-1.csv, query-2.csv, ...) instead of overwriting a single file, and is
# parsed in memory at the same time so no chunk is lost.
frames = []
for i, data in enumerate(results, start=1):
    with open(f"../datasets/query-{i}.csv", "w", encoding="utf-8", newline="") as bos:
        bos.write(data)
    frames.append(pd.read_csv(io.StringIO(data)))

# Combine all chunks; fall back to an empty frame when nothing was returned.
df_result = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

## Delete & Hard Delete

In [199]:
# Why a CSV file: per the notebook author, simple-salesforce's hard_delete asserts
# that csv_file is not None. Although records= is accepted, it is not respected
# internally because of that downstream assertion for delete operations.
# The author is working on a PR to fix this in the simple-salesforce repo.
# Hard delete requires that permission to be enabled on your profile; be careful
# with that permission.

# Build the list of record Ids to delete from the saved success results.
csv_path = '../datasets/delete_ids.csv'
success = (
    pd.read_csv('../datasets/success_parts.csv')
    .rename(columns={'sf__Id': 'Id'})
)
success[['Id']].to_csv(csv_path, index=False)


def on_button_click(_clicked):
    """Run the hard delete for the Ids in ``csv_path`` when the button is pressed."""
    print('Running hard delete...')
    # For a normal (soft) delete, call .delete(...) instead of .hard_delete(...).
    result = sf.bulk2.Product2.hard_delete(csv_file=csv_path)
    print('Done:', result)


# Require an explicit click before anything is deleted.
button = widgets.Button(description='Confirm Hard Delete', button_style='danger')
button.on_click(on_button_click)
display(button)

Button(button_style='danger', description='Confirm Hard Delete', style=ButtonStyle())

Running hard delete...


show dynamic get attr
show wait function