In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
spark=SparkSession.builder\
    .appName("Cleaning")\
    .master("local[*]")\
    .getOrCreate()

In [4]:
# Define the file path
file_path = "D:/OneDrive/Venkat.My_projects/manju_task/Cleaned_Dataset/part-00000-599853f5-ad8e-490e-a389-34a8782609f0-c000/part-00000-599853f5-ad8e-490e-a389-34a8782609f0-c000.csv"

df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

# Display contents
df.show(5)
df.printSchema()

+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|trip_id|      starttime|       stoptime|bikeid|tripduration|from_station_id|   from_station_name|to_station_id|     to_station_name|  usertype|gender|birthyear|
+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|2354845|6/30/2014 20:36|6/30/2014 20:48|  2083|         679|            212|Wells St & Hubbar...|          301|Clark St & Schill...|Subscriber|  Male|     1993|
|2354683|6/30/2014 19:17|6/30/2014 19:31|  1735|         820|            144|Larrabee St & Web...|          229|Southport Ave & R...|Subscriber|  Male|     1985|
|2354680|6/30/2014 19:16|6/30/2014 19:25|  1282|         524|            100|Orleans St & Merc...|           92|Carpenter St & Hu...|Subscriber|  Male|     1984|
|2354660|6/30/2014 19:15|6/3

In [7]:
df_filtered = df.filter(col("birthyear") <2000)
df_filtered.show()


+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|trip_id|      starttime|       stoptime|bikeid|tripduration|from_station_id|   from_station_name|to_station_id|     to_station_name|  usertype|gender|birthyear|
+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|2354845|6/30/2014 20:36|6/30/2014 20:48|  2083|         679|            212|Wells St & Hubbar...|          301|Clark St & Schill...|Subscriber|  Male|     1993|
|2354683|6/30/2014 19:17|6/30/2014 19:31|  1735|         820|            144|Larrabee St & Web...|          229|Southport Ave & R...|Subscriber|  Male|     1985|
|2354680|6/30/2014 19:16|6/30/2014 19:25|  1282|         524|            100|Orleans St & Merc...|           92|Carpenter St & Hu...|Subscriber|  Male|     1984|
|2354660|6/30/2014 19:15|6/3

In [11]:
# Filter: trips longer than 30 minutes
long_trips = df.filter(col("tripduration") > 1800)
print(long_trips.show())
# Filter: trips by male users only
male_users = df.filter(col("gender") == "Male")

# Filter: users born after 1990
young_users = df.filter(col("birthyear") > 1990)

# Filter: subscribers only
Male = df.filter(col("usertype") == "Male")

+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+-------+---------+
|trip_id|      starttime|       stoptime|bikeid|tripduration|from_station_id|   from_station_name|to_station_id|     to_station_name|  usertype| gender|birthyear|
+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+-------+---------+
|2354000|6/30/2014 18:25|6/30/2014 18:55|  2666|        1832|             90|     Millennium Park|          114|Sheffield Ave & A...|Subscriber|   Male|     1985|
|2350282|6/30/2014 15:47|6/30/2014 16:19|  2773|        1953|            248|Woodlawn Ave & 55...|          328| Ellis Ave & 58th St|  Customer|Unknown|        0|
|2348932|6/30/2014 13:59|6/30/2014 14:38|  1229|        2348|            195|Columbus Dr & Ran...|           34|Cannon Dr & Fulle...|  Customer|Unknown|        0|
|2347593|6/30/2014 12:

In [13]:
# Average trip duration by gender
avg_trip_by_gender = df.groupBy("gender").agg(avg("tripduration").alias("avg_trip_duration"))

# Count of trips by usertype
trip_count_by_usertype = df.groupBy("usertype").agg(count("*").alias("trip_count"))

# Max and Min trip durations
max_min_trip = df.agg(
    max("tripduration").alias("max_duration"),
    min("tripduration").alias("min_duration")
)

# Number of trips from each station
from_station_trips = df.groupBy("from_station_name").count().orderBy("count", ascending=False)

# Average trip duration per year of birth
avg_trip_by_birthyear = df.groupBy("birthyear").agg(avg("tripduration").alias("avg_trip_duration")).orderBy("birthyear")
print(avg_trip_by_gender.show())
print(trip_count_by_usertype.show())
print(max_min_trip.show())
print(from_station_trips.show())
print(avg_trip_by_birthyear.show())

+-------+-----------------+
| gender|avg_trip_duration|
+-------+-----------------+
| Female|863.1073559966403|
|Unknown|1714.893411926553|
|   Male|694.0347427684115|
+-------+-----------------+

None
+----------+----------+
|  usertype|trip_count|
+----------+----------+
|Subscriber|    591711|
|  Customer|    313988|
+----------+----------+

None
+------------+------------+
|max_duration|min_duration|
+------------+------------+
|       86245|          60|
+------------+------------+

None
+--------------------+-----+
|   from_station_name|count|
+--------------------+-----+
|Streeter Dr & Ill...|19512|
|Lake Shore Dr & M...|15778|
| Theater on the Lake|15100|
|Clinton St & Wash...|14323|
|Michigan Ave & Oa...|13030|
|     Millennium Park|12156|
| Canal St & Adams St|11264|
|Canal St & Madiso...|11020|
|Lake Shore Dr & N...|10489|
|       Museum Campus| 9884|
|Columbus Dr & Ran...| 9293|
|Franklin St & Arc...| 9195|
|Dearborn St & Mon...| 8821|
|McClurg Ct & Illi...| 8703|
|Michigan