# BIG DATA PROJECT - HADOOP HEROES

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("ISM6562 Spark Project") \
        .enableHiveSupport() \
        .getOrCreate()

# Let's get the SparkContext object. It's the entry point to the Spark API. It's created when you create a sparksession
sc = spark.sparkContext  
sc.setLogLevel("ERROR") # only display errors (not warnings)

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

# It's best if you find that the port number displayed below is not 4040, then you should shut down all other spark sessions and 
# run this code again. If you don't, you may have trouble accessing the data in the spark-warehouse directory.

23/11/05 17:20:25 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.21.10.196 instead (on interface eth0)
23/11/05 17:20:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/05 17:20:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session WebUI Port: 4040


In [3]:
spark

In [4]:
spark.catalog.listTables()

[]

In [5]:
df=spark.sql("show databases")
df.show()

+---------+
|namespace|
+---------+
|  default|
|   w10_db|
+---------+



In [6]:
tables = spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [7]:
#Load data to warehouse

In [8]:
trip = spark.read.csv('data/yellow_tripdata_2022-02.csv', header=True, inferSchema=True);

# display the first 5 rows of the dataframe
trip.show(5);

                                                                                

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|       1|    02-01-2022 00:06|     02-01-2022 00:19|              1|          5.4|         138|         252|       17.0| 1.75|       3.9|       23.45|       1.25|
|       1|    02-01-2022 00:38|     02-01-2022 00:55|              1|          6.4|         138|          41|       21.0| 1.75|       0.0|        30.1|       1.25|
|       1|    02-01-2022 00:03|     02-01-2022 00:26|              1|         12.5|         138|         200|       35.5| 1.75|       0.0|        44.6|       1.25|
|       2|    02

In [9]:
tables = spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [10]:
type(trip)

pyspark.sql.dataframe.DataFrame

In [11]:
#Save table in spark data warehouse

In [12]:
spark.sql("CREATE DATABASE IF NOT EXISTS w10_db;")

DataFrame[]

In [13]:
trip.write.mode("overwrite").saveAsTable("w10_db.trip")

                                                                                

In [14]:
spark.catalog.listTables('w10_db')

[Table(name='boston', catalog='spark_catalog', namespace=['w10_db'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='trip', catalog='spark_catalog', namespace=['w10_db'], description=None, tableType='MANAGED', isTemporary=False)]

In [15]:
df = spark.sql("SELECT * FROM w10_db.trip")
df.show()

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|       2|    02-01-2022 20:44|     02-01-2022 21:08|              1|         5.74|         107|           7|       20.5|  0.5|      4.86|       29.16|        0.0|
|       1|    02-01-2022 20:35|     02-01-2022 20:44|              1|          1.3|         230|         229|        7.0|  3.0|       0.0|        10.8|        0.0|
|       2|    02-01-2022 20:11|     02-01-2022 20:33|              1|         4.37|          79|         236|       18.0|  0.5|      4.36|       26.16|        0.0|
|       2|    02

In [17]:
df.count()

1043585

In [18]:
df.dropna()

DataFrame[VendorID: int, tpep_pickup_datetime: string, tpep_dropoff_datetime: string, passenger_count: int, trip_distance: double, PULocationID: int, DOLocationID: int, fare_amount: double, extra: double, tip_amount: double, total_amount: double, airport_fee: double]

In [19]:
df.count()

1043585

In [20]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- airport_fee: double (nullable = true)



# Model Training

In [21]:
train_data,test_data=df.randomSplit([0.7,0.3])

In [22]:
from pyspark.ml.feature import StringIndexer
# Use StringIndexer to convert the categorical columns to hold numerical data
 
tpep_pickup_datetime_indexer = StringIndexer(inputCol='tpep_pickup_datetime',outputCol='tpep_pickup_datetime_index',handleInvalid='keep')
tpep_dropoff_datetime_indexer = StringIndexer(inputCol='tpep_dropoff_datetime',outputCol='tpep_dropoff_datetime_index',handleInvalid='keep')


In [23]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'PULocationID',
 'DOLocationID',
 'fare_amount',
 'extra',
 'tip_amount',
 'total_amount',
 'airport_fee']

