## **Práctica BiciMAD**

### Introducción

El objetivo de esta práctica es, data una muestra de datos de BiciMAD, obtener utilizando pyspark la localización de una posible nueva parada, en función del tráfico de la zona.

### Datos usados y forma de obtener el resultado

Los datos que tomaremos serán los datos proporcionados por la página web de BiciMAD, https://opendata.emtmadrid.es/Datos-estaticos/Datos-generales-(1)
Utilizaremos los archivos correspondientes a junio del 2021, tanto de la situación de las estaciones bicimad como del uso. 
Para el correcto funcionamiento de la práctica, es necesario tener estos dos archivos en una carpeta llamada data donde estemos trabajando.

Para ejecutar el programa no es necesario nada más, simplemente ejecutarlo en terminal.

#### IMPORTS

In [17]:
from math import acos, cos, radians, sin
import random
import numpy as np
import pandas as pd
from scipy.spatial import ConvexHull

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

#### FUNCTIONS

In [18]:
def distance_sphere(pt1, pt2): # Devuelve la distancia en km entre dos puntos dadas sus coordenadas (esféricas)
    pt1 = tuple(map(radians, pt1))
    pt2 = tuple(map(radians, pt2))
    x = sin(pt1[0]) * sin(pt2[0]) + cos(pt1[0]) * cos(pt2[0]) * cos(pt1[1] - pt2[1])
    if x > 1: # redondeamos el error para no salirnos del dominio del arccos
        x = 1
    return 6371 * (acos(x))

def get_stop_trafic(spark: SparkSession, filename): # Devuelve un df con el tráfico de cada parada
    movements_df = spark.read.json(filename)
    plug_count = movements_df.\
        withColumnRenamed("idplug_station", "id").\
        groupBy("id").\
        count().\
        withColumnRenamed("count", "plug_count")
    unplug_count = movements_df.\
        withColumnRenamed("idunplug_station", "id").\
        groupBy("id").\
        count().\
        withColumnRenamed("count", "unplug_count")
    trafic_df = plug_count.\
        join(unplug_count, on=["id"], how="left_outer").\
        sort("id").\
        withColumn("trafic", col("plug_count")+col("unplug_count")).\
        select(col("id"), col("trafic"))
    return trafic_df

def get_stops(spark: SparkSession, month_ref): #Devuelve un df con la posición y tráfico de cada parada
    df = spark.read.json(f"data/{month_ref}_stations.json")
    stations_df = spark.\
        createDataFrame(df.first()["stations"]).\
        select(col("id"), col("latitude").cast("float"), col("longitude").cast("float"))
    trafic_df = get_stop_trafic(spark, f"data/{month_ref}_movements.json")
    stops_df = stations_df.join(trafic_df, on=["id"], how="left_outer")
    return stops_df

def get_learning_df(df): # Genera el df de entrada del modelo de ML
    df = np.array(df.where(col("trafic").isNotNull()).collect())
    learning = []
    for i in df:
        row = n_closest_stops(df, (i[1], i[2]))
        row.append(i[3])
        row.insert(0, i[2])
        row.insert(0, i[1])
        learning.append(row)
    return spark.createDataFrame(pd.DataFrame(learning), schema=["lat", "lon", "dist*trafic1", "dist*trafic2", "dist*trafic3", "trafic"])
    
def n_closest_stops(df, pt, n=3): #Devuelve las n paradas que minimizan la distancia por el tráfico a cierto punto (pt) 
    df2 = np.copy(df)
    dists = lambda stop: [distance_sphere(pt, [stop[1], stop[2]]) * stop[3]]
    d = np.array([dists(i) for i in df2])
    d = d[d[:,0].argsort()]
    return d[1:n+1,:].flatten().tolist()
    
def get_min_max_lat_lon(df):
    min_lat = df.agg({"latitude":"min"}).head()["min(latitude)"]
    max_lat = df.agg({"latitude":"max"}).head()["max(latitude)"]
    min_lon = df.agg({"longitude":"min"}).head()["min(longitude)"]
    max_lon = df.agg({"longitude":"max"}).head()["max(longitude)"]
    return min_lat, max_lat, min_lon, max_lon

def pt_in_hull(pt, hull, eps=10**(-4)): # Evalúa si un punto está en el área de servicio de BiciMad
    return all((np.dot(eq[:-1], pt) + eq[-1] <= eps) for eq in hull.equations)

def generate_points(sc: SparkContext, n, df): # mins_maxs = (min_lat, max_lat, min_lon, max_lon)
    pts = []
    i = 0
    hull = ConvexHull(np.array(df.select(col("latitude"), col("longitude")).collect()))
    min_lat, max_lat, min_lon, max_lon = get_min_max_lat_lon(df)
    while i < n:
        pt = (random.uniform(min_lat, max_lat), random.uniform(min_lon, max_lon))
        if pt_in_hull(pt, hull) and pt not in pts:   #Consideramos que estén en el área de servicio de BiciMAD
            pts.append(pt)
            i += 1
    return sc.parallelize(pts)

def get_mesh(sc: SparkContext, n, df): # Genera un df con los puntos a evaluar
    pts = np.array(generate_points(sc, n, df).collect())
    df_ = np.array(df.where(col("trafic").isNotNull()).collect())
    testing = []
    for pt in pts:
        row = n_closest_stops(df_, pt)
        row.insert(0, pt[1])
        row.insert(0, pt[0])
        testing.append(row)
    return spark.createDataFrame(pd.DataFrame(testing), schema=["lat", "lon", "dist*trafic1", "dist*trafic2", "dist*trafic3"])

#### EJECUCIÓN DEL PROGRAMA

Comenzamos la sesión de spark.

In [19]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

Creamos las paradas del dataset con sus coordenadas y su tráfico.

