In [None]:
# Cademycode data-cleansing pipeline, with inline unit tests
import sqlite3
import pandas as pd
import ast
import numpy as np
import logging

In [None]:
# Send all log records (DEBUG and up) to a fresh log file on every run;
# force=True replaces any handlers a previous notebook cell may have installed.
logging.basicConfig(filename = './dev/cademycode_cleansed_db.log',
                    filemode = 'w',
                    level = logging.DEBUG,
                    force = True)
# Module-level logger used by the cleansing and unit-test functions below.
# (Previously `logger.getLogger(...)` was called on an undefined name.)
logger = logging.getLogger(__name__)

In [None]:
def cleanse_students_table(df):
    """
    Cleanse 'cademycode_students' table following the writeup

    Steps: parse 'dob' and derive an integer 'age' in whole years; expand the
    'contact_info' dict-string into one column per key; cast the numeric
    columns to float; drop rows missing 'job_id' or 'num_course_taken'; fill
    missing 'current_career_path_id' and 'time_spent_hrs' with 0; cast
    'job_id' and 'current_career_path_id' to int.

    Parameters:
        df (DataFrame): 'cademycode_students' table from 'cademycode.db'
            (or any row subset of it). The input frame is not modified.

    Returns:
        df (DataFrame): cleansed version of the input table
    """
    # Start from a copy with a fresh 0..n-1 index. Callers may pass a filtered
    # slice (non-contiguous index), and pd.json_normalize below builds its own
    # 0..n-1 index, so concat(axis=1) would otherwise pair rows by the wrong
    # labels. Working on a copy also leaves the caller's frame untouched.
    df = df.reset_index(drop=True)

    df['dob'] = pd.to_datetime(df['dob'])
    # Whole years of age. Casting a timedelta to '<m8[Y]' is rejected by
    # pandas >= 2, so divide elapsed days by the mean Gregorian year length.
    elapsed_days = (pd.to_datetime('now') - df['dob']).dt.days
    df['age'] = (elapsed_days // 365.2425).astype(int)

    # contact_info holds a Python-literal dict string; literal_eval parses it
    # without executing arbitrary code.
    contact_info = df['contact_info'].apply(ast.literal_eval)
    contact_cols = pd.json_normalize(contact_info.tolist())
    df = pd.concat([df.drop('contact_info', axis=1), contact_cols], axis=1)

    numeric_cols = ['job_id', 'num_course_taken', 'current_career_path_id', 'time_spent_hrs']
    df[numeric_cols] = df[numeric_cols].astype(float)

    # Rows without a job or a course count are unusable; the other two
    # numeric columns default to 0 when missing.
    return (
        df.dropna(subset=['job_id', 'num_course_taken'])
          .fillna({'current_career_path_id': 0, 'time_spent_hrs': 0})
          .astype({'job_id': int, 'current_career_path_id': int})
    )

In [None]:
def cleanse_courses_table(df):
    """
    Cleanse the 'cademycode_courses' table following the writeup

    Appends a 'not applicable' career path (career_path_id 0, 0 hours) so
    that a career-path id of 0 has a matching row, then removes duplicate
    rows (which also keeps the function idempotent).

    Parameters:
        df (DataFrame): 'cademycode_courses' table from 'cademycode.db'.
            The input frame is not modified.

    Return:
        df (DataFrame): cleansed version of the input table
    """
    not_applicable = pd.DataFrame([{'career_path_id': 0,
                                    'career_path_name': 'not applicable',
                                    'hours_to_complete': 0}])
    # Build a new frame rather than writing into `df`: the previous version
    # indexed with an undefined global (`courses`) and mutated the caller's table.
    return pd.concat([df, not_applicable], ignore_index=True).drop_duplicates()

In [None]:
def cleanse_student_jobs_table(df):
    """
    Clean the 'cademycode_student_jobs' table as described in the writeup.

    The only cleansing step needed is removing rows that are exact
    duplicates; the first occurrence (and its index label) is kept.

    Parameters:
        df (DataFrame): 'cademycode_student_jobs' table from 'cademycode.db'

    Return:
        df (DataFrame): de-duplicated copy of the input table
    """
    deduplicated_jobs = df.drop_duplicates()
    return deduplicated_jobs

In [None]:
def test_join_students_courses(students, courses):
    """
    Unit test to check if join keys exist between 'cademycode_students' and 'cademy_courses' tables
    
    Parameters:
        students (DataFrame): 'cademycode_students' table from 'cademycode.db'
        courses (DataFrame): 'cademycode_courses' table from 'cademycode.db'
    
    Returns:
        None
    """
    students_joinkey = students.current_career_path_id.unique()
    courses_joinkey = courses.career_path_id.unique()
    subset_joinkey = np.isin(students_joinkey, courses_joinkey)
    missing_joinkey = students_joinkey[~subset_joinkey]
    try:
        assert len(missing_joinkey) == 0, "Missing join key(s):" + str(list(missing_joinkey)) + " in 'courses' table"
    except AssertionError as AE:
        logger.exception(AE)
        raise AE
    else:
        print('All join keys are present.')

In [None]:
def test_join_students_jobs(students, student_jobs):
    """
    Unit test to check if join keys exist between 'cademycode_students' and 'cademy_student_jobs' tables
    
    Parameters:
        students (DataFrame): 'cademycode_students' table from 'cademycode.db'
        student_jobs (DataFrame): 'cademycode_student_jobs' table from 'cademycode.db'
    
    Returns:
        None
    """
    students_joinkey = students.job_id.unique()
    jobs_joinkey = student_jobs.job_id.unique()
    subset_joinkey = np.isin(students_joinkey, jobs_joinkey)
    missing_joinkey = students_joinkey[~subset_joinkey]
    try:
        assert len(missing_joinkey) == 0, "Missing join key(s):" + str(list(missing_joinkey)) + " in 'student_jobs' table"
    except AssertionError as AE:
        logger.exception(AE)
        raise AE
    else:
        print('All join keys are present.')

In [None]:
def test_schema_num_cols(local_df, db_df):
    """
    Unit test to check if the number of columns in the cleaned local DataFrame match that of the database table.
    
    Parameters:
        local_df (DataFrame): DataFrame of the cleaned table 
        db_df (DataFrame): 'cademycode_aggregated' table from 'cademycode_cleansed.db'
        
    Returns:
        None
    """
    try:
        assert len(local_df.columns) == len(db_df.columns)
    except AssertationError as AE:
        logger.exception(AE)
        raise AE
    else:
        print('Number of columns in the local DataFrame and the database DataFrame are the same.')

In [None]:
def test_schema_dtypes(local_df, db_df):
    """
    Unit test to check if column dtypes in the cleaned local DataFrame match the column dtype in the database table.
    
    Parameters:
        local_df (DataFrame): DataFrame of the cleaned table 
        db_df (DataFrame): 'cademycode_aggregated' table from 'cademycode_cleansed.db'
    
    Returns:
        None  
    """
    name_errors = 0
    for col in db_df:
        try:
            if local_df[col].dtypes != db_df[col].dtypes:
                name_errors+=1
        except NameError as NE:
            logger.exception(NE)
            raise NE
    
    if name_errors > 0:
        error_msg = str(name_errors) + "column dtypes do not match."
        logger.exception(error_msg)
    assert errors == 0, error_msg

In [None]:
def test_nulls(df):
    """
    Unit test to ensure that the cleaned table does not having missing values.
    
    Parameters:
        df (DataFrame): DataFrame of the cleaned table
    
    Returns:
        None
    """
    df_nulls = df[df.isnull().any(axis=1)]
    num_nulls = len(df_nulls)
    
    try:
        assert num_nulls == 0, "There are " + str(num_nulls) + " nulls in the table."
    except AssertionError as AE:
        logger.exception(AE)
        raise AE
    else:
        print('No nulls are observed.')

In [None]:
def _next_changelog_version(lines):
    """
    Return the patch number to use for the next changelog entry.

    Parameters:
        lines (list of str): existing changelog lines, newest entry first
            (the first line is expected to contain a version like '0.0.7').

    Returns:
        int: patch component of the first 'X.Y.Z' found in the first line,
        plus one; 1 when the changelog is empty or has no version there.
    """
    import re

    if not lines:
        return 1
    match = re.search(r'(\d+)\.(\d+)\.(\d+)', lines[0])
    # Parse the whole patch component; the old `[2][0]` indexing read only
    # its first digit, so '0.0.10' was treated as patch 1.
    return int(match.group(3)) + 1 if match else 1


def main():
    """
    Run the incremental cleansing pipeline.

    Reads the raw tables from './dev/cademycode.db', keeps only students whose
    uuid is not already in the production 'cademycode_aggregated' table, cleanses
    and joins them, runs the unit tests, appends the result to
    './dev/cademycode_cleansed.db', exports the full table to CSV and prepends
    a new entry to the changelog. Does nothing (besides logging) when there
    are no new students.
    """
    from contextlib import closing

    changelog_path = './dev/changelog.md'

    # Initialize log
    logger.info("Start Log")

    # Check for current version number and calculate next version for changelog
    try:
        with open(changelog_path) as f:
            lines = f.readlines()
    except FileNotFoundError:
        lines = []
    next_ver = _next_changelog_version(lines)

    # Connect to the database and read in the tables
    with closing(sqlite3.connect('./dev/cademycode.db')) as con:
        students = pd.read_sql_query("SELECT * FROM cademycode_students", con)
        courses = pd.read_sql_query("SELECT * FROM cademycode_courses", con)
        student_jobs = pd.read_sql_query("SELECT * FROM cademycode_student_jobs", con)

    # read in the current production tables, if any
    try:
        # mode=ro: fail instead of silently creating an empty prod database file.
        with closing(sqlite3.connect('file:./prod/cademycode_cleansed.db?mode=ro', uri=True)) as con:
            clean_db = pd.read_sql_query("SELECT * FROM cademycode_aggregated", con)
    except Exception as err:
        # First run (no prod DB/table yet): every student is new.
        logger.warning("No existing cleansed table found (%s); treating all students as new.", err)
        clean_db = pd.DataFrame()
        new_students = students
    else:
        # Filter for new students who don't exist in the cleansed database.
        # Series.isin yields a mask aligned row-for-row with `students`; the old
        # mask was built from uuid.unique() and was mis-sized when uuids repeat.
        new_students = students[~students['uuid'].isin(clean_db['uuid'])]

    # run the cleanse_students_table() function on the new students only
    clean_new_students = cleanse_students_table(new_students)

    # only if there is new students data, clean the rest of the tables
    if len(clean_new_students) > 0:
        clean_courses = cleanse_courses_table(courses)
        clean_student_jobs = cleanse_student_jobs_table(student_jobs)

        ##### UNIT TESTING: ENSURE ALL JOIN KEYS ARE PRESENT BEFORE JOINING #####
        test_join_students_courses(clean_new_students, clean_courses)
        test_join_students_jobs(clean_new_students, clean_student_jobs)
        ##################################################

        # join tables
        df_clean = clean_new_students.merge(clean_courses, left_on='current_career_path_id', right_on='career_path_id', how='left')
        df_clean = df_clean.merge(clean_student_jobs, on='job_id', how='left')

        ##### UNIT TESTING: ENSURE DATA SCHEMA & COMPLETE DATA #####
        if len(clean_db) > 0:
            test_schema_num_cols(df_clean, clean_db)
            test_schema_dtypes(df_clean, clean_db)
        test_nulls(df_clean)
        #########################

        # upsert new cleaned data to cademycode_cleansed.db
        # (to_sql commits its own transaction on a sqlite3 connection)
        with closing(sqlite3.connect('./dev/cademycode_cleansed.db')) as con:
            df_clean.to_sql('cademycode_aggregated', con, if_exists='append', index=False)
            clean_db = pd.read_sql_query("SELECT * FROM cademycode_aggregated", con)

        # write new cleaned data to a csv file
        clean_db.to_csv('./dev/cademycode_cleansed.csv')

        # create a new automatic changelog entry, newest first
        new_entry = (
            '## 0.0.' + str(next_ver) + '\n' +
            '### Added\n' +
            '- ' + str(len(df_clean)) + ' more data to database of raw data\n'
        )

        # update the changelog at the same path it was read from, so the next
        # run sees this version and increments it
        with open(changelog_path, 'w') as f:
            f.write(new_entry + ''.join(lines))
    else:
        print("No new data")
        logger.info("No new data")
    logger.info("End Log")


if __name__ == "__main__":
    main()