## Using Python to Integrate MongoDB and Local (CSV) Data into an ETL Process


Integrate new data sourced from a MongoDB instance (here, a MongoDB Atlas cluster) with local CSV data.

Fetch data into Pandas DataFrames, perform all the necessary transformations in-memory on the client, and then push the newly transformed DataFrame to the RDBMS data warehouse.


#### Import the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
from urllib.parse import quote_plus

import pandas as pd
import pymongo
from pymongo.errors import PyMongoError
from sqlalchemy import create_engine
from sqlalchemy import text

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [2]:
# Connection settings for the local MongoDB/MySQL servers and the MongoDB Atlas cluster.
# Credentials may be overridden through environment variables so they need not live in the notebook.
# NOTE(review): the fallback passwords below are plaintext secrets; rotate them and prefer the env vars.
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = os.environ.get("MYSQL_USER", "root")
pwd = os.environ.get("MYSQL_PASSWORD", "Scout1210!")

atlas_cluster_name = "ds2002"
atlas_default_dbname = "sample"
atlas_user_name = os.environ.get("ATLAS_USER", "bpugs")
atlas_password = os.environ.get("ATLAS_PASSWORD", "zoeyisgreat")

conn_str = {
    "local": f"mongodb://{host_name}:{ports['mongo']}/",
    # MongoDB URIs require the user name and password to be percent-encoded;
    # otherwise characters such as '@', ':' or '/' would corrupt the URI.
    "atlas": (f"mongodb+srv://{quote_plus(atlas_user_name)}:{quote_plus(atlas_password)}"
              f"@{atlas_cluster_name}.nxnrkfm.mongodb.net/{atlas_default_dbname}"
              "?retryWrites=true&w=majority"),
}

src_dbname = "chinook_products"
dst_dbname = "chinook_dw"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run a SQL query against a MySQL database and return the result as a DataFrame.

    Args:
        user_id: MySQL user name.
        pwd: MySQL password. It is URL-encoded before being placed in the
            connection URL, so special characters such as '@' or '/' are safe.
        host_name: MySQL server host.
        db_name: Database (schema) to connect to.
        sql_query: SQL text passed to ``pd.read_sql``.

    Returns:
        pandas.DataFrame holding the query result.
    """
    conn_str = f"mysql+pymysql://{user_id}:{quote_plus(pwd)}@{host_name}/{db_name}"
    sql_engine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query raises.
        with sql_engine.connect() as conn:
            dframe = pd.read_sql(sql_query, conn)
    finally:
        # A new engine is built per call, so release its connection pool here.
        sql_engine.dispose()

    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    """Query a MongoDB collection and return the matching documents as a DataFrame.

    Args:
        connect_str: MongoDB connection URI passed to ``pymongo.MongoClient``.
        db_name: Name of the database to read from.
        collection: Name of the collection to query.
        query: Filter document for ``find()``; ``{}`` matches every document.

    Returns:
        pandas.DataFrame with one row per document and MongoDB's ``_id``
        column removed. It is empty when no document matches.
    """
    client = pymongo.MongoClient(connect_str)
    try:
        db = client[db_name]
        # Materialize the cursor while the client is still open.
        dframe = pd.DataFrame(list(db[collection].find(query)))
    finally:
        # Close the client even when the query fails.
        client.close()

    # An empty result has no '_id' column; ignore it instead of raising KeyError.
    dframe.drop(columns=['_id'], errors='ignore', inplace=True)
    return dframe


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    """Query a MongoDB collection on a host/port and return the documents as a DataFrame.

    Connects with credentials when both ``user_id`` and ``pwd`` are truthy,
    otherwise anonymously.

    Args:
        user_id: MongoDB user name, or a falsy value for no authentication.
        pwd: MongoDB password, or a falsy value for no authentication.
        host_name: MongoDB server host.
        port: MongoDB server port.
        db_name: Database to read from. When authenticating, it is also the
            URI path component.
        collection: Name of the collection to query.
        query: Filter document for ``find()``; ``{}`` matches every document.

    Returns:
        pandas.DataFrame with one row per document and the ``_id`` column
        removed. It is empty when no document matches.
    """
    if user_id and pwd:
        # MongoDB URIs require credentials to be percent-encoded.
        mongo_uri = f"mongodb://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}:{port}/{db_name}"
    else:
        mongo_uri = f"mongodb://{host_name}:{port}/"

    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]
        # Materialize the cursor while the client is still open.
        dframe = pd.DataFrame(list(db[collection].find(query)))
    finally:
        # Close the client even when the query fails.
        client.close()

    # An empty result has no '_id' column; ignore it instead of raising KeyError.
    dframe.drop(columns=['_id'], errors='ignore', inplace=True)
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table.

    Args:
        user_id: MySQL user name.
        pwd: MySQL password. It is URL-encoded before being placed in the
            connection URL.
        host_name: MySQL server host.
        db_name: Database (schema) to write into.
        df: DataFrame to write. Its index is not written.
        table_name: Target table name.
        pk_column: Column or columns used for the PRIMARY KEY. Used only by "insert".
        db_operation: "insert" replaces the table and adds the primary key;
            "update" appends rows to the existing table.

    Raises:
        ValueError: if ``db_operation`` is neither "insert" nor "update".
    """
    # Previously an unknown operation silently did nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(f"db_operation must be 'insert' or 'update', got {db_operation!r}")

    conn_str = f"mysql+pymysql://{user_id}:{quote_plus(pwd)}@{host_name}/{db_name}"
    sql_engine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if a write fails.
        with sql_engine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Run the DDL on the same connection, wrapped in text().
                # Engine.execute() with a raw string was removed in SQLAlchemy 2.0.
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A new engine is built per call, so release its connection pool here.
        sql_engine.dispose()

