# Aplicando EDA al dataset Taxi Fare

Instalamos las dependencias necesarias para realizar el análisis

In [1]:
!pip install pandas
!pip install seaborn
!pip install tqdm



Importamos los módulos a utilizar posteriormente

```
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import time
import numpy as np

from multiprocessing import cpu_count, Pool
from math import radians, cos, sin, asin, sqrt
```

In [2]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import time
from tqdm import tqdm

from multiprocessing import cpu_count, Pool
from math import radians, cos, sin, asin, sqrt

# Esto es solo para la visualización en Pycharm
sns.set_style("ticks")

Inicializamos algunas constantes a utilizar

* **FILE_PATH:** Contiene la ruta de nuestro dataset.
* **EARTH_RADIUS:** Es el valor promedio del radio de la Tierra en kilómetros.
* **CHUNK_SIZE:** Indica el tamaño del conjunto de datos que se procesará en cada iteración.
* **AVAILABLE_CPUS:** Tiene el número de threads que estarán disponibles en el uso de este cuaderno.
* **INITIAL_VALID_COLUMNS:** Tiene los nombres de las columnas que nos interesa cargar en memoria.
* **LIMIT_TO_TEST:** El número de chunks que usaremos para realizar algunos gráficos.

In [3]:
FILE_PATH = "train.csv"
EARTH_RADIUS = 6378.0  # Lo utilizamos en el cálculo de la fórmula de Haversine.
CHUNK_SIZE = 1500000
AVAILABLE_CPUS = cpu_count() - 1  # Disminuimos uno del total para evitar que la pc se queda inutilizable.
INITIAL_VALID_COLUMNS = ['fare_amount',
                         'pickup_datetime',
                         'pickup_longitude',
                         'pickup_latitude',
                         'dropoff_longitude',
                         'dropoff_latitude',
                         'passenger_count'
                         ]
LIMIT_TO_TEST = 0

FILE_PATH, EARTH_RADIUS, CHUNK_SIZE, AVAILABLE_CPUS

('train.csv', 6378.0, 150000, 6)

## Fórmula Haversine

Esta fórmula nos servirá para poder calcular la distancia entre 2 puntos geográficos.

$d=2r\sin^{-1}{\left(\sqrt{\sin^{2}{\left(\frac{\phi_2-\phi_1}{2}\right)}+\cos{(\phi_1)}\cos{(\phi_2)}\sin^{2}{\left(\frac{\lambda_2-\lambda_1}{2}\right)} }\right)}$

Está fórmula será implementada en la función `calculate_haversine_distance`, la cual recibe una columna de tuplas con
los puntos de latitud y longitud tanto de la posición en **pickup** como en **drop off**.

In [4]:
def calculate_haversine_distance(pickup_position, drop_off_position):
    pickup_lat, pickup_lng = pickup_position
    drop_off_lat, drop_off_lng = drop_off_position

    pickup_lat, pickup_lng, drop_off_lat, drop_off_lng = map(
        radians,
        (pickup_lat, pickup_lng, drop_off_lat, drop_off_lng)
    )

    lat_diff = drop_off_lat - pickup_lat
    lng_diff = drop_off_lng - pickup_lng

    distance = sin(lat_diff * 0.5) ** 2 + cos(pickup_lat) * cos(drop_off_lat) * sin(lng_diff * 0.5) ** 2

    return 2 * EARTH_RADIUS * asin(sqrt(distance))

### Obtener estados de hora y día

Necesitamos utilizar las los datos de `pickup_datetime`, para ello, extraeremos en qué día de la semana y hora del día
estamos. Usaremos los estados del día como día de la semana y fin de semana, que serán indicadores para predecir
nuestro `fare_amount`. Además, extraeremos la hora del día, y lo separamos en sus respectivos estados: Madrugada,
mañana, tarde y noche, que serán indicadores para predecir nuestro `fare_amount`.

In [5]:
# 0 si es fin de semana y 1 si es dia laboral
def get_day_status(day):
    weekends = ["Saturday", "Sunday"]
    if day in weekends:
        return 0
    else:
        return 1

# 0 si es madrugada, 1 si es mañana
# 2 si es tarde y 3 si es noche
def get_time_status(hour):
    hour_ = int(hour)
    if hour_ < 6:
        return 0
    elif hour_ < 12:
        return 1
    elif hour_ < 18:
        return 2
    else:
        return 3

## Procesamiento de la data

