# In [None]:  (Jupyter notebook cell marker left over from the export)

# Import all dependencies; pandas is the main library used throughout.
import glob
import io
from datetime import datetime
from pathlib import Path

import boto3
import numpy as np
import pandas as pd
import pymysql # mysql library


# The script was developed against sample data, which will be shared separately.
#df_file = pd.read_csv('/Users/chaitanyapandey/Documents/Aqilliz/Dataset.csv')

# Expected layout of every ``Course_name`` value:
#     <CourseID>_<CourseName> (<UniversityName>)-<YYYYMM>.csv
# e.g. '11111111_An Introduction to American Law (University of Pennsylvania)-201902.csv'
#   * CourseID / CourseName are separated by the first '_'
#   * CourseName / UniversityName are separated by the last '(...)' group
#   * UniversityName / YYYYMM are separated by ')-'
# An anchored pattern is used instead of splitting on a character class so
# that course names containing '-', '.', ',' or parentheses still parse.
_COURSE_NAME_PATTERN = (
    r'^\s*(?P<CourseID>[^_]*)_'
    r'(?P<CourseName>.*?)\s*'
    r'\((?P<UniversityName>[^()]*)\)-'
    r'(?P<Year_mnth>\d{6})\.csv\s*$'
)

# Known spellings of each university (lower-cased) -> canonical name.
# Extend this table after profiling the data for new variants.
_UNIVERSITY_ALIASES = {
    'uc': 'University of California',
    'penn': 'University of Pennsylvania',
    'ntu': 'Nanyang Technological University',
    'nanyang technological university': 'Nanyang Technological University',
    'university of pennsylvania': 'University of Pennsylvania',
    'university of california': 'University of California',
}

# Canonical university name -> country offering the course (data enrichment).
# Ideally this lives in a reference table; a dict is enough for a few rows.
_COUNTRY_BY_UNIVERSITY = {
    'University of California': 'USA',
    'University of Pennsylvania': 'USA',
    'Nanyang Technological University': 'Singapore',
    'Singapore Management University': 'Singapore',
}

# Target table and column order. Schema used for the load:
#
#   CREATE DATABASE TestDatabase;
#   USE TestDatabase;
#   CREATE TABLE df_aqilliz_assessement (
#      Course_name Varchar(500) NOT NULL,
#      CourseID Int NOT NULL,
#      CourseName Varchar(200) NOT NULL,  -- sample names exceed 30 characters
#      Year_mnth Varchar(10),
#      Offering_country Varchar(50),
#      Offering_university Varchar(200),
#      CourseType Varchar(100),
#      Processed_Date Timestamp,
#      Processed_By Varchar(50),
#      RowNumber Int
#   );
_TARGET_TABLE = 'df_aqilliz_assessement'
_TARGET_COLUMNS = [
    'Course_name', 'CourseID', 'CourseName', 'Year_mnth',
    'Offering_country', 'Offering_university', 'CourseType',
    'Processed_Date', 'Processed_By', 'RowNumber',
]


def _read_csv_objects(s3_client, bucket, prefix='', suffix='.csv'):
    """Read every ``suffix`` object under ``prefix`` into one DataFrame.

    S3 has no wildcard keys, so the bucket is listed (with pagination) and
    each matching object is downloaded and parsed individually.

    Raises:
        FileNotFoundError: if no object in the bucket matches.
    """
    frames = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from the response when a page has no objects.
        for entry in page.get('Contents', []):
            key = entry['Key']
            if not key.lower().endswith(suffix):
                continue
            body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
            frames.append(pd.read_csv(io.BytesIO(body), encoding='utf8'))
    if not frames:
        raise FileNotFoundError(
            'No {} objects found in bucket {!r} under prefix {!r}'.format(
                suffix, bucket, prefix))
    return pd.concat(frames, ignore_index=True)


def _canonical_university(names):
    """Map raw university names (a Series) to their canonical spelling.

    Lookup is case-insensitive. A trailing 'Singapore' is dropped only when
    that yields a known university, so names that legitimately end in
    'Singapore' are not mangled. Unknown names are kept unchanged.
    """
    cleaned = names.str.strip()
    key = cleaned.str.lower()
    key_without_suffix = key.str.replace(r'[\s,]+singapore$', '', regex=True)
    canonical = key.map(_UNIVERSITY_ALIASES)
    canonical = canonical.fillna(key_without_suffix.map(_UNIVERSITY_ALIASES))
    return canonical.fillna(cleaned)


def _enrich_courses(df_file):
    """Split ``Course_name`` into its parts and add university/country columns.

    Rows whose ``Course_name`` does not follow the expected layout get NaN in
    all derived columns; they are routed to the bad-data output later.

    Raises:
        KeyError: if the input has no ``Course_name`` column.
    """
    if 'Course_name' not in df_file.columns:
        raise KeyError("Input data must contain a 'Course_name' column")
    parsed = df_file['Course_name'].str.extract(_COURSE_NAME_PATTERN)
    university = _canonical_university(parsed['UniversityName'])
    return df_file.assign(
        CourseID=parsed['CourseID'],
        CourseName=parsed['CourseName'],
        Year_mnth=parsed['Year_mnth'],
        Offering_country=university.map(_COUNTRY_BY_UNIVERSITY),
        Offering_university=university,
    )


