In [1]:
# import os

# os.environ['SPARK_HOME'] = "/Users/jaychavda/Downloads/spark-3.5.3-bin-hadoop3"
# os.environ['PYSPARK_DRIVER_PYTHON'] = "jupyter"
# os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "lab"
# os.environ['PYSPARK_PYTHON'] = "python"


In [29]:
from pyspark.sql import SparkSession

In [30]:

spark = SparkSession.builder \
    .appName("pipeline1") \
    .getOrCreate()


In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType
import shutil
import os


nifi_compressed_path = "/Users/jaychavda/Downloads/eco-counter-data.csv"

# Defining schema explicitly
schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("day_of_week", StringType(), True),
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("counts", IntegerType(), True),
    StructField("coordinates", StringType(), True),
    StructField("site_active", StringType(), True),
    StructField("user_type", StringType(), True),
    StructField("installation_date", DateType(), True),
    StructField("photos", StringType(), True),
    StructField("status", StringType(), True),
    StructField("site_owner", StringType(), True)
])


In [32]:

# Load the data as a batch
batch_df = spark.read \
    .format("csv") \
    .option("header", True) \
    .schema(schema) \
    .option("samplingRatio", 0.1) \
    .load(nifi_compressed_path)

# Filter Active Sites Only
active_sites_df = batch_df.filter(col("site_active") == "Yes")

In [33]:
batch_df.show()

