In [1]:
import functools, os
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# One SparkSession drives the notebook; `spark` is the entry point for reading and transforming data.
spark = SparkSession.builder.getOrCreate()
# Legacy SQLContext handle kept for code that expects `sc`. Despite the conventional name,
# `sc` here is a SQLContext, not a SparkContext. SQLContext's constructor takes a
# SparkContext first and, optionally, the session to wrap -- pass both explicitly rather
# than putting the SparkSession in the SparkContext slot.
sc = SQLContext(spark.sparkContext, spark)

### Data

In [2]:
# The input paths are relative, so show which directory they resolve against.
os.path.abspath(os.curdir)

'/sfs/qumulo/qhome/dbn5eu/ds5110/DS5110_MAP21'

#### Input Files

In [3]:
# list inputs
# Every input CSV lives under one directory, resolved relative to the working directory.
# Change INPUT_DIR to point the whole notebook at a different data drop.
INPUT_DIR = "inputs"

crash_file = INPUT_DIR + "/EPDO Rate.csv"
incident_file = INPUT_DIR + "/LIIR.csv"
ssp_file = INPUT_DIR + "/SSP schedules.csv"
volume_file = INPUT_DIR + "/Vol-VoverC all.csv"
lane_file = INPUT_DIR + "/num lanes.csv"
terrain_file = INPUT_DIR + "/terrain.csv"
truckpct_file = INPUT_DIR + "/Truck Pct.csv"
areatype_file = INPUT_DIR + "/urban_rural.csv"
LOTTR_file = INPUT_DIR + "/rel_unrel.csv"
TMCattribute_file = INPUT_DIR + "/miles urbanCode.csv"
county_file = INPUT_DIR + "/county_district.csv"
TMC_file = INPUT_DIR + "/TMC metadata.csv"
dir_AADT_file = INPUT_DIR + "/Dir AADT.csv"
num_days_file = INPUT_DIR + "/num days in data year.csv"

In [None]:
%%time
# read inputs
def _read_csv(path):
    """Load one input CSV as a Spark DataFrame, taking column names from its header row.

    No schema is supplied and schema inference is not requested, so Spark's CSV reader
    defaults apply (columns are read as strings).
    """
    return spark.read.csv(path, header=True)

crash_data = _read_csv(crash_file)
incident_data = _read_csv(incident_file)
ssp_data = _read_csv(ssp_file)
volume_data = _read_csv(volume_file)
lane_data = _read_csv(lane_file)
terrain_data = _read_csv(terrain_file)
truckpct_data = _read_csv(truckpct_file)
areatype_data = _read_csv(areatype_file)
LOTTR_data = _read_csv(LOTTR_file)
TMCattribute_data = _read_csv(TMCattribute_file)
county_data = _read_csv(county_file)
TMC_data = _read_csv(TMC_file)
dir_AADT_data = _read_csv(dir_AADT_file)
num_days_data = _read_csv(num_days_file)

In [None]:
%%time
# merge inputs
# Column resolution is made case-sensitive for this session. Presumably this keeps
# differently-cased names (e.g. "Tmc"/"tmc", "Year"/"year") from being treated as the
# same column -- NOTE(review): confirm intent. Set via the session config directly;
# SQLContext.sql is the deprecated route to the same setting.
spark.conf.set("spark.sql.caseSensitive", "true")

# Base table: TMC metadata with the LOTTR observations attached by TMC code.
all_data = (
    TMC_data
    .join(LOTTR_data, TMC_data.tmc == LOTTR_data.tmc_code, 'left_outer')
    .drop('tmc_code')
)

# Remaining inputs, left-joined in this order. Each entry is
# (input with its key columns renamed to the canonical `tmc` / `year`, join keys).
_merge_specs = [
    (crash_data.withColumnRenamed("Tmc", "tmc"), ['tmc', 'year']),
    (incident_data.withColumnRenamed("Tmc", "tmc"), ['tmc', 'year']),
    (ssp_data.withColumnRenamed("Tmc", "tmc").withColumnRenamed("Year", "year"), ['tmc', 'year']),
    (volume_data, ['tmc', 'year']),
    (lane_data, ['tmc', 'year']),
    (terrain_data, ['tmc']),
    (truckpct_data, ['tmc']),
    (areatype_data, ['tmc']),
    (TMCattribute_data.withColumnRenamed("tmc_code", "tmc"), ['tmc', 'year']),
    (county_data.withColumnRenamed("tmc_code", "tmc"), ['tmc']),
    (dir_AADT_data.withColumnRenamed("Travel_Time_Code", "tmc").withColumnRenamed("Year_Record", "year"), ['tmc', 'year']),
    (num_days_data.withColumnRenamed("Year", "year"), ['year']),
]
for _right, _keys in _merge_specs:
    all_data = all_data.join(_right, on=_keys, how='left_outer')

