In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from dateutil.relativedelta import relativedelta
from pyspark.sql.window import Window
from pyspark.sql.types import (StructType, StructField, DateType, BooleanType,
                               DoubleType, IntegerType, StringType, TimestampType)
from functools import reduce
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta



In [2]:
#!hdfs dfs -put /home/mechols/data/rows.csv /user/mechols/data/
#!hdfs dfs -put /home/mechols/data/AllWeather.csv /user/mechols/data/
#!hdfs dfs -put /home/mechols/data/weather_data.csv /user/mechols/data/
#!hdfs dfs -rm -r /user/mechols/data/fulldf.csv

In [3]:
!hdfs dfs -ls /user/mechols/data/

Found 8 items
-rw-r--r--   3 mechols mechols      375182 2019-05-26 23:15 /user/mechols/data/AllWeather.csv
-rw-r--r--   3 mechols mechols        1708 2019-05-17 14:10 /user/mechols/data/chicago_community_names.csv
-rw-r--r--   3 mechols mechols  1804333782 2019-05-16 21:03 /user/mechols/data/chicago_crimes.csv
-rw-r--r--   3 mechols mechols   208276005 2019-04-30 20:04 /user/mechols/data/food-inspections.csv
drwxr-xr-x   - mechols mechols           0 2019-06-03 15:23 /user/mechols/data/fulldf.csv
-rw-r--r--   3 mechols mechols 11980344386 2019-05-21 14:25 /user/mechols/data/rows.csv
drwxr-xr-x   - mechols mechols           0 2019-05-26 17:22 /user/mechols/data/weatherTrial
drwxr-xr-x   - mechols mechols           0 2019-05-25 11:15 /user/mechols/data/weather_data


In [4]:
# Adding access for Alison and Alpha
#!hdfs dfs -chmod -R a+rX /user/mechols/data

In [5]:
# Removing access
#!hdfs dfs -chmodd -R go-rwx /user/$USER/data

In [6]:
spark = SparkSession.builder.appName('RideShare').getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '256g'),
                                        ('spark.app.name', 'Spark Updated Conf'),
                                        ('spark.executor.cores', '16'),
                                        ('spark.cores.max', '16'),
                                        ('spark.driver.memory','256g'),
                                        ('spark.sql.AutoBroadcastJoinThreshold', -1),
                                        ('mapreduce.reduce.memory.mb',-1),
                                        ('spark.yarn.executor.memoryOverhead', -1),
                                        ('spark.kryoserializer.buffer.max.mb', '5g')])


#### Creating Spark Dataframes of Weather Data and RideShare Data. 

In [7]:
#Creates Spark Dataframe of all months (smoother than union)
weatherT = sqlContext.read.format('csv').options(header='true', inferSchema = True).load("/user/mechols/data/AllWeather.csv")
initial = weatherT.count()

In [8]:
rides = sqlContext.read.format('csv').options(header='true', inferSchema = True).load("/user/mechols/data/rows.csv")


## Data Cleaning

### RideShare Data

#### Cleaning Rideshare df by matching the order of dates to the weather data and converting them to timestamps.

In [9]:
rides = rides.withColumn('Trip Start Timestamp', from_unixtime(unix_timestamp(col(('Trip Start Timestamp')), "MM/dd/yyy hh:mm:ss aa"), "yyyy-MM-dd HH"))
rides = rides.withColumn('Trip End Timestamp', from_unixtime(unix_timestamp(col(('Trip End Timestamp')), "MM/dd/yyy hh:mm:ss aa"), "yyyy-MM-dd HH"))


In [10]:
rides = rides.withColumn("Trip Start Timestamp", rides["Trip Start Timestamp"].cast(TimestampType()))
rides = rides.withColumn("Trip End Timestamp", rides["Trip End Timestamp"].cast(TimestampType()))


In [11]:
rides.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Trip Start Timestamp: timestamp (nullable = true)
 |-- Trip End Timestamp: timestamp (nullable = true)
 |-- Trip Seconds: integer (nullable = true)
 |-- Trip Miles: double (nullable = true)
 |-- Pickup Census Tract: long (nullable = true)
 |-- Dropoff Census Tract: long (nullable = true)
 |-- Pickup Community Area: integer (nullable = true)
 |-- Dropoff Community Area: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tip: integer (nullable = true)
 |-- Additional Charges: double (nullable = true)
 |-- Trip Total: double (nullable = true)
 |-- Shared Trip Authorized: boolean (nullable = true)
 |-- Trips Pooled: integer (nullable = true)
 |-- Pickup Centroid Latitude: double (nullable = true)
 |-- Pickup Centroid Longitude: double (nullable = true)
 |-- Pickup Centroid Location: string (nullable = true)
 |-- Dropoff Centroid Latitude: double (nullable = true)
 |-- Dropoff Centroid Longitude: double (nullable = true)
 |-- 

