# Imports

In [None]:
import sys, os
is_conda = os.path.exists(os.path.join(sys.prefix, 'conda-meta'))

if not is_conda:
    import findspark 
    findspark.init()

from pyspark.sql import SparkSession
# import pandas as pd
# import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from pyspark.sql.functions import col, datediff, unix_timestamp
import csv
from IPython.core.display import display, HTML
from IPython.display import IFrame
from collections import defaultdict

# Para una lectura más distendida de la memoria
MODO_JAJAS = True

# Lectura de datos

In [None]:
if MODO_JAJAS:
    display(IFrame("https://giphy.com/embed/I1U9DTjCqOF3i",width="240", height="135"))

In [None]:
spark = SparkSession.builder.appName("taxis").master("local[*]").getOrCreate()
df = spark.read.csv('./tripdata_2017_01.csv', header=True, inferSchema=True)

In [None]:
df.printSchema()
dfP=df.toPandas()

# Limpieza de datos

In [None]:
display(dfP)
display(dfP.describe().T)

In [None]:
if MODO_JAJAS:
    display(IFrame("https://giphy.com/embed/xsATxBQfeKHCg", width="240", height="180"))

### Elementos extraños en el dataset

Lista de comportamientos extraños en los datos, y por tanto, inválidos a la hora de utilizar datos que deberían ser coherentes basándonos en la información de cada campo proporcionada por la [documentación](https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf)

* Existen carreras en las que la distancia es 0
* Existen propinas negativas
* "extra" con valores diferentes a 0 (ya que puede no haber extras), 0.5 y 1
* Existen viajes con un precio final negativo
* "MTA_tax" debe valer siempre 0.50. Valores diferentes son erróneos, y por tanto puede que el resto de la información también
    * De forma similar, "Improvement_surcharge" no debe valer menos de 0.30
* Carreras cuya fecha de fin sea igual o anterior a la fecha de inicio
* Existen tarifas con valores negativos. No tiene sentido ya que la tarifa va en función del tiempo y la distancia recorridas
* "Improvement_surcharge" es un valor en desuso, por lo que debería valer en el menor caso 0, no -0.3

### Elementos extraños PERO posibles

* Número de pasajeros es 0. Dado que es un valor que introduce el propio conductor, muy probablemente le de bastante igual introducir bien el valor.
* Un viaje empieza y acaba en la misma zona.



### Limpieza realizada

A partir de los comportamientos observados se ha procedido a eliminar las carreras que cumplen las siguientes condiciones:

- Campo "tip_amount" con valores menor a 0
- Campo "total_amount" con valores menor o igual a 0
- Campo "trip_distance" con valores menor o igual a 0
- Campo "fare_amount" con valores menor o igual a 0
- Campo "extra" con valores diferentes de 0, 0.5 y 1
- Campo "MTA_tax" con valor distinto de 0.5
- Campo "Improvement_surcharge" con valor distinto de 0 o 0.3
- Campo "tpep_dropoff_datetime" es anterior o igual a "tpep_pickup_datetime"

In [None]:
# Convertimos las fechas a timestamp, para que dejen de ser strings a secas
# y guardamos su diferencia para luego tener más fácil el filtrado y otros cálculos

df = df.withColumn(
    "tpep_pickup_timestamp", unix_timestamp(col("tpep_pickup_datetime").cast("timestamp"))
).withColumn(
    "tpep_dropoff_timestamp", unix_timestamp(col("tpep_dropoff_datetime").cast("timestamp"))
).withColumn(
    "time_diff", col("tpep_dropoff_timestamp") - col("tpep_pickup_timestamp")  # Segundos
)

df.createOrReplaceTempView('datosCarreras')
# display(df.toPandas())

In [None]:
datosLimpios = spark.sql("""
    SELECT * FROM datosCarreras WHERE
        tip_amount >= 0 AND
        total_amount > 0 AND
        trip_distance > 0 AND
        fare_amount > 0 AND
        (extra == 0 OR extra == 0.5 OR extra == 1) AND
        mta_tax == 0.5 AND
        improvement_surcharge >= 0 AND
        time_diff > 0
""")
# print(datosLimpios.count())
datosLimpios.createOrReplaceTempView('datosCarrerasLimpios')
# datosLimpiosP = datosLimpios.toPandas()

In [None]:
# display(datosLimpiosP)
# display(datosLimpiosP.describe().T)

## Extracción de información

Ahora que ya hemos limpiado los datos y tenemos entradas coherentes, se puede proceder a extraer información de los mismos. 

La información que se va a extraer es:

* Velocidad media de los taxis en función de la hora.
* Viajes en taxi más comunes
* Registros financieros (propinas, personas, etc.)
    * Timos a turistas
    * Propinas en función de la hora
    * Identificar pasajeros borrachos
* Zonas con poca cobertura



In [None]:
# Bajamos el csv con la información de las zonas para luego poder "traducir"
if not os.path.exists("taxi+_zone_lookup.csv"):
    !wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

