## Using Python to Integrate MongoDB Data into an ETL Process

#### Importing the necessary libraries

In [1]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### Declareing and assign connection variables for the MongoDB server, the MySQL server & databases 

In [2]:
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = "root"
pwd = "Password"

src_dbname = "humanresources"
dst_dbname = "humanresources_dw3"

#### Defining functions for getting data from and setting data Into databases

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Populating MongoDB with source data

In [4]:
port = ports["mongo"]
conn_str = f"mongodb://{host_name}:{port}/"
client = pymongo.MongoClient(conn_str)
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"jobs" : 'humanresources_jobs.json',
              "employees" : 'humanresources_employees.json',
              "departments" : 'humanresources_departments.json',
              "locations" : 'humanresources_locations.json'
             }

for file in json_files:
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close() 

In [24]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f8d528f5b80>

### Creating and populating the new dimension tables
#### Extracting data from the source MongoDB collections into dataframes

In [25]:
query = {}
port = ports["mongo"]
collection = "jobs"

df_jobs = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_jobs.head(2)

Unnamed: 0,job_id,job_title,min_salary,max_salary
0,1,Public Accountant,4200.0,9000.0
1,2,Accounting Manager,8200.0,16000.0


In [26]:
query = {}
port = ports["mongo"]
collection = "employees"

df_employees = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_employees.head(2)

Unnamed: 0,employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,1987-06-17,4,24000.0,,9
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,1989-09-21,5,17000.0,100.0,9


In [27]:
query = {}
port = ports["mongo"]
collection = "departments"

df_departments = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_departments.head(2)

Unnamed: 0,department_id,department_name,location_id
0,1,Administration,1700
1,2,Marketing,1800


In [28]:
query = {}
port = ports["mongo"]
collection = "locations"

df_locations = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_locations.head(2)

Unnamed: 0,location_id,street_address,postal_code,city,state_province,country_id
0,1400,2014 Jabberwocky Rd,26192,Southlake,Texas,US
1,1500,2011 Interiors Blvd,99236,South San Francisco,California,US


#### Performing necessary transformations

In [29]:
# jobs - renaming job_id to job_key
df_jobs.rename(columns={"job_id":"job_key"}, inplace=True)
df_jobs.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary
0,1,Public Accountant,4200.0,9000.0
1,2,Accounting Manager,8200.0,16000.0


In [30]:
# employees - dropping first_name, last_name, email, phone_number, manager_id and renaming employee_id to employee_key, job_id to job_key, and department_id to department_key
drop_cols = ['first_name','last_name','email', 'phone_number', 'manager_id']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.rename(columns={"employee_id":"employee_key", "job_id": "job_key", "department_id": "department_key"}, inplace=True)

df_employees.head(2)

Unnamed: 0,employee_key,hire_date,job_key,salary,department_key
0,100,1987-06-17,4,24000.0,9
1,101,1989-09-21,5,17000.0,9


In [31]:
# departments - renaming department_id to department_key and location_id to location_key
df_departments.rename(columns={"department_id":"department_key", "location_id": "location_key"}, inplace=True)

df_departments.head(2)

Unnamed: 0,department_key,department_name,location_key
0,1,Administration,1700
1,2,Marketing,1800


In [32]:
# locations - dropping street_address, postal_code, city, country_id and renaming location_id to location_key
drop_cols = ['street_address','postal_code','city', 'country_id']
df_locations.drop(drop_cols, axis=1, inplace=True)
df_locations.rename(columns={"location_id":"location_key"}, inplace=True)

df_locations.head(2)

Unnamed: 0,location_key,state_province
0,1400,Texas
1,1500,California


#### Loading the transformed data frames into the new data warehouse by creating new tables

In [33]:
db_operation = "insert"

tables = [('dim_jobs', df_jobs, 'job_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_departments', df_departments, 'department_key'),
          ('dim_locations', df_locations, 'location_key')]

In [34]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validating that the new dimension tables were created

In [35]:
sql_jobs = "SELECT * FROM humanresources_dw3.dim_jobs;"
df_dim_jobs = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_jobs)
df_dim_jobs.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary
0,1,Public Accountant,4200.0,9000.0
1,2,Accounting Manager,8200.0,16000.0


In [36]:
sql_employees = "SELECT * FROM humanresources_dw3.dim_employees;"
df_dim_employees = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_employees)
df_dim_employees.head(2)

Unnamed: 0,employee_key,hire_date,job_key,salary,department_key
0,100,1987-06-17,4,24000.0,9
1,101,1989-09-21,5,17000.0,9


In [37]:
sql_departments = "SELECT * FROM humanresources_dw3.dim_departments;"
df_dim_departments = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_departments)
df_dim_departments.head(2)

Unnamed: 0,department_key,department_name,location_key
0,1,Administration,1700
1,2,Marketing,1800


In [40]:
sql_locations = "SELECT * FROM humanresources_dw3.dim_locations;"
df_dim_locations = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_locations)
df_dim_locations.head(2)

Unnamed: 0,location_key,state_province
0,1400,Texas
1,1500,California


### Creating and populating the fact table

In [41]:
df_fact = pd.merge(df_jobs, df_employees, on='job_key', how='inner')
df_fact.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary,employee_key,hire_date,salary,department_key
0,1,Public Accountant,4200.0,9000.0,206,1994-06-07,8300.0,11
1,2,Accounting Manager,8200.0,16000.0,205,1994-06-07,12000.0,11


In [42]:
df_fact = pd.merge(df_fact, df_departments, on='department_key', how='inner')
df_fact.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary,employee_key,hire_date,salary,department_key,department_name,location_key
0,1,Public Accountant,4200.0,9000.0,206,1994-06-07,8300.0,11,Accounting,1700
1,2,Accounting Manager,8200.0,16000.0,205,1994-06-07,12000.0,11,Accounting,1700


In [43]:
df_fact = pd.merge(df_fact, df_locations, on='location_key', how='inner')
df_fact.head(2)

Unnamed: 0,job_key,job_title,min_salary,max_salary,employee_key,hire_date,salary,department_key,department_name,location_key,state_province
0,1,Public Accountant,4200.0,9000.0,206,1994-06-07,8300.0,11,Accounting,1700,Washington
1,2,Accounting Manager,8200.0,16000.0,205,1994-06-07,12000.0,11,Accounting,1700,Washington


In [44]:
# dropping unnecessary columns from the fact table
drop_cols = ['min_salary','max_salary', 'state_province']
df_fact.drop(drop_cols, axis=1, inplace=True)

# reordering the columns
ordered_columns = ['employee_key', 'job_key','job_title','department_key','department_name','location_key',
                   'hire_date','salary']
df_fact = df_fact[ordered_columns]

df_fact.head(2)

Unnamed: 0,employee_key,job_key,job_title,department_key,department_name,location_key,hire_date,salary
0,206,1,Public Accountant,11,Accounting,1700,1994-06-07,8300.0
1,205,2,Accounting Manager,11,Accounting,1700,1994-06-07,12000.0


##### Writing the dataframe back to the database

In [45]:
table_name = "fact"
primary_key = "employee_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact, table_name, primary_key, db_operation)

##### Validating that the fact table exists and is loaded

In [48]:
sql_fact = "SELECT * FROM humanresources_dw3.fact;"
df_fact = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_fact)
df_fact.head(2)

Unnamed: 0,employee_key,job_key,job_title,department_key,department_name,location_key,hire_date,salary
0,100,4,President,9,Executive,1700,1987-06-17,24000.0
1,101,5,Administration Vice President,9,Executive,1700,1989-09-21,17000.0