#### Dropping rows where missing values are in features that cannot be imputed accurately.  

In [12]:
rides = rides.where(col("Trip Miles").isNotNull())
rides = rides.where(col("Pickup Community Area").isNotNull())
rides = rides.where(col("Dropoff Community Area").isNotNull())
rides = rides.where(col("Trip Seconds") != 0)
rides = rides.where(col("Trip Miles") != 0)
rides = rides.where(col("Fare").isNotNull())
rides = rides.where(col("Fare") != 0)
rides.count()

39202018

#### Dropping columns of features that will not be available for test sets. 

In [13]:
drop_list = ['Pickup Census Tract', "Dropoff Census Tract", "Tip", "Trips Pooled","tripID",
             'startTime',"endTime","addCharge","tripTotal","pickupLoc","dropoffLoc"]
rides = rides.select([column for column in rides.columns if column not in drop_list])

### Weather Data

In [14]:
weather = weatherT.toPandas()

#### Dropping columns of features that will not be available for test sets 

In [15]:
weather = weather.drop(["_c0","Unnamed: 0","cloudCover","dewPoint","icon","pressure","uvIndex","visibility","windBearing","windGust","windSpeed", "precipType", "precipAccumulation"], axis = 1)

#### Converting datatype of time fromt a string to a timestamp 

In [16]:
weather['time'] = pd.to_datetime(weather['time'])
weather['time'] = pd.to_datetime(weather['time'], format = '%Y-%m-%d %H:%M:%S')

#### Grouping weather types by Cloudy, Rainy, Snowy, or Clear 

In [17]:
Type = {"Foggy":"Cloudy","Mostly Cloudy":"Cloudy","Overcast":"Cloudy","Windy and Partly Cloudy":"Cloudy",
        "Windy and Mostly Cloudy":"Cloudy","Partly Cloudy":"Cloudy",
        "Rain":"Rain", "Drizzle":"Rain","Light Rain":"Rain",
        "Heavy Snow": "Snow","Possible Light Snow": "Snow","Light Snow": "Snow","Snow": "Snow",
        "Clear":"Clear"}

weather['summary'] = weather['summary'].map(Type)
weather['Cloudy'] = np.where(weather['summary']=="Cloudy", 1, 0)
weather['Rainy'] = np.where(weather['summary']=="Rain", 1, 0)
weather['Snowy'] = np.where(weather['summary']=="Snow", 1, 0)
weather = weather.drop("summary", axis = 1)

In [18]:
weather.head()

Unnamed: 0,apparentTemperature,humidity,precipIntensity,precipProbability,temperature,time,Cloudy,Rainy,Snowy
0,49.99,0.72,0.0,0.0,49.99,2018-11-01 00:00:00,1,0,0
1,50.11,0.71,0.0,0.0,50.11,2018-11-01 01:00:00,1,0,0
2,48.95,0.67,0.0,0.0,49.99,2018-11-01 02:00:00,1,0,0
3,51.08,0.64,0.0,0.0,51.08,2018-11-01 03:00:00,1,0,0
4,50.43,0.61,0.0,0.0,50.43,2018-11-01 04:00:00,1,0,0


#### Imputing missing rows in weather data.

In [19]:
z = 3596
weather = weather.sort_values("time")
datediff = []
datediff.append(pd.Timedelta(weather.time.iloc[0] - weather.time.iloc[0]).seconds/3600)
for i in range(1,len(weather)):
    j = i - 1
    datediff.append(pd.Timedelta(weather.time.iloc[i] - weather.time.iloc[j]).seconds/3600)
weather["Diff"] = datediff
listplus = weather.loc[weather.Diff > 1]
linestoadd = len(listplus)
listplus

