In [1]:
# libraries
import pandas as pd
from datetime import datetime
import re
import os
import hashlib
import shutil
from pathlib import Path

In [2]:
# Declare folders
BASE_FOLDER = Path(os.getcwd()) 

INPUT_FOLDER = BASE_FOLDER / "input"
SUCCESS_FOLDER = BASE_FOLDER / "output" / "success"
FAILURE_FOLDER = BASE_FOLDER / "output" / "fail"
ARCHIVE_FOLDER = BASE_FOLDER / "archive"

In [3]:
# Declare Reference date used to determine if a person is above 18 years old
REFERENCE_DT = datetime(2022, 1, 1)

# Declare acceptable date formats for parsing date of birth
VALID_DATE_FORMATS = ["%Y/%m/%d", "%m/%d/%Y", "%Y-%m-%d", "%d-%m-%Y"]

# Declare regular expression pattern to validate email format
VALID_EMAIL_PATTERN = re.compile(r"^[\w\.-]+@[\w\.-]+\.(com|net)$")

# Titles to be excluded when extracting names from full name field
VALID_TITLES = {"Mr.", "Mrs.", "Ms.", "Dr.", "Prof.", "Miss", "Sir", "Madam", "Mx.", "Jr.", "Sr.", "MD", "PhD", "DDS", "Sir", "Dame"'Lord', 
                'Lady', 'Rev', 'Reverend', 'Capt', 'Captain', 'Maj', 'Major'} 

In [4]:
# Function to parse a date string using multiple formats
def parse_date(date_str):
    for fmt in VALID_DATE_FORMATS:
        try:
            return datetime.strptime(date_str, fmt)
        except ValueError:
            continue  # Try the next format if the current one fails
    return None  # Return None if no format matches

In [5]:
# Function to check if an email is in valid pattern
def is_valid_email(email):
    return bool(VALID_EMAIL_PATTERN.match(email)) if pd.notna(email) else False

In [6]:
# Function to check if mobile number is valid
def is_valid_mobile(mobile):
    return str(mobile).isdigit() and len(str(mobile)) == 8

In [7]:
# Function to generate a unique membership ID using SHA-256 hashing of birthday, returning in lowercase
def generate_membership_id(last_name, birthday):
    hash_digest = hashlib.sha256(birthday.encode()).hexdigest()
    return f"{last_name.lower()}_{hash_digest[:5].lower()}"

In [8]:
# Function to parse a full name string and return the first and last name.
# This function should disregard any leading titles, as they are not considered part of the individual's name

def parse_name(full_name):
    if pd.isna(full_name) or not isinstance(full_name, str) or full_name.strip() == "":
        return None, None  # Return None for both fields if input is invalid
    
    parts = [word for word in full_name.split() if word not in VALID_TITLES]  # Remove titles
    if len(parts) > 1:
        return parts[0], " ".join(parts[1:])  # First word is first name, remaining words are last name
    return parts[0], None  # If the name contains only one word, treat it as the first name and set the last name to None.

