## Connect to Google Drive 

In [3]:


# Mount Google Drive and switch into the notebook folder; the relative paths
# used by later cells (e.g. "in_data/", "out_successful/") resolve against it.
from google.colab import drive
# If Drive is already mounted this is a no-op (see the cell output); pass
# force_remount=True to drive.mount to remount.
drive.mount('/content/drive')
# Debug aid: print the working directory before the %cd below.
!pwd
import os
%cd /content/drive/My Drive/Colab Notebooks


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/My Drive/Colab Notebooks
/content/drive/My Drive/Colab Notebooks


# Data Pipeline to process Membership Details 

In [10]:
# Import all libraries

# NOTE(review): unpinned install; pin a version (e.g. `%pip install apache-airflow==<version>`)
# so that Restart & Run All reproduces the same environment.
!pip install apache-airflow
import numpy as np
import pandas as pd

# NOTE(review): pandas/numpy are imported again below; redundant but harmless.
import pandas as pd
import numpy as np
from datetime import datetime
# NOTE(review): is_numeric_dtype is not used in the cells shown here.
from pandas.api.types import is_numeric_dtype
import hashlib
from os import listdir
from os.path import isfile, join
import shutil
# WARNING: this rebinds the name `datetime` to the *module*, shadowing the
# `from datetime import datetime` class import a few lines above. After this
# cell, `datetime(2022, 1, 1)` raises TypeError and `datetime.now()` raises
# AttributeError; later cells must use `datetime.datetime(...)`.
import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
# NOTE(review): PythonSensor is not used in the cells shown here.
from airflow.sensors.python import PythonSensor

# Hash a single string with hashlib.sha256
# (hashlib is already imported above; this re-import is redundant.)
import hashlib


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Viewing and Understanding the data files 

In [5]:
def _overview(frame):
    """Print the shape, the first rows and the summary statistics of ``frame``."""
    print(frame.shape)
    print(frame.head())
    print(frame.describe())


# Inspect each sample batch right after loading it.
data1 = pd.read_csv("in_data/applications_dataset_1.csv")
_overview(data1)

data2 = pd.read_csv("in_data/applications_dataset_2.csv")
_overview(data2)


(1999, 4)
             name                              email date_of_birth mobile_no
0   William Dixon  William_Dixon@woodward-fuller.biz    1986/01/10  40601711
1    Kristen Horn               Kristen_Horn@lin.com    1974-09-10    737931
2  Kimberly Chang   Kimberly_Chang@johnson-lopez.biz    02/27/1974   2692047
3       Mary Ball              Mary_Ball@stevens.biz    02/05/1968    886359
4  Benjamin Craig           Benjamin_Craig@berry.net    12/11/1988    696429
                 name                              email date_of_birth  \
count            1999                               1999          1999   
unique           1968                               1999          1984   
top     Brian Johnson  William_Dixon@woodward-fuller.biz    08/22/1993   
freq                4                                  1             2   

       mobile_no  
count       1999  
unique      1993  
top        39056  
freq           2  
(3000, 4)
              name                          email da

# Data Processor for Incoming Data and Scheduler Using Airflow


## Settings and helper functions 

In [6]:
'''
  Function Name: fetch_hash_string
  Description : Fetches the hashed value baesd on last name and birth date. The first 5 characters are used as hash value
  input:<last name, birth_date>
  output: hashsed value

'''

def fetch_hash_string(last_name,birth_date):
  hashed_string = hashlib.sha256(str(birth_date).encode('utf-8')).hexdigest()
  return last_name+hashed_string[0:5]

# Folder layout used by the pipeline (relative to the notebook's working directory):
#   in_data/          - incoming batch files, picked up on each scheduled pipeline run
#   in_archive/       - batch files are moved here once they have been processed
#   out_successful/   - CSVs of successful membership applications
#   out_unsuccessful/ - CSVs of unsuccessful membership applications
PATH, ARCHIVE_PATH = "in_data/", "in_archive/"
OUT_SUCCESSFUL_PATH, OUT_UNSUCCESSFUL_PATH = "out_successful/", "out_unsuccessful/"

