In [1]:
import findspark
findspark.init()

import pyspark
import pandas as pd

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, DateType, FloatType
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

In [3]:
scSpark = SparkSession \
    .builder \
    .appName('data-consolidation.ipynb') \
    .config('spark.some.config.option', 'some-value') \
    .getOrCreate()

24/06/23 11:23:08 WARN Utils: Your hostname, Julies-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.77 instead (on interface en0)
24/06/23 11:23:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/23 11:23:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
scSpark.sparkContext.setLogLevel('OFF')

<h3>Pull Destinating Pieces</h3>

In [5]:
destinatingPieces = scSpark.read.csv('Scan Data/Mail/Destinating Pieces pt. 1.csv', header = True, sep=',')

In [6]:
#DestinatingPieces2 = scSpark.read.csv('Scan Data/Mail/Destinating Pieces pt. 2.csv', header = True, sep=',')

In [7]:
#destinatingPieces = DestinatingPieces1.union(DestinatingPieces2)

The Destinating Pieces files contain 44,210,232 records total. 
- 44,209,256 distinct unique indentifiers
- 115 distinct stat the clock dates
- 1587 distinct origin facilities
- 12 distinct actual delivery dates
- 107 distinct expected delivery dates
- 3 distinct expected destination facilities: 'MEMPHIS - 1441274', 'MUSIC CITY ANNEX - 1532174', 'NASHVILLE - 1441275'
- 4 distinct mail classes: 'USPS Marketing Mail', 'First Class Presort', 'Periodicals', 'Single Piece First Class'
- 3 distinct mail shape: 'Flat', 'Card', 'Letter'


In [8]:
destinatingPieces_cleaned = destinatingPieces.dropna() \
                            .filter((destinatingPieces.START_THE_CLOCK_DATE != 'null') & 
                                (destinatingPieces.EXPECTED_DELIVERY_DATE != 'null') &
                                (destinatingPieces.START_THE_CLOCK_DATE <= destinatingPieces.ACTUAL_DLVRY_DATE) &
                                (destinatingPieces.EXPECTED_DESTINATION_FACILITY == 'MUSIC CITY ANNEX - 1532174') &
                                (destinatingPieces.START_THE_CLOCK_DATE > '2023-12-22')) \
                            .selectExpr(
                                '*',
                                'count(*) over (partition by UNIQUE_IDENTIFIER) as cnt').filter(F.col('cnt') == 1).drop('cnt') \
                            .withColumn('daysDelivered', F.datediff(col('ACTUAL_DLVRY_DATE'), col('START_THE_CLOCK_DATE')))

After removing nulls, the Destintaing Pieces files contains 39,683,888 records.
- 39,682,919 distinct unique indentifiers
- 114 distinct start the clock dates, 1327 distinct origin facilities
- 11 distinct actual delivery dates, 106 distinct expected delivery dates
- 3 distinct expected destination facilities: 'MEMPHIS - 1441274', 'MUSIC CITY ANNEX - 1532174', 'NASHVILLE - 1441275'
- 4 distinct mail classes: 'USPS Marketing Mail', 'First Class Presort', 'Periodicals', 'Single Piece First Class'3 distinct mail shape: 'Flat', 'Card', 'Letter'

After removing records where the Start Time is after the Actual Delivery Date, the Destintaing Pieces files contains 39,338,233 records.
- 39,337,266 distinct unique indentifiers
- 99 distinct start the clock dates
- 1324 distinct origin facilities
- 11 distinct actual delivery dates
- 100 distinct expected delivery dates
- 3 distinct expected destination facilities: 'MEMPHIS - 1441274', 'MUSIC CITY ANNEX - 1532174', 'NASHVILLE - 1441275'
- 4 distinct mail classes: 'USPS Marketing Mail', 'First Class Presort', 'Periodicals', 'Single Piece First Class'
- 3 distinct mail shape: 'Flat', 'Card', 'Letter'

