In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Create the Spark session for this notebook, or reuse one that is already
# running. "local[*]" runs Spark in local mode on this machine.
spark = (
    SparkSession.builder
    .appName("Cleaning")
    .master("local[*]")
    .getOrCreate()
)

In [20]:
# Location of the raw trips CSV
file_path = "D:/OneDrive/Venkat.My_projects/manju_task/TASK_1/Divvy_Trips_2014_Q1Q2.csv"

# Load the CSV: the first row supplies the column names and Spark infers the
# column types from the data.
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Preview the first five rows, then print the inferred schema
df.show(5)
df.printSchema()

+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|trip_id|      starttime|       stoptime|bikeid|tripduration|from_station_id|   from_station_name|to_station_id|     to_station_name|  usertype|gender|birthyear|
+-------+---------------+---------------+------+------------+---------------+--------------------+-------------+--------------------+----------+------+---------+
|2355134|6/30/2014 23:57|  7/1/2014 0:07|  2006|         604|            131|Lincoln Ave & Bel...|          303|Broadway & Cornel...|Subscriber|  Male|     1988|
|2355133|6/30/2014 23:56|  7/1/2014 0:00|  2217|         263|            282|Halsted St & Maxw...|           22|  May St & Taylor St|Subscriber|  Male|     1992|
|2355130|6/30/2014 23:33|6/30/2014 23:35|  2798|         126|            327|Sheffield Ave & W...|          225|Halsted St & Dick...|Subscriber|  Male|     1993|
|2355129|6/30/2014 23:26|  7

In [21]:
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql.types import DoubleType, FloatType, IntegerType, LongType

# Columns whose Spark type is numeric. Match on the type classes rather than on
# str(dataType): the string form is not stable across PySpark releases
# ("IntegerType" vs "IntegerType()"), and a string mismatch would leave this
# list empty and silently skip the NaN check below.
numeric_cols = [
    f.name
    for f in df.schema.fields
    if isinstance(f.dataType, (DoubleType, IntegerType, LongType, FloatType))
]

# Per-column count of missing values:
#   - numeric columns: rows that are NaN or NULL
#   - every other column: rows that are NULL (isnan is not applied to them)
# when(...) without otherwise() yields NULL for rows that do not match, and
# count() ignores NULLs, so each aggregate counts only the flagged rows.
missing_counts = df.select([
    count(when(isnan(col(c)) | col(c).isNull(), c)).alias(c) if c in numeric_cols
    else count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
])

missing_counts.show(truncate=False)


+-------+---------+--------+------+------------+---------------+-----------------+-------------+---------------+--------+------+---------+
|trip_id|starttime|stoptime|bikeid|tripduration|from_station_id|from_station_name|to_station_id|to_station_name|usertype|gender|birthyear|
+-------+---------+--------+------+------------+---------------+-----------------+-------------+---------------+--------+------+---------+
|0      |0        |0       |0     |0           |0              |0                |0            |0              |0       |314022|313977   |
+-------+---------+--------+------+------------+---------------+-----------------+-------------+---------------+--------+------+---------+



In [23]:
# A trip row is only usable when all of these fields are present.
required_cols = [
    "trip_id",
    "starttime",
    "stoptime",
    "tripduration",
]

# Discard every row that has a null in any of the required columns.
df_cleaned = df.na.drop(subset=required_cols)


In [25]:
# Replacement values for nulls in the optional columns: text columns get the
# label "Unknown", and a missing birth year is stored as 0.
null_defaults = {
    "usertype": "Unknown",
    "gender": "Unknown",
    "birthyear": 0,
}

df_cleaned = df_cleaned.na.fill(null_defaults)


In [26]:
# Keep a single copy of each fully identical row, then report how many rows remain.
df_cleaned = df_cleaned.distinct()
remaining_rows = df_cleaned.count()
print(f"Rows after removing duplicates and missing values: {remaining_rows}")


Rows after removing duplicates and missing values: 905699


In [27]:
# Destination path handed to the Spark CSV writer for the cleaned data
output = "D:/OneDrive/Venkat.My_projects/manju_task/manju_cleaned_data"

In [28]:
# Collapse to a single partition before writing, replace any previous export
# at the destination, and include a header row.
single_partition = df_cleaned.coalesce(1)
single_partition.write.csv(output, mode="overwrite", header=True)
print("✅ Cleaned CSV saved at:", output)

✅ Cleaned CSV saved at: D:/OneDrive/Venkat.My_projects/manju_task/manju_cleaned_data