## The data pipeline Function

In [7]:

def _load_batch(batch_files):
    """Read and concatenate the batch CSVs, keeping every column as text."""
    # dtype=str keeps values such as "0123 4567" intact: no int/float coercion,
    # no lost leading zeros, no crash on non-numeric mobile numbers.
    return pd.concat(
        [pd.read_csv(f, dtype=str, low_memory=False) for f in batch_files],
        ignore_index=True,
    )


def _clean_applications(raw_df, reference_date=pd.Timestamp(2022, 1, 1)):
    """Return ``(df, is_successful)``: a cleaned copy of ``raw_df`` and a boolean row mask.

    Adds ``is_mobile_valid``, ``is_email_valid``, ``age``, ``above_18``, ``first_name``
    and ``last_name``. Rewrites ``mobile_no`` (whitespace removed), ``name`` (trimmed,
    "" when missing) and ``date_of_birth`` (YYYYMMDD string, NaN when unparseable).
    A row is successful when it has an 8-digit mobile number, a .com/.net email,
    ``above_18`` is True and the name is non-empty.
    """
    df = raw_df.copy()

    # Mobile: strip all whitespace; valid means exactly 8 digits.
    df['mobile_no'] = df['mobile_no'].fillna('').astype(str).str.replace(r'\s+', '', regex=True)
    df['is_mobile_valid'] = df['mobile_no'].str.fullmatch(r'\d{8}').astype(bool)

    # Email: the whole address must look like local@domain.com or local@domain.net.
    # Alternation (?:com|net) is required; a character class like [com]+ would
    # also accept endings such as ".m" or ".oc".
    email = df['email'].fillna('').astype(str).str.strip()
    df['is_email_valid'] = email.str.fullmatch(r'[^@\s]+@[^@\s]+\.(?:com|net)').astype(bool)

    # Birth dates arrive in mixed formats (e.g. 1986/01/10, 1974-09-10, 02/27/1974),
    # so each value is parsed on its own; unparseable values become NaT.
    dob = pd.to_datetime(df['date_of_birth'].map(lambda v: pd.to_datetime(v, errors='coerce')))

    # Completed years on reference_date (calendar-exact; days // 365 drifts with leap days).
    before_birthday = (dob.dt.month > reference_date.month) | (
        (dob.dt.month == reference_date.month) & (dob.dt.day > reference_date.day)
    )
    age = reference_date.year - dob.dt.year - before_birthday.astype(int)
    df['age'] = age.astype('Int64')
    # NaN age (unparseable date) compares False, so such rows are unsuccessful.
    # NOTE(review): "above 18" is kept as strictly more than 18 completed years;
    # confirm whether applicants who are exactly 18 should qualify.
    df['above_18'] = age > 18
    # Reformat only after the age computation, which needs real dates.
    df['date_of_birth'] = dob.dt.strftime('%Y%m%d')

    # Name: missing names become "" so they fail the has-name check instead of
    # crashing split(); first_name is the first token, last_name the last token.
    df['name'] = df['name'].fillna('').astype(str).str.strip()
    tokens = df['name'].str.split()
    df['first_name'] = tokens.str[0].fillna('')
    df['last_name'] = tokens.str[-1].fillna('')

    is_successful = (
        df['is_mobile_valid'] & df['is_email_valid'] & df['above_18'] & df['name'].ne('')
    )
    return df, is_successful


