In [1]:
import pandas as pd
import geopandas as gpd
import pyspark
import findspark
import glob
import plotly.express as px

In [2]:
# Locate the local Spark installation and make pyspark importable/usable.
findspark.init()

# Create (or reuse) the SparkSession used by every later cell.
try:
    spark = pyspark.sql.SparkSession.builder.appName("Ecobici").getOrCreate()
except Exception as e:
    # Report the problem, then re-raise: without a session `spark` is undefined
    # and every later cell would fail with a misleading NameError instead.
    print(f"Error creating SparkSession: {e}")
    raise
else:
    print("SparkSession created successfully!")

SparkSession created successfully!


In [3]:
# Load every monthly trip export and stack them into one Spark DataFrame `df`.
path = 'Datos/Historicos/*.csv'
# glob returns matches in arbitrary order; sort so the union order (and therefore
# the column names, which come from the first file) is identical on every run.
files = sorted(glob.glob(path))
if not files:
    # Fail with a clear message instead of an IndexError on dfs[0] below.
    raise FileNotFoundError(f"No CSV files matched {path!r}")

dfs = []
for file in files:
    monthly = spark.read.csv(file, header=True, inferSchema=True)
    # Replace spaces in header names with underscores.
    monthly = monthly.toDF(*[name.replace(' ', '_') for name in monthly.columns])
    # NOTE: count() runs a full pass over the file; kept for the load report.
    print(f"Loaded {file} with {monthly.count()} rows and {len(monthly.columns)} columns")
    if dfs and monthly.columns != dfs[0].columns:
        # union() below aligns columns by position, so a differing header is
        # worth a look: make sure the column ORDER still matches the first file.
        print(f"Note: header of {file} differs from {files[0]}: {monthly.columns}")
    dfs.append(monthly)

# union() matches columns by POSITION; the result keeps the first file's names.
df = dfs[0]
for monthly in dfs[1:]:
    df = df.union(monthly)

df.printSchema()

Loaded Datos/Historicos\2024-05.csv with 2053563 rows and 9 columns
Loaded Datos/Historicos\2024-06.csv with 1893520 rows and 9 columns
Loaded Datos/Historicos\2024-07.csv with 1825650 rows and 9 columns
Loaded Datos/Historicos\2024-08.csv with 1891319 rows and 9 columns
Loaded Datos/Historicos\2024-09.csv with 1852179 rows and 9 columns
Loaded Datos/Historicos\2024-10.csv with 2054609 rows and 9 columns
Loaded Datos/Historicos\2024-11.csv with 1942474 rows and 9 columns
Loaded Datos/Historicos\2024-12.csv with 1680650 rows and 9 columns
Loaded Datos/Historicos\2024_04.csv with 1961633 rows and 9 columns
Loaded Datos/Historicos\2025-01.csv with 1809775 rows and 9 columns
Loaded Datos/Historicos\2025-02.csv with 1745620 rows and 9 columns
Loaded Datos/Historicos\2025-03.csv with 1832587 rows and 9 columns
root
 |-- Genero_Usuario: string (nullable = true)
 |-- Edad_Usuario: string (nullable = true)
 |-- Bici: integer (nullable = true)
 |-- Ciclo_Estacion_Retiro: string (nullable = true)

In [4]:
# Read the station catalogue; the first row holds the column names and Spark
# infers the column types from the data.
stations_csv_options = {"header": True, "inferSchema": True}
df_stations = spark.read.csv('Datos/cicloestaciones_ecobici.csv', **stations_csv_options)
df_stations.printSchema()

root
 |-- sistema: string (nullable = true)
 |-- num_cicloe: string (nullable = true)
 |-- calle_prin: string (nullable = true)
 |-- calle_secu: string (nullable = true)
 |-- colonia: string (nullable = true)
 |-- alcaldia: string (nullable = true)
 |-- latitud: double (nullable = true)
 |-- longitud: double (nullable = true)
 |-- sitio_de_e: string (nullable = true)
 |-- estatus: string (nullable = true)



In [5]:
# Preview the first five rows of the combined trips DataFrame.
df.show(5)

