In [1]:

import pandas as pd
import numpy as np
import time
from os import listdir
from os.path import isfile, join
from datetime import time
import glob
import sys

from pyspark.sql.functions import hour, mean,minute, stddev, count,max as psmax,min as psmin, date_format, split, explode

from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *

In [2]:
dbutils.fs.unmount('/mnt/ch1data_3/')

In [3]:
dbutils.fs.unmount('/mnt/ch2data_3/')

In [4]:
# Replace with your values
#
# NOTE: Set the access to this notebook appropriately to protect the security of your keys.
# Or you can delete this cell after you run the mount command below once successfully.
ACCESS_KEY = ""
SECRET_KEY = ""
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "dse-team2-2014"
MOUNT_NAME = "ch1data_3"

dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)

In [5]:
# Replace with your values
#
# NOTE: Set the access to this notebook appropriately to protect the security of your keys.
# Or you can delete this cell after you run the mount command below once successfully.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "dse-team1-2015"
MOUNT_NAME2 = "ch2data_3"

dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME2)

In [6]:
weekdaySelector = udf(
    lambda x: "weekday" if int(x) < 6 else "weekend"
)

timeOfDay = udf(
    lambda x: time(int(x.hour), int(x.minute)).strftime("%H:%M")
)

split_tabbed_data = udf(
    lambda x: x.split("\t")
) 

In [7]:
struct_list = [
    StructField("timestamp",TimestampType(),True),
    StructField("station",IntegerType(),True),
    StructField("district",IntegerType(),True),
    StructField("freeway",IntegerType(),True),
    StructField("direction_of_travel",StringType(),True),
    StructField("lanetype",StringType(),True),
    StructField("stationlength",DoubleType(),True),
    StructField("samples",IntegerType(),True),
    StructField("perc_observed",IntegerType(),True),
    StructField("totalflow",IntegerType(),True),
    StructField("avgoccupancy",DoubleType(),True),
    StructField("avgspeed",DoubleType(),True),
    StructField("lane1_samples",IntegerType(),True),
    StructField("lane1_flow",IntegerType(),True),
    StructField("lane1_avgocc",DoubleType(),True),
    StructField("lane1_avgspeed",DoubleType(),True),
    StructField("lane1_observed",IntegerType(),True),
    StructField("lane2_samples",IntegerType(),True),
    StructField("lane2_flow",IntegerType(),True),
    StructField("lane2_avgocc",DoubleType(),True),
    StructField("lane2_avgspeed",DoubleType(),True),
    StructField("lane2_observed",IntegerType(),True),
    StructField("lane3_samples",IntegerType(),True),
    StructField("lane3_flow",IntegerType(),True),
    StructField("lane3_avgocc",DoubleType(),True),
    StructField("lane3_avgspeed",DoubleType(),True),
    StructField("lane3_observed",IntegerType(),True),
    StructField("lane4_samples",IntegerType(),True),
    StructField("lane4_flow",IntegerType(),True),
    StructField("lane4_avgocc",DoubleType(),True),
    StructField("lane4_avgspeed",DoubleType(),True),
    StructField("lane4_observed",IntegerType(),True),
    StructField("lane5_samples",IntegerType(),True),
    StructField("lane5_flow",IntegerType(),True),
    StructField("lane5_avgocc",DoubleType(),True),
    StructField("lane5_avgspeed",DoubleType(),True),
    StructField("lane5_observed",IntegerType(),True),
    StructField("lane6_samples",IntegerType(),True),
    StructField("lane6_flow",IntegerType(),True),
    StructField("lane6_avgocc",DoubleType(),True),
    StructField("lane6_avgspeed",DoubleType(),True),
    StructField("lane6_observed",IntegerType(),True),
    StructField("lane7_samples",IntegerType(),True),
    StructField("lane7_flow",IntegerType(),True),
    StructField("lane7_avgocc",DoubleType(),True),
    StructField("lane7_avgspeed",DoubleType(),True),
    StructField("lane7_observed",IntegerType(),True),
    StructField("lane8_samples",IntegerType(),True),
    StructField("lane8_flow",IntegerType(),True),
    StructField("lane8_avgocc",DoubleType(),True),
    StructField("lane8_avgspeed",DoubleType(),True),
    StructField("lane8_observed",IntegerType(),True)
]

schema_struct = StructType(struct_list)

In [8]:
district_no_list = range(3,9)+range(10,13)
dir_list = ["/mnt/%s/dse_traffic/station_5min/%i/d%i/" % (MOUNT_NAME,y,z) for y in range(2008,2016) for z in district_no_list]
files = [dbutils.fs.ls(a) for a in dir_list]
onlyfiles = [b[0].path for b in files]
onlyfiles

In [9]:
rdd = spark.read.csv(
    onlyfiles, 
    header='false',
    timestampFormat='MM/dd/yyyy HH:mm:ss',
    schema=schema_struct,
    inferSchema='false'
)
rdd.take(1)

In [10]:
station_time = (
    rdd
    .select(
        'district',
        'freeway',
        'direction_of_travel',
        'timestamp',
        'station',
        'totalflow',
        'avgoccupancy',
        'avgspeed',
        date_format('timestamp', 'u').alias('dayofweek')
    )
    
)

