# CitiBike Data Analysis. Logistic regression on count of bike trips per day vs. weather

In [1]:
from pyspark.sql import SparkSession

from pyspark.mllib.stat import Statistics as stat
from pyspark.sql.functions import desc, asc
from pyspark.sql.functions import explode
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf
import numpy as np
import seaborn as sns

from itertools import chain
from pyspark.sql.functions import create_map, lit
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import round, col
import seaborn as sb
import matplotlib.pyplot as plt
import pyarrow.parquet as pq

# Necessary for distance calculations
from math import sin, cos, sqrt, atan2, radians    
from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos

# set configurations
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import DoubleType, StringType

import pandas as pd
import os

# load modules
from pyspark.sql import SparkSession

from pyspark.mllib.stat import Statistics as stat
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

import pandas as pd
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType
from pyspark.sql.functions import col, when, isnan, count, udf, struct, date_format, coalesce, lit, round
from math import sin, cos, sqrt, atan2, radians    
from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos
import matplotlib.pyplot as plt
import os

In [2]:
conf = SparkConf().setMaster("local[*]").setAppName("weatherwork")
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
spark = SparkSession.builder \
        .appName("comm") \
        .getOrCreate()

In [4]:
spark = SparkSession.builder \
        .appName("bike") \
        .getOrCreate()
%matplotlib inline

In [5]:
df = spark.read.parquet("/project/ds5559/Summer2021_TeamBike/master_dataset.parquet")

In [6]:
# load the broadcast variables from csv
# duration by station and month
durationByStationMonth_filepath = "/project/ds5559/Summer2021_TeamBike/broadcastMonthStationDuration.csv"
df_durationByStationMonth = spark.read.csv(durationByStationMonth_filepath).rdd.collectAsMap()
#for x in list(df_durationByStationMonth)[0:3]:
#    print ("key {}, value {} ".format(x,  df_durationByStationMonth[x]))
bc_durationByStationMonth = sc.broadcast(df_durationByStationMonth)
# duration by station and month and good/bad weather
goodBadDurationByStationMonth_filepath = "/project/ds5559/Summer2021_TeamBike/broadcastMonthStationWeatherGoodBadDuration.csv"
df_goodBadDurationByStationMonth = spark.read.csv(goodBadDurationByStationMonth_filepath).rdd.collectAsMap()
#for x in list(df_goodBadDurationByStationMonth)[0:3]:
#    print ("key {}, value {} ".format(x,  df_goodBadDurationByStationMonth[x]))
bc_goodBadDurationByStationMonth = sc.broadcast(df_goodBadDurationByStationMonth)
# to add columns with the GOOD and bad weather duration indexes
from pyspark.sql.functions import col
@udf(returnType=DoubleType())
def setweatherGoodDurationIndex(month : str, startStationName : str):
    averageDuration = bc_durationByStationMonth.value.get(month + "-" + startStationName)
    goodDuration= bc_goodBadDurationByStationMonth.value.get(month + "-" + startStationName + "GOOD")
    if goodDuration is None:
        # if the dictionary doesn't have a good duration value, use the average trips
        goodDuration = averageDuration
    index_value = float(goodDuration) / float(averageDuration)
    return index_value
@udf(returnType=DoubleType())
def setweatherBadDurationIndex(month : str, startStationName : str):
    averageDuration = bc_durationByStationMonth.value.get(month + "-" + startStationName)
    badDuration= bc_goodBadDurationByStationMonth.value.get(month + "-" + startStationName + "BAD")
    if badDuration is None:
        # if the dictionary doesn't have a bad duration value, use the average trips
        badDuration = averageDuration
    index_value = float(badDuration) / float(averageDuration)
    return index_value
# load the standard data file with only the columns I am interested in using:
# THE IS THE FINAL DATA FILE
file = "/project/ds5559/Summer2021_TeamBike/master_dataset.parquet" 
df = spark.read.parquet(file)
df = df.withColumn("weatherGoodDurationIndex", setweatherGoodDurationIndex(col("month"), col("startStationName")))
df = df.withColumn("weatherBadDurationIndex", setweatherBadDurationIndex(col("month"), col("startStationName")))



In [7]:
df = df.drop(F.col("gender"))
userDict = {'member':'annual','Subscriber':'annual','Customer':'daily'}
df = df.replace(userDict,subset=['usertype'])
df.select(['usertype']).show()

