# Project 1: ETL Processing with HR Database

Background:
I am currently a Data Scientist for a small (but growing) IT company based in Charlottesville, VA that has multiple remote employees throughout the United States. The HR department is in need of querying their SQL database and transforming its tables in order to find which employees are based in the US who are able to commute quickly to a conference held in Florida. My goal is to extract from the SQL database, transform the data to facilitate access to desired information, load into a new database specifically intended for information on eligible employees for conference attendance, and then load into a CSV file so multiple people can access it quickly. Because they are a growing company, loading the current SQL database into a NoSQL database is in the foreseeable future to increase scalability, but a local CSV file will do for now since this task is only intended to aid conference planning.

### Importing Necessary Libraries

In [2]:
import os
import numpy
import pyodbc 
import pandas as pd
from sqlalchemy import create_engine

### Assigning Connection Variables for MySQL Server

In [3]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"

user_id = "root"
pwd = "Rocco#0630sofo"
src_dbname = "Human_resources"
dst_dbname = "Eligible_forConference"

### Defining Functions for Setting/Getting Dataframes

In [4]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

### Populating Dimension Tables

In [14]:
sql_locations = "SELECT * FROM Human_resources.locations;"
df_locations = get_dataframe(user_id, pwd, host_name, src_dbname, sql_locations)

in_US = df_locations["country_id"] == "US"
df_locations = df_locations[in_US]
df_locations.head(2)

Unnamed: 0,location_id,street_address,postal_code,city,state_province,country_id
0,1400,2014 Jabberwocky Rd,26192,Southlake,Texas,US
1,1500,2011 Interiors Blvd,99236,South San Francisco,California,US


In [15]:
sql_departments = "SELECT * FROM Human_resources.departments;"
df_departments = get_dataframe(user_id, pwd, host_name, src_dbname, sql_departments)
df_departments.head(2)

Unnamed: 0,department_id,department_name,location_id
0,1,Administration,1700
1,2,Marketing,1800


In [16]:
sql_employees = "SELECT * FROM Human_resources.employees;"
df_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_employees)
df_employees.head(2)

Unnamed: 0,employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,1987-06-17,4,24000.0,,9
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,1989-09-21,5,17000.0,100.0,9


#### Transformations

In [24]:
drop_cols = ['street_address','postcal_code']
df_locations.drop(drop_cols, axis=1, inplace=True)
df_locations.rename(columns={"location_id":"location_key"}, inplace=True)

df_locations.head(2)

Unnamed: 0,location_key,city,state_province,country_id
0,1400,Southlake,Texas,US
1,1500,South San Francisco,California,US


In [25]:
df_departments.rename(columns={"department_id":"department_key"}, inplace=True)

df_departments.head(2)

Unnamed: 0,department_key,department_name,location_id
0,1,Administration,1700
1,2,Marketing,1800


In [26]:
drop_cols = ['hire_date','salary']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.rename(columns={"employee_id":"employee_key"}, inplace=True)

df_employees.head(2)

Unnamed: 0,employee_key,first_name,last_name,email,phone_number,job_id,manager_id,department_id
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,4,,9
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,5,100.0,9


#### Loading the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [27]:
exec_sql = f"CREATE DATABASE `{dst_dbname}`;"

conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
sqlEngine.execute(exec_sql) #create db
sqlEngine.execute("USE Eligible_forConference;") # select new db

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7f9095cd2f40>

In [28]:
db_operation = "insert"

tables = [('dim_locations', df_locations, 'location_key'),
          ('dim_departments', df_departments, 'department_key'),
          ('dim_employees', df_employees, 'employee_key')]

In [29]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Populating the Fact Table

In [58]:
sql_fact_table = """
SELECT `locations`.`location_id`,
    `employees`.`employee_id`,
    `employees`.`first_name`,
    `employees`.`last_name`,
    `employees`.`email`,
    `employees`.`phone_number`,
    `employees`.`job_id`,
    `employees`.`manager_id`,
    `employees`.`department_id` AS `employee_department_id`,
     `locations`.`city`,
    `locations`.`state_province`,
    `locations`.`country_id`,
    `departments`.`department_id`,
    `departments`.`department_name`,
    `departments`.`location_id` AS `department_location_id`
     FROM Human_resources.locations
 INNER JOIN Human_resources.departments
    ON departments.location_id = locations.location_id
 RIGHT OUTER JOIN Human_resources.employees
     ON departments.department_id = employees.department_id;
"""

In [59]:
df_fact_table = get_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_table)
df_fact_table.head(2)

Unnamed: 0,location_id,employee_id,first_name,last_name,email,phone_number,job_id,manager_id,employee_department_id,city,state_province,country_id,department_id,department_name,department_location_id
0,1700,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,4,,9,Seattle,Washington,US,9,Executive,1700
1,1700,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,5,100.0,9,Seattle,Washington,US,9,Executive,1700


In [65]:
drop_columns = ["location_id", "department_id"]
df_fact_table.drop(drop_columns, axis=1, inplace=True)
df_fact_table.insert(0, "EligibleEmployee_key", range(0, df_fact_table.shape[0]))
df_fact_table.head(2)

Unnamed: 0,EligibleEmployee_key,employee_id,first_name,last_name,email,phone_number,job_id,manager_id,employee_department_id,city,state_province,country_id,department_name,department_location_id
0,0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,4,,9,Seattle,Washington,US,Executive,1700
1,1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,5,100.0,9,Seattle,Washington,US,Executive,1700


In [75]:
table_name = "Eligible_employees"
primary_key = "EligibleEmployee_key"
db_operatioin = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_table, table_name, primary_key, db_operation)

### Load Into CSV File Format

In [79]:
folder_path = '/Users/annsofo/Documents/UVA/Spring 2022/DS 3002/Project 1'
file_path = '/Users/annsofo/Documents/UVA/Spring 2022/DS 3002/Project 1/Eligible_employees.csv'
os.makedirs(folder_path, exist_ok=True)  
try:
    df_fact_table.to_csv(file_path)
except:
    print("An exception occured.")