station_time = (
    station_time
    .withColumn(
        'dayType', 
        weekdaySelector(station_time.dayofweek)
        )
    .withColumn(
        'timeOfDay',
        timeOfDay(station_time.timestamp)
    )
    .groupBy([
        'district',
        'freeway',
        'direction_of_travel',
        'station',
        'dayType',
        'timeOfDay'
    ])
    .agg(
        mean("totalflow").alias("flow_mean"),
        stddev("totalflow").alias("flow_std"),
        count("totalflow").alias("flow_count"),
        psmax("totalflow").alias("flow_max"),
        psmin("totalflow").alias("flow_min"),
        
        mean("avgoccupancy").alias("occ_mean"),
        stddev("avgoccupancy").alias("occ_std"),
        count("avgoccupancy").alias("occ_count"),
        psmax("avgoccupancy").alias("occ_max"),
        psmin("avgoccupancy").alias("occ_min"),
        
        mean("avgspeed").alias("speed_mean"),
        stddev("avgspeed").alias("speed_std"),
        count("avgspeed").alias("speed_count"),
        psmax("avgspeed").alias("speed_max"),
        psmin("avgspeed").alias("speed_min")
    )
)


In [11]:
# We'll be using the station_time dataframe over and over again to export csv splices of the data.  Caching for performance purposes.
station_time.cache()
station_time.take(5)

In [12]:
meta_dir_list = ["/mnt/%s/dse_traffic/meta/%i/d%i/" % ('ch1data_3',y,z) for y in range(2008,2017) for z in district_no_list]

meta_onlyfiles = []
for a in meta_dir_list:
  try:
    meta_onlyfiles += [b.path for b in dbutils.fs.ls(a) if b.size != '0L'],
  except:
    print "no directory"
    
from itertools import chain
meta_onlyfiles = list(chain.from_iterable(meta_onlyfiles))

meta_onlyfiles

In [13]:
meta_rdd = spark.read.csv(
    meta_onlyfiles,
    mode='DROPMALFORMED',
    header=True,
    inferSchema=True
)
meta_header_cols = meta_rdd.columns[0].split('\t')
split_col = split(meta_rdd[meta_rdd.columns[0]], '\t')
newdf = meta_rdd
for i,a in enumerate(meta_header_cols):
    newdf = newdf.withColumn(a, split_col.getItem(i))
meta_df = newdf[[meta_header_cols]].toPandas()

In [14]:
print len(meta_df)
meta_df = meta_df.drop_duplicates()
print len(meta_df)
meta_df = meta_df.groupby(['ID']).max().reset_index().sort_values(['District', 'Fwy', 'Dir', 'Abs_PM'])
print len(meta_df)

In [15]:
type(meta_df)
sqlCtx = SQLContext(sc)
meta_sdf = sqlCtx.createDataFrame(meta_df)
meta_sdf.count()

In [16]:
print type(meta_sdf)
meta_sdf.write.csv("/mnt/ch2data/share_data/meta_2008_2016.csv" , mode='overwrite', header=True)

In [17]:
meta_sdf.columns

In [18]:
# We need to first rename some of the columns as spark doesn't recognize case sensitivity.  In addition, we don't need the City, County or user information if we have lat and long and user information is blank.
sqlContext.registerDataFrameAsTable(meta_sdf, "meta_table")
meta_sdf = sqlContext.sql("SELECT ID, Fwy, Dir, District AS dis, State_PM, Abs_PM, Latitude, Longitude, Length, Type, Lanes, Name FROM meta_table")

# Joining the station_time and meta_sdf spark dataframes on four columns: station/id, freeway, direction and district
traffic_with_meta = station_time.join(meta_sdf,
                                      (station_time.station == meta_sdf.ID) & 
                                      (station_time.freeway == meta_sdf.Fwy) & 
                                      (station_time.direction_of_travel == meta_sdf.Dir) &
                                      (station_time.district == meta_sdf.dis)
                                     )
traffic_with_meta

In [19]:
# Across all districts and dayTypes
traffic_with_meta.write.csv("/mnt/ch2data/share_data/stats_with_meta_2008_2015.csv", mode='overwrite', header=True)

for my_dayType in ['weekday', 'weekend']:
  # Across all districts, split dayTypes
  traffic_with_meta.filter(traffic_with_meta['dayType'] == my_dayType) \
    .write.csv("/mnt/ch2data/share_data/stats_with_meta_2008_2015_%s.csv" % (my_dayType), mode='overwrite', header=True)
  print "/mnt/ch2data/share_data/stats_with_meta_2008_2015_%s.csv" % (my_dayType)
  # Split by district and dayTypes
  for my_district in district_no_list:
      traffic_with_meta.filter(traffic_with_meta['dayType'] == my_dayType) \
                .filter(traffic_with_meta['district'] == my_district) \
                .write.csv("/mnt/ch2data/share_data/stats_with_meta_2008_2015_d%i_%s.csv" % (my_district,my_dayType), mode='overwrite', header=True)
      print "/mnt/ch2data/share_data/stats_with_meta_2008_2015_d%i_%s.csv" % (my_district,my_dayType)

In [20]:
for my_district in district_no_list:
  traffic_with_meta.filter(traffic_with_meta['district'] == my_district) \
                .write.csv("/mnt/ch2data/share_data/stats_with_meta_2008_2015_d%i.csv" % (my_district), mode='overwrite', header=True)
  print "/mnt/ch2data/share_data/stats_with_meta_2008_2015_d%i.csv" % (my_district)

In [21]:
[d.path for d in dbutils.fs.ls('/mnt/ch2data/share_data')]