# ** New York Taxis Data Analysis**

In [1]:
# Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import avg, col, count, round, when

In [2]:
# Create a Spark session
spark = SparkSession.builder.appName("MCD-G1").getOrCreate()

25/08/30 02:06:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


1. Load the Datased

In [3]:
# Read the data
df = spark.read.parquet("gs://mcd_procesamiento_disrtibuido/zone=landing/src=new_yorktaxi")

                                                                                

2. Look at the schema

In [4]:
# Print the schema
df.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: decimal(38,9) (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: decimal(38,9) (nullable = true)
 |-- extra: decimal(38,9) (nullable = true)
 |-- mta_tax: decimal(38,9) (nullable = true)
 |-- tip_amount: decimal(38,9) (nullable = true)
 |-- tolls_amount: decimal(38,9) (nullable = true)
 |-- imp_surcharge: decimal(38,9) (nullable = true)
 |-- airport_fee: decimal(38,9) (nullable = true)
 |-- total_amount: decimal(38,9) (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- data_file_year: long (nullable = true)
 |-- data_file_month: long (nullable = true)



3. Check the amount of records

In [5]:
# Check the amount of records
df.count()

                                                                                

102871376

4. Total Trip Duration

In [6]:
# Add new column trip_duration_minutes
df = df.withColumn(
    "trip_duration_minutes",
    f.round((f.unix_timestamp("dropoff_datetime") - f.unix_timestamp("pickup_datetime")) / 60)
)

# Print trip_duration_minutes column
df.select("trip_duration_minutes").show(10)

                                                                                

+---------------------+
|trip_duration_minutes|
+---------------------+
|                  0.0|
|                  0.0|
|                  1.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  1.0|
|                  6.0|
+---------------------+
only showing top 10 rows



5. Description of the payment type

In [7]:
# Map payment_type codes to descriptions
df = df.withColumn(
    "payment_type_desc",
    when(col("payment_type") == "1", "Credit Card")
    .when(col("payment_type") == "2", "Cash")
    .when(col("payment_type") == "3", "No Charge")
    .when(col("payment_type") == "4", "Dispute")
    .when(col("payment_type") == "5", "Unknown")
    .when(col("payment_type") == "6", "Voided Trip")
    .otherwise("Other"))

# Print payment_type_desc column
df.select("payment_type_desc").show(10)

+-----------------+
|payment_type_desc|
+-----------------+
|      Credit Card|
|      Credit Card|
|      Credit Card|
|      Credit Card|
|             Cash|
|      Credit Card|
|      Credit Card|
|             Cash|
|             Cash|
|      Credit Card|
+-----------------+
only showing top 10 rows




[Stage 5:>                                                          (0 + 1) / 1]

                                                                                

6. Amount of travels per Vendor by month

In [None]:
# Aggregate the trip count per vendor per month
trips_per_vendor_month = df.groupBy("vendor_id", "data_file_year", "data_file_month").agg(count("*").alias("trip_count"))

print("=== Amount of travels per vendor by month ===")
trips_per_vendor_month.show(truncate=False)

=== Número de Viajes por Proveedor por Mes ===


[Stage 8:>                                                          (0 + 1) / 1]

+---------+--------------+---------------+----------+
|vendor_id|data_file_year|data_file_month|trip_count|
+---------+--------------+---------------+----------+
|2        |2018          |8              |4700433   |
|2        |2018          |7              |4683654   |
|5        |2018          |8              |44        |
|1        |2018          |8              |3131523   |
|1        |2018          |7              |3163910   |
|4        |2018          |7              |3578      |
|4        |2018          |8              |23039     |
|1        |2018          |9              |3211746   |
|1        |2018          |1              |3846134   |
|4        |2018          |9              |79452     |
|2        |2018          |6              |5013605   |
|2        |2018          |9              |4757772   |
|2        |2018          |1              |4914553   |
|4        |2018          |6              |1105      |
|5        |2018          |9              |123       |
|1        |2018          |6 


                                                                                

7. Average Ticket

In [None]:
# Calculate overall average ticket
average_ticket_total = df.agg(round(avg("total_amount"), 2).alias("average_total_ticket"))

print("=== Average Ticket  ===")
average_ticket_total.show()

=== Ticket Promedio Total ===




+--------------------+
|average_total_ticket|
+--------------------+
|               16.43|
+--------------------+




                                                                                

8. Average Ticket Grouped By Payment Type

In [None]:
# Calculate average ticket grouped by payment_type_desc
average_ticket_by_payment = df.groupBy("payment_type_desc").agg(round(avg("total_amount"), 2).alias("average_total_ticket"))

print("=== Average Ticket Grouped By Payment Type ===")
average_ticket_by_payment.show(truncate=False)

=== Ticket Promedio por Medio de Pago ===




+-----------------+--------------------+
|payment_type_desc|average_total_ticket|
+-----------------+--------------------+
|Credit Card      |17.70               |
|Cash             |13.40               |
|Dispute          |12.75               |
|No Charge        |18.82               |
|Other            |48.34               |
|Unknown          |32.51               |
+-----------------+--------------------+




                                                                                