### Merge algorithm

The algorithm was designed to combine two data sources that differ in size and share no unique key column, which is why conventional merge operations aren't suitable.

<b> Use Case for the Algorithm </b>

The goal is to combine production events (e.g., machine downtimes) with the employees present during each event.

<b>Example:</b> In the downtimes dataset, an entry records that the machine with <b>ID 3</b> stopped working at <b>2020-04-28 21:25:00</b> and was functional again at <b>2020-04-28 22:00:00</b>. In the employees dataset we can check which employees were working during that timeframe and associate them with the event as the group responsible for fixing the broken machine. The number of employees, their IDs, and their shifts are recorded as arrays for each production event during the merge process.
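
As a minimal sketch of this lookup, the event from the example can be matched against a small, hypothetical employee table. The shift times and employee hashes below are made up for illustration; only the column names mirror those used in the merge function later in this notebook.

```python
import pandas as pd

# Hypothetical employee shifts (illustrative values only).
employees = pd.DataFrame({
    "hash_code": ["a1", "b2", "c3"],
    "shift": ["late", "late", "night"],
    "start": pd.to_datetime(
        ["2020-04-28 14:00", "2020-04-28 14:00", "2020-04-28 22:00"]
    ),
    "end": pd.to_datetime(
        ["2020-04-28 22:00", "2020-04-28 22:00", "2020-04-29 06:00"]
    ),
})

# The downtime event of machine ID 3 from the example above.
event_start = pd.Timestamp("2020-04-28 21:25:00")
event_end = pd.Timestamp("2020-04-28 22:00:00")

# Keep only employees whose shift fully covers the event.
present = employees[
    (employees["start"] <= event_start) & (employees["end"] >= event_end)
]
present_ids = present["hash_code"].tolist()
print(present_ids)  # ['a1', 'b2']
```

Employee c3 is excluded because that shift starts only after the machine stopped.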

##### Importing Libraries

In [None]:
import multiprocessing
import time
import numpy as np
from scripts import utility_functions as ut #data preprocessing pipeline
from pathlib import Path
from functools import partial
from multiprocessing import Pool
from pandas import DataFrame

##### Read and clean the data sources

In [None]:
emp, emp2 = ut.get_employee_data() #employee data
exp = ut.get_machine_error_data() #downtimes
exp2 = ut.get_quality_control_data() #quality control data

##### Merge Algorithm

In [None]:
def merge_data(x, y):
    '''
    :param x: Machine Dataset
    :param y: Employee Data
    :return: Merged Dataset
    '''
    df1 = x[['relevant columns from downtimes dataset']]
    df2 = y[['relevant columns from employee dataset']]

    new_rows = []

    for row in df1.itertuples():
        # keep employees whose shift fully covers the event
        in_time_window = df2[
            (row.start >= df2['start']) & (row.end <= df2['end'])
            ]

        hashes = in_time_window['hash_code'].tolist() # an array representing a group of employees for each event
        plan_group = in_time_window['plan_group'].tolist() # plan group of each employee
        experiences = in_time_window['experience'].tolist() # experience in years of each employee
        shifts = in_time_window['shift'].tolist() # shift of each employee, in case an event overlaps several shifts
        group_shift = list(dict.fromkeys(shifts)) # unique shifts in order of appearance; a single entry means all employees share a shift
        group_shift = " ".join(str(s) for s in group_shift) # loop variable renamed so it no longer shadows the parameter x
        # average group experience; stays 0 for an empty group to avoid a division by zero
        group_avg_experience = sum(experiences) / len(experiences) if experiences else 0

        new_rows.append( # create new dataset with selected variables
            {
                'start_date': row.start,
                'end_date': row.end,
                'hash_keys': hashes, # employee groups
                'plan_groups': plan_group,
                'downtime': (row.end - row.start) / np.timedelta64(1, 'h'), # recorded downtime for each event
                'group_size': len(hashes), # group size
                # rest are omitted
            }
        )

    output = DataFrame(new_rows)

    return output
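
The per-event aggregation inside the loop can be tried in isolation. The helper below is a hypothetical extraction of two of those steps (it is not part of the original pipeline) and shows how the shift label and the average experience behave for a mixed group and for an empty one.

```python
def summarize_group(shifts, experiences):
    """Hypothetical helper mirroring two aggregation steps of merge_data."""
    # dict.fromkeys keeps the first occurrence of each value, in order.
    unique_shifts = list(dict.fromkeys(shifts))
    group_shift = " ".join(str(s) for s in unique_shifts)
    # Fall back to 0 for an empty group instead of dividing by zero.
    avg_experience = sum(experiences) / len(experiences) if experiences else 0
    return group_shift, avg_experience


mixed = summarize_group(["late", "late", "night"], [2, 4, 6])
empty = summarize_group([], [])
print(mixed)  # ('late night', 4.0)
print(empty)  # ('', 0)
```

A shift label containing more than one name indicates that the matched employees belong to different shifts.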

##### Process parallelization using multiprocessing

The run time of the algorithm above increases with the dataset size (i.e., the number of date comparisons). The algorithm was therefore adapted for process-based parallelization using the Pool object from the multiprocessing module, which significantly improved its performance.

<b>The algorithm for quality control data</b>

In [None]:
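def to_parallelize(df: DataFrame, iterrows):
    # df: employee data; iterrows: one (index, row) pair from the quality control data
    _, row = iterrows
    in_time_window = df[ # use the df argument instead of a global df2
        (df['start'] <= row['start']) & (row['start'] <= df['end'])
        ]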
    hashes = in_time_window['hash_code'].tolist() # employee groups
    # rest of the generated columns are omitted
    return {
        'inspectionDate': row['start'],
        'product_type': row['product_type'],
        # rest are omitted
    }

##### Running process parallelization and constructing pool object

The Pool object was chosen because its map method applies a single function to many inputs in parallel across worker processes.

In [None]:
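multiprocessing.set_start_method("fork") # "fork" is only available on POSIX systems
st = time.time()
# df1 (quality control data) and df2 (employee data) are assumed to hold the relevant columns, as in merge_data
func = partial(to_parallelize, df2) # bind the employee data as the first argument
rows = df1.iterrows() # one (index, row) pair per quality control entry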

pool = Pool(processes=4)
new_rows2 = pool.map(func, rows)
pool.close()
pool.join()
print(time.time() - st)  # 59 seconds

The average run time was not measured, but the algorithm took around 1 hour without parallelization and completed in 59 seconds in this test with process-based parallelism.
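
The pool setup can also be tried on toy data. The following is a minimal, self-contained sketch, not the original pipeline: the function names, the toy values, and the reduced set of output columns are assumptions made for illustration.

```python
from functools import partial
from multiprocessing import Pool

import pandas as pd


def match_event(employees, item):
    """Return the employees on shift when one inspection started."""
    _, row = item  # iterrows() yields (index, row) pairs
    on_shift = employees[
        (employees["start"] <= row["start"]) & (row["start"] <= employees["end"])
    ]
    return {
        "inspectionDate": row["start"],
        "product_type": row["product_type"],
        "hash_keys": on_shift["hash_code"].tolist(),
    }


def make_toy_data():
    """Build small, made-up employee and inspection tables."""
    employees = pd.DataFrame({
        "hash_code": ["a1", "b2", "c3"],
        "start": pd.to_datetime(
            ["2020-04-28 06:00", "2020-04-28 06:00", "2020-04-28 14:00"]
        ),
        "end": pd.to_datetime(
            ["2020-04-28 14:00", "2020-04-28 14:00", "2020-04-28 22:00"]
        ),
    })
    inspections = pd.DataFrame({
        "start": pd.to_datetime(
            ["2020-04-28 09:30", "2020-04-28 15:00", "2020-04-28 23:00"]
        ),
        "product_type": ["A", "B", "A"],
    })
    return employees, inspections


def run_parallel(employees, inspections, processes=2):
    """Match every inspection against the employee table in worker processes."""
    func = partial(match_event, employees)  # bind the employee table
    with Pool(processes=processes) as pool:
        rows = pool.map(func, inspections.iterrows())
    return pd.DataFrame(rows)


# Sequential reference run; run_parallel should return the same rows.
toy_employees, toy_inspections = make_toy_data()
sequential_rows = [
    match_event(toy_employees, item) for item in toy_inspections.iterrows()
]
```

When run as a script, run_parallel(toy_employees, toy_inspections) is best called under an if __name__ == "__main__": guard, since start methods other than "fork" re-import the main module in each worker. On data this small the process overhead outweighs any gain, so a speed-up like the one reported above should only be expected for large inputs.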