In [1]:
import sagemaker
import boto3
from sagemaker import get_execution_role
import pandas as pd
import numpy as np
import sys
import typing


region = boto3.Session().region_name
session = sagemaker.Session()
sm = boto3.Session().client(service_name='sagemaker',region_name=region)

#ins_argument = imp.spec_file_from_file_location("instance", "unit_risk_interns/src/preprocess/helpers/instance.py")
#ins = imp.module_from_spec(ins_argument)
sys.path.extend(['../src/preprocess', '../config'])
import helpers.instance as ins
import helpers.s3 as s3_helper
import helpers.utils as ut
import helpers.athena as at

# READ YAML FILES AND STORE RELEVANT PATHS in dictionaries
dataset_cfg = ins.read_config('../config/datasets.yaml') 
config_cfg = ins.read_config('../config/config.yaml')
sql_cfg = ins.read_config('../config/sql.yaml')

input_files = [dataset_cfg['raw']['repair_base']['input'], dataset_cfg['raw']['base_query']['input'],dataset_cfg['raw']['engine_hours']['input'], 
                dataset_cfg['raw']['postsaleissue']['input']]
output_paths = [dataset_cfg['raw']['repair_base']['output_paths'], dataset_cfg['raw']['base_query']['output_paths'], dataset_cfg['raw']['engine_hours']['output_paths'], 
                dataset_cfg['raw']['postsaleissue']['output_paths']]

In [2]:
df = pd.read_parquet(output_paths[0])
def clean_repair_base():
    df = pd.read_parquet(output_paths[0])
    drop_cols = ['flag_datatype']
    datatype = {'unit_num': 'str',
                'partition_key': 'int64',
                'labor_hours': 'float64',
                'parts_cost': 'float64',
                'labor_cost': 'float64',
                'outside_cost': 'float64',
                'major_pm':  'int64',
                'big_repair': 'int64',
                'accidentsinci': 'int64',
                'total_repairs': 'int64'}
    strip_cols = ['unit_num']

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    '''
    - convert columns names to lower
    - drop unnecessary columns
    - change datatypes
    - strip columns of string datatypes
    '''
    df.columns = df.columns.str.lower()
    df.drop(columns=drop_cols, inplace=True)
    df = df.astype(datatype)
    #df[strip_cols] = df[strip_cols].apply(lambda x: x.str.strip())
    return df

In [3]:
clean_repair_base()

Unnamed: 0,unit_num,partition_key,labor_hours,parts_cost,labor_cost,outside_cost,major_pm,big_repair,accidentsinci,total_repairs
0,100001,201801,1.0,0.00,49.06,0.00,1,0,0,0
1,100001,201802,0.0,0.00,0.00,13.37,0,0,0,0
2,100001,201803,1.6,7.41,74.82,0.00,1,0,0,0
3,100001,201804,0.0,0.00,0.00,28.00,0,0,0,0
4,100001,201805,2.5,0.00,118.35,0.00,1,0,0,0
...,...,...,...,...,...,...,...,...,...,...
2587744,ZSS56291,202101,0.0,0.00,0.00,1511.14,1,0,0,7
2587745,ZSS63809,201905,0.0,0.00,0.00,91.27,1,0,0,0
2587746,ZSS63809,201909,0.0,0.00,0.00,507.31,0,0,0,1
2587747,ZSS63809,202012,0.0,0.00,0.00,794.35,0,0,1,1


In [4]:
def persist_repair_base_cleaned():
    rb_new_path = 's3://pske-stg-advanalytics/Projects/Unit_Sale_Risk_Interns/Data/Processed/repair_base_cleaned'
    s3_helper.persist_file_to_path(df=df,
                           path = rb_new_path,
                           filetype='parquet')

In [5]:
persist_repair_base_cleaned()

In [9]:
df2 = pd.read_parquet(output_paths[3])
def clean_postsaleissue():
    df2 = pd.read_parquet(output_paths[3])
    df2.columns = df2.columns.str.lower()
    object_columns = list(df2.select_dtypes(include='object').columns)
    df2[object_columns] = df2[object_columns].apply(lambda x: x.str.strip())

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    '''
    - Using group by to keep relevant data
    - Assigning flag column
    '''
    data = df2.groupby('unit_num').agg({'unit_sold_date': 'max'}).reset_index()
    data['flag_cameback'] = 1
    return df2

In [11]:
clean_postsaleissue()

Unnamed: 0,unit_num,unit_sold_date,lower_date_range,wo_created_date,upper_date_range
0,100687,2020-02-12 00:00:00.000,2020-02-26 00:00:00.000,2020-02-26 09:41:48.000,2020-05-12 00:00:00.000
1,679602,2020-12-04 00:00:00.000,2020-12-18 00:00:00.000,2021-01-21 13:01:02.000,2021-03-04 00:00:00.000
2,603105,2019-07-03 00:00:00.000,2019-07-17 00:00:00.000,2019-07-20 09:04:27.000,2019-10-01 00:00:00.000
3,694890,2019-09-30 00:00:00.000,2019-10-14 00:00:00.000,2019-11-11 07:49:49.000,2019-12-29 00:00:00.000
4,693651,2021-04-06 00:00:00.000,2021-04-20 00:00:00.000,2021-04-20 08:51:54.000,2021-07-05 00:00:00.000
...,...,...,...,...,...
38833,665021,2021-04-21 00:00:00.000,2021-05-05 00:00:00.000,2021-05-07 18:09:18.000,2021-07-20 00:00:00.000
38834,91606201,2021-05-05 00:00:00.000,2021-05-19 00:00:00.000,2021-07-16 07:56:17.000,2021-08-03 00:00:00.000
38835,669252,2019-12-30 00:00:00.000,2020-01-13 00:00:00.000,2020-01-22 12:39:56.000,2020-03-29 00:00:00.000
38836,147197,2020-08-28 00:00:00.000,2020-09-11 00:00:00.000,2020-10-19 05:30:37.000,2020-11-26 00:00:00.000


