Using Python to Integrate MongoDB Data into an ETL Process
    Insert Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [2]:
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = "burrelllizzie"
pwd = "178Chandler"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

Populate MongoDB with Source Data

port = ports["mongo"]
conn_str = f"mongodb://{host_name}:{port}/"
client = pymongo.MongoClient(conn_str)
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customer" : 'sakila.customer.json',
              "actor" : 'sakila.actor.json',
              "rental" : 'sakila.rental.json',
              "inventory" : 'sakila.inventory.json'
             }

for file in json_files:
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        

### 1.0. Create and Populate the New Dimension Tables
#### 1.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [5]:
query = {}
port = ports["mongo"]
collection = "customer"

df_customer = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [8]:
query = {}
port = ports["mongo"]
collection = "actor"

df_actor = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_actor.head(2)

Unnamed: 0,actor_id,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33


#### 1.2. Perform Any Necessary Transformations to the DataFrames

In [9]:
df_customer.rename(columns={"customer_id":"customer_key"}, inplace=True)
df_customer.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [10]:
df_actor.rename(columns={"actor_id":"actor_key"}, inplace=True)
df_actor.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33


Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [11]:
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [12]:
dataframe = df_actor
table_name = 'dim_actor'
primary_key = 'actor_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)


#### 1.4. Validate that the New Dimension Tables were Created.

In [13]:
sql_customer = "SELECT * FROM sakila_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_customer)
df_dim_customer.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [14]:
sql_actor = "SELECT * FROM sakila_dw.dim_actor;"
df_dim_actor = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_actor)
df_dim_actor.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-15 04:34:33
1,2,NICK,WAHLBERG,2006-02-15 04:34:33


### 2.0. Create and Populate the New Fact Tables
#### 2.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [15]:
query = {} # Select all elements (columns), and all documents (rows).

port = ports["mongo"]
collection = "rental"

df_rental = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [16]:
query = {} 

port = ports["mongo"]
collection = "inventory"

df_inventory = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


#### 2.2. Perform Any Necessary Transformations to the DataFrames

In [17]:
column_name_map = {"rental_id" : "rental_key",
                   "inventory_id" : "inventory_key",
                   "customer_id" : "customer_key",
                   "staff_id" : "staff_key",
                  }

df_rental.rename(columns=column_name_map, inplace=True)
df_rental.insert(0, "fact_rental_key", range(1, df_pos.shape[0]+1))
df_rental

Unnamed: 0,fact_rental_key,inventory_key,film_id,store_id,last_update
0,1,1,1,1,2006-02-15 05:09:17
1,2,2,1,1,2006-02-15 05:09:17
2,3,3,1,1,2006-02-15 05:09:17
3,4,4,1,1,2006-02-15 05:09:17
4,5,5,1,2,2006-02-15 05:09:17
...,...,...,...,...,...
995,996,996,222,2,2006-02-15 05:09:17
996,997,997,222,2,2006-02-15 05:09:17
997,998,998,222,2,2006-02-15 05:09:17
998,999,999,223,2,2006-02-15 05:09:17


In [19]:
column_name_map = {"inventory_id" : "inventory_key",
                   "film_id" : "film_key",
                   "store_id" : "store_key",
                  }

df_inventory.rename(columns=column_name_map, inplace=True)
df_inventory.insert(0, "fact_inventory_key", range(1, df_pos.shape[0]+1))
df_inventory

Unnamed: 0,fact_inventory_key,fact_rental_key,inventory_key,film_key,store_key,last_update
0,1,1,1,1,1,2006-02-15 05:09:17
1,2,2,2,1,1,2006-02-15 05:09:17
2,3,3,3,1,1,2006-02-15 05:09:17
3,4,4,4,1,1,2006-02-15 05:09:17
4,5,5,5,1,2,2006-02-15 05:09:17
...,...,...,...,...,...,...
995,996,996,996,222,2,2006-02-15 05:09:17
996,997,997,997,222,2,2006-02-15 05:09:17
997,998,998,998,222,2,2006-02-15 05:09:17
998,999,999,999,223,2,2006-02-15 05:09:17


Load Newly Transformed MongoDB Data into the Northwind_DW2 Data Warehouse

In [20]:
dataframe = df_pos
table_name = 'fact_rental'
primary_key = 'Fact_rental_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [21]:
dataframe = df_pos
table_name = 'fact_inventory'
primary_key = 'Fact_inventory_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### 2.4. Validate that the New Fact Tables were Created

In [22]:
sql_rental = "SELECT * FROM sakila_dw.fact_rental;"
df_fact_purchase_orders = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_rental)
df_fact_purchase_orders.head(2)

Unnamed: 0,fact_inventory_key,fact_rental_key,inventory_key,film_key,store_key,last_update
0,1,1,1,1,1,2006-02-15 05:09:17
1,2,2,2,1,1,2006-02-15 05:09:17


In [23]:
sql_inventory = "SELECT * FROM sakila_dw.fact_inventory;"
df_fact_inventory = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_inventory)
df_fact_inventory.head(2)

Unnamed: 0,fact_inventory_key,fact_rental_key,inventory_key,film_key,store_key,last_update
0,1,1,1,1,1,2006-02-15 05:09:17
1,2,2,2,1,1,2006-02-15 05:09:17