In [21]:
stops_df = get_stops(spark, "202106")
stops_df.show()

+---+---------+----------+------+
| id| latitude| longitude|trafic|
+---+---------+----------+------+
|  1|40.417213|-3.7018342|  3798|
|  2|40.417313| -3.701603|  1708|
|  3| 40.42059|-3.7058415|  3390|
|  4|40.430294| -3.706917|  3159|
|  5| 40.42855|-3.7025876|  2258|
|  6| 40.42852|  -3.70205|  4398|
|  7| 40.42415| -3.698447|  3525|
|  8| 40.42519|-3.6977715|  3530|
|  9|40.427868|-3.6954403|  6043|
| 10|40.415607|-3.7095084|  3585|
| 11| 40.42533|  -3.69214|  1946|
| 12| 40.42695|-3.7035918|  3200|
| 13|40.428425|-3.7061932|  6230|
| 14|40.427326|-3.7104416|  3294|
| 15|40.426094| -3.713479|  4524|
| 16| 40.42624|-3.7074454|  3438|
| 17|40.423073|-3.7075064|  5069|
| 18|40.423264|-3.7038312|  3207|
| 19|40.420776|-3.6996503|  5370|
| 20| 40.42186|-3.6954982|  2867|
+---+---------+----------+------+
only showing top 20 rows



Creamos el modelo de ml que aplicaremos a los datos existentes (paradas).

In [23]:
learning_df = get_learning_df(stops_df)
feature_assembler = VectorAssembler(inputCols=["lat", "lon", "dist*trafic1", "dist*trafic2", "dist*trafic3"], 
                                    outputCol="features")
output = feature_assembler.transform(learning_df)

Entrenamos el modelo anterior. Una vez hecho esto, comprobamos su eficiencia.

In [24]:
final_data = output.select(["features", "lat", "lon", "trafic"])
train_data, test_data = final_data.randomSplit([0.7, 0.3])

lr = LinearRegression(featuresCol="features", labelCol="trafic",
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)


lr_predictions = lr_model.transform(test_data)
lr_predictions.show()
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="trafic", metricName="r2")
print(f"R Squared (R2) on test data = {lr_evaluator.evaluate(lr_predictions)}")

+--------------------+------------------+-------------------+------+------------------+
|            features|               lat|                lon|trafic|        prediction|
+--------------------+------------------+-------------------+------+------------------+
|[40.4202842712402...|40.420284271240234|-3.7081246376037598|5446.0|3667.7234927832615|
|[40.4229621887207...|  40.4229621887207| -3.655555248260498|1915.0|2474.4316554000834|
|[40.42919921875,-...|    40.42919921875| -3.696716785430908|3021.0| 3402.851403490524|
|[40.4334068298339...|40.433406829833984| -3.687915325164795|2452.0| 2607.692019357113|
|[40.4343338012695...| 40.43433380126953|-3.6835832595825195|3203.0|2774.2197304078145|
|[40.4371490478515...| 40.43714904785156|-3.6483585834503174|1642.0|1691.4386677766452|
|[40.4383697509765...| 40.43836975097656|  -3.69281005859375|2040.0|2564.3695159368217|
|[40.4431495666503...| 40.44314956665039| -3.657409906387329| 939.0|1624.8590102716116|
|[40.3972625732421...| 40.397262

Por último, evaluamos el modelo en una malla de 10000 puntos tomados de forma aleatoria dentro del área de servicio de BiciMAD.

In [25]:
mesh = get_mesh(sc, 10**5, stops_df)
mesh_test = feature_assembler.transform(mesh).select(["features", "lat", "lon"])
mesh_predictions = lr_model.transform(mesh_test)
mesh_predictions.show()

                                                                                

+--------------------+------------------+-------------------+------------------+
|            features|               lat|                lon|        prediction|
+--------------------+------------------+-------------------+------------------+
|[40.4035542630039...|40.403554263003905|-3.6937027740367383| 3616.657010823721|
|[40.4425364546863...| 40.44253645468634|-3.6806904698107186|2432.6586004258133|
|[40.4227744211040...| 40.42277442110405| -3.687217047982983| 3080.013405438978|
|[40.4440461526848...| 40.44404615268481|-3.6913689459366115|2666.2335425653728|
|[40.4542414824484...| 40.45424148244843|-3.7075525052614164|2984.3463198402897|
|[40.4227035425698...|40.422703542569806|-3.6795825056585776|  2515.83129288035|
|[40.4295104495267...| 40.42951044952676| -3.714135469234338| 3621.964012391516|
|[40.4010887084651...| 40.40108870846514|-3.6679126612089394|3240.0038968678564|
|[40.4436270297437...| 40.44362702974379|-3.6795720710359174|2222.1693609067006|
|[40.4496710901472...| 40.44

In [26]:
max_trafic_pred = mesh_predictions.agg({"prediction": "max"}).head()[0]
best_row = mesh_predictions.filter(col("prediction") == max_trafic_pred).head()
print(f"Un punto candidato a poner una nueva parada de Bicimad es ({best_row['lat']}, {best_row['lon']}) puesto que se estima que tenga un total de {best_row['prediction']} usos al mes")

Un punto candidato a poner una nueva parada de Bicimad es (40.40191282516034, -3.7204495853564508) puesto que se estima que tenga un total de 5150.759971562773 usos al mes


#### RESULTADOS Y CONCLUSIONES

Podemos observar que el punto devuelto por el programa se sitúa cerca del antiguo estadio Vicente Calderón. Ha ayudado mucho haber cursado la asignatura de Programación Declarativa puesto que la principal forma de trabajar con las estructuras de datos de pyspark es el orden superior. La práctica también nos ha permitido adentrarnos en el mundo del machine learning, algo muy presente hoy en día, y que nos será útil en un futuro. 