In [None]:
# Se añade al dataframe los datos de las zonas de subida y bajada. Para ello se va a realizar un join,
# De forma que vaya dentro del dataframe y es accesible también desde un rdd

df_lookup = spark.read.csv('./taxi+_zone_lookup.csv', header=True, inferSchema=False)

datosLimpios = datosLimpios.withColumn("LocationID", col("PULocationID")
).join(
    # Renombramos para poder luego incluir también las de bajada
    df_lookup.withColumnRenamed("Borough", "PUBorough"
            ).withColumnRenamed("Zone", "PUZone"
            ).withColumnRenamed("service_zone", "PUservice_zone"),
    on=['LocationID']
).withColumn("LocationID", col("DOLocationID")
).join(
    df_lookup.withColumnRenamed("Borough", "DOBorough"
            ).withColumnRenamed("Zone", "DOZone"
            ).withColumnRenamed("service_zone", "DOservice_zone"),
    on=['LocationID']
)

datosLimpios.createOrReplaceTempView('datosCarrerasLimpios')

# datosLimpiosP = datosLimpios.toPandas()
# display(datosLimpiosP)
# display(datosLimpiosP.describe().T)

### Velocidad media de los taxis

En este apartado se realizará un análisis de la velocidad media de los taxis, para ello se realizará una transformación de millas a metros sabiendo que 1 milla = 1609.344 metros luego dividiéndolo entre la diferencia de tiempo calculada previamente.

In [None]:
dfMTS = datosLimpios.withColumn(
    "mean_speed", col("trip_distance")*1609.344/col("time_diff")
)

In [None]:
dfMTSP = dfMTS.toPandas()
display(dfMTSP.sort_values(by=["mean_speed"],ascending=False).head(50))
display(dfMTSP.describe().T)

En vista de que las velocidades medias estaban mal y esto, como se puede ver en la tabla, es debido a que el time_diff es muy bajo, probablemente por un error de los tiempos almacenador por los taxistas, por lo tanto se volverá a realizar una consulta eliminando tiempos menores a 3 minutos y se comprobará las velocidades promedio otra vez.


In [None]:
datosLimpiosSinVelocidades = spark.sql("SELECT * FROM datosCarrerasLimpios where time_diff >= 180")
dfMTS = datosLimpiosSinVelocidades.withColumn(
    "mean_speed", col("trip_distance")*1609.344/col("time_diff")
)

In [None]:
resultsTolls = spark.sql("SELECT * FROM datosCarrerasLimpios where tolls_amount > 100").toPandas()
display(resultsTolls)
display(resultsTolls.describe().T)

In [None]:
resultsTimos = spark.sql("SELECT * FROM datosCarrerasLimpios where PULocationID == DOLocationID").toPandas()

In [None]:
dfMTSP = dfMTS.toPandas()
display(dfMTSP.sort_values(by=["mean_speed"],ascending=False).head(50))
display(dfMTSP.describe().T)


### [referenica a velocidades medias]

Como se puede observar, la mayoría de velocidades entre las 50 más rápidas superan el límite de velocidad nacional para zonas de carretera (24.72222 m/s) siendo que solo los 5 últimos lo cumplen, o en otras palabras que los 45 primeros infringen la ley.

Por otro lado se puede ver que los 8 primeros tienen velocidades mayores a 52 metros por segundo, lo que implica velocidades de 187.2 km/s esto puede ser debido a que haya algún tipo de fallo en el tiempo o que lleve velocidades demasiado altas.

Por último mencionar que los 6 primeros tienen velocidades mayores a 65 m/s, cosa que ya debe ser debido a un fallo, accidental o adrede por parte del conductor.


### Zonas de poca cobertura

Observando la variable `store_and_fwd_flag`, creemos que es posible deducir qué zonas de la ciudad de Nueva York dan un mayor problema a la hora de estar conectados con el servidor de la compañía de taxis, es decir, tienen poca cobertura.

In [None]:
if MODO_JAJAS:
    display(IFrame("https://giphy.com/embed/PmdOx0iRRtqkBFlEgI", width="240", height="240"))

In [None]:
sinCobertura_rdd = spark.sql("""
    SELECT DOBorough, DOZone
      FROM datosCarrerasLimpios
      WHERE store_and_fwd_flag == 'Y'
""").rdd

# sC_rdd.flatMap(lambda x: x['locationID']).map(lambda x: (x,1))
zone_tuples = sinCobertura_rdd.map(
    lambda x: (x['DOZone'],1)
).reduceByKey(
    lambda x,y: x+y
).sortBy(
    lambda x: x[1], False
)
borough_tuples = sinCobertura_rdd.map(
    lambda x: (x['DOBorough'],1)
).reduceByKey(
    lambda x,y: x+y
).sortBy(
    lambda x: x[1], False
)