After filtering the dataset for those expected to be delivered to Music City, the Destintaing Pieces files contains 3,100,202 records.
- 3,100,107 distinct unique indentifiers
- 89 distinct start the clock dates
- 737 distinct origin facilities
- 11 distinct actual delivery dates
- 88 distinct expected delivery dates
- 1 distinct expected destination facilities: 'MUSIC CITY ANNEX - 1532174'
- 4 distinct mail classes: 'USPS Marketing Mail', 'First Class Presort', 'Periodicals', 'Single Piece First Class'
- 1 distinct mail shape: 'Flat'

After removing peices that had a start date before 12/22/23, the Destintaing Pieces files contains 3,092,463 records.
- 3,092,369 distinct unique indentifiers
- 26 distinct start the clock dates
- 709 distinct origin facilities
- 11 distinct actual delivery dates
- 28 distinct expected delivery dates
- 1 distinct expected destination facilities: 'MUSIC CITY ANNEX - 1532174'
- 4 distinct mail classes: 'USPS Marketing Mail', 'First Class Presort', 'Periodicals', 'Single Piece First Class'
- 1 distinct mail shape: 'Flat'

After removing duplicates, the Destintaing Pieces files contains 3,092,317 records.
- 3,092,317 distinct unique indentifiers
- 26 distinct start the clock dates
- 694 distinct origin facilities
- 11 distinct actual delivery dates
- 28 distinct expected delivery dates
- 1 distinct expected destination facilities: 'MUSIC CITY ANNEX - 1532174'
- 4 distinct mail classes: 'USPS Marketing Mail', 'First Class Presort', 'Periodicals', 'Single Piece First Class'
- 1 distinct mail shape: 'Flat'

<h3>Pull/Filter/Union Destinating Scans</h3>

In [9]:
destinatingScans = scSpark.read.csv('Scan Data/Mail/Destinating Scans pt. 1.csv', header = True, sep=',')

In [10]:
#DestinatingScans2 = scSpark.read.csv('Scan Data/Mail/Destinating Scans pt. 2.csv', header = True, sep=',')

In [11]:
#DestinatingScans3 = scSpark.read.csv('Scan Data/Mail/Destinating Scans pt. 3.csv', header = True, sep=',')

In [12]:
#destinatingScans = DestinatingScans1.union(DestinatingScans2)

In [13]:
#destinatingScans = destinatingScans.union(DestinatingScans3)

The Destinating Scans files contain 139,005,571 records total. 
- 53,730,012 distinct unique identifiers
- 2,064,204 distinct scan_datetime
- 370 distinct scan facilities
- 229 distinct ops codes

In [14]:
#remove 'null' values
destinatingScans_cleaned = destinatingScans.filter((destinatingScans.scan_datetime != 'null') & 
                                                   (destinatingScans.scan_facility != 'null') &
                                                   (destinatingScans.op_code != 'null'))

After removing null values, the Destinating Scans files contain 138,722,816 records total. 
- 53,621,334 distinct unique identifiers
- 369 distinct scan facilities
- 228 distinct ops codes

In [15]:
opsCodes = pd.read_excel('../teamkangaroo/usps_opscodes.xlsx', header = 1, sheet_name='OpCodes')

In [16]:
opsCodes = opsCodes['Operation Code'].to_list()

In [17]:
cleanedCodes = [x for x in opsCodes if str(x) != 'nan']

In [18]:
cleanedCodes = [int(x) for x in cleanedCodes]

In [19]:
scansGood = destinatingScans_cleaned.filter(destinatingScans_cleaned.op_code.isin(cleanedCodes))

In [20]:
scansGood = scansGood.groupBy('UNIQUE_IDENTIFIER').agg(F.count('op_code').alias('goodScanCnt'))

In [21]:
badScans = destinatingScans_cleaned.filter(~destinatingScans_cleaned.op_code.isin(cleanedCodes))

In [22]:
badScans = badScans.groupBy('UNIQUE_IDENTIFIER').agg(F.count('op_code').alias('badScanCnt'))

In [23]:
dScansIntervalTotal = destinatingScans_cleaned.groupBy('UNIQUE_IDENTIFIER') \
    .agg(F.min('scan_datetime').alias('minScan'), F.max('scan_datetime').alias('maxScan')) \
    .withColumn('scanIntervalTotal', F.datediff(col('maxScan'), col('minScan')))