+--------+
|usertype|
+--------+
|   daily|
|  annual|
|   daily|
|   daily|
|  annual|
|  annual|
|  annual|
|   daily|
|   daily|
|  annual|
|  annual|
|  annual|
|   daily|
|  annual|
|  annual|
|  annual|
|   daily|
|  annual|
|   daily|
|  annual|
+--------+
only showing top 20 rows



In [8]:
df.groupby('usertype').count().sort("count").show()



+--------+--------+
|usertype|   count|
+--------+--------+
|   daily| 7124164|
|  annual|42270056|
+--------+--------+



In [9]:
df = df.withColumn("tripduration_minute", (df.tripduration / 60))

df.select('tripduration_minute').describe().show()



+-------+-------------------+
|summary|tripduration_minute|
+-------+-------------------+
|  count|           49394220|
|   mean| 18.305129223284123|
| stddev| 204.05231864630437|
|    min|           -10008.9|
|    max|          165074.55|
+-------+-------------------+



In [10]:
df = df.drop(F.col("tripduration"))

In [11]:
df.filter(F.col("tripduration_minute") <= 0).count()

8861

In [12]:
df.filter(col("tripduration_minute") > 480).count()

44387

In [13]:
df = df.withColumn("tripduration_minute", F.when(F.col("tripduration_minute") > 0, F.col("tripduration_minute")).otherwise("0"))



In [14]:
df = df.withColumn("tripduration_minute", F.when(F.col("tripduration_minute") < 480, F.col("tripduration_minute")).otherwise("0.0"))



In [15]:
# radius of earth in miles
R = 3963.0

#Convert start/end latitude and longitude from degrees to Radians
df = df.withColumn("startRadLat", radians(df.startStationLatitude)) \
       .withColumn("endRadLat", radians(df.endStationLatitude)) \
       .withColumn("startRadLong", radians(df.startStationLongitude)) \
       .withColumn("endRadLong", radians(df.endStationLongitude))

df.select(df.startRadLat,df.endRadLat,df.startRadLong,df.endRadLong).show(5,False)

+------------------+------------------+-------------------+-------------------+
|startRadLat       |endRadLat         |startRadLong       |endRadLong         |
+------------------+------------------+-------------------+-------------------+
|0.7099873523967277|0.7099873523967277|-1.2912214593940174|-1.2912214593940174|
|0.7102313878234002|0.7108809377608699|-1.2910007792652636|-1.2915272517254752|
|0.7105313146268409|0.7099391987626652|-1.29093715555631  |-1.2908386765749968|
|0.7113661947943947|0.7109956472570291|-1.291032165695735 |-1.291370736008677 |
|0.7107304261512315|0.7109660489163764|-1.2916783473222815|-1.2916279617611712|
+------------------+------------------+-------------------+-------------------+
only showing top 5 rows



In [16]:
df = df.withColumn("diffRadLat", (df.startRadLat-df.endRadLat)) \
       .withColumn("diffRadLong", (df.startRadLong-df.endRadLong)) \
       .withColumn("diffLat", (df.startStationLatitude-df.endStationLatitude)) \
       .withColumn("diffLong", (df.startStationLongitude-df.endStationLongitude))

df = df.withColumn("crowDist", asin(sqrt((sin(df.diffRadLat/2))**2 + (cos(df.startRadLat) * cos(df.endRadLat)) * (sin(df.diffRadLong/2))**2)) * 2 * R)

In [17]:
df = df.withColumn("verticalDist", asin(sqrt((sin(df.diffRadLat/2))**2)) * 2 * R)
df = df.withColumn("horizontalDist", asin(sqrt((sin(df.diffRadLong/2))**2)) * 2 * R)
df = df.withColumn("manhattanDist", (df.horizontalDist + df.verticalDist))

In [18]:
df = df.drop(F.col("birthyear"))

In [19]:
df = df.filter(df.endStationName.isNotNull())
df = df.filter(df.endStationLatitude.isNotNull())
df = df.filter(df.endStationLongitude.isNotNull())
df = df.filter(df.endRadLat.isNotNull())
df = df.filter(df.diffRadLat.isNotNull())
df = df.filter(df.diffRadLong.isNotNull())
df = df.filter(df.diffLat.isNotNull())
df = df.filter(df.diffLong.isNotNull())
df = df.filter(df.crowDist.isNotNull())
df = df.filter(df.verticalDist.isNotNull())
df = df.filter(df.horizontalDist.isNotNull())
df = df.filter(df.manhattanDist.isNotNull())

In [20]:
vars_to_keep = ['date',
                'temp',
                'feels_like',
                'pressure',
                'humidity',
                'wind_speed',
                'rain_3h', 
                'precip',
                'clouds_all',
                'temp_max',
                'snow_3h']

