In [18]:
import os
import json
#import numpy
#import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text
from sqlalchemy.types import String, Integer

In [19]:
# Report the library versions this notebook was executed with, one per line.
print(
    f"Running SQL Alchemy Version: {sqlalchemy.__version__}",
    f"Running PyMongo Version: {pymongo.__version__}",
    sep="\n",
)

Running SQL Alchemy Version: 2.0.43
Running PyMongo Version: 4.15.3


### Declare Connection Settings for the MongoDB and MySQL Servers and the Databases You'll Be Working With

In [20]:
# Credentials are read from environment variables so that no secret is stored
# in the notebook. Export them before starting Jupyter, for example:
#   export MYSQL_PASSWORD=...  MONGODB_PASSWORD=...
# NOTE(review): passwords were previously committed in this cell; rotate them.
mysql_args = {
    "uid" : os.environ.get("MYSQL_USER", "root"),
    "pwd" : os.environ.get("MYSQL_PASSWORD", ""),
    "hostname" : "localhost",
    "dbname" : "sakila_dw"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : os.environ.get("MONGODB_USER", "ethan"),
    "password" : os.environ.get("MONGODB_PASSWORD", ""),
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "gjazvwc",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_nosql"
}

# Names of the source MongoDB collection and the MySQL staging table.
MONGO_COLLECTION = "customer_profiles"
STAGING_TABLE = "stg_customer_profiles"

### Define Functions for Getting Data From and Setting Data Into Databases

