In [0]:
from pyspark.sql.functions import col, split, concat, lit, substring, upper, udf, trim
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [0]:
# Every input file lives in the same DBFS upload folder; keep that folder in a
# single constant so the notebook can be repointed by editing one line.
# NOTE(review): the folder name embeds a personal e-mail address — consider moving
# it into configuration before sharing this notebook.
DATA_DIR = "dbfs:/FileStore/shared_uploads/nagpritamnaik@gmail.com"


def read_csv_with_header(file_name, data_dir=DATA_DIR):
    """Load ``<data_dir>/<file_name>`` as a CSV whose first row is the header.

    Schema inference is not enabled; later cells cast columns explicitly.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .load("{}/{}".format(data_dir, file_name))
    )


trucksDF = read_csv_with_header("trucks.csv")
trucksMGDF = read_csv_with_header("trucks_mg.csv")
geolocDF = read_csv_with_header("geolocation.csv")

# display(trucksDF)
# display(trucksMGDF)
# display(geolocDF)

In [0]:
# Expose trucksDF to Spark SQL under the view name "trucks" (queried by the unpivot cell).
trucksDF.createOrReplaceTempView(name="trucks")

In [0]:
# Unpivot the wide trucks dataset: every <mon><yy>_miles / <mon><yy>_gas column
# becomes its own row of (Month_Year, Miles_Gas_Values), keyed by driverid,
# truckid and model.
monthly_metric_cols = [
    'jun13_miles','jun13_gas','may13_miles','may13_gas','apr13_miles','apr13_gas','mar13_miles','mar13_gas','feb13_miles','feb13_gas','jan13_miles','jan13_gas',
    'dec12_miles','dec12_gas','nov12_miles','nov12_gas','oct12_miles','oct12_gas','sep12_miles','sep12_gas','aug12_miles','aug12_gas','jul12_miles','jul12_gas','jun12_miles','jun12_gas','may12_miles','may12_gas','apr12_miles','apr12_gas','mar12_miles','mar12_gas','feb12_miles','feb12_gas','jan12_miles','jan12_gas',
    'dec11_miles','dec11_gas','nov11_miles','nov11_gas','oct11_miles','oct11_gas','sep11_miles','sep11_gas','aug11_miles','aug11_gas','jul11_miles','jul11_gas','jun11_miles','jun11_gas','may11_miles','may11_gas','apr11_miles','apr11_gas','mar11_miles','mar11_gas','feb11_miles','feb11_gas','jan11_miles','jan11_gas',
    'dec10_miles','dec10_gas','nov10_miles','nov10_gas','oct10_miles','oct10_gas','sep10_miles','sep10_gas','aug10_miles','aug10_gas','jul10_miles','jul10_gas','jun10_miles','jun10_gas','may10_miles','may10_gas','apr10_miles','apr10_gas','mar10_miles','mar10_gas','feb10_miles','feb10_gas','jan10_miles','jan10_gas',
    'dec09_miles','dec09_gas','nov09_miles','nov09_gas','oct09_miles','oct09_gas','sep09_miles','sep09_gas','aug09_miles','aug09_gas','jul09_miles','jul09_gas','jun09_miles','jun09_gas','may09_miles','may09_gas','apr09_miles','apr09_gas','mar09_miles','mar09_gas','feb09_miles','feb09_gas','jan09_miles','jan09_gas',
]

# stack(n, 'label_1', col_1, 'label_2', col_2, ...) emits n rows of (label, value):
# the quoted label is the column name as a string literal, the backticked name is
# the column itself.
stack_args = ", ".join("'{0}', `{0}`".format(c) for c in monthly_metric_cols)

unpivotDF = spark.sql(
    "SELECT driverid, truckid, model, "
    "stack({0}, {1}) AS (Month_Year, Miles_Gas_Values) "
    "FROM trucks".format(len(monthly_metric_cols), stack_args)
)
# display(unpivotDF)

In [0]:
#filter the dataset for miles value
unpivotDFmiles = unpivotDF.filter(col("Month_Year").like("%miles%")) \
    .withColumn("month",trim(upper(substring(split(col("Month_Year"),'_')[0],0,3)))) \
    .withColumn("year",concat(lit("20"),substring(split(col("Month_Year"),'_')[0],4,2)).cast(IntegerType())) \
    .withColumnRenamed("Miles_Gas_Values","Miles") \
    .withColumn("Miles",col("Miles").cast(DoubleType())) \
    .drop("Month_Year") \
    .selectExpr("trim(driverid) as driverid", "trim(truckid) as truckid", "trim(model) as model", "trim(month) as month", "year", "Miles")
    
# display(unpivotDFmiles)

In [0]:
#filter the dataset for gas value
unpivotDFgas = unpivotDF.filter(col("Month_Year").like("%gas%")) \
    .withColumn("month",trim(upper(substring(split(col("Month_Year"),'_')[0],0,3)))) \
    .withColumn("year",concat(lit("20"),substring(split(col("Month_Year"),'_')[0],4,2)).cast(IntegerType())) \
    .withColumnRenamed("Miles_Gas_Values","Gas") \
    .withColumn("Gas",col("Gas").cast(DoubleType())) \
    .drop("Month_Year") \
    .selectExpr("trim(driverid) as driverid", "trim(truckid) as truckid", "trim(model) as model", "trim(month) as month", "year", "Gas")
    
# display(unpivotDFgas)

In [0]:
# Join the miles rows with the gas rows for the same driverid, truckid, model,
# month and year, so each output row carries both Miles and Gas.
# Joining on a list of key names keeps a single copy of each key column instead of
# two ambiguous copies (one per side).
join_keys = ["driverid", "truckid", "model", "month", "year"]
transformDF = unpivotDFmiles.join(unpivotDFgas, on=join_keys, how="inner")

# select relevant columns in a fixed order: keys first, then Miles and Gas
seltransformDF = transformDF.select(*join_keys, "Miles", "Gas")

# display(seltransformDF)

In [0]:
# Break the "Date" string apart on '/' and keep piece 1 as an integer Month
# column and piece 2 as an integer year column.
# NOTE(review): using piece 1 as the month assumes a <day>/<month>/<year> layout — confirm against trucks_mg.csv.
preprocess_trucksMGDF = (
    trucksMGDF
    .withColumn("Month", split(col("Date"), "/").getItem(1).cast(IntegerType()))
    .withColumn("year", split(col("Date"), "/").getItem(2).cast(IntegerType()))
)

# display(preprocess_trucksMGDF)

In [0]:
# UDF turning a numeric month (1-12) into the upper-case 3-letter abbreviation
# used by the unpivoted trucks dataset (e.g. 'JUN').

# Built once at module level rather than on every row.
_MONTH_ABBREVIATIONS = {1: "JAN", 2: "FEB", 3: "MAR", 4: "APR", 5: "MAY", 6: "JUN",
                        7: "JUL", 8: "AUG", 9: "SEP", 10: "OCT", 11: "NOV", 12: "DEC"}


def udf_month(mon):
    """Return the upper-case 3-letter abbreviation for month number ``mon``.

    Returns None for anything outside 1-12 (including None), which Spark stores as null.
    """
    return _MONTH_ABBREVIATIONS.get(mon)


# udf_month already has the right signature, so it is registered directly.
spark_udf_month = udf(udf_month, StringType())

In [0]:
# Replace the numeric month with its 3-letter abbreviation and align this frame's
# schema with the unpivoted trucks dataset (same columns, same order, same casts).
# Selecting "month" explicitly picks the abbreviated column however Spark resolves
# the "Month"/"month" names. "Date" needs no explicit drop: selectExpr keeps only
# the listed columns.
seltransform_trucksMGDF = (
    preprocess_trucksMGDF
    .withColumn("month", spark_udf_month("Month"))
    .selectExpr(
        # Trim ids like the unpivoted trucks side does, so the later union and
        # distinct() do not treat "A1" and "A1 " as different drivers.
        "trim(driverid) as driverid",
        "trim(truckid) as truckid",
        "trim(model) as model",
        "month",
        "year",
        "cast(Miles as double) as Miles",
        "cast(Gas as double) as Gas",
    )
    .sort(col("year").desc())
)
# display(seltransform_trucksMGDF)

In [0]:
# Final dataset: the unpivoted trucks rows followed by the trucks_mg rows, sorted
# newest year first.
# unionByName matches columns by name rather than position, so a change to either
# side's column order cannot silently shift values into the wrong column.
union_truckDF = seltransformDF.unionByName(seltransform_trucksMGDF).sort(col("year").desc())
display(union_truckDF)

driverid,truckid,model,month,year,Miles,Gas
A5,A5,Hino,MAY,2016,10233.0,1825.0
A17,A17,Western Star,MAY,2016,12939.0,2823.0
A29,A29,Ford,MAY,2016,12091.0,2606.0
A41,A41,Navistar,MAY,2016,8490.0,1636.0
A53,A53,Navistar,MAY,2016,9796.0,2049.0
A65,A65,Hino,MAY,2016,14214.0,2997.0
A77,A77,Ford,MAY,2016,13187.0,2752.0
A89,A89,Caterpillar,MAY,2016,14955.0,3344.0
A1,A1,Freightliner,JAN,2015,9217.0,1914.0
A2,A2,Ford,FEB,2015,12058.0,2335.0


In [0]:
# Distinct driver / truck / model combinations present in the final dataset.
Idlist_Df = union_truckDF.select("driverid", "truckid", "model").distinct()

# Attach geolocation events to each driver/truck pair. Ids are trimmed on both
# sides so stray whitespace in either file does not break the match. geolocDF's
# copies of the key columns are then dropped, so each key appears once.
# The two Column drops are chained: older PySpark releases raise TypeError when
# drop() receives more than one Column object.
join_geolocDF = (
    Idlist_Df.join(
        geolocDF,
        (trim(Idlist_Df.driverid) == trim(geolocDF.driverid))
        & (trim(Idlist_Df.truckid) == trim(geolocDF.truckid)),
    )
    .drop(geolocDF["driverid"])
    .drop(geolocDF["truckid"])
)
display(join_geolocDF)


driverid,truckid,model,event,latitude,longitude,city,state,velocity,event_ind,idling_ind
A54,A54,Western Star,normal,38.440467,-122.714431,Santa Rosa,California,17,0,0
A20,A20,Western Star,normal,36.977173,-121.899402,Aptos,California,27,0,0
A40,A40,Ford,overspeed,37.957702,-121.29078,Stockton,California,77,1,0
A31,A31,Kenworth,normal,39.409608,-123.355566,Willits,California,22,0,0
A71,A71,Peterbilt,normal,33.683947,-117.794694,Irvine,California,43,0,0
A50,A50,Oshkosh,normal,38.40765,-122.947713,Occidental,California,0,0,1
A51,A51,Crane,normal,37.639097,-120.996878,Modesto,California,0,0,1
A19,A19,Caterpillar,normal,37.962146,-122.345526,San Pablo,California,0,0,1
A77,A77,Ford,normal,37.962146,-122.345526,San Pablo,California,25,0,0
A92,A92,Volvo,normal,37.484938,-119.966284,Mariposa,California,0,0,1
