In [None]:
#!pip install simple_salesforce
#!pip install pymysql
#!pip install sqlalchemy

In [1]:
import json
from sqlalchemy import create_engine
import datetime as dt
from datetime import date
import pandas as pd
import pymysql
pymysql.install_as_MySQLdb()

In [2]:
# Make sure to use your own `config.py` file. Consider ensuring that these variable names are in sync
from config import sf_username, sf_password, sf_security_token
from config import remote_db_endpoint, remote_db_port
from config import remote_db_name, remote_db_user, remote_db_pwd

In [3]:
from simple_salesforce import Salesforce
sf = Salesforce(username=sf_username, password=sf_password, security_token=sf_security_token)

In [4]:
engine = create_engine(f"mysql://{remote_db_user}:{remote_db_pwd}@{remote_db_endpoint}:{remote_db_port}/{remote_db_name}")
conn = engine.connect()

## Prepare ETL for the Staff data

In [5]:
staff_data_df = pd.read_sql("SELECT * FROM staff", conn)
staff_data_df.head()

Unnamed: 0,ID_Staff,EmployeeID,LastName,FirstName,MiddleName,BirthDate
0,1,184220,Luongo,Darick,Nico,1995-07-05
1,2,130109,Sanford,Gemini,Blair,1992-04-22
2,3,160655,Williams,Dartanion,De Angelo,1993-05-21
3,4,159108,Rodney,Heather,Nicole,1994-06-07


In [6]:
staff_data_df.dtypes

ID_Staff       int64
EmployeeID    object
LastName      object
FirstName     object
MiddleName    object
BirthDate     object
dtype: object

In [7]:
# staff_data_df
staff_data_df['BirthDate'] = pd.to_datetime(staff_data_df['BirthDate']).dt.date
staff_data_df.head()

Unnamed: 0,ID_Staff,EmployeeID,LastName,FirstName,MiddleName,BirthDate
0,1,184220,Luongo,Darick,Nico,1995-07-05
1,2,130109,Sanford,Gemini,Blair,1992-04-22
2,3,160655,Williams,Dartanion,De Angelo,1993-05-21
3,4,159108,Rodney,Heather,Nicole,1994-06-07


In [8]:
staff_data_df.dtypes

ID_Staff       int64
EmployeeID    object
LastName      object
FirstName     object
MiddleName    object
BirthDate     object
dtype: object

In [9]:
staff_data_df.rename(columns={
    'EmployeeID':'Employee_ID__c',
    'FirstName':'First_Name__c',
    'LastName':'Last_Name__c',
    'MiddleName':'Middle_Name__c',
    'BirthDate':'Birth_Date__c'    
}, inplace=True)
staff_data_df.head()

Unnamed: 0,ID_Staff,Employee_ID__c,Last_Name__c,First_Name__c,Middle_Name__c,Birth_Date__c
0,1,184220,Luongo,Darick,Nico,1995-07-05
1,2,130109,Sanford,Gemini,Blair,1992-04-22
2,3,160655,Williams,Dartanion,De Angelo,1993-05-21
3,4,159108,Rodney,Heather,Nicole,1994-06-07


In [10]:
staff_data_df = staff_data_df[['ID_Staff', 'Employee_ID__c', 'Last_Name__c', 'First_Name__c', 'Middle_Name__c', 'Birth_Date__c']]
staff_data_df.head()

Unnamed: 0,ID_Staff,Employee_ID__c,Last_Name__c,First_Name__c,Middle_Name__c,Birth_Date__c
0,1,184220,Luongo,Darick,Nico,1995-07-05
1,2,130109,Sanford,Gemini,Blair,1992-04-22
2,3,160655,Williams,Dartanion,De Angelo,1993-05-21
3,4,159108,Rodney,Heather,Nicole,1994-06-07


In [11]:
staff_data_records = staff_data_df.to_dict('records')
staff_data_records

[{'ID_Staff': 1,
  'Employee_ID__c': '000184220',
  'Last_Name__c': 'Luongo',
  'First_Name__c': 'Darick',
  'Middle_Name__c': 'Nico',
  'Birth_Date__c': datetime.date(1995, 7, 5)},
 {'ID_Staff': 2,
  'Employee_ID__c': '000130109',
  'Last_Name__c': 'Sanford',
  'First_Name__c': 'Gemini',
  'Middle_Name__c': 'Blair',
  'Birth_Date__c': datetime.date(1992, 4, 22)},
 {'ID_Staff': 3,
  'Employee_ID__c': '000160655',
  'Last_Name__c': 'Williams',
  'First_Name__c': 'Dartanion',
  'Middle_Name__c': 'De Angelo',
  'Birth_Date__c': datetime.date(1993, 5, 21)},
 {'ID_Staff': 4,
  'Employee_ID__c': '000159108',
  'Last_Name__c': 'Rodney',
  'First_Name__c': 'Heather',
  'Middle_Name__c': 'Nicole',
  'Birth_Date__c': datetime.date(1994, 6, 7)}]

