In [1]:
from kfp import dsl
from kfp import components as comp
from kfp.compiler import Compiler

In [2]:
def load_data(input_dir: str) -> dict:
    """Read every file in ``input_dir`` as a CSV table with a header row.

    Args:
        input_dir: Directory whose regular files are parsed as CSV.

    Returns:
        A dict mapping each file's name without its extension to the list of
        rows produced by ``csv.DictReader`` (column name -> raw string value).

    Raises:
        OSError: If ``input_dir`` cannot be listed or a file cannot be opened.
    """
    # Imports are kept inside the function because it is handed to
    # comp.create_component_from_func, which expects a self-contained function.
    import os
    import csv

    name_data = {}
    # Sorted so the key order of the result does not depend on the file system.
    for file in sorted(os.listdir(input_dir)):
        file_path = os.path.join(input_dir, file)  # Create the full path

        # Sub-directories cannot be opened as files; skip anything that is not
        # a regular file instead of failing on it.
        if not os.path.isfile(file_path):
            continue

        # 'utf-8-sig' decodes plain UTF-8 and also drops a leading byte-order
        # mark, which would otherwise end up inside the first column name.
        with open(file_path, mode='r', newline='', encoding='utf-8-sig') as csv_file:
            list_of_dicts = list(csv.DictReader(csv_file))

        file_name, _ = os.path.splitext(file)
        name_data[file_name] = list_of_dicts

    return name_data

# Component factory wrapping load_data; no extra packages are requested.
comp_load_data = comp.create_component_from_func(func=load_data)


In [3]:
def generate_names(input_dict: dict, n_rows:int, mf_ratio:float, output_csv: comp.OutputPath('csv'),):
    """Sample random first/last name pairs and write them, shuffled, to a CSV.

    Args:
        input_dict: Mapping with the keys 'male_first_names',
            'male_last_names', 'female_first_names' and 'female_last_names'.
            Each value is a list of records with a 'name' and a 'value'
            field; 'value' is cast to int and used as the sampling weight.
        n_rows: Total number of people to generate.
        mf_ratio: Share of rows that are male; the male count is
            ``int(n_rows * mf_ratio)`` and the rest are female.
        output_csv: Path the CSV (columns first_name, last_name, gender) is
            written to.
    """
    import pandas as pd
    import numpy as np

    def to_weighted_frame(records):
        # Replace the raw 'value' counts by a 'ratio' column summing to 1.
        frame = pd.DataFrame(records)
        frame['value'] = frame['value'].astype(int)
        frame['ratio'] = frame['value'] / frame['value'].sum()
        return frame.drop(columns=['value'])

    def draw(frame, count):
        # Weighted sampling with replacement from the 'name' column.
        return np.random.choice(frame['name'], size=count, p=frame['ratio'], replace=True)

    table_keys = (
        'male_first_names',
        'male_last_names',
        'female_first_names',
        'female_last_names',
    )
    weighted = {key: to_weighted_frame(input_dict[key]) for key in table_keys}

    male_count = int(n_rows * mf_ratio)
    female_count = n_rows - male_count

    # Males are drawn before females, first names before last names.
    per_gender = []
    for gender, prefix, count in (('M', 'male', male_count), ('F', 'female', female_count)):
        first_names = draw(weighted[f'{prefix}_first_names'], count)
        last_names = draw(weighted[f'{prefix}_last_names'], count)
        per_gender.append(pd.DataFrame({
            'first_name': first_names,
            'last_name': last_names,
            'gender': gender
        }))

    # sample(frac=1) returns all rows in random order.
    shuffled = pd.concat(per_gender, ignore_index=True).sample(frac=1)
    shuffled.to_csv(output_csv, index=False)

# Component factory for generate_names; pandas and numpy are requested for it.
comp_generate_names = comp.create_component_from_func(
    func=generate_names,
    packages_to_install=['pandas', 'numpy'],
)