In [6]:
%%time

# Obtenemos el número de filas que tiene el archivo 'train.csv'
with open(FILE_PATH) as file:
    n_rows = len(file.readlines())

print("Current number of rows: {}".format(n_rows))

Current number of rows: 55423857
CPU times: user 10.6 s, sys: 6.56 s, total: 17.1 s
Wall time: 1min 58s


El dataset es enorme, tiene **55423857** registros sobre los precios de los taxis, dentro de este dataset tenemos casos
donde no existen ciertos registros, o donde tenemos valores bastante extraños, tales como precios negativos o registros
donde no existe ningún pasajero, estos registros pueden afectar enormemente los resultados de nuestro modelo, para ello
es que debemos procesar la data y eliminar estos valores de nuestro dataset.

Al ser un dataset tan grande el tiempo de cómputo necesario para procesar esta data en un solo hilo de nuestro
procesador es bastante alto, por lo que debemos paralelizar este proceso, y asegurarnos que se usen al máximo los
recursos que tenemos disponibles en nuestra computadora.

La librería **Pandas** nos ofrece utilizar un proceso llamado **chunking** que consiste en dividir un gran dataset en
pequeños trozos (**chunks**) esto lo logramos pasándole el parámetro `chunksize=(int)` al método `read_csv` de pandas,
tal como se observa en la siguiente celda.

In [7]:
df_chunks = pd.read_csv(FILE_PATH, chunksize=CHUNK_SIZE, usecols=INITIAL_VALID_COLUMNS)

En la celda anterior podemos observar que además de pasarle el parámetro `chunksize` al método `read_csv`.

Otro parámetro pasado al método fue `usecols` el cual nos sirve para indicar que columnas queremos cargar en memoria.
La columna `key` no es necesaria debido a que es una copia de la columna `pickup_datetime`, podríamos eliminarla
posteriormente pero en temas de memoria esa columna extra al ser del tipo `object` nos quita espacio innecesariamente.

### Función de paralelización

Como se mencionó anteriormente llevaremos el procesamiento del dataset de forma paralela para ello utilizaremos el
módulo multiprocessing de python, específicamente la clase Pool que nos permitirá asignar un proceso a cada uno de
los hilos disponibles.

In [8]:
# Variable que guardará todos los resultados procesados en cada iteración.
chunk_list = []

def parallelize_chunk_processing(chunk, func):

    # Dividimos cada chunk en partes más pequeñas que son las que serán procesadas por cada hilo del procesador
    chunk_split = np.array_split(chunk, AVAILABLE_CPUS)

    # Creamos un pool de n hilos donde n es el número asignado previamente a **AVAILABLE_CPUS**
    pool = Pool(AVAILABLE_CPUS)

    # Creamos un nuevo dataset a partir de los resultados procesados en cada hilo.
    chunk = pd.concat(pool.map(func, chunk_split))

    # Cerramos el pool y creamos una barrera con el método join
    pool.close()
    pool.join()

    chunk_list.append(chunk)

Una vez definida la función de paralelización procedemos con el tratamiento del dataset, tras varias pruebas decidimos
que se deben aplicar las siguientes modificaciones al dataset original.

* Eliminar los `nan` detectados por pandas al cargar el `chunk`
* Reemplazar los valores `zero` en las columnas que no deberían tener dicho valor con un valor nan, tales como
**pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, passenger_count**.
* Obtener la distancia entre los puntos de **pickup** y **drop off**.
* Eliminar las columnas que ya no serán útiles y eliminar los valores `nan` creados por nosotros.
* Eliminar los precios negativos (**Campo `fare_amount`**)
* **TODO** Agregar más cosas a procesar, ya sea al final o entre los otros pasos.

Para todo esto definimos la función `process_chunk` que recibirá como parámetro el chunk de cada iteración.