In [21]:
def get_sql_dataframe(sql_query, **args):
    """Run a SQL query against MySQL and return the result as a DataFrame.

    Parameters
    ----------
    sql_query : str
        SQL text to execute; it is wrapped in ``sqlalchemy.text``.
    **args
        Connection settings with the keys ``uid``, ``pwd``, ``hostname``
        and ``dbname``.

    Returns
    -------
    pandas.DataFrame
        The rows returned by the query.
    """
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even if the query raises.
        with sqlEngine.connect() as connection:
            return pd.read_sql(text(sql_query), connection)
    finally:
        # The engine is created per call, so release its pool before returning.
        sqlEngine.dispose()
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    """Write a DataFrame to a MySQL table.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to write; the index is not written.
    table_name : str
        Target table name.
    pk_column : str
        Column made the primary key after an ``"insert"``; unused for
        ``"update"``.
    db_operation : str
        ``"insert"`` replaces the table with ``df`` and then adds the primary
        key; ``"update"`` appends ``df`` to the existing table.
    **args
        Connection settings with the keys ``uid``, ``pwd``, ``hostname``
        and ``dbname``.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    # Reject typos up front instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # begin() commits on success, rolls back on error and always closes
        # the connection, so the write does not depend on implicit commits.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # table_name and pk_column are interpolated as identifiers;
                # they must come from trusted code, not from user input.
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # The engine is created per call, so release its pool before returning.
        sqlEngine.dispose()


def get_mongo_client(**args):
    """Create a MongoClient for either an Atlas cluster or a local server.

    Parameters
    ----------
    **args
        Must contain ``cluster_location`` (``"atlas"`` or ``"local"``). For
        ``"atlas"`` the keys ``user_name``, ``password``, ``cluster_name``
        and ``cluster_subnet`` are also read.

    Returns
    -------
    pymongo.MongoClient

    Raises
    ------
    ValueError
        If ``cluster_location`` is neither ``"atlas"`` nor ``"local"``.
    """
    from urllib.parse import quote_plus

    location = args["cluster_location"]
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # Percent-escape the credentials so characters such as '@', ':' or '/'
    # cannot corrupt the connection URI. Pass raw (unescaped) values in args.
    user = quote_plus(str(args['user_name']))
    password = quote_plus(str(args['password']))
    connect_str = (
        f"mongodb+srv://{user}:{password}@"
        f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    )
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    """Query a MongoDB collection and return the documents as a DataFrame.

    Parameters
    ----------
    mongo_client
        Client object; it is closed before this function returns.
    db_name : str
        Database to read from.
    collection : str
        Collection to query.
    query : dict
        Filter passed to ``find``.

    Returns
    -------
    pandas.DataFrame
        One row per document, without the ``_id`` column. The frame is empty
        when no document matches.
    """
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        # Close the client even when the query fails.
        mongo_client.close()

    # An empty result has no '_id' column; errors="ignore" keeps that case
    # from raising KeyError so callers can test `.empty` themselves.
    return pd.DataFrame(documents).drop(columns=["_id"], errors="ignore")


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    """Replace MongoDB collections with the contents of JSON files.

    Parameters
    ----------
    mongo_client
        Client object; it is closed before this function returns.
    db_name : str
        Database that holds the collections.
    data_directory : str
        Directory containing the JSON files.
    json_files : dict
        Maps each collection name to the file name to load into it.
    """
    db = mongo_client[db_name]

    try:
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(data_directory, file_name)

            # Read the file before dropping anything, so a missing or invalid
            # file leaves the existing collection untouched.
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            # insert_many rejects an empty list; an empty file simply yields
            # an empty (dropped) collection.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        # Close the client even when a file fails to load.
        mongo_client.close()

### Build MySQL engine

In [22]:
def mysql_engine():
    """Return a SQLAlchemy engine for the MySQL database described by ``mysql_args``."""
    credentials = f"{mysql_args['uid']}:{mysql_args['pwd']}"
    location = f"{mysql_args['hostname']}/{mysql_args['dbname']}"
    return create_engine(f"mysql+pymysql://{credentials}@{location}", pool_recycle=3600)

### Extract Data from MongoDB Atlas

In [41]:
mongo_client = get_mongo_client(**mongodb_args)
profiles_raw = get_mongo_dataframe(
    mongo_client,
    db_name=mongodb_args["db_name"],
    collection=MONGO_COLLECTION,
    query={}  # pull every document in the collection
)

if profiles_raw.empty:
    print("Mongo collection is empty; nothing to merge.")
    raise SystemExit(0)

# Keep only the columns the warehouse needs; reindex also creates any of them
# that the source documents lack (filled with missing values).
keep_cols = ["email", "loyalty_tier", "loyalty_score"]
profiles = profiles_raw.reindex(columns=keep_cols)

# Normalize the data. The nullable "string" dtype keeps missing values missing;
# astype(str) would turn them into the literal text "None"/"nan", which would
# survive dropna and be written to the warehouse as real values.
email = profiles["email"].astype("string").str.strip().str.lower()
loyalty_tier = profiles["loyalty_tier"].astype("string").str.strip().str.upper()
loyalty_score = pd.to_numeric(profiles["loyalty_score"], errors="coerce").fillna(0).astype(int)

# Rows without a usable email cannot be joined to dim_customer, so drop them,
# then keep the first row for each email.
has_email = email.fillna("").ne("").astype(bool)
df = (
    pd.DataFrame({
        "email": email,
        "loyalty_tier": loyalty_tier,
        "loyalty_score": loyalty_score,
    })
    .loc[has_email]
    .drop_duplicates(subset=["email"])
)

### Load to MySQL by building staging table

In [42]:
engine = mysql_engine()
with engine.begin() as conn:
    conn.execute(text(f"""
        CREATE TABLE IF NOT EXISTS {STAGING_TABLE} (
            email VARCHAR(255) NOT NULL,
            loyalty_tier VARCHAR(32),
            loyalty_score INT,
            PRIMARY KEY (email)
        )
    """))
    # Empty the staging table before loading: the load below appends, so rows
    # left from a previous run would violate the primary key on email.
    conn.execute(text(f"DELETE FROM {STAGING_TABLE}"))

# "update" appends the rows of df to the (now empty) staging table.
set_dataframe(
    df,
    table_name=STAGING_TABLE,
    pk_column="email",
    db_operation="update",
    **mysql_args
)

### Merge into dim_customer table on MySQL

In [43]:
with engine.begin() as conn:
    # Add each enrichment column to dim_customer only if it is missing, so the
    # cell can be re-run. One loop replaces the duplicated check/ALTER pairs.
    enrichment_columns = {"loyalty_tier": "VARCHAR(32)", "loyalty_score": "INT"}
    for column_name, column_type in enrichment_columns.items():
        column_exists = conn.execute(text("""
            SELECT 1
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = :db AND TABLE_NAME = 'dim_customer' AND COLUMN_NAME = :col
            LIMIT 1
        """), {"db": mysql_args["dbname"], "col": column_name}).scalar()

        if not column_exists:
            # Identifiers cannot be bound parameters; the names and types come
            # from the literal dict above, not from external input.
            conn.execute(text(f"ALTER TABLE dim_customer ADD COLUMN {column_name} {column_type}"))

    # Normalize warehouse emails the same way as the staged emails
    # (trimmed, lower case) so the join below matches.
    conn.execute(text("UPDATE dim_customer SET email = LOWER(TRIM(email))"))

    # Copy the loyalty attributes from staging onto matching customers.
    conn.execute(text(f"""
        UPDATE dim_customer d
        JOIN {STAGING_TABLE} s ON s.email = d.email
        SET d.loyalty_tier  = s.loyalty_tier,
            d.loyalty_score = s.loyalty_score
    """))

### Count the Customers That Now Have Loyalty Data

In [44]:
# Confirm the merge by counting dim_customer rows that carry loyalty data.
enriched_count_sql = text("""
        SELECT COUNT(*) FROM dim_customer
        WHERE loyalty_tier IS NOT NULL OR loyalty_score IS NOT NULL
    """)

with engine.connect() as conn:
    enriched = conn.execute(enriched_count_sql).scalar()

print(f"Mongo enrichment complete. Customers updated: {enriched}")

Mongo enrichment complete. Customers updated: 20