#### Populate MongoDB with Source Data
Be certain to run this cell **ONLY ONCE!** Otherwise, your MongoDB database will fill with duplicate records, which will cause duplicate-key errors when you create and populate the MySQL data warehouse dimension and fact tables.

In [4]:
# Seed the source MongoDB database from the local JSON exports, one collection per file.
# NOTE: insert_many() only appends, so running this cell twice duplicates every document.
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

# Target collection name -> JSON file inside data_dir.
json_files = {"Artist" : 'Chinook_Artist.json',
              "Album" : 'Chinook_Album.json',
              "Genre" : 'Chinook_Genre.json',
              "Media" : 'Chinook_Media.json'
             }

try:
    for collection_name, file_name in json_files.items():
        json_file = os.path.join(data_dir, file_name)
        try:
            # JSON is UTF-8 by spec; don't rely on the platform's default encoding.
            with open(json_file, 'r', encoding='utf-8') as openfile:
                json_object = json.load(openfile)
            db[collection_name].insert_many(json_object)
            print(f"{collection_name} was successfully loaded.")
        except (OSError, ValueError, TypeError, PyMongoError) as err:
            # Report which collection failed and why, then continue with the rest.
            print(f"Error: unable to send data to {collection_name} ({err})")
finally:
    client.close()