df = df[vars_to_keep]

In [21]:
count_df = df.select(F.date_format('date','yyyy-MM-dd').alias('day')).groupby('day').count()
count_df.show(2)

+----------+-----+
|       day|count|
+----------+-----+
|2020-02-26|44503|
|2019-08-08|68714|
+----------+-----+
only showing top 2 rows



In [22]:
df = df.dropDuplicates(['date'])
df.show(2)

+----------+------+----------+--------+--------+----------+-------+---------+----------+--------+-------+
|      date|  temp|feels_like|pressure|humidity|wind_speed|rain_3h|   precip|clouds_all|temp_max|snow_3h|
+----------+------+----------+--------+--------+----------+-------+---------+----------+--------+-------+
|2019-08-08|70.664|    76.748|    1009|     100|       1.5|    0.0|   precip|        90|  69.008|    0.0|
|2019-08-22|77.972|    87.224|    1011|      94|      1.39|    0.0|no_precip|        75|  75.002|    0.0|
+----------+------+----------+--------+--------+----------+-------+---------+----------+--------+-------+
only showing top 2 rows



In [23]:
df1 = df.join(count_df,df.date ==  count_df.day,"inner")

In [24]:
df1.show(4)

+----------+------+----------+--------+--------+----------+-------+---------+----------+--------+-------+----------+-----+
|      date|  temp|feels_like|pressure|humidity|wind_speed|rain_3h|   precip|clouds_all|temp_max|snow_3h|       day|count|
+----------+------+----------+--------+--------+----------+-------+---------+----------+--------+-------+----------+-----+
|2019-08-08|70.664|    76.748|    1009|     100|       1.5|    0.0|   precip|        90|  69.008|    0.0|2019-08-08|68714|
|2019-08-22|77.972|    87.224|    1011|      94|      1.39|    0.0|no_precip|        75|  75.002|    0.0|2019-08-22|60527|
|2019-08-23|81.842|    87.926|    1011|      69|       1.5|    0.0|no_precip|        20|  78.998|    0.0|2019-08-23|54493|
|2020-02-26|46.256|    42.296|    1011|      93|       2.1|    0.0|   precip|        90|  44.006|    0.0|2020-02-26|44503|
+----------+------+----------+--------+--------+----------+-------+---------+----------+--------+-------+----------+-----+
only showing top

In [25]:
#Get a count of station
grouped_data = df1.groupby("count")
agg_data = grouped_data.agg(F.avg("feels_like").alias("avg_feels_like"),
                 F.avg("wind_speed").alias("avg_wind_speed"),
                 F.avg("temp_max").alias("avg_temp_max"),
                 F.avg("temp").alias("avg_temp"),
                 F.avg("pressure").alias("avg_pressure"),
                 F.avg("humidity").alias("avg_humidity"),
                 F.avg("rain_3h").alias("avg_rain_3h"),
                 F.avg("snow_3h").alias("avg_snow_3h"),
                 F.count(when(df.clouds_all == 1, lit(1))).alias("clouds_count"),
                 F.count(when(df.clouds_all == 0, lit(1))).alias("no_clouds_count"),
                 F.count(when(df.precip == "precip", lit(1))).alias("precip_count"),
                 F.count(when(df.precip == "no_precip", lit(1))).alias("non_precip_count"))

In [26]:
agg_data.show(3)

+-----+--------------+--------------+------------+--------+------------+------------+-----------+-----------+------------+---------------+------------+----------------+
|count|avg_feels_like|avg_wind_speed|avg_temp_max|avg_temp|avg_pressure|avg_humidity|avg_rain_3h|avg_snow_3h|clouds_count|no_clouds_count|precip_count|non_precip_count|
+-----+--------------+--------------+------------+--------+------------+------------+-----------+-----------+------------+---------------+------------+----------------+
|53294|        36.716|          4.63|        46.4|  48.506|      1021.0|        18.0|        0.0|        0.0|           1|              0|           0|               1|
|47928|        89.186|          1.54|      80.443|  84.956|      1018.0|        55.0|        0.0|        0.0|           1|              0|           0|               1|
|31762|        52.196|          2.36|       53.06|   56.93|      1003.0|        58.0|        0.0|        0.0|           1|              0|           0|    

In [27]:
#agg_data = agg_data.join(count_df,agg_data.date ==  count_df.day,"inner")

In [28]:
#agg_data.show(3)

