In [199]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import BooleanType, FloatType
from datetime import *
from settings import obtener_dia_semana, obtener_mes
import time

In [200]:
""" Configuramos Spark """
conf = SparkConf()
conf.setAppName("ProcesamientoDatos")
conf.setMaster("local[4]")

<pyspark.conf.SparkConf at 0x7fcb5c1a4630>

In [201]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [202]:
data = spark.read.format('parquet').load('grande.parquet/')

In [203]:
dia_elegido = obtener_dia_semana("Martes")

In [204]:
hora_fin = datetime.strptime("2013-01 00:30", "%Y-%m %H:%M")
hora_30 = (hora_fin - timedelta(minutes=30))
hora_15 = (hora_fin - timedelta(minutes=30))
tiempo_fin = datetime.strptime("2013-01 00:30", "%Y-%m %H:%M")


def comparar_media_hora(hora):
    """
        Metodo que filtra las horas de los registros para que concuerden
        con las horas de busqueda deseada
        :param hora: Timestamp completo
        :return: True si las horas del timestamp estan entre las deseadas
        False si lo contrario
    """
    if hora.time() <= hora_fin.time() and hora.time() >= hora_30.time():
        return True
    return False


def comparar_cuarto_hora(hora):
    """
        Metodo que filtra las horas de los registros para que concuerden
        con las horas de busqueda deseada
        :param hora: Timestamp completo
        :return: True si las horas del timestamp estan entre las deseadas
        False si lo contrario
    """
    if hora.time() <= hora_fin.time() and hora.time() >= hora_15.time():
        return True
    return False


def relevancia(fecha):
    """
        Metodo que da mas relevancia a los viajes mas cercanos a la
        fecha de busqueda deseada.
        Si la diferencia es menor a un mes de la fecha
        dada los registros tienen más relevancia
        :param fecha: Timestamp completo
        :return: 2 si el viaje esta cerca de la fecha deseada, 1 si no
    """
    diferencia = fecha - tiempo_fin
    if diferencia < timedelta(days=7) and diferencia > timedelta(days=-7):
        return 1.0
    elif diferencia < timedelta(days=14) and diferencia > timedelta(days=-14):
        return 0.75
    elif diferencia < timedelta(days=21) and diferencia > timedelta(days=-21):
        return 0.5
    elif diferencia < timedelta(days=-28) and diferencia > timedelta(days=-28):
        return 0.25
    else:
        return 0

comprobar_media_hora = udf(comparar_media_hora, BooleanType())
comprobar_cuarto_hora = udf(comparar_cuarto_hora, BooleanType())
calcular_relevancia = udf(relevancia, FloatType())

### Consulta 2: Zonas que más beneficios pueden procurar

En esta segunda búsqueda lo que vamos a obtener es las 10 zonas que más beneficios pueden generar en el momento de la búsqueda (hora introducida). La cantidad de benficio de una zona consigue dividiendo el beneficio de una zona por la catidad de taxis vacios que hay en la zona.

El beneficio de una zona es la media de la suma de la tarifa y la propina de los viajes que han acabado en los últimos 15 minutos y empezaron en esa zona. 

Por otro lado, para el número de taxis, se tendrán en cuenta el número de taxis vacíos en una zona durante los 30 minutos anteriores. 

    hora_subida, hora_bajada, area_mas_beneficiosa_1, ..., area_mas_beneficiosa_10, tiempo_ejecucion

Vamos a empezar con los taxis vacíos en una zona.

- Primero filtraremos los viajes acabados en la última media hora y nos quedamos con el último viaje acabado.
- Ahora buscamos el último viaje empezado por los taxis en esa franja de tiempo

Prueba de que nos quedamos con el último viaje acabado de cada taxi, eliminando si ha hecho más de un viaje.

In [205]:
tripsDown = data.filter(data.dia_semana == dia_elegido) \
    .filter(comprobar_media_hora(data.hora_bajada))

In [206]:
tripsUp = data.filter(data.dia_semana == dia_elegido) \
    .filter(comprobar_media_hora(data.hora_subida))

In [207]:
bueno30Down = tripsDown.select("medallon", "hora_bajada", "cuad_latitud_bajada", "cuad_longitud_bajada") \
    .orderBy("hora_bajada", ascending=False) \
    .dropDuplicates(subset=["medallon"])
