# BIG DATA PROJECT - HADOOP HEROES

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [2]:
from os.path import abspath
warehouse_location = abspath('spark-warehouse')
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("ISM6562 Spark Project") \
        .enableHiveSupport() \
        .getOrCreate()

# Let's get the SparkContext object. It's the entry point to the Spark API. It's created when you create a sparksession
sc = spark.sparkContext  
sc.setLogLevel("ERROR") # only display errors (not warnings)

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

# It's best if you find that the port number displayed below is not 4040, then you should shut down all other spark sessions and 
# run this code again. If you don't, you may have trouble accessing the data in the spark-warehouse directory.

23/11/09 23:10:09 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.21.10.196 instead (on interface eth0)
23/11/09 23:10:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/09 23:10:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session WebUI Port: 4040


In [3]:
spark

In [4]:
# Import data from csv file

In [5]:
trip = spark.read.csv('data/yellow_tripdata_2022-02.csv', header=True, inferSchema=True);

# display the first 5 rows of the dataframe
trip.show(5);

                                                                                

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|       1|    02-01-2022 00:06|     02-01-2022 00:19|              1|          5.4|         138|         252|       17.0| 1.75|       3.9|       23.45|       1.25|
|       1|    02-01-2022 00:38|     02-01-2022 00:55|              1|          6.4|         138|          41|       21.0| 1.75|       0.0|        30.1|       1.25|
|       1|    02-01-2022 00:03|     02-01-2022 00:26|              1|         12.5|         138|         200|       35.5| 1.75|       0.0|        44.6|       1.25|
|       2|    02

In [6]:
trip.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [7]:
#Creating the DB and table in spark

In [8]:
trip.createOrReplaceTempView("trip_tmp_view")

In [9]:
df = spark.sql("SELECT * FROM trip_tmp_view")
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|       1|    02-01-2022 00:06|     02-01-2022 00:19|              1|          5.4|         138|         252|       17.0| 1.75|       3.9|       23.45|       1.25|
|       1|    02-01-2022 00:38|     02-01-2022 00:55|              1|          6.4|         138|          41|       21.0| 1.75|       0.0|        30.1|       1.25|
|       1|    02-01-2022 00:03|     02-01-2022 00:26|              1|         12.5|         138|         200|       35.5| 1.75|       0.0|        44.6|       1.25|
|       2|    02

In [10]:
type(trip)

pyspark.sql.dataframe.DataFrame

In [11]:
tables = spark.sql("show tables").show()

+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|         |trip_tmp_view|       true|
+---------+-------------+-----------+



In [12]:
#Save table in spark data warehouse

In [13]:
spark.sql("CREATE DATABASE IF NOT EXISTS trip_db;")

DataFrame[]

In [14]:
trip.write.mode("overwrite").saveAsTable("trip_db.trip")

                                                                                

In [15]:
spark.catalog.listTables('trip_db')

