In [1]:
#!/usr/bin/python

# Initialization: point this kernel at the local Spark install before pyspark is imported.
import os
import sys

spark_home = "/home/talentum/spark"
pylib_dir = spark_home + "/python/lib"

# To run on Python 2, use /usr/bin/python2.7 for the two interpreter paths below.
os.environ.update({
    "SPARK_HOME": spark_home,
    "PYLIB": pylib_dir,
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})

# Prepend Spark's bundled Python archives to the import path.
# pyspark.zip is inserted last, so it ends up ahead of the py4j archive.
for bundled_zip in ("/py4j-0.10.7-src.zip", "/pyspark.zip"):
    sys.path.insert(0, pylib_dir + bundled_zip)

# NOTE: List whichever extra Spark packages you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [2]:
# Entry point (Spark 2.x): build a Hive-enabled session step by step.
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("HiveTableCreation")
session_builder = session_builder.enableHiveSupport()
spark = session_builder.getOrCreate()

# To run on YARN instead, add .master("yarn") to the builder chain:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

# Creating hive table 

In [3]:
# Create the project database; IF NOT EXISTS makes the statement safe to re-run.
spark.sql("CREATE DATABASE IF NOT EXISTS taxi_fare_prediction")

DataFrame[]

In [4]:
# List the databases visible to this session to confirm taxi_fare_prediction is present.
spark.sql("SHOW DATABASES").show()

+--------------------+
|        databaseName|
+--------------------+
|             default|
|             futurex|
|taxi_fare_prediction|
+--------------------+



In [5]:
# Show the database's metadata (name, description, location, properties) without truncating long values.
spark.sql("DESCRIBE DATABASE EXTENDED taxi_fare_prediction").show(truncate=False)

+-------------------------+-----------------------------------------------------------------+
|database_description_item|database_description_value                                       |
+-------------------------+-----------------------------------------------------------------+
|Database Name            |taxi_fare_prediction                                             |
|Description              |                                                                 |
|Location                 |hdfs://localhost:9000/user/hive/warehouse/taxi_fare_prediction.db|
|Properties               |                                                                 |
+-------------------------+-----------------------------------------------------------------+



In [6]:
# Make taxi_fare_prediction the current database for the SQL statements that follow.
spark.sql("USE taxi_fare_prediction")

DataFrame[]

In [7]:
# Recreate the taxi_trips_data table from scratch: drop any existing definition first
# so this cell can be re-run.
spark.sql("DROP TABLE IF EXISTS taxi_trips_data")
# Declare an 18-column CSV-backed table with header / inferSchema / delimiter options.
# NOTE(review): no path or LOCATION is given here, and the rest of the notebook reads the
# CSV directly with spark.read.csv instead of querying this table -- confirm whether the
# table is meant to be loaded with data.
# NOTE(review): the column list already fixes the schema, so the inferSchema option
# presumably has no effect -- confirm. payment_type is declared STRING and the numeric
# columns FLOAT, whereas the schema inferred from the CSV file reports payment_type as
# integer and those columns as double.
spark.sql("""

CREATE TABLE taxi_trips_data(
    VendorID INT,
    tpep_pickup_datetime TIMESTAMP,
    tpep_dropoff_datetime TIMESTAMP,
    passenger_count INT,
    trip_distance FLOAT,
    RatecodeID INT,
    store_and_fwd_flag STRING,
    PULocationID INT,
    DOLocationID INT,
    payment_type STRING,
    fare_amount FLOAT,    
    extra FLOAT,
    mta_tax FLOAT,
    tip_amount FLOAT,
    tolls_amount FLOAT,
    improvement_surcharge FLOAT,
    total_amount FLOAT,
    congestion_surcharge FLOAT
)

USING CSV

OPTIONS (

    header 'true',

    inferSchema 'true',

    delimiter ','

)

""")

DataFrame[]

In [8]:
# Load the raw trip records: the first line is treated as the header and
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("Final_Project/taxi_data.csv")
)