In [24]:
from pyspark.ml.feature import VectorAssembler
# Vector assembler is used to create a vector of input features
 
assembler = VectorAssembler(
    inputCols=[
        'passenger_count',
        'trip_distance',
        'airport_fee',
        'PULocationID',
        'DOLocationID',
        'tpep_dropoff_datetime_index',
        'tpep_pickup_datetime_index'
    ],
    outputCol="features"
)

In [25]:
from pyspark.ml import Pipeline

# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data
# https://spark.apache.org/docs/latest/ml-pipeline.html
 
pipe = Pipeline(stages=[
    tpep_dropoff_datetime_indexer,
    tpep_pickup_datetime_indexer,
    assembler
    ]
)

In [26]:
fitted_pipe=pipe.fit(train_data)

                                                                                

In [27]:
train_data=fitted_pipe.transform(train_data)
train_data.show(5)

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+---------------------------+--------------------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|tpep_dropoff_datetime_index|tpep_pickup_datetime_index|            features|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+---------------------------+--------------------------+--------------------+
|       1|    02-01-2022 20:00|     02-01-2022 20:03|              1|          0.7|         142|          48|        5.0|  3.5|       0.0|         9.3|        0.0|                     4907.0|                    5681.0|[1.0,0.7,0.0,142....|
|       1|    02-01-2022 20:00|     02-0

In [28]:
test_data=fitted_pipe.transform(test_data)
test_data.show(5)

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+---------------------------+--------------------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|tpep_dropoff_datetime_index|tpep_pickup_datetime_index|            features|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+---------------------------+--------------------------+--------------------+
|       1|    02-01-2022 20:00|     02-01-2022 20:03|              1|          0.9|         236|          43|        5.0|  3.5|       0.0|         9.3|        0.0|                     4907.0|                    5681.0|[1.0,0.9,0.0,236....|
|       1|    02-01-2022 20:00|     02-0

In [29]:
# For those interested in utilizing the ML/AI power of Tensorflow with Spark....
# https://github.com/tensorflow/ecosystem/tree/master/spark/spark-tensorflow-distributor

# In this course, we'll use the SparkML (admitedely, it's not as powerful as Tensorflow, but 
# it's easy to use and demonstrate ML on a Spark Cluster)

from pyspark.ml.regression import LinearRegression

lr_model = LinearRegression(labelCol='fare_amount')
fit_model = lr_model.fit(train_data.select(['features','fare_amount']))


                                                                                

In [30]:
results = fit_model.transform(test_data)
results.show()

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+---------------------------+--------------------------+--------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|tpep_dropoff_datetime_index|tpep_pickup_datetime_index|            features|        prediction|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+---------------------------+--------------------------+--------------------+------------------+
|       1|    02-01-2022 20:00|     02-01-2022 20:03|              1|          0.9|         236|          43|        5.0|  3.5|       0.0|         9.3|        0.0|                     4907.0|                    5681.0|[1.0,

In [31]:
results.select(['fare_amount','prediction']).show()

+-----------+------------------+
|fare_amount|        prediction|
+-----------+------------------+
|        5.0| 7.325684472461832|
|        5.0| 7.335673906152495|
|        4.5|  6.76598532718908|
|        6.0| 7.206579152383525|
|        7.0| 7.598296551711231|
|       10.0|10.207571469921898|
|        4.5| 6.453174761357875|
|        7.0| 7.559089773335944|
|        8.5| 8.573937289638781|
|       10.0|10.496288230277898|
|       10.0|10.387890106126495|
|       12.0| 11.97284474056937|
|       14.5| 15.64280133404752|
|        8.5| 9.428280250884686|
|       12.5|14.079053693154249|
|        8.5| 8.076741987549134|
|       17.0| 19.26696476069771|
|       12.5|12.604092981100594|
|        4.0| 6.253722174207616|
|       13.0|14.111956840251924|
+-----------+------------------+
only showing top 20 rows



In [32]:
#Evaluate performance

In [33]:
test_results = fit_model.evaluate(test_data)

                                                                                

In [34]:
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|  -2.325684472461832|
|  -2.335673906152495|
|   -2.26598532718908|
|  -1.206579152383525|
| -0.5982965517112309|
|-0.20757146992189845|
| -1.9531747613578752|
| -0.5590897733359439|
|-0.07393728963878132|
| -0.4962882302778979|
|-0.38789010612649477|
|0.027155259430630352|
| -1.1428013340475207|
| -0.9282802508846864|
| -1.5790536931542487|
|  0.4232580124508658|
|  -2.266964760697711|
|-0.10409298110059417|
|  -2.253722174207616|
| -1.1119568402519242|
+--------------------+
only showing top 20 rows



In [35]:
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'Ex Var:':7s} {test_results.explainedVariance:>7.3f}")
print(f"{'MAE:':7s} {test_results.meanAbsoluteError:>7.3f}")
print(f"{'MSE:':7s} {test_results.meanSquaredError:>7.3f}")
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'R2:':7s} {test_results.r2:>7.3f}")

RMSE:     5.617
Ex Var:  98.906
MAE:      2.085
MSE:     31.549
RMSE:     5.617
R2:       0.754


# Logistic Regression

Whether a taxi trip results in a tip or not. Here's a modified version of your code for logistic regression:

In [52]:
from pyspark.ml.classification import LogisticRegression

In [69]:
trip.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [53]:
# Define categorical and numeric columns
categorical_columns = ['VendorID', 'PULocationID', 'DOLocationID']
numeric_columns = ['passenger_count', 'trip_distance', 'extra', 'airport_fee']

In [54]:
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid='keep') for col in categorical_columns]