In [9]:
# Function to process all input csv files, transform and put them into success and fail.
def process_input_file(file_path):

    print(f"STARTING to process file: {file_path}")
    
    # Read the CSV file, assuming tab or comma as delimiters
    df = pd.read_csv(file_path, delimiter='\t|,', engine='python')
    
    # Extract first and last names from the 'name' column
    df[['first_name', 'last_name']] = df['name'].apply(lambda x: pd.Series(parse_name(x)))
    # keep the original name column after extraction
    # df.drop(columns=['name'], inplace=True) 
    
    # Convert 'date_of_birth' to datetime format
    df['dob_final'] = df['date_of_birth'].apply(parse_date)
    # keep original date_of_birth column after parsing
    # df.drop(columns=['date_of_birth'], inplace=True)  
    
    # Format birthday as a string for further processing
    df['dob_final_str'] = df['dob_final'].dt.strftime('%Y%m%d')
    
    # Determine if the person is above 18 years old
    df['above_18'] = df['dob_final'].notna() & ((REFERENCE_DT - df['dob_final']).dt.days >= 18 * 365)
    
    # Validate mobile numbers (expecting exactly 8 digits)
    df['valid_mobile'] = df['mobile_no'].astype(str).str.match(r'^[0-9]{8}$')
    
    # Validate email format
    df['valid_email'] = df['email'].apply(is_valid_email)
    
    # Determine whether the record is successful based on multiple conditions
    df['successful'] = df['above_18'] & df['valid_mobile'] & df['valid_email'] & df['name'].notna()
    
    # Generate a membership ID for successful records only
    df['membership_id'] = df.apply(lambda row: generate_membership_id(row['last_name'], row['dob_final_str']) if row['successful'] else None, axis=1)
    
    # Separate successful and unsuccessful records
    successful_df = df[df['successful']].drop(columns=['successful', 'valid_mobile', 'valid_email'])
    unsuccessful_df = df[~df['successful']].drop(columns=['successful', 'valid_mobile', 'valid_email'])
    
    # Reorder columns to match the desired order
    successful_column_order = ['membership_id', 'name', 'first_name', 'last_name', 'email', 'date_of_birth', 'dob_final', 'dob_final_str', 'mobile_no', 'above_18']
    successful_df = successful_df[successful_column_order]
    unsuccessful_column_order = ['name', 'email', 'date_of_birth', 'mobile_no']
    unsuccessful_df = unsuccessful_df[unsuccessful_column_order]
    
    # Extract filename and extension
    base_filename, ext = os.path.splitext(os.path.basename(file_path))
    # Define output file paths with suffixes
    success_output_path = os.path.join(SUCCESS_FOLDER, f"{base_filename}_success_{datetime.now().strftime('%Y%m%d%H%M%S')}{ext}")
    fail_output_path = os.path.join(FAILURE_FOLDER, f"{base_filename}_fail_{datetime.now().strftime('%Y%m%d%H%M%S')}_{ext}")
    
    # Save processed results
    successful_df.to_csv(success_output_path, index=False)
    unsuccessful_df.to_csv(fail_output_path, index=False)
    
    print(f"Processing file: {file_path} -> {len(successful_df)} successful, {len(unsuccessful_df)} unsuccessful")
    print(f"")

    # Archive processed file in dynamic date folder
    date_folder = ARCHIVE_FOLDER / datetime.now().strftime("%Y%m%d")
    date_folder.mkdir(parents=True, exist_ok=True)
    shutil.move(file_path, date_folder)
    print(f"Archiving file {file_path} to {date_folder}")
    print(f"")

# Main script to iterate all the input files and process them
if __name__ == "__main__":

    if not os.listdir(INPUT_FOLDER):
        print("No files found in the input folder. Nothing to process.")
    else:
        for file_name in os.listdir(INPUT_FOLDER):
            file_path = os.path.join(INPUT_FOLDER, file_name)
            
            # Process only CSV files
            if os.path.isfile(file_path) and file_name.endswith(".csv"):
                process_input_file(file_path)

STARTING to process file: C:\Albert\data_engineering_assessment\section_1\input\applications_dataset_1.csv
Processing file: C:\Albert\data_engineering_assessment\section_1\input\applications_dataset_1.csv -> 225 successful, 1774 unsuccessful

Archiving file C:\Albert\data_engineering_assessment\section_1\input\applications_dataset_1.csv to C:\Albert\data_engineering_assessment\section_1\archive\20250918

STARTING to process file: C:\Albert\data_engineering_assessment\section_1\input\applications_dataset_2.csv
Processing file: C:\Albert\data_engineering_assessment\section_1\input\applications_dataset_2.csv -> 301 successful, 2699 unsuccessful

Archiving file C:\Albert\data_engineering_assessment\section_1\input\applications_dataset_2.csv to C:\Albert\data_engineering_assessment\section_1\archive\20250918