# Preview the first rows of the loaded frame
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2019-05-01 00:05:26|  2019-05-01 00:05:34|              1|          0.0|         1|                 N|         193|         193|           2|        2.5|  0.5|    0.5|       0.0|         0.0|                  0.3

In [9]:
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [10]:
# Report the raw frame's dimensions as (rows, columns).
column_count = len(df.columns)
row_count = df.count()
shape = (row_count, column_count)
print('Shape: ', shape)

Shape:  (1681873, 18)


# Dropping nulls, columns

In [11]:
from pyspark.sql.functions import col

# Columns removed before further processing.
# NOTE(review): 'trip_type' is not among the columns printed by df.printSchema();
# DataFrame.drop ignores names that are absent, so it is presumably a leftover -- confirm.
columns_to_drop = [
    'VendorID',
    'passenger_count',
    'store_and_fwd_flag',
    'RatecodeID',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'payment_type',
    'trip_type',
    'congestion_surcharge',
]

# Build the reduced frame; the raw `df` is left untouched.
df_new = df.drop(*columns_to_drop)

# Dropping columns should not change the number of rows.
print(df_new.count())

1681873


In [12]:
from pyspark.sql.functions import col, isnan

# Remove rows that have a missing value in any of the remaining columns.
df_new = df_new.na.drop()

# Remove rows whose total_amount is NaN. isnan() states the check directly instead of
# comparing the numeric column with the string "NaN", which only works through an
# implicit string-to-double cast.
df_new = df_new.filter(~isnan(col("total_amount")))

In [13]:
# Import the functions module under a namespace: `from ... import min, max` would
# replace Python's built-in min()/max() for every later cell in the notebook.
from pyspark.sql import functions as F

# Smallest and largest total_amount, to spot implausible values.
df_new.select(
    F.min("total_amount").alias("min_total"),
    F.max("total_amount").alias("max_total"),
).show()

+---------+---------+
|min_total|max_total|
+---------+---------+
|   -300.8|  90000.0|
+---------+---------+



In [14]:
# Import the functions module under a namespace: `from ... import min, max` would
# replace Python's built-in min()/max() for every later cell in the notebook.
from pyspark.sql import functions as F

# Smallest and largest trip_distance, to spot implausible values.
# NOTE(review): the aliases still read "min_total"/"max_total" although they describe
# trip_distance; they are kept so the output column headers stay the same.
df_new.select(
    F.min("trip_distance").alias("min_total"),
    F.max("trip_distance").alias("max_total"),
).show()

+---------+---------+
|min_total|max_total|
+---------+---------+
|    -29.2|    202.7|
+---------+---------+



In [15]:
# Rows whose total_amount is zero or negative
negative_total_amount_df = df_new.where(df_new["total_amount"] <= 0)

# Number of such rows
amount_negative = negative_total_amount_df.count()

print("Number of rows with total_amount <= 0:", amount_negative)

Number of rows with total_amount <= 0: 3714


In [16]:
# Rows whose trip_distance is zero or negative
negative_trip_distance_df = df_new.where(df_new["trip_distance"] <= 0)

# Number of such rows
distance_negative = negative_trip_distance_df.count()

print("Number of rows with trip_distance <= 0:", distance_negative)

Number of rows with trip_distance <= 0: 15191


In [17]:
# Keep only trips whose total_amount and trip_distance are both strictly positive.
df_new = df_new.filter(
    (df_new['total_amount'] > 0) & (df_new['trip_distance'] > 0)
)

In [18]:
# Discard trips that start or end in location 264 or 265.
# NOTE(review): presumably these IDs mark unknown/placeholder zones -- confirm against the zone lookup.
excluded_location_ids = [264, 265]
touches_excluded_location = (
    df_new['PULocationID'].isin(excluded_location_ids)
    | df_new['DOLocationID'].isin(excluded_location_ids)
)
df_new = df_new.filter(~touches_excluded_location)

In [19]:
# Row count after the null, non-positive-value and location filters.
df_new.count()