#### Fixes

In [None]:
# replace whitespace in column names
# Every whitespace character (space, tab, newline, ...) becomes "_", e.g. "road order" -> "road_order".
# toDF renames positionally, so names containing "." or repeated names cannot be
# mis-resolved the way F.col(name) would resolve them.
all_data = all_data.toDF(*[
    "".join("_" if ch.isspace() else ch for ch in name)
    for name in all_data.columns
])
all_data.columns

In [None]:
# Add the occupancy factor and the observed-LOTTR helper terms.

# default occupancy factor
occ_fac = 1.7

# obs_isReliable: 1 when obs_rel_unrel == "Rel", otherwise 0 (a null status also falls to 0).
# Denominator: miles_on_NHS * DIR_AADT * Number_Days.
# Numerator: the same product, kept only for reliable rows.
all_data = (
    all_data
    .withColumn("occ_fac", F.lit(occ_fac))
    .withColumn("obs_isReliable", F.when(F.col("obs_rel_unrel") == "Rel", 1).otherwise(0))
    .withColumn(
        "obs_LOTTR_helper_denominator",
        F.col("miles_on_NHS") * F.col("DIR_AADT") * F.col("Number_Days"),
    )
    .withColumn(
        "obs_LOTTR_helper_numerator",
        F.col("obs_LOTTR_helper_denominator") * F.col("obs_isReliable"),
    )
)

In [None]:
# Add 0/1 indicator fields for terrain and area type.
# Each tuple is (new column, source column, value that maps to 1); anything else maps to 0.
for _flag, _source, _label in (
    ("Rolling", "Terrain", "Rolling"),
    ("Urbanized", "Area_Type", "Urbanized"),
    ("UrbanZCluster", "Area_Type", "Urban Cluster"),
):
    all_data = all_data.withColumn(_flag, F.when(F.col(_source) == _label, 1).otherwise(0))

In [None]:
# Drop some columns that are not going to be used.
# DataFrame.drop silently ignores names that are not present, so these must match the
# cleaned names: the whitespace fix earlier turned "road order" into "road_order".
# NOTE(review): 'DIR_ADDT' looks like a typo for 'DIR_AADT' (the column used for the
# LOTTR helper terms); as written it may be a silent no-op -- confirm intent.
all_data = all_data.drop(
    'road', 'dir', 'road_dir', 'Intersection', 'county', 'road_order',
    'global_road_order', 'Urban_Code', 'DIR_ADDT', 'District',
)

In [None]:
# Preview a handful of rows instead of collecting the entire merged table to the driver.
all_data.limit(20).toPandas()

In [None]:
# Export the merged table. toPandas() collects every row onto the driver, and
# to_csv also writes the pandas index as the first column.
ALL_DATA_CSV = "all_data.csv"
all_data.toPandas().to_csv(ALL_DATA_CSV)

#### Reformat Data

The relatively small number of rows could be an issue when training a classification algorithm. We therefore transform the data from wide to long format. First we split the period-specific columns (AMP, MIDD, PMP, WE) into separate DataFrames. Next we strip the period prefix so every piece shares the same column names, and tag each piece with a `Period` column. Then we concatenate the pieces and join the non-period columns back on `year` and `tmc`. We also keep a copy of the response variable, `obs_rel_unrel`, so that after running models we can group by year, TMC, and period and compare against the original response format.

In [None]:
# separate data
# Row key carried into every split so the pieces can be rejoined later.
KEY_COLS = ['year', 'tmc']
non_period_cols = KEY_COLS + ['obs_rel_unrel', 'lanes', 'miles', 'Rolling', 'Urbanized', 'UrbanZCluster']