In [4]:
def generate_birthday(n_rows: int, min_age: int, max_age: int, output: comp.OutputPath('csv')):
    """Write ``n_rows`` random birth dates to ``output`` as a one-column CSV.

    Dates are drawn uniformly from the range between ``max_age * 365`` and
    ``min_age * 365`` days before today, both ends included. A year is counted
    as 365 days, so leap days are ignored.

    Args:
        n_rows: Number of dates to generate.
        min_age: Lower age bound, in years.
        max_age: Upper age bound, in years.
        output: Path the CSV (single column ``date_of_birth`` formatted as
            ``YYYY-MM-DD``) is written to.

    Raises:
        ValueError: If ``min_age`` is greater than ``max_age``.
    """
    from datetime import datetime, timedelta
    import numpy as np
    import pandas as pd

    if min_age > max_age:
        raise ValueError(f'min_age ({min_age}) must not exceed max_age ({max_age})')

    # Take "today" once so every row is measured from the same reference date.
    today = datetime.today()
    start_date = today - timedelta(days=(max_age * 365))
    end_date = today - timedelta(days=(min_age * 365))
    span_days = (end_date - start_date).days

    # randint's upper bound is exclusive: the +1 makes end_date reachable and
    # keeps min_age == max_age (span of 0 days) from raising.
    offsets = np.random.randint(0, span_days + 1, size=n_rows)

    df = pd.DataFrame({
        'date_of_birth': [
            (start_date + timedelta(days=int(offset))).strftime('%Y-%m-%d')
            for offset in offsets
        ]
    })

    df.to_csv(output, index=False)


# 'datetime' is deliberately not listed: generate_birthday imports it from the
# Python standard library, and asking pip for that name would install an
# unrelated third-party distribution.
comp_generate_birthday = comp.create_component_from_func(
    generate_birthday,
    packages_to_install=['pandas', 'numpy']
)

