# Using Python to Integrate MongoDB Data into an ETL Process

In [1]:
# importing necessary libraries
import datetime
import json
import os
from urllib.parse import quote_plus

import numpy
import pandas as pd
import pymongo
from sqlalchemy import create_engine, text

In [2]:
# Connection settings for the MongoDB source server, the MySQL destination
# server, and the databases this notebook works with.
# The MySQL credentials can be supplied via environment variables so they do not
# have to be committed with the notebook; the literals are local-dev fallbacks only.
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = os.environ.get("MYSQL_USER", "root")
pwd = os.environ.get("MYSQL_PASSWORD", "Password")

src_dbname = "HR_management"
dst_dbname = "HR_management_DW3"

In [35]:
# defining functions for getting data from and setting data into databases
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Run a SQL query against a MySQL database and return the result as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials. Both are URL-escaped before being placed in the
        connection URL, so characters such as '@' or '/' in a password are safe.
    host_name : str
        MySQL server host.
    db_name : str
        Database to connect to (unqualified table names resolve against it).
    sql_query : str
        Query handed to ``pd.read_sql``.

    Returns
    -------
    pandas.DataFrame
        The query result.
    '''
    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query raises.
        with sqlEngine.connect() as conn:
            dframe = pd.read_sql(sql_query, conn)
    finally:
        # Every call builds its own engine, so release its pool instead of leaking it.
        sqlEngine.dispose()

    return dframe


def get_mongo_dataframe(user_id, pwd, host_name, port, db_name, collection, query):
    '''Query a MongoDB collection and return the matching documents as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str or None
        Credentials. When both are truthy they are URL-escaped and embedded in
        the connection URI; otherwise an unauthenticated connection is made.
    host_name : str
        MongoDB server host.
    port : int
        MongoDB server port.
    db_name : str
        Database that holds ``collection`` (also the URI path when authenticating).
    collection : str
        Collection to query.
    query : dict
        Filter passed to ``find``; ``{}`` returns every document.

    Returns
    -------
    pandas.DataFrame
        One row per document, without MongoDB's ``_id`` column.
    '''
    if user_id and pwd:
        # Escape credentials: '@', ':' or '/' would otherwise corrupt the URI.
        mongo_uri = f"mongodb://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}:{port}/{db_name}"
    else:
        mongo_uri = f"mongodb://{host_name}:{port}/"

    client = pymongo.MongoClient(mongo_uri)
    try:
        documents = list(client[db_name][collection].find(query))
    finally:
        # Close the client even when the query fails.
        client.close()

    dframe = pd.DataFrame(documents)
    # An empty result has no '_id' column, so only drop it when present.
    return dframe.drop(columns=['_id'], errors='ignore')


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Write a DataFrame to a MySQL table.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials (URL-escaped before use in the connection URL).
    host_name : str
        MySQL server host.
    db_name : str
        Target database.
    df : pandas.DataFrame
        Rows to write; the index is not written.
    table_name : str
        Target table.
    pk_column : str
        Column(s) for the PRIMARY KEY added on "insert"; unused on "update".
    db_operation : {"insert", "update"}
        "insert" replaces the table with ``df`` and adds a primary key on
        ``pk_column``; "update" appends ``df`` to the existing table.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither "insert" nor "update".
    '''
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        if db_operation == "insert":
            df.to_sql(table_name, con=sqlEngine, index=False, if_exists='replace')
            # Engine.execute() was removed in SQLAlchemy 2.0; run the DDL inside an
            # explicit transaction that commits on exit.
            with sqlEngine.begin() as connection:
                connection.execute(text(f"ALTER TABLE `{table_name}` ADD PRIMARY KEY ({pk_column});"))
        else:
            df.to_sql(table_name, con=sqlEngine, index=False, if_exists='append')
    finally:
        # Every call builds its own engine, so release its pool instead of leaking it.
        sqlEngine.dispose()

In [6]:
# Populate MongoDB with the source data from the JSON files in ./data.
# Each target collection is dropped before it is reloaded, so re-running this
# cell replaces the data instead of inserting duplicate documents.
port = ports["mongo"]
conn_str = f"mongodb://{host_name}:{port}/"
client = pymongo.MongoClient(conn_str)
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"departments" : 'HR_management_departments.json',
              "locations" : 'HR_management_locations.json',
              "jobs" : 'HR_management_jobs.json',
              "employees" : 'HR_management_employees.json'
             }

