# Raw_ADS_preparation.ipynb
> Project: **ABI Turnover**  
> Turnover Process Phase: **1**  
> Author: **Varun V**  
> Location: **GCC**  
> Team: **People Analytics**

### Pending steps
- add target combined file creation functions from the prior setup to the target_class
- missing value imputation module
- package as a separate py classes file and make the notebook compact

In [3]:
## importing the relevant packages:

# clear the workspace
# %reset -f

# print list of files in directory
import os
print(os.listdir())

# the base packages
import collections # for the Counter function
import csv # for reading/writing csv files
import pandas as pd, numpy as np, time, gc, bisect, re
from datetime import datetime as dt

# the packages/modules used for processing (sklearn); the modelling (lightgbm) and bayesian optimization (hyperopt, bayes_opt) packages are not imported in this cell
import sklearn
from sklearn import metrics, preprocessing

randomseed = 1 # the value for the random state used at various points in the pipeline
pd.options.display.max_rows = 1000 # specify if you want the full output in cells rather the truncated list
pd.options.display.max_columns = 200
pd.options.mode.chained_assignment = None  # default='warn'

# to display multiple outputs in a cell without using print/display
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

__The parent folder structure of the ADLS where the individual files are stored__

*classes/functions follow the order as dictated by the folder structure*

In [5]:
%fs

ls /mnt/datalake/INPUT/

path,name,size
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/,BLUEPRINTS/,0
dbfs:/mnt/datalake/INPUT/COMPETENCY/,COMPETENCY/,0
dbfs:/mnt/datalake/INPUT/MISC/,MISC/,0
dbfs:/mnt/datalake/INPUT/MOVEMENT/,MOVEMENT/,0
dbfs:/mnt/datalake/INPUT/OPR/,OPR/,0
dbfs:/mnt/datalake/INPUT/SALARY/,SALARY/,0
dbfs:/mnt/datalake/INPUT/TARGET/,TARGET/,0
dbfs:/mnt/datalake/INPUT/TURNOVER/,TURNOVER/,0
dbfs:/mnt/datalake/INPUT/test/,test/,0


In [6]:
# GLOBAL VARIABLES/DEFINITIONS
# Column subsets kept from each input source. The names are the normalised column names
# produced by the helper_funcs readers (lower-cased, special characters replaced by '_'),
# because every source class subsets its frames after reading them through those readers.
# NOTE(review): unusual spellings (e.g. 'employee_personnal_number_pa') presumably mirror
# the headers of the source files - confirm against the files before "correcting" them.

# for blueprint (main and nlp/text columns)
bp_cols_to_keep = ['org_unit_id_of_position_om', 'position_id_om', 'position_name_om', 'macro_entity_level_3_om', 'macro_entity_level_4_om',  'ab__inbev_entity_level_2_om', 'ab__inbev_entity_level_3_om', 'ab__inbev_entity_level_4_om', 'employee_global_id_pa', 'employee_personnal_number_pa', 'pay_grade_group_pa', 'physical_work_location_code_pa', 'physical_work_location_description_pa', 'physical_work_location_city_pa', 'global_job_om', 'job_family_om', 'functional_area_om', 'position_start_date_om', 'position_band_om', 'position_work_location_code_om', 'position_work_location_country_om', 'manager_s_global_id_pa', 'manager_s_position_id_om', 'manager_s_position_name_om', 'ebm_level_of_the_job_om', 'company_code_om', 'employee_group_code_om', 'employee_subgroup_code_om', 'cost_center_id_om', 'local_entity_code', 'employment_status_pa']
bp_cols_for_nlp = ['org_unit_name_of_position_om', 'position_name_om', 'company_name_om', 'global_job_om', 'job_family_om', 'functional_area_om', 'position_work_location_code_om', 'manager_s_position_name_om', 'cost_center_text_om', 'local_entity_description', 'employee_global_id_pa', 'employee_personnal_number_pa', 'position_id_om']

# for competency
cp_cols_to_keep = ['employee_global_id', 'personnel_number', 'year', 'competency_group', 'competency', 'manager_rating___scale_value', 'manager_rating___numeric_value']

# for movement
mov_cols_to_keep = ['pers_no_', 'position', 'name_of_manager_om_', 'ps_group', 'start_date', 'end_date', 'reason_for_action']

# for opr
opr_cols_to_keep = ['personnel_number', 'year', 'opr_rating_scale']

# for salary (applied to the USA bonus text file inside salary_class.main)
sal_cols_to_keep = ['pers_no_', 'name_of_employee_or_applicant', 'for_period', 'amount', 'crcy']

# for target
target_cols_to_keep = ['appraisee_id', 'appraiser_id', 'net_target', 'year']

# for turnover
to_cols_to_keep = ['employee_id', 'global_id', 'pay_scale_group', 'name_of_action_type', 'name_of_reason_for_action', 'last_day', 'termination_date']

In [7]:
# HELPER FUNCTIONS CLASS #


class helper_funcs(object):
  """
  Shared file readers and text clean-up helpers used by the data-source classes.

  The three readers (csv_read, txt_read, xlsx_read) behave the same way after parsing:
  they print the raw shape, optionally drop columns and normalise the column names.
  process_columns / nlp_process_columns clean the *content* of text columns.
  """

  # strings that every reader interprets as a missing value
  NA_VALUES = ['No Data', ' ', 'UNKNOWN', '', 'Not Rated', 'Not Applicable']

  # literal substrings replaced by '_' in column names, applied one at a time in this order.
  # the order matters: '__' is collapsed before '-', '/' and "'" are replaced, so a header
  # such as 'A - B' ends up as 'a___b'
  CHARS_TO_REMOVE = [' ', '.', '(', ')', '__', '-', '/', '\'']

  def __init__(self):
    """ list down the various functions defined here """

  def _normalise_frame(self, x, cols_to_remove):
    """Print the shape, drop `cols_to_remove` (if given) and normalise the column names of `x`."""
    print(x.shape)
    if cols_to_remove is not None:
      x.drop(cols_to_remove, axis=1, inplace=True)
    for token in self.CHARS_TO_REMOVE:
      # regex=False: the tokens are literals ('.', '(' and ')' are regex metacharacters) and
      # the default value of `regex` differs between pandas versions
      x.columns = x.columns.str.strip().str.lower().str.replace(token, '_', regex=False)
    return x

  def csv_read(self, file_path, cols_to_remove=None, dtype=None):
    """
    Read a latin-1 encoded csv file into a DataFrame with normalised column names.

    file_path: path or file-like object handed to pd.read_csv
    cols_to_remove: optional list of (original, un-normalised) column names to drop
    dtype: optional dtype specification forwarded to pd.read_csv
    """
    self.cols_to_remove = cols_to_remove
    # dtype=None is the pandas default, so no separate branch is needed when it is omitted
    x = pd.read_csv(file_path, na_values=self.NA_VALUES, encoding='latin-1', low_memory=False, dtype=dtype)
    return self._normalise_frame(x, cols_to_remove)

  def txt_read(self, file_path, cols_to_remove=None, sep='|', skiprows=1, dtype=None):
    """
    Read a delimited text file into a DataFrame with normalised column names.

    currently only supports salary files with the default values
    (need to implement dynamic programming for any generic txt)
    """
    self.cols_to_remove = cols_to_remove
    x = pd.read_table(file_path, sep=sep, skiprows=skiprows, na_values=self.NA_VALUES, dtype=dtype)
    return self._normalise_frame(x, cols_to_remove)

  def xlsx_read(self, file_path, cols_to_remove=None, sheet_name=0, dtype=None):
    """
    Read one sheet of an excel workbook into a DataFrame with normalised column names.

    sheet_name: sheet position or name forwarded to pd.read_excel (first sheet by default)
    """
    self.cols_to_remove = cols_to_remove
    x = pd.read_excel(file_path, na_values=self.NA_VALUES, sheet_name=sheet_name, dtype=dtype)
    return self._normalise_frame(x, cols_to_remove)

  def process_columns(self, df):
    """
    Lower-case, strip and de-punctuate every object-dtype column of `df`.

    Any character that is neither a word character nor whitespace becomes '_'.
    Columns of other dtypes are returned untouched. A new DataFrame is returned.
    """
    # three separate passes (as before) so the dtype check is re-evaluated after every step
    df = df.apply(lambda x: x.str.lower() if (x.dtype == 'object') else x)
    df = df.apply(lambda x: x.str.strip() if (x.dtype == 'object') else x)
    # raw string: '\w' and '\s' are invalid escape sequences in a normal string literal
    df = df.apply(lambda x: x.str.replace(r'[^\w\s]', '_', regex=True) if (x.dtype == 'object') else x)
    return df

  def nlp_process_columns(self, df, nlp_cols):
    """
    Prepare the text columns listed in `nlp_cols` for NLP use.

    Underscores become spaces, runs of whitespace collapse to a single space and the
    abbreviation 'crft' is expanded to 'craft'. Other columns are returned untouched.
    """
    df = df.apply(lambda x: x.str.replace('_', ' ', regex=False) if x.name in nlp_cols else x)
    df = df.apply(lambda x: x.str.replace(r'\s+', ' ', regex=True) if x.name in nlp_cols else x)
    df = df.apply(lambda x: x.str.replace('crft', 'craft', regex=False) if x.name in nlp_cols else x)
    return df