bueno30Up = tripsUp.select("medallon", "hora_subida").orderBy("hora_subida", ascending=False) \
    .dropDuplicates(subset=["medallon"]).withColumnRenamed("medallon", "taxi")

In [208]:
#spark falla join si el nombre de la columna es el mismo, renombrado a taxi
joined = bueno30Down.join(bueno30Up, bueno30Down.medallon == bueno30Up.taxi, "leftouter") \
    .select("medallon", "hora_bajada", "hora_subida", "cuad_latitud_bajada", "cuad_longitud_bajada")

In [209]:
estado_taxis = joined.select(joined.medallon, joined.cuad_latitud_bajada, \
    joined.cuad_longitud_bajada, joined.hora_bajada, \
    when(joined.hora_subida > joined.hora_bajada, 1).otherwise(0).alias("taxi_ocupado"))

Añadimos el factor de influencia

In [210]:
taxis_filtrados = estado_taxis.filter(estado_taxis.taxi_ocupado == 0) \
    .withColumn("influencia", calcular_relevancia(estado_taxis.hora_bajada))

In [211]:
taxis_filtrados.show()

+--------------------+-------------------+--------------------+--------------------+------------+----------+
|            medallon|cuad_latitud_bajada|cuad_longitud_bajada|         hora_bajada|taxi_ocupado|influencia|
+--------------------+-------------------+--------------------+--------------------+------------+----------+
|D563F5CC514A87541...|                170|                 152|2013-01-16 00:15:...|           0|       0.5|
|7550D0BD520A691EC...|                148|                 160|2013-01-16 00:13:...|           0|       0.5|
|59DF6039EC312EE6D...|                164|                 157|2013-01-16 00:08:...|           0|       0.5|
|BF46B95E44ED3BE1B...|                154|                 162|2013-01-16 00:07:...|           0|       0.5|
|DA350783B6954CC67...|                162|                 158|2013-01-16 00:04:...|           0|       0.5|
|73039762E0F4B253E...|                170|                 162|2013-01-16 00:02:...|           0|       0.5|
|911B6F71706854496.

In [212]:
taxis_libres = taxis_filtrados.groupBy("cuad_latitud_bajada", "cuad_longitud_bajada").count() \
    .select(col("cuad_latitud_bajada"), col("cuad_longitud_bajada"), col("count").alias("taxis_libres"))

In [213]:
influencia_taxis_libres = taxis_filtrados.groupBy("cuad_latitud_bajada", "cuad_longitud_bajada") \
    .avg("influencia") \
    .select(col("cuad_latitud_bajada").alias("latitud"), \
            col("cuad_longitud_bajada").alias("longitud"), \
            col("avg(influencia)").alias("influencia"))

In [214]:
taxis_libres.show()
influencia_taxis_libres.show()

+-------------------+--------------------+------------+
|cuad_latitud_bajada|cuad_longitud_bajada|taxis_libres|
+-------------------+--------------------+------------+
|                178|                 160|           3|
|                171|                 152|          19|
|                163|                 172|           3|
|                151|                 163|           7|
|                181|                 155|           2|
|                172|                 166|           2|
|                144|                 166|           1|
|                158|                 188|           1|
|                163|                 153|           8|
|                165|                 157|          21|
|                171|                 159|           8|
|                179|                 179|           1|
|                178|                 157|           1|
|                156|                 167|           2|
|                149|                 163|      

In [215]:
condition = [taxis_libres.cuad_latitud_bajada == influencia_taxis_libres.latitud, \
             taxis_libres.cuad_longitud_bajada == influencia_taxis_libres.longitud]

In [229]:
taxis_libres_prop = taxis_libres.join(influencia_taxis_libres, condition) \
    .select(col("cuad_latitud_bajada"), col("cuad_longitud_bajada"), \
            round(col("taxis_libres") * col("influencia")).alias("proporcion_taxis_libres"))

In [227]:
taxis_libres_prop.show()

+-------------------+--------------------+-----------------------+
|cuad_latitud_bajada|cuad_longitud_bajada|proporcion_taxis_libres|
+-------------------+--------------------+-----------------------+
|                178|                 160|                    2.0|
|                151|                 163|                    5.0|
|                163|                 172|                    3.0|
|                171|                 152|                   11.0|
|                144|                 166|                    1.0|
|                172|                 166|                    2.0|
|                181|                 155|                    2.0|
|                158|                 188|                    1.0|
|                163|                 153|                    6.0|
|                165|                 157|                   15.0|
|                171|                 159|                    6.0|
|                179|                 179|                    