try:
    for collection_name, file_name in json_files.items():
        with open(os.path.join(data_dir, file_name), 'r', encoding='utf-8') as openfile:
            documents = json.load(openfile)  # assumes each file holds a JSON array of documents
        db[collection_name].drop()
        # insert_many rejects an empty list, so skip files with no records.
        if documents:
            db[collection_name].insert_many(documents)
finally:
    # Close the client even if a file is missing or an insert fails.
    client.close()

In [17]:
# (Re)create the destination data-warehouse database from scratch.
conn_str = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# Engine.execute() was removed in SQLAlchemy 2.0; run the DDL on an explicit
# connection instead. No `USE` is needed: the helper functions put the database
# name in their connection URL, and `USE` would only affect one pooled connection.
with sqlEngine.begin() as conn:
    conn.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
    conn.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
sqlEngine.dispose()

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f7e02b987c0>

# Creating and Populating the New Dimension Tables

In [None]:
### extracting data from the source MongoDB collections into dataframes ###

In [7]:
# Pull every document from the source "departments" collection (no auth).
collection = "departments"
port = ports["mongo"]
query = {}

df_departments = get_mongo_dataframe(
    user_id=None, pwd=None,
    host_name=host_name, port=port,
    db_name=src_dbname, collection=collection, query=query,
)
df_departments.head(2)

Unnamed: 0,department_id,department_name,location_id
0,1,Administration,1700
1,2,Marketing,1800


In [8]:
# Pull every document from the source "locations" collection (no auth).
collection = "locations"
port = ports["mongo"]
query = {}

df_locations = get_mongo_dataframe(
    user_id=None, pwd=None,
    host_name=host_name, port=port,
    db_name=src_dbname, collection=collection, query=query,
)
df_locations.head(2)

Unnamed: 0,location_id,street_address,postal_code,city,state_province,country_id
0,1400,2014 Jabberwocky Rd,26192,Southlake,Texas,US
1,1500,2011 Interiors Blvd,99236,South San Francisco,California,US


In [9]:
# Pull every document from the source "jobs" collection (no auth).
collection = "jobs"
port = ports["mongo"]
query = {}

df_jobs = get_mongo_dataframe(
    user_id=None, pwd=None,
    host_name=host_name, port=port,
    db_name=src_dbname, collection=collection, query=query,
)
df_jobs.head(2)

Unnamed: 0,job_id,job_title,min_salary,max_salary
0,1,Public Accountant,4200.0,9000.0
1,2,Accounting Manager,8200.0,16000.0


In [10]:
# Pull every document from the source "employees" collection (no auth).
collection = "employees"
port = ports["mongo"]
query = {}

df_employees = get_mongo_dataframe(
    user_id=None, pwd=None,
    host_name=host_name, port=port,
    db_name=src_dbname, collection=collection, query=query,
)
df_employees.head(2)

Unnamed: 0,employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,1987-06-17,4,24000.0,,9
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,1989-09-21,5,17000.0,100.0,9


In [None]:
### performing any necessary transformations to the dataframes ### 

In [12]:
# Rename the source *_id columns to the warehouse's *_key naming convention.
df_departments = df_departments.rename(
    columns={"department_id": "department_key", "location_id": "location_key"}
)
df_departments.head(2)

Unnamed: 0,department_key,department_name,location_key
0,1,Administration,1700
1,2,Marketing,1800


In [13]:
# Rename the source *_id column to the warehouse's *_key naming convention.
df_locations = df_locations.rename(columns={"location_id": "location_key"})
df_locations.head(2)

Unnamed: 0,location_key,street_address,postal_code,city,state_province,country_id
0,1400,2014 Jabberwocky Rd,26192,Southlake,Texas,US
1,1500,2011 Interiors Blvd,99236,South San Francisco,California,US


In [14]:
# Rename the source *_id column to the warehouse's *_key naming convention.
df_jobs = df_jobs.rename(columns={"job_id": "job_key"})
df_jobs.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary
0,1,Public Accountant,4200.0,9000.0
1,2,Accounting Manager,8200.0,16000.0


In [15]:
# Rename the source *_id columns to the warehouse's *_key naming convention
# (manager_id is left unchanged).
id_to_key = {
    "employee_id": "employee_key",
    "job_id": "job_key",
    "department_id": "department_key",
}
df_employees = df_employees.rename(columns=id_to_key)
df_employees.head(2)

Unnamed: 0,employee_key,first_name,last_name,email,phone_number,hire_date,job_key,salary,manager_id,department_key
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,1987-06-17,4,24000.0,,9
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,1989-09-21,5,17000.0,100.0,9


In [None]:
### loading the transformed dataframes into the new data warehouse by creating new tables ### 