+-------------------+-----------+---------+--------------------+------+--------------------+-----------+----------+-----------------+--------------------+------+----------+
|               date|day_of_week|       id|                name|counts|         coordinates|site_active| user_type|installation_date|              photos|status|site_owner|
+-------------------+-----------+---------+--------------------+------+--------------------+-----------+----------+-----------------+--------------------+------+----------+
|2024-11-09 10:30:00|   Saturday|100013346|CRY_LKP_W (Cary L...|     0|35.74969, -78.813619|        Yes|   bicycle|       2021-02-11|https://filer.eco...|   raw|      Cary|
|2024-10-26 09:30:00|   Saturday|100034841|           RAL_HAR_S|     0|35.777953, -78.62...|        Yes| undefined|       2018-05-31|                NULL|   raw|      Cary|
|2024-11-09 15:30:00|   Saturday|100013346|CRY_LKP_W (Cary L...|     0|35.74969, -78.813619|        Yes|   bicycle|       2021-02-11|ht

24/11/25 00:51:28 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Date, Day of Week, Site ID, Site Name, Counts, Coordinates, Site In Use, Traffic Type, Installation Date, Photo, Status, Site Owner
 Schema: date, day_of_week, id, name, counts, coordinates, site_active, user_type, installation_date, photos, status, site_owner
Expected: day_of_week but found: Day of Week
CSV file: file:///Users/jaychavda/Downloads/eco-counter-data.csv


In [34]:
# Average Counts Per Day of Week
average_counts_per_day_df = batch_df.groupBy("day_of_week") \
    .agg(avg("counts").alias("average_counts"))

# Status Analysis
status_counts_df = batch_df.groupBy("status").count()

# User Type Metrics (Active Users Per User Type)
user_type_metrics_df = active_sites_df.groupBy("user_type") \
    .agg(count("*").alias("active_user_count"))

local_csv_output_path = "/Users/jaychavda/Downloads/output_pipeline1"

In [35]:
user_type_metrics_df = active_sites_df.groupBy("user_type") \
    .agg(count("*").alias("active_user_count"))

In [36]:
user_type_metrics_df.show()

+----------+-----------------+
| user_type|active_user_count|
+----------+-----------------+
|pedestrian|           119117|
|       car|            39947|
| undefined|           130692|
|   bicycle|            80398|
+----------+-----------------+



24/11/25 00:51:33 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Site In Use, Traffic Type
 Schema: site_active, user_type
Expected: site_active but found: Site In Use
CSV file: file:///Users/jaychavda/Downloads/eco-counter-data.csv


In [57]:
from pyspark.sql.functions import col, sum

spark.sparkContext.setLogLevel("ERROR")

batch_df1 = batch_df.withColumn("counts", col("counts").cast("int"))

user_type_per_day_metrics_df = batch_df1.groupBy(["user_type", "day_of_week"]) \
    .agg(sum("counts").alias("active_user_count_daywise"))



+----------+-----------+-------------------------+
| user_type|day_of_week|active_user_count_daywise|
+----------+-----------+-------------------------+
|   bicycle|  Wednesday|                    28123|
|   bicycle|   Thursday|                    25684|
|       car|     Monday|                    41109|
|       car|  Wednesday|                    46965|
|pedestrian|  Wednesday|                   212461|
|pedestrian|   Thursday|                   218912|
|   bicycle|     Sunday|                    50564|
|pedestrian|     Sunday|                   364245|
|       car|   Saturday|                    59745|
|       car|     Sunday|                    54591|
|   bicycle|    Tuesday|                    27899|
|pedestrian|   Saturday|                   348016|
|   bicycle|     Monday|                    26834|
|   bicycle|   Saturday|                    44969|
|   bicycle|     Friday|                    26385|
|pedestrian|     Friday|                   227523|
|       car|   Thursday|       

In [52]:
user_type_per_day_metrics_df = active_sites_df.groupBy(["user_type","day_of_week"]) \
    .agg(count("*").alias("active_user_count_daywise"))
user_type_per_day_metrics_df.show()

24/11/25 01:02:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Day of Week, Site In Use, Traffic Type
 Schema: day_of_week, site_active, user_type
Expected: day_of_week but found: Day of Week
CSV file: file:///Users/jaychavda/Downloads/eco-counter-data.csv


+----------+-----------+-------------------------+
| user_type|day_of_week|active_user_count_daywise|
+----------+-----------+-------------------------+
|   bicycle|  Wednesday|                    11421|
| undefined|     Friday|                    18702|
|   bicycle|   Thursday|                    11552|
| undefined|     Sunday|                    18706|
|       car|     Monday|                     5684|
|       car|  Wednesday|                     5696|
|pedestrian|  Wednesday|                    16945|
| undefined|   Thursday|                    18591|
|pedestrian|   Thursday|                    17050|
| undefined|    Tuesday|                    18728|
|   bicycle|     Sunday|                    11520|
|pedestrian|     Sunday|                    16960|
|       car|   Saturday|                     5700|
|       car|     Sunday|                     5688|
|   bicycle|    Tuesday|                    11535|
|pedestrian|   Saturday|                    16947|
|   bicycle|     Monday|       

In [39]:
print(user_type_per_day_metrics_df.count())

24/11/25 00:51:51 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Day of Week, Traffic Type
 Schema: day_of_week, user_type
Expected: day_of_week but found: Day of Week
CSV file: file:///Users/jaychavda/Downloads/eco-counter-data.csv
[Stage 25:=====>                                                   (1 + 9) / 10]

35


                                                                                

In [40]:
user_type_per_day_metrics_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv(f"{local_csv_output_path}/user_type_per_day_metrics")

24/11/25 00:51:52 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Day of Week, Counts, Traffic Type
 Schema: day_of_week, counts, user_type
Expected: day_of_week but found: Day of Week
CSV file: file:///Users/jaychavda/Downloads/eco-counter-data.csv


In [41]:
# Rename the output CSV file to your desired custom name
temp_output_folder = f"{local_csv_output_path}/user_type_per_day_metrics"
custom_output_file = f"{local_csv_output_path}/user_type_per_day_metrics.csv"

# Move the part file to the desired name
for filename in os.listdir(temp_output_folder):
    if filename.endswith(".csv"):
        shutil.move(os.path.join(temp_output_folder, filename), custom_output_file)

# Remove the temporary folder after renaming the file
shutil.rmtree(temp_output_folder)

In [42]:

# Save Average Counts Per Day to a Single CSV File
average_counts_per_day_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv(f"{local_csv_output_path}/average_counts_per_day_temp")

# Rename the output CSV file to your desired custom name
temp_output_folder = f"{local_csv_output_path}/average_counts_per_day_temp"
custom_output_file = f"{local_csv_output_path}/average_counts_per_day.csv"

# Move the part file to the desired name
for filename in os.listdir(temp_output_folder):
    if filename.endswith(".csv"):
        shutil.move(os.path.join(temp_output_folder, filename), custom_output_file)

# Remove the temporary folder after renaming the file
shutil.rmtree(temp_output_folder)

# Save Status Counts to a Single CSV File
status_counts_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv(f"{local_csv_output_path}/status_counts_temp")

# Rename the output CSV file to your desired custom name
temp_output_folder = f"{local_csv_output_path}/status_counts_temp"
custom_output_file = f"{local_csv_output_path}/status_counts.csv"

# Move the part file to the desired name
for filename in os.listdir(temp_output_folder):
    if filename.endswith(".csv"):
        shutil.move(os.path.join(temp_output_folder, filename), custom_output_file)

# Remove the temporary folder after renaming the file
shutil.rmtree(temp_output_folder)

# Save User Type Metrics to a Single CSV File
user_type_metrics_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv(f"{local_csv_output_path}/user_type_metrics_temp")


# Rename the output CSV file to your desired custom name
temp_output_folder = f"{local_csv_output_path}/user_type_metrics_temp"
custom_output_file = f"{local_csv_output_path}/user_type_metrics.csv"

# Move the part file to the desired name
for filename in os.listdir(temp_output_folder):
    if filename.endswith(".csv"):
        shutil.move(os.path.join(temp_output_folder, filename), custom_output_file)

# Remove the temporary folder after renaming the file
shutil.rmtree(temp_output_folder)



24/11/25 00:52:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Day of Week, Counts
 Schema: day_of_week, counts
Expected: day_of_week but found: Day of Week
CSV file: file:///Users/jaychavda/Downloads/eco-counter-data.csv
24/11/25 00:52:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Site In Use, Traffic Type
 Schema: site_active, user_type
Expected: site_active but found: Site In Use
CSV file: file:///Users/jaychavda/Downloads/eco-counter-data.csv


In [43]:
# average_counts_per_day
# user_type_metrics
# status_counts

In [44]:
# Path to the uploaded file
file_path = "/Users/jaychavda/Downloads/output_pipeline1/average_counts_per_day.csv"

# Step 1: Read the CSV file into a DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first few rows
print("Original Data:")
df.show()

Original Data:
+-----------+------------------+
|day_of_week|    average_counts|
+-----------+------------------+
|  Wednesday|22.848113088076996|
|    Tuesday| 21.72339286276733|
|     Friday| 23.94567153825537|
|   Thursday|21.912688124446692|
|   Saturday| 26.99172420315899|
|     Monday|22.972885477859894|
|     Sunday|26.638852899352187|
+-----------+------------------+



In [45]:
# Step 2: Perform transformations
# Example Transformation 1: Filter rows where a specific column's value exceeds a threshold
filtered_df = df.filter(col("average_counts") > 22)  # Replace "column_name" with an actual column

filtered_df.show()

+-----------+------------------+
|day_of_week|    average_counts|
+-----------+------------------+
|  Wednesday|22.848113088076996|
|     Friday| 23.94567153825537|
|   Saturday| 26.99172420315899|
|     Monday|22.972885477859894|
|     Sunday|26.638852899352187|
+-----------+------------------+



In [46]:
# Step 3: Show the results
print("Filtered Data:")
filtered_df.show()



Filtered Data:
+-----------+------------------+
|day_of_week|    average_counts|
+-----------+------------------+
|  Wednesday|22.848113088076996|
|     Friday| 23.94567153825537|
|   Saturday| 26.99172420315899|
|     Monday|22.972885477859894|
|     Sunday|26.638852899352187|
+-----------+------------------+



In [47]:
# new transformations

In [28]:
from pyspark.sql.functions import col, split


file_path = "/Users/jaychavda/Downloads/eco-counter-data.csv"

# Define schema explicitly
schema = StructType([
    StructField("date", TimestampType(), True),
    StructField("day_of_week", StringType(), True),
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("counts", IntegerType(), True),
    StructField("coordinates", StringType(), True),
    StructField("site_active", StringType(), True),
    StructField("user_type", StringType(), True),
    StructField("installation_date", DateType(), True),
    StructField("photos", StringType(), True),
    StructField("status", StringType(), True),
    StructField("site_owner", StringType(), True)
])

# Read CSV as a single column
raw_df = spark.read \
    .option("header", False) \
    .option("delimiter", ";") \
    .csv(file_path)

# Split the single column into multiple columns
columns = ["date", "day_of_week", "id", "name", "counts", "coordinates", "site_active",
           "user_type", "installation_date", "photos", "status", "site_owner"]
data_df = raw_df.withColumn("value", split(col("_c0"), ";")) \
    .select([col("value").getItem(i).alias(columns[i]) for i in range(len(columns))])

In [29]:
from pyspark.sql.functions import col

# Ensure 'counts' column is cast to IntegerType
data_df = data_df.withColumn("counts", col("counts").cast("int"))


In [30]:
from pyspark.sql.functions import sum

top_sites_df = data_df.groupBy("name") \
    .agg(sum("counts").alias("total_counts")) \
    .orderBy(col("total_counts").desc()) \
    .limit(10)

In [31]:
weekly_trends_df = data_df.filter(col("site_active") == "Yes") \
    .groupBy("day_of_week") \
    .agg(avg("counts").alias("average_counts_per_day"))


In [32]:
inactive_sites_df = data_df.filter(col("site_active") == "No") \
    .groupBy("site_owner") \
    .count() \
    .alias("inactive_site_count")


In [33]:
counts_by_user_type_df = data_df.groupBy("user_type") \
    .agg(sum("counts").alias("total_counts")) \
    .orderBy(col("total_counts").desc())


In [34]:
from pyspark.sql.functions import month, year
installation_trends_df = data_df.groupBy(year("installation_date").alias("year"),
                                         month("installation_date").alias("month")) \
    .count() \
    .alias("sites_installed")


In [40]:
installation_trends_df.show()

+----+-----+------+
|year|month| count|
+----+-----+------+
|NULL| NULL|443408|
+----+-----+------+