def _split_clean_and_bad(df):
    """Return ``(clean, bad)`` copies of ``df``.

    A row is clean when its CourseID is exactly eight ASCII digits and occurs
    only once in the data. ``bad`` is the exact complement, so no row is
    silently dropped.
    """
    well_formed = df['CourseID'].str.match(r'[0-9]{8}\Z', na=False).astype(bool)
    unique = ~df['CourseID'].duplicated(keep=False)
    is_clean = well_formed & unique
    # .copy() so that later column assignments do not write into a view.
    return df[is_clean].copy(), df[~is_clean].copy()


def _add_audit_columns(df, processed_at):
    """Append the audit metadata columns used for batch reconciliation."""
    return df.assign(
        Processed_Date=processed_at,        # processing time (system clock)
        Processed_By='ETL_USER',            # which ETL job wrote the row
        RowNumber=np.arange(len(df)),       # 0-based row count check
    )


def _export_csv(df, path):
    """Write ``df`` to ``path`` (no index), creating the directory if needed."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(target, index=False, header=True)


def _load_to_mysql(df, conn, table):
    """Append ``df`` (restricted to the target columns) to ``table``.

    pandas' ``to_sql`` only accepts SQLAlchemy connectables or sqlite3
    connections, so rows are inserted through the PyMySQL cursor with a
    parameterised statement instead.
    """
    if df.empty:
        return
    frame = df[_TARGET_COLUMNS]
    column_sql = ', '.join('`{}`'.format(col) for col in _TARGET_COLUMNS)
    placeholders = ', '.join(['%s'] * len(_TARGET_COLUMNS))
    statement = 'INSERT INTO `{}` ({}) VALUES ({})'.format(
        table, column_sql, placeholders)
    # Cast to object so values are plain Python scalars and NaN becomes NULL.
    rows = frame.astype(object).where(frame.notna(), None).values.tolist()
    with conn.cursor() as cursor:
        cursor.executemany(statement, rows)
    conn.commit()


def main():
    """Run the course-data ETL: extract from S3, transform, export, load.

    Task 1 - extraction: read the CSV data from the S3 bucket with boto3 into
    a pandas DataFrame (AWS access key and secret access key are required).
    Sample ``Course_name`` values:

        11111111_An Introduction to American Law (University of Pennsylvania)-201902.csv
        12345678_Business Analysis (penn)-201902.csv
        12345676M_Foundation to Data Engineering (University of California)-201902.csv
        12343456_Foundation to Data Engineering (UC)-201902.csv
        123454321_Bachelor of Arts (Nanyang Technological University)-201902.csv
        12332112_Foundation of Medicine (Nanyang Technological University Singapore)-201902.csv
        12346789_Foundation of politics (NTU)-201902.csv
        12332112_Intermdiate of Blockchain (NTU)-201902.csv

    Task 2 - transformation: split ``Course_name`` into CourseID, CourseName,
    university and Year_mnth; normalise university names (UC / University of
    California, penn / University of Pennsylvania, NTU / Nanyang Technological
    University with or without a trailing 'Singapore'); add the offering
    country; route rows with a duplicated or malformed CourseID to a separate
    bad-data file.

    Task 3 - loading: append the cleaned rows to the MySQL table
    ``df_aqilliz_assessement``.
    """
    # NOTE(review): credentials below are placeholders; prefer environment
    # variables or a secrets manager over committing real values.
    REGION = 'Mention the Region'
    ACCESS_KEY_ID = 'key id'
    SECRET_ACCESS_KEY = 'Secret access key'

    # boto3 expects the bare bucket name, not the bucket ARN.
    BUCKET_NAME = 'aqilliz-data-engineer-assessment'
    KEY_PREFIX = ''      # read from the bucket root
    KEY_SUFFIX = '.csv'  # every CSV object is loaded

    s3c = boto3.client(
        's3',
        region_name=REGION,
        aws_access_key_id=ACCESS_KEY_ID,
        aws_secret_access_key=SECRET_ACCESS_KEY,
    )
    # assumes each CSV object has a 'Course_name' column - TODO confirm
    df_file = _read_csv_objects(s3c, BUCKET_NAME, KEY_PREFIX, KEY_SUFFIX)

    df = _enrich_courses(df_file)
    df_aqilliz_assessement, DF_Duplicated_CourseId = _split_clean_and_bad(df)

    # CourseIDs starting with 1111 are certificates, everything else diplomas.
    df_aqilliz_assessement['CourseType'] = np.where(
        df_aqilliz_assessement['CourseID'].str.startswith('1111'),
        'Certificate', 'Diploma')

    # One timestamp for the whole run, in a format MySQL parses unambiguously.
    processed_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    df_aqilliz_assessement = _add_audit_columns(
        df_aqilliz_assessement, processed_at)
    DF_Duplicated_CourseId = _add_audit_columns(
        DF_Duplicated_CourseId, processed_at)

    # Cleaned and rejected rows go to separate directories for reporting.
    _export_csv(
        df_aqilliz_assessement,
        '/Users/chaitanyapandey/Documents/Aqilliz/Cleaned_directory/export_Cleaned_data.csv')
    _export_csv(
        DF_Duplicated_CourseId,
        '/Users/chaitanyapandey/Documents/Aqilliz/Bad_directory/export_Bad_data.csv')

    user = 'root'
    passw = 'root123'
    host = 'localhost'  # 'local' is not a resolvable host name
    port = 3306
    database = 'TestDatabase'

    conn = pymysql.connect(host=host,
                           port=port,
                           user=user,
                           password=passw,
                           database=database,
                           charset='utf8')
    try:
        _load_to_mysql(df_aqilliz_assessement, conn, _TARGET_TABLE)
    finally:
        # Always release the connection, even when the insert fails.
        conn.close()


# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()