### Beneficio zona

- Acotamos a viajes de los últimos 15 min
- Agrupamos por zona de inicio del viaje
- Calculamos la media de la tarifa y de la propina de cada zona
- Sumamos cada media y obtenemos el beneficio medio de las zonas

In [218]:
trips15 = tripsDown.filter(comprobar_cuarto_hora(data.hora_bajada)) \
    .withColumn("influencia", calcular_relevancia(estado_taxis.hora_bajada))

In [219]:
beneficios_prueba = trips15.groupBy("cuad_latitud_subida", "cuad_longitud_subida") \
    .avg("tarifa", "propina", "influencia") \
    .select(col("cuad_latitud_subida"), col("cuad_longitud_subida"), \
            (col("avg(tarifa)") + col("avg(propina)")).alias("beneficios"), \
            col("avg(influencia)").alias("influencia"))

In [231]:
beneficios = trips15.groupBy("cuad_latitud_subida", "cuad_longitud_subida") \
    .avg("tarifa", "propina", "influencia") \
    .select(col("cuad_latitud_subida"), col("cuad_longitud_subida"), \
        ((col("avg(tarifa)") + col("avg(propina)")) * col("avg(influencia)")).alias("beneficios"))

In [232]:
beneficios.show()

+-------------------+--------------------+------------------+
|cuad_latitud_subida|cuad_longitud_subida|        beneficios|
+-------------------+--------------------+------------------+
|                151|                 163|            14.687|
|                171|                 152|      14.374140625|
|                163|                 172|               5.0|
|                172|                 166|         12.833333|
|                161|                 162|               9.5|
|                153|                 161| 7.135714285714286|
|                150|                 165|              19.5|
|                165|                 157| 9.376285232558141|
|                163|                 153| 9.506316685185185|
|                171|                 159|              10.0|
|                178|                 157|          10.51875|
|                149|                 163|             15.87|
|                169|                 154| 9.957118755434783|
|       

In [225]:
beneficios_prueba.show()

+-------------------+--------------------+----------+------------------+
|cuad_latitud_subida|cuad_longitud_subida|beneficios|        influencia|
+-------------------+--------------------+----------+------------------+
|                151|                 163| 15.460000|              0.95|
|                171|                 152| 17.691250|            0.8125|
|                163|                 172| 10.000000|               0.5|
|                172|                 166| 12.833333|               1.0|
|                161|                 162|  9.500000|               1.0|
|                153|                 161|  7.400000|0.9642857142857143|
|                150|                 165| 19.500000|               1.0|
|                165|                 157| 10.404652|0.9011627906976745|
|                163|                 153| 11.282222|0.8425925925925926|
|                171|                 159| 10.000000|               1.0|
|                178|                 157| 14.02500

Aquí unimos ambas tablas, sumando 1 a los taxis libres para evitar la división entre 0 y sumar la repartición de beneficios que tendría que hacer el taxista con el compañero

In [236]:
condicion = [beneficios.cuad_latitud_subida == taxis_libres.cuad_latitud_bajada, \
             beneficios.cuad_longitud_subida == taxis_libres.cuad_longitud_bajada]
profitable = beneficios.join(taxis_libres_prop, condicion, "leftouter") \
    .select(col("cuad_latitud_subida").alias("cuad_latitud"), \
            col("cuad_longitud_subida").alias("cuad_longitud"), \
            (col("beneficios") / col("proporcion_taxis_libres")).alias("beneficio")) \
    .orderBy("beneficio", ascending=False)

In [237]:
profitable.show()

+------------+-------------+------------------+
|cuad_latitud|cuad_longitud|         beneficio|
+------------+-------------+------------------+
|         186|          189|        48.8588065|
|         165|          174|              43.3|
|         185|          190|37.285379999999996|
|         168|          178|              28.5|
|         164|          162|          26.46875|
|         170|          157|              21.0|
|         148|          161|            20.324|
|         157|          168|             19.75|
|         168|          161|             19.75|
|         147|          164|             19.25|
|         171|          154|              17.0|
|         180|          155|              17.0|
|         145|          162|             16.75|
|         149|          163|             15.87|
|         162|          169|              15.5|
|         158|          174|              14.0|
|         187|          159|          13.34375|
|         176|          158|            