# PySpark_Project

## Setup
### Java

#### Checking Java version

In [1]:
!java -version

java version "19.0.2" 2023-01-17
Java(TM) SE Runtime Environment (build 19.0.2+7-44)
Java HotSpot(TM) 64-Bit Server VM (build 19.0.2+7-44, mixed mode, sharing)


#### Setting up Java environment

In [1]:
import os
os.environ["JAVA_HOME"] = r"C:\Progra~1\Java\jdk-18.0.2.1"
os.environ["JAVA_PATH"] = r"C:\Progra~1\Java\jdk-18.0.2.1\bin"
os.environ["SPARK_HOME"] = r"D:\School\Big_Data_Analysis\spark-3.3.2-bin-hadoop3"
os.environ["HADOOP_HOME"] = r"D:\School\Big_Data_Analysis\hadoop-3.3.0"

### Spark
#### Downloading

In [3]:
!pip install findspark pyspark



#### Creating Spark Session

In [2]:
import findspark
findspark.init()
findspark.find()

'D:\\School\\Big_Data_Analysis\\spark-3.3.2-bin-hadoop3'

In [3]:
import pyspark
print(pyspark.__version__)
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("NYC_Taxi_Analysis") \
    .getOrCreate()
#    .config("spark.driver.memory", "20g") \
#    .config("spark.executor.memory", "20g") \


3.3.2


#### Stopping Session

In [35]:
spark.stop()

#### Creating Spark Context

In [6]:
from pyspark import SparkConf, SparkContext

conf = SparkConf() # .setAll([('spark.driver.memory', '8g'), ('spark.executor.memory', '8g')]) # Set memory to 8GB
conf.setMaster('local')
conf.setAppName('BD_Project')
sc = SparkContext(conf=conf)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=NYC_Taxi_Analysis, master=local[*]) created by getOrCreate at C:\Users\mark9\AppData\Local\Temp\ipykernel_17256\1437898141.py:5 

## Run

#### Load Data

In [23]:
# Read multiple CSV files into separate DataFrames
df1 = spark.read.format("csv").option("header", "true").load("data/yellow_tripdata_2015-01.csv")
df2 = spark.read.format("csv").option("header", "true").load("data/yellow_tripdata_2016-01.csv")
df3 = spark.read.format("csv").option("header", "true").load("data/yellow_tripdata_2016-02.csv")
df4 = spark.read.format("csv").option("header", "true").load("data/yellow_tripdata_2016-03.csv")

# Combine the DataFrames into a single DataFrame
taxi_df = df1.union(df2).union(df3).union(df4)

In [24]:
# Show the combined DataFrame
#taxi_df.describe().show()
taxi_df.select('trip_distance').show()

+-------------+
|trip_distance|
+-------------+
|         1.59|
|         3.30|
|         1.80|
|          .50|
|         3.00|
|         9.00|
|         2.20|
|          .80|
|        18.20|
|          .90|
|          .90|
|         1.10|
|          .30|
|         3.10|
|         1.10|
|         2.38|
|         2.83|
|         8.33|
|         2.37|
|         7.13|
+-------------+
only showing top 20 rows



In [25]:
taxi_df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RateCodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [26]:
from pyspark.sql.functions import avg

# Group the data by passenger count and calculate the average trip distance
average_distance_by_passenger_count = (
    taxi_df \
    .groupBy("passenger_count")
    .agg(avg("trip_distance").alias("avg_distance"))
    .orderBy("passenger_count")
)

# Show the results
average_distance_by_passenger_count.show()

+---------------+------------------+
|passenger_count|      avg_distance|
+---------------+------------------+
|              0|2.1291575359142922|
|              1|  7.65459483526611|
|              2| 9.357806903264088|
|              3| 9.161549450371313|
|              4| 5.992690105129649|
|              5| 2.954346307893686|
|              6| 2.878429682825396|
|              7| 3.545061728395062|
|              8| 5.225897435897437|
|              9|4.7351470588235305|
+---------------+------------------+



In [45]:
from pyspark.sql.functions import col, round, avg, sqrt, count

# Filter out trips with invalid coordinates and passenger counts
df = taxi_df.filter(col('pickup_longitude').isNotNull()) \
            .filter(col('pickup_latitude').isNotNull()) \
            .filter(col('dropoff_longitude').isNotNull()) \
            .filter(col('dropoff_latitude').isNotNull()) \
            .filter(col('trip_distance').isNotNull())

# Calculate the trip distance and duration
df = df.withColumn('trip_euclidean_distance', round(sqrt((col('dropoff_longitude') - col('pickup_longitude'))**2 + (col('dropoff_latitude') - col('pickup_latitude'))**2), 2)) \
       .withColumn('trip_duration', unix_timestamp(col("tpep_dropoff_datetime"), "yyyy-MM-dd HH:mm:ss") - unix_timestamp(col("tpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))

# Group the data by passenger count and calculate average trip distance
average_distance_by_passenger_count = df.groupBy("passenger_count") \
                                        .agg(avg("trip_distance").alias("avg_distance_MILES"))

# Group the data by passenger count and calculate the average euclidean_distance
average_euclidean_distance_by_passenger_count = df.groupBy('passenger_count') \
                                            .agg(avg('trip_euclidean_distance') \
                                            .alias('avg_euclidean_distance'), count('*').alias('num_trips'))

# Group the data by passenger count and calculate the average trip time
trip_duration_by_passenger_count = df.groupBy('passenger_count') \
                                     .agg(avg('trip_duration') \
                                     .alias('avg_duration'))

# Join the two DataFrames on passenger count and calculate the average trip time in minutes
result = average_euclidean_distance_by_passenger_count.join(trip_duration_by_passenger_count, 'passenger_count') \
            .join(average_distance_by_passenger_count, 'passenger_count') \
            .withColumn('avg_duration_minutes', round(col('avg_duration') / 60, 2)) \
            .select('passenger_count', 'avg_distance_MILES', 'avg_euclidean_distance', 'avg_duration_minutes', 'num_trips') \
            .orderBy('passenger_count')

result.show()

+---------------+------------------+----------------------+--------------------+---------+
|passenger_count|avg_distance_MILES|avg_euclidean_distance|avg_duration_minutes|num_trips|
+---------------+------------------+----------------------+--------------------+---------+
|              0|2.1291575359142922|     5.380060871682492|               18.04|     8214|
|              1|  7.65459483526611|    0.3851992190090032|               14.82| 33537914|
|              2| 9.357806903264088|    0.3460562562597229|               15.57|  6719430|
|              3| 9.161549450371313|    0.3182944332218391|               15.92|  1912291|
|              4| 5.992690105129649|   0.41554308932569695|               15.89|   911351|
|              5| 2.954346307893686|   0.10893346292222877|               16.62|  2551660|
|              6| 2.878429682825396|    0.1029063764571677|               16.36|  1607758|
|              7| 3.545061728395062|     17.76185185185185|               11.46|       81|