# Codecademy Subscriber Pipeline Portfolio Project

For this project, we will build a data engineering pipeline to regularly transform a messy database into a clean source for an analytics team.

## Scenario

We will be working with a mock database of long-term cancelled subscribers for a fictional subscription company. This database is regularly updated from multiple sources, and needs to be routinely cleaned and transformed into usable shape with as little human intervention as possible.

## About the Data

It’s important to practice working with customer data, especially as a data engineer. But it would be unethical to share actual customer data, so we are using a realistic but entirely fictional dataset. Our version of the dataset is based around a fictional education company called Cademycode.

## Project Objectives

- Complete a project to add to your portfolio
- Use Jupyter notebooks to explore and clean a dataset
- Use Python to automate data cleaning and transformation using unit tests and error logging
- Use Bash scripts to automate file management and run scripts

## Prerequisites

- Intermediate and Advanced Python 3
- Pandas
- Bash Scripting

## Project Tasks

- Set Up
- Inspect and Clean Data
- Create the Output CSV
- Develop Unit Tests and Logs
- Create a Bash Script
- Create a Readme
- Create a Writeup
- Save your Project


## Set Up

Download the starter kit and set up your working directory to explore the data! When you’re ready, connect to the starter database by loading `dev/cademycode.db` in a Jupyter notebook.

- Download the starter kit
- Unzip the starter kit
- You should get a folder containing a `dev/` subfolder
- Create a Jupyter notebook in the `dev/` folder for your initial exploration
- Use the sqlite3 Python package or SQLAlchemy to establish a database connection
- Use the database `dev/cademycode.db` for your development

```python
import sqlite3
import pandas as pd

con = sqlite3.connect('dev/cademycode.db')
print('Connection to database successful.')
```

Connection to database successful.


## Inspect and Clean the data

Import the tables in `cademycode.db` as dataframes. Inspect the tables for missing or invalid data and perform any data cleaning operations you think are necessary.

Here are some tips to get started (but remember, there are many different routes to take):

- Use a `SELECT` query on `sqlite_master` to determine the names of the tables
- Use `pandas.read_sql_query` to read each table in as a DataFrame
- Get familiar with the data by using the `.head()` function to explore the first handful of rows in each table.
- Look for null values or invalid data types by using:
  - `.info()` to display each column's non-null count and data type
  - `.describe()` to calculate summary statistics for all numerical columns
  - `.value_counts()` to display each column's distinct values

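The `.info()` and `.value_counts()` tips can be condensed into one table per DataFrame, which puts data types, null counts and the number of distinct values side by side. A small sketch; `column_overview` is a hypothetical helper and the sample values are invented to mimic numbers stored as text.

```python
import pandas as pd


def column_overview(frame):
    """One row per column: dtype, null count and number of distinct values.

    Hypothetical helper condensing .info() and .value_counts() into a table.
    """
    return pd.DataFrame({
        "dtype": frame.dtypes.astype(str),
        "nulls": frame.isna().sum(),
        "distinct": frame.nunique(),  # NaN is not counted as a distinct value
    })


# Invented sample with text-typed numbers, like the raw cademycode_students table
sample = pd.DataFrame({
    "uuid": [1, 2, 3],
    "job_id": ["7.0", None, "7.0"],
    "time_spent_hrs": ["4.99", "4.4", None],
})
overview = column_overview(sample)
print(overview)
```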
Let's begin by listing all tables in the database.

```python
tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table';", con)
tables
```

|   | name |
|---|---|
| 0 | cademycode_students |
| 1 | cademycode_courses |
| 2 | cademycode_student_jobs |


Now let's import these tables as DataFrames and inspect them.

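```python
from IPython.display import display  # available automatically in Jupyter; explicit here for clarity

def inspect_dataframes(dataframes):
    """Print head, info, summary statistics and value counts for each table."""
    for table_name, frame in dataframes.items():
        print(f"\nTable: {table_name}")
        display(frame.head())
        print("\nInfo:")
        frame.info()  # info() prints directly and returns None, so it is not wrapped in display()
        print("\nDescribe:")
        display(frame.describe())
        for column in frame.columns:
            print(f"\nValue counts for {column}:")
            display(frame[column].value_counts())

# Read every table into a dict of DataFrames keyed by table name
table_names = tables['name'].tolist()
df = {table: pd.read_sql_query(f'SELECT * FROM "{table}"', con) for table in table_names}

inspect_dataframes(df)
```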


Table: cademycode_students


|   | uuid | name | dob | sex | contact_info | job_id | num_course_taken | current_career_path_id | time_spent_hrs |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Annabelle Avery | 1943-07-03 | F | {"mailing_address": "303 N Timber Key, Irondal... | 7.0 | 6.0 | 1.0 | 4.99 |
| 1 | 2 | Micah Rubio | 1991-02-07 | M | {"mailing_address": "767 Crescent Fair, Shoals... | 7.0 | 5.0 | 8.0 | 4.4 |
| 2 | 3 | Hosea Dale | 1989-12-07 | M | {"mailing_address": "P.O. Box 41269, St. Bonav... | 7.0 | 8.0 | 8.0 | 6.74 |
| 3 | 4 | Mariann Kirk | 1988-07-31 | F | {"mailing_address": "517 SE Wintergreen Isle, ... | 6.0 | 7.0 | 9.0 | 12.31 |
| 4 | 5 | Lucio Alexander | 1963-08-31 | M | {"mailing_address": "18 Cinder Cliff, Doyles b... | 7.0 | 14.0 | 3.0 | 5.64 |



Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 9 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   uuid                    5000 non-null   int64 
 1   name                    5000 non-null   object
 2   dob                     5000 non-null   object
 3   sex                     5000 non-null   object
 4   contact_info            5000 non-null   object
 5   job_id                  4995 non-null   object
 6   num_course_taken        4749 non-null   object
 7   current_career_path_id  4529 non-null   object
 8   time_spent_hrs          4529 non-null   object
dtypes: int64(1), object(8)
memory usage: 351.7+ KB


None


Describe:


|   | uuid |
|---|---|
| count | 5000.0 |
| mean | 2500.5 |
| std | 1443.520003 |
| min | 1.0 |
| 25% | 1250.75 |
| 50% | 2500.5 |
| 75% | 3750.25 |
| max | 5000.0 |



Value counts for uuid:


uuid
1       1
3331    1
3338    1
3337    1
3336    1
       ..
1667    1
1666    1
1665    1
1664    1
5000    1
Name: count, Length: 5000, dtype: int64


Value counts for name:


name
Melvin Felt              2
Robbie Davies            2
Annabelle Avery          1
Kelley Munnickhuijsen    1
Rachel de Kruijff        1
                        ..
Cleopatra Singleton      1
Linwood Patrick          1
Marcia Beeldhouwer       1
Arnoldo Rodgers          1
Elton Otway              1
Name: count, Length: 4998, dtype: int64


Value counts for dob:


dob
1993-08-03    4
1955-05-27    4
1953-12-05    3
1995-06-13    3
1956-05-22    3
             ..
1960-09-21    1
1992-05-08    1
1979-03-22    1
1966-12-30    1
1994-12-23    1
Name: count, Length: 4492, dtype: int64


Value counts for sex:


sex
M    1995
F    1990
N    1015
Name: count, dtype: int64


Value counts for contact_info:


contact_info
{"mailing_address": "303 N Timber Key, Irondale, Wisconsin, 84736", "email": "annabelle_avery9376@woohoo.com"}      1
{"mailing_address": "768 Silent Skyway, Impact, Idaho, 90270", "email": "orval_devos4502@coldmail.com"}             1
{"mailing_address": "409 SW First Pike, Leary, North Carolina, 43428", "email": "rachel886@woohoo.com"}             1
{"mailing_address": "554 Broad Rose, Flagler Beach, Florida, 65799", "email": "allan8306@inlook.com"}               1
{"mailing_address": "872 Wagon Land, Guthrie Center, Colorado, 86498", "email": "isidro3025@woohoo.com"}            1
                                                                                                                   ..
{"mailing_address": "470 Essex Curve, Copan, Mississippi, 86309", "email": "cleopatra_singleton7791@inlook.com"}    1
{"mailing_address": "162 Iron Chase, Campbell Station, Oklahoma, 78795", "email": "patrick6416@inlook.com"}         1
{"mailing_address": "889 Mountain Alley, Qu


Value counts for job_id:


job_id
2.0    706
1.0    693
7.0    680
3.0    675
4.0    671
5.0    660
6.0    657
8.0    253
Name: count, dtype: int64


Value counts for num_course_taken:


num_course_taken
5.0     341
12.0    332
2.0     312
15.0    309
10.0    306
7.0     303
13.0    297
0.0     296
8.0     291
11.0    289
4.0     285
6.0     282
14.0    280
3.0     279
1.0     279
9.0     268
Name: count, dtype: int64


Value counts for current_career_path_id:


current_career_path_id
5.0     476
3.0     469
10.0    460
1.0     459
6.0     454
2.0     450
7.0     449
9.0     441
8.0     437
4.0     434
Name: count, dtype: int64


Value counts for time_spent_hrs:


time_spent_hrs
5.93     8
17.47    8
11.9     7
7.05     7
2.91     7
        ..
27.53    1
8.07     1
27.51    1
29.66    1
23.54    1
Name: count, Length: 2192, dtype: int64


Table: cademycode_courses


|   | career_path_id | career_path_name | hours_to_complete |
|---|---|---|---|
| 0 | 1 | data scientist | 20 |
| 1 | 2 | data engineer | 20 |
| 2 | 3 | data analyst | 12 |
| 3 | 4 | software engineering | 25 |
| 4 | 5 | backend engineer | 18 |



Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   career_path_id     10 non-null     int64 
 1   career_path_name   10 non-null     object
 2   hours_to_complete  10 non-null     int64 
dtypes: int64(2), object(1)
memory usage: 372.0+ bytes


None


Describe:


|   | career_path_id | hours_to_complete |
|---|---|---|
| count | 10.0 | 10.0 |
| mean | 5.5 | 21.9 |
| std | 3.02765 | 6.707376 |
| min | 1.0 | 12.0 |
| 25% | 3.25 | 18.5 |
| 50% | 5.5 | 20.0 |
| 75% | 7.75 | 26.5 |
| max | 10.0 | 35.0 |



Value counts for career_path_id:


career_path_id
1     1
2     1
3     1
4     1
5     1
6     1
7     1
8     1
9     1
10    1
Name: count, dtype: int64


Value counts for career_path_name:


career_path_name
data scientist               1
data engineer                1
data analyst                 1
software engineering         1
backend engineer             1
frontend engineer            1
iOS developer                1
android developer            1
machine learning engineer    1
ux/ui designer               1
Name: count, dtype: int64


Value counts for hours_to_complete:


hours_to_complete
20    3
27    2
12    1
25    1
18    1
35    1
15    1
Name: count, dtype: int64


Table: cademycode_student_jobs


|   | job_id | job_category | avg_salary |
|---|---|---|---|
| 0 | 1 | analytics | 86000 |
| 1 | 2 | engineer | 101000 |
| 2 | 3 | software developer | 110000 |
| 3 | 4 | creative | 66000 |
| 4 | 5 | financial services | 135000 |



Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13 entries, 0 to 12
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   job_id        13 non-null     int64 
 1   job_category  13 non-null     object
 2   avg_salary    13 non-null     int64 
dtypes: int64(2), object(1)
memory usage: 444.0+ bytes


None


Describe:


|   | job_id | avg_salary |
|---|---|---|
| count | 13.0 | 13.0 |
| mean | 4.384615 | 89230.769231 |
| std | 2.662657 | 34727.879881 |
| min | 0.0 | 10000.0 |
| 25% | 3.0 | 66000.0 |
| 50% | 4.0 | 86000.0 |
| 75% | 6.0 | 110000.0 |
| max | 9.0 | 135000.0 |



Value counts for job_id:


job_id
3    2
4    2
5    2
1    1
2    1
6    1
7    1
8    1
9    1
0    1
Name: count, dtype: int64


Value counts for job_category:


job_category
software developer    2
creative              2
financial services    2
analytics             1
engineer              1
education             1
HR                    1
student               1
healthcare            1
other                 1
Name: count, dtype: int64


Value counts for avg_salary:


avg_salary
110000    2
66000     2
135000    2
80000     2
86000     1
101000    1
61000     1
10000     1
120000    1
Name: count, dtype: int64

Let's take a look at what insights we might get from this data:

### Table: `cademycode_students`
1. **Data Types**:
   - All columns except `uuid` have the `object` dtype, even though they hold dates (`dob`) and numbers (`job_id`, `num_course_taken`, `current_career_path_id`, `time_spent_hrs`).

2. **Null Data**:
   - `job_id` (5 nulls), `num_course_taken` (251), `current_career_path_id` (471) and `time_spent_hrs` (471) contain null values.

3. **Duplicate Names**:
   - Two names (`Melvin Felt` and `Robbie Davies`) each appear twice.

4. **Gender Distribution**:
   - There are almost as many men (`M`, 1,995) as women (`F`, 1,990), plus a significant number of students (1,015) marked `N` (not specified).

5. **Contact Information**:
   - `contact_info` stores a JSON string containing a `mailing_address` and an `email`.

6. **Job IDs**:
   - `job_id` takes the values 1.0 to 8.0, which probably reference the `cademycode_student_jobs` table.

7. **Number of courses taken**:
   - `num_course_taken` is the number of courses each student has taken (0 to 15).

8. **Time Spent**:
   - `time_spent_hrs` is the time each student has spent on courses, in hours.

### Table: `cademycode_courses`
1. **Table Data**:
   - The table lists 10 career paths, each with the number of hours needed to complete it.

2. **Data Integrity**:
   - Values are complete and consistent; there are no apparent issues.

### Table: `cademycode_student_jobs`
1. **Category and Pay**:
   - The table contains the job category and average salary for each profession.

2. **Pay Statistics**:
   - The average salary is 89,230.77, with a standard deviation of 34,727.88, a minimum of 10,000 and a maximum of 135,000 (these statistics include the duplicated rows).

3. **Duplicate Values**:
   - `job_id` values 3, 4 and 5 each appear twice.

## Next Steps

1. **Data Tidying**:
   - Convert the columns of `cademycode_students` to appropriate data types.
   - Handle null values in `job_id`, `num_course_taken`, `current_career_path_id` and `time_spent_hrs`.
   - Investigate the duplicate names and `job_id` values.

2. **Data Transformation**:
   - Split `contact_info` into separate `mailing_address` and `email` columns.
   - Join the related tables (i.e., join `cademycode_students` to `cademycode_student_jobs` on `job_id` and to `cademycode_courses` on `current_career_path_id`).

3. **Final Table Creation**:
   - Build a single final table containing the joined, tidy data.

Let's begin by converting the columns `dob`, `job_id`, `num_course_taken`, `current_career_path_id` and `time_spent_hrs` to appropriate data types.

```python
students_df = df['cademycode_students'].copy()

# errors='coerce' turns any value that cannot be parsed into NaT/NaN instead of raising
students_df['dob'] = pd.to_datetime(students_df['dob'], errors='coerce')
numeric_columns = ['job_id', 'num_course_taken', 'current_career_path_id', 'time_spent_hrs']
for column in numeric_columns:
    students_df[column] = pd.to_numeric(students_df[column], errors='coerce')

print('Data Types after Conversion:')
students_df.dtypes
```

Data Types after Conversion:


uuid                               int64
name                              object
dob                       datetime64[ns]
sex                               object
contact_info                      object
job_id                           float64
num_course_taken                 float64
current_career_path_id           float64
time_spent_hrs                   float64
dtype: object
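`errors='coerce'` silently replaces anything it cannot parse with `NaN`/`NaT`, which afterwards looks exactly like a value that was missing from the start. A small sketch, assuming you want to tell the two apart, compares the null masks before and after conversion; `coerce_and_count` is a hypothetical helper and the input values are invented.

```python
import pandas as pd


def coerce_and_count(series):
    """Convert a text column to numbers and report how many values were lost.

    Hypothetical helper: 'lost' means the value was present before conversion
    but became NaN because pd.to_numeric could not parse it.
    """
    converted = pd.to_numeric(series, errors="coerce")
    lost = int((series.notna() & converted.isna()).sum())
    return converted, lost


raw = pd.Series(["6.0", "5.0", None, "n/a", "14.0"])  # invented example values
converted, lost = coerce_and_count(raw)
print(converted.tolist())  # [6.0, 5.0, nan, nan, 14.0]
print(lost)                # 1 (only "n/a"; None was already missing)
```

A non-zero count is worth logging, because those values would otherwise be filled as if they had never existed.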

Now we can handle the null values in `job_id`, `num_course_taken`, `current_career_path_id` and `time_spent_hrs`.

To deal with null values, we can:

- Fill nulls with a default value (e.g. 0);
- Fill nulls with the column's median;
- Remove rows with null values (if appropriate).

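In this case, we will fill `job_id` and `current_career_path_id` with `0` (intended to mean 'unemployed' and 'not enrolled') and use the median for `num_course_taken` and `time_spent_hrs`. Note that `job_id` 0 already exists in `cademycode_student_jobs` as the `other` category, so these students will be labelled `other` after the join, while career path 0 has no match in `cademycode_courses`.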

```python
students_df.loc[:, 'job_id'] = students_df['job_id'].fillna(0)
students_df.loc[:, 'current_career_path_id'] = students_df['current_career_path_id'].fillna(0)

students_df.loc[:, 'num_course_taken'] = students_df['num_course_taken'].fillna(students_df['num_course_taken'].median())
students_df.loc[:, 'time_spent_hrs'] = students_df['time_spent_hrs'].fillna(students_df['time_spent_hrs'].median())

print("Null values after filling:")
students_df.isnull().sum()
```

Null values after filling:


uuid                      0
name                      0
dob                       0
sex                       0
contact_info              0
job_id                    0
num_course_taken          0
current_career_path_id    0
time_spent_hrs            0
dtype: int64
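The same strategy (a constant for ID columns, the median for measurements) can be wrapped in one function that also returns how many values it filled per column, a number the changelog later asks for. A sketch under that assumption; `fill_nulls` is a hypothetical helper and the sample values are invented.

```python
import pandas as pd


def fill_nulls(frame, constant_fills, median_columns):
    """Fill nulls per column; return (filled_frame, count_of_filled_values).

    Hypothetical helper mirroring the strategy above: constant values for
    ID columns, the column median for numeric measurements.
    """
    result = frame.copy()
    filled_counts = {}
    for column, value in constant_fills.items():
        filled_counts[column] = int(result[column].isna().sum())
        result[column] = result[column].fillna(value)
    for column in median_columns:
        filled_counts[column] = int(result[column].isna().sum())
        # Series.median() skips NaN by default
        result[column] = result[column].fillna(result[column].median())
    return result, filled_counts


sample = pd.DataFrame({
    "job_id": [7.0, None, 3.0],
    "time_spent_hrs": [4.0, None, 10.0],
})
filled, counts = fill_nulls(sample, {"job_id": 0}, ["time_spent_hrs"])
print(filled["time_spent_hrs"].tolist())  # [4.0, 7.0, 10.0]
print(counts)                             # {'job_id': 1, 'time_spent_hrs': 1}
```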

Now let's identify and handle duplicate data in `cademycode_students`. First, let's show all rows that share a `name`:

```python
duplicate_names = ['Melvin Felt', 'Robbie Davies']
duplicate_rows = students_df[students_df['name'].isin(duplicate_names)]

print('Rows with duplicate names:')
display(duplicate_rows)
```

Rows with duplicate names:


|   | uuid | name | dob | sex | contact_info | job_id | num_course_taken | current_career_path_id | time_spent_hrs |
|---|---|---|---|---|---|---|---|---|---|
| 534 | 535 | Robbie Davies | 1965-06-11 | F | {"mailing_address": "666 Dusty Land, Pangburn,... | 5.0 | 15.0 | 3.0 | 0.53 |
| 1118 | 1119 | Melvin Felt | 1955-07-30 | N | {"mailing_address": "804 Rustic Elm, Geneseo v... | 3.0 | 6.0 | 10.0 | 2.77 |
| 2116 | 2117 | Robbie Davies | 1942-10-17 | M | {"mailing_address": "111 Squaw Alley, Buckeye,... | 7.0 | 0.0 | 0.0 | 10.67 |
| 3583 | 3584 | Melvin Felt | 1987-08-25 | N | {"mailing_address": "54 Noble Loaf Run, Lakela... | 3.0 | 11.0 | 1.0 | 0.47 |


The rows differ in date of birth and contact details, so these are different people who share a name rather than duplicate records. To be sure, let's count fully duplicated records in the DataFrame:

```python
duplicates = students_df.duplicated(subset=['name', 'dob', 'sex', 'contact_info'])
print(f"Number of duplicates found: {duplicates.sum()}")

# Show the duplicated rows (if any)
print("Duplicates found:")
display(students_df[duplicates])
```

Number of duplicates found: 0
Duplicates found:


|   | uuid | name | dob | sex | contact_info | job_id | num_course_taken | current_career_path_id | time_spent_hrs |
|---|---|---|---|---|---|---|---|---|---|

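The cell that lists duplicate names hard-codes `Melvin Felt` and `Robbie Davies`, which will not catch new collisions when the database is refreshed. A sketch using `DataFrame.duplicated(keep=False)`, which flags every member of a duplicate group, finds shared names automatically; the sample rows reuse `uuid`, `name` and `dob` values from the table above.

```python
import pandas as pd

students = pd.DataFrame({
    "uuid": [535, 1119, 2117, 3584, 1],
    "name": ["Robbie Davies", "Melvin Felt", "Robbie Davies", "Melvin Felt", "Annabelle Avery"],
    "dob": ["1965-06-11", "1955-07-30", "1942-10-17", "1987-08-25", "1943-07-03"],
})

# keep=False marks all rows in a duplicate group, not only the later ones,
# so the shared names are found without hard-coding them
same_name = students[students.duplicated(subset="name", keep=False)]
print(sorted(same_name["name"].unique()))  # ['Melvin Felt', 'Robbie Davies']

# A true duplicate record would also repeat the other identifying fields
print(students.duplicated(subset=["name", "dob"]).sum())  # 0
print(students["uuid"].is_unique)  # True
```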

Since there are no duplicated rows, there is nothing to remove. Let's go ahead and split `contact_info` into `mailing_address` and `email`:

```python
import json

def extract_contact_info(contact_info):
    """Split a contact_info string into a (mailing_address, email) Series."""
    try:
        # contact_info holds a JSON object such as {"mailing_address": ..., "email": ...}
        info = json.loads(contact_info)
    except json.JSONDecodeError:
        try:
            # Fallback for dict-like strings written with single quotes
            info = json.loads(contact_info.replace("'", '"'))
        except json.JSONDecodeError:
            return pd.Series([None, None])
    return pd.Series([info.get('mailing_address'), info.get('email')])

students_df[['mailing_address', 'email']] = students_df['contact_info'].apply(extract_contact_info)
students_df = students_df.drop(columns=['contact_info'])

print('Data after contact_info split:')
display(students_df.head())
```

Data after contact_info split:


|   | uuid | name | dob | sex | job_id | num_course_taken | current_career_path_id | time_spent_hrs | mailing_address | email |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Annabelle Avery | 1943-07-03 | F | 7.0 | 6.0 | 1.0 | 4.99 | 303 N Timber Key, Irondale, Wisconsin, 84736 | annabelle_avery9376@woohoo.com |
| 1 | 2 | Micah Rubio | 1991-02-07 | M | 7.0 | 5.0 | 8.0 | 4.4 | 767 Crescent Fair, Shoals, Indiana, 37439 | rubio6772@hmail.com |
| 2 | 3 | Hosea Dale | 1989-12-07 | M | 7.0 | 8.0 | 8.0 | 6.74 | P.O. Box 41269, St. Bonaventure, Virginia, 83637 | hosea_dale8084@coldmail.com |
| 3 | 4 | Mariann Kirk | 1988-07-31 | F | 6.0 | 7.0 | 9.0 | 12.31 | 517 SE Wintergreen Isle, Lane, Arkansas, 82242 | kirk4005@hmail.com |
| 4 | 5 | Lucio Alexander | 1963-08-31 | M | 7.0 | 14.0 | 3.0 | 5.64 | 18 Cinder Cliff, Doyles borough, Rhode Island,... | alexander9810@hmail.com |

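An alternative to calling a parsing function row by row with `apply` is to parse each string once with `json.loads` and let `pandas.json_normalize` expand the resulting dicts into columns. This sketch assumes every `contact_info` value is valid JSON (a malformed row raises instead of producing `None`). It also parses the strings as-is: replacing `'` with `"` first would corrupt a value containing an apostrophe, such as the invented sample address below.

```python
import json

import pandas as pd

# Two invented rows in the same shape as cademycode_students.contact_info
students = pd.DataFrame({
    "uuid": [1, 2],
    "contact_info": [
        '{"mailing_address": "12 O\'Neil St, Town, Ohio, 12345", "email": "a@example.com"}',
        '{"mailing_address": "7 Elm Rd, City, Utah, 54321", "email": "b@example.com"}',
    ],
})

# Parse each JSON string into a dict, then expand the dicts into columns
parsed = students["contact_info"].map(json.loads)
contact = pd.json_normalize(parsed.tolist())

# json_normalize returns a fresh 0..n-1 index; align it with the original rows
contact.index = students.index
students = students.drop(columns="contact_info").join(contact)
print(students.columns.tolist())  # ['uuid', 'mailing_address', 'email']
```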

Now let's check for duplicate data in the other tables.

```python
courses_df = df['cademycode_courses'].copy()
course_duplicates = courses_df.duplicated()

print(f"Duplicates found in cademycode_courses: {course_duplicates.sum()}")
if course_duplicates.any():
    display(courses_df[course_duplicates])
else:
    print('No duplicates found in cademycode_courses.')

print('----------')

jobs_df = df['cademycode_student_jobs'].copy()
jobs_duplicates = jobs_df.duplicated()

print(f"Duplicates found in cademycode_student_jobs: {jobs_duplicates.sum()}")
if jobs_duplicates.any():
    display(jobs_df[jobs_duplicates])
else:
    print('No duplicates found in cademycode_student_jobs.')
```

Duplicates found in cademycode_courses: 0
No duplicates found in cademycode_courses.
----------
Duplicates found in cademycode_student_jobs: 3


|   | job_id | job_category | avg_salary |
|---|---|---|---|
| 10 | 3 | software developer | 110000 |
| 11 | 4 | creative | 66000 |
| 12 | 5 | financial services | 135000 |


```python
jobs_df_cleaned = jobs_df.drop_duplicates()

print("Table jobs_df after duplicates removal:")
display(jobs_df_cleaned)
```

Table jobs_df after duplicates removal:


|   | job_id | job_category | avg_salary |
|---|---|---|---|
| 0 | 1 | analytics | 86000 |
| 1 | 2 | engineer | 101000 |
| 2 | 3 | software developer | 110000 |
| 3 | 4 | creative | 66000 |
| 4 | 5 | financial services | 135000 |
| 5 | 6 | education | 61000 |
| 6 | 7 | HR | 80000 |
| 7 | 8 | student | 10000 |
| 8 | 9 | healthcare | 120000 |
| 9 | 0 | other | 80000 |


Now we can join `students_df`, `jobs_df_cleaned` and `courses_df` into a final consolidated table.

```python
# Left joins keep every student, whether or not a matching job or course exists
merged_df_cleaned = pd.merge(students_df, jobs_df_cleaned, how='left', on='job_id')
final_df_cleaned = pd.merge(merged_df_cleaned, courses_df, how='left', left_on='current_career_path_id', right_on='career_path_id')

display(final_df_cleaned.head())
```

|   | uuid | name | dob | sex | job_id | num_course_taken | current_career_path_id | time_spent_hrs | mailing_address | email | job_category | avg_salary | career_path_id | career_path_name | hours_to_complete |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Annabelle Avery | 1943-07-03 | F | 7.0 | 6.0 | 1.0 | 4.99 | 303 N Timber Key, Irondale, Wisconsin, 84736 | annabelle_avery9376@woohoo.com | HR | 80000 | 1.0 | data scientist | 20.0 |
| 1 | 2 | Micah Rubio | 1991-02-07 | M | 7.0 | 5.0 | 8.0 | 4.4 | 767 Crescent Fair, Shoals, Indiana, 37439 | rubio6772@hmail.com | HR | 80000 | 8.0 | android developer | 27.0 |
| 2 | 3 | Hosea Dale | 1989-12-07 | M | 7.0 | 8.0 | 8.0 | 6.74 | P.O. Box 41269, St. Bonaventure, Virginia, 83637 | hosea_dale8084@coldmail.com | HR | 80000 | 8.0 | android developer | 27.0 |
| 3 | 4 | Mariann Kirk | 1988-07-31 | F | 6.0 | 7.0 | 9.0 | 12.31 | 517 SE Wintergreen Isle, Lane, Arkansas, 82242 | kirk4005@hmail.com | education | 61000 | 9.0 | machine learning engineer | 35.0 |
| 4 | 5 | Lucio Alexander | 1963-08-31 | M | 7.0 | 14.0 | 3.0 | 5.64 | 18 Cinder Cliff, Doyles borough, Rhode Island,... | alexander9810@hmail.com | HR | 80000 | 3.0 | data analyst | 12.0 |


Here's a summary:

1. Merge `students_df` with `jobs_df_cleaned`:
   - We use `job_id` as the key to join the two tables.
   - `how='left'` keeps every row from `students_df`, even when there is no match in `jobs_df_cleaned`.

2. Merge `merged_df_cleaned` with `courses_df`:
   - We use `current_career_path_id` (matched against `career_path_id`) as the key.
   - `how='left'` keeps every row from `merged_df_cleaned`, even when there is no match in `courses_df`.

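The final table is consolidated and has no duplicate rows. It is not completely free of nulls, though: students whose `current_career_path_id` was filled with `0` have no match in `cademycode_courses`, so `career_path_id`, `career_path_name` and `hours_to_complete` are null for them (the pipeline script later fills these). Now we can move on to the next task.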

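Checking row counts after a merge catches a bad join after the fact; pandas can also enforce expectations during the merge itself. A sketch using the `validate` and `indicator` parameters of `pandas.merge` on invented miniature tables: `validate="many_to_one"` rejects the join while the jobs table still contains a duplicated `job_id`, and `indicator=True` counts students whose career path (here 0) found no match.

```python
import pandas as pd

# Invented miniature versions of the three tables
students = pd.DataFrame({"uuid": [1, 2, 3],
                         "job_id": [7.0, 0.0, 3.0],
                         "current_career_path_id": [1.0, 0.0, 8.0]})
jobs = pd.DataFrame({"job_id": [0, 3, 3, 7],
                     "job_category": ["other", "software developer",
                                      "software developer", "HR"]})
courses = pd.DataFrame({"career_path_id": [1, 8],
                        "career_path_name": ["data scientist", "android developer"]})

# validate="many_to_one" raises MergeError if the right-hand key is not unique,
# which would otherwise silently duplicate student rows
try:
    pd.merge(students, jobs, how="left", on="job_id", validate="many_to_one")
except pd.errors.MergeError as err:
    print("Join rejected:", err)

jobs = jobs.drop_duplicates()
merged = pd.merge(students, jobs, how="left", on="job_id", validate="many_to_one")

# indicator=True adds a _merge column telling which rows found a match
final = pd.merge(merged, courses, how="left",
                 left_on="current_career_path_id", right_on="career_path_id",
                 indicator=True, validate="many_to_one")
unmatched = int((final["_merge"] == "left_only").sum())
print(len(students), len(final), unmatched)  # 3 3 1
```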
## Create the Output CSV

Use the cleaned tables to produce an analytics-ready SQLite database and flat CSV file. The final CSV should contain all the data the analysts might need in a single table.

- Think about what fields you might want to have as an analyst – are there any you can create as part of this process?
- It is easier to update a database than update a CSV file, so create a clean SQLite database first, and then generate the CSV from that database.
- Make sure to validate the final table. Improper joins could result in losing rows due to unpaired keys or duplication. You can check for both by calculating the length of your dataframe before and after merges.
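On the first tip: the pipeline passes the source columns through unchanged, but derived fields are easy to add at this stage. A hedged sketch, assuming an age (from `dob`) and the ratio of `time_spent_hrs` to `hours_to_complete` would be useful to analysts; both output columns, `age` and `path_completion_ratio`, are hypothetical and not part of the original pipeline. A fixed `as_of` date keeps the result reproducible.

```python
import pandas as pd


def add_derived_fields(frame, as_of):
    """Return a copy of frame with two illustrative derived columns.

    Hypothetical columns:
    - age: whole years between dob and the as_of date
    - path_completion_ratio: time_spent_hrs / hours_to_complete
      (NaN when hours_to_complete is 0, i.e. no career path)
    """
    result = frame.copy()
    as_of = pd.Timestamp(as_of)
    dob = pd.to_datetime(result["dob"])
    # Subtract one year when the birthday has not yet occurred in the as_of year
    had_birthday = (dob.dt.month < as_of.month) | (
        (dob.dt.month == as_of.month) & (dob.dt.day <= as_of.day))
    result["age"] = as_of.year - dob.dt.year - (~had_birthday).astype(int)
    hours = result["hours_to_complete"].where(result["hours_to_complete"] > 0)
    result["path_completion_ratio"] = result["time_spent_hrs"] / hours
    return result


sample = pd.DataFrame({
    "dob": ["1943-07-03", "1991-02-07"],
    "time_spent_hrs": [4.99, 4.4],
    "hours_to_complete": [20.0, 0.0],  # 0 stands in for "no career path"
})
out = add_derived_fields(sample, "2024-01-01")
print(out[["age", "path_completion_ratio"]])
```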

To start, let's create a new SQLite database to store our final table.

```python
clean_db_con = sqlite3.connect('dev/clean_cademycode.db')

final_df_cleaned.to_sql('final_table', clean_db_con, if_exists='replace', index=False)
print('Final table saved in SQLite Database "clean_cademycode.db".')
```

Final table saved in SQLite Database "clean_cademycode.db".


Before generating the CSV, we will validate the final table to ensure there is no data loss or duplication due to improper joins.

```python
original_length = len(df['cademycode_students'])
final_length = len(final_df_cleaned)

print(f"Number of rows before joining: {original_length}")
print(f"Number of rows after joining: {final_length}")

if original_length == final_length:
    print('Validation successful: the number of rows is unchanged.')
else:
    print('Attention: the number of rows differs after joining.')
```

Number of rows before joining: 5000
Number of rows after joining: 5000
Validation successful: the number of rows is unchanged.


```python
final_df_from_db = pd.read_sql_query("SELECT * FROM final_table", clean_db_con)
final_df_from_db.to_csv('dev/final_output.csv', index=False)

print('Final table saved to "final_output.csv".')
```

Final table saved to "final_output.csv".
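The two cells above leave their connections open. A sketch of the same "database first, CSV second" step wrapped in one function that closes its connection with `contextlib.closing` and returns the number of rows read back, so it can be compared with the CSV. `export_final_table` and the temporary file names are illustrative assumptions.

```python
import os
import sqlite3
import tempfile
from contextlib import closing

import pandas as pd


def export_final_table(frame, db_path, csv_path, table="final_table"):
    """Write frame to SQLite, then export the stored table to CSV.

    Returns the number of rows read back from the database.
    """
    # sqlite3's own context manager only commits; closing() also closes it
    with closing(sqlite3.connect(db_path)) as con:
        frame.to_sql(table, con, if_exists="replace", index=False)
        stored = pd.read_sql_query(f'SELECT * FROM "{table}"', con)
    stored.to_csv(csv_path, index=False)
    return len(stored)


tmp_dir = tempfile.mkdtemp()
final = pd.DataFrame({"uuid": [1, 2], "name": ["Annabelle Avery", "Micah Rubio"]})
rows = export_final_table(final,
                          os.path.join(tmp_dir, "clean.db"),
                          os.path.join(tmp_dir, "final_output.csv"))
csv_rows = len(pd.read_csv(os.path.join(tmp_dir, "final_output.csv")))
print(rows, csv_rows)  # 2 2
```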


## Develop Unit Tests and Logs

Turn the Jupyter Notebook into a Python script that can be run with minimal human intervention. The script should:

- check for updates to the database and
- use unit tests to protect the update process.

Any updates made to the final database should be written to a changelog, and any errors from the unit tests should be written to an error log.

Some ideas for unit tests include:

- checking that the updated database has the same schema as the original
- checking if the tables will join properly
- checking if there is any new data

Your changelog should include details like:

- a version number
- information about the update such as new row and missing data counts

Let's add a function to check the database for new updates before running the data pipeline.

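```python
import os

db_path = 'dev/cademycode.db'
last_mod_time = None  # modification time seen by the previous check (kept in memory only)

def is_database_updated():
    """Return True on the first call, or when the database file's mtime has increased."""
    global last_mod_time
    current_mod_time = os.path.getmtime(db_path)
    if last_mod_time is None or current_mod_time > last_mod_time:
        last_mod_time = current_mod_time
        return True
    return False
```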

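Because `last_mod_time` lives in memory, it resets to `None` whenever the script is started fresh from the command line (for example by a Bash script), so the check above returns `True` on every such run. A sketch that persists the last-seen modification time in a small state file instead; the state file and the two-argument signature are hypothetical.

```python
import os
import tempfile


def is_database_updated(db_path, state_path):
    """Return True if db_path changed since the time recorded in state_path.

    Hypothetical variant: the last-seen modification time is stored on disk,
    so the check survives between separate runs of the pipeline.
    """
    current = os.path.getmtime(db_path)
    previous = None
    if os.path.exists(state_path):
        with open(state_path) as f:
            previous = float(f.read().strip())
    if previous is None or current > previous:
        with open(state_path, "w") as f:
            f.write(repr(current))  # repr() round-trips a float exactly
        return True
    return False


tmp_dir = tempfile.mkdtemp()
db = os.path.join(tmp_dir, "cademycode.db")
state = os.path.join(tmp_dir, "last_mtime.txt")
open(db, "w").close()

first = is_database_updated(db, state)   # no state yet
second = is_database_updated(db, state)  # file unchanged
os.utime(db, (os.path.getatime(db), os.path.getmtime(db) + 10))
third = is_database_updated(db, state)   # newer mtime
print(first, second, third)  # True False True
```

One design choice worth revisiting: this records the new time as soon as the check runs. Recording it only after the pipeline succeeds would let a failed run be retried on the next invocation.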
Now let's add some more unit tests to ensure the integrity of the update process.

```python
import unittest

class TestDataCleaning(unittest.TestCase):
    def test_no_null_values(self):
        self.assertFalse(final_df_cleaned.isnull().values.any(), 'There are null values in the final table')

    def test_correct_number_of_rows(self):
        original_length = len(df['cademycode_students'])
        final_length = len(final_df_cleaned)
        self.assertEqual(original_length, final_length, 'Number of rows differs after the join')

    def test_schema_consistency(self):
        # The final table replaces contact_info with mailing_address and email and
        # adds columns from the joins, so check for a superset rather than equality
        original_schema = set(df['cademycode_students'].columns)
        original_schema.discard('contact_info')
        original_schema.update(['mailing_address', 'email'])
        final_schema = set(final_df_cleaned.columns)
        self.assertTrue(original_schema.issubset(final_schema), 'Final table is missing original columns')
```

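Defining a `TestCase` does not run it, and the brief asks for unit-test errors to be written to an error log. A sketch of one way to do both from inside the pipeline: load the case with `unittest.TestLoader`, run it with a `TextTestRunner`, and log each failure. Binding the data through a factory function (a hypothetical pattern, as are `make_test_case` and the logger name) avoids relying on notebook globals.

```python
import io
import logging
import unittest

import pandas as pd

logger = logging.getLogger("pipeline_tests")  # placeholder logger name


def make_test_case(final_frame, expected_rows):
    """Build a TestCase class bound to one DataFrame (hypothetical pattern)."""
    class FinalTableChecks(unittest.TestCase):
        def test_row_count(self):
            self.assertEqual(len(final_frame), expected_rows, "row count changed")

        def test_no_nulls(self):
            self.assertFalse(final_frame.isnull().values.any(), "null values present")

    return FinalTableChecks


def run_tests_and_log(test_case_class):
    """Run one TestCase class, log each failure or error, and return the result."""
    suite = unittest.TestLoader().loadTestsFromTestCase(test_case_class)
    # Keep the runner's console report out of the way; the log gets the details
    result = unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(suite)
    for test, traceback_text in result.failures + result.errors:
        logger.error("Unit test failed: %s\n%s", test.id(), traceback_text)
    return result


logging.basicConfig(level=logging.INFO)
frame = pd.DataFrame({"uuid": [1, 2], "job_category": ["HR", None]})
result = run_tests_and_log(make_test_case(frame, expected_rows=2))
print(result.wasSuccessful(), len(result.failures))  # False 1
```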
Now let's configure logging to record updates and errors.

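```python
import logging
import os

# basicConfig raises FileNotFoundError if the log directory does not exist yet
os.makedirs('logs', exist_ok=True)
logging.basicConfig(filename='logs/data_pipeline.log', level=logging.INFO, format='%(asctime)s:%(levelname)s:%(message)s')

def log_update(message):
    logging.info(message)

def log_error(message):
    logging.error(message)
```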
Finally, let's add a changelog that records details about each update, such as the number of new rows and the count of missing data.

```python
changelog_path = 'logs/changelog.txt'

def write_changelog(version, new_rows_count, missing_data_count):
    """Append one entry to the changelog."""
    with open(changelog_path, 'a') as f:
        f.write(f"Version: {version}\n")
        f.write(f"New rows added: {new_rows_count}\n")
        f.write(f"Missing data count: {missing_data_count}\n")
        f.write("\n")
```
Now we can combine all of these components into a complete Python script.

```python
import json
import logging
import os
import re
import sqlite3
import unittest
from contextlib import closing

import pandas as pd

db_path = 'dev/cademycode.db'
clean_db_path = 'dev/clean_cademycode.db'
csv_path = 'dev/final_output.csv'
changelog_path = 'logs/changelog.txt'
# Stores the source database's mtime after each successful run, so the update
# check survives between separate executions of the script
mtime_state_path = 'logs/last_db_mtime.txt'

# basicConfig raises FileNotFoundError if the log directory is missing
os.makedirs('logs', exist_ok=True)
logging.basicConfig(filename='logs/data_pipeline.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')

def log_update(message):
    logging.info(message)

def log_error(message):
    logging.error(message)

def extract_contact_info(contact_info):
    try:
        info = json.loads(contact_info.replace("'", '"'))
        return pd.Series([info.get('mailing_address'), info.get('email')])
    except json.JSONDecodeError:
        return pd.Series([None, None])

def is_database_updated():
    """True on the first run, or if the database changed since the last successful run."""
    if not os.path.exists(mtime_state_path):
        return True
    with open(mtime_state_path) as f:
        last_mod_time = float(f.read().strip())
    return os.path.getmtime(db_path) > last_mod_time

def save_mod_time():
    with open(mtime_state_path, 'w') as f:
        f.write(repr(os.path.getmtime(db_path)))

def next_version():
    """Bump the patch number of the last 'Version: X.Y.Z' entry (1.0.0 if none),
    so the Bash script sees a new version after every successful update."""
    versions = []
    if os.path.exists(changelog_path):
        with open(changelog_path) as f:
            versions = re.findall(r'^Version: (\d+)\.(\d+)\.(\d+)$', f.read(), flags=re.MULTILINE)
    if not versions:
        return '1.0.0'
    major, minor, patch = (int(part) for part in versions[-1])
    return f"{major}.{minor}.{patch + 1}"

def write_changelog(version, new_rows_count, missing_data_count):
    with open(changelog_path, 'a') as f:
        f.write(f"Version: {version}\n")
        f.write(f"New rows added: {new_rows_count}\n")
        f.write(f"Missing data count: {missing_data_count}\n")
        f.write("\n")

class TestDataCleaning(unittest.TestCase):
    # These tests read the module-level `df` and `final_df_cleaned` built below
    def test_no_null_values(self):
        self.assertFalse(final_df_cleaned.isnull().values.any(), "There are null values in the final table")

    def test_correct_number_of_rows(self):
        self.assertEqual(len(df['cademycode_students']), len(final_df_cleaned),
                         "The number of rows differs after the merges")

    def test_schema_consistency(self):
        original_schema = set(df['cademycode_students'].columns)
        original_schema.discard('contact_info')
        original_schema.update(['mailing_address', 'email'])
        self.assertTrue(original_schema.issubset(set(final_df_cleaned.columns)),
                        "The final table schema does not include all original columns")

try:
    if is_database_updated():
        log_update("Updated database. Running the pipeline...")

        with closing(sqlite3.connect(db_path)) as con:
            print('Database connection established successfully.')
            tables = pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table';", con)
            df = {table: pd.read_sql_query(f'SELECT * FROM "{table}"', con) for table in tables['name']}

        students_df = df['cademycode_students'].copy()
        students_df['dob'] = pd.to_datetime(students_df['dob'], errors='coerce')
        for column in ['job_id', 'num_course_taken', 'current_career_path_id', 'time_spent_hrs']:
            students_df[column] = pd.to_numeric(students_df[column], errors='coerce')
        # Missing values in the raw students data, counted before filling (for the changelog)
        missing_data_count = int(students_df.isnull().sum().sum())
        students_df['job_id'] = students_df['job_id'].fillna(0)
        students_df['current_career_path_id'] = students_df['current_career_path_id'].fillna(0)
        students_df['num_course_taken'] = students_df['num_course_taken'].fillna(students_df['num_course_taken'].median())
        students_df['time_spent_hrs'] = students_df['time_spent_hrs'].fillna(students_df['time_spent_hrs'].median())
        students_df[['mailing_address', 'email']] = students_df['contact_info'].apply(extract_contact_info)
        students_df = students_df.drop(columns=['contact_info'])

        jobs_df_cleaned = df['cademycode_student_jobs'].drop_duplicates()

        merged_df_cleaned = pd.merge(students_df, jobs_df_cleaned, how='left', on='job_id')
        final_df_cleaned = pd.merge(merged_df_cleaned, df['cademycode_courses'], how='left',
                                    left_on='current_career_path_id', right_on='career_path_id')

        # Students with career path 0 have no match in cademycode_courses
        final_df_cleaned = final_df_cleaned.assign(
            career_path_id=final_df_cleaned['career_path_id'].fillna(0),
            career_path_name=final_df_cleaned['career_path_name'].fillna('Unknown'),
            hours_to_complete=final_df_cleaned['hours_to_complete'].fillna(0)
        )

        # Run the unit tests BEFORE writing anything, so failing data never reaches the outputs
        suite = unittest.TestLoader().loadTestsFromTestCase(TestDataCleaning)
        result = unittest.TextTestRunner().run(suite)
        if not result.wasSuccessful():
            for test, details in result.failures + result.errors:
                log_error(f"Unit test failed: {test.id()}\n{details}")
            raise RuntimeError("Unit tests failed; outputs were not updated.")

        with closing(sqlite3.connect(clean_db_path)) as clean_db_conn:
            # Row count of the previous final table (0 on the first run), for the changelog
            has_table = pd.read_sql_query(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='final_table';", clean_db_conn)
            previous_rows = 0
            if not has_table.empty:
                previous_rows = int(pd.read_sql_query(
                    "SELECT COUNT(*) AS n FROM final_table", clean_db_conn)['n'].iloc[0])
            final_df_cleaned.to_sql('final_table', clean_db_conn, if_exists='replace', index=False)
        final_df_cleaned.to_csv(csv_path, index=False)

        write_changelog(next_version(), len(final_df_cleaned) - previous_rows, missing_data_count)
        save_mod_time()
        log_update("Pipeline executed successfully.")

    else:
        log_update("No updates to the database. Pipeline not executed.")

except Exception as e:
    log_error(f"Error running the pipeline: {e}")
    raise
```

Database connection established successfully.


...
----------------------------------------------------------------------
Ran 3 tests in 0.005s

OK


Now that the tests are in place, let's build the Bash script.

## Create a Bash Script

Create a bash script to handle running the Python script and moving updated files from your working directory in /dev to a production directory. Your bash script should use the logs from the last task to determine if an update occurred.

- You can execute Python files within Bash scripts by calling `python path/to/source/file.py`.
- You can either move the files to the production folder with `mv /path/to/source /path/to/destination` or copy them with `cp /path/to/source /path/to/destination`.
- Use version numbers in your changelog to check for updates.

### Step by Step to Create the Bash Script

**Step 1: Check the Changelog for Updates**

- Record the current version in the changelog before running the Python script.
- After the run, read the changelog version again. If it has changed, an update occurred.
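Step 1 comes down to reading the last version string from the changelog twice and comparing the two readings. The sketch below does in Python what `grep -oP 'Version: \K.*' | tail -1` does in the Bash script. Like that grep pattern, it assumes that each changelog entry contains a line of the form `Version: X.Y.Z`. The names `latest_version` and `update_detected` are illustrative.

```python
import re
import tempfile
from pathlib import Path
from typing import Optional

# Same idea as grep -oP 'Version: \K.*': capture the rest of each "Version: " line.
VERSION_LINE = re.compile(r"Version: (.*)")


def latest_version(changelog_path: str) -> Optional[str]:
    """Return the last version in the changelog (like `| tail -1`), or None."""
    path = Path(changelog_path)
    if not path.exists():
        return None  # first run: no changelog yet
    matches = VERSION_LINE.findall(path.read_text())
    # strip() ignores any trailing whitespace on the version line
    return matches[-1].strip() if matches else None


def update_detected(before: Optional[str], after: Optional[str]) -> bool:
    """True if the run left a different last version than the one read before it."""
    return after is not None and after != before


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        changelog = Path(tmp) / "changelog.txt"
        before = latest_version(str(changelog))  # None: no changelog yet
        changelog.write_text("Version: 1.0.0\n")  # simulate the pipeline logging an update
        after = latest_version(str(changelog))
        print(before, after, update_detected(before, after))  # None 1.0.0 True
```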

**Step 2: Run the Python Script**

- Run the Python script that performs the data pipeline.
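Step 2 depends on the script's exit status. The pipeline cell re-raises any exception, so a failed run ends with a non-zero exit code, and the Bash script sees that code through `$?`. If you prefer to orchestrate from Python, you can do the same check with `subprocess.run`. `run_script` is a hypothetical name. It uses `sys.executable`, the current interpreter, rather than whatever `python3` resolves to on the `PATH`.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_script(script_path: str) -> bool:
    """Run a Python script with the current interpreter; True if it exited with status 0.

    This mirrors running `python3 "$PYTHON_SCRIPT"` and then checking `$?` in Bash.
    """
    completed = subprocess.run([sys.executable, script_path])
    return completed.returncode == 0


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ok = Path(tmp) / "ok.py"
        ok.write_text("print('pipeline ran')\n")
        failing = Path(tmp) / "failing.py"
        # An uncaught exception (like the `raise` in the pipeline's except block) exits with status 1.
        failing.write_text("raise RuntimeError('pipeline failed')\n")
        print(run_script(str(ok)))       # True
        print(run_script(str(failing)))  # False; the traceback is printed to stderr
```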

**Step 3: Move Files to the Production Folder**

- If an update is detected (based on the changelog version), move the updated files to the production folder.
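Step 3 is a choice between moving (`mv`) and copying (`cp`) the outputs. In Python, the same step can use `shutil.move` or `shutil.copy2`. The `promote` helper and its `keep_dev_copy` flag are hypothetical. The demo uses the `dev/final_output.csv` file that the pipeline writes.

```python
import shutil
import tempfile
from pathlib import Path


def promote(files: list, prod_dir: str, keep_dev_copy: bool = False) -> list:
    """Move (like mv) or copy (like cp) each file into prod_dir and return the new paths."""
    dest_dir = Path(prod_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)  # create the production folder if needed
    promoted = []
    for src in map(Path, files):
        dest = dest_dir / src.name
        if keep_dev_copy:
            # The dev copy stays in place; copy2 also preserves metadata such as timestamps.
            shutil.copy2(src, dest)
        else:
            if dest.exists():
                dest.unlink()  # replace an older production file, as mv does
            shutil.move(str(src), str(dest))
        promoted.append(dest)
    return promoted


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        dev = Path(tmp) / "dev"
        dev.mkdir()
        csv_file = dev / "final_output.csv"
        csv_file.write_text("uuid,email\n1,a@example.com\n")
        moved = promote([str(csv_file)], str(Path(tmp) / "prod"))
        print(moved[0].name, csv_file.exists())  # final_output.csv False
```

Copying keeps a working copy in `/dev` for the next run. Moving leaves exactly one current version of each output.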

Here is the content of the Bash script (`run_pipeline.sh`):

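```bash
#!/bin/bash

# Run from the project root (where this script lives) so the relative paths
# below, and the dev/ paths used by the Python script, resolve correctly.
cd "$(dirname "$0")" || exit 1

# Path to the Python pipeline script
PYTHON_SCRIPT="path/to/your/script.py"

# Path to the production directory
PROD_DIR="path/to/production"

# Path to the changelog
CHANGELOG="logs/changelog.txt"

# Last version recorded in the changelog before the run (empty if there is none yet).
# grep -P (needed for \K) requires GNU grep.
CURRENT_VERSION=$(grep -oP 'Version: \K.*' "$CHANGELOG" 2>/dev/null | tail -1)

# Run the Python script and check its exit status
if python3 "$PYTHON_SCRIPT"; then
    echo "Pipeline executed successfully."

    # Last version recorded after the run
    NEW_VERSION=$(grep -oP 'Version: \K.*' "$CHANGELOG" 2>/dev/null | tail -1)

    # A new version in the changelog means the pipeline produced an update
    if [ "$CURRENT_VERSION" != "$NEW_VERSION" ]; then
        echo "Update detected. Moving files to production."

        # Create the production directory if needed, then move the updated files
        # (the Python script writes the CSV to dev/final_output.csv)
        mkdir -p "$PROD_DIR"
        mv dev/final_output.csv "$PROD_DIR/"
        mv dev/clean_cademycode.db "$PROD_DIR/"

        echo "Files moved to production."
    else
        echo "No updates detected. No files moved to production."
    fi
else
    echo "Pipeline execution failed. Check logs for details."
    exit 1
fi
```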

After creating the Bash script, make it executable with the `chmod` command:

```bash
chmod +x run_pipeline.sh
```

Let's run the Bash script to ensure it works correctly:

```bash
./run_pipeline.sh
```
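Running the script more than once exposes a subtlety: the update check only fires when the last `Version:` entry in the changelog changes. The pipeline cell always passes `"1.0.0"` to `write_changelog`. Assuming that function records its first argument as the `Version:` line, every run after the first leaves the version unchanged, so nothing is moved to production. One option is to compute the next version from the last one recorded. The `next_version` helper below is hypothetical and assumes `MAJOR.MINOR.PATCH` version strings.

```python
from typing import Optional


def next_version(previous: Optional[str], bump: str = "patch") -> str:
    """Return the version to record for a new update.

    Hypothetical helper: starts at 1.0.0 when there is no previous entry and
    otherwise increments the requested part of a MAJOR.MINOR.PATCH string.
    """
    if not previous:
        return "1.0.0"
    major, minor, patch = (int(part) for part in previous.split("."))
    if bump == "major":
        return f"{major + 1}.0.0"
    if bump == "minor":
        return f"{major}.{minor + 1}.0"
    if bump == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown bump type: {bump!r}")


if __name__ == "__main__":
    print(next_version(None))               # 1.0.0
    print(next_version("1.0.0"))            # 1.0.1
    print(next_version("1.0.1", "minor"))   # 1.1.0
```

In the pipeline, you could then replace the hard-coded string with something like `write_changelog(next_version(last_version), new_rows_count, missing_data_count)`. Here `last_version` is a hypothetical value read from the changelog before writing.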

The `run_pipeline.sh` file is in the root of this project. With the script in place, we can finish writing the Readme and save the project.

## Wrap-Up

### Summary of Tasks

Throughout this project, we aimed to build a data engineering pipeline to transform a messy database of long-term canceled subscribers into a clean, analytics-ready dataset. Here's a summary of the key tasks we accomplished:

1. **Set Up**: We began by setting up our working directory and ensuring all necessary files and tools were in place.
   
2. **Inspect and Clean the Data**: We imported the tables from `cademycode.db` into dataframes, inspected them for missing or invalid data, and performed various data cleaning operations. This included handling null values and ensuring data types were correct.

3. **Create the Output CSV**: Using the cleaned data, we produced an analytics-ready SQLite database and a flat CSV file. We validated the final table to ensure no data was lost or duplicated during the joins.

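4. **Develop Unit Tests and Logs**: We converted our Jupyter Notebook into a Python script. The script checks the database for updates before running the pipeline. Its unit tests then confirm that the final table has no null values, has the same number of rows as the original students table, and includes all of the original columns. The script also logs updates and errors.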

5. **Create a Bash Script**: We created a Bash script to handle running the Python script and moving updated files from the working directory to a production directory. The script checks the changelog to determine if an update occurred before moving the files.

6. **Create a Readme**: We documented the entire update process in a `readme.md` file. This file includes a description of the project structure, instructions on how to run the update process, and details about the version control system and error logging.

### Conclusion

This project demonstrates how to build a data engineering pipeline that automates the transformation of raw data into a clean, usable format. A structured approach kept the pipeline reliable, maintainable, and easy to understand. The unit tests and logging add safeguards and transparency, which make the process easier to monitor and debug.

This project not only serves as a valuable addition to our portfolio but also equips us with practical experience in handling real-world data engineering challenges. The skills and methodologies applied here are transferable to a wide range of data engineering tasks, ensuring we are well-prepared for future projects and roles in the field.

Thank you for following along with this project. Should you have any questions or require further assistance, please refer to the documentation or reach out to the project maintainer.