In [17]:
def persist_postsaleissue_cleaned():
    postsaleissue_new_path = 's3://pske-stg-advanalytics/Projects/Unit_Sale_Risk_Interns/Data/Processed/postsaleissue_cleaned'
    s3_helper.persist_file_to_path(df=df2,
                           path = postsaleissue_new_path,
                           filetype='parquet')

In [18]:
persist_postsaleissue_cleaned()

In [2]:
df3 = pd.read_parquet('s3://pske-stg-advanalytics/Projects/Unit_Sale_Risk_Interns/Data/Interim/job_oil/')
def clean_job_oil():
    df3 = pd.read_parquet('s3://pske-stg-advanalytics/Projects/Unit_Sale_Risk_Interns/Data/Interim/job_oil/')
    df3.drop_duplicates(subset=['unit_num', 'sample_date'], inplace=True)
    df3['critical_value'] = np.where(df3['critical']==True, 1, 0)
    df3_agg = df3.groupby('unit_num').apply(lambda x: (x.critical_value.sum()/x.critical_value.count())*100).reset_index()
    df3_agg.rename(columns={0: 'critical_oil_sample_percentage'}, inplace=True)
    return df3

In [3]:
clean_job_oil()

Unnamed: 0,unit_num,sample_date,partition_key,oil_sample_observation,critical,critical_value
0,AAF530177,2019-11-25,201911,NORMAL,N,0
1,AAF530177,2021-04-28,202104,NORMAL,N,0
2,AA101691,2018-10-29,201810,NORMAL,N,0
3,AA101692,2018-09-27,201809,NORMAL,N,0
4,AA101693,2018-09-25,201809,NORMAL,N,0
...,...,...,...,...,...,...
3053604,9997IM,2020-08-18,202008,NORMAL,N,0
3053605,9997IM,2020-12-18,202012,NORMAL,N,0
3053606,9997IM,2021-04-20,202104,NORMAL,Y,0
3053607,9997IM,2021-09-08,202109,NORMAL,Y,0


In [4]:
def persist_job_oil_cleaned():
    postsaleissue_new_path = 's3://pske-stg-advanalytics/Projects/Unit_Sale_Risk_Interns/Data/Processed/job_oil_cleaned'
    s3_helper.persist_file_to_path(df=df3,
                           path = postsaleissue_new_path,
                           filetype='parquet')

In [5]:
persist_job_oil_cleaned()

In [49]:
df4 = pd.read_parquet(output_paths[2])
def clean_engine_hours():
    df4 = pd.read_parquet(output_paths[2])
    drop_cols = ['engine_hrs']
    col_dtypes = {'partition_key': 'int64',
                 'unit_num': 'str',
                 'engine_hrs': 'float'}
    strip_cols = ['unit_num']
    rename_cols = {'merge': 'engine_hrs'}

    # -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
    df4.columns = df4.columns.str.lower()
    df4['engine_hrs'] = np.where(df4['engine_hrs']=='Not Available', '0', df4['engine_hrs'])

    # Cleaning engine hours
    df4_splits = df4['engine_hrs'].str.replace(' ', ':').str.split(':', expand=True)
    df4_splits = df4_splits.astype('str')
    df4_splits['merge'] = np.where(df4_splits[1]!='None', df4_splits[0] + '.' + df4_splits[1], df4_splits[0])

    df4 = pd.concat([df4, df4_splits['merge']], axis=1)
    df4.drop(columns=drop_cols, inplace=True)
    df4.rename(columns=rename_cols, inplace=True)

    # converting to acceptable datatypes
    df4 = df4.astype(col_dtypes)
    df4[strip_cols] = df4[strip_cols].apply(lambda x: x.str.strip())

    # final filters
    df4.fillna(0, inplace=True)
    df4 = df4[df4['unit_num']!=0]
    df4 = df4.groupby(['unit_num', 'partition_key']).agg({'engine_hrs': 'max'}).reset_index()
    df4.sort_values(by=['unit_num', 'partition_key', 'engine_hrs'], inplace=True)
    # taking the median engine hours for each unit
    df4 = df4.groupby('unit_num').agg({'engine_hrs': 'median'}).reset_index()
    return df4

In [50]:
clean_engine_hours()

Unnamed: 0,unit_num,engine_hrs
0,100002,4100.255
1,100003,4720.430
2,100004,9978.200
3,100006,4660.700
4,100008,6893.550
...,...,...
78176,Z5005,7822.790
78177,Z5672874,3033.000
78178,Z5672881,4268.700
78179,Z5676938,2517.700


In [51]:
def persist_engine_hours_cleaned():
    engine_hours_new_path = 's3://pske-stg-advanalytics/Projects/Unit_Sale_Risk_Interns/Data/Processed/engine_hours_cleaned'
    s3_helper.persist_file_to_path(df=df4,
                           path = engine_hours_new_path,
                           filetype='parquet')

In [52]:
persist_engine_hours_cleaned()