In [None]:
!pip install pyspark




In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("CODTECH_BIG_DATA_TASK")
    .getOrCreate()
)

# Read the trip CSV: the first line is treated as the header and column
# types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/yellow_tripdata_2015-01.csv")
)
print("Data Loaded Successfully")
df.show(n=5)


Data Loaded Successfully
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|   pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag|  dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+-------------------+------------------+----------+------------------+-------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|   -73.993896484375|40

In [None]:
# Keep only the rows of df that have no missing value in any column,
# then report how many rows are left.
df_clean = df.na.drop()
cleaned_row_count = df_clean.count()
print("Rows after cleaning:", cleaned_row_count)


Rows after cleaning: 14033457


In [None]:
# Count the rows of the cleaned frame and print the total.
total_rows = df_clean.count()
print("Total Rows:", total_rows)


Total Rows: 14033457


In [None]:
from pyspark.sql.functions import avg, col, when
from pyspark.sql.types import DoubleType

# fare_amount_clean: fare_amount cast to double when the raw value is an
# optionally negative number written as digits with an optional ".digits"
# part; null for every other value.
strict_fare = when(
    col("fare_amount").rlike(r"^-?\d+(\.\d+)?$"),
    col("fare_amount").cast(DoubleType()),
).otherwise(None)

df_clean2 = df_clean.withColumn("fare_amount_clean", strict_fare)

# Average of the cleaned fare column.
average_fare = avg("fare_amount_clean").alias("Average Fare")
df_clean2.select(average_fare).show()


# Second average, computed directly on df_clean without adding a column.
# NOTE(review): this pattern is not identical to the one above -- it also
# accepts values with no digits before the decimal point (e.g. ".5"), so the
# two averages can differ. Confirm which pattern is intended.
loose_fare = when(
    col("fare_amount").rlike(r"^-?\d*\.?\d+$"),
    col("fare_amount").cast(DoubleType()),
).otherwise(None)

df_clean.select(avg(loose_fare)).show()

+-------------------+
|       Average Fare|
+-------------------+
|6.951980371939585E7|
+-------------------+

+-----------------------------------------------------------------------------------------------+
|avg(CASE WHEN RLIKE(fare_amount, ^-?\d*\.?\d+$) THEN CAST(fare_amount AS DOUBLE) ELSE NULL END)|
+-----------------------------------------------------------------------------------------------+
|                                                                            6.951974427240919E7|
+-----------------------------------------------------------------------------------------------+



In [None]:
# Keep only the rows of df_clean2 that have a value in fare_amount_clean.
df_final = df_clean2.na.drop(subset=["fare_amount_clean"])


In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

# Add total_amount_clean: the total_amount column cast to double.
total_as_double = col("total_amount").cast(DoubleType())
df_final = df_final.withColumn("total_amount_clean", total_as_double)



In [None]:
from pyspark.sql.functions import col, when, regexp_replace
from pyspark.sql.types import DoubleType

# Set total_amount_clean to total_amount cast to double when the raw value is
# an optionally negative number (digits with an optional ".digits" part),
# and to null otherwise.
total_is_numeric = col("total_amount").rlike(r"^-?\d+(\.\d+)?$")
total_as_double = col("total_amount").cast(DoubleType())

df_final = df_final.withColumn(
    "total_amount_clean",
    when(total_is_numeric, total_as_double).otherwise(None),
)



In [None]:
# Sum total_amount_clean per payment_type and show the groups from the
# largest sum to the smallest.
(
    df_final
    .groupBy("payment_type")
    .sum("total_amount_clean")
    .sort("sum(total_amount_clean)", ascending=False)
    .show()
)


+-------------------+-----------------------+
|       payment_type|sum(total_amount_clean)|
+-------------------+-----------------------+
|                  1|   7.001692323764845E15|
|                  2|   4.985202414510723E13|
|                  3|     501605.69696577213|
|                  4|     137494.37381088323|
|                  N|                 139.32|
|                  5|      54.34564743041992|
|-73.782073974609375|                   52.0|
|                  6|        40.808837890625|
|-73.941566467285156|                   30.0|
|              14375|                   21.0|
|-73.958961486816406|                   18.0|
|-74.007865905761719|                   16.5|
|-73.982139587402344|                   15.0|
|  1.986671447753906|                   12.5|
|-73.971481323242187|                   10.0|
|-73.962623596191406|                    9.5|
|     18685607910156|                    9.5|
|-73.954498291015625|                    9.0|
|              46875|             