In [19]:
# Write df_departments to dim_departments (replacing any existing table) and
# make department_key its primary key.
dataframe, table_name = df_departments, 'dim_departments'
primary_key, db_operation = 'department_key', "insert"

set_dataframe(user_id=user_id, pwd=pwd, host_name=host_name, db_name=dst_dbname,
              df=dataframe, table_name=table_name, pk_column=primary_key,
              db_operation=db_operation)


In [20]:
# Write df_locations to dim_locations (replacing any existing table) and
# make location_key its primary key.
dataframe, table_name = df_locations, 'dim_locations'
primary_key, db_operation = 'location_key', "insert"

set_dataframe(user_id=user_id, pwd=pwd, host_name=host_name, db_name=dst_dbname,
              df=dataframe, table_name=table_name, pk_column=primary_key,
              db_operation=db_operation)

In [21]:
# Write df_jobs to dim_jobs (replacing any existing table) and make job_key
# its primary key.
dataframe, table_name = df_jobs, 'dim_jobs'
primary_key, db_operation = 'job_key', "insert"

set_dataframe(user_id=user_id, pwd=pwd, host_name=host_name, db_name=dst_dbname,
              df=dataframe, table_name=table_name, pk_column=primary_key,
              db_operation=db_operation)

In [22]:
# Write df_employees to dim_employees (replacing any existing table) and make
# employee_key its primary key.
dataframe, table_name = df_employees, 'dim_employees'
primary_key, db_operation = 'employee_key', "insert"

set_dataframe(user_id=user_id, pwd=pwd, host_name=host_name, db_name=dst_dbname,
              df=dataframe, table_name=table_name, pk_column=primary_key,
              db_operation=db_operation)

In [None]:
### validating that the new dimension tables were created ###

In [23]:
# Read dim_departments back and confirm every row arrived. The table name is not
# schema-qualified because get_sql_dataframe already connects to dst_dbname.
sql_departments = "SELECT * FROM dim_departments;"
df_dim_departments = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_departments)
assert len(df_dim_departments) == len(df_departments), "row count mismatch for dim_departments"
df_dim_departments.head(2)

Unnamed: 0,department_key,department_name,location_key
0,1,Administration,1700
1,2,Marketing,1800


In [24]:
# Read dim_locations back and confirm every row arrived. The table name is not
# schema-qualified because get_sql_dataframe already connects to dst_dbname.
sql_locations = "SELECT * FROM dim_locations;"
df_dim_locations = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_locations)
assert len(df_dim_locations) == len(df_locations), "row count mismatch for dim_locations"
df_dim_locations.head(2)

Unnamed: 0,location_key,street_address,postal_code,city,state_province,country_id
0,1400,2014 Jabberwocky Rd,26192,Southlake,Texas,US
1,1500,2011 Interiors Blvd,99236,South San Francisco,California,US


In [25]:
# Read dim_jobs back and confirm every row arrived. The table name is not
# schema-qualified because get_sql_dataframe already connects to dst_dbname.
sql_jobs = "SELECT * FROM dim_jobs;"
df_dim_jobs = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_jobs)
assert len(df_dim_jobs) == len(df_jobs), "row count mismatch for dim_jobs"
df_dim_jobs.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary
0,1,Public Accountant,4200.0,9000.0
1,2,Accounting Manager,8200.0,16000.0


In [26]:
# Read dim_employees back and confirm every row arrived. The table name is not
# schema-qualified because get_sql_dataframe already connects to dst_dbname.
sql_employees = "SELECT * FROM dim_employees;"
df_dim_employees = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_employees)
assert len(df_dim_employees) == len(df_employees), "row count mismatch for dim_employees"
df_dim_employees.head(2)

Unnamed: 0,employee_key,first_name,last_name,email,phone_number,hire_date,job_key,salary,manager_id,department_key
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,1987-06-17,4,24000.0,,9
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,1989-09-21,5,17000.0,100.0,9


# Creating and Populating the New Fact Table

In [None]:
### merging all the tables together to create the fact table ###

In [40]:
# Start the fact table from jobs joined to employees. validate="one_to_many"
# raises if job_key is duplicated in df_jobs, which would otherwise silently
# duplicate employee rows. The inner join drops employees with no matching job.
df_EventFact = pd.merge(df_jobs, df_employees, on='job_key', how='inner',
                        validate='one_to_many')
df_EventFact.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary,employee_key,first_name,last_name,email,phone_number,hire_date,salary,manager_id,department_key
0,1,Public Accountant,4200.0,9000.0,206,William,Gietz,william.gietz@sqltutorial.org,515.123.8181,1994-06-07,8300.0,205.0,11
1,2,Accounting Manager,8200.0,16000.0,205,Shelley,Higgins,shelley.higgins@sqltutorial.org,515.123.8080,1994-06-07,12000.0,101.0,11