In [24]:
dScansMusicCity = destinatingScans_cleaned.filter(destinatingScans_cleaned.scan_facility == 'MUSIC CITY ANNEX - 1532174') \
    .groupBy('UNIQUE_IDENTIFIER') \
    .agg(F.min('scan_datetime').alias('minScanMC'), F.max('scan_datetime').alias('maxScanMC')) \
    .withColumn('scanIntervalMusicCity', F.datediff(col('maxScanMC'), col('minScanMC')))

In [25]:
destinatingMerge = destinatingPieces_cleaned.join(dScansIntervalTotal, ['UNIQUE_IDENTIFIER']) \
                                            .join(dScansMusicCity, ['UNIQUE_IDENTIFIER']) \
                                            .join(scansGood, ['UNIQUE_IDENTIFIER']) \
                                            .join(badScans, ['UNIQUE_IDENTIFIER'])

In [26]:
weather = scSpark.read.csv('../teamkangaroo/cleanedWeather.csv', header = True, sep=',')

In [27]:
destinatingMerge = destinatingMerge.withColumn('mcScan1', F.to_date(col('minScanMC'))) \
    .withColumn('mcScan2', F.to_date(col('maxScanMC')))

In [28]:
final_df = destinatingMerge.join(weather, (destinatingMerge.mcScan1 == weather.start_date) & (destinatingMerge.mcScan2 == weather.end_date))

In [29]:
final_df=final_df.withColumn('avgDAPR',final_df['avgDAPR'].cast("float").alias('avgDAPR'))
final_df=final_df.withColumn('avgMDPR',final_df['avgMDPR'].cast("float").alias('avgMDPR'))
final_df=final_df.withColumn('avgPRCP',final_df['avgPRCP'].cast("float").alias('avgPRCP'))
final_df=final_df.withColumn('avgSNOW',final_df['avgSNOW'].cast("float").alias('avgSNOW'))
final_df=final_df.withColumn('avgSNWD',final_df['avgSNWD'].cast("float").alias('avgSNWD'))

In [30]:
final_df = final_df.withColumn('isLate', F.when(final_df.ACTUAL_DLVRY_DATE > final_df.EXPECTED_DELIVERY_DATE, 1).otherwise(0))

In [31]:
columns_drop = ['START_THE_CLOCK_DATE', 'ACTUAL_DLVRY_DATE', 'EXPECTED_DELIVERY_DATE', 
                'EXPECTED_DESTINATION_FACILITY', 'MAIL_SHAPE', 'minScan', 'maxScan', 
                'minScanMC', 'maxScanMC', 'mcScan1', 'mcScan2', '_c0', 'start_date', 'end_date', 'UNIQUE_IDENTIFIER',
                'ORIGIN_FACILITY']

In [72]:
final_df = final_df.drop(*columns_drop)

In [73]:
final_df.columns

['MAIL_CLASS',
 'daysDelivered',
 'scanIntervalTotal',
 'scanIntervalMusicCity',
 'goodScanCnt',
 'badScanCnt',
 'avgDAPR',
 'avgMDPR',
 'avgPRCP',
 'avgSNOW',
 'avgSNWD',
 'isLate']

Linear Regression Model

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
assembler = VectorAssembler(
    inputCols=['scanIntervalTotal', 'scanIntervalMusicCity', 'goodScanCnt', 'badScanCnt', 'avgDAPR', 'avgMDPR', 
               'avgPRCP', 'avgSNOW', 'avgSNWD'],
    outputCol='features')

In [None]:
final_df = assembler.transform(final_df)

In [None]:
final_data = final_df.select("features", "daysDelivered")

In [None]:
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=96)

In [None]:
lr = LinearRegression(featuresCol="features", labelCol="daysDelivered", predictionCol="predicted_daysDelivered")


In [None]:
lr_model = lr.fit(train_data)

In [None]:
predictions = lr_model.transform(test_data)

In [None]:
evaluator = RegressionEvaluator(labelCol="daysDelivered", predictionCol="predicted_daysDelivered", metricName="rmse")

