## Section1_Data_Pipelines

An e-commerce company requires that users sign up for a membership on the website in order to purchase a product from the platform. As a data engineer at this company, you are tasked with designing and implementing a pipeline to process the membership applications submitted by users at hourly intervals.

Applications are batched into a varying number of datasets and dropped into a folder on an hourly basis. You are required to set up a pipeline to ingest, clean, perform validity checks, and create membership IDs for successful applications. An application is successful if:

- Application mobile number is 8 digits
- Applicant is over 18 years old as of 1 Jan 2022
- Applicant has a valid email (email ends with @emailprovider.com or @emailprovider.net)

You are required to format datasets in the following manner:

- Split name into first_name and last_name
- Format birthday field into YYYYMMDD
- Remove any rows which do not have a name field (treat these as unsuccessful applications)
- Create a new field named above_18 based on the applicant's birthday
- Membership IDs for successful applications should be the user's last name, followed by a SHA256 hash of the applicant's birthday, truncated to the first 5 digits of the hash (i.e. `<last_name>_<hash(YYYYMMDD)>`)

You are required to consolidate these datasets and output the successful applications into a folder, which will be picked up by downstream engineers. Unsuccessful applications should be consolidated and dropped into a separate folder.
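
The three success criteria above can be sketched as small predicate functions. This is only an illustration under stated assumptions: the helper names (`is_valid_mobile`, `is_valid_email`, `age_on`, `is_over_18`) are hypothetical, birthdays are assumed to already be in YYYYMMDD form, and "over 18" is read as strictly older than 18 on the reference date.

```python
import re
from datetime import date

# Taken from the task statement: the cut-off date and the accepted email domains.
REFERENCE_DATE = date(2022, 1, 1)
VALID_EMAIL_SUFFIXES = ("@emailprovider.com", "@emailprovider.net")


def is_valid_mobile(mobile: str) -> bool:
    """True when the mobile number consists of exactly 8 ASCII digits."""
    return re.fullmatch(r"[0-9]{8}", mobile.strip()) is not None


def is_valid_email(email: str) -> bool:
    """True when the address ends with an accepted domain and has a local part."""
    email = email.strip().lower()
    return email.endswith(VALID_EMAIL_SUFFIXES) and not email.startswith("@")


def age_on(birthday: str, on: date = REFERENCE_DATE) -> int:
    """Age in whole years on `on`, for a birthday given as YYYYMMDD."""
    born = date(int(birthday[:4]), int(birthday[4:6]), int(birthday[6:8]))
    # Subtract one year when the birthday has not yet occurred in the year of `on`.
    return on.year - born.year - ((on.month, on.day) < (born.month, born.day))


def is_over_18(birthday: str) -> bool:
    # Assumption: "over 18" means strictly older than 18 on the reference date.
    return age_on(birthday) > 18
```

Computing the age from the full date rather than from the year alone avoids counting an applicant whose birthday falls later in the year as a year older than they are on 1 Jan 2022.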

You can use common scheduling solutions such as cron or airflow to implement the scheduling component. Please provide a markdown file as documentation.

Note: Please submit the processed dataset and scripts used

###  Data Loading

In [18]:
import os
file_dir = os.getcwd()
file_dir_input = os.path.join(file_dir, 'input')
file_dir_archive =  os.path.join(file_dir, 'archive')
print(file_dir_input)
print(file_dir_archive)

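def get_files_in_dir(file_dir_input, extension='.csv'):
    """Return the file names in file_dir_input, optionally filtered by extension."""
    files = os.listdir(file_dir_input)
    if extension is not None:  # keep only files with the requested extension
        files = [f for f in files if f.endswith(extension)]
    return files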


C:\Users\Jayanthi\Desktop\Jobs\GovTech\input
C:\Users\Jayanthi\Desktop\Jobs\GovTech\archive


In [19]:
import csv
  

files = get_files_in_dir(file_dir_input)
print(files)

['membership_application_001.csv', 'membership_application_002.csv']


In [5]:

import psycopg2
  
conn = psycopg2.connect(dbname="postgres",
                        user='postgres', password='15253545', 
                        host='localhost', port='5432'
)
  
conn.autocommit = True
cursor = conn.cursor()
  
sql = '''DROP TABLE IF EXISTS TB_MEM_APP_STG;'''
 
cursor.execute(sql)
#mobile number should be of 8 characters
sql1 = '''CREATE TABLE TB_MEM_APP_STG(mobile_nbr char(8) NOT NULL,\
applicant_name char(100),\
date_of_birth char(8),\
employee_email varchar(30));'''
  
  
cursor.execute(sql1)

for i in files:
    # note: the file names were listed from file_dir_input but are opened from file_dir here
    membership_application_filepath = os.path.join(file_dir, i)
    print(membership_application_filepath)

    sql2 = '''INSERT INTO TB_MEM_APP_STG(mobile_nbr, applicant_name,\
    date_of_birth,employee_email) VALUES (%s,%s,%s,%s)'''

    try:
        # the with-block closes the file once its rows have been inserted
        with open(membership_application_filepath, newline='') as membership_application_file:
            membership_data = csv.reader(membership_application_file)
            print(membership_data)
            cursor.executemany(sql2, membership_data)
    except FileNotFoundError:
        # nothing is inserted for a missing file; move on to the next one
        print('membership_application_file not found')
        
        
        
  
sql3 = '''select * from TB_MEM_APP_STG;'''
cursor.execute(sql3)
for i in cursor.fetchall():
    print(i)
  
conn.commit()
conn.close()



C:\Users\Jayanthi\Desktop\Jobs\GovTech\membership_application_001.csv
<_csv.reader object at 0x000001CBA3A57FA0>
C:\Users\Jayanthi\Desktop\Jobs\GovTech\membership_application_002.csv
<_csv.reader object at 0x000001CBA3A664C0>
('12345678', 'john abhraham                                                                                       ', '19980922', 'ja@gmail.com')
('24345678', 'jack alexander                                                                                      ', '20011101', 'jal@gmail.com')
('39345678', 'jenny ng                                                                                            ', '20040406', 'jng@gmail.com')
('98345678', '                                                                                                    ', '20040406', 'jng@gmail.com')
('42345678', 'john abhraham                                                                                       ', '19980921', 'ja@gmail.com')
('52345678', 'jack alexander                  
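
The staging insert takes exactly four values per row, so a malformed line would make the whole batch fail. A minimal sketch of a reader that filters such rows first, assuming the files have no header row and hold four columns in the order mobile number, name, date of birth, email (inferred from the insert statement and the printed rows). `read_application_rows` is a hypothetical helper, not part of the notebook.

```python
import csv
from pathlib import Path
from typing import Iterator, List


def read_application_rows(path: Path, expected_fields: int = 4) -> Iterator[List[str]]:
    """Yield whitespace-stripped rows from one CSV file, skipping rows with the wrong field count."""
    with path.open(newline="") as handle:
        for row in csv.reader(handle):
            if len(row) != expected_fields:
                continue  # malformed row: wrong number of columns
            yield [field.strip() for field in row]
```

The generator could be passed straight to the insert, for example `cursor.executemany(sql2, read_application_rows(Path(membership_application_filepath)))`.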

###  Data preprocessing


In [6]:
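from datetime import date
# The age rule is fixed "as of 1 Jan 2022", so pin the reference date
# instead of using date.today(), which would drift in later years.
curr_year = date(2022, 1, 1).year

print(curr_year)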

2022


#### Split name into first_name and last_name 
#### Format birthday field into YYYYMMDD 
#### Remove any rows which do not have a name field (treat this as unsuccessful applications)
#### Create a new field named above_18 based on the applicant's birthday 
#### Create membership IDs for successful applications: the user's last name, followed by a SHA256 hash of the applicant's birthday, truncated to the first 5 digits of the hash (i.e. `<last_name>_<hash(YYYYMMDD)>`)
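
The first two formatting steps listed above could also be sketched in plain Python. This is a rough illustration only: `split_name` and `format_birthday` are hypothetical helpers, and the candidate input formats are assumptions, since the raw files' date formats are not shown here.

```python
from datetime import datetime
from typing import Optional, Tuple

# Assumption: illustrative input formats; adjust to whatever the source files contain.
CANDIDATE_FORMATS = ("%Y%m%d", "%Y-%m-%d", "%d/%m/%Y", "%Y/%m/%d")


def split_name(name: str) -> Tuple[str, str]:
    """Split on the first space: the first token is first_name, the rest is last_name."""
    first_name, _, last_name = name.strip().partition(" ")
    return first_name, last_name.strip()


def format_birthday(raw: str) -> Optional[str]:
    """Return the birthday as YYYYMMDD, or None when no candidate format matches."""
    for fmt in CANDIDATE_FORMATS:
        try:
            return datetime.strptime(raw.strip(), fmt).strftime("%Y%m%d")
        except ValueError:
            continue  # try the next candidate format
    return None
```

Splitting on the first space matches what the SQL `substring`/`position` expressions in the cell below do. A name without a space yields an empty `last_name` here.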

In [7]:
import hashlib

#hash_value = hashlib.sha256()
 


conn = psycopg2.connect(dbname="postgres",
                        user='postgres', password='15253545', 
                        host='localhost', port='5432'
)
  
conn.autocommit = True
cursor = conn.cursor()

# unsuccessful applicants

sql_d = '''DELETE FROM TB_MEM_APP_STG WHERE (applicant_name ='');'''
 
cursor.execute(sql_d)


sql = '''DROP TABLE IF EXISTS TB_MEM_APP_TGT;'''
 
cursor.execute(sql)

sql1 = '''CREATE TABLE TB_MEM_APP_TGT(membership_id varchar(100) PRIMARY KEY,\
mobile_nbr char(100) NOT NULL,\
first_name char(100),\
last_name char(100),\
date_of_birth char (10),\
above_18 char(1),\
employee_email varchar(30));'''
  
  
cursor.execute(sql1)
  
#for i in cursor.fetchall():
 #   if ((len(str(i[0])) == 8) & (i[1] is not None)):
        
sql2 = '''SELECT '',
       mobile_nbr,
       substring(applicant_name, 1, position(' ' in applicant_name) - 1),
       substring(applicant_name, position(' ' in applicant_name) + 1),
       date_of_birth,
       substring(applicant_name, position(' ' in applicant_name) + 1) || date_of_birth,
       substring(date_of_birth, 1, 4),
       employee_email
FROM TB_MEM_APP_STG'''
cursor.execute(sql2)

 
 
for j in cursor.fetchall():
    print(j)
    print(j[5])
    # NOTE: the hash input here is last_name + birthday, and only the 5-character
    # hash prefix is stored as the ID; the task statement asks for
    # <last_name>_<first 5 digits of SHA256(YYYYMMDD)>.
    hash_value = hashlib.sha256(j[5].encode('utf8')).hexdigest()[:5]
    print(hash_value)
    # year difference only: the month and day of birth are ignored
    age = curr_year - int(j[6])
    above_18 = 'Y' if age > 18 else 'N'

    cursor.execute('''INSERT INTO TB_MEM_APP_TGT VALUES(%s,%s,%s,%s,%s,%s,%s)''',
                   (hash_value, j[1], j[2], j[3], j[4], above_18, j[7]))
     


conn.commit()
conn.close()


('', '12345678', 'john', 'abhraham', '19980922', 'abhraham19980922', '1998', 'ja@gmail.com')
abhraham19980922
1601b
('', '24345678', 'jack', 'alexander', '20011101', 'alexander20011101', '2001', 'jal@gmail.com')
alexander20011101
3f8ed
('', '39345678', 'jenny', 'ng', '20040406', 'ng20040406', '2004', 'jng@gmail.com')
ng20040406
c09f0
('', '42345678', 'john', 'abhraham', '19980921', 'abhraham19980921', '1998', 'ja@gmail.com')
abhraham19980921
e6202
('', '52345678', 'jack', 'alexander', '20011111', 'alexander20011111', '2001', 'jal@gmail.com')
alexander20011111
07d43
('', '62345678', 'jenny', 'ng', '20040426', 'ng20040426', '2004', 'jng@gmail.com')
ng20040426
ed990
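
The task statement describes the membership ID as `<last_name>_<hash(YYYYMMDD)>`, whereas the IDs printed above are the hash prefix alone, taken over the last name joined with the birthday. A minimal sketch of the format as the statement reads, assuming the hash is the hexadecimal SHA256 digest of the UTF-8 encoded birthday string. `make_membership_id` is a hypothetical helper.

```python
import hashlib


def make_membership_id(last_name: str, birthday_yyyymmdd: str) -> str:
    """Build <last_name>_<first 5 hex digits of SHA256(birthday)>."""
    digest = hashlib.sha256(birthday_yyyymmdd.encode("utf-8")).hexdigest()
    return f"{last_name}_{digest[:5]}"


# Example call using values from the output above.
member_id = make_membership_id("abhraham", "19980922")
print(member_id)
```

Two applicants who share both a last name and a birthday would still receive the same ID under this scheme, so a primary-key constraint on `membership_id` can reject the second row.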



#### Applicant is over 18 years old as of 1 Jan 2022
#### Printing final processed table output

In [8]:
conn = psycopg2.connect(dbname="postgres",
                        user='postgres', password='15253545', 
                        host='localhost', port='5432'
)
  
conn.autocommit = True
cursor = conn.cursor()

    
# applicant must be older than 18 yrs
sql_d = '''DELETE FROM TB_MEM_APP_TGT WHERE (above_18 ='N');'''
cursor.execute(sql_d)

sql_success = '''SELECT  membership_id,trim(mobile_nbr),trim(first_name),trim(last_name),trim(date_of_birth),above_18,employee_email from TB_MEM_APP_TGT'''
cursor.execute(sql_success)

print('\n')
print('SUCCESSFUL APPLICANTS')
print('---------------------')
for l in cursor.fetchall():
    print(l)

conn.commit()
conn.close()




SUCCESSFUL APPLICANTS
---------------------
('1601b', '12345678', 'john', 'abhraham', '19980922', 'Y', 'ja@gmail.com')
('3f8ed', '24345678', 'jack', 'alexander', '20011101', 'Y', 'jal@gmail.com')
('e6202', '42345678', 'john', 'abhraham', '19980921', 'Y', 'ja@gmail.com')
('07d43', '52345678', 'jack', 'alexander', '20011111', 'Y', 'jal@gmail.com')
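
The task also asks for successful and unsuccessful applications to be written to separate folders, which the cells above do not yet do (rejected rows are deleted instead). One possible sketch, assuming CSV output and hypothetical folder names such as `output/successful` and `output/unsuccessful`. `write_applications` and `TARGET_COLUMNS` are illustrative names, with the columns copied from the `TB_MEM_APP_TGT` definition.

```python
import csv
from pathlib import Path
from typing import Iterable, Sequence

# Column order of TB_MEM_APP_TGT as created earlier in the notebook.
TARGET_COLUMNS = ("membership_id", "mobile_nbr", "first_name", "last_name",
                  "date_of_birth", "above_18", "employee_email")


def write_applications(rows: Iterable[Sequence[str]], header: Sequence[str],
                       folder: Path, filename: str) -> Path:
    """Write rows to <folder>/<filename> as CSV, creating the folder if needed."""
    folder.mkdir(parents=True, exist_ok=True)
    target = folder / filename
    with target.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)   # header line first
        writer.writerows(rows)    # then one line per application
    return target
```

A call such as `write_applications(cursor.fetchall(), TARGET_COLUMNS, Path("output") / "successful", "applications.csv")` would cover the successful set. Rejected rows would need to be selected into a second call before they are deleted.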


#### Moving processed files to archive

In [20]:
import shutil

for i in files:
    print(i)
    # os.path.join builds the source path portably instead of concatenating a literal backslash
    shutil.move(os.path.join(file_dir_input, i), file_dir_archive)

membership_application_001.csv
membership_application_002.csv
