In [1]:
# Arguments
# Run-level inputs for this scoring notebook.
# NOTE(review): presumably these literals are overridden by the calling pipeline at run time — confirm.
# Base name (without extension) of the export file to score; '.csv.gz' is appended to it later.
param_fileName = 'c_exa_021_PoT_dailyExportForScoring_v3_gold_historicalLoad_20200101_20200803_exportedOn_20200826_pipeID_4d0e9f02'
# Identifier of the pipeline run; it is stamped on every output row as `pipe_runID`.
param_pipeID = '2f3bd07b-a50e-4d95-b30b-3533a94d99bd'

In [2]:
# Parameters
# Identifiers of the deployed artefacts (file names without the '.joblib' extension)
deployedModel = 'PoT_gbc_1596623330'
deployedScaler = 'fitted_scaler_1596623232'
param_deploymentDate_str = '20200805'

# Values derived from the run arguments
pipeID = param_pipeID
filename = ''.join([param_fileName, '.csv.gz'])

# Columns that identify a reservation; they are carried to the output and never used as features
metadataCols = [
  'WKReservation',
  'WKReservationDate_UTC',
  'Res_id',
  'edwid_res',
]

In [3]:
# Import deps
import datetime
import joblib
import pandas as pd
import numpy as np
import sklearn as sk

# Check versions
# Print the version of every library the pickled model/scaler depend on, one line each.
for version_template, library in (
  ('pandas version {}.', pd),
  ('scikit-learn version {}.', sk),
  ('joblib version {}.', joblib),
  ('numpy version {}.', np),
):
  print(version_template.format(library.__version__))

pandas version 0.25.3.
scikit-learn version 0.22.2.post1.
joblib version 0.14.1.
numpy version 1.18.5.


