# Extract, Transform, Load
## Goal: To demonstrate understanding of a basic ETL data pipeline by extracting data from a MongoDB cluster/database/collection, a MySQL locally hosted server, and a local filesystem.

In [None]:
# Imports: 

import os
from sqlalchemy import text, create_engine
import pandas as pd
import json
import datetime
import pymongo
import pymysql
import datetime

In [None]:
#allows pd to show more columns when printing head
pd.set_option('display.max_columns', 500)

## Load data from MySQL Employees Data Warehouse

In [None]:
host_name = "localhost" # the host name
port = "3306" # the port to connect on
user_id = "root" # the user id of the account
pwd = os.environ["PWD"] # an enviornmental variable invisible as I delete the cell after creating it. 

src_dbname = "employees" #the database we are loading from


In [None]:
# function written by J. Tupitza with the purpose of loading a dataframe from a MySQL server
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

In [None]:
#Loading the employees table from employees
query = "SELECT * FROM employees;"
employees = get_dataframe(user_id, pwd, host_name, src_dbname, text(query))
employees.head(100)

In [None]:
#Loads the departments table from employees
query = "SELECT * FROM departments;"
departments = get_dataframe(user_id, pwd, host_name, src_dbname, text(query))
departments.head(1)

In [None]:
#Loads the dept_emp table from employees
query = "SELECT * FROM dept_emp;"
dept_emp = get_dataframe(user_id, pwd, host_name, src_dbname, text(query))
dept_emp.head(1)

In [None]:
#loads the dept_manager table from employees
query = "SELECT * FROM employees.salaries;"
salaries = get_dataframe(user_id, pwd, host_name, src_dbname, text(query))
salaries.head(1)

In [None]:
#Loads the lowestpaidemployee table from employees
query = "SELECT * FROM employees.lowestpaidemployee;"
lowestpaidemployee = get_dataframe(user_id, pwd, host_name, src_dbname, text(query))
lowestpaidemployee.head(2)

In [None]:
#Loads the lowsalemp table from employees
query = "SELECT * FROM employees.dept_manager;"
dept_manager = get_dataframe(user_id, pwd, host_name, src_dbname, text(query))
dept_manager.head(100)

In [None]:
#Loads the lowsalemp table from employees
query = "SELECT * FROM employees.titles;"
titles = get_dataframe(user_id, pwd, host_name, src_dbname, text(query))
titles.head(1)

# Build fact_employees

In [None]:
#The fact employees table will include all of the information about the employees from the employees' dept & emp number to the hire date 
df_fact_femployees = pd.merge(employees, dept_emp)
df_fact_femployees.head(1)

# now df_fact_femployees has a dept_emp column, so I can now merge the actual deparments table onto that. 

In [None]:
df_fact_femployees = pd.merge(df_fact_femployees, departments)
df_fact_femployees.head(1)
#Now the table includes the departments, I intend to clean up duplicate/unnecessary columns at the end.

In [None]:
df_fact_femployees = pd.merge(df_fact_femployees, titles)
df_fact_femployees.head(1)
#now the table has titles 

In [None]:
employees = pd.merge(df_fact_femployees, salaries)
employees.head(1)
#now the table has dept_manager

# Move dataframe into my MongoDB cluster

In [None]:
host_name = "localhost"
port = "27017"

atlas_cluster_name = "cluster1"
atlas_default_dbname = "employees"
atlas_user_name = "ds2002"
atlas_password = os.environ["PWD"] # omitted the cell defining this variable for password security. 
atlas_special_key = "enga0sb"

conn_str = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.{atlas_special_key}.mongodb.net"


In [None]:
#My function that sets a mongodb collection to be the provided dataframe
def set_mongo_dataframe(connection_str, db_name, collection, df):
        client = pymongo.MongoClient(connection_str)
        db = client[db_name]
        db[collection].insert_many(df.to_dict('records'))
        client.close()