def data_pipeline():
    """Process every batch file waiting in ``PATH`` into successful/unsuccessful CSVs.

    Steps:
      * Split ``name`` into ``first_name`` (first token) and ``last_name`` (last token).
      * Reformat ``date_of_birth`` as a YYYYMMDD string (input formats vary per row).
      * Treat rows without a name as unsuccessful applications.
      * Add ``above_18`` (more than 18 completed years on 2022-01-01).
      * Give successful applications a ``membership_id`` computed by
        ``fetch_hash_string(last_name, date_of_birth)``.

    Side effects:
      * Writes ``successful_applications_<timestamp>.csv`` to ``OUT_SUCCESSFUL_PATH`` and
        ``unsuccessful_applications_<timestamp>.csv`` to ``OUT_UNSUCCESSFUL_PATH``,
        creating those folders if missing.
      * Moves the processed batch files into ``ARCHIVE_PATH`` (created if missing).
      * Returns without writing anything when ``PATH`` holds no files.
    """
    import os

    # Snapshot the batch once. Only these files are processed *and* archived, so a
    # file landing in PATH mid-run is left for the next run instead of being
    # archived unread.
    batch_files = sorted(join(PATH, f) for f in listdir(PATH) if isfile(join(PATH, f)))
    if not batch_files:
        print(f"No batch files in {PATH!r}; nothing to process.")
        return

    applications, is_successful = _clean_applications(_load_batch(batch_files))

    # .copy() so adding membership_id does not write into a view of `applications`.
    success_df = applications.loc[is_successful].copy()
    # Build the column as a list; DataFrame.apply(axis=1) on an empty frame returns
    # a DataFrame, which cannot be assigned to a single column.
    success_df['membership_id'] = [
        fetch_hash_string(last, dob)
        for last, dob in zip(success_df['last_name'], success_df['date_of_birth'])
    ]
    # The complement of the success mask: every row lands in exactly one output.
    unsuccess_df = applications.loc[~is_successful]

    # pd.Timestamp avoids the notebook-level name `datetime`, which the import cell
    # rebinds to the module.
    dt_string = pd.Timestamp.now().strftime("%d-%m-%Y_%H:%M:%S")

    os.makedirs(OUT_SUCCESSFUL_PATH, exist_ok=True)
    os.makedirs(OUT_UNSUCCESSFUL_PATH, exist_ok=True)
    success_df.to_csv(join(OUT_SUCCESSFUL_PATH, 'successful_applications_' + dt_string + '.csv'))
    unsuccess_df.to_csv(join(OUT_UNSUCCESSFUL_PATH, 'unsuccessful_applications_' + dt_string + '.csv'))

    # Archive exactly the files that were processed.
    os.makedirs(ARCHIVE_PATH, exist_ok=True)
    for src in batch_files:
        shutil.move(src, join(ARCHIVE_PATH, os.path.basename(src)))

## Airflow Scheduler Definition 

In [11]:

# Parameters
WORFKLOW_DAG_ID = "data_pipeline"
WORFKFLOW_START_DATE = datetime.datetime(2022, 11, 26)
# Incoming batches are processed hourly ("* * * * *" would trigger every minute).
WORKFLOW_SCHEDULE_INTERVAL = "@hourly"
WORKFLOW_EMAIL = ["brindha@example.com"]

# Task-level defaults applied to each operator in the DAG. schedule_interval is a
# DAG argument, not an operator argument, so it is passed to DAG() below instead.
WORKFLOW_DEFAULT_ARGS = {
    "owner": "brindha",
    "start_date": WORFKFLOW_START_DATE,
    "email": WORKFLOW_EMAIL,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}

# Initialize DAG.
# catchup=False: do not backfill every interval between start_date and now.
# max_active_runs=1: each run reads and then moves the same input folder, so runs
# must not overlap.
dag = DAG(
    dag_id=WORFKLOW_DAG_ID,
    schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
    default_args=WORKFLOW_DEFAULT_ARGS,
    catchup=False,
    max_active_runs=1,
)

# Define jobs: a single task that runs the whole pipeline.
job_1_operator = PythonOperator(
    task_id="task_job_1",
    python_callable=data_pipeline,
    dag=dag,
)

