# Data Ingestion Pipeline - Codecademy

For this project, you’ll build a data engineering pipeline to regularly transform a messy database into a clean source of truth for an analytics team.

### Scenario
You’ll be working with a mock database of long-term cancelled subscribers for a fictional subscription company. This database is regularly updated from multiple sources, and needs to be routinely cleaned and transformed into usable shape with as little human intervention as possible.

### Project Objectives
* Complete a project to add to your portfolio
* Use Jupyter notebooks to explore and clean a dataset
* Use Python to automate data cleaning and transformation using unit tests and error logging
* Use Bash scripts to automate file management and run scripts

### Prerequisites
* Intermediate and Advanced Python 3
* Pandas
* Bash Scripting

## Inspect and Clean the data
Import the tables in cademycode.db as dataframes. Inspect the tables for missing or invalid data and perform any data cleaning operations you think are necessary.

Start by importing the libraries, then create a connection to the database and a cursor object.

In [1]:
import sqlite3
import pandas as pd

In [2]:
con = sqlite3.connect("dev/cademycode.db")
cur = con.cursor()

Explore what tables there are in the database.

In [5]:
schema_query = cur.execute("""SELECT name, sql FROM sqlite_master""")
schema_query.fetchall()

[('cademycode_students',
  'CREATE TABLE cademycode_students (\n\tuuid INTEGER, \n\tname VARCHAR, \n\tdob VARCHAR, \n\tsex TEXT, \n\tcontact_info JSON, \n\tjob_id VARCHAR, \n\tnum_course_taken VARCHAR, \n\tcurrent_career_path_id VARCHAR, \n\ttime_spent_hrs VARCHAR\n)'),
 ('cademycode_courses',
  'CREATE TABLE cademycode_courses (\n\tcareer_path_id BIGINT, \n\tcareer_path_name TEXT, \n\thours_to_complete BIGINT\n)'),
 ('cademycode_student_jobs',
  'CREATE TABLE cademycode_student_jobs (\n\tjob_id BIGINT, \n\tjob_category TEXT, \n\tavg_salary BIGINT\n)'),
 ('students_data',
  'CREATE TABLE "students_data" (\n"index" INTEGER,\n  "uuid" INTEGER,\n  "name" TEXT,\n  "dob" TEXT,\n  "sex" TEXT,\n  "mailing_address" TEXT,\n  "email" TEXT,\n  "job_id" INTEGER,\n  "num_course_taken" INTEGER,\n  "career_path_id" INTEGER,\n  "time_spent_hrs" REAL,\n  "career_path_name" TEXT,\n  "hours_to_complete" REAL,\n  "job_category" TEXT,\n  "avg_salary" INTEGER,\n  "_completed_path" INTEGER\n)')]
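
The schema query above lists every object recorded in sqlite_master. As a minimal sketch, assuming an in-memory stand-in database with a hypothetical `demo_students` table (not the real cademycode.db), the listing can be narrowed to tables only and a single table's columns read with `PRAGMA table_info`:

```python
import sqlite3
import pandas as pd

# In-memory stand-in for dev/cademycode.db; the table and index are hypothetical.
demo_con = sqlite3.connect(":memory:")
demo_con.execute("CREATE TABLE demo_students (uuid INTEGER, name TEXT)")
demo_con.execute("CREATE INDEX idx_demo_uuid ON demo_students (uuid)")

# sqlite_master also lists indexes and views, so filter on type = 'table'.
tables = pd.read_sql_query(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name",
    demo_con,
)
table_names = tables["name"].tolist()

# PRAGMA table_info returns one row per column of the given table.
columns = pd.read_sql_query("PRAGMA table_info(demo_students)", demo_con)
column_names = columns["name"].tolist()

demo_con.close()
print(table_names, column_names)
```

Reading the result into a dataframe keeps the output tabular, which may be easier to scan than the raw `CREATE TABLE` strings.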

###  Import and explore 'cademycode_students' table

Create a query and read it into a dataframe.
Then inspect the dataframe to see what data it holds and whether it needs cleaning.

In [65]:
students_query = """SELECT * FROM cademycode_students"""
df_students = pd.read_sql_query(students_query, con)

In [66]:
print("Students DF info")
df_students.info()

Students DF info
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 9 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   uuid                    5000 non-null   int64 
 1   name                    5000 non-null   object
 2   dob                     5000 non-null   object
 3   sex                     5000 non-null   object
 4   contact_info            5000 non-null   object
 5   job_id                  4995 non-null   object
 6   num_course_taken        4749 non-null   object
 7   current_career_path_id  4529 non-null   object
 8   time_spent_hrs          4529 non-null   object
dtypes: int64(1), object(8)
memory usage: 351.7+ KB


In [67]:
print("\nStudents DF first 10 rows")
df_students.head(10)


Students DF first 10 rows


Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs
0,1,Annabelle Avery,1943-07-03,F,"{""mailing_address"": ""303 N Timber Key, Irondal...",7.0,6.0,1.0,4.99
1,2,Micah Rubio,1991-02-07,M,"{""mailing_address"": ""767 Crescent Fair, Shoals...",7.0,5.0,8.0,4.4
2,3,Hosea Dale,1989-12-07,M,"{""mailing_address"": ""P.O. Box 41269, St. Bonav...",7.0,8.0,8.0,6.74
3,4,Mariann Kirk,1988-07-31,F,"{""mailing_address"": ""517 SE Wintergreen Isle, ...",6.0,7.0,9.0,12.31
4,5,Lucio Alexander,1963-08-31,M,"{""mailing_address"": ""18 Cinder Cliff, Doyles b...",7.0,14.0,3.0,5.64
5,6,Shavonda Mcmahon,1989-10-15,F,"{""mailing_address"": ""P.O. Box 81591, Tarpon Sp...",6.0,10.0,3.0,10.12
6,7,Terrell Bleijenberg,1959-05-05,M,"{""mailing_address"": ""P.O. Box 53471, Oskaloosa...",2.0,9.0,8.0,24.17
7,8,Stanford Allan,1997-11-22,M,"{""mailing_address"": ""255 Spring Avenue, Point ...",3.0,3.0,1.0,19.54
8,9,Tricia Delacruz,1961-10-20,F,"{""mailing_address"": ""997 Dewy Apple, Lake Lind...",1.0,6.0,9.0,1.75
9,10,Regenia van der Helm,1999-02-23,N,"{""mailing_address"": ""220 Middle Ridge, Falcon ...",5.0,7.0,6.0,13.55


The dataframe info shows that several columns have missing values. The percentage of missing values per column is:

In [68]:
df_students.isna().sum() / df_students.shape[0] * 100

uuid                      0.00
name                      0.00
dob                       0.00
sex                       0.00
contact_info              0.00
job_id                    0.10
num_course_taken          5.02
current_career_path_id    9.42
time_spent_hrs            9.42
dtype: float64
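
The percentage calculation above divides the count of missing values by the number of rows. A short sketch, using a hypothetical four-row dataframe rather than the real table, shows an equivalent form based on the mean of the boolean mask:

```python
import pandas as pd

# Hypothetical toy data; None marks a missing value.
demo = pd.DataFrame(
    {
        "name": ["a", "b", "c", "d"],
        "job_id": ["7.0", None, "3.0", "1.0"],
    }
)

# isna() gives a True/False mask; its mean is the share of missing values.
missing_pct = demo.isna().mean().mul(100).round(2)
print(missing_pct)
```

Both forms give the same numbers; the mean-based one simply avoids referring to `shape[0]`.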

Inspect examples of missing values for each of the columns.

In [69]:
print("job_id values")
df_students[df_students['job_id'].isna()]

job_id values


Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs
162,163,Glen Riley,2002-08-22,M,"{""mailing_address"": ""P.O. Box 37267, Cornlea v...",,8.0,3.0,5.7
757,758,Mercedez Vorberg,2002-03-25,F,"{""mailing_address"": ""284 Cedar Seventh, Virden...",,15.0,4.0,4.14
854,855,Kurt Ho,2002-05-29,M,"{""mailing_address"": ""P.O. Box 27254, Olin, New...",,0.0,8.0,23.72
1029,1030,Penny Gaines,2002-03-01,N,"{""mailing_address"": ""138 Misty Vale, Stockton ...",,15.0,4.0,16.25
1542,1543,Frederick Reilly,2002-11-13,M,"{""mailing_address"": ""P.O. Box 40769, Quakervil...",,7.0,9.0,21.32


In [70]:
print("num_course_taken values")
df_students[df_students['num_course_taken'].isna()]

num_course_taken values


Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs
25,26,Doug Browning,1970-06-08,M,"{""mailing_address"": ""P.O. Box 15845, Devine, F...",7.0,,5.0,1.92
26,27,Damon Schrauwen,1953-10-31,M,"{""mailing_address"": ""P.O. Box 84659, Maben, Ge...",4.0,,10.0,3.73
51,52,Alisa Neil,1977-05-28,F,"{""mailing_address"": ""16 View Annex, Mosses, No...",5.0,,8.0,22.86
70,71,Chauncey Hooper,1962-04-07,M,"{""mailing_address"": ""955 Dewy Flat, Slaughterv...",3.0,,3.0,3.97
80,81,Ellyn van Heest,1984-06-28,F,"{""mailing_address"": ""872 Cider Glade, Chicken,...",3.0,,10.0,12.39
...,...,...,...,...,...,...,...,...,...
4889,4890,Tegan Cochran,1970-11-08,F,"{""mailing_address"": ""106 Sunny Nook, Vernal, G...",5.0,,8.0,22.75
4898,4899,Ruthann Oliver,1998-05-22,F,"{""mailing_address"": ""644 Merry Island, Green V...",3.0,,7.0,21.27
4914,4915,Ernest Holmes,1995-03-11,M,"{""mailing_address"": ""872 Wintergreen Harbor, G...",7.0,,9.0,26.5
4980,4981,Brice Franklin,1946-12-01,M,"{""mailing_address"": ""947 Panda Way, New Bedfor...",4.0,,5.0,8.66


In [71]:
print("current_career_path_id values")
df_students[df_students['current_career_path_id'].isna()]

current_career_path_id values


Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs
15,16,Norene Dalton,1976-04-30,F,"{""mailing_address"": ""130 Wishing Essex, Branch...",6.0,0.0,,
19,20,Sofia van Steenbergen,1990-02-21,N,"{""mailing_address"": ""634 Clear Barn Dell, Beam...",7.0,13.0,,
30,31,Christoper Warner,1989-12-28,M,"{""mailing_address"": ""556 Stony Highlands, Drai...",2.0,5.0,,
49,50,Antony Horne,1996-05-29,M,"{""mailing_address"": ""P.O. Box 78685, Lenox, Te...",3.0,2.0,,
54,55,Omar Bunk,1955-11-08,M,"{""mailing_address"": ""445 Dale Hollow, Vermont ...",3.0,14.0,,
...,...,...,...,...,...,...,...,...,...
4904,4905,Eduardo Daniel,2004-06-18,M,"{""mailing_address"": ""598 Deer Trace, Forest Gr...",8.0,12.0,,
4922,4923,Francisco van Ede,1961-04-26,M,"{""mailing_address"": ""282 Fourth Trace, Carter ...",7.0,5.0,,
4948,4949,Dewitt van Malsem,1949-03-08,M,"{""mailing_address"": ""423 Course Trail, Wilmot,...",4.0,7.0,,
4956,4957,Todd Stamhuis,1961-06-15,M,"{""mailing_address"": ""251 Grand Rose Underpass,...",7.0,8.0,,


In [72]:
print("time_spent_hrs values")
df_students[df_students['time_spent_hrs'].isna()]

time_spent_hrs values


Unnamed: 0,uuid,name,dob,sex,contact_info,job_id,num_course_taken,current_career_path_id,time_spent_hrs
15,16,Norene Dalton,1976-04-30,F,"{""mailing_address"": ""130 Wishing Essex, Branch...",6.0,0.0,,
19,20,Sofia van Steenbergen,1990-02-21,N,"{""mailing_address"": ""634 Clear Barn Dell, Beam...",7.0,13.0,,
30,31,Christoper Warner,1989-12-28,M,"{""mailing_address"": ""556 Stony Highlands, Drai...",2.0,5.0,,
49,50,Antony Horne,1996-05-29,M,"{""mailing_address"": ""P.O. Box 78685, Lenox, Te...",3.0,2.0,,
54,55,Omar Bunk,1955-11-08,M,"{""mailing_address"": ""445 Dale Hollow, Vermont ...",3.0,14.0,,
...,...,...,...,...,...,...,...,...,...
4904,4905,Eduardo Daniel,2004-06-18,M,"{""mailing_address"": ""598 Deer Trace, Forest Gr...",8.0,12.0,,
4922,4923,Francisco van Ede,1961-04-26,M,"{""mailing_address"": ""282 Fourth Trace, Carter ...",7.0,5.0,,
4948,4949,Dewitt van Malsem,1949-03-08,M,"{""mailing_address"": ""423 Course Trail, Wilmot,...",4.0,7.0,,
4956,4957,Todd Stamhuis,1961-06-15,M,"{""mailing_address"": ""251 Grand Rose Underpass,...",7.0,8.0,,


Time spent appears to be tied to the selected career path: both columns are missing in the same number of records (471 of 5,000), and the example rows shown for each are identical.
Job id and number of courses taken do not follow this pattern and appear to be missing independently.
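
Equal missing counts suggest, but do not prove, that two columns are missing in the same rows. A minimal sketch on hypothetical toy data of how that could be checked row by row:

```python
import pandas as pd

# Hypothetical toy data mirroring the column names used in this notebook.
demo = pd.DataFrame(
    {
        "current_career_path_id": ["1.0", None, "8.0", None],
        "time_spent_hrs": ["4.99", None, "6.74", None],
        "num_course_taken": ["6.0", "5.0", None, "2.0"],
    }
)

path_missing = demo["current_career_path_id"].isna()
hours_missing = demo["time_spent_hrs"].isna()

# Rows where both are missing, and rows where exactly one of the two is missing.
both_missing = int((path_missing & hours_missing).sum())
only_one_missing = int((path_missing ^ hours_missing).sum())
print(both_missing, only_one_missing)
```

If `only_one_missing` is zero on the real table, the two columns are always missing together.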

In [74]:
# Filling in missing values with 0

df_students['job_id'] = df_students['job_id'].fillna(0)
df_students['num_course_taken'] = df_students['num_course_taken'].fillna(0)
df_students['current_career_path_id'] = df_students['current_career_path_id'].fillna(0)
df_students['time_spent_hrs'] = df_students['time_spent_hrs'].fillna(0)

In [75]:
# Checking no missing values left

df_students.isna().sum() / df_students.shape[0] * 100

uuid                      0.0
name                      0.0
dob                       0.0
sex                       0.0
contact_info              0.0
job_id                    0.0
num_course_taken          0.0
current_career_path_id    0.0
time_spent_hrs            0.0
dtype: float64

In [76]:
# Looking again at the dataframe info,
# we might want to assign correct datatypes to numerical columns

print("Students DF info")
df_students.info()

Students DF info
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 9 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   uuid                    5000 non-null   int64 
 1   name                    5000 non-null   object
 2   dob                     5000 non-null   object
 3   sex                     5000 non-null   object
 4   contact_info            5000 non-null   object
 5   job_id                  5000 non-null   object
 6   num_course_taken        5000 non-null   object
 7   current_career_path_id  5000 non-null   object
 8   time_spent_hrs          5000 non-null   object
dtypes: int64(1), object(8)
memory usage: 351.7+ KB


In [77]:
# Changing datatypes

df_students = df_students.astype({'job_id': 'float64', 'num_course_taken': 'float64', 'current_career_path_id': 'float64', 'time_spent_hrs': 'float64'})
df_students = df_students.astype({'job_id': 'int64', 'num_course_taken': 'int64', 'current_career_path_id': 'int64'})

df_students.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 9 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   uuid                    5000 non-null   int64  
 1   name                    5000 non-null   object 
 2   dob                     5000 non-null   object 
 3   sex                     5000 non-null   object 
 4   contact_info            5000 non-null   object 
 5   job_id                  5000 non-null   int64  
 6   num_course_taken        5000 non-null   int64  
 7   current_career_path_id  5000 non-null   int64  
 8   time_spent_hrs          5000 non-null   float64
dtypes: float64(1), int64(4), object(4)
memory usage: 351.7+ KB
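
The conversion above goes through float64 first because the raw values are strings such as "7.0", which cannot be cast to an integer directly. As a hedged alternative sketch on a hypothetical series, `pd.to_numeric` performs the string-to-number step and turns unparseable entries into NaN:

```python
import pandas as pd

# Hypothetical raw column: numeric strings plus one missing value.
raw = pd.Series(["7.0", "3.0", None, "12.0"], dtype="object")

# errors="coerce" converts anything that is not a number into NaN.
as_float = pd.to_numeric(raw, errors="coerce")

# Fill the gaps (0 here, as in this notebook) before casting to int64.
as_int = as_float.fillna(0).astype("int64")
print(as_int.tolist())
```

Unlike a plain `astype`, this variant would not raise on a malformed string; whether silently coercing such values is desirable depends on the pipeline.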


For the numerical columns we can use the describe method to see whether any values stand out.

In [86]:
df_students.describe()

Unnamed: 0,uuid,job_id,num_course_taken,current_career_path_id,time_spent_hrs
count,5000.0,5000.0,5000.0,5000.0,5000.0
mean,2500.5,4.168,7.1554,4.9628,10.435382
std,1443.520003,2.15107,4.784415,3.169263,7.946934
min,1.0,0.0,0.0,0.0,0.0
25%,1250.75,2.0,3.0,2.0,3.68
50%,2500.5,4.0,7.0,5.0,9.665
75%,3750.25,6.0,11.0,8.0,15.84
max,5000.0,8.0,15.0,10.0,35.98


The contact_info column holds two pieces of information, a mailing address and an email, which we can extract into separate columns for ease of use.

In [78]:
df_students['contact_info'][0]

'{"mailing_address": "303 N Timber Key, Irondale, Wisconsin, 84736", "email": "annabelle_avery9376@woohoo.com"}'

Each value is a JSON string, so we can unpack it with the following function, which also handles values that fail to parse.

In [79]:
import json

# Parse a JSON string and return the value stored under `field`
def to_json(row, field):
    try:
        return json.loads(row).get(field, None)
    except (json.JSONDecodeError, TypeError):
        # Report the offending value and fall back to None
        print(f"Failed to parse JSON in row: {row}")
        return None
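
Since the project objectives mention error logging, here is a minimal sketch of the same idea using the standard `logging` module instead of `print`. The function name `extract_field` and the logger name are hypothetical, chosen for illustration only:

```python
import json
import logging

logger = logging.getLogger("contact_info_demo")  # hypothetical logger name


def extract_field(raw, field):
    """Return `field` from a JSON object string, or None if that fails."""
    try:
        return json.loads(raw).get(field)
    except (json.JSONDecodeError, TypeError, AttributeError):
        # JSONDecodeError: malformed text; TypeError: non-string input;
        # AttributeError: valid JSON that is not an object (e.g. a list).
        logger.warning("Failed to parse JSON in row: %r", raw)
        return None


good = extract_field('{"email": "a@example.com"}', "email")
bad = extract_field("not json", "email")
missing = extract_field(None, "email")
print(good, bad, missing)
```

Routing failures through a logger makes it possible to send them to a file later without touching the parsing code.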

Now let's extract values and drop the contact info column as we no longer need it.

In [80]:
df_students['mailing_address'] = df_students['contact_info'].apply(lambda x: to_json(x, 'mailing_address'))
df_students['email'] = df_students['contact_info'].apply(lambda x: to_json(x, 'email'))
df_students = df_students.drop(columns=['contact_info'])
df_students.head()

Unnamed: 0,uuid,name,dob,sex,job_id,num_course_taken,current_career_path_id,time_spent_hrs,mailing_address,email
0,1,Annabelle Avery,1943-07-03,F,7,6,1,4.99,"303 N Timber Key, Irondale, Wisconsin, 84736",annabelle_avery9376@woohoo.com
1,2,Micah Rubio,1991-02-07,M,7,5,8,4.4,"767 Crescent Fair, Shoals, Indiana, 37439",rubio6772@hmail.com
2,3,Hosea Dale,1989-12-07,M,7,8,8,6.74,"P.O. Box 41269, St. Bonaventure, Virginia, 83637",hosea_dale8084@coldmail.com
3,4,Mariann Kirk,1988-07-31,F,6,7,9,12.31,"517 SE Wintergreen Isle, Lane, Arkansas, 82242",kirk4005@hmail.com
4,5,Lucio Alexander,1963-08-31,M,7,14,3,5.64,"18 Cinder Cliff, Doyles borough, Rhode Island,...",alexander9810@hmail.com
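
As an alternative sketch, `pd.json_normalize` can expand every key of the parsed JSON into its own column in one step. This assumes every string is valid JSON and that the dataframe has a default RangeIndex, so it lacks the per-row error handling of the function above. The two rows reuse values shown earlier in this notebook:

```python
import json
import pandas as pd

demo = pd.DataFrame(
    {
        "uuid": [1, 2],
        "contact_info": [
            '{"mailing_address": "303 N Timber Key, Irondale, Wisconsin, 84736", '
            '"email": "annabelle_avery9376@woohoo.com"}',
            '{"mailing_address": "767 Crescent Fair, Shoals, Indiana, 37439", '
            '"email": "rubio6772@hmail.com"}',
        ],
    }
)

# Parse each string into a dict, then expand the dicts into columns.
parsed = pd.json_normalize(demo["contact_info"].apply(json.loads).tolist())

# Replace the raw column with the expanded ones (rows align on position).
expanded = pd.concat([demo.drop(columns=["contact_info"]), parsed], axis=1)
print(expanded.columns.tolist())
```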


In [81]:
# reorder columns logically
df_students = df_students[["uuid", "name", "dob", "sex", "mailing_address", "email", "job_id", "num_course_taken", "current_career_path_id", "time_spent_hrs"]]

Next we check if the dataframe contains any duplicates.

In [84]:
df_students_duplicates = df_students.duplicated().sum()
print(f'There are {df_students_duplicates} duplicate rows in the df_students dataframe')

There are 0 duplicate rows in the df_students dataframe


We can also check whether the same person was recorded more than once by combining the name, dob and email columns and checking for duplicates.

In [87]:
df_students_person_info = df_students['name'] + " " + df_students['dob'] + " " + df_students['email']
df_students_person_info.duplicated().sum()

0
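
The same check can be written without building a combined string, by passing the identifying columns to `duplicated` through its `subset` parameter. A minimal sketch on hypothetical toy rows:

```python
import pandas as pd

# Hypothetical toy data: rows 0 and 1 describe the same person under two uuids.
demo = pd.DataFrame(
    {
        "uuid": [1, 2, 3],
        "name": ["Ann Lee", "Ann Lee", "Bo Chen"],
        "dob": ["1990-01-01", "1990-01-01", "1985-05-05"],
        "email": ["ann@example.com", "ann@example.com", "bo@example.com"],
    }
)

person_cols = ["name", "dob", "email"]

# Count repeats beyond the first occurrence of each person.
n_repeats = int(demo.duplicated(subset=person_cols).sum())

# keep=False flags every row that belongs to a repeated person.
repeated_rows = demo[demo.duplicated(subset=person_cols, keep=False)]
print(n_repeats, repeated_rows["uuid"].tolist())
```

Using `subset` also avoids false matches that string concatenation could in principle produce when values contain the separator.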

###  Import and explore 'cademycode_courses' table

In [None]:
courses_query = """SELECT * FROM cademycode_courses"""
df_courses = pd.read_sql_query(courses_query, con)

In [None]:
print('Courses DF shape')
df_courses.shape

In [None]:
df_courses = df_courses.astype(
    {'career_path_id': 'Int64', 'hours_to_complete': 'Int64'}
)

In [None]:
print("Courses DF info")
df_courses.info()

print("\nCourses DF first 10 rows")
df_courses.head(10)

In [None]:
print(df_courses.to_string())

###  Import and explore 'cademycode_student_jobs' table

In [None]:
jobs_query = """SELECT * FROM cademycode_student_jobs"""
df_jobs = pd.read_sql_query(jobs_query, con)

In [None]:
print('Jobs DF shape')
df_jobs.shape

In [None]:
print("Jobs DF info")
df_jobs.info()

print("\nJobs DF first 20 rows")
df_jobs.head(20)

In [None]:
df_jobs = df_jobs.astype(
    {'job_id': 'Int64', 'avg_salary': 'Int64'}
)

In [None]:
df_jobs.head()

## Create the Output CSV

Use the cleaned tables to produce an analytics-ready SQLite database and flat CSV file. The final CSV should contain all the data the analysts might need in a single table.

###  Transforming the data

In [None]:
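# df_students names the key current_career_path_id; rename it so both tables share career_path_id
df_merged = pd.merge(df_students.rename(columns={"current_career_path_id": "career_path_id"}), df_courses, how='left', on="career_path_id")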

In [None]:
df_final = pd.merge(df_merged, df_jobs, how="left", on="job_id")

In [None]:
df_final.shape[0]

In [None]:
df_students.shape[0]
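
Comparing the two row counts checks that the left joins did not duplicate any student. A hedged sketch on hypothetical toy tables shows how `pd.merge` can enforce this itself through `validate`, and how `indicator` reveals students with no matching career path:

```python
import pandas as pd

# Hypothetical toy tables; career_path_id 0 stands for "no path selected".
students = pd.DataFrame({"uuid": [1, 2, 3], "career_path_id": [1, 8, 0]})
courses = pd.DataFrame(
    {
        "career_path_id": [1, 8],
        "career_path_name": ["path_a", "path_b"],
        "hours_to_complete": [20, 27],
    }
)

merged = pd.merge(
    students,
    courses,
    how="left",
    on="career_path_id",
    validate="many_to_one",  # raises MergeError if courses has duplicate keys
    indicator=True,          # adds a _merge column: both / left_only
)

unmatched = int((merged["_merge"] == "left_only").sum())
print(len(merged), unmatched)
```

With `validate` in place, a duplicated key in the lookup table fails loudly instead of silently inflating the row count.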

In [None]:
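# df_students_data is not defined earlier; it is taken here to be the merged df_final
df_students_data = df_final.copy()
df_students_data['_completed_path'] = df_students_data.apply(
    lambda row: False if pd.isna(row['time_spent_hrs']) or pd.isna(row['hours_to_complete']) else row['time_spent_hrs'] > row['hours_to_complete'],
    axis=1)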
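
The row-wise apply above can also be expressed as a direct column comparison, because a comparison against NaN evaluates to False in pandas. A minimal sketch on hypothetical values:

```python
import numpy as np
import pandas as pd

# Hypothetical toy data; the last row has no matching career path.
demo = pd.DataFrame(
    {
        "time_spent_hrs": [4.99, 30.0, 0.0],
        "hours_to_complete": [20.0, 27.0, np.nan],
    }
)

# NaN > x and x > NaN are both False, so missing values need no special case.
demo["_completed_path"] = demo["time_spent_hrs"] > demo["hours_to_complete"]
print(demo["_completed_path"].tolist())
```

The vectorized form gives the same result as the lambda for float columns and avoids a Python-level loop over the rows.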

In [None]:
df_students_data.head()

In [None]:
print(df_students_data[df_students_data["_completed_path"] == True])

In [None]:
df_students_data.isnull().values.any()

In [None]:
df_students_data_person_info = df_students_data['name'] + " " + df_students_data['dob'] + " " + df_students_data['email']
df_students_data_person_info.nunique()

In [None]:
df_students_data.isna().sum()

### Loading data into a new table in sqlite3 and exporting as CSV

In [None]:
df_students_data.to_sql(name='students_data', con=con, if_exists="replace")

In [None]:
students_data_query = """SELECT * FROM students_data LIMIT 10"""

In [None]:
print(cur.execute(students_data_query).fetchall())

In [None]:
df_students_data.to_csv('students_data.csv', index=False)
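
A quick way to gain confidence in the export is to read the data back and compare it with the source. The sketch below uses a hypothetical two-row dataframe, an in-memory database and an in-memory CSV string so that it touches no files. Passing `index=False` to `to_sql` is a choice of this sketch; the notebook's call keeps the default and therefore stores an index column.

```python
import io
import sqlite3
import pandas as pd

# Hypothetical toy data standing in for df_students_data.
demo = pd.DataFrame({"uuid": [1, 2], "name": ["Ann Lee", "Bo Chen"]})

# Write to an in-memory SQLite database and read the table back.
demo_con = sqlite3.connect(":memory:")
demo.to_sql(name="students_data", con=demo_con, if_exists="replace", index=False)
from_db = pd.read_sql_query("SELECT * FROM students_data", demo_con)
demo_con.close()

# to_csv returns the CSV text when no path is given.
csv_text = demo.to_csv(index=False)
from_csv = pd.read_csv(io.StringIO(csv_text))

print(from_db.shape, from_csv.shape)
```

Closing the connection once the load is finished releases the database file in the real pipeline.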