In [9]:
def process_chunk(split_chunk: pd.DataFrame):
    # Eliminamos los valores nan encontrados por pandas
    split_chunk.dropna(inplace=True)

    # Volvemos **nan** aquellas columnas que no deben tener valores zero
    to_nan_columns = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count"]
    split_chunk[to_nan_columns] = split_chunk[to_nan_columns].replace(0, np.nan)

    # Calculamos la distancia utilizando la fórmula Haversine
    split_chunk['distance'] = split_chunk.apply(lambda row: calculate_haversine_distance(
        pickup_position=(row["pickup_latitude"], row["pickup_longitude"]),
        drop_off_position=(row["dropoff_latitude"], row["dropoff_longitude"])), axis=1)

    # Convertimos el object a datetime para reducir el tiempo de ejecución al obtener el día y la hora 
    split_chunk["pickup_datetime"] = pd.to_datetime(split_chunk["pickup_datetime"], format='%Y-%m-%d %H:%M:%S UTC')

    # Extraemos los días de la semana del datetime con %A
    # Posteriormente aplicamos la función get_day_status para guardar el estado del día
    split_chunk['day_status'] = split_chunk.apply(
        lambda row: get_day_status(row.pickup_datetime.strftime("%A")),
        axis=1
    )

    # Extraemos la hora del día del datetime con %H.
    # Posteriormente aplicamos la función get_time_status para guardar el estado de la hora.
    split_chunk['time_status'] = split_chunk.apply(
        lambda row: get_time_status(row.pickup_datetime.strftime("%H")),
        axis=1
    )

    # Volvemos **nan** aquellas distancias con valor cero
    split_chunk["distance"] = split_chunk["distance"].replace(0, np.nan)

    split_chunk.dropna(inplace=True)
    split_chunk.drop(["pickup_longitude",
                      "pickup_latitude",
                      "dropoff_longitude",
                      "dropoff_latitude",
                      "pickup_datetime"],
                     axis=1,
                     inplace=True)

    split_chunk.drop(split_chunk[split_chunk.distance <= 0.1].index, inplace=True)
    split_chunk.drop(split_chunk[split_chunk.fare_amount <= 0].index, inplace=True)
    split_chunk.drop(
        split_chunk[(split_chunk.passenger_count <= 0) | (split_chunk.passenger_count >= 7)].index, inplace=True)

    return split_chunk

Ya tenemos definida la función que nos servirá para procesar el dataset, ahora deberemos iterar sobre el objeto
`df_chunks` que inicializamos anteriormente.

In [10]:
processed_count = 0

for df_chunk in tqdm(df_chunks, total=n_rows // CHUNK_SIZE):
    parallelize_chunk_processing(df_chunk, process_chunk)

    # Limitante para poder plotear algunas características
    if processed_count >= LIMIT_TO_TEST != 0:
        break

    processed_count += 1

  0%|          | 1/369 [00:53<5:25:08, 53.01s/it]


Cuando el proceso termine tendremos los resultados procesados en la variable `chunk_list` que es lo que utilizaremos
para crear un nuevo dataframe.

In [11]:
df = pd.concat(chunk_list)

df.head()

Unnamed: 0,fare_amount,passenger_count,distance,day_status,time_status
0,4.5,1.0,1.031896,1,2
1,16.9,1.0,8.459418,1,2
2,5.7,2.0,1.391052,1,0
3,7.7,1.0,2.802346,0,0
4,5.3,1.0,2.001353,1,1


In [12]:
df.describe()

Unnamed: 0,fare_amount,passenger_count,distance,day_status,time_status
count,288140.0,288140.0,288140.0,288140.0,288140.0
mean,11.319978,1.692365,4.351984,0.717162,1.848227
std,9.552434,1.308867,71.979275,0.450379,1.035528
min,0.01,1.0,0.100068,0.0,0.0
25%,6.0,1.0,1.296206,0.0,1.0
50%,8.5,1.0,2.198208,1.0,2.0
75%,12.5,2.0,3.981496,1.0,3.0
max,495.0,6.0,10954.538494,1.0,3.0


In [13]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 288140 entries, 0 to 299999
Data columns (total 5 columns):
 #   Column           Non-Null Count   Dtype  
---  ------           --------------   -----  
 0   fare_amount      288140 non-null  float64
 1   passenger_count  288140 non-null  float64
 2   distance         288140 non-null  float64
 3   day_status       288140 non-null  int64  
 4   time_status      288140 non-null  int64  
dtypes: float64(3), int64(2)
memory usage: 13.2 MB


In [14]:
df["passenger_count"].value_counts()

1.0    199890
2.0     42773
5.0     20556
3.0     12628
6.0      6183
4.0      6110
Name: passenger_count, dtype: int64

In [15]:
df["distance"].value_counts()

1.365033    2
0.571612    1
1.132652    1
1.030655    1
1.585141    1
           ..
1.390321    1
0.943104    1
5.541551    1
3.342721    1
1.324846    1
Name: distance, Length: 288139, dtype: int64