In [8]:
# initialize the helper functions class instance

helpers = helper_funcs()

# BLUEPRINT

In [10]:
%fs

ls /mnt/datalake/INPUT/BLUEPRINTS

path,name,size
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/01-2017.xlsx,01-2017.xlsx,9141149
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/01-2018.xlsx,01-2018.xlsx,15701971
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/02-2017.xlsx,02-2017.xlsx,9974343
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/02-2018.xlsx,02-2018.xlsx,15826020
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/03-2017.xlsx,03-2017.xlsx,10171703
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/03-2018.xlsx,03-2018.xlsx,15900764
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/04-2017.xlsx,04-2017.xlsx,10156939
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/04-2018.xlsx,04-2018.xlsx,16046382
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/05-2016.xlsx,05-2016.xlsx,7657060
dbfs:/mnt/datalake/INPUT/BLUEPRINTS/05-2017.xlsx,05-2017.xlsx,10231479


In [11]:
# BLUEPRINT CLASS #


class blueprint(object):
  """
  Loader for the monthly blueprint workbooks (files named '<MM>-<YYYY>.xlsx').

  After main() has run the instance exposes, per year (2016, 2017, 2018):
    - dictNN:     {'<MM>-<YYYY>': processed frame restricted to cols_to_keep, plus the
                   'month_file', 'term_year' and 'term_month_temp' columns}
    - nlp_dictNN: {'<MM>-<YYYY>': raw frame restricted to cols_for_nlp}
  and file_names_keys, the names of all files found in the BLUEPRINTS folder.
  """

  def __init__(self, helper_funcs_instance, cols_to_keep, cols_for_nlp):
    """
    helper_funcs_instance: object providing xlsx_read and process_columns
    cols_to_keep: columns kept in the main per-month frames
    cols_for_nlp: columns kept in the separate NLP frames
    """
    self.helpers = helper_funcs_instance
    self.cols_to_keep = cols_to_keep
    self.cols_for_nlp = cols_for_nlp

  def _load_year(self, file_names, year, sheet_name=None):
    """Read every '<MM>-<year>.xlsx' file in `file_names`; return (main frames, nlp frames) keyed by '<MM>-<year>'."""
    suffix = '-' + str(year) + '.xlsx'
    frames = {}
    nlp_frames = {}
    for file_name in file_names:
      # exact suffix test instead of an unanchored regex whose '.' matched any character
      if not file_name.endswith(suffix):
        continue
      key = file_name[:-len('.xlsx')]
      path = '/dbfs/mnt/datalake/INPUT/BLUEPRINTS/' + key + '.xlsx'
      if sheet_name is None:
        raw = self.helpers.xlsx_read(path)
      else:
        raw = self.helpers.xlsx_read(path, sheet_name=sheet_name)
      # the NLP frame is taken from the raw content, before process_columns is applied
      nlp_frames[key] = raw[self.cols_for_nlp]
      frame = self.helpers.process_columns(raw[self.cols_to_keep])
      frame['month_file'] = key
      frame['term_year'] = year
      frame['term_month_temp'] = key.split('-')[0]  # the '<MM>' part of the file name
      frames[key] = frame
    return frames, nlp_frames

  def main(self):
    """List the BLUEPRINTS folder, load the workbooks year by year and store them on the instance."""
    ## list the existing blueprint reports and store the list of file names in file_names_keys
    files = dbutils.fs.ls('/mnt/datalake/INPUT/BLUEPRINTS/')
    self.file_names_keys = [entry.name for entry in files]

    # 2016 and 2017 workbooks are read from the reader's default sheet,
    # the 2018 workbooks from the sheet named 'Blueprint-Data'
    self.dict16, self.nlp_dict16 = self._load_year(self.file_names_keys, 2016)
    self.dict17, self.nlp_dict17 = self._load_year(self.file_names_keys, 2017)
    self.dict18, self.nlp_dict18 = self._load_year(self.file_names_keys, 2018, sheet_name='Blueprint-Data')

    return None

In [12]:
bp = blueprint(helpers, bp_cols_to_keep, bp_cols_for_nlp)

In [13]:
bp.main()

# COMPETENCY

In [15]:
%fs

ls /mnt/datalake/INPUT/COMPETENCY

path,name,size
dbfs:/mnt/datalake/INPUT/COMPETENCY/NAZ_2018 Competency Results.xlsx,NAZ_2018 Competency Results.xlsx,15903629
dbfs:/mnt/datalake/INPUT/COMPETENCY/competency_2015.csv,competency_2015.csv,23309365
dbfs:/mnt/datalake/INPUT/COMPETENCY/competency_2016.csv,competency_2016.csv,27261788
dbfs:/mnt/datalake/INPUT/COMPETENCY/competency_2017.csv,competency_2017.csv,27045442
dbfs:/mnt/datalake/INPUT/COMPETENCY/competency_2018.csv,competency_2018.csv,39162901
dbfs:/mnt/datalake/INPUT/COMPETENCY/competency_combined.csv,competency_combined.csv,378720


In [16]:
# COMPETENCY CLASS #

class competency(object):
  """
  Loader for the yearly competency files ('competency_<YYYY>.csv').

  After main() has run the instance exposes:
    - file_names_keys:     names of all files found in the COMPETENCY folder
    - competency:          {'competency_<YYYY>': processed frame restricted to cols_to_keep}
    - competency_combined: all yearly frames stacked, exact duplicate rows removed
  """

  def __init__(self, helper_funcs_instance, cols_to_keep):
    """
    helper_funcs_instance: object providing csv_read and process_columns
    cols_to_keep: columns kept from every yearly file
    """
    self.helpers = helper_funcs_instance
    self.cols_to_keep = cols_to_keep
    return None

  def main(self):
    """List the COMPETENCY folder, read the yearly csv files and build the combined frame."""
    ## list the existing competency files and store the list of file names in file_names_keys
    files = dbutils.fs.ls('/mnt/datalake/INPUT/COMPETENCY/')
    self.file_names_keys = [entry.name for entry in files]

    # only the per-year csv exports; this skips e.g. 'competency_combined.csv' and the xlsx report
    yearly_files = [name for name in self.file_names_keys
                    if name.startswith('competency_20') and name.endswith('.csv')]

    # read in the files and store them keyed by the file name without extension
    yearly_frames = {}
    for file_name in yearly_files:
      key = file_name[:-len('.csv')]
      frame = self.helpers.csv_read('/dbfs/mnt/datalake/INPUT/COMPETENCY/' + file_name)
      frame = frame[self.cols_to_keep]
      frame = self.helpers.process_columns(frame)
      # the year comes from the file name (as a string) and overwrites any 'year' column read from the file
      frame['year'] = key.split('_')[1]
      yearly_frames[key] = frame
    self.competency = yearly_frames

    competency_combined = pd.concat(list(yearly_frames.values()), ignore_index=True)
    print('competency combined original shape: ', competency_combined.shape)
    competency_combined = competency_combined.drop_duplicates()
    print('competency combined new shape after distinct: ', competency_combined.shape)
    self.competency_combined = competency_combined

    return None

In [17]:
comp = competency(helpers, cp_cols_to_keep)

In [18]:
comp.main()

# MISC

In [20]:
%fs

ls /mnt/datalake/INPUT/MISC

path,name,size
dbfs:/mnt/datalake/INPUT/MISC/DEMOGRAPHICS_2017.xlsx,DEMOGRAPHICS_2017.xlsx,2420029
dbfs:/mnt/datalake/INPUT/MISC/NAZ_Headcountreport_20181109220859.csv,NAZ_Headcountreport_20181109220859.csv,8268833


In [21]:
# MISCELLANEOUS CLASS #

class misc_class(object):
  """Reader for the miscellaneous input files (currently only the 2017 demographics workbook)."""

  def __init__(self, helper_funcs_instance):
    """Keep a handle on the shared helper instance that provides xlsx_read."""
    self.helpers = helper_funcs_instance

  def main(self):
    """Read the demographics workbook and expose it as the `demo` attribute."""
    self.demo = self.helpers.xlsx_read('/dbfs/mnt/datalake/INPUT/MISC/DEMOGRAPHICS_2017.xlsx')
    return None

In [22]:
misc = misc_class(helpers)

In [23]:
misc.main()

# MOVEMENT

In [25]:
%fs

ls /mnt/datalake/INPUT/MOVEMENT