# Every time period has the same metrics, stored as "<PERIOD>_<metric>" columns.
# A single metric list keeps the four period column lists from drifting apart.
PERIOD_METRICS = [
    'EPDOR', 'LIIR', 'SSP',
    'Hour_TMS_Vol', 'TMS_Vol', 'Lane_TMS_Vol', 'Hour_Lane_TMS_Vol',
    'Hour_NPMRDS_Vol', 'NPMRDS_Vol', 'Hour_Lane_NPMRDS_Vol',
    'NPMRDS_straight_VoverC', 'TMS_straight_VoverC',
    'NPMRDS_weighted_VoverC', 'TMS_weighted_VoverC',
    'Truck',
]

def period_cols(period):
    """Return a new list: the key columns followed by each "<period>_<metric>" column."""
    return KEY_COLS + [period + '_' + metric for metric in PERIOD_METRICS]

AMP_cols = period_cols('AMP')
MIDD_cols = period_cols('MIDD')
PMP_cols = period_cols('PMP')
WE_cols = period_cols('WE')
FINAL_TEST_COLS = KEY_COLS + ['obs_rel_unrel']

# Project each column group out of the merged table (indexing a DataFrame with a list is a select).
non_period_data = all_data.select(*non_period_cols)
AMP_data = all_data.select(*AMP_cols)
MIDD_data = all_data.select(*MIDD_cols)
PMP_data = all_data.select(*PMP_cols)
WE_data = all_data.select(*WE_cols)
FINAL_TEST_DATA = all_data.select(*FINAL_TEST_COLS)

In [None]:
# rename columns
def renameCols(data, name):
    """Strip a leading prefix from the column names of a Spark DataFrame.

    Parameters
    ----------
    data : DataFrame
        Frame whose columns are renamed; a new frame is returned.
    name : str
        Prefix to remove, e.g. "AMP_". Only a leading occurrence is removed, so a
        column that merely contains the prefix elsewhere keeps its name. An empty
        prefix leaves every name unchanged.

    Returns
    -------
    DataFrame
        Same rows and column order, with the prefix removed from matching names.
    """
    new_names = [
        col[len(name):] if name and col.startswith(name) else col
        for col in data.columns
    ]
    # toDF renames positionally in one projection instead of one rename per column.
    return data.toDF(*new_names)

# Strip each frame's own period prefix so the four frames share the same column names.
AMP_data, MIDD_data, PMP_data, WE_data = [
    renameCols(frame, prefix)
    for frame, prefix in (
        (AMP_data, "AMP_"),
        (MIDD_data, "MIDD_"),
        (PMP_data, "PMP_"),
        (WE_data, "WE_"),
    )
]

In [None]:
# add period column
# Tag each frame with its period label so rows stay identifiable once the frames are stacked.
AMP_data, MIDD_data, PMP_data, WE_data = [
    frame.withColumn("Period", F.lit(label))
    for frame, label in (
        (AMP_data, "AMP"),
        (MIDD_data, "MIDD"),
        (PMP_data, "PMP"),
        (WE_data, "WE"),
    )
]

In [None]:
# concatenate period data
# unionByName matches columns by name, not position. A period frame whose columns come
# out in a different order is still aligned correctly, and mismatched names raise an
# error instead of silently stacking the wrong values under a column.
period_data = functools.reduce(
    lambda stacked, frame: stacked.unionByName(frame),
    [AMP_data, MIDD_data, PMP_data, WE_data],
)

In [None]:
# merge with the non-period columns
# Inner join (Spark's default join type, spelled out here) on the year + TMC key.
data_final = period_data.join(non_period_data, on=['year', 'tmc'], how='inner')

In [None]:
%%time
# Collect the long-format Spark DataFrame onto the driver as a pandas DataFrame for inspection and export.
tempdf = data_final.toPandas()

In [None]:
# Missing-value count per column (isnull is pandas' alias of isna).
tempdf.isnull().sum()

In [None]:
# Display the collected long-format table; pandas display options control how many rows are rendered.
tempdf

In [None]:
# Export the long-format table; to_csv also writes the pandas index as the first column.
DATA_LONG_CSV = "data_long.csv"
tempdf.to_csv(DATA_LONG_CSV)