Unnamed: 0,apparentTemperature,humidity,precipIntensity,precipProbability,temperature,time,Cloudy,Rainy,Snowy,Diff
78,42.64,0.75,0.0,0.0,47.76,2018-11-04 07:00:00,1,0,0,2.0
239,21.8,0.71,0.0,0.0,28.65,2018-11-11 01:00:00,1,0,0,2.0
262,33.36,0.76,0.0,0.0,33.36,2018-11-12 01:00:00,1,0,0,2.0
1173,40.32,0.76,0.0,0.0,44.05,2018-12-20 01:00:00,1,0,0,2.0
2324,33.07,0.94,0.0092,0.64,33.07,2019-02-06 01:00:00,1,0,0,2.0
2371,-8.05,0.75,0.0,0.0,10.67,2019-02-08 01:00:00,1,0,0,2.0
2670,24.98,0.98,0.0052,0.4,32.49,2019-02-20 13:00:00,1,0,0,2.0
2717,31.76,0.67,0.0,0.0,35.55,2019-02-22 13:00:00,1,0,0,2.0
2764,13.55,0.72,0.0,0.0,26.53,2019-02-24 13:00:00,1,0,0,2.0
2805,8.27,0.75,0.0,0.0,18.55,2019-02-26 07:00:00,1,0,0,2.0


In [20]:
for i in range(len(listplus)):   
    weather = weather.sort_values("time")
    datediff = []
    datediff.append(pd.Timedelta(weather.time.loc[0] - weather.time.loc[0]).seconds/3600)
    for i in range(1,len(weather)):
        j = i - 1
        datediff.append(pd.Timedelta(weather.time.iloc[i] - weather.time.iloc[j]).seconds/3600)
    weather["Diff"] = datediff
    listplus = weather.index[weather.Diff > 1]
    j = listplus[0]
    i = j - 1
    weather.loc[z] = [(weather.loc[i][0]+weather.loc[j][0])/2, (weather.loc[i][1]+weather.loc[j][1])/2,
                              (weather.loc[i][2]+weather.loc[j][2])/2, (weather.loc[i][3]+weather.loc[j][3])/2,
                              (weather.loc[i][4]+weather.loc[j][4])/2, (weather.loc[i][5] + pd.Timedelta(hours=1)),
                              (weather.loc[i][6]+weather.loc[j][6])/2,(weather.loc[i][7]+weather.loc[j][7])/2,
                              (weather.loc[i][8]+weather.loc[j][8])/2,1]
    z += 1
    print("Errors Left:",len(listplus))
print("Errors Left:",len(listplus)-1)

Errors Left: 27
Errors Left: 26
Errors Left: 25
Errors Left: 24
Errors Left: 23
Errors Left: 22
Errors Left: 21
Errors Left: 20
Errors Left: 19
Errors Left: 18
Errors Left: 17
Errors Left: 16
Errors Left: 15
Errors Left: 14
Errors Left: 13
Errors Left: 12
Errors Left: 11
Errors Left: 10
Errors Left: 9
Errors Left: 8
Errors Left: 7
Errors Left: 6
Errors Left: 5
Errors Left: 4
Errors Left: 3
Errors Left: 2
Errors Left: 1
Errors Left: 0


#### Calculates if any duplicates were created using the function above 

In [21]:
times = weather.time
weather.index[times.isin(times[times.duplicated()])].sort_values("time")

(Int64Index([], dtype='int64'), array([], dtype=int64))

In [22]:
weather = weather.drop("Diff", axis = 1)

#### Casting timestamp of weather data to match rides data for joining.

In [23]:
weather.time = weather.time.dt.strftime('%Y-%m-%d %H:%M:%S')

In [24]:
spark_weather = sqlContext.createDataFrame(weather)
spark_weather = spark_weather.withColumn("time", spark_weather["time"].cast(TimestampType()))

In [25]:
spark_weather.dtypes

[('apparentTemperature', 'double'),
 ('humidity', 'double'),
 ('precipIntensity', 'double'),
 ('precipProbability', 'double'),
 ('temperature', 'double'),
 ('time', 'timestamp'),
 ('Cloudy', 'double'),
 ('Rainy', 'double'),
 ('Snowy', 'double')]

In [26]:
rides.dtypes

[('Trip ID', 'string'),
 ('Trip Start Timestamp', 'timestamp'),
 ('Trip End Timestamp', 'timestamp'),
 ('Trip Seconds', 'int'),
 ('Trip Miles', 'double'),
 ('Pickup Community Area', 'int'),
 ('Dropoff Community Area', 'int'),
 ('Fare', 'double'),
 ('Additional Charges', 'double'),
 ('Trip Total', 'double'),
 ('Shared Trip Authorized', 'boolean'),
 ('Pickup Centroid Latitude', 'double'),
 ('Pickup Centroid Longitude', 'double'),
 ('Pickup Centroid Location', 'string'),
 ('Dropoff Centroid Latitude', 'double'),
 ('Dropoff Centroid Longitude', 'double'),
 ('Dropoff Centroid Location', 'string')]