path,name,size
dbfs:/mnt/datalake/INPUT/MOVEMENT/Movements_2014.csv,Movements_2014.csv,2247865
dbfs:/mnt/datalake/INPUT/MOVEMENT/Movements_2015.csv,Movements_2015.csv,1701558
dbfs:/mnt/datalake/INPUT/MOVEMENT/Movements_2016.csv,Movements_2016.csv,1787575
dbfs:/mnt/datalake/INPUT/MOVEMENT/Movements_2017.csv,Movements_2017.csv,2955452
dbfs:/mnt/datalake/INPUT/MOVEMENT/Movements_2018.csv,Movements_2018.csv,3903004
dbfs:/mnt/datalake/INPUT/MOVEMENT/NAZ_Movementsreport_20181109215959.csv,NAZ_Movementsreport_20181109215959.csv,34167938


In [26]:
# MOVEMENT CLASS #

class movement_class(object):
  """
  Loader for the yearly movement files ('Movements_<YYYY>.csv').

  After main() has run the instance exposes:
    - file_names_keys:   names of all files found in the MOVEMENT folder
    - movement:          {'Movements_<YYYY>': processed frame restricted to cols_to_keep, plus 'year'}
    - movement_combined: all yearly frames stacked, exact duplicate rows removed
  """

  def __init__(self, helper_funcs_instance, cols_to_keep):
    """
    helper_funcs_instance: object providing csv_read and process_columns
    cols_to_keep: columns kept from every yearly file
    """
    self.helpers = helper_funcs_instance
    self.cols_to_keep = cols_to_keep
    return None

  def main(self):
    """List the MOVEMENT folder, read the yearly csv files and build the combined frame."""
    ## list the existing movement files and store the list of file names in file_names_keys
    files = dbutils.fs.ls('/mnt/datalake/INPUT/MOVEMENT/')
    self.file_names_keys = [entry.name for entry in files]

    # only the per-year csv exports; this skips e.g. the 'NAZ_Movementsreport_...' file
    yearly_files = [name for name in self.file_names_keys
                    if name.startswith('Movements_2') and name.endswith('.csv')]

    # read in the files and store them keyed by the file name without extension
    yearly_frames = {}
    for file_name in yearly_files:
      key = file_name[:-len('.csv')]
      frame = self.helpers.csv_read('/dbfs/mnt/datalake/INPUT/MOVEMENT/' + file_name)
      frame = frame[self.cols_to_keep]
      frame = self.helpers.process_columns(frame)
      frame['year'] = key.split('_')[1]  # year taken from the file name, stored as a string
      yearly_frames[key] = frame
    self.movement = yearly_frames

    movement_combined = pd.concat(list(yearly_frames.values()), ignore_index=True)
    print('movement combined original shape: ', movement_combined.shape)
    movement_combined = movement_combined.drop_duplicates()
    print('movement combined new shape after distinct: ', movement_combined.shape)
    self.movement_combined = movement_combined

    return None

In [27]:
movement = movement_class(helpers, cols_to_keep = mov_cols_to_keep)

In [28]:
movement.main()

# OPR

In [30]:
%fs

ls /mnt/datalake/INPUT/OPR

path,name,size
dbfs:/mnt/datalake/INPUT/OPR/opr_2016.csv,opr_2016.csv,6291794
dbfs:/mnt/datalake/INPUT/OPR/opr_2017.csv,opr_2017.csv,6938386
dbfs:/mnt/datalake/INPUT/OPR/opr_combined.csv,opr_combined.csv,228941


In [31]:
# OPR CLASS #

class opr_class(object):
  """
  Loader for the OPR files.

  After main() has run the instance exposes:
    - file_names_keys: names of all files found in the OPR folder
    - opr_list:        {'opr_<YYYY>': processed yearly frame restricted to cols_to_keep}
    - opr_processed:   the processed content of 'opr_combined.csv' (all of its columns)
    - opr_combined:    all yearly frames stacked, exact duplicate rows removed
  """

  def __init__(self, helper_funcs_instance, cols_to_keep):
    """
    helper_funcs_instance: object providing csv_read and process_columns
    cols_to_keep: columns kept from every yearly file
    """
    self.helpers = helper_funcs_instance
    self.cols_to_keep = cols_to_keep
    return None

  def main(self):
    """List the OPR folder, read the yearly and the pre-combined csv files and build the combined frame."""
    ## list the existing opr files and store the list of file names in file_names_keys
    files = dbutils.fs.ls('/mnt/datalake/INPUT/OPR/')
    self.file_names_keys = [entry.name for entry in files]

    # only the per-year csv exports; 'opr_combined.csv' is read separately below
    yearly_files = [name for name in self.file_names_keys
                    if name.startswith('opr_2') and name.endswith('.csv')]

    # read in the files and store them keyed by the file name without extension.
    # unlike the other sources no 'year' column is derived from the file name here
    opr_list = {}
    for file_name in yearly_files:
      key = file_name[:-len('.csv')]
      frame = self.helpers.csv_read('/dbfs/mnt/datalake/INPUT/OPR/' + file_name)
      frame = frame[self.cols_to_keep]
      opr_list[key] = self.helpers.process_columns(frame)
    self.opr_list = opr_list

    opr_processed = self.helpers.csv_read('/dbfs/mnt/datalake/INPUT/OPR/opr_combined.csv')
    self.opr_processed = self.helpers.process_columns(opr_processed)

    opr_combined = pd.concat(list(opr_list.values()), ignore_index=True)
    print('opr combined original shape: ', opr_combined.shape)
    opr_combined = opr_combined.drop_duplicates()
    print('opr combined new shape after distinct: ', opr_combined.shape)
    self.opr_combined = opr_combined

    return None

In [32]:
opr = opr_class(helpers, opr_cols_to_keep)

In [33]:
opr.main()

# SALARY

In [35]:
%fs

ls /mnt/datalake/INPUT/SALARY

path,name,size
dbfs:/mnt/datalake/INPUT/SALARY/salary_bonus_CA_2018.xlsx,salary_bonus_CA_2018.xlsx,687042
dbfs:/mnt/datalake/INPUT/SALARY/salary_bonus_US_2018.txt,salary_bonus_US_2018.txt,34760201
dbfs:/mnt/datalake/INPUT/SALARY/salary_main_NAZ_2018.xlsx,salary_main_NAZ_2018.xlsx,6880592


In [36]:
# SALARY CLASS #

class salary_class(object):
  """
  Loader for the salary files (base salary plus the USA and Canada bonus files).

  After main() has run the instance exposes:
    - salary_combined: the NAZ main salary workbook as read (all columns)
    - salbonus_naz:    the USA bonus text file restricted to cols_to_keep, plus 'year' and 'zone'
    - salbonus_ca:     Canada bonus summed per employee_number and yearmonth (column 'bonus')
  """

  def __init__(self, helper_funcs_instance, cols_to_keep):
    """
    helper_funcs_instance: object providing xlsx_read and txt_read
    cols_to_keep: columns kept from the USA bonus text file
    """
    self.helpers = helper_funcs_instance
    self.cols_to_keep = cols_to_keep
    return None

  def main(self):
    """Read the three salary inputs and store the resulting frames on the instance."""
    # here there is only one file for main salary and two bonus files in two formats. hence making it static codes.
    # recommended to define proper governance around salary files and tweak the salary class accordingly
    # (a dynamic, dbutils-based discovery of the salary files is not implemented yet)

    # naz combined main file
    salmain = self.helpers.xlsx_read('/dbfs/mnt/datalake/INPUT/SALARY/salary_main_NAZ_2018.xlsx')

    # usa bonus file
    salbonus_naz = self.helpers.txt_read('/dbfs/mnt/datalake/INPUT/SALARY/salary_bonus_US_2018.txt', skiprows=37)
    # drop the first row, the first column and the last column of the parsed text export
    salbonus_naz = salbonus_naz.iloc[1:, 1:]
    salbonus_naz = salbonus_naz.drop(salbonus_naz.columns[[-1]], axis=1)
    salbonus_naz = salbonus_naz[self.cols_to_keep].copy()
    # drop fully-empty rows BEFORE adding the constant columns below; once 'year' and 'zone'
    # are filled in, no row is entirely missing any more and how='all' could never drop anything
    salbonus_naz = salbonus_naz.dropna(axis=0, how='all')
    salbonus_naz['year'] = 2018
    salbonus_naz['zone'] = 'usa'
    salbonus_naz = salbonus_naz.drop_duplicates()
    # remove rows whose id cell holds the header text 'Pers.No.'
    salbonus_naz = salbonus_naz[salbonus_naz['pers_no_'] != 'Pers.No.'].copy()
    salbonus_naz['pers_no_'] = salbonus_naz['pers_no_'].astype(float)

    # canada bonus file: one sheet per period, the sheet name is stored in the 'year' column
    ca_path = '/dbfs/mnt/datalake/INPUT/SALARY/salary_bonus_CA_2018.xlsx'
    # the workbook handle is only needed for the sheet names; close it again right away
    with pd.ExcelFile(ca_path) as workbook:
      sheet_names = list(workbook.sheet_names)
    sal_bonus_can = {}
    for position, sheet in enumerate(sheet_names):
      sheet_frame = self.helpers.xlsx_read(ca_path, sheet_name=position)
      sheet_frame['year'] = sheet
      sal_bonus_can[position] = sheet_frame
    sal_bonus_can_df = pd.concat(list(sal_bonus_can.values()), ignore_index=True, sort=False)
    sal_bonus_can_df = sal_bonus_can_df[['employee_number', 'pay_run_earning_committed_amount_sum', 'period_start', 'year']].copy()
    sal_bonus_can_df['yearmonth'] = pd.to_datetime(sal_bonus_can_df['period_start']).dt.strftime('%Y%m')
    salbonus_ca = (
      sal_bonus_can_df
      .groupby(['employee_number', 'yearmonth'])['pay_run_earning_committed_amount_sum']
      .sum()
      .to_frame(name='bonus')
      .reset_index()
    )

    # finally
    self.salary_combined = salmain
    self.salbonus_naz = salbonus_naz
    self.salbonus_ca = salbonus_ca

    return None