In [None]:
# Assume df is your Pandas DataFrame containing the datetime column with invalid dates
default_date = pd.Timestamp('2024-01-01')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
df_fact_femployees['to_date'] = pd.to_datetime(df_fact_femployees['to_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
df_fact_femployees['to_date'] = df_fact_femployees['to_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
df_fact_femployees['from_date'] = pd.to_datetime(df_fact_femployees['from_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
df_fact_femployees['from_date'] = df_fact_femployees['from_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
df_fact_femployees['birth_date'] = pd.to_datetime(df_fact_femployees['birth_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
df_fact_femployees['birth_date'] = df_fact_femployees['birth_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
df_fact_femployees['hire_date'] = pd.to_datetime(df_fact_femployees['hire_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
df_fact_femployees['hire_date'] = df_fact_femployees['hire_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

df_fact_femployees.head(1)

In [None]:
set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'fact_employees', df_fact_femployees)


In [None]:
#Professor Tupitza's function
def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


In [None]:
df_fact_femployees2 = get_mongo_dataframe(conn_str, 'ds2002-Midterm', 'fact_employees', '')
df_fact_femployees2.head(2)
#The table was actually loaded!

# Loads the data into Mongodb to complete the "Data Mart" :

In [None]:
set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'departments' , departments)

In [None]:
# Assume df is your Pandas DataFrame containing the datetime column with invalid dates
default_date = pd.Timestamp('2024-01-01')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
employees['to_date'] = pd.to_datetime(employees['to_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
employees['to_date'] = employees['to_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
employees['from_date'] = pd.to_datetime(employees['from_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
employees['from_date'] = employees['from_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
employees['birth_date'] = pd.to_datetime(employees['birth_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
employees['birth_date'] = employees['birth_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
employees['hire_date'] = pd.to_datetime(employees['hire_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
employees['hire_date'] = employees['hire_date'].dt.strftime('%Y-%m-%d %H:%M:%S')


set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'employees' , employees)

In [None]:
# Assume df is your Pandas DataFrame containing the datetime column with invalid dates
default_date = pd.Timestamp('2024-01-01')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
dept_emp['to_date'] = pd.to_datetime(dept_emp['to_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
dept_emp['to_date'] = dept_emp['to_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
dept_emp['from_date'] = pd.to_datetime(dept_emp['from_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
dept_emp['from_date'] = dept_emp['from_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'dept_emp' , dept_emp)

In [None]:
# Assume df is your Pandas DataFrame containing the datetime column with invalid dates
default_date = pd.Timestamp('2024-01-01')


# Assume df is your Pandas DataFrame containing the datetime column to be formatted
salaries['to_date'] = pd.to_datetime(salaries['to_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
salaries['to_date'] = salaries['to_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
salaries['from_date'] = pd.to_datetime(salaries['from_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
salaries['from_date'] = salaries['from_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'salaries' , salaries)

In [None]:
# Assume df is your Pandas DataFrame containing the datetime column with invalid dates
default_date = pd.Timestamp('2024-01-01')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
titles['to_date'] = pd.to_datetime(titles['to_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
titles['to_date'] = titles['to_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Assume df is your Pandas DataFrame containing the datetime column to be formatted
titles['from_date'] = pd.to_datetime(titles['from_date'],  errors='coerce').fillna((default_date))

# Convert the datetime column to the desired format
titles['from_date'] = titles['from_date'].dt.strftime('%Y-%m-%d %H:%M:%S')

set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'titles' , titles)

# Data mart now established, all dataframes loaded into the 'ds2002-Midterm'. 



### Now to load a table from a local file system into a dataframe and then into the Data mart:

In [None]:
data_dir = os.path.join(os.getcwd(), 'dept_manager.csv')
print(data_dir)

In [None]:
dept_manager_text = pd.read_csv(data_dir, sep = ";")
dept_manager_text = dept_manager_text.reset_index(drop = True)
dept_manager_text.head(5)

In [None]:
set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'film_text' , dept_manager_text)

# Load all of this data into it's own data warehouse on my local SQL server

In [None]:
#Professor Tupitza's function for setting a table in a sql server. 
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=360)
    connection = sqlEngine.connect()
        
    if db_operation == "insert":
        df.to_sql(table_name, schema = db_name, con = sqlEngine, index=False, if_exists='replace')
        #connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, schema=db_name,con=connection, index=False, if_exists='append')
    
    connection.close()

In [None]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

conn = sqlEngine.connect()

conn.execute(text("DROP DATABASE IF EXISTS `ds2002-Midterm`;"))
conn.execute(text("CREATE DATABASE `ds2002-Midterm`;"))

dst_dbname = 'ds2002-Midterm'

conn.close()


In [None]:
tables = [('fact_employees', df_fact_femployees, 'emp_no'), ('employees', employees, 'emp_no'), ('departments', departments, 'dep_no'), ('dept_emp', dept_emp, 'dep_no'), 
          ('salaries', salaries, 'emp_no'), ('titles', titles, 'emp_no'), ('dept_manager_text', dept_manager_text, 'dept_no'),]

db_operation = "insert"

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

# Data mart is finished

In [None]:
query = "SELECT * FROM dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, 'ds2002-midterm', text(query))
df_dim_date.head(2)

In [None]:
import datetime

#Changes datetime.date to be datetime.datetime for compatibility with mongo. 

for col in df_dim_date.columns:
    df_dim_date[col] = df_dim_date[col].apply(lambda d: datetime.datetime.combine(d, datetime.time.min) if isinstance(d, datetime.date) else d)
        


In [None]:
set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'dim_date', df_dim_date)

In [None]:
#Update the fact_films so that it has the date_key's 
query = "SELECT * FROM fact_employees;"
fact_employees = get_dataframe(user_id, pwd, host_name, 'ds2002-midterm', text(query))

In [None]:
set_mongo_dataframe(conn_str, 'ds2002-Midterm', 'fact_employees', fact_employees)

## Screenshots turned in alongside this notebook for proof

In [None]:
from IPython.display import Image
Image(filename='mySQL.png') 

In [None]:
Image(filename='mongoDB.png')