1644242

# Creating time category

In [20]:
from pyspark.sql.functions import hour, dayofmonth, month, when, col

# Derive the pickup hour, day of month and month in one chain.
pickup_ts = col("tpep_pickup_datetime")
df_new = (
    df_new
    .withColumn("hour", hour(pickup_ts))
    .withColumn("day", dayofmonth(pickup_ts))
    .withColumn("month", month(pickup_ts))
)

# Hour ranges shared by the text label and its numeric code.
# Any hour outside these ranges falls through to "Night" / 5.
pickup_hour = col("hour")
is_early_morning = (pickup_hour >= 0) & (pickup_hour < 6)
is_morning = (pickup_hour >= 6) & (pickup_hour < 12)
is_afternoon = (pickup_hour >= 12) & (pickup_hour < 16)
is_evening = (pickup_hour >= 16) & (pickup_hour < 20)

# Part-of-day label
time_category_label = (
    when(is_early_morning, "Early Morning")
    .when(is_morning, "Morning")
    .when(is_afternoon, "Afternoon")
    .when(is_evening, "Evening")
    .otherwise("Night")
)

# Numeric encoding of the same buckets (1 = Early Morning ... 5 = Night)
time_category_code = (
    when(is_early_morning, 1)
    .when(is_morning, 2)
    .when(is_afternoon, 3)
    .when(is_evening, 4)
    .otherwise(5)
)

df_new = df_new.withColumn("time_category", time_category_label)
df_new = df_new.withColumn("time_category_encoded", time_category_code)


In [21]:
# Sanity check: count rows that ended up without a time-category code.
df_new.where(df_new["time_category_encoded"].isNull()).count()

0

In [22]:
# Keep only these seven columns, in this order.
df_new = df_new.select(
    'time_category_encoded',
    'month',
    'day',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
    'total_amount',
)

In [23]:
# Sort ascending by month, then day, then time-category code.
df_new = df_new.sort("month", "day", "time_category_encoded")

In [24]:
# Preview the first rows of the selected, sorted frame.
df_new.show()

+---------------------+-----+---+------------+------------+-------------+------------+
|time_category_encoded|month|day|PULocationID|DOLocationID|trip_distance|total_amount|
+---------------------+-----+---+------------+------------+-------------+------------+
|                    1|    1|  1|         238|         244|         5.25|        17.3|
|                    1|    1|  1|          79|         129|         8.45|       33.06|
|                    1|    1|  1|         144|         261|          1.7|       11.15|
|                    1|    1|  1|          75|         229|         2.84|       13.56|
|                    1|    1|  1|         231|         162|          4.9|       17.91|
|                    1|    1|  1|         249|         186|          2.5|       15.96|
|                    1|    1|  1|         125|         238|         10.7|        47.3|
|                    1|    1|  1|         140|         262|          1.0|        7.82|
|                    1|    1|  1|         1

In [25]:
# Row count of the final Spark frame before it is converted to pandas.
df_new.count()

1644242

In [26]:
import pandas as pd

In [27]:
# Convert the Spark DataFrame to a pandas DataFrame.
# NOTE(review): toPandas() loads every row into the driver's memory -- confirm the
# driver has enough memory for the full frame.
preprocessed_data = df_new.toPandas()

In [28]:
# Show the first five rows of the pandas frame.
preprocessed_data.head()

Unnamed: 0,time_category_encoded,month,day,PULocationID,DOLocationID,trip_distance,total_amount
0,1,1,1,238,151,0.7,8.5
1,1,1,1,79,263,4.1,18.3
2,1,1,1,107,229,1.7,11.8
3,1,1,1,142,238,1.8,12.95
4,1,1,1,140,262,1.0,7.82


In [29]:
# Write the pandas frame to CSV without the index column.
# NOTE(review): the output path is absolute and user-specific -- consider making it configurable.
preprocessed_data.to_csv("/home/talentum/preprocessed_data.csv", index=False)