In [37]:
salary = salary_class(helpers, sal_cols_to_keep)
salary.main()

# TARGET

In [39]:
%fs

ls /mnt/datalake/INPUT/TARGET

path,name,size
dbfs:/mnt/datalake/INPUT/TARGET/target_combined.csv,target_combined.csv,1953820
dbfs:/mnt/datalake/INPUT/TARGET/target_complete.xlsx,target_complete.xlsx,6090150


In [40]:
# TARGET CLASS #

class target_class(object):
  """
  Reader for the target files.

  The target combined file read here is a prior-processed one; the functions that
  generate it are not part of this class yet (see "Pending steps" at the top of the notebook).

  After main() has run the instance exposes:
    - target:          the opened pd.ExcelFile for 'target_complete.xlsx' (not parsed here)
    - target_combined: processed 'target_combined.csv' restricted to cols_to_keep
  """

  def __init__(self, helper_funcs_instance, cols_to_keep):
    """
    helper_funcs_instance: object providing csv_read and process_columns
    cols_to_keep: columns kept from the combined target file
    """
    self.helpers = helper_funcs_instance
    self.cols_to_keep = cols_to_keep
    return None

  def main(self):
    """Open the complete target workbook and read the pre-combined target csv."""
    # the workbook handle is stored as-is; none of its sheets is parsed in this method
    self.target = pd.ExcelFile('/dbfs/mnt/datalake/INPUT/TARGET/target_complete.xlsx')

    target_combined = self.helpers.csv_read('/dbfs/mnt/datalake/INPUT/TARGET/target_combined.csv')
    target_combined = target_combined[self.cols_to_keep]
    self.target_combined = self.helpers.process_columns(target_combined)

    return None

In [41]:
target = target_class(helpers, target_cols_to_keep)

In [42]:
target.main()

# TURNOVER FILES

In [44]:
%fs

ls /mnt/datalake/INPUT/TURNOVER/

path,name,size
dbfs:/mnt/datalake/INPUT/TURNOVER/turnover-2016.csv,turnover-2016.csv,606800
dbfs:/mnt/datalake/INPUT/TURNOVER/turnover_2017.csv,turnover_2017.csv,4790559
dbfs:/mnt/datalake/INPUT/TURNOVER/turnover_2018.csv,turnover_2018.csv,9033569


In [45]:
# TURNOVER CLASS #

class to_class(object):
  """
  Loader for the turnover files ('turnover_<YYYY>' / 'turnover-<YYYY>', csv or xlsx).

  After main() has run the instance exposes:
    - file_names_keys: names of all files found in the TURNOVER folder
    - to:              {'<YYYY>': processed frame restricted to cols_to_keep, plus 'year'}
    - to_combined:     all frames stacked, one row per employee_id (first occurrence kept),
                       with employee_id cast to int
  """

  def __init__(self, helper_funcs_instance, cols_to_keep):
    """
    helper_funcs_instance: object providing csv_read, xlsx_read and process_columns
    cols_to_keep: columns kept from every turnover file
    """
    self.helpers = helper_funcs_instance
    self.cols_to_keep = cols_to_keep
    return None

  def main(self):
    """List the TURNOVER folder, read every csv/xlsx file as text and build the combined frame."""
    # read in the files
    files = dbutils.fs.ls('/mnt/datalake/INPUT/TURNOVER/')
    self.file_names_keys = [entry.name for entry in files]

    input_dir = '/dbfs/mnt/datalake/INPUT/TURNOVER/'
    # csv files are read first, then xlsx files; the order decides which duplicate row survives below
    readers = [('.csv', self.helpers.csv_read), ('.xlsx', self.helpers.xlsx_read)]

    # the dict to store the individual datasets to append for later
    to = {}
    for extension, reader in readers:
      for file_name in self.file_names_keys:
        # exact extension test instead of an unanchored regex whose '.' matched any character
        if not file_name.endswith(extension):
          continue
        # 'turnover-2016.csv' / 'turnover_2017.csv' -> '2016' / '2017'
        key = re.sub(r'turnover[_-]', '', file_name[:-len(extension)])
        frame = reader(input_dir + file_name, dtype=str)
        frame = frame[self.cols_to_keep]
        frame = self.helpers.process_columns(frame)
        frame['year'] = key
        to[key] = frame
    self.to = to

    to_combined = pd.concat(list(to.values()), ignore_index=True)
    print('turnover combined original shape: ', to_combined.shape)
    to_combined = to_combined.drop_duplicates(subset=['employee_id'])
    to_combined['employee_id'] = to_combined['employee_id'].astype(int)
    print('turnover combined new shape after distinct: ', to_combined.shape)
    self.to_combined = to_combined

    return None

In [46]:
turnover = to_class(helpers, to_cols_to_keep)

In [47]:
turnover.main()

# END OF DATA PROCUREMENT
&nbsp;
_All the individual class instances are loaded into a super class that holds every individual file/list of files as attributes._

In [49]:
# global function to flatten columns after a grouped operation and aggregation
# outside all classes since it is added as an attribute to pandas DataFrames
def __my_flatten_cols(self, how="_".join, reset_index=True):
  """
  Flatten MultiIndex column labels into plain strings (modifies the frame's columns in place).

  how: callable that turns the non-empty level values of one column (as strings) into a
       single label; the default joins them with '_'. The string "last" keeps only the
       last non-empty level value.
  reset_index: when True a copy with a fresh 0..n-1 index is returned, otherwise the frame itself.
  """
  if how == "last":
    how = lambda parts: list(parts)[-1]
  if isinstance(self.columns, pd.MultiIndex):
    flat_labels = []
    for levels in self.columns.values:
      # empty level values (e.g. the '' under a grouping column) are left out of the label
      non_empty = filter(None, map(str, levels))
      flat_labels.append(how(non_empty))
    self.columns = flat_labels
  if reset_index:
    return self.reset_index(drop=True)
  return self
pd.DataFrame.my_flatten_cols = __my_flatten_cols


