# Cycling in numbers - A Case Study of Cycle Paths in Rhine-Kreis Neuss

## Description

Five counting stations have been permanently recording cycling traffic on cycle paths along central roads in the Rhine-Kreis Neuss since 2016. Cycling traffic is measured daily using induction loops laid in the ground. This continuous data collection gives insight into daily, weekly and annual cycles and, building on these, into long-term cycling trends over several years.

More details on the data source are available here: https://data.europa.eu/data/datasets/eco-counter-data-rhein-kreis-neuss?locale=en


# Stage 1: Data Processing

The purpose of this notebook is to clean and transform the data and prepare it for analysis.

<a id='1'></a>
## 1. Setup

Since we are going to process data stored in HDFS, the Hadoop service must be started first.


**Requirements:**

1. Hadoop must be running.
2. The dataset must be available in this HDFS directory: [hdfs://localhost:9000//g10_datalake/bronze/cycling/](hdfs://localhost:9000//g10_datalake/bronze/cycling/)


<a id='1.2'></a>
### 1.1 Search for Spark Installation 
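This step is required because we are working in a virtual environment: `findspark.init()` locates the Spark installation and adds it to `sys.path` so that `pyspark` can be imported.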

In [1]:
import findspark
findspark.init()


Set the pandas maximum column width to `None` so that long values are displayed in full.

In [2]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.3'></a>
### 1.3 Create SparkSession


In [3]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("group_10_EDA")
    .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")



<a id='2'></a>
## 2. The Data

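Here we read the data from HDFS. The source files are semicolon-delimited CSV with a header row; the schema is inferred, and the DataFrame is cached because it is reused in every later step.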

In [4]:
df = (spark.read.option("header", "true")
                 .option("inferSchema", "true")
                 .option("sep", ";")  # Specify semicolon as the delimiter
                 .csv("hdfs://localhost:9000//g10_datalake/bronze/cycling/")
                 .cache())

df.show(5)



+---------+-------------------+------+------+----------+----------+-------------------+
|       Id|              Datum|Anzahl|Status|Channel Id|Zählstelle|        Koordinaten|
+---------+-------------------+------+------+----------+----------+-------------------+
|100019715|2021-10-09 10:00:00|    17|   raw| 101019715| Meerbusch|51.261012, 6.705018|
|100019715|2021-10-09 12:00:00|    46|   raw| 101019715| Meerbusch|51.261012, 6.705018|
|100019715|2021-10-09 15:00:00|   108|   raw| 101019715| Meerbusch|51.261012, 6.705018|
|100019715|2021-10-09 17:00:00|    65|   raw| 101019715| Meerbusch|51.261012, 6.705018|
|100019715|2021-10-09 18:00:00|    26|   raw| 101019715| Meerbusch|51.261012, 6.705018|
+---------+-------------------+------+------+----------+----------+-------------------+
only showing top 5 rows
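To make the effect of the `sep` option concrete, here is a minimal sketch in plain Python (no Spark required) that parses a semicolon-delimited sample with the same header as the table above. The sample values are copied from that output; the exact raw layout of the file, in particular the timestamp format, is an assumption, and `parse_semicolon_csv` is a hypothetical helper name.

```python
import csv
import io

# Hypothetical raw sample: header and values taken from the df.show() output above.
# The timestamp format used in the real file is an assumption.
RAW_SAMPLE = (
    "Id;Datum;Anzahl;Status;Channel Id;Zählstelle;Koordinaten\n"
    "100019715;2021-10-09 10:00:00;17;raw;101019715;Meerbusch;51.261012, 6.705018\n"
    "100019715;2021-10-09 12:00:00;46;raw;101019715;Meerbusch;51.261012, 6.705018\n"
)


def parse_semicolon_csv(text):
    """Parse semicolon-delimited CSV text into a list of dicts keyed by header."""
    reader = csv.DictReader(io.StringIO(text), delimiter=";")
    return list(reader)


rows = parse_semicolon_csv(RAW_SAMPLE)
# The comma inside the coordinate pair is not treated as a delimiter,
# so latitude and longitude stay together in one field.
print(rows[0]["Koordinaten"])
```

With a comma as the delimiter, the coordinate pair would be split across two fields, which is why the semicolon has to be specified explicitly.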



                                                                                

We rename the columns to their English equivalents and print the schema of the data.

In [5]:
from pyspark.sql.functions import col

df = df.withColumnRenamed("Datum", "Date") \
       .withColumnRenamed("Anzahl", "Count") \
       .withColumnRenamed("Zählstelle", "Counting_Station") \
       .withColumnRenamed("Koordinaten", "Coordinates") \
       .withColumnRenamed("Channel Id", "Channel_Id")

# Show updated schema
df.printSchema()


root
 |-- Id: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Channel_Id: integer (nullable = true)
 |-- Counting_Station: string (nullable = true)
 |-- Coordinates: string (nullable = true)



In [6]:
df.limit(5).toPandas()

| | Id | Date | Count | Status | Channel_Id | Counting_Station | Coordinates |
|---|---|---|---|---|---|---|---|
| 0 | 100019715 | 2021-10-09 10:00:00 | 17 | raw | 101019715 | Meerbusch | 51.261012, 6.705018 |
| 1 | 100019715 | 2021-10-09 12:00:00 | 46 | raw | 101019715 | Meerbusch | 51.261012, 6.705018 |
| 2 | 100019715 | 2021-10-09 15:00:00 | 108 | raw | 101019715 | Meerbusch | 51.261012, 6.705018 |
| 3 | 100019715 | 2021-10-09 17:00:00 | 65 | raw | 101019715 | Meerbusch | 51.261012, 6.705018 |
| 4 | 100019715 | 2021-10-09 18:00:00 | 26 | raw | 101019715 | Meerbusch | 51.261012, 6.705018 |


**Summary Statistics of the data**

In [6]:
from pyspark.sql import functions as F

# Use the F namespace so that Spark's min/max do not shadow Python's built-ins
summary_stats = df.groupBy("Counting_Station").agg(
    F.mean("Count").alias("avg_count"),
    F.stddev("Count").alias("stddev_count"),
    F.min("Count").alias("min_count"),
    F.max("Count").alias("max_count"),
    F.count("Count").alias("count_entries")  # counts non-null values only
)
summary_stats.show()



+----------------+------------------+------------------+---------+---------+-------------+
|Counting_Station|         avg_count|      stddev_count|min_count|max_count|count_entries|
+----------------+------------------+------------------+---------+---------+-------------+
|           Neuss| 4.195613401689653| 9.646175144591327|        0|      211|       276980|
|       Meerbusch| 6.108348822542054|12.980603314519595|        0|      234|       299671|
|        Dormagen|0.8086672225972548| 2.517983888602218|        0|       85|       453294|
|          Jüchen| 2.273837380328395|7.3598334275448645|        0|     1855|       299152|
|    Grevenbroich|1.3672415745891118|3.7640586442326884|        0|      103|       317848|
+----------------+------------------+------------------+---------+---------+-------------+
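When reading this table it helps to know how the aggregation treats missing values: Spark's `count` on a column counts only non-null entries, `mean`, `min` and `max` skip nulls, and `stddev` is the sample standard deviation. This may be why `count_entries` (for example 276980 for Neuss) is lower than the row counts reported when the data is written later in this notebook (323984 for Neuss), although the notebook does not confirm the cause. The plain-Python sketch below mirrors that behavior on made-up values; `summarize` and the sample list are hypothetical.

```python
import statistics


def summarize(counts):
    """Summarize counts while skipping None, mirroring Spark's null handling."""
    present = [c for c in counts if c is not None]
    return {
        "avg_count": statistics.mean(present),
        "stddev_count": statistics.stdev(present),  # sample std dev (n - 1)
        "min_count": min(present),
        "max_count": max(present),
        "count_entries": len(present),  # non-null values only
    }


# Hypothetical hourly counts with two missing readings
sample = [17, 46, None, 108, 65, None, 26]
stats = summarize(sample)
print(len(sample), stats["count_entries"])
```

Here the list has 7 rows but only 5 counted entries, the same kind of gap that appears between rows and `count_entries` in the table.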



                                                                                

**Adding Date information to the dataset**

From the date column, we will extract the Year, Month, Day, Hour and Day of the week.

In [7]:
from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek

df = df.withColumn("Year", year(col("Date"))) \
       .withColumn("Month", month(col("Date"))) \
       .withColumn("Day", dayofmonth(col("Date"))) \
       .withColumn("Hour", hour(col("Date"))) \
       .withColumn("Day_of_Week", dayofweek(col("Date")))
df.show(5)

+---------+-------------------+-----+------+----------+----------------+-------------------+----+-----+---+----+-----------+
|       Id|               Date|Count|Status|Channel_Id|Counting_Station|        Coordinates|Year|Month|Day|Hour|Day_of_Week|
+---------+-------------------+-----+------+----------+----------------+-------------------+----+-----+---+----+-----------+
|100019715|2021-10-09 10:00:00|   17|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  10|          7|
|100019715|2021-10-09 12:00:00|   46|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  12|          7|
|100019715|2021-10-09 15:00:00|  108|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  15|          7|
|100019715|2021-10-09 17:00:00|   65|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  17|          7|
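|100019715|2021-10-09 18:00:00|   26|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  18|          7|
+---------+-------------------+-----+------+----------+----------------+-------------------+----+-----+---+----+-----------+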
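The `Day_of_Week` value of 7 for 2021-10-09 reflects Spark's numbering, in which `dayofweek` returns 1 for Sunday through 7 for Saturday. This differs from Python's `isoweekday()`, where Monday is 1 and Sunday is 7. The sketch below reproduces Spark's convention in plain Python as a cross-check; `spark_dayofweek` is a hypothetical helper, not part of any library.

```python
from datetime import datetime


def spark_dayofweek(ts):
    """Return the day of week in Spark's convention: 1 = Sunday, ..., 7 = Saturday."""
    # isoweekday() gives Monday = 1 ... Sunday = 7, so shift Sunday to the front.
    return ts.isoweekday() % 7 + 1


ts = datetime(2021, 10, 9, 10, 0, 0)  # first timestamp in the table above
print(ts.isoweekday(), spark_dayofweek(ts))
```

Under this convention a weekend filter would check for the values 1 and 7 rather than 6 and 7.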


**Handling Location information**

The `Coordinates` column holds latitude and longitude as a single comma-separated string, so we split it into two separate columns.

In [8]:
from pyspark.sql.functions import split

df = df.withColumn("Latitude", split(col("Coordinates"), ", ")[0]) \
       .withColumn("Longitude", split(col("Coordinates"), ", ")[1])
df.show(5)

+---------+-------------------+-----+------+----------+----------------+-------------------+----+-----+---+----+-----------+---------+---------+
|       Id|               Date|Count|Status|Channel_Id|Counting_Station|        Coordinates|Year|Month|Day|Hour|Day_of_Week| Latitude|Longitude|
+---------+-------------------+-----+------+----------+----------------+-------------------+----+-----+---+----+-----------+---------+---------+
|100019715|2021-10-09 10:00:00|   17|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  10|          7|51.261012| 6.705018|
|100019715|2021-10-09 12:00:00|   46|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  12|          7|51.261012| 6.705018|
|100019715|2021-10-09 15:00:00|  108|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  15|          7|51.261012| 6.705018|
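|100019715|2021-10-09 17:00:00|   65|   raw| 101019715|       Meerbusch|51.261012, 6.705018|2021|   10|  9|  17|          7|51.261012| 6.705018|
+---------+-------------------+-----+------+----------+----------------+-------------------+----+-----+---+----+-----------+---------+---------+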

In [9]:
# Show updated schema
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Channel_Id: integer (nullable = true)
 |-- Counting_Station: string (nullable = true)
 |-- Coordinates: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
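The schema lists `Latitude` and `Longitude` as `string`, because `split` returns string elements. If numeric coordinates are needed downstream (an assumption; the notebook does not say so), one option in Spark is to append `.cast("double")` to each split expression. The conversion itself can be sketched in plain Python; `parse_coordinates` is a hypothetical helper.

```python
def parse_coordinates(value):
    """Split a 'lat, lon' string into a (latitude, longitude) pair of floats."""
    lat_text, lon_text = value.split(",")
    # strip() removes the space that follows the comma in the source data
    return float(lat_text.strip()), float(lon_text.strip())


latitude, longitude = parse_coordinates("51.261012, 6.705018")
print(latitude, longitude)
```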



**Write the data back to hadoop**

We write the transformed data back to HDFS. We also extract the data for each individual counting station and write it as a separate CSV output.


In [10]:
# Get unique Counting_Station values
unique_stations = df.select("Counting_Station").distinct().rdd.flatMap(lambda x: x).collect()

# Define output path in HDFS
output_base_path = "hdfs://localhost:9000/g10_datalake/silver/cycling/"

# Split the data by station and save each subset
for station in unique_stations:
    # Filter data for each Counting_Station
    subset_df = df.filter(df.Counting_Station == station)
    
    # Define output path
    output_path = f"{output_base_path}Counting_Station_{station}"
    
    # Save CSV
    subset_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)

    # Display row count for each split
    print(f"Saved {subset_df.count()} rows for Counting_Station {station} to {output_path}")


# Save the complete transformed dataset as a single CSV part file
output_path = f"{output_base_path}cleaned_cycling_data"
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)


                                                                                

Saved 323984 rows for Counting_Station Neuss to hdfs://localhost:9000/g10_datalake/silver/cycling/Counting_Station_Neuss
Saved 319680 rows for Counting_Station Meerbusch to hdfs://localhost:9000/g10_datalake/silver/cycling/Counting_Station_Meerbusch
Saved 488430 rows for Counting_Station Dormagen to hdfs://localhost:9000/g10_datalake/silver/cycling/Counting_Station_Dormagen
Saved 314568 rows for Counting_Station Jüchen to hdfs://localhost:9000/g10_datalake/silver/cycling/Counting_Station_Jüchen
Saved 324384 rows for Counting_Station Grevenbroich to hdfs://localhost:9000/g10_datalake/silver/cycling/Counting_Station_Grevenbroich
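A quick consistency check on this output is to add up the per-station row counts and compare the total with the row count of the full DataFrame. The sketch below does the addition in plain Python using the numbers printed above; the comparison against `df.count()` is a suggested follow-up that this notebook does not run, and `total_rows` is a hypothetical helper.

```python
# Row counts copied from the "Saved ... rows" messages above
saved_rows = {
    "Neuss": 323984,
    "Meerbusch": 319680,
    "Dormagen": 488430,
    "Jüchen": 314568,
    "Grevenbroich": 324384,
}


def total_rows(per_station):
    """Sum the row counts across all stations."""
    return sum(per_station.values())


expected_total = total_rows(saved_rows)
print(expected_total)
```

If `df.count()` returned a larger number than this total, that would suggest rows with a null `Counting_Station`, which the equality filter in the loop would not match.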


                                                                                

                                                                                