In [41]:
# Attach department attributes. validate="many_to_one" raises if department_key
# is duplicated in df_departments, which would otherwise multiply fact rows.
df_EventFact = pd.merge(df_EventFact, df_departments, on='department_key', how='inner',
                        validate='many_to_one')
df_EventFact.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary,employee_key,first_name,last_name,email,phone_number,hire_date,salary,manager_id,department_key,department_name,location_key
0,1,Public Accountant,4200.0,9000.0,206,William,Gietz,william.gietz@sqltutorial.org,515.123.8181,1994-06-07,8300.0,205.0,11,Accounting,1700
1,2,Accounting Manager,8200.0,16000.0,205,Shelley,Higgins,shelley.higgins@sqltutorial.org,515.123.8080,1994-06-07,12000.0,101.0,11,Accounting,1700


In [42]:
# Attach location attributes. validate="many_to_one" raises if location_key
# is duplicated in df_locations, which would otherwise multiply fact rows.
df_EventFact = pd.merge(df_EventFact, df_locations, on='location_key', how='inner',
                        validate='many_to_one')
df_EventFact.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary,employee_key,first_name,last_name,email,phone_number,hire_date,salary,manager_id,department_key,department_name,location_key,street_address,postal_code,city,state_province,country_id
0,1,Public Accountant,4200.0,9000.0,206,William,Gietz,william.gietz@sqltutorial.org,515.123.8181,1994-06-07,8300.0,205.0,11,Accounting,1700,2004 Charade Rd,98199,Seattle,Washington,US
1,2,Accounting Manager,8200.0,16000.0,205,Shelley,Higgins,shelley.higgins@sqltutorial.org,515.123.8080,1994-06-07,12000.0,101.0,11,Accounting,1700,2004 Charade Rd,98199,Seattle,Washington,US


In [None]:
# Remove descriptive attributes that live in the dimension tables.
drop_cols = [
    'job_title', 'min_salary', 'max_salary',
    'department_name', 'manager_id',
    'city', 'state_province', 'country_id',
]
df_EventFact = df_EventFact.drop(columns=drop_cols)
df_EventFact.head(2)

In [46]:
# Remove the remaining personal/contact and address columns.
drop_cols = [
    'first_name', 'last_name', 'email',
    'phone_number', 'street_address', 'postal_code',
]
df_EventFact = df_EventFact.drop(columns=drop_cols)
df_EventFact.head(2)

Unnamed: 0,job_key,employee_key,hire_date,salary,department_key,location_key
0,1,206,1994-06-07,8300.0,11,1700
1,2,205,1994-06-07,12000.0,11,1700


In [48]:
# Fix the fact-table column order: key columns first, then hire date and salary.
ordered_cols = ['employee_key', 'job_key', 'department_key', 'location_key',
                'hire_date', 'salary']
df_EventFact = df_EventFact.loc[:, ordered_cols]

In [50]:
# Preview the first three rows of the finished fact frame.
df_EventFact.iloc[:3]

Unnamed: 0,employee_key,job_key,department_key,location_key,hire_date,salary
0,206,1,11,1700,1994-06-07,8300.0
1,205,2,11,1700,1994-06-07,12000.0
2,200,3,1,1700,1987-09-17,4400.0


In [52]:
table_name = 'EventFact'
primary_key = 'employee_key'
db_operation = 'insert'

# set_dataframe adds a PRIMARY KEY on this column after writing the table;
# check it is non-null and unique first so a bad key fails here with a clear message.
assert df_EventFact[primary_key].notna().all(), f"{primary_key} contains nulls"
assert df_EventFact[primary_key].is_unique, f"{primary_key} is not unique"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_EventFact, table_name, primary_key, db_operation)


In [53]:
# Read EventFact back and confirm no rows were lost on load. The table name is not
# schema-qualified because get_sql_dataframe already connects to dst_dbname.
# With no ORDER BY, rows may come back in a different order than they were written.
expected_fact_rows = len(df_EventFact)
sql_EventFact = "SELECT * FROM EventFact;"
df_EventFact = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_EventFact)
assert len(df_EventFact) == expected_fact_rows, "row count mismatch for EventFact"
df_EventFact.head(3)

Unnamed: 0,employee_key,job_key,department_key,location_key,hire_date,salary
0,100,4,9,1700,1987-06-17,24000.0
1,101,5,9,1700,1989-09-21,17000.0
2,102,5,9,1700,1993-01-13,17000.0