# the super class
class all_files:
  
  def __init__(self, blueprint, competency, opr, target, movement, salary, turnover, misc, bp_nlp_cols, helpers):
    # initialize the various class instances into attributes of the super class
    self.bpt = blueprint
    self.cpy = competency
    self.trg = target
    self.opr = opr
    self.mov = movement
    self.sal = salary
    self.tvr = turnover
    self.misc = misc
    self.bpt_nlp_cols = bp_nlp_cols
    self.helper_functions = helpers
    
    # declare global variables
    self.ps_group_list = ['vi_b', 'vii_a', 'vii_b', 'v_a', 'viii_a', 'iii_a', 'v_b', 'vi_a', 'iv_b',
       'x_a', 'iv_a', 'viii_b', 'ii_b', 'ii_a', 'ix_b', 'ix_a', 'iii_b', 'i_b', 'i_a', 'x_b', 'xi_b', 'xi_a']
    self.opr_dict = {'4a': 7, '4b': 6, '3b': 5, '3a': 4, '2': 3, '1a': 2, '1b': 1}
    self.opr_dict_bucket = {'4a': 'high', '4b': 'high', '3b': 'medium', '3a': 'high', '2': 'medium', '1a': 'low', '1b': 'low'}
    self.mov_rfa_good = ['promotion _ band up', 'promotion within band']
    self.mov_rfa_neutral = ['lateral move', 'internal restructuring', 'local transfers', 'return to base location']
    self.mov_rfa_bad = ['demotion', 'grandfathering', 'position reevaluation', 'end of grandfathering']
    self.crcy_converter =  {'USD': 1, 'CNY': 0.15, 'TWD': 0.032, 'GBP': 1.26, 'HKD': 0.13, 'EUR': 1.13, 'MXN': 0.05, 'ARS': 0.026, 'SGD': 0.73,
       'CAD': 0.75, 'BRL': 0.26}  # average ratios based on last 5 years of currency changes

    
    # call the various feature creation classes (adds the features as attributes of the class instance)
    self.competency_features()
    self.target_features()
    self.opr_features()
    self.movement_features()
    self.blueprint_features()
    self.turnover()
    
    # final function call
    self.main()
  ###########################################################################################################################
  
  # some helper class functions
  def group_agg_feats(self, df, group_cols, agg_col, new_cols):
    df_grp = df.groupby(group_cols, as_index=False).agg({agg_col: ['sum', 'mean']}).my_flatten_cols()
    df_grp.columns = new_cols
    df = df.merge(df_grp, how='left')
    return df
  
  def group_cumsum_feats(self, df, group_cols, cumsum_cols, sort_col):
    df_temp = df.sort_values(sort_col).groupby(group_cols, sort=False).sum().groupby(level=[0])[cumsum_cols].cumsum().reset_index().my_flatten_cols()
    group_cols.extend(list(['cumsum_' + s for s in cumsum_cols]))
    df_temp.columns = group_cols
    df = df.merge(df_temp, how='left')
    return df
  
  def group_cummean_feats(self, df, group_cols, cummean_col, sort_col):
    for i in group_cols:
      df.sort_values([i, sort_col], ascending = [True, True], inplace=True)
      df.reset_index(drop=True, inplace=True)
      new_col = str('cummean_' + i + '_' + cummean_col)
      df[new_col] = df.groupby(i, sort=False)[cummean_col].expanding().mean().reset_index(drop=True)
    return df

  # below function to be tweaked. incorporate cummax/cummin version
  def group_topn_feats(self, df, group_cols, sort_col, subset_cols, new_cols, npwhere_col, npwhere_list, which, flag_col, n=1):
    # which = ['top', 'bottom']
    df[sort_col] = df[sort_col].astype(float)
    if which=='top':
      #df['new'] = df.groupby('id').value.cummax()
      df_temp = df.groupby(group_cols)[sort_col].nlargest(n)
    elif which=='bottom':
      df_temp = df.groupby(group_cols)[sort_col].nsmallest(n)
    df_temp = df[subset_cols].loc[df_temp.index.get_level_values(1)].reset_index(drop=True)
    df_temp.columns = new_cols
    df_temp[flag_col] = np.where(df_temp[npwhere_col].isin(npwhere_list), 1, 0)
    df_temp.drop(npwhere_col, inplace=True, axis=1)
    df = df.merge(df_temp, how='left')
    return df
  
  def date_converter(self, df):
    date_cols = [col for col in df.columns if 'date' in col]
    for i in date_cols:
      df[i] = df[i].astype(str)
      df[i] = df[i].str.replace(' +', ' ', regex=True)
      df[i] = df[i].str.split(' ').str[0].astype(str)
      df[i] = df[i].str.replace('[^\w\s]', '_', regex=True)
      df[i+'_year'] = df[i].str.split('_').str[2].astype(str).apply(lambda s: '20'+s if len(s) == 2 else s)
      df = df.loc[df[i+'_year'] != '0000']
      df[i+'_month'] = df[i].astype(str).str.split('_').str[0].astype(str).apply(lambda s: '0'+s if len(s) == 1 else s)
      df[i+'_day'] = df[i].astype(str).str.split('_').str[1].astype(float)
      df[i+'_month'] = np.where(df[i+'_year'].astype(float).isin([99, 9999, 2099]), 12, df[i+'_month'])
      df[i+'_day'] = np.where(df[i+'_year'].astype(float).isin([99, 9999, 2099]), 31, df[i+'_day'])

      df[i+'_year'] = np.where(df[i+'_year'].astype(float).isin([99, 9999, 2099]), 2019, df[i+'_year'])
      df[i+'_yearmonth'] = df[i+'_year'].map(str) + df[i+'_month'].astype(str)
      df[i+'_year'] = df[i+'_year'].astype(float)
      df[i+'_month'] = df[i+'_month'].astype(float)
    return df
  
  def groupby_count(self, df, grp1_col, grp2_cols, count_col):
    for i in grp2_cols:
      col_name = str('count_' + count_col)
      if 'l2' in i:
        col_name = 'team_l2_size'
      else:
        col_name = 'team_size'
      grp_cols = [grp1_col, i]
      df2 = df.groupby(grp_cols)[count_col].nunique().to_frame(name=col_name).reset_index()
      df = df.merge(df2, how='left')
    return df
  
  def groupby_compare(self, df, grp_cols, grp_fixed_cols, transform_col):
    for i in grp_cols:
      all_grp_cols = grp_fixed_cols.copy()
      if i is not None: all_grp_cols.extend([i])
      df1 = df.groupby(all_grp_cols)[transform_col].transform('size').sub(1)
      df2 = df.groupby(all_grp_cols)[transform_col].transform('sum').sub(df[transform_col])
      if i is not None: new_colname = str('compare_ratio_' + transform_col + '_atlevel_' + i)
      if i is None: new_colname = str('compare_ratio_' + transform_col + 'atlevel_psg_yearmonth')
      df[new_colname] = df2/df1
      df[new_colname] = df[transform_col]/df[new_colname]
    return df
  
  def fillna_df(self, df, fill_cols, mode, grp_col=None, abs_value=None, ref_value_col=None, ref_value_col_frac=None):
    """ mode = ['simple_abs', 'simple_ref', 'adv_fill']
    """
    for i in fill_cols:
      if mode=='simple_abs':
        df[i] = df[i].astype(float)
        df[i].fillna(value=abs_value, inplace=True)
      elif mode=='simple_ref':
        df[i].fillna(value=df[ref_value_col], inplace=True)
      elif mode=='adv_fill':
        df[i] = df.groupby(grp_col)[i].transform(lambda x: x.ffill())
        df['flagna'] = df[i].isnull()
        df[i] = df.groupby(grp_col)[i].transform(lambda x: x.bfill())
        if ref_value_col is not None: df[i] = np.where(df['flagna']==1, df[i]-(ref_value_col_frac*df[i]), df[i])
        if abs_value is not None: df[i] = np.where(df['flagna']==1, df[i]-abs_value, df[i])
        df.drop('flagna', axis=1, inplace=True)
        df.reset_index(inplace=True, drop=True)
    return df
  
  ###########################################################################################################################
  
  def competency_features(self):
    """ the major features required have been created at several levels:
        - personnel number + year
        - personnel number + year + competency_group
        - personnel number + year + competency_group + competency_group_type_l1
        the features being:
        - sum of manager_rating___numeric_value
        - mean of manager_rating___numeric_value
    """
    comp = self.cpy
    
    # find below some very rough code to clean, process and create some features for competency
    x = comp.competency_combined.copy()
    x.dropna(axis=0, subset=['manager_rating___numeric_value'], inplace=True)
    x = x[~x['manager_rating___numeric_value'].isin(['not rated', 'not applicable'])]
    x.manager_rating___numeric_value = x.manager_rating___numeric_value.astype('float')
    x['competency_group_type_l1'] = x['competency'].str.split(' _ ').str[0]
    x['competency_group_type_l2'] = x['competency'].str.split(' _ ').str[1]
    
    # creating the grouped dfs and merging the new features
    x = self.group_agg_feats(x, group_cols=['personnel_number', 'year'], agg_col='manager_rating___numeric_value',
                             new_cols=['personnel_number', 'year', 'pers_year_comp_score_sum', 'pers_year_comp_score_mean'])
    x = self.group_agg_feats(x, group_cols=['personnel_number', 'competency_group', 'year'], agg_col='manager_rating___numeric_value',
                             new_cols=['personnel_number', 'competency_group', 'year', 'pers_compgroup_year_comp_score_sum', 'pers_compgroup_year_comp_score_mean'])
    # commenting out the below feature creation pending good EDA on the comp L2 level sparsity and information
    #x = self.group_agg_feats(x, group_cols=['personnel_number', 'competency_group', 'competency_group_type_l1', 'year'], agg_col='manager_rating___numeric_value', new_cols=['personnel_number', 'competency_group', 'competency_group_type_l1', 'year', 'pers_compgroupl1_year_comp_score_sum', 'pers_compgroupl1_year_comp_score_mean'])
    
    x = x[['personnel_number', 'year', 'competency_group', 'pers_year_comp_score_sum', 'pers_year_comp_score_mean', 'pers_compgroup_year_comp_score_sum', 'pers_compgroup_year_comp_score_mean']]
    x.drop_duplicates(inplace=True)
    
    # post-processing and final competency features added to the instance
    xx = x.copy()
    xx = xx[['personnel_number', 'year', 'competency_group', 'pers_compgroup_year_comp_score_sum', 'pers_compgroup_year_comp_score_mean']]
    xx.drop_duplicates(inplace=True)
    xx = xx.groupby(['personnel_number', 'year', 'competency_group']).sum().unstack('competency_group').reset_index().my_flatten_cols()
    x = x.merge(xx, how='left', on=['personnel_number', 'year'])
    x.drop(['competency_group'], axis=1, inplace=True)
    x.drop_duplicates(inplace=True, subset=['personnel_number', 'year'])
    x.rename(columns={'year': 'comp_year', 'personnel_number': 'employee_personnal_number_pa'}, inplace=True)

    x['employee_personnal_number_pa'] = x['employee_personnal_number_pa'].astype(float)
    x['comp_year'] = x['comp_year'].astype(int)
    self.comp_features = x
    return None
  
  ###########################################################################################################################
  
  def target_features(self):
    """ the major features are group aggregation features grouping on:
            - ['personnel_number']
            - ['personnel_number', 'year']
            - ['personnel_number_of_appraiser', 'year']
        using [net_target] as the aggregation factor and agg functions being [sum, mean]
    """    
    # find below some very rough code to clean, process and create some features for target
    x = self.trg.target_combined.copy()
    x.columns = ['employee_personnal_number_pa', 'personnel_number_of_appraiser', 'net_target' ,'target_year']
    
    x = self.group_cummean_feats(x, group_cols=['employee_personnal_number_pa', 'personnel_number_of_appraiser'], cummean_col='net_target', sort_col='target_year')
    x = x[['employee_personnal_number_pa', 'net_target', 'target_year', 'cummean_employee_personnal_number_pa_net_target', 'cummean_personnel_number_of_appraiser_net_target']]
    
    self.target_feats = x
    return None
  
  ###########################################################################################################################
  
  def opr_features(self):
    """
    contains a dictionary for mapping between opr values and their relative scale
    """   
    x = self.opr.opr_combined.copy()
    
    # some preprocessing
    x.dropna(axis=0, how='any', inplace=True)
    x['opr_value'] = x['opr_rating_scale'].map(self.opr_dict)
    x['opr_bucket'] = x['opr_rating_scale'].map(self.opr_dict_bucket)
    x.year = x.year.astype('int')
    
    x.rename(columns={'year': 'opr_year', 'personnel_number': 'employee_personnal_number_pa'}, inplace=True)
    self.opr_feats = x
    return None
  
  ###########################################################################################################################
  
  def movement_features(self):
    """ movement is defined as any position/band/location changes
    features:
      - mov_rfa_quality features (count, recent_flag)
      - lateral move (count, recent_flag)
      - internal restructuring (count, recent_flag)
    """    
    x = self.mov.movement_combined.copy()

    # some pre-processing
    x.rename(columns={'pers_no_': 'personnel_number'}, inplace=True)
    x.drop_duplicates(subset=(['personnel_number', 'position', 'ps_group', 'start_date', 'reason_for_action']), inplace=True)
    x.dropna(axis=0, subset=['personnel_number', 'start_date'], how='any', inplace=True)
    x['mov_rfa_good'] = np.where(x['reason_for_action'].isin(self.mov_rfa_good), 1, 0)
    x['mov_rfa_neutral'] = np.where(x['reason_for_action'].isin(self.mov_rfa_neutral), 1, 0)
    x['mov_rfa_bad'] = np.where(x['reason_for_action'].isin(self.mov_rfa_bad), 1, 0)
    x['mov_rfa_score'] = np.where(x['reason_for_action'].isin(self.mov_rfa_good), 2,
                                 np.where(x['reason_for_action'].isin(self.mov_rfa_neutral), 1, -1))
    x = self.date_converter(x)
    x.sort_values(['personnel_number', 'start_date_yearmonth', 'end_date_yearmonth'], ascending=[True, True, True], inplace=True)
    x.reset_index(drop=True, inplace=True)
    
    # creating the cumulative movements (qualitative and quantitative) for each employee+year+month (startdate as reference)
    x = self.group_cumsum_feats(x, group_cols = ['personnel_number', 'start_date_yearmonth'], sort_col='start_date_yearmonth',
                                cumsum_cols = ['mov_rfa_good', 'mov_rfa_neutral', 'mov_rfa_bad', 'mov_rfa_score'])
    # other features
    # expanding top/worst mov_rfa_score
    # rolling (window=4) mean mov_rfa_score 
    
    self.move_features = x
    return None
  
  ###########################################################################################################################
  
  def salary_features(self):
    x1 = self.sal.salbonus_ca.copy()
    x2 = self.sal.salbonus_naz.copy()
    
    # process and return the final combined bonus salary dataset
    x1.columns = ['employee_personnal_number_pa', 'yearmonth', 'bonus']
    x1['crcy'] = 'CAD'
    x2 = x2[['pers_no_', 'for_period', 'amount', 'crcy']]
    x2.columns = ['employee_personnal_number_pa',  'yearmonth', 'bonus', 'crcy']
    x_bonus = x1.append(x2, ignore_index=True)
    x_bonus.sort_values(['employee_personnal_number_pa', 'yearmonth'], inplace=True)
    x_bonus = x_bonus.apply(lambda x: x.str.strip() if (x.dtype == 'object') else x)
    x_bonus = x_bonus.apply(lambda x: x.str.replace('-|,', '') if (x.dtype == 'object') else x)
    x_bonus['bonus'] = x_bonus['bonus'].astype(float)
    x_bonus = x_bonus.loc[(x_bonus['bonus'] > 0.0) & (x_bonus['yearmonth'] != 0)]
    x_bonus['employee_personnal_number_pa'] = x_bonus['employee_personnal_number_pa'].astype(float)
    x_bonus['yearmonth'] = x_bonus['yearmonth'].astype(int)
    x_bonus['crcy_rate'] = x_bonus['crcy'].map(self.crcy_converter)
    x_bonus['bonus'] = x_bonus.bonus * x_bonus.crcy_rate
    x_bonus = x_bonus[['employee_personnal_number_pa', 'yearmonth', 'bonus']]
    
    # process and return the final combined main salary dataset
    x = salary.salary_combined.copy()
    x['start_date'] = x['start_date'].dt.strftime('%Y%m')
    x.dropna(inplace=True, axis=0, subset=['persno', 'start_date', 'annual_salary'], how='any')
    x = x[['persno', 'annual_salary', 'crcy', 'start_date']]
    x.sort_values(['persno', 'start_date'], inplace=True)
    x['annual_salary'] = x['annual_salary'].astype(float)
    x = x.loc[(x['annual_salary'] > 0.0) & (x['start_date'] != 0)]
    x['start_date'] = np.where(x['start_date'].astype(int) < int(self.min_yearmonth), int(self.min_yearmonth), x['start_date'].astype(int))
    x.columns = ['employee_personnal_number_pa', 'salary', 'crcy', 'yearmonth']
    x.drop_duplicates(subset=['employee_personnal_number_pa', 'yearmonth'], keep='last', inplace=True)
    x['employee_personnal_number_pa'] = x['employee_personnal_number_pa'].astype(float)
    x['yearmonth'] = x['yearmonth'].astype(int)
    x['crcy_rate'] = x['crcy'].map(self.crcy_converter)
    x['salary'] = x.salary * x.crcy_rate
    x = x[['employee_personnal_number_pa', 'yearmonth', 'salary']]
    
    # merge the final bonus and main salary files
    x = x.merge(x_bonus, how='left', on=['employee_personnal_number_pa', 'yearmonth'])
    self.sal_features = x
    return None
  
  ###########################################################################################################################
  
  def misc_features(self):
    x = self.misc.demo.copy()
    x['date_of_birth'] = pd.to_datetime(x['date_of_birth'], infer_datetime_format=True)
    x['original_hire_date'] = pd.to_datetime(x['original_hire_date'], infer_datetime_format=True)
    x['date_of_birth'] = np.where(x['date_of_birth'].dt.year.astype(float) > 2000, 
                                    x['date_of_birth'] - pd.DateOffset(years=100), x['date_of_birth'])
    x['original_hire_date'] = np.where(x['original_hire_date'].dt.year.astype(float) > 2020, 
                                    x['original_hire_date'] - pd.DateOffset(years=100), x['original_hire_date'])
    x2 = x.groupby(['personnel_number', 'date_of_birth', 'original_hire_date']).size().to_frame(name='count').reset_index()
    x2.sort_values(['personnel_number', 'count'], ascending=[True, False], inplace=True)
    x2.drop_duplicates(subset=['personnel_number'], inplace=True)
    x2.drop(['count'], axis=1, inplace=True)
    x2.columns = ['employee_personnal_number_pa', 'DOB', 'OHD']
    
    self.misc_feats = x2
    return None
  
  ###########################################################################################################################
  
  def blueprint_features(self):
    """ the biggest of all the classes. contains all the major hierarchy features and also is the mother dataset to all others (each one will be merged into this one after the other)

    Steps, in order: stack the three blueprint dictionaries, clean/filter the rows, derive the
    year-month keys, call ``misc_features`` and ``salary_features``, then add manager, team,
    salary, movement, opr, target and competency features. Sets ``self.min_yearmonth``,
    ``self.min_yearmonthfile`` and finally ``self.bp_features``.

    Changes against the earlier version:
      - the manager-level opr / target / competency frames are built from copies, so
        ``self.opr_feats``, ``self.target_feats`` and ``self.comp_features`` keep their
        employee-level column names (they used to be renamed in place through an alias)
      - a negative manager tenure now falls back to the manager's own position tenure instead
        of the employee's position tenure
      - the yearly frames are stacked with ``pd.concat`` (``DataFrame.append`` is gone in pandas 2)
    """
    x1 = self.bpt.dict16.copy()
    x2 = self.bpt.dict17.copy()
    x3 = self.bpt.dict18.copy()
    x11 = pd.concat(x1.values(), ignore_index=True)
    x21 = pd.concat(x2.values(), ignore_index=True)
    x31 = pd.concat(x3.values(), ignore_index=True)
    x = pd.concat([x11, x21, x31], ignore_index=True)
    
    # some custom preprocessing for the blueprint files (not modularised because not applicable elsewhere)
    ## remove people with employment status being = 1
    x = x[x['employment_status_pa']!=1]
    x.drop('employment_status_pa', axis=1, inplace=True)
    ## drop rows with nas (conservative on id columns, liberal on band/group columns)
    x.dropna(subset=['employee_personnal_number_pa', 'employee_global_id_pa', 'position_id_om'], how='any', inplace=True, axis=0)
    x.dropna(subset=['pay_grade_group_pa', 'position_band_om'], how='all', inplace=True, axis=0)
    ## fill nas for the two band/group columns based on each other (cases of both nas are removed prior)
    x = self.fillna_df(x, fill_cols=['position_band_om'], mode='simple_ref', ref_value_col='pay_grade_group_pa')
    x = self.fillna_df(x, fill_cols=['pay_grade_group_pa'], mode='simple_ref', ref_value_col='position_band_om')
    ## filter for banded employees (based on pay group)
    x = x.loc[x['pay_grade_group_pa'].isin(self.ps_group_list)]
    ## create the monthly flag column and process the date columns available
    x['yearmonth'] = x['term_year'].map(str) + x['term_month_temp'].astype(str)
    x.position_start_date_om = pd.to_datetime(x.position_start_date_om)
    x = self.helper_functions.nlp_process_columns(x, self.bpt_nlp_cols)
    x['monthly_file_date'] = pd.to_datetime(x['term_year'].astype(str) + x['term_month_temp'].astype(str), format='%Y%m')
    ## treat the position start date
    x = self.fillna_df(x, fill_cols=['position_start_date_om'], mode='adv_fill', grp_col='employee_personnal_number_pa')

    # NOTE(review): 'yearmonth' is text at this point, so this is a lexicographic minimum wrapped in
    # np.datetime64; salary_features later calls int() on it - confirm it yields the intended YYYYMM floor
    self.min_yearmonth = np.datetime64(x.yearmonth.min())
    self.min_yearmonthfile = np.datetime64(x.monthly_file_date.min())
    # calling the miscellaneous and salary features functions
    self.misc_features()
    self.salary_features()
    
    ## add the misc features to the bp_combined dataset
    x = x.merge(self.misc_feats, how='left')
    
    ####################################################################################################################
    # MANAGER FEATURES
    x_m = x[['manager_s_global_id_pa', 'manager_s_position_id_om', 'manager_s_position_name_om', 'employee_global_id_pa',
             'employee_personnal_number_pa', 'pay_grade_group_pa', 'org_unit_id_of_position_om', 'position_band_om',
            'DOB', 'OHD', 'monthly_file_date', 'position_start_date_om', 'position_work_location_code_om']]
    man_list = x.manager_s_global_id_pa.astype(int).unique()
    # rows of employees who are somebody's manager, relabelled as manager attributes (their own manager becomes level 2)
    x_m = x_m.loc[x_m['employee_global_id_pa'].isin(man_list)]
    x_m.columns = ['manager_s_global_id_pa_l2', 'manager_s_position_id_om_l2', 'manager_s_position_name_om_l2', 'manager_s_global_id_pa', 'manager_s_personnal_number_pa', 'manager_s_pay_grade_group_pa', 'manager_s_org_unit_id_of_position_om', 'manager_s_position_band_om',
                  'manager_s_DOB', 'manager_s_OHD', 'monthly_file_date', 'manager_s_position_start_date_om', 'manager_s_position_work_location_code_om']
    x_m.drop_duplicates(inplace=True)
    x = x.merge(x_m, how='left')
    
    # features applicable for both employee and manager
    ## create the position tenure features
    ## (a start date later than the monthly file date is replaced by the earliest monthly file date)
    x['position_start_date_om'] = np.where(x['position_start_date_om'] > x['monthly_file_date'], self.min_yearmonthfile, x['position_start_date_om'])
    x['manager_s_position_start_date_om'] = np.where(x['manager_s_position_start_date_om'] > x['monthly_file_date'], self.min_yearmonthfile, x['manager_s_position_start_date_om'])
    x['position_tenure'] = (x.monthly_file_date - x.position_start_date_om).astype('timedelta64[D]')
    x['manager_s_position_tenure'] = (x.monthly_file_date - x.manager_s_position_start_date_om).astype('timedelta64[D]')
    x['position_tenure_diff'] = (x.manager_s_position_tenure - x.position_tenure).astype(float)
    ## create the remote location flag
    x['remote_flag'] = np.where(x['position_work_location_code_om'].str.contains('remote', case=False, na=False), 1, 0)
    x['manager_s_remote_flag'] = np.where(x['manager_s_position_work_location_code_om'].str.contains('remote', case=False, na=False), 1, 0)
    ## create the age features
    x['employee_age'] = (x.monthly_file_date - x.DOB).astype('timedelta64[D]')
    x['manager_s_age'] = (x.monthly_file_date - x.manager_s_DOB).astype('timedelta64[D]')
    x['age_diff'] = (x.manager_s_age - x.employee_age).astype(float)
    ## create the tenure features
    x['employee_tenure'] = (x.monthly_file_date - x.OHD).astype('timedelta64[D]')
    x['manager_s_tenure'] = (x.monthly_file_date - x.manager_s_OHD).astype('timedelta64[D]')
    # treat the negative cases that came in due to OHD data issues
    x['employee_tenure'] = np.where(x['employee_tenure'] < 0, x['position_tenure'], x['employee_tenure'])
    # the manager falls back to the manager's own position tenure (was the employee's position tenure)
    x['manager_s_tenure'] = np.where(x['manager_s_tenure'] < 0, x['manager_s_position_tenure'], x['manager_s_tenure'])
    x['tenure_diff'] = (x.manager_s_tenure - x.employee_tenure).astype(float)
    ## create the org unit id comparison binary flag
    x['orgunitid_employee_isdifferentfrom_manager'] = np.where(x['org_unit_id_of_position_om'] == x['manager_s_org_unit_id_of_position_om'], 0, 1)
    
    ####################################################################################################################
    
    # TEAM FEATURES
    x = self.groupby_count(x, grp1_col='yearmonth', grp2_cols=['manager_s_global_id_pa', 'manager_s_global_id_pa_l2', 'org_unit_id_of_position_om',
                                                         'pay_grade_group_pa', 'cost_center_id_om', 'local_entity_code'], 
                      count_col = 'employee_personnal_number_pa')
    
    ####################################################################################################################
    
    # adding the salary features
    x['employee_personnal_number_pa'] = x['employee_personnal_number_pa'].astype(float)
    x['yearmonth'] = x['yearmonth'].astype(int)
    x = x.merge(self.sal_features, how='left', on=['employee_personnal_number_pa', 'yearmonth'])
    
    x = self.fillna_df(x, fill_cols=['salary'], mode='adv_fill', grp_col='employee_personnal_number_pa', ref_value_col_frac=0.1)
    x = self.fillna_df(x, fill_cols=['bonus'], mode='simple_abs', abs_value=0)
    
    x = self.groupby_compare(x, grp_cols=['org_unit_id_of_position_om', 'macro_entity_level_4_om', 'ab__inbev_entity_level_3_om', 'physical_work_location_city_pa', 'global_job_om', 'job_family_om', 'functional_area_om', 'cost_center_id_om', 'local_entity_code', None], grp_fixed_cols=['yearmonth', 'pay_grade_group_pa'], transform_col='salary')
    x = self.groupby_compare(x, grp_cols=['org_unit_id_of_position_om', 'macro_entity_level_4_om', 'ab__inbev_entity_level_3_om', 'physical_work_location_city_pa', 'global_job_om', 'job_family_om', 'functional_area_om', 'cost_center_id_om', 'local_entity_code', None], grp_fixed_cols=['yearmonth', 'pay_grade_group_pa'], transform_col='bonus')
    
    ####################################################################################################################
    
    # adding the movement features
    bp_move = self.move_features
    bp_move = bp_move[['personnel_number', 'start_date_yearmonth', 'mov_rfa_good', 'mov_rfa_neutral', 'mov_rfa_bad', 'mov_rfa_score',
                      'cumsum_mov_rfa_good', 'cumsum_mov_rfa_neutral', 'cumsum_mov_rfa_bad', 'cumsum_mov_rfa_score']]
    bp_move.columns = ['employee_personnal_number_pa', 'yearmonth', 'mov_rfa_good', 'mov_rfa_neutral', 'mov_rfa_bad', 'mov_rfa_score',
                      'cumsum_mov_rfa_good', 'cumsum_mov_rfa_neutral', 'cumsum_mov_rfa_bad', 'cumsum_mov_rfa_score']
    bp_move.drop_duplicates(subset=['employee_personnal_number_pa', 'yearmonth'], inplace=True, keep='first')
    bp_move['employee_personnal_number_pa'] = bp_move['employee_personnal_number_pa'].astype(float)
    bp_move['yearmonth'] = bp_move['yearmonth'].astype(int)
    # corresponding features for the manager
    bp_move_manager = bp_move[['employee_personnal_number_pa', 'yearmonth', 'mov_rfa_score', 'cumsum_mov_rfa_score']].copy()
    bp_move_manager.columns = ['manager_' + col for col in bp_move_manager.columns]
    bp_move_manager.rename(columns={'manager_employee_personnal_number_pa': 'manager_s_personnal_number_pa',
                                   'manager_yearmonth': 'yearmonth'}, inplace=True)
    
    # merge the employee level features and manager level features
    x = x.merge(bp_move, how='left', on=['employee_personnal_number_pa', 'yearmonth'])
    x = x.merge(bp_move_manager, how='left', on=['manager_s_personnal_number_pa', 'yearmonth'])
    
    x = self.fillna_df(x, fill_cols=['mov_rfa_good', 'mov_rfa_neutral', 'mov_rfa_bad', 'mov_rfa_score', 'manager_mov_rfa_score'], mode='simple_abs', abs_value=0)
    x = self.fillna_df(x, fill_cols=['cumsum_mov_rfa_good', 'cumsum_mov_rfa_neutral', 'cumsum_mov_rfa_bad', 'cumsum_mov_rfa_score', 'manager_cumsum_mov_rfa_score'], mode='adv_fill', grp_col=['employee_personnal_number_pa'], abs_value=1)
    
    ####################################################################################################################
    
    # adding the opr features
    ## months before September are matched to the previous year's opr record
    x['opr_year'] = np.where(x['term_month_temp'].astype(int) < 9, x['term_year']-1, x['term_year'])
    x = x.merge(self.opr_feats, how='left', on=['employee_personnal_number_pa', 'opr_year'])
    # adding the manager opr features (built on a copy so self.opr_feats keeps its own column names)
    manager_opr = self.opr_feats.copy()
    manager_opr.columns = ['manager_' + col for col in manager_opr.columns]
    manager_opr.rename(columns={'manager_employee_personnal_number_pa': 'manager_s_personnal_number_pa',
                                   'manager_opr_year': 'opr_year'}, inplace=True)
    x = x.merge(manager_opr, how='left', on=['manager_s_personnal_number_pa', 'opr_year'])
    
    ####################################################################################################################
    
    # adding the target features
    ## months before April are matched to the target of two years earlier, later months to the previous year
    x['target_year'] = np.where(x['term_month_temp'].astype(int) < 4, x['term_year']-2, x['term_year']-1)
    x = x.merge(self.target_feats, how='left', on=['employee_personnal_number_pa', 'target_year'])
    # adding the manager target features (built on a copy so self.target_feats keeps its own column names)
    manager_target = self.target_feats.copy()
    manager_target.columns = ['manager_' + col for col in manager_target.columns]
    manager_target.rename(columns={'manager_employee_personnal_number_pa': 'manager_s_personnal_number_pa',
                                   'manager_target_year': 'target_year'}, inplace=True)
    x = x.merge(manager_target, how='left', on=['manager_s_personnal_number_pa', 'target_year'])
    
    ####################################################################################################################
    
    # adding the competency features
    ## months before July are matched to the previous year's competency record
    x['comp_year'] = np.where(x['term_month_temp'].astype(int) < 7, x['term_year']-1, x['term_year'])
    x = x.merge(self.comp_features, how='left', on=['employee_personnal_number_pa', 'comp_year'])
    # adding the manager comp features (built on a copy so self.comp_features keeps its own column names)
    manager_comp = self.comp_features.copy()
    manager_comp.columns = ['manager_' + col for col in manager_comp.columns]
    manager_comp.rename(columns={'manager_employee_personnal_number_pa': 'manager_s_personnal_number_pa',
                                   'manager_comp_year': 'comp_year'}, inplace=True)
    x = x.merge(manager_comp, how='left', on=['manager_s_personnal_number_pa', 'comp_year'])
    
    self.bp_features = x
    return None
  ###########################################################################################################################
  
  def turnover(self):
    """Expand the date columns of the combined turnover frame, keep it and write it out.

    The converted frame is stored on ``self.to_df`` and written, without its index, to
    OUTPUT/turnover_labels_df.csv on the mounted data lake.
    """
    labels = self.date_converter(self.tvr.to_combined.copy())
    self.to_df = labels

    # write the turnover file
    labels.to_csv('/dbfs/mnt/datalake/OUTPUT/turnover_labels_df.csv', index=False)
    return None
    
  ###########################################################################################################################
    
  def main(self):
    """Finalise the analytical dataset and write one CSV per year-month.

    Helper/id columns are dropped from ``self.bp_features`` (in place), rows without an
    employee number or pay grade group are removed, as are rows that have none of opr value,
    net target and competency score, and rows with the '1b' opr rating. The result is stored on
    ``self.ads`` indexed by ``yearmonth`` and each month is written to
    OUTPUT/MONTHLY_DATASETS/<yearmonth>.csv without the index.

    Each month is selected with a list label so that a month holding a single row is still
    written as a one-row table (a scalar label would return a Series for that case).
    """
    ads = self.bp_features
    ads.drop(['org_unit_id_of_position_om', 'position_id_om', 'ab__inbev_entity_level_4_om', 'position_start_date_om',
             'manager_s_global_id_pa', 'manager_s_position_id_om', 'month_file', 'term_year', 'term_month_temp', 'monthly_file_date',
             'manager_s_global_id_pa_l2', 'manager_s_position_id_om_l2', 'manager_s_personnal_number_pa', 'manager_s_org_unit_id_of_position_om',
             'manager_s_DOB', 'DOB', 'opr_year', 'target_year', 'comp_year'], axis=1, inplace=True)
    ads.dropna(subset=['employee_personnal_number_pa', 'pay_grade_group_pa'], inplace=True, how='any')
    ads.dropna(subset=['opr_value', 'net_target', 'pers_year_comp_score_sum'], inplace=True, how='all')
    # add filter to remove people/records with 1B OPR value
    ads = ads[ads['opr_rating_scale']!='1b']
    self.ads = ads
    # NOTE(review): self.ads is this same object, so it also ends up indexed by 'yearmonth'
    ads.set_index('yearmonth', inplace=True)

    # write the individual monthly datasets into the output folder
    for yearmonth in ads.index.unique():
      ads_temp = ads.loc[[yearmonth]]  # list label -> always a DataFrame
      ads_temp.to_csv(str('/dbfs/mnt/datalake/OUTPUT/MONTHLY_DATASETS/' + str(yearmonth) + '.csv'), index=False)
    return None