[Table(name='trip', catalog='spark_catalog', namespace=['trip_db'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='trip_tmp_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [16]:
df = spark.sql("SELECT * FROM trip_db.trip")
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|extra|tip_amount|total_amount|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+-----+----------+------------+-----------+
|       2|    02-01-2022 20:44|     02-01-2022 21:08|              1|         5.74|         107|           7|       20.5|  0.5|      4.86|       29.16|        0.0|
|       1|    02-01-2022 20:35|     02-01-2022 20:44|              1|          1.3|         230|         229|        7.0|  3.0|       0.0|        10.8|        0.0|
|       2|    02-01-2022 20:11|     02-01-2022 20:33|              1|         4.37|          79|         236|       18.0|  0.5|      4.36|       26.16|        0.0|
|       2|    02

In [17]:
# DATA EXPLORATION WITH SQL

In [18]:
type(trip)

pyspark.sql.dataframe.DataFrame

In [19]:
df = spark.sql("describe trip_db.trip")
df.show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            VendorID|      int|   NULL|
|tpep_pickup_datetime|   string|   NULL|
|tpep_dropoff_date...|   string|   NULL|
|     passenger_count|      int|   NULL|
|       trip_distance|   double|   NULL|
|        PULocationID|      int|   NULL|
|        DOLocationID|      int|   NULL|
|         fare_amount|   double|   NULL|
|               extra|   double|   NULL|
|          tip_amount|   double|   NULL|
|        total_amount|   double|   NULL|
|         airport_fee|   double|   NULL|
+--------------------+---------+-------+



In [None]:
### Data EXPLORATION

In [29]:
# Run the SQL query to find the vendor 1 trips
vendor1_df = spark.sql("SELECT COUNT(VendorID) as vendorID1 FROM trip_db.trip WHERE VendorID=1")
vendor1_df.show()

+---------+
|vendorID1|
+---------+
|   319007|
+---------+



In [30]:
vendor2_df = spark.sql("SELECT COUNT(VendorID) as vendorID2 FROM trip_db.trip WHERE VendorID=2")
vendor2_df.show()

+---------+
|vendorID2|
+---------+
|   724578|
+---------+



In [28]:
#Find the top 10 trips with highest tip amounts

top_10_trips_df=spark.sql("""
SELECT VendorID, fare_amount, tip_amount FROM (
SELECT *,ROW_NUMBER() OVER (Order by tip_amount DESC) as row_num from trip_db.trip)
where row_num<=10""")

top_10_trips_df.show()


+--------+-----------+----------+
|VendorID|fare_amount|tip_amount|
+--------+-----------+----------+
|       2|       52.0|    310.32|
|       2|        5.0|    151.25|
|       2|      130.0|     132.0|
|       2|       97.0|     125.0|
|       2|        8.5|     125.0|
|       2|       14.0|    111.11|
|       2|        2.5|     106.0|
|       2|      510.0|    102.06|
|       2|       14.5|     101.0|
|       2|        2.5|     100.0|
+--------+-----------+----------+



In [33]:
# Find the most popular drop-off location
most_popular_dropoff_location = spark.sql("""
    SELECT DOLocationID, COUNT(*) AS dropoff_count
    FROM trip_db.trip
    GROUP BY DOLocationID
    ORDER BY dropoff_count DESC
    LIMIT 1
""")

most_popular_dropoff_location.show()


+------------+-------------+
|DOLocationID|dropoff_count|
+------------+-------------+
|         236|        54140|
+------------+-------------+



In [34]:
# Find the most popular drop-off location
most_popular_pickup_location = spark.sql("""
    SELECT PULocationID, COUNT(*) AS pickup_count
    FROM trip_db.trip
    GROUP BY PULocationID
    ORDER BY pickup_count DESC
    LIMIT 1
""")

most_popular_pickup_location.show()


+------------+------------+
|PULocationID|pickup_count|
+------------+------------+
|         237|       55209|
+------------+------------+



In [38]:
from pyspark.sql.functions import hour, dayofweek

# Extract hour of the day from 'tpep_pickup_datetime' and 'tpep_dropoff_datetime'
df = df.withColumn('pickup_hour', hour(df['tpep_pickup_datetime']))
df = df.withColumn('dropoff_hour', hour(df['tpep_dropoff_datetime']))

# Extract day of the week from 'tpep_pickup_datetime' and 'tpep_dropoff_datetime'
df = df.withColumn('pickup_day_of_week', dayofweek(df['tpep_pickup_datetime']))
df = df.withColumn('dropoff_day_of_week', dayofweek(df['tpep_dropoff_datetime']))


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `tpep_pickup_datetime` cannot be resolved. Did you mean one of the following? [`col_name`, `data_type`, `comment`].

# Model Training

In [None]:
train_data,test_data=df.randomSplit([0.7,0.3])

In [None]:
from pyspark.ml.feature import StringIndexer
# Use StringIndexer to convert the categorical columns to hold numerical data
 
tpep_pickup_datetime_indexer = StringIndexer(inputCol='tpep_pickup_datetime',outputCol='tpep_pickup_datetime_index',handleInvalid='keep')
tpep_dropoff_datetime_indexer = StringIndexer(inputCol='tpep_dropoff_datetime',outputCol='tpep_dropoff_datetime_index',handleInvalid='keep')


In [None]:
df.columns

In [None]:
from pyspark.ml.feature import VectorAssembler
# Vector assembler is used to create a vector of input features
 
assembler = VectorAssembler(
    inputCols=[
        'passenger_count',
        'trip_distance',
        'airport_fee',
        'PULocationID',
        'DOLocationID',
        'tpep_dropoff_datetime_index',
        'tpep_pickup_datetime_index'
    ],
    outputCol="features"
)

In [None]:
from pyspark.ml import Pipeline

# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data
# https://spark.apache.org/docs/latest/ml-pipeline.html
 
pipe = Pipeline(stages=[
    tpep_dropoff_datetime_indexer,
    tpep_pickup_datetime_indexer,
    assembler
    ]
)

In [None]:
fitted_pipe=pipe.fit(train_data)

In [None]:
train_data=fitted_pipe.transform(train_data)
train_data.show(5)

In [None]:
test_data=fitted_pipe.transform(test_data)
test_data.show(5)

In [None]:
# For those interested in utilizing the ML/AI power of Tensorflow with Spark....
# https://github.com/tensorflow/ecosystem/tree/master/spark/spark-tensorflow-distributor

# In this course, we'll use the SparkML (admitedely, it's not as powerful as Tensorflow, but 
# it's easy to use and demonstrate ML on a Spark Cluster)

from pyspark.ml.regression import LinearRegression

lr_model = LinearRegression(labelCol='fare_amount')
fit_model = lr_model.fit(train_data.select(['features','fare_amount']))


In [1]:
results = fit_model.transform(test_data)
results.show()

NameError: name 'fit_model' is not defined

In [None]:
results.select(['fare_amount','prediction']).show()

In [None]:
#Evaluate performance

In [None]:
test_results = fit_model.evaluate(test_data)

In [None]:
test_results.residuals.show()

In [None]:
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'Ex Var:':7s} {test_results.explainedVariance:>7.3f}")
print(f"{'MAE:':7s} {test_results.meanAbsoluteError:>7.3f}")
print(f"{'MSE:':7s} {test_results.meanSquaredError:>7.3f}")
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'R2:':7s} {test_results.r2:>7.3f}")

# Logistic Regression

Whether a taxi trip results in a tip or not. Here's a modified version of your code for logistic regression:

# DECISION TREE

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline


In [None]:
# Use StringIndexer to convert the categorical columns to hold numerical data
VendorID_indexer=StringIndexer(inputCol='VendorID', outputCol='VendorID_index',handleInvalid='keep')
PULocationID_indexer=StringIndexer(inputCol='PULocationID', outputCol='PULocationID_index',handleInvalid='keep')
DOLocationID_indexer=StringIndexer(inputCol='DOLocationID', outputCol='DOLocationID_index',handleInvalid='keep')

In [None]:
assembler = VectorAssembler(
    inputCols=[
        'VendorID_index',
        'PULocationID_index',
        'DOLocationID_index',
        'passenger_count', 'trip_distance', 'extra', 'airport_fee'
    ],
    outputCol="features_dtree"
)

In [None]:
dt_model = DecisionTreeClassifier(labelCol='TIP_STATUS',maxBins=16000)

In [None]:
pipe = Pipeline(
    stages=[
        VendorID_indexer,
        PULocationID_indexer,
        DOLocationID_indexer,
        assembler,
        dt_model
    ]
)

In [None]:
fit_model=pipe.fit(train_data)

In [None]:
results = fit_model.transform(test_data)

In [None]:
results.select(['TIP_STATUS','prediction']).show()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
ACC_evaluator = MulticlassClassificationEvaluator(
    labelCol="TIP_STATUS", predictionCol="prediction", metricName="accuracy")

accuracy = ACC_evaluator.evaluate(results)

print(f"The accuracy of the decision tree classifier is {accuracy}")