## Insert `Staff` Records into Salesforce

In [12]:
for rec in staff_data_records:

    record = {
        'Employee_ID__c': rec['Employee_ID__c'],
        'Last_Name__c': rec['Last_Name__c'],
        'First_Name__c': rec['First_Name__c'],
        'Middle_Name__c': rec['Middle_Name__c'],
        'Birth_Date__c': rec['Birth_Date__c']
    }
    
    try:
        sf.Staff__c.create(record)
    except Exception as e:
        print(e)

Object of type date is not JSON serializable
Object of type date is not JSON serializable
Object of type date is not JSON serializable
Object of type date is not JSON serializable


In [None]:
# Bulk 
#sf.bulk.Staff__c.insert(staff_data_records)

## Create Staff Lookup Table
You will use this later to crosswalk the course code with the primary key from the `Course` table

It is important to note that we will be querying **Salesforce** to retrieve the record IDs 

In [None]:
course_lookup_list = []

# The `Name` column in the primary key in Salesforce objects
# The Salesforce query language is called SOQL 
data = sf.query_all_iter("SELECT Course_Code__c, Name FROM Course__c")
for row in data:
    rec = {
        'ID_Course__c': row['Name'], # this is a critical line of code
        'Course_Code__c': row['Course_Code__c']
    }
    course_lookup_list.append(rec)
    
course_lookup_list

In [None]:
course_lookup_df = pd.DataFrame(course_lookup_list)
course_lookup_df

In [None]:
# Query the `Class` table from MySQL
query = '''
    SELECT 
        co.CourseCode,
        cl.*
    FROM 
        class cl
        INNER JOIN course co
        ON cl.ID_Course = co.ID_Course

'''

class_data_df = pd.read_sql(query, conn)
class_data_df.head()

In [None]:
class_data_df.rename(columns={
    'CourseCode':'Course_Code__c',
    'Section':'Section__c',
    'StartDate':'Start_Date__c',
    'EndDate':'End_Date__c',
    'CourseCode':'Course_Code__c'
}, inplace=True)


class_data_df = class_data_df[['Course_Code__c', 'Section__c', 'Start_Date__c', 'End_Date__c']]
class_data_df

## Join the Staff Assignment DataFrame with the Staff lookup table
This join is necessary to successfully lookup the foreign key for the Course table 

In [None]:
class_data_df = pd.merge(class_data_df, course_lookup_df, how='left')
#class_data_df.drop(columns = ['ID_Class','ID_Course','CourseName','CreditHours','BootCampCourse','Course_Code__c'], inplace=True)

class_data_df.head()

In [None]:
class_data_df
class_data_df['Start_Date__c'] = pd.to_datetime(class_data_df['Start_Date__c']).dt.date
class_data_df['End_Date__c'] = pd.to_datetime(class_data_df['End_Date__c']).dt.date

class_data_df.head()

In [None]:
class_data_records = class_data_df.to_dict(orient='records')
class_data_records

## Insert `Staff Assignment` Records into Salesforce

In [None]:
for rec in class_data_records:
 
    record = {
        'ID_Course__c': rec['ID_Course__c'],
        'Section__c': rec['Section__c'],
        'Start_Date__c': str(rec['Start_Date__c']),
        'End_Date__c': str(rec['End_Date__c']),
    }
    
    try:
        sf.Class__C.create(record)
    except Exception as e:
        print(e)

## Example of Deleting Records

Select the IDs of the records first and then process the results.

Ultimately, you want a list of IDs in the end.


In [None]:
class_records = sf.query("SELECT Id FROM Course__c")
recs_to_delete = [{'Id': r['Id']} for r in class_records['records']]
recs_to_delete

In [None]:
#sf.bulk.Course__c.delete(recs_to_delete)

In [None]:
for rec in recs_to_delete:
    try:
        sf.Course__c.delete(rec['Id'])
    except Exception as e:
        print(e)