In [337]:
import findspark
findspark.init("/home/meghdad/spark-2.4.5-bin-hadoop2.7")

In [338]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("NYC_taxi_demnad").getOrCreate()

In [345]:
df=spark.read.csv("pickups+weather_wallstreet.csv",header=True,inferSchema=True)

In [346]:
## number of rows and columns
print(df.count(),len(df.columns))

## variable types
df.printSchema()

65712 15
root
 |-- datetime: string (nullable = true)
 |-- pickups: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- min_temp: double (nullable = true)
 |-- max_temp: double (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- wind_gust: double (nullable = true)
 |-- visibility: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- snow_depth: double (nullable = true)
 |-- fog: integer (nullable = true)
 |-- rain_drizzle: integer (nullable = true)
 |-- snow_ice: integer (nullable = true)
 |-- thunder: integer (nullable = true)



In [347]:
## an example of what each row includes:
for item in df.head(1)[0]:
    print(item)

2009-01-01 00
47
2009-01-01 00:00:00
15.1
26.1
11.6
32.1
10.0
1015.5
0.04
0.0
0
0
0
0


### Data preparation

In [348]:
## missing values in dataframe
from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+--------+-------+----+--------+--------+----------+---------+----------+--------+-------------+----------+---+------------+--------+-------+
|datetime|pickups|date|min_temp|max_temp|wind_speed|wind_gust|visibility|pressure|precipitation|snow_depth|fog|rain_drizzle|snow_ice|thunder|
+--------+-------+----+--------+--------+----------+---------+----------+--------+-------------+----------+---+------------+--------+-------+
|       0|      0|   0|       0|       0|         0|        0|         0|       0|            0|         0|  0|           0|       0|      0|
+--------+-------+----+--------+--------+----------+---------+----------+--------+-------------+----------+---+------------+--------+-------+



In [350]:
## adding two columns of day of week(dow) and time of day(tod)
from pyspark.sql.functions import split,date_format,col
# df.select(df.columns[1:],date_format("date","E").alias("dow"))

split_column=split(df["datetime"],"-")
df=df.withColumn("tod",split_column.getItem(1))
df=df.withColumn("dow",date_format("date","E"))
print(df.columns)


['datetime', 'pickups', 'date', 'min_temp', 'max_temp', 'wind_speed', 'wind_gust', 'visibility', 'pressure', 'precipitation', 'snow_depth', 'fog', 'rain_drizzle', 'snow_ice', 'thunder', 'tod', 'dow']


In [351]:
## getting dummy variables for dow and tod
from pyspark.sql.functions import when,col
# list of ounique items in "dow" column
dow_cats=df.select("dow").distinct().rdd.flatMap(lambda x:x).collect()
exprs_dow=[when(col("dow")==cat,1).otherwise(0).alias(str(cat)) for cat in dow_cats]

tod_cats=df.select("tod").distinct().rdd.flatMap(lambda x:x).collect()
exprs_tod=[when(col("tod")==cat,1).otherwise(0).alias(str(cat)) for cat in tod_cats]



df=df.select(df.columns+exprs_dow+exprs_tod)
print(df.columns)

['datetime', 'pickups', 'date', 'min_temp', 'max_temp', 'wind_speed', 'wind_gust', 'visibility', 'pressure', 'precipitation', 'snow_depth', 'fog', 'rain_drizzle', 'snow_ice', 'thunder', 'tod', 'dow', 'Sun', 'Mon', 'Thu', 'Sat', 'Wed', 'Fri', 'Tue', '07', '11', '01', '09', '05', '08', '03', '02', '06', '10', '12', '04']


In [352]:
## adding lagged pickups to the dataframe
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
my_window=Window.partitionBy().orderBy("date")
df=df.withColumn("Lagged_pickups",lag(df["pickups"]).over(my_window))
df.select("date","Lagged_pickups","pickups").show(10)


+-------------------+--------------+-------+
|               date|Lagged_pickups|pickups|
+-------------------+--------------+-------+
|2009-01-01 00:00:00|          null|     47|
|2009-01-01 00:00:00|            47|     74|
|2009-01-01 00:00:00|            74|     79|
|2009-01-01 00:00:00|            79|     57|
|2009-01-01 00:00:00|            57|     46|
|2009-01-01 00:00:00|            46|     18|
|2009-01-01 00:00:00|            18|     16|
|2009-01-01 00:00:00|            16|      7|
|2009-01-01 00:00:00|             7|     10|
|2009-01-01 00:00:00|            10|     10|
+-------------------+--------------+-------+
only showing top 10 rows



In [353]:
# deleting irrevant columns
columns_to_drop=['datetime',"date",'dow',"tod"]
df=df.drop(*columns_to_drop)

### Linear regression model without lagged variable 

In [354]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [356]:
X_variables=df.columns[1:-1]
# It grabs X columnes and converts them into a single column of feature
assembler=VectorAssembler(inputCols=X_variables,outputCol="features")
output= assembler.transform(df)
output.select("features").show(3)


+--------------------+
|            features|
+--------------------+
|(31,[0,1,2,3,4,5,...|
|(31,[0,1,2,3,4,5,...|
|(31,[0,1,2,3,4,5,...|
+--------------------+
only showing top 3 rows



In [357]:
## determine final data including X:features and y:Crew
final_data=output.select("pickups","features")
final_data.show(4)

+-------+--------------------+
|pickups|            features|
+-------+--------------------+
|     47|(31,[0,1,2,3,4,5,...|
|     74|(31,[0,1,2,3,4,5,...|
|     79|(31,[0,1,2,3,4,5,...|
|     57|(31,[0,1,2,3,4,5,...|
+-------+--------------------+
only showing top 4 rows



In [358]:
## split data into train and test set
train_data,test_data=final_data.randomSplit([0.7,0.3])

print(train_data.describe().show())
test_data.describe().show()

+-------+-----------------+
|summary|          pickups|
+-------+-----------------+
|  count|            45978|
|   mean|83.26021140545478|
| stddev|48.81862166103399|
|    min|                0|
|    max|              302|
+-------+-----------------+

None
+-------+-----------------+
|summary|          pickups|
+-------+-----------------+
|  count|            19734|
|   mean|83.03349549001723|
| stddev|48.97198227418427|
|    min|                0|
|    max|              273|
+-------+-----------------+



In [359]:
## linear regression model
from pyspark.ml.regression import LinearRegression

lr=LinearRegression(featuresCol="features",
                    labelCol="pickups",
                    predictionCol="prediction")

In [360]:
## fit linear model
lr_model=lr.fit(train_data)

## predict model
test_results=lr_model.evaluate(test_data)

In [361]:
## Regression evaluation matrix
print("Root Mean Squared Error is:",round(test_results.rootMeanSquaredError,3))
print("Mean Absolute Error is:",round(test_results.meanAbsoluteError,3))
print("R squared is:",round(test_results.r2,3))

Root Mean Squared Error is: 47.559
Mean Absolute Error is: 38.912
R squared is: 0.057


### Linear regression model with lagged variable 

In [373]:
df=df.dropna()
X_variables=df.columns[1:]
# It grabs X columnes and converts them into a single column of feature
assembler=VectorAssembler(inputCols=X_variables,outputCol="features")
output= assembler.transform(df)
output.select("features").show(3)

+--------------------+
|            features|
+--------------------+
|(32,[0,1,2,3,4,5,...|
|(32,[0,1,2,3,4,5,...|
|(32,[0,1,2,3,4,5,...|
+--------------------+
only showing top 3 rows



In [376]:
## determine final data including X:features and y:Crew
final_data=output.select("pickups","features")

## split data into train and test set
train_data,test_data=final_data.randomSplit([0.7,0.3])

In [378]:
## linear regression model
from pyspark.ml.regression import LinearRegression

lr=LinearRegression(featuresCol="features",
                    labelCol="pickups",
                    predictionCol="prediction")

## fit linear model
lr_model=lr.fit(train_data)

## predict model
test_results=lr_model.evaluate(test_data)

In [379]:
## Regression evaluation matrix
print("Root Mean Squared Error is:",round(test_results.rootMeanSquaredError,3))
print("Mean Absolute Error is:",round(test_results.meanAbsoluteError,3))
print("R squared is:",round(test_results.r2,3))

Root Mean Squared Error is: 25.966
Mean Absolute Error is: 20.492
R squared is: 0.715
