# ETL Workflow

![ETL](https://www.slideteam.net/media/catalog/product/cache/960x720/t/h/three_components_of_etl_process_flow_model_slide01.jpg)


## Import Data from Multiple Sources

In [None]:
# Import Dependencies
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, Column, Integer, String, Float, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

In [None]:
# Importing data from external sources

cohort = "https://docs.google.com/spreadsheets/d/e/2PACX-1vQtiJNdQr8WdkeyQan6vSJBG6hyGikzEQ-MOnU5aBltsyEmGX7fNJAL5eBRM89Tw6DvWkfSAELzLrAP/pub?output=csv"
people = "https://docs.google.com/spreadsheets/d/e/2PACX-1vSxKxfioMVd06orR0x6BeBWWaj3zbKz-yn78RfmZCfNUwLlqoa-gRu0FeZAYfc4v8_eqZtlSJqSiAdm/pub?output=csv"

cohort_df = pd.read_csv(cohort)
people_df = pd.read_csv(people)

In [None]:
cohort_df

In [None]:
people_df

## Extract Useful Data

In [None]:
# Extract specific columns from people_df
people_df = people_df[['X1', 'X2', 'X3', 'X5']]

# Rename columns
people_df = people_df.rename(index=str, columns={"X1":"name", 
                                                "X2":"year",
                                                "X3":"id",
                                                "X5":"position"})

In [None]:
# Rename columns
cohort_df = cohort_df.rename(index=str, columns={"y1":"course_id", 
                                                "y2":"instructor",
                                                "y3":"lecturer",
                                                "y4":"start_date",
                                                "y5":"class_size"})

## Data Transformation

In [None]:
# Transform course_id into 4 digit code
cohort_df['course_id'] = cohort_df['course_id'].astype(str).str.zfill(4)
cohort_df.head()

In [None]:
# Transform start_date to datetime object
cohort_df['start_date'] = pd.to_datetime(cohort_df['start_date'])
cohort_df.head()

In [None]:
# Create new table with groupby
classes_df = cohort_df.groupby(['instructor', 'lecturer']).size().reset_index(name='counts')
classes_df.head()

## Load Data into Data Warehouse

In [None]:
# Creating SQLite Database
Base = declarative_base()

In [None]:
# Define people table
class People(Base):
    __tablename__='people'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    year = Column(Text)
    idn = Column(Text)
    position = Column(String)

In [None]:
# Create table
Base.metadata.tables

In [None]:
# Create the database engine
engine = create_engine("sqlite:///./db/data.sqlite")

In [None]:
# This is where the tables was created in the database
Base.metadata.create_all(engine)

In [None]:
# The ORM's "handle" to the database is the Session
session = Session(engine)

In [None]:
# Adding the data to the session

for i in people_df.index:
    session.add(People(
        name = people_df["name"][i], 
        year = people_df["year"][i],
        idn = people_df["id"][i],
        position = people_df["position"][i]
    ))

In [None]:
# Data has not been added to the database yet
engine.execute("SELECT * FROM People").fetchall()

In [None]:
# Use the new attribute to see the queue of data ready to go into the database
session.new

In [None]:
# Commit flushes whatever remaining changes remain to the database and commits the transaction
session.commit()

In [None]:
# Confirm nothing new to add
session.new

In [None]:
# Query the database to check the data
session.query(People.name, People.position).all()

In [None]:
# Define people table
class Cohort(Base):
    __tablename__='cohort'
    id = Column(Integer, primary_key=True)
    course_id = Column(Text)
    instructor = Column(String)
    lecturer = Column(String)
    start_date = Column(Text)
    class_size = Column(Float)
    
# Create table
Base.metadata.tables

# Create the database engine
engine = create_engine("sqlite:///./db/data.sqlite")

# This is where the tables was created in the database
Base.metadata.create_all(engine)

# The ORM's "handle" to the database is the Session
session = Session(engine)

# Adding the data to the session
for i in cohort_df.index:
    session.add(Cohort(
        course_id = cohort_df["course_id"][i], 
        instructor = cohort_df["instructor"][i],
        lecturer = cohort_df["lecturer"][i],
        start_date = cohort_df["start_date"][i],
        class_size = cohort_df["class_size"][i]
    ))

# Data has not been added to the database yet
engine.execute("SELECT * FROM Cohort").fetchall()

# Use the new attribute to see the queue of data ready to go into the database
session.new

# Commit flushes whatever remaining changes remain to the database and commits the transaction
session.commit()

# Confirm nothing new to add
session.new

# Query the database to check the data
session.query(Cohort.course_id, Cohort.start_date).all()

In [None]:
# Define people table
class Classes(Base):
    __tablename__='classes'
    id = Column(Integer, primary_key=True)
    instructor = Column(String)
    lecturer = Column(String)
    count = Column(Float)
    
# Create table
Base.metadata.tables

# Create the database engine
engine = create_engine("sqlite:///./db/data.sqlite")

# This is where the tables was created in the database
Base.metadata.create_all(engine)

# The ORM's "handle" to the database is the Session
session = Session(engine)

# Adding the data to the session
for i in classes_df.index:
    session.add(Classes(
        instructor = classes_df["instructor"][i],
        lecturer = classes_df["lecturer"][i],
        count = classes_df["counts"][i]
    ))

# Data has not been added to the database yet
engine.execute("SELECT * FROM Classes").fetchall()

# Use the new attribute to see the queue of data ready to go into the database
session.new

# Commit flushes whatever remaining changes remain to the database and commits the transaction
session.commit()

# Confirm nothing new to add
session.new

# Query the database to check the data
session.query(Classes.instructor, Classes.lecturer, Classes.count).all()

## Use Data in Database for Analysis

In [None]:
# Import dependencies
import sqlite3
import pandas as pd

In [None]:
# Building connection to the database file
con = sqlite3.connect('./db/data.sqlite')
cursor = con.cursor()

In [None]:
table_query = \
"""
SELECT 
    name
FROM 
    sqlite_schema
WHERE 
    type ='table' AND 
    name NOT LIKE 'sqlite_%'
"""

cursor.execute(table_query)

In [None]:
# Simple query
people_query = \
"""
SELECT * 
FROM People
"""

In [None]:
# Load into Pandas dataframe
people_df = pd.read_sql(people_query, con)
people_df.head()