In [5]:
def generate_id_codes__with_user_data(name_input_csv: comp.InputPath('csv'), birthday_input_csv: comp.InputPath('csv'), output_csv: comp.OutputPath('csv')):
    """Generate an 11-digit ID code per person and write the combined table.

    The two input CSVs are joined side by side by row position, an ``id_code``
    column is appended, and the result is written to ``output_csv``.

    Each code consists of:
      * one digit derived from gender and birth year: 3 (male) or 4 (female)
        for years before 2000, 5 (male) or 6 (female) otherwise;
      * the last two digits of the birth year, then month and day (two digits
        each);
      * a random three-digit serial number (000-999);
      * one control digit computed from the first ten digits.

    Serial numbers are drawn independently for every row, so the generated
    codes are not guaranteed to be unique.

    Args:
        name_input_csv: Path to the names CSV; the joined data must contain a
            ``gender`` column holding 'M' or 'F'.
        birthday_input_csv: Path to the birthdays CSV; the joined data must
            contain a ``date_of_birth`` column in ``YYYY-MM-DD`` format.
        output_csv: Path the combined CSV is written to.

    Raises:
        ValueError: If a row's gender is neither 'M' nor 'F'.
    """
    #imports & installs
    import pandas as pd
    import random
    print('Everything succesfully imported!')

    def calculate_control_number(id_code):
        """Return the control digit (0-9) for the first ten digits of a code."""
        def weighted_sum(weights, code):
            return sum(w * int(c) for w, c in zip(weights, code))

        level1_weights = [1, 2, 3, 4, 5, 6, 7, 8, 9, 1]
        level2_weights = [3, 4, 5, 6, 7, 8, 9, 1, 2, 3]

        control_number = weighted_sum(level1_weights, id_code) % 11
        if control_number == 10:
            # A remainder of 10 is not a single digit: retry with the second
            # weight set, and fall back to 0 if that also yields 10.
            control_number = weighted_sum(level2_weights, id_code) % 11
            if control_number == 10:
                control_number = 0

        return control_number

    df1 = pd.read_csv(name_input_csv)
    df2 = pd.read_csv(birthday_input_csv)
    input_df = pd.concat([df1, df2], axis=1)
    input_df['date_of_birth'] = pd.to_datetime(input_df['date_of_birth'], format='%Y-%m-%d')
    size = input_df.shape[0]
    id_code_list = []
    # Report progress about every fifth of the rows. The step is clamped to at
    # least 1 so that fewer than five rows do not cause a modulo by zero.
    step_of_progress = max(1, size // 5)

    for index, row in input_df.iterrows():
        date_of_birth = row['date_of_birth']
        born_before_2000 = date_of_birth.year < 2000

        # Gender and birth-year indicator (first digit)
        gender = row['gender']
        if gender == 'M':
            id_code = '3' if born_before_2000 else '5'
        elif gender == 'F':
            id_code = '4' if born_before_2000 else '6'
        else:
            # Without the leading digit the code would silently be one digit
            # short, so fail loudly instead.
            raise ValueError(
                f"Unsupported gender {gender!r} in row {index}; expected 'M' or 'F'"
            )

        # Birth year (last two digits), month and day
        id_code += f'{date_of_birth.year % 100:02}'
        id_code += f'{date_of_birth.month:02}'
        id_code += f'{date_of_birth.day:02}'

        # Serial number
        id_code += f'{random.randint(0, 999):03}'

        # 11. Control number (0...9)
        id_code += str(calculate_control_number(id_code))

        id_code_list.append(id_code)

        if len(id_code_list) % step_of_progress == 0:
            print(f'{len(id_code_list)} / {size} ID codes generated')

    print('All the ID codes have been generated!')
    id_df = pd.DataFrame({'id_code':id_code_list})
    print(id_df.head())
    df_output = pd.concat([input_df, id_df], axis=1)
    print(df_output.head(10))
    df_output.to_csv(output_csv, index=False)

# Component factory for the ID-code step; only pandas is requested for it.
comp_generate_id_codes = comp.create_component_from_func(
    func=generate_id_codes__with_user_data,
    packages_to_install=['pandas'],
)

In [6]:
def generate_addresses(n_rows:int, output_csv: comp.OutputPath('csv')):
    """Write ``n_rows`` randomly composed addresses to a CSV.

    Every address combines a random street name, a house number between 1 and
    100, and a random city with its fixed postal code, followed by "Estonia".

    Args:
        n_rows: Number of addresses to generate.
        output_csv: Path the CSV (single column ``address``) is written to.
    """
    import pandas as pd
    import random

    streets = (
        "Tartu maantee", "Narva maantee", "Pärnu maantee", "Viru tänav",
        "Pargi tänav", 'Pikk tänav', 'Lai tänav', 'Rüütli tänav',
    )
    postal_code_by_city = {
        "Tallinn": "10115",
        "Tartu": "50102",
        "Narva": "20101",
        "Pärnu": "80011",
        "Viljandi": "71020",
    }
    # Same city order as the dict's insertion order.
    city_names = list(postal_code_by_city)

    def random_address():
        # Draw order: street, house number, city.
        street = random.choice(streets)
        house_number = random.randint(1, 100)
        city = random.choice(city_names)
        return f"{street} {house_number}, {city}, {postal_code_by_city[city]}, Estonia"

    rows = []
    for _ in range(n_rows):
        rows.append(random_address())

    df = pd.DataFrame(rows, columns=['address'])
    print(df.head())
    df.to_csv(output_csv, index=False)

# Component factory for generate_addresses; only pandas is requested for it.
comp_addresss = comp.create_component_from_func(
    func=generate_addresses,
    packages_to_install=['pandas'],
)


In [7]:
def email(input_csv: comp.InputPath('csv'), output_csv: comp.OutputPath('csv')):
    """Derive a random e-mail address for every person and write it to a CSV.

    Names are lower-cased and passed through ``unidecode`` before being
    combined into one of five local-part patterns and one of four domains.

    Args:
        input_csv: Path to a CSV with ``first_name`` and ``last_name`` columns.
        output_csv: Path the CSV (single column ``email``) is written to.
    """
    import random
    import pandas as pd
    import unidecode

    domains = ("hotmail.com", "gmail.com", "yahoo.com", "outlook.com")

    def normalise(name):
        # Lower-case first, then transliterate with unidecode.
        return unidecode.unidecode(name.lower())

    def build_address(row):
        first = normalise(row['first_name'])
        last = normalise(row['last_name'])

        # All candidates are built up front, so the random two-digit suffix is
        # drawn for every row even when another pattern is picked.
        local_parts = [
            first + "." + last,
            first + last,
            first[0] + "." + last,
            first + "." + last[0],
            first + str(random.randint(10, 99)),
        ]

        local_part = random.choice(local_parts)
        return local_part + "@" + random.choice(domains)

    people = pd.read_csv(input_csv)
    people['email'] = people.apply(build_address, axis=1)

    # Only the new column is exported.
    people[['email']].to_csv(output_csv, index=False)

# Component factory for email; unidecode and pandas are requested for it.
comp_email = comp.create_component_from_func(
    func=email,
    packages_to_install=['unidecode', 'pandas'],
)

In [8]:
def output(id_code: comp.InputPath('csv'),
           addresses: comp.InputPath('csv'),
           email: comp.InputPath('csv'),
           output: comp.OutputPath('csv')
           ):
    """Join the three intermediate CSVs side by side into the final CSV.

    Args:
        id_code: Path to the CSV produced by the ID-code step.
        addresses: Path to the addresses CSV.
        email: Path to the e-mail CSV.
        output: Path the combined CSV is written to.
    """
    import pandas as pd

    # Columns are concatenated by row position, in the order given here.
    parts = [pd.read_csv(path) for path in (id_code, addresses, email)]
    merged = pd.concat(parts, axis=1)

    print(merged.head())
    merged.to_csv(output, index=False)

# Component factory for the final merge step; only pandas is requested for it.
comp_output = comp.create_component_from_func(
    func=output,
    packages_to_install=['pandas'],
)

In [9]:
@dsl.pipeline(
    name='EE user pipeline',
    description="description"
)
def pipeline(n_rows:int, min_age:int=16, max_age:int=85, mf_ratio:float=0.5):
    """Wire the components into a pipeline that generates synthetic user data.

    Steps: load the name datasets from the mounted volume, generate names and
    birthdays, derive ID codes from both, generate addresses and e-mails, and
    merge everything into one CSV.

    Args:
        n_rows: Number of user rows to generate.
        min_age: Lower age bound passed to the birthday step.
        max_age: Upper age bound passed to the birthday step.
        mf_ratio: Share of male rows passed to the name step.
    """
    existing_pvc = dsl.PipelineVolume(pvc='egp-pvc')

    # The volume is mounted at /mnt/data, so the dataset directory is given as
    # an absolute path; a relative 'mnt/data/datasets' would only resolve when
    # the container's working directory happens to be '/'.
    load = comp_load_data('/mnt/data/datasets').add_pvolumes({"/mnt/data": existing_pvc})

    names = comp_generate_names(load.output, n_rows, mf_ratio)

    birthday = comp_generate_birthday(n_rows, min_age, max_age)

    id_code = comp_generate_id_codes(names.output, birthday.output)

    address = comp_addresss(n_rows)

    email = comp_email(names.output)

    # Final step; its task object is not needed afterwards.
    comp_output(
        id_code.output,
        address.output,
        email.output
    )

# Compile the pipeline function into the package file ee_pipeline.yaml.
Compiler().compile(pipeline_func=pipeline, package_path='ee_pipeline.yaml')