In [0]:
#define schema
# Explicit Spark schema for the scoring export: four reservation-identifier columns
# followed by the model features. Every field is declared nullable.
# Only the type classes actually used are imported (instead of a wildcard import), so the
# notebook namespace is not flooded with every name from pyspark.sql.types.
from pyspark.sql.types import (
  DateType,
  FloatType,
  IntegerType,
  StringType,
  StructField,
  StructType,
)
# NOTE(review): several 'description' metadata values below look wrong and should be confirmed
# with the data owner before anyone relies on them:
#   - edwid_res repeats the Res_id text ('reservation PNR number')
#   - dbdPlusOne is described as 'log(1 + tribe)' and Psgr_Count as 'log(1 + dbd)'
#   - all remaining fields carry the placeholder 'dummy'
customSchema = StructType(
  [
    StructField("WKReservation", IntegerType(), True, {'description': 'cdw reservation id'}), 
    StructField("WKReservationDate_UTC", DateType(), True, {'description': 'reservation issue date'}), 
    StructField("Res_id", StringType(), True,{'description':  'reservation PNR number'}),
    StructField("edwid_res", IntegerType(), True,{'description':  'reservation PNR number'}),
    StructField("dbdPlusOne", FloatType(), True,{'description':  'log(1 + tribe)'}),
    StructField("Psgr_Count", FloatType(), True,{'description':  'log(1 + dbd)'}),
    StructField("tribe", FloatType(), True,{'description':  'dummy'}),
    StructField("Seg_Count", FloatType(), True,{'description':  'dummy'}),
    StructField("FLAG_OneWay", IntegerType(), True,{'description':  'dummy'}),
    StructField("DaysAtDestinationPlusOne", FloatType(), True,{'description':  'dummy'}),
    StructField("FLAG_PurchasedAncillary", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_bookingType_Agent", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_bookingType_Internet", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_bookingType_OthAirl", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_bookingType_OwnSales", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_weekOrLonger_return", IntegerType(), True,{'description':  'dummy'}),  
    StructField("FLAG_CorporateBooking", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_infantInBooking", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_childrenInBooking", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_01", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_02", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_03", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_04", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_05", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_06", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_07", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_08", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_09", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_10", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_11", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelMonth_12", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelOut_01", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelOut_02", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelOut_03", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelOut_04", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelOut_05", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelOut_06", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelOut_07", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelIn_01", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelIn_02", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelIn_03", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelIn_04", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelIn_05", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelIn_06", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_TravelIn_07", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_sameLastname", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_AFR", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_CAM", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_DOM", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_EUR", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_FAE", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_MEA", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_NAM", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_SAM", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_SCA", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_tripArea_SWP", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_SK_longhaul", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_SVC_RNK_Hi_Go", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_SVC_RNK_Hi_Plus", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_SVC_RNK_Hi_Business", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_SVC_RNK_Lo_Go", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_SVC_RNK_Lo_Plus", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_SVC_RNK_Lo_Business", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_maxtierLevelInReservation_basic", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_maxtierLevelInReservation_silver", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_maxtierLevelInReservation_gold", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_maxtierLevelInReservation_diamond", IntegerType(), True,{'description':  'dummy'}),
    StructField("FLAG_maxtierLevelInReservation_pandion", IntegerType(), True,{'description':  'dummy'})
  ]
)

In [0]:
# Read the gzipped CSV export with Spark, applying the explicit schema (no type inference),
# and collect the whole result into a pandas DataFrame.
input_path = "dbfs:/mnt/sasweucomexa/projects/021/in/"+filename
csv_reader = (
  spark.read.format('csv')
  .options(header='true', inferSchema='false')
  .schema(customSchema)
)
pd_df = csv_reader.load(input_path).toPandas()

# Report the (rows, columns) shape
print(pd_df.shape)
# Peek at two randomly sampled rows
pd_df.sample(2)

Unnamed: 0,WKReservation,WKReservationDate_UTC,Res_id,edwid_res,dbdPlusOne,Psgr_Count,tribe,Seg_Count,FLAG_OneWay,DaysAtDestinationPlusOne,FLAG_PurchasedAncillary,FLAG_bookingType_Agent,FLAG_bookingType_Internet,FLAG_bookingType_OthAirl,FLAG_bookingType_OwnSales,FLAG_weekOrLonger_return,FLAG_CorporateBooking,FLAG_infantInBooking,FLAG_childrenInBooking,FLAG_TravelMonth_01,FLAG_TravelMonth_02,FLAG_TravelMonth_03,FLAG_TravelMonth_04,FLAG_TravelMonth_05,FLAG_TravelMonth_06,FLAG_TravelMonth_07,FLAG_TravelMonth_08,FLAG_TravelMonth_09,FLAG_TravelMonth_10,FLAG_TravelMonth_11,FLAG_TravelMonth_12,FLAG_TravelOut_01,FLAG_TravelOut_02,FLAG_TravelOut_03,FLAG_TravelOut_04,FLAG_TravelOut_05,FLAG_TravelOut_06,FLAG_TravelOut_07,FLAG_TravelIn_01,FLAG_TravelIn_02,FLAG_TravelIn_03,FLAG_TravelIn_04,FLAG_TravelIn_05,FLAG_TravelIn_06,FLAG_TravelIn_07,FLAG_sameLastname,FLAG_tripArea_AFR,FLAG_tripArea_CAM,FLAG_tripArea_DOM,FLAG_tripArea_EUR,FLAG_tripArea_FAE,FLAG_tripArea_MEA,FLAG_tripArea_NAM,FLAG_tripArea_SAM,FLAG_tripArea_SCA,FLAG_tripArea_SWP,FLAG_SK_longhaul,FLAG_SVC_RNK_Hi_Go,FLAG_SVC_RNK_Hi_Plus,FLAG_SVC_RNK_Hi_Business,FLAG_SVC_RNK_Lo_Go,FLAG_SVC_RNK_Lo_Plus,FLAG_SVC_RNK_Lo_Business,FLAG_maxtierLevelInReservation_basic,FLAG_maxtierLevelInReservation_silver,FLAG_maxtierLevelInReservation_gold,FLAG_maxtierLevelInReservation_diamond,FLAG_maxtierLevelInReservation_pandion
1095433,57594916,2018-01-09,N8GDTN,106111822,27.0,2.0,44.0,4.0,0,5.0,0,1,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,1,0,0
1465658,59704851,2018-02-28,NLVGP9,108812850,4.0,1.0,7.0,3.0,1,0.0,1,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,1,0,0,0,1,0,0,0


In [0]:
# Sanity check on the numeric columns: `count` should be identical for every column
# (no missing values) and `std` should be non-zero (the column actually varies).
summary_stats = pd_df.describe().T.reset_index()
with pd.option_context('display.float_format', '{:.2f}'.format):
  display(summary_stats)

index,count,mean,std,min,25%,50%,75%,max
WKReservation,6735983.0,60628858.28526972,2441710.624092442,42968888.0,58787129.5,60695388.0,62623494.5,64642466.0
edwid_res,6735983.0,109929200.16458651,3119528.3748017377,88694489.0,107584659.0,110003875.0,112461782.0,115127577.0
dbdPlusOne,6735983.0,42.09017181396485,52.952762603759766,0.0,8.0,21.0,55.0,703.0
Psgr_Count,6735983.0,1.385170817375183,1.3095368146896362,1.0,1.0,1.0,1.0,99.0
tribe,6735983.0,7.578166961669922,18.7164306640625,0.0,0.0,1.0,8.0,832.0
Seg_Count,6735983.0,2.2257180213928223,1.2532203197479248,1.0,1.0,2.0,2.0,32.0
FLAG_OneWay,6735983.0,0.4372753019121337,0.4960500466483339,0.0,0.0,0.0,1.0,1.0
DaysAtDestinationPlusOne,6735983.0,3.992586612701416,11.587522506713867,0.0,0.0,1.0,4.0,634.0
FLAG_PurchasedAncillary,6735983.0,0.0897763548393753,0.285861108027099,0.0,0.0,0.0,0.0,1.0
FLAG_bookingType_Agent,6735983.0,0.522507405377953,0.4994931968926704,0.0,0.0,1.0,1.0,1.0


In [0]:
# Second missing-value check: list every row that has a null in at least one column.
# An empty result means the data set is complete.
has_missing_value = pd_df.isna().any(axis='columns')
pd_df[has_missing_value]

Unnamed: 0,WKReservation,WKReservationDate_UTC,Res_id,edwid_res,dbdPlusOne,Psgr_Count,tribe,Seg_Count,FLAG_OneWay,DaysAtDestinationPlusOne,FLAG_PurchasedAncillary,FLAG_bookingType_Agent,FLAG_bookingType_Internet,FLAG_bookingType_OthAirl,FLAG_bookingType_OwnSales,FLAG_weekOrLonger_return,FLAG_CorporateBooking,FLAG_infantInBooking,FLAG_childrenInBooking,FLAG_TravelMonth_01,FLAG_TravelMonth_02,FLAG_TravelMonth_03,FLAG_TravelMonth_04,FLAG_TravelMonth_05,FLAG_TravelMonth_06,FLAG_TravelMonth_07,FLAG_TravelMonth_08,FLAG_TravelMonth_09,FLAG_TravelMonth_10,FLAG_TravelMonth_11,FLAG_TravelMonth_12,FLAG_TravelOut_01,FLAG_TravelOut_02,FLAG_TravelOut_03,FLAG_TravelOut_04,FLAG_TravelOut_05,FLAG_TravelOut_06,FLAG_TravelOut_07,FLAG_TravelIn_01,FLAG_TravelIn_02,FLAG_TravelIn_03,FLAG_TravelIn_04,FLAG_TravelIn_05,FLAG_TravelIn_06,FLAG_TravelIn_07,FLAG_sameLastname,FLAG_tripArea_AFR,FLAG_tripArea_CAM,FLAG_tripArea_DOM,FLAG_tripArea_EUR,FLAG_tripArea_FAE,FLAG_tripArea_MEA,FLAG_tripArea_NAM,FLAG_tripArea_SAM,FLAG_tripArea_SCA,FLAG_tripArea_SWP,FLAG_SK_longhaul,FLAG_SVC_RNK_Hi_Go,FLAG_SVC_RNK_Hi_Plus,FLAG_SVC_RNK_Hi_Business,FLAG_SVC_RNK_Lo_Go,FLAG_SVC_RNK_Lo_Plus,FLAG_SVC_RNK_Lo_Business,FLAG_maxtierLevelInReservation_basic,FLAG_maxtierLevelInReservation_silver,FLAG_maxtierLevelInReservation_gold,FLAG_maxtierLevelInReservation_diamond,FLAG_maxtierLevelInReservation_pandion


In [0]:
# Locate the serialized classifier and column scaler on the mounted lake ...
model_path = f'/dbfs/mnt/sasweucomexa/projects/021/models/{deployedModel}.joblib'
scaler_path = f'/dbfs/mnt/sasweucomexa/projects/021/models/{deployedScaler}.joblib'

# ... and deserialize them.
# NOTE: joblib.load unpickles the file, so these paths must only ever point at trusted artefacts.
trainedModel = joblib.load(model_path)
fittedScaler = joblib.load(scaler_path)

In [0]:
#divide dataset into features and metadata
# All columns of the dataframe except the ones declared as metadata columns.
# NOTE: np.setdiff1d returns the names sorted and de-duplicated, so the feature order handed to
# the model is alphabetical, not the dataframe's column order.
# NOTE(review): presumably the model was trained with this same ordering — confirm.
featuresCols = np.setdiff1d(list(pd_df.columns),metadataCols).tolist()
metadata_reservations = pd_df[metadataCols]
# Explicit copy: the scaled values below are assigned into `features`, and assigning into a
# frame derived from `pd_df` without copying raises SettingWithCopyWarning.
features = pd_df[featuresCols].copy()

#apply the scaler
# NOTE(review): the column order given to the scaler presumably has to match the order it was
# fitted with — confirm against the training code.
num_cols = ['dbdPlusOne','Psgr_Count','tribe','Seg_Count','DaysAtDestinationPlusOne'] 
features[num_cols] = fittedScaler.transform(features[num_cols])

#predict class probabilities and store them in a pandas dataframe
# The prediction frames reuse the feature index so the join below is aligned on the original
# rows explicitly instead of relying on both sides happening to carry a 0..n-1 index.
class_probabilities = pd.DataFrame(trainedModel.predict_proba(features), index=features.index)
#predict class label and store them in a pandas dataframe
class_label = pd.DataFrame(trainedModel.predict(features), index=features.index)
#append columns from previous dataframes
res = pd.concat([class_probabilities, class_label], axis=1, sort=False)
#rename columns
# NOTE(review): assumes the model's class order is (leisure, business) — confirm via trainedModel.classes_.
res.columns = ['prob_leisure','prob_business','label_businessIs1']
#join predictions with metadatacolumns by index
print(metadata_reservations.shape[0],res.shape[0]) #check that they have equal length
res2 = metadata_reservations.join(res)
print(res2.shape[0])

In [0]:
# Stamp every scored row with the provenance of this run: which model produced it, from
# which input file, in which pipeline run, and when (UTC).
res2 = res2.assign(**{
  'model': f'{deployedModel}_deployedOn_{param_deploymentDate_str}',
  'input_fileName': filename,
  'pipe_runID': pipeID,
  'prediction_tms_utc': pd.Timestamp.utcnow(),
})

In [0]:
# Preview the first two scored rows
res2.iloc[:2]

Unnamed: 0,WKReservation,WKReservationDate_UTC,Res_id,edwid_res,prob_leisure,prob_business,label_businessIs1,model,input_fileName,pipe_runID,prediction_tms_utc
0,61809472,2018-04-18,JRJZAA,111380464,0.054915,0.945085,1,PoT_gbc_1596623330_deployedOn_20200805,c_exa_021_PoT_dailyExportForScoring_v3_gold_hi...,2f3bd07b-a50e-4d95-b30b-3533a94d99bd,2020-08-26 22:12:28.133040+00:00
1,61809565,2018-04-18,KRECWV,111423356,0.899303,0.100697,0,PoT_gbc_1596623330_deployedOn_20200805,c_exa_021_PoT_dailyExportForScoring_v3_gold_hi...,2f3bd07b-a50e-4d95-b30b-3533a94d99bd,2020-08-26 22:12:28.133040+00:00


In [0]:
 #store file in lake
# Build the output name from the input file name. The known '.csv.gz' suffix is stripped
# explicitly so that a base name which itself contains a dot is not cut at its first dot.
input_suffix = '.csv.gz'
if filename.endswith(input_suffix):
  filenameWithoutExtension = filename[:-len(input_suffix)]
else:
  # Unexpected extension: fall back to the previous rule (everything before the first dot)
  filenameWithoutExtension = filename.split('.')[0]
processed_filename_withPath = f"/mnt/sasweucomexa/projects/021/out/{filenameWithoutExtension}"
print(processed_filename_withPath)

#store in sasweucomexa lake as a parquet file
# mode("overwrite") replaces any existing output at this path, so re-running the notebook
# for the same input file replaces the earlier result instead of failing.
spark.createDataFrame(res2)\
.write.mode("overwrite")\
.parquet(processed_filename_withPath)