In [29]:
#col_names = agg_data.columns
#feat = agg_data.rdd.map(lambda row: row[0:])
#Corr
#corr_ = stat.corr(feat, method="pearson")
#Corr to pandas DF
#corr_df = pd.DataFrame(corr_)
#corr_df.index, corr_df.columns = col_names, col_names
#corr_df

In [30]:
nums_feat = [i[0] for i in agg_data.dtypes if i[1] != 'string']
nums_feat

['count',
 'avg_feels_like',
 'avg_wind_speed',
 'avg_temp_max',
 'avg_temp',
 'avg_pressure',
 'avg_humidity',
 'avg_rain_3h',
 'avg_snow_3h',
 'clouds_count',
 'no_clouds_count',
 'precip_count',
 'non_precip_count']

In [31]:
#PIPELINE indexer-standard scalar
#indexer = StringIndexer(inputCol="zipcodes", outputCol="zips_indexed")

#encoder = OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="zips_encoded")

assembler = VectorAssembler(inputCols=list((set(nums_feat)-set(["count"]))), outputCol="vectorized_features")

standardScaler = StandardScaler(inputCol=assembler.getOutputCol(), outputCol="features")

#pipeline
pipeline = Pipeline(stages=[assembler] + [standardScaler])
model = pipeline.fit(agg_data)
df_tr = model.transform(agg_data)

In [32]:
train, test = df_tr.randomSplit([0.7,0.3], seed=2021)

In [33]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol='count', featuresCol='features', maxIter=3)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)

In [34]:
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Coefficients: [-1177.6794802513764,0.0,176.94168284522559,1908.147964971849,4305.624231654213,0.0,-823.3042687866946,4773.6443021584055,837.8466822266832,4127.860098986759,0.0,-26.96217197878529]
Intercept: -234287.11963154475


In [35]:
trainingSummary = lrModel.summary
print("RMSE: " + str(trainingSummary.rootMeanSquaredError)) 
print("r2: " + str(trainingSummary.r2))

RMSE: 15472.009610796167
r2: 0.4074282440368653


In [36]:
predictions.select("prediction","count").toPandas().head()

Unnamed: 0,prediction,count
0,70139.238945,47928
1,35384.569772,53294
2,44586.918142,61577
3,54732.689653,88979
4,51599.519033,57177


In [37]:
#Change response varaible to 'label'
df_tr_2 = df_tr.withColumnRenamed('count', 'label')
#Train/test split
train, test = df_tr_2.randomSplit([0.7,0.3], seed=2021)

In [38]:
#Tune with CV - Base ModelLinear Regression tuned 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator



#Linear regression model
lr = LinearRegression()

#Paramgrid
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0]) #regularization param
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) #Ridge=0, lasso=1.0
             .addGrid(lr.maxIter, [1, 5, 10]) #maxIter
             .build())



#10 fold cross validation
cv = CrossValidator(estimator=lr, estimatorParamMaps = paramGrid, evaluator=RegressionEvaluator(), numFolds=10)

In [None]:
cvModel = cv.fit(train)

In [None]:
tune_preds = cvModel.transform(test)

In [None]:
# Evaluating metrics
eval_rmse = RegressionEvaluator(metricName="rmse")
eval_r2 = RegressionEvaluator(metricName="r2")
best_rmse = eval_rmse.evaluate(tune_preds)
best_r2 = eval_r2.evaluate(tune_preds)
print("R Squared (R2) on test data : " + str(best_r2)) #0.53685
print("RMSE on test data : " + str(best_rmse)) #107388

In [None]:
#Best model paramaters
bm = cvModel.bestModel

print("Best model regularlization parameter: ", bm._java_obj.getRegParam())
print("Best model elastic net parameter: ", bm._java_obj.getElasticNetParam()) #lasso
print("Best model iteration: ", bm._java_obj.getMaxIter())

In [None]:
#Model Feature weights
best_coef = bm.coefficients

In [None]:
#Get list of importances and sort and take top 10
lis = []
for i in best_coef:
    lis.append(i)
output = list(zip(lis, nums_feat[1:]))
output = sorted(output, key=lambda x: x[0], reverse=True)
#The weights were numpy-float64 so I needed to change its type to be able to create spark df
output = [(float(x[0]), x[1]) for x in output]

In [None]:
#get DF
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType


#get DF
schema = StructType([
    StructField("weight", DoubleType(), True),
    StructField("name", StringType(), True)
])
df_ticks = spark.createDataFrame(output, schema).toPandas()

In [None]:
#inspect
df_ticks