#### Cleaning feature names to make indexing easier 

In [27]:
oldColumns = rides.schema.names
newColumns = ["tripID","startTime","endTime","seconds","miles","communityPickup","communityDropoff","fare","addCharge",
              "tripTotal","shared","pickupLat","pickupLong","pickupLoc","dropoffLat","dropoffLong","dropoffLoc"]
rides = reduce(lambda rides, idx: rides.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), rides)
rides.printSchema()


root
 |-- tripID: string (nullable = true)
 |-- startTime: timestamp (nullable = true)
 |-- endTime: timestamp (nullable = true)
 |-- seconds: integer (nullable = true)
 |-- miles: double (nullable = true)
 |-- communityPickup: integer (nullable = true)
 |-- communityDropoff: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- addCharge: double (nullable = true)
 |-- tripTotal: double (nullable = true)
 |-- shared: boolean (nullable = true)
 |-- pickupLat: double (nullable = true)
 |-- pickupLong: double (nullable = true)
 |-- pickupLoc: string (nullable = true)
 |-- dropoffLat: double (nullable = true)
 |-- dropoffLong: double (nullable = true)
 |-- dropoffLoc: string (nullable = true)



## Joining Data 

####  Joining rides and weather data on time of occurance

In [28]:
fulldf = rides.join(spark_weather, rides.startTime == spark_weather.time, how='inner')

#### Confirming shape of new dataframe is consistent.

In [29]:
#print(fulldf.count())
#print(len(fulldf.dtypes))
#print(fulldf.count() * (len(fulldf.dtypes)))


#### Dropping columns that are redundent or won't be applicable for prediction 

In [30]:
drop_list = ["tripID",'startTime',"endTime","addCharge","tripTotal","pickupLoc","dropoffLoc"]
fulldf = fulldf.select([column for column in fulldf.columns if column not in drop_list])

#### Checking for null/NaN elements within columns 

In [31]:
def count_not_null(c, nan_as_null=False):
    pred = col(c).isNull() & (isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

fulldf.agg(*[count_not_null(c) for c in fulldf.columns]).show()

+-------+-----+---------------+----------------+----+------+---------+----------+----------+-----------+-------------------+--------+---------------+-----------------+-----------+----+------+-----+-----+
|seconds|miles|communityPickup|communityDropoff|fare|shared|pickupLat|pickupLong|dropoffLat|dropoffLong|apparentTemperature|humidity|precipIntensity|precipProbability|temperature|time|Cloudy|Rainy|Snowy|
+-------+-----+---------------+----------------+----+------+---------+----------+----------+-----------+-------------------+--------+---------------+-----------------+-----------+----+------+-----+-----+
|      0|    0|              0|               0|   0|     0|        0|         0|         0|          0|                  0|       0|              0|                0|          0|   0|     0|    0|    0|
+-------+-----+---------------+----------------+----+------+---------+----------+----------+-----------+-------------------+--------+---------------+-----------------+-----------+----+

#### Removing cached images

In [32]:
rides.unpersist()
spark_weather.unpersist()
fulldf.unpersist()

DataFrame[seconds: int, miles: double, communityPickup: int, communityDropoff: int, fare: double, shared: boolean, pickupLat: double, pickupLong: double, dropoffLat: double, dropoffLong: double, apparentTemperature: double, humidity: double, precipIntensity: double, precipProbability: double, temperature: double, time: timestamp, Cloudy: double, Rainy: double, Snowy: double]

#### Adding columns for Month/Day/Year and dropping the time column 

In [33]:
fulldf = fulldf.withColumn('month', month(fulldf['time']))\
.withColumn('day', dayofweek(fulldf['time']))\
.withColumn('hour', hour(fulldf['time']))\
.cache()

In [34]:
drop_list = ["time"]
fulldf = fulldf.select([column for column in fulldf.columns if column not in drop_list])

### Writing dataframe back to HDFS for use by the team

In [None]:
#WRITE to HDFS

res_path = '/user/mechols/data/fulldf.csv'
fulldf.write.csv(path=res_path, header=True, compression='gzip')