In [None]:
def autolabel(rects):
    # https://matplotlib.org/3.1.1/gallery/lines_bars_and_markers/barchart.html#sphx-glr-gallery-lines-bars-and-markers-barchart-py
    """Attach a text label above each bar in *rects*, displaying its height."""
    for rect in rects:
        height = rect.get_height()
        ax.annotate('{}'.format(height),
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points", fontsize=25,
                    ha='center', va='bottom')

In [None]:
zonas = zone_tuples.take(10)
distritos = borough_tuples.take(10)

distritos_x = [k for k, v in distritos]
distritos_y = [v for k, v in distritos]
zonas_x = [k for k, v in zonas]
zonas_y = [v for k, v in zonas]

fig = plt.figure(figsize=(30, 10))

ax = fig.add_subplot(1, 2, 1)
rects1 = ax.bar(distritos_x, distritos_y)
ax.set_xlabel("Distrito", fontsize=25)
ax.set_ylabel("Registros guardados", fontsize=25)
ax.set_title("Registros guardados por distritos", fontsize=25)
plt.xticks(rotation=90, fontsize=25)
plt.yticks(fontsize=25)
autolabel(rects1)

ax = fig.add_subplot(1, 2, 2)
rects2 = ax.bar(zonas_x, zonas_y)
ax.set_xlabel("Zona", fontsize=25)
ax.set_ylabel("Registros guardados", fontsize=25)
ax.set_title("Registros guardados por zonas", fontsize=25)
plt.xticks(rotation=90, fontsize=25)
plt.yticks(fontsize=25)
autolabel(rects2)

plt.show()

In [None]:
viajes_manhattan = spark.sql("""
    SELECT VendorID
      FROM datosCarrerasLimpios
      WHERE DOBorough == 'Manhattan'
""").count()
int(distritos_y[0])/int(viajes_manhattan)*100

Como se puede apreciar por el número de registros guardados en los taxis, Manhattan es el distrito que tiene más registros guardados con 2490, casi 5 veces más que su distrito posterior, Queens con 435. Por lo que se puede deducir que tiene zonas frecuentadas en las que no se tiene cobertura.

Sin embargo, es un porcentaje pequeño sobre el total, ya que representan menos del 0.3% de todos los viajes existentes que finalizan en Manhattan, por lo que muy probablemente se deba a taxis concretos que tienen problemas de conexión.

Observando que se trata de un porcentaje muy bajo, surge otra pregunta: ¿En qué momento se guardaron los registros? ¿Todos los registros guardados son en horas similares y por tanto puede deberse a una caída del servidor más que a un problema de cobertura?

Para ello se van a observar los registros de Manhattan ya que son los más numerosos.

In [None]:
stored_manhattan = spark.sql("""
    SELECT tpep_dropoff_datetime, DAY(tpep_dropoff_datetime) as day, HOUR(tpep_dropoff_datetime) as hour
      FROM datosCarrerasLimpios
      WHERE store_and_fwd_flag == 'Y' and DOBorough == 'Manhattan'
""")

# Nos vamos a quedar con el día con mayor número de registros guardados
day, count = stored_manhattan.groupBy("day").count().sort("count").tail(1)[0]

# Ahora que tenemos el día, podemos ver las horas en las que se agruparon los mensajes.
# Si todos (o casi todos) los mensajes se agruparon en la misma hora, eso significa 
# (muy probablemente) que hubo una caída del servidor
mensajes_hora = stored_manhattan.filter(col("day")==day).groupBy("hour").count().collect()

mensajes_x = [h for h, cnt in mensajes_hora]
mensajes_y = [cnt for h, cnt in mensajes_hora]

fig = plt.figure(figsize=(30, 10))

ax = fig.add_subplot(1, 1, 1)
rects1 = ax.bar(mensajes_x, mensajes_y)
ax.set_xlabel("Horas", fontsize=25)
ax.set_ylabel("Mensajes", fontsize=25)
ax.set_title("Mensajes guardados por hora", fontsize=25)
plt.xticks(rotation=90, fontsize=25)
plt.yticks(fontsize=25)
autolabel(rects1)

plt.show()

Los mensajes guardados se agrupan entre las 10 AM y las 21 PM, pero dado que se corresponde también con el periodo de actividad de los taxis, porque es cuando la gran mayoría de la población se desplaza por la ciudad, no parece que se deba a una caída del servidor.

Por lo tanto nos reafirmamos en la primera suposición: **Muy probablemente se debe a taxis concretos que tienen problemas de conexión**, no ha una caída generalizada. Además si hubiese sido ese el caso, el número de registros debería haber sido muchísimo mayor

### Viajes más comunes

# Fin

In [None]:
spark.stop()

IDEAS

propinas / hora

Timos

- Vueltas de mas en misma zona
- Tolls valores raros
- Diferencias exageradas de distancias para pares de datos con mismo origen y destino

Velocidad media de los taxis en función de la hora.

Viajes en taxi más comunes

Registros financieros (propinas, personas, etc.)

Zonas sin cobertura a partir del parámetro Store_and_fwd_flag

Fare_amount frente a time_diff y trip distance, infracciones de ley
