# Analyze bike-sharing system of Barcelona - Spark DataFrame-based programming

In [None]:
# In this analysis, I am going to consider the occupancy of the stations where users can pick up or drop off
# bikes in order to identify the most "critical" timeslots (day of the week, hour) for each station.

In [None]:
# The data is stored on the big-data cluster, and I read it from there.
# there are two types of data:
# 1. register.csv: This contains the historical information about the number of used and free slots for
#    ~3000 stations from May 2008 to September 2008. Each line of register.csv
#    corresponds to one reading about the situation of one station at a specific timestamp.
# 2. stations.csv: It contains the description of each station (id, latitude, longitude, name).

In [3]:
# Path of register.csv (one station-occupancy reading per line) on the cluster.
registerPath = "/data/students/bigdata_internet/lab3/register.csv"

In [4]:
# Path of stations.csv (one description line per station) on the cluster.
stationPath = "/data/students/bigdata_internet/lab3/stations.csv"

In [None]:
'''In this analysis, PySpark was utilized for its robust distributed computing capabilities, 
ideal for handling large datasets efficiently.
If you're using the PySpark shell, no additional setup is necessary. 
However, for those working in a Python environment, setting up PySpark involves the following steps:
1. Install PySpark: Begin by installing PySpark using pip:
pip install pyspark
2. Configure PySpark: In your Python script or interactive session, create a SparkSession,
the entry point of the DataFrame API used in this notebook:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()
sc = spark.sparkContext  # the underlying SparkContext, if you also need the RDD API
```
Run this before performing any other PySpark operation.
For comprehensive installation and configuration instructions, refer to the official
PySpark installation guide: https://spark.apache.org/docs/latest/api/python/getting_started/install.html
'''

In [None]:
# Reading register data as a DataFrame

In [5]:
# Import explicitly so the notebook also runs in environments that do not
# pre-import SparkSession (the original cell relied on it being in scope).
from pyspark.sql import SparkSession

# Reuse the active session if one exists (e.g. the shell's), otherwise create one.
spark = SparkSession.builder.getOrCreate()

In [None]:
# The file is tab-separated, so I set the separator to \t.

In [6]:
# register.csv is tab-separated with a header row; no schema inference is
# requested, so every column is loaded as a string.
registerDF = spark.read.csv(registerPath, sep='\t', header=True)

                                                                                

In [6]:
# Number of readings in the raw file (the header row is not counted because header=True).
registerDF.count()

                                                                                

25319028

In [None]:
# To clean the data, I keep only the readings with used_slots != 0 or free_slots != 0:
# a station always has some used or some free slots, so a reading with 0 for both is invalid.

In [7]:
# Drop a reading only when used_slots and free_slots are both 0 (an impossible state).
registerDF_clean = registerDF.where("NOT (used_slots = 0 AND free_slots = 0)")

In [8]:
# Number of readings left after removing the invalid (0 used, 0 free) ones.
registerDF_clean.count()

                                                                                

25104121

In [None]:
# There are 25,319,028 rows (excluding the header) in the original file; after filtering out
# the invalid readings, 25,104,121 rows remain.

1.2 stations.csv

In [None]:
# Reading station data as a DataFrame

In [8]:
# stations.csv is tab-separated with a header row (all columns loaded as strings).
stationDF = spark.read.csv(stationPath, sep='\t', header=True)

In [None]:
# Write a Spark application that selects the pairs (station, timeslot) that are characterized 
# by a high "criticality" value

In [7]:
# Convert timestamp to timeslot (weekday, hour)

In [9]:
# Timeslot = (day-of-week name, hour of day), packed by the SQL row constructor
# into a single struct column so it can be grouped on and joined as one key.
timeslotExpr = "(date_format(timestamp,'EEEE'), hour(timestamp)) as timeslot"
registerDF_timeslot = registerDF_clean.selectExpr(
    "station", timeslotExpr, "used_slots", "free_slots")

........................................................................................

In [None]:
# Computes the criticality value C(Si, Tj) for each pair (Si, Tj)

In [None]:
# Keep only the readings with free_slots = 0, i.e. the station was full (all slots were used).

In [10]:
# Readings taken while the station was full (no free slot left).
zeroFreeSlots = registerDF_timeslot.where(registerDF_timeslot["free_slots"] == 0)

In [None]:
# Group the dataframe based on (station_id, timeslot) and count the number of free_slots = 0 readings 

In [11]:
# Number of "full station" readings for every (station, timeslot) pair.
zeroFreeSlotsGroup = zeroFreeSlots.groupBy(["station", "timeslot"]).count()

In [None]:
# Rename the columns

In [12]:
# Give the generic "count" column a self-describing name.
zeroFreeSlotsGroup = zeroFreeSlotsGroup.withColumnRenamed(
    existing="count", new="count_zero_free_slots")

In [None]:
# Group the original dataframe based on (station_id, timeslot) and count the number of all readings 

In [13]:
# Total number of readings for every (station, timeslot) pair.
registerDFTotalGroup = registerDF_timeslot.groupBy(["station", "timeslot"]).count()

In [None]:
# Rename the columns

In [14]:
# NOTE: despite its name, this column holds the total number of readings per pair.
registerDFTotalGroup = registerDFTotalGroup.withColumnRenamed(
    existing="count", new="count_total_free_slots")

In [None]:
# Join two previous dataframes

In [15]:
# Join the two per-pair counts on their shared key columns by name.
# Comparing zeroFreeSlotsGroup.station == registerDFTotalGroup.station built a
# "trivially true equals predicate" (station#10 = station#10): both DataFrames
# come from registerDF_timeslot, so their key columns carry the same attribute
# ids. A join on column names is unambiguous and keeps one copy of each key.
# Inner join: pairs with no "full" reading are dropped (their criticality is 0,
# below any useful threshold).
joinedRegister = zeroFreeSlotsGroup.join(
    registerDFTotalGroup, on=["station", "timeslot"], how="inner"
).select("station", "timeslot", "count_zero_free_slots", "count_total_free_slots")

23/12/23 09:02:30 WARN sql.Column: Constructing trivially true equals predicate, 'station#10 = station#10'. Perhaps you need to use aliases.
23/12/23 09:02:30 WARN sql.Column: Constructing trivially true equals predicate, 'timeslot#36 = timeslot#36'. Perhaps you need to use aliases.


In [None]:
# Calculate the criticality value

In [16]:
# C(Si, Tj) = (# readings with free_slots = 0) / (# readings) for each pair.
criticalityExpr = "count_zero_free_slots/count_total_free_slots as criticality"
criticalityRegister = joinedRegister.selectExpr("station", "timeslot", criticalityExpr)

In [None]:
# Now, I will select only the critical pairs (Si, Tj) whose criticality value C(Si, Tj) is
# greater than or equal to a minimum threshold (0.6).

In [17]:
# Keep the pairs whose criticality reaches the 0.6 threshold (inclusive).
criticalRegister = criticalityRegister.where(criticalityRegister["criticality"] >= 0.6)

In [None]:
# Order the results by increasing criticality.

In [18]:
# Increasing order of criticality.
orderedCriticalRegister = criticalRegister.orderBy(criticalRegister["criticality"].asc())

In [None]:
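# Show the critical (station, timeslot) pairs in Barcelona. show() prints the first 20 rows,
# which, in increasing order, are the least critical of the pairs above the threshold.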

In [None]:
# Prints the first 20 rows (show() default); with the increasing sort these are
# the least critical of the pairs that passed the threshold.
orderedCriticalRegister.show()

-------------------------------------------------------------------------------------------

In [None]:
# Store the sorted critical pairs C(Si, Tj) in the output folder (also an argument of the application)
# as CSV files (with a header) whose columns are separated by a tab. Store exactly the following
# attributes:
# station / station longitude / station latitude / day of week / hour / criticality value

In [None]:
# Join critical stations dataframe from register data with station data

In [19]:
# Attach the coordinates of each critical station (register.station <-> stations.id).
sameStation = orderedCriticalRegister["station"] == stationDF["id"]
criticalOutput = orderedCriticalRegister.join(stationDF, sameStation).select(
    orderedCriticalRegister["station"],
    stationDF["longitude"],
    stationDF["latitude"],
    orderedCriticalRegister["timeslot"],
    orderedCriticalRegister["criticality"],
)

In [20]:
# Sort again after the join (a join need not preserve the earlier order); ascending.
orderedCriticalOutput = criticalOutput.orderBy("criticality")

In [None]:
# Register the function for week

In [21]:
# SQL UDF 'week': first field (day-of-week name) of a timeslot struct.
def _timeslot_weekday(slot):
    return slot[0]

spark.udf.register('week', _timeslot_weekday)

<function __main__.<lambda>(x)>

In [None]:
# Register the function for hour (note: the name 'hour' replaces Spark's built-in hour() function)

In [22]:
# SQL UDF 'hour': second field (hour of day) of a timeslot struct.
# NOTE(review): registering under the name 'hour' replaces Spark's built-in
# hour() for the rest of the session (see the WARN line in this cell's output).
# The timeslot cell presumably resolved hour(timestamp) before this ran, but
# re-running that cell afterwards would apply this lambda to the raw timestamp
# string (returning its second character). A distinct name such as 'slot_hour',
# changed together with the selectExpr that calls it, would avoid this.
spark.udf.register('hour', lambda y: y[1])

23/12/23 09:02:46 WARN analysis.SimpleFunctionRegistry: The function hour replaced a previously registered function.


<function __main__.<lambda>(y)>

In [None]:
# Select the desired columns

In [23]:
# Split the timeslot struct into day of week and hour with native field access.
# timeslot was built with the SQL row constructor (expr1, expr2), whose unnamed
# fields Spark names col1 and col2. Reading them directly avoids the Python
# UDFs 'week'/'hour' (the latter shadows the built-in hour() function) and the
# per-row Python serialization cost they add.
finalDF = orderedCriticalOutput.selectExpr(
    "station",
    "longitude",
    "latitude",
    "timeslot.col1 as week",
    "timeslot.col2 as hour",
    "criticality",
)

In [None]:
# Save the output

In [23]:
# Tab-separated CSV files with a header row, written into this output folder.
outputPath = 'critical-stations-Barcelona-DataFrame'
finalDF.write.option("header", True).option("sep", "\t").csv(outputPath)

                                                                                

In [24]:
# Preview of the rows that were stored (show() prints at most 20 by default).
finalDF.show()



+-------+---------+---------+--------+----+------------------+
|station|longitude| latitude|    week|hour|       criticality|
+-------+---------+---------+--------+----+------------------+
|      9| 2.185294|41.385006|  Friday|  10|0.6129032258064516|
|     10| 2.185206|41.384875|Saturday|   0| 0.622107969151671|
|     58| 2.170736|41.377536|  Monday|   1|0.6239554317548747|
|      9| 2.185294|41.385006|  Friday|  22|0.6258389261744967|
|     58| 2.170736|41.377536|  Monday|   0|0.6323119777158774|
+-------+---------+---------+--------+----+------------------+



                                                                                

----------------------------------------------------------------------------------------

In [None]:
# In this section, I am going to compute the distance between each station and the city center. 
# The city center has coordinates:
# latitude = 41.386904
# longitude = 2.169989
# To compute the distance, implement the haversine function (use the formula
# in https://en.wikipedia.org/wiki/Haversine_formula).
# Then, compare the average number of used_slots of the stations closer than 1.5 km to the
# center with that of the stations farther away.

In [None]:
# Cast the latitude and longitude columns to double

In [25]:
from pyspark.sql.functions import col

In [27]:
# Coordinates are loaded as strings; cast both to double before computing distances.
for coordinateCol in ("latitude", "longitude"):
    stationDF = stationDF.withColumn(coordinateCol, stationDF[coordinateCol].cast("double"))

In [None]:
# Define the function to compute the haversine

In [28]:
import math
def haversine(lat, lon, center_lat=41.386904, center_lon=2.169989, radius_km=6371.0):
    """Return the great-circle distance in km between (lat, lon) and a center point.

    Implements the haversine formula
    (https://en.wikipedia.org/wiki/Haversine_formula). The center defaults to
    Barcelona's city center and the radius to 6371 km, so the original
    two-argument call ``haversine(lat, lon)`` behaves exactly as before.

    Args:
        lat, lon: coordinates of the point, in decimal degrees.
        center_lat, center_lon: coordinates of the reference point, in degrees.
        radius_km: radius of the sphere, in kilometers.

    Returns:
        The distance in kilometers, or None when ``lat`` or ``lon`` is None
        (a null coordinate), so that the Spark UDF yields null instead of
        raising TypeError and failing the whole job.
    """
    if lat is None or lon is None:
        return None
    phi1, lam1, phi2, lam2 = map(math.radians, (center_lat, center_lon, lat, lon))
    dphi = phi2 - phi1
    dlam = lam2 - lam1
    hav = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Floating-point rounding can push hav slightly above 1 for near-antipodal
    # points, which would make asin(sqrt(hav)) raise ValueError.
    return 2 * radius_km * math.asin(math.sqrt(min(1.0, hav)))

In [None]:
# Register the haversine function

In [29]:
from pyspark.sql.types import DoubleType

# Declare the return type: without it a Python UDF defaults to StringType, so
# "distance" would be a string and the later "distance < 1.5" / ">= 1.5"
# filters would depend on Spark's implicit string-to-number coercion.
spark.udf.register('hav', haversine, DoubleType())

<function __main__.haversine(lat, lon)>

In [None]:
# Calculate the distance

In [30]:
# Distance (km) of every station from the city center, via the 'hav' SQL UDF.
# (The variable name's spelling is kept because later cells refer to it.)
distanceExpr = "hav(latitude, longitude) as distance"
dinstanceStationDF = stationDF.selectExpr("id", distanceExpr)

In [None]:
# Join the station-distance dataframe (dinstanceStationDF) with the cleaned register dataframe and select the desired columns

In [31]:
# Attach each reading's station distance (register.station <-> stations.id).
stationMatch = registerDF_clean.station == dinstanceStationDF.id
joinedRegisterStation = (
    registerDF_clean.join(dinstanceStationDF, stationMatch)
    .select(
        registerDF_clean["station"],
        registerDF_clean["used_slots"],
        dinstanceStationDF["distance"],
    )
)

In [None]:
# Now, I want to find the stations that are closer than 1.5 km from the center

In [None]:
# Keep the stations closer than 1.5 km.

In [32]:
# Readings of stations strictly within 1.5 km of the center.
closerStations = joinedRegisterStation.where(joinedRegisterStation["distance"] < 1.5)

In [None]:
# Cast the used_slots column to float

In [33]:
# used_slots is loaded as a string; make it numeric before averaging.
closerStations = closerStations.withColumn("used_slots", closerStations["used_slots"].astype("float"))

In [None]:
# Calculate the average of used_slots for closer stations

In [34]:
# Global (ungrouped) mean of used_slots; DataFrame.agg is shorthand for groupBy().agg().
avgCloserStations = closerStations.groupBy().agg({"used_slots": "avg"})

In [35]:
# Single-row result: mean used_slots over all readings of stations within 1.5 km.
avgCloserStations.show()



+---------------+
|avg(used_slots)|
+---------------+
| 8.174875311666|
+---------------+



                                                                                

In [None]:
# Now, I am going to find the stations that are farther than 1.5 km from the center

In [None]:
# Keep the stations 1.5 km or more from the center.

In [36]:
# Readings of stations at least 1.5 km from the center.
furtherStations = joinedRegisterStation.where(joinedRegisterStation["distance"] >= 1.5)

In [None]:
# Calculate the average of used_slots for the farther stations

In [37]:
# Global mean of used_slots; the column is still a string here, and Spark
# implicitly casts it to a numeric type for avg.
avgFurtherStations = furtherStations.groupBy().agg({"used_slots": "avg"})

In [38]:
# Single-row result: mean used_slots over all readings of stations 1.5 km or more away.
avgFurtherStations.show()



+-----------------+
|  avg(used_slots)|
+-----------------+
|7.913817257872483|
+-----------------+



                                                                                

In [None]:
# The results show that the average number of used slots is higher for the stations close to the
# center (8.17) than for the stations farther away (7.91).