In [55]:
# Use StringIndexer to convert the categorical columns to hold numerical data
VendorID_indexer=StringIndexer(inputCol='VenodorID', outputCol='VendorID_index',handleInvalid='keep')
PULocationID_indexer=StringIndexer(inputCol='PULocationID', outputCol='PULocationID_index',handleInvalid='keep')
DOLocationID_indexer=StringIndexer(inputCol='DOLocationID', outputCol='DOLocationID_index',handleInvalid='keep')

In [57]:
from pyspark.ml.feature import OneHotEncoder

In [58]:
data_encoder = OneHotEncoder(
    inputCols=[
        'VendorID_index',
        'PULocationID_index',
        'DOLocationID_index'
    ], 
    outputCols= [
        'VendorID_vec',
        'PULocationID_vec',
        'DOLocationID_vec'],
    handleInvalid='keep'
)

In [59]:
assembler = VectorAssembler(
    inputCols=[
        'VendorID_vec',
        'PULocationID_vec',
        'DOLocationID_vec'
        ],
    outputCol="features"
)

In [60]:
model=LogisticRegression(labelCol='tip_amount')

In [61]:
pipe = Pipeline(
    stages=[
        VendorID_indexer,
    PULocationID_indexer,
    DOLocationID_indexer,
        data_encoder,
        assembler,
        lr_model
    ]
)
  

In [63]:
# run the pipeline
fit_model=pipe.fit(train_data)

# Store the results in a dataframe
results = fit_model.transform(test_data)

Py4JJavaError: An error occurred while calling o519.fit.
: org.apache.spark.SparkException: Input column VenodorID does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [46]:
#Create Vector Assembler

In [47]:
assembler=VectorAssembler(
    inputCols=[
        'passenger_count',
        'trip_distance',
        'airport_fee',
        'extra',
        'VendorID_index',
        'PULocationID_index',
        'DOLocationID_index'
    ],
    outputCol="features"
)

In [50]:
pipeline=Pipeline(stages=[
    VendorID_indexer,
    PULocationID_indexer,
    DOLocationID_indexer,
    assembler
    ]
)

In [51]:
# run the pipeline
fit_model=pipeline.fit(train_data)


Py4JJavaError: An error occurred while calling o474.fit.
: org.apache.spark.SparkException: Input column VenodorID does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
# Fit and transform the training data
preprocessing_model = pipeline.fit(train_data)
train_data = preprocessing_model.transform(train_data)

# Fit and transform the testing data
test_data = preprocessing_model.transform(test_data)

# Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Define the logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="tip_amount", maxIter=10)

# Fit the model to the training data
lr_model = lr.fit(train_data)

# Make predictions on the testing data
predictions = lr_model.transform(test_data)