In [None]:
rmse = evaluator.evaluate(predictions)

In [None]:
print(rmse)

In [None]:
evaluator_r2 = RegressionEvaluator(labelCol="daysDelivered", predictionCol="predicted_daysDelivered", metricName="r2")

In [None]:
r2 = evaluator_r2.evaluate(predictions)

In [None]:
print(r2)

Random Forest Classifier

In [33]:
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import OneHotEncoder, StandardScaler, VectorAssembler, StringIndexer, Imputer

In [34]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [35]:
pipe_stages= []

In [36]:
sindexer= StringIndexer(inputCols= ['MAIL_CLASS'], 
                        outputCols= ["indexed_{}".format(item) for item in ['MAIL_CLASS']],
                        handleInvalid='keep',
                        stringOrderType='frequencyDesc')

In [37]:
assembler = VectorAssembler(
    inputCols=['scanIntervalTotal', 'scanIntervalMusicCity', 'goodScanCnt', 'badScanCnt', 'avgDAPR', 'avgMDPR', 
               'avgPRCP', 'avgSNOW', 'avgSNWD', 'indexed_MAIL_CLASS'],
    outputCol='features')

In [45]:
#rf = RandomForestClassifier(labelCol="isLate", featuresCol="features")

In [46]:
pipeline = Pipeline(stages=[sindexer, assembler])

In [47]:
df_rf = pipeline.fit(final_df).transform(final_df)

                                                                                

In [48]:
train_data, test_data = df_rf.randomSplit([0.7, 0.3], seed=96)

In [49]:
rfc= RandomForestClassifier(numTrees=70,
                            maxDepth=3, 
                            labelCol='isLate',
                            seed=96)

In [50]:
rfc.setFeaturesCol("features")

RandomForestClassifier_b58cad7c0223

In [51]:
rfc_model= rfc.fit(train_data)



In [52]:
preds= rfc_model.transform(test_data) 


In [53]:
bce= BinaryClassificationEvaluator(rawPredictionCol= "rawPrediction",
                                   labelCol="isLate", 
                                   metricName= "areaUnderROC")

In [54]:
bce.evaluate(preds) 

                                                                                

0.9764948743273518

Random Forest Regression Model

In [58]:
rfr_data = final_df.drop(col('isLate'))

In [60]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [61]:
sindexer= StringIndexer(inputCols= ['MAIL_CLASS'], 
                        outputCols= ["indexed_{}".format(item) for item in ['MAIL_CLASS']],
                        handleInvalid='keep',
                        stringOrderType='frequencyDesc')

In [62]:
assembler = VectorAssembler(
    inputCols=['scanIntervalTotal', 'scanIntervalMusicCity', 'goodScanCnt', 'badScanCnt', 'avgDAPR', 'avgMDPR', 
               'avgPRCP', 'avgSNOW', 'avgSNWD', 'indexed_MAIL_CLASS'],
    outputCol='features')

In [63]:
pipeline = Pipeline(stages=[sindexer, assembler])

In [65]:
df_rfr = pipeline.fit(final_df).transform(final_df)

                                                                                

In [66]:
train_data, test_data = df_rf.randomSplit([0.7, 0.3], seed=96)

In [67]:
random_forest_reg = RandomForestRegressor(featuresCol="features",labelCol="daysDelivered")

In [68]:
model = random_forest_reg.fit(train_data)

                                                                                

In [69]:
predictions = model.transform(test_data)

In [70]:
evaluator = RegressionEvaluator(labelCol="daysDelivered"\
                                , predictionCol="prediction", metricName="rmse")
print ("Root Mean Squared Error (RMSE) on test data = ",evaluator.evaluate(predictions))



Root Mean Squared Error (RMSE) on test data =  0.6701515000365502


                                                                                

In [71]:
evaluator = RegressionEvaluator(labelCol="daysDelivered",\
                                predictionCol="prediction", metricName="r2")
print("R Squared (R2) on test data =", evaluator.evaluate(predictions))



R Squared (R2) on test data = 0.8547722329343596


                                                                                