In [50]:
# build the super class; its constructor runs every feature builder and writes the output files
# NOTE(review): the name `all` shadows the Python builtin all() for the rest of the notebook;
# it is referenced again in the next cell, so a rename has to cover both cells
all = all_files(blueprint=bp, competency=comp, opr=opr, target=target, movement=movement, salary=salary, turnover=turnover, misc=misc,
               bp_nlp_cols=['position_name_om', 'physical_work_location_city_pa', 'position_work_location_code_om', 'manager_s_position_name_om',
                           'physical_work_location_description_pa'], helpers = helpers)

In [51]:
#opr_df=opr.opr_combined.copy()
#opr_df.to_csv('/dbfs/mnt/datalake/OUTPUT/oprdf.csv', index=False)
# save a copy of the final analytical dataset (the `ads` attribute set by all_files.main) to the WORKING folder
# NOTE(review): main() moves 'yearmonth' into the index of `ads`, so index=False leaves it out of this file - confirm
bpp_df=all.ads.copy()
bpp_df.to_csv('/dbfs/mnt/datalake/WORKING/bppdf.csv', index=False)

In [52]:
# list the names of the given types (the loader classes) currently defined in the namespace
%whos blueprint competency opr_class target_class movement_class salary_class to_class misc_class

In [53]:
# '2016 blueprint files', all.bpt.dict16.keys()
# '2017 blueprint files', all.bpt.dict17.keys()
# 'competency files', all.cpy.competency.keys()
# 'competency combined', all.cpy.competency_combined.keys()
# 'demographics', all.misc.demo.keys()
# 'movement files', all.mov.movement.keys()
# 'movement combined', all.mov.movement_combined.keys()
# 'opr combined', all.opr.opr_processed.keys()
# 'salary main combined', all.sal.salary_combined.keys()
# 'salary bonus combined', all.sal.salary_bonus_combined.keys()
# 'target combined', all.trg.target_combined.keys()
# 'turnover combined', all.tvr.to_combined.keys()