+--------------+------------+-------+---------------------+------------+-------------------+--------------------+------------+-------------------+
|Genero_Usuario|Edad_Usuario|   Bici|Ciclo_Estacion_Retiro|Fecha_Retiro|        Hora_Retiro|Ciclo_EstacionArribo|Fecha_Arribo|        Hora_Arribo|
+--------------+------------+-------+---------------------+------------+-------------------+--------------------+------------+-------------------+
|             F|          25|5266515|                  007|  30/04/2024|2025-05-04 23:34:05|                 459|  01/05/2024|2025-05-04 00:00:01|
|             F|          39|4350518|                  472|  30/04/2024|2025-05-04 23:55:43|                 477|  01/05/2024|2025-05-04 00:00:02|
|             F|          48|3732609|                  073|  30/04/2024|2025-05-04 23:46:46|                 447|  01/05/2024|2025-05-04 00:00:12|
|             M|          31|4956959|                  644|  30/04/2024|2025-05-04 23:53:36|                 631|  01/

In [5]:
# Summary statistics (count, mean, stddev, min, max) for the trips DataFrame.
# NOTE(review): Edad_Usuario is typed as string in the printed schema, so its
# min/max are presumably string comparisons rather than numeric - confirm.
df.describe().show()

+-------+--------------+------------------+------------------+---------------------+------------+--------------------+------------+
|summary|Genero_Usuario|      Edad_Usuario|              Bici|Ciclo_Estacion_Retiro|Fecha_Retiro|Ciclo_EstacionArribo|Fecha_Arribo|
+-------+--------------+------------------+------------------+---------------------+------------+--------------------+------------+
|  count|      22543579|          22543579|          22543579|             22543579|    22543579|            22543579|    22543579|
|   mean|          NULL|33.730556660746174| 5460355.305850549|    283.9727847558076|        NULL|  281.91180882962334|        NULL|
| stddev|          NULL| 9.703150530531856|2028107.8582312502|    209.6969449148901|        NULL|  211.21391377902677|        NULL|
|    min|             ?|               100|           2000461|                  001|  01/01/2025|                 001|  01/01/2025|
|    max|             O|              NULL|           8999708|              

In [6]:
# Count, for every column, how many rows hold a NULL value (single aggregation).
# Only true nulls are detected by isNull(); placeholder strings are not counted.
sql_fn = pyspark.sql.functions
null_counters = [
    sql_fn.count(sql_fn.when(sql_fn.col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counters).show()

+--------------+------------+----+---------------------+------------+-----------+--------------------+------------+-----------+
|Genero_Usuario|Edad_Usuario|Bici|Ciclo_Estacion_Retiro|Fecha_Retiro|Hora_Retiro|Ciclo_EstacionArribo|Fecha_Arribo|Hora_Arribo|
+--------------+------------+----+---------------------+------------+-----------+--------------------+------------+-----------+
|             0|           0|   0|                    0|           0|          0|                   0|           0|          0|
+--------------+------------+----+---------------------+------------+-----------+--------------------+------------+-----------+



In [6]:
# Show the ten stations with the most departures (trips grouped by Ciclo_Estacion_Retiro).
# Only station ids are listed here; nothing from the stations dataframe is joined in.
df.groupBy('Ciclo_Estacion_Retiro').count().orderBy('count', ascending=False).show(10)

+---------------------+------+
|Ciclo_Estacion_Retiro| count|
+---------------------+------+
|              271-272|233423|
|                  027|129886|
|                  064|125392|
|              237-238|115681|
|                  548|113858|
|              107-108|112825|
|                  031|108698|
|                  014|107733|
|                  208|103857|
|              192-193|102451|
+---------------------+------+
only showing top 10 rows



In [7]:
# Show the ten stations with the most arrivals (trips grouped by Ciclo_EstacionArribo).
df.groupBy('Ciclo_EstacionArribo').count().orderBy('count', ascending=False).show(10)

+--------------------+------+
|Ciclo_EstacionArribo| count|
+--------------------+------+
|             271-272|362817|
|                 014|176147|
|                 027|136885|
|                 064|129047|
|                 548|121762|
|             107-108|110387|
|                 031|106276|
|                 028|104917|
|                 018|103027|
|                 001|102829|
+--------------------+------+
only showing top 10 rows



In [8]:
# Number of distinct station ids (num_cicloe) in the stations dataframe.
df_stations.select('num_cicloe').distinct().count()

678

In [21]:
from pyspark.sql import functions as F  # Use F as alias for functions

# Build one row per station with its departure count, arrival count, street
# name and coordinates. The result is exposed as `station_count_clean` and,
# for the plotting cells, as `station_count`.

# Trips that start at each station.
station_departure_counts = (
    df.groupBy('Ciclo_Estacion_Retiro')
      .count()
      .withColumnRenamed('Ciclo_Estacion_Retiro', 'station_id')
      .withColumnRenamed('count', 'count_retiro')
)

# Trips that end at each station.
station_arrival_counts = (
    df.groupBy('Ciclo_EstacionArribo')
      .count()
      .withColumnRenamed('Ciclo_EstacionArribo', 'station_id')
      .withColumnRenamed('count', 'count_arribo')
)

# Select and rename the relevant columns from df_stations.
stations_info = df_stations.select(
    F.col('num_cicloe').alias('station_id'),
    F.col('calle_prin').alias('station_name'),
    'latitud',
    'longitud'
)

# Combine both tallies with a FULL OUTER join so a station that appears only as
# an origin or only as a destination is kept. (A left join starting from the
# departures would silently drop stations that only received arrivals.)
station_counts = station_departure_counts.join(
    station_arrival_counts,
    on='station_id',
    how='outer'
)

# A station missing from one side has a null count there; treat it as 0 trips.
station_counts = station_counts.fillna(0, subset=['count_retiro', 'count_arribo'])

# Attach name and coordinates. Left join: stations present in the trips but not
# in df_stations are kept with null name/coordinates.
station_counts_with_info = station_counts.join(
    stations_info,
    on='station_id',
    how='left'
)

# Keep the column order previously produced by this cell.
station_count_clean = station_counts_with_info.select(
    'station_id',
    'count_retiro',
    'station_name',
    'latitud',
    'longitud',
    'count_arribo'
)

station_count_clean.orderBy('count_retiro', ascending=False).show(10)

station_count = station_count_clean

+----------+------------+--------------------+---------+----------+------------+
|station_id|count_retiro|        station_name|  latitud|  longitud|count_arribo|
+----------+------------+--------------------+---------+----------+------------+
|   271-272|      233423|        Jesus Garcia|19.443684|-99.152465|      362817|
|       027|      129886|Av. Paseo de la R...| 19.42916|-99.162703|      136885|
|       064|      125392|              Sonora|19.412982|-99.166936|      129047|
|   237-238|      115681|        Andres Bello|19.426744|-99.193986|       83413|
|       548|      113858|Doctor Mariano Az...| 19.44701|-99.154318|      121762|
|   107-108|      112825|               Tolsa|19.427329|-99.149971|      110387|
|       031|      108698|            Hamburgo|19.427937|-99.161245|      106276|
|       014|      107733|Av. Paseo de la R...| 19.42454|-99.173247|      176147|
|       208|      103857|             Hesiodo|19.434182|-99.189835|       87082|
|   192-193|      102451|   

In [22]:
# Bubble map of departures: one marker per station, sized and coloured by
# count_retiro, with the station name shown on hover.
station_count_pd = station_count.toPandas()
fig = px.scatter_mapbox(
    station_count_pd,
    lat='latitud',
    lon='longitud',
    size='count_retiro',
    color='count_retiro',
    hover_name='station_name',
    hover_data=['count_retiro'],
    color_discrete_sequence=["fuchsia"],
    zoom=10,
    height=300,
    color_continuous_scale=px.colors.sequential.Plasma,
)
# Base map style and zero margins, applied in a single layout update.
fig.update_layout(
    mapbox_style="carto-positron",
    margin={"r": 0, "t": 0, "l": 0, "b": 0},
)
fig.show()

In [26]:
# Bubble map of arrivals: one marker per station, sized and coloured by
# count_arribo, with the station name shown on hover.
station_count_pd = station_count.toPandas()
fig = px.scatter_mapbox(
    station_count_pd,
    lat='latitud',
    lon='longitud',
    size='count_arribo',
    color='count_arribo',
    hover_name='station_name',
    hover_data=['count_arribo'],
    color_discrete_sequence=["fuchsia"],
    zoom=10,
    height=300,
    color_continuous_scale=px.colors.sequential.Plasma,
)
# Base map style and zero margins, applied in a single layout update.
fig.update_layout(
    mapbox_style="carto-positron",
    margin={"r": 0, "t": 0, "l": 0, "b": 0},
)
fig.show()

In [33]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Extract the hour of day from the departure time.
# NOTE(review): Spark resolves column names case-insensitively by default, so
# "hora_retiro" presumably replaces the existing "Hora_Retiro" column in
# df_with_hour rather than being added next to it - confirm if both are needed.
retiro_timestamp = F.to_timestamp(F.col("Hora_Retiro"), "HH:mm:ss")
df_with_hour = df.withColumn("hora_retiro", F.hour(retiro_timestamp))

# Departures per hour, busiest hour first.
hourly_traffic = (
    df_with_hour
    .groupBy("hora_retiro")
    .count()
    .orderBy("count", ascending=False)
)

print("Horas de mayor retiro:")
hourly_traffic.show()

Horas de mayor retiro:
+-----------+-------+
|hora_retiro|  count|
+-----------+-------+
|         18|1930160|
|         17|1744477|
|          8|1676469|
|         15|1612575|
|         14|1597790|
|         19|1527055|
|         16|1487428|
|         13|1374636|
|          9|1291342|
|          7|1270711|
|         12|1174321|
|         20|1104068|
|         10|1102236|
|         11|1089443|
|         21| 821176|
|          6| 607452|
|         22| 549241|
|         23| 324100|
|          5| 167548|
|          0|  91351|
+-----------+-------+



In [34]:
# Bar chart of departures per hour of day, built from the aggregated
# hourly_traffic frame collected into pandas.
hourly_traffic_pd = hourly_traffic.toPandas()
fig = px.bar(
    hourly_traffic_pd,
    x='hora_retiro',
    y='count',
    title="Retiro por hora",
    labels={'hora_retiro': 'Hour of Day', 'count': 'Count'},
)
# Axis titles are set explicitly here.
fig.update_layout(
    xaxis_title="Hora del día",
    yaxis_title="Cantidad de retiros",
)
fig.show()


In [27]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Extract the hour of day from the arrival time.
# NOTE(review): Spark resolves column names case-insensitively by default, so
# "hora_arribo" presumably replaces the existing "Hora_Arribo" column in
# df_with_hour rather than being added next to it - confirm if both are needed.
arribo_timestamp = F.to_timestamp(F.col("Hora_Arribo"), "HH:mm:ss")
df_with_hour = df.withColumn("hora_arribo", F.hour(arribo_timestamp))

# Arrivals per hour, busiest hour first.
hourly_traffic = (
    df_with_hour
    .groupBy("hora_arribo")
    .count()
    .orderBy("count", ascending=False)
)

print("Horas de mayor arribo:")
hourly_traffic.show()

Horas de mayor arribo:
+-----------+-------+
|hora_arribo|  count|
+-----------+-------+
|         18|1918909|
|         19|1653769|
|         17|1645729|
|          8|1628360|
|         15|1622107|
|         14|1579970|
|         16|1493043|
|          9|1402736|
|         13|1328890|
|         20|1221266|
|         12|1144961|
|         10|1119466|
|          7|1084509|
|         11|1073938|
|         21| 889238|
|         22| 611977|
|          6| 486932|
|         23| 373299|
|          0| 140847|
|          5| 116409|
+-----------+-------+
only showing top 20 rows



In [31]:
# Bar chart of arrivals per hour of day, built from the aggregated
# hourly_traffic frame collected into pandas.
hourly_traffic_pd = hourly_traffic.toPandas()
fig = px.bar(
    hourly_traffic_pd,
    x='hora_arribo',
    y='count',
    title="Arribos por hora",
    labels={'hora_arribo': 'Hour of Day', 'count': 'Count'},
)
# Axis titles are set explicitly here.
fig.update_layout(
    xaxis_title="Hora del día",
    yaxis_title="Cantidad de arribos",
)
fig.show()