Collection(Database(MongoClient(host=['ac-fbz8yr3-shard-00-01.nxnrkfm.mongodb.net:27017', 'ac-fbz8yr3-shard-00-00.nxnrkfm.mongodb.net:27017', 'ac-fbz8yr3-shard-00-02.nxnrkfm.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, retrywrites=True, w='majority', authsource='admin', replicaset='atlas-76w0rx-shard-0', tls=True), 'chinook_products'), 'Artist') was successfully loaded.
Collection(Database(MongoClient(host=['ac-fbz8yr3-shard-00-01.nxnrkfm.mongodb.net:27017', 'ac-fbz8yr3-shard-00-00.nxnrkfm.mongodb.net:27017', 'ac-fbz8yr3-shard-00-02.nxnrkfm.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, retrywrites=True, w='majority', authsource='admin', replicaset='atlas-76w0rx-shard-0', tls=True), 'chinook_products'), 'Album') was successfully loaded.
Collection(Database(MongoClient(host=['ac-fbz8yr3-shard-00-01.nxnrkfm.mongodb.net:27017', 'ac-fbz8yr3-shard-00-00.nxnrkfm.mongodb.net:27017', 'ac-fbz8yr3-shard-00-02.nxnrkfm.mongodb.net:27017'], docume

### Create and Populate the New Dimension Tables
#### Extract Data from the Source MongoDB Collections Into DataFrames

In [5]:
query = {}  # empty filter: fetch every document (get_mongo_dataframe drops _id)
collection = 'Album'

try:
    df_album = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
except Exception as err:
    # Re-raise so the cells below never run against a missing or stale df_album.
    print(f"Error: unable to fetch data ({err})")
    raise

df_album.head(2)

Unnamed: 0,AlbumId,Title,ArtistId
0,1,For Those About To Rock We Salute You,1
1,2,Balls to the Wall,2


In [6]:
query = {}  # empty filter: fetch every document
collection = 'Artist'

try:
    df_artist = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
except Exception as err:
    # Re-raise so the cells below never run against a missing or stale df_artist.
    print(f"Error: unable to fetch data ({err})")
    raise

df_artist.head(2)

Unnamed: 0,ArtistId,Name
0,1,AC/DC
1,2,Accept


In [7]:
query = {}  # empty filter: fetch every document
collection = 'Genre'

try:
    df_genre = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
except Exception as err:
    # Re-raise so the cells below never run against a missing or stale df_genre.
    print(f"Error: unable to fetch data ({err})")
    raise

df_genre.head(2)

Unnamed: 0,GenreId,Name
0,1,Rock
1,2,Jazz


In [8]:
query = {}  # empty filter: fetch every document
collection = 'Media'

try:
    df_media = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
except Exception as err:
    # Re-raise so the cells below never run against a missing or stale df_media.
    print(f"Error: unable to fetch data ({err})")
    raise

df_media.head(2)

Unnamed: 0,MediaTypeId,Name
0,1,MPEG audio file
1,2,Protected AAC audio file


### Perform Any Necessary Transformations to the DataFrames

#### Combine Albums and Artists

In [9]:
# Explore the artist DataFrame: column dtypes and non-null counts.
df_artist.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 275 entries, 0 to 274
Data columns (total 2 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   ArtistId  275 non-null    int64 
 1   Name      275 non-null    object
dtypes: int64(1), object(1)
memory usage: 4.4+ KB


In [10]:
# Explore the album DataFrame: column dtypes and non-null counts.
df_album.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 347 entries, 0 to 346
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   AlbumId   347 non-null    int64 
 1   Title     347 non-null    object
 2   ArtistId  347 non-null    int64 
dtypes: int64(2), object(1)
memory usage: 8.3+ KB


In [11]:
# An artist can have many albums. A right join onto df_album keeps every album,
# even one whose ArtistId has no matching artist.
df_album_artist = (
    df_artist
    .merge(df_album, on='ArtistId', how='right')
    # The artist name now sits on each album row, so ArtistId is no longer needed.
    .drop(columns=['ArtistId'])
    # Use *_key for identifiers and more explicit names for the text columns.
    .rename(columns={"AlbumId": "album_key", "Name": "artist", "Title": 'album'})
)
df_album_artist.head()

Unnamed: 0,artist,album_key,album
0,AC/DC,1,For Those About To Rock We Salute You
1,Accept,2,Balls to the Wall
2,Accept,3,Restless and Wild
3,AC/DC,4,Let There Be Rock
4,Aerosmith,5,Big Ones


In [12]:
# Explore the genre DataFrame: column dtypes and non-null counts.
df_genre.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25 entries, 0 to 24
Data columns (total 2 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   GenreId  25 non-null     int64 
 1   Name     25 non-null     object
dtypes: int64(1), object(1)
memory usage: 528.0+ bytes


In [13]:
# Warehouse naming: the id column becomes genre_key and Name becomes the explicit 'genre'.
genre_column_map = {"GenreId": "genre_key", "Name": "genre"}
df_genre = df_genre.rename(columns=genre_column_map)
df_genre.head()

Unnamed: 0,genre_key,genre
0,1,Rock
1,2,Jazz
2,3,Metal
3,4,Alternative & Punk
4,5,Rock And Roll


In [14]:
# Explore the media-type DataFrame: column dtypes and non-null counts.
df_media.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   MediaTypeId  5 non-null      int64 
 1   Name         5 non-null      object
dtypes: int64(1), object(1)
memory usage: 208.0+ bytes


In [15]:
# Warehouse naming: the id column becomes media_key and Name becomes the explicit 'media'.
media_column_map = {"MediaTypeId": "media_key", "Name": "media"}
df_media = df_media.rename(columns=media_column_map)
df_media.head()

Unnamed: 0,media_key,media
0,1,MPEG audio file
1,2,Protected AAC audio file
2,3,Protected MPEG-4 video file
3,4,Purchased AAC audio file
4,5,AAC audio file


### Load Data from a Comma-Separated Values (CSV) File

In [16]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'chinook_track.csv')

try:
    # index_col=0: the CSV's first column is used as the row index
    # (it appears to be a previously saved DataFrame index).
    df_track = pd.read_csv(data_file, header=0, index_col=0)
except (OSError, ValueError) as err:
    # OSError covers a missing or unreadable file; pandas parse errors subclass ValueError.
    # Re-raise so the cells below never run against a missing or stale df_track.
    print(f"Error: unable to fetch data ({err})")
    raise

df_track.head()

Unnamed: 0,TrackId,Name,AlbumId,MediaTypeId,GenreId,Composer,Milliseconds,Bytes,UnitPrice
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99
2,3,Fast As a Shark,3,2,1,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...",230619,3990994,0.99
3,4,Restless and Wild,3,2,1,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. D...",252051,4331779,0.99
4,5,Princess of the Dawn,3,2,1,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99


### Drop Unimportant Features (None Needed to Be Dropped)

In [17]:
# Explore the track DataFrame: column dtypes and non-null counts.
df_track.info()
# Every column is kept; nothing is dropped at this step.

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3503 entries, 0 to 3502
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   TrackId       3503 non-null   int64  
 1   Name          3503 non-null   object 
 2   AlbumId       3503 non-null   int64  
 3   MediaTypeId   3503 non-null   int64  
 4   GenreId       3503 non-null   int64  
 5   Composer      2525 non-null   object 
 6   Milliseconds  3503 non-null   int64  
 7   Bytes         3503 non-null   int64  
 8   UnitPrice     3503 non-null   float64
dtypes: float64(1), int64(6), object(2)
memory usage: 273.7+ KB


In [18]:
# Rename the *Id columns to *_key so they line up with the keys used in the other tables.
track_column_map = {
    "TrackId": "track_key",
    "AlbumId": "album_key",
    'MediaTypeId': "media_key",
    'GenreId': 'genre_key',
}
df_track = df_track.rename(columns=track_column_map)
df_track.head(2)

Unnamed: 0,track_key,Name,album_key,media_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99


### Merge the Data Tables

In [19]:
# Left join keeps every track, even one whose album_key has no match (not expected, but safe).
# validate='many_to_one' raises MergeError if album_key is duplicated in df_album_artist,
# which would otherwise silently multiply track rows.
df_track_details = pd.merge(df_track, df_album_artist, on='album_key', how='left',
                            validate='many_to_one')
df_track_details.head()

Unnamed: 0,track_key,Name,album_key,media_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice,artist,album
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,AC/DC,For Those About To Rock We Salute You
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99,Accept,Balls to the Wall
2,3,Fast As a Shark,3,2,1,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...",230619,3990994,0.99,Accept,Restless and Wild
3,4,Restless and Wild,3,2,1,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. D...",252051,4331779,0.99,Accept,Restless and Wild
4,5,Princess of the Dawn,3,2,1,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Accept,Restless and Wild


In [20]:
# Attach the genre name to every track. The left join keeps tracks whose genre_key has no match.
# Dropping any existing 'genre' column first makes this cell safe to re-run; without it,
# pandas would produce genre_x / genre_y columns.
# validate='many_to_one' guards against duplicate genre_key values multiplying rows.
df_track_details = pd.merge(df_track_details.drop(columns=['genre'], errors='ignore'),
                            df_genre, on='genre_key', how='left', validate='many_to_one')
df_track_details.head()

Unnamed: 0,track_key,Name,album_key,media_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice,artist,album,genre
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,AC/DC,For Those About To Rock We Salute You,Rock
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99,Accept,Balls to the Wall,Rock
2,3,Fast As a Shark,3,2,1,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...",230619,3990994,0.99,Accept,Restless and Wild,Rock
3,4,Restless and Wild,3,2,1,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. D...",252051,4331779,0.99,Accept,Restless and Wild,Rock
4,5,Princess of the Dawn,3,2,1,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Accept,Restless and Wild,Rock


In [21]:
# Attach the media-type name to every track. The left join keeps tracks whose media_key has no match.
# Dropping any existing 'media' column first makes this cell safe to re-run; without it,
# pandas would produce media_x / media_y columns.
# validate='many_to_one' guards against duplicate media_key values multiplying rows.
df_track_details = pd.merge(df_track_details.drop(columns=['media'], errors='ignore'),
                            df_media, on='media_key', how='left', validate='many_to_one')
df_track_details.head()

Unnamed: 0,track_key,Name,album_key,media_key,genre_key,Composer,Milliseconds,Bytes,UnitPrice,artist,album,genre,media
0,1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,AC/DC,For Those About To Rock We Salute You,Rock,MPEG audio file
1,2,Balls to the Wall,2,2,1,,342562,5510424,0.99,Accept,Balls to the Wall,Rock,Protected AAC audio file
2,3,Fast As a Shark,3,2,1,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...",230619,3990994,0.99,Accept,Restless and Wild,Rock,Protected AAC audio file
3,4,Restless and Wild,3,2,1,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. D...",252051,4331779,0.99,Accept,Restless and Wild,Rock,Protected AAC audio file
4,5,Princess of the Dawn,3,2,1,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Accept,Restless and Wild,Rock,Protected AAC audio file


In [22]:
df_track_details.info()  # 3503 rows, same as df_track: the left joins retained every track.

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3503 entries, 0 to 3502
Data columns (total 13 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   track_key     3503 non-null   int64  
 1   Name          3503 non-null   object 
 2   album_key     3503 non-null   int64  
 3   media_key     3503 non-null   int64  
 4   genre_key     3503 non-null   int64  
 5   Composer      2525 non-null   object 
 6   Milliseconds  3503 non-null   int64  
 7   Bytes         3503 non-null   int64  
 8   UnitPrice     3503 non-null   float64
 9   artist        3503 non-null   object 
 10  album         3503 non-null   object 
 11  genre         3503 non-null   object 
 12  media         3503 non-null   object 
dtypes: float64(1), int64(6), object(6)
memory usage: 383.1+ KB


In [23]:
# album_key, media_key and genre_key were only needed for the joins; the descriptive
# columns they referenced now live directly on df_track_details.
join_keys = ['album_key', 'media_key', 'genre_key']
df_track_details = df_track_details.drop(columns=join_keys)
df_track_details.head()

Unnamed: 0,track_key,Name,Composer,Milliseconds,Bytes,UnitPrice,artist,album,genre,media
0,1,For Those About To Rock (We Salute You),"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,AC/DC,For Those About To Rock We Salute You,Rock,MPEG audio file
1,2,Balls to the Wall,,342562,5510424,0.99,Accept,Balls to the Wall,Rock,Protected AAC audio file
2,3,Fast As a Shark,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Ho...",230619,3990994,0.99,Accept,Restless and Wild,Rock,Protected AAC audio file
3,4,Restless and Wild,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. D...",252051,4331779,0.99,Accept,Restless and Wild,Rock,Protected AAC audio file
4,5,Princess of the Dawn,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Accept,Restless and Wild,Rock,Protected AAC audio file


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables


In [24]:
dataframe = df_track_details
table_name = 'dim_tracks'
primary_key = 'track_key'
db_operation = "insert"  # "insert" replaces the table and then adds the primary key

try:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)
except Exception as err:
    # Re-raise so a failed load is not mistaken for success by the validation cell below.
    print(f"Error: unable to send data ({err})")
    raise

#### Validate that the New Dimension Table Was Created

In [25]:
# Read the new dimension table back to confirm the load.
# (The sql_suppliers / df_dim_suppliers names are kept for compatibility; they hold dim_tracks.)
sql_suppliers = f"SELECT * FROM {dst_dbname}.dim_tracks;"

try:
    df_dim_suppliers = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_suppliers)
except Exception as err:
    # Re-raise so the preview below never shows a stale DataFrame from an earlier run.
    print(f"Error: unable to get data ({err})")
    raise

df_dim_suppliers.head(2)

Unnamed: 0,track_key,Name,Composer,Milliseconds,Bytes,UnitPrice,artist,album,genre,media
0,1,For Those About To Rock (We Salute You),"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,AC/DC,For Those About To Rock We Salute You,Rock,MPEG audio file
1,2,Balls to the Wall,,342562,5510424,0.99,Accept,Balls to the Wall,Rock,Protected AAC audio file
