# Assignment 1

Assignment 1 for Computación de Altas Prestaciones (High-Performance Computing)

**ONLY MEASURE TIMES FOR ACTIONS, NOT TRANSFORMATIONS.**
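To make the rule concrete: Spark transformations are lazy, so timing them measures almost nothing, and the real work happens only when an action runs. The sketch below uses a plain Python generator as a stand-in for a lazy transformation and a hypothetical helper, `time_action`, that times only the step forcing evaluation. Neither name comes from the notebook.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def time_action(action: Callable[[], T]) -> Tuple[T, float]:
    """Run `action` once and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in for a lazy transformation: building the generator does no work yet.
squares = (n * n for n in range(1_000_000))

# Stand-in for an action: consuming the generator forces the computation.
total, seconds = time_action(lambda: sum(squares))
print(f"Sum: {total}\nTime: {seconds:.4f}s")
```

With Spark, the same helper would wrap an action such as `lambda: taxi_info.count()`.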

## Setup

In [1]:
import os
import time
import pandas as pd

import pyspark

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
# local[*] runs Spark locally with one worker thread per logical core (up to 16 on this laptop)
spark = SparkSession.builder.master('local[*]').getOrCreate()

spark

## Data

Load the data.

In [4]:
from typing import Union, List


def file_belongs_to_included_years(file: str, included_years: List[int]):
    for included_year in included_years:
        if str(included_year) in file:
            return True
    return False
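One caveat with the substring test above: a year such as 2022 also matches any path that merely contains those digits somewhere else. A stricter variant, sketched here, extracts the year with a regular expression. It assumes file names of the form `<prefix>_YYYY-MM.parquet`, which the notebook does not confirm; `year_of_file`, `files_for_years`, and the sample names are hypothetical.

```python
import re
from typing import List, Optional

# Assumed naming scheme: "<prefix>_YYYY-MM.parquet"
YEAR_PATTERN = re.compile(r"_(\d{4})-\d{2}\.parquet$")


def year_of_file(file: str) -> Optional[int]:
    """Return the year encoded in the file name, or None if it does not match."""
    match = YEAR_PATTERN.search(file)
    return int(match.group(1)) if match else None


def files_for_years(files: List[str], included_years: List[int]) -> List[str]:
    """Keep only the files whose encoded year is in `included_years`."""
    wanted = set(included_years)
    return [f for f in files if year_of_file(f) in wanted]


sample_files = [
    "data/trips_2021-12.parquet",
    "data/trips_2022-01.parquet",
    "data/backup_2022/trips_2019-05.parquet",  # path contains "2022" but the file is for 2019
]
selected = files_for_years(sample_files, [2022])
print(selected)
```

The substring test would have accepted the third path as well, because "2022" appears in its directory name.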

In [5]:
# Load all the information
start = time.time()
# taxi_info = spark.read.csv("/content/drive/MyDrive/MASTER'S YEAR/Computación de Altas Prestaciones/P1/data/").cache()

files_to_include = [f"data/{x}" for x in os.listdir("data") if x.endswith(".parquet")]
included_years = [x for x in range(2022, 2023)]
files_to_include = [file for file in files_to_include if file_belongs_to_included_years(file, included_years)]

# Parquet files carry their own schema, so CSV-style options (inferSchema, header,
# timestampFormat, mode) have no effect here and are omitted.
taxi_info = spark.read.parquet(*files_to_include)
taxi_info_count = taxi_info.count()
end = time.time()

In [6]:
print(f"Count: {taxi_info_count}\nTime: {end-start}s")

Count: 19817583
Time: 3.1660068035125732s


In [7]:
taxi_info.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [8]:
# Prep SQL of raw dirty taxi_info
taxi_info.createOrReplaceTempView('taxi_info')
taxi_info_sql = spark.sql("SELECT * FROM taxi_info")
taxi_info_sql.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-01-01 01:35:40|  2022-01-01 01:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.6

## Initial cleanup

For the initial cleanup we use the DataFrame API because it is the easiest to work with. Once we have a clean DataFrame, we convert it to RDD and SQL to start the tests.


### Analysis
First, we need a general description of every column.

**Tip**

In [9]:
# Negative values may come from coupons or discounts, or from a correction (e.g. for driving too far).
taxi_info.select("tip_amount").describe().show()

+-------+------------------+
|summary|        tip_amount|
+-------+------------------+
|  count|          19817583|
|   mean| 2.662275749267665|
| stddev|3.1830746114616693|
|    min|            -410.0|
|    max|           1400.16|
+-------+------------------+



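**Distances**

Distances must be strictly positive: a trip cannot cover 0 or a negative distance.

Keep the distance units in mind (the TLC data dictionary reports `trip_distance` in miles).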

In [10]:
taxi_info.select("trip_distance").describe().show()

+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|  count|         19817583|
|   mean| 5.94539621860001|
| stddev|606.3143426362707|
|    min|              0.0|
|    max|        357192.65|
+-------+-----------------+
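
As a quick sanity check on the units: assuming `trip_distance` is in miles, the maximum reported above is far longer than any plausible taxi trip. The sketch converts it to kilometres and compares it with the Earth's equatorial circumference. The constant and helper names are illustrative and not part of the notebook.

```python
KM_PER_MILE = 1.609344            # exact, by definition of the international mile
EARTH_CIRCUMFERENCE_KM = 40_075   # approximate equatorial circumference


def miles_to_km(miles: float) -> float:
    """Convert a distance in miles to kilometres."""
    return miles * KM_PER_MILE


max_trip_miles = 357192.65  # max trip_distance reported by describe() above
max_trip_km = miles_to_km(max_trip_miles)
laps = max_trip_km / EARTH_CIRCUMFERENCE_KM

print(f"{max_trip_km:,.0f} km, about {laps:.1f} times around the Earth")
```

A value like this points to a recording error rather than a real trip, which is why the distance column also needs an upper bound and not only the `> 0` check.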



**Travel time**

Travel time cannot be negative. That is, the dropoff time cannot be before the pickup time.
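
The rule can be expressed as a duration check: a trip is valid only when dropoff minus pickup is positive. The plain-Python sketch below uses `datetime` values; `trip_duration_seconds` is a hypothetical helper, and the two sample trips are illustrative (the first mirrors the kind of row the notebook's query reports, where dropoff precedes pickup by a few seconds).

```python
from datetime import datetime


def trip_duration_seconds(pickup: datetime, dropoff: datetime) -> float:
    """Return the trip duration in seconds (negative if dropoff precedes pickup)."""
    return (dropoff - pickup).total_seconds()


trips = [
    (datetime(2022, 1, 1, 2, 1, 54), datetime(2022, 1, 1, 2, 1, 36)),    # dropoff before pickup
    (datetime(2022, 1, 1, 1, 35, 40), datetime(2022, 1, 1, 1, 53, 29)),  # plausible trip
]

durations = [trip_duration_seconds(pickup, dropoff) for pickup, dropoff in trips]
valid = [duration > 0 for duration in durations]
print(durations, valid)
```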

In [11]:
times = taxi_info.select(["tpep_pickup_datetime", "tpep_dropoff_datetime"])
times.filter(times.tpep_pickup_datetime > times.tpep_dropoff_datetime).head(10)  # Showing rows with WRONG info
# taxi_info.filter(taxi_info.tpep_pickup_datetime < taxi_info.tpep_dropoff_datetime)  # All rows whose times make sense

[Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 24, 16, 23, 1), tpep_dropoff_datetime=datetime.datetime(2022, 1, 22, 7, 0, 37)),
 Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 1, 2, 1, 54), tpep_dropoff_datetime=datetime.datetime(2022, 1, 1, 2, 1, 36)),
 Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 1, 2, 1, 44), tpep_dropoff_datetime=datetime.datetime(2022, 1, 1, 2, 1, 20)),
 Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 1, 5, 1, 37), tpep_dropoff_datetime=datetime.datetime(2022, 1, 1, 5, 1, 19)),
 Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 1, 5, 1, 23), tpep_dropoff_datetime=datetime.datetime(2022, 1, 1, 5, 1, 12)),
 Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 1, 7, 1, 30), tpep_dropoff_datetime=datetime.datetime(2022, 1, 1, 7, 1, 22)),
 Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 1, 7, 1, 58), tpep_dropoff_datetime=datetime.datetime(2022, 1, 1, 7, 1, 16)),
 Row(tpep_pickup_datetime=datetime.datetime(2022, 1, 1, 12, 1, 34), tpep_dropoff

**Number of passengers**

The number of passengers must be greater than 0; a trip with no passengers makes no sense.

In [12]:
taxi_info.select(["passenger_count"]).describe().show()

+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|          19145682|
|   mean|1.3967362980331544|
| stddev|0.9723353471512736|
|    min|               0.0|
|    max|               9.0|
+-------+------------------+



**RatecodeID**

The final rate code in effect at the end of the trip.

Check that every code is one of the documented values (1 to 6 in the TLC data dictionary).

Some rows have RatecodeID=99, which is invalid.

In [13]:
taxi_info.select(["RatecodeID"]).describe().show()

+-------+------------------+
|summary|        RatecodeID|
+-------+------------------+
|  count|          19145682|
|   mean|1.4064600049243479|
| stddev|5.7232994134820165|
|    min|               1.0|
|    max|              99.0|
+-------+------------------+



In [14]:
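# NOTE: these values match valid_extras rather than the rate codes, so in practice
# this filter keeps only rows with RatecodeID == 1. The counts reported later use this list.
valid_rateIDs = [0, 0.5, 1]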
taxi_info.filter(taxi_info.RatecodeID.isin(valid_rateIDs))

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]
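
A membership filter keeps only the values that appear in the list, so it is worth checking the list against the codes that can occur in the data. The sketch below replays the check in plain Python: with `[0, 0.5, 1]` only code 1 survives, whereas a list covering 1 to 6 (assumed here to be the valid range) keeps every regular code and still rejects 99. `keep_valid` and the sample codes are hypothetical.

```python
from typing import Iterable, List


def keep_valid(codes: Iterable[float], valid: Iterable[float]) -> List[float]:
    """Return the codes that appear in `valid`, preserving order."""
    valid_set = set(valid)
    return [code for code in codes if code in valid_set]


sample_codes = [1.0, 2.0, 5.0, 6.0, 99.0]  # illustrative RatecodeID values

kept_by_notebook_list = keep_valid(sample_codes, [0, 0.5, 1])
kept_by_full_range = keep_valid(sample_codes, range(1, 7))  # assumed valid range 1..6

print(kept_by_notebook_list)
print(kept_by_full_range)
```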

**Store and Forward flag**

Only Y and N values are allowed

In [15]:
taxi_info.select(["store_and_fwd_flag"]).describe().show()

+-------+------------------+
|summary|store_and_fwd_flag|
+-------+------------------+
|  count|          19145682|
|   mean|              null|
| stddev|              null|
|    min|                 N|
|    max|                 Y|
+-------+------------------+



In [16]:
valid_flags = ["Y", "N"]
taxi_info.filter(taxi_info.store_and_fwd_flag.isin(valid_flags))

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

**Payment types**

The only valid payment types are [1, 2, 3, 4, 5, 6].

In [17]:
taxi_info.select(["payment_type"]).describe().show()

+-------+------------------+
|summary|      payment_type|
+-------+------------------+
|  count|          19817583|
|   mean| 1.183228247359933|
| stddev|0.5026529181517688|
|    min|                 0|
|    max|                 5|
+-------+------------------+



In [18]:
valid_payment_types = [x for x in range(1, 7)]
taxi_info.filter(taxi_info.payment_type.isin(valid_payment_types))

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

**Fare amount**

The fare should be strictly positive: a trip should be neither free nor negative.

In [19]:
taxi_info.select(["fare_amount"]).describe().show()

+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|          19817583|
|   mean|14.245682625377777|
| stddev|127.87273838820566|
|    min|           -2564.0|
|    max|         401092.32|
+-------+------------------+



**Extra**

Should only be \$0.5 or \$1, for the rush-hour and overnight charges.

This could be cross-checked against the pickup time to confirm that the rush-hour and overnight charges were applied correctly.

TEACHER'S NOTE: ignore this check, because the extra may also cover things like additional luggage or a dog.

In [20]:
taxi_info.select(["extra"]).describe().show()

+-------+------------------+
|summary|             extra|
+-------+------------------+
|  count|          19817583|
|   mean| 1.018474091921299|
| stddev|1.2483774309899676|
|    min|              -7.0|
|    max|              33.5|
+-------+------------------+



In [21]:
valid_extras = [0, 0.5, 1]
taxi_info.filter(taxi_info.extra.isin(valid_extras))

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

**Tolls amount**

Tolls cannot be negative, only 0 or positive.

In [22]:
taxi_info.select(["tolls_amount"]).describe().show()

+-------+-------------------+
|summary|       tolls_amount|
+-------+-------------------+
|  count|           19817583|
|   mean|0.49548667766586885|
| stddev| 1.9695300710419825|
|    min|              -83.0|
|    max|             911.87|
+-------+-------------------+



In [23]:
taxi_info.filter(taxi_info.tolls_amount >= 0)

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

**Total amount**

Should be strictly positive. Note that the filter below uses `>= 0`, so totals of exactly 0 are kept.

In [24]:
taxi_info.select(["total_amount"]).describe().show()

+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|          19817583|
|   mean|20.902765702622656|
| stddev|128.27128880123067|
|    min|           -2567.8|
|    max|         401095.62|
+-------+------------------+



In [25]:
taxi_info.filter(taxi_info.total_amount >= 0)

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

**Improvement surcharge**

Cannot be negative. Only 0 or positive.

In [26]:
taxi_info.select(["improvement_surcharge"]).describe().show()

+-------+---------------------+
|summary|improvement_surcharge|
+-------+---------------------+
|  count|             19817583|
|   mean|   0.2965171837724947|
| stddev|  0.04518930373656668|
|    min|                 -0.3|
|    max|                  0.3|
+-------+---------------------+



In [27]:
taxi_info.filter(taxi_info.improvement_surcharge >= 0)

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

### Clean DataFrame

All the problems identified above are removed.

#### Removing illogical data

Every row that makes no logical sense (a value that cannot be negative, a value that cannot be null, a discrete value outside the expected set, etc.) is removed.

In [28]:
somewhat_clean_taxi_info = taxi_info.filter((taxi_info.tip_amount >= 0)
                                   & (taxi_info.trip_distance > 0)
                                   & (taxi_info.tpep_pickup_datetime < taxi_info.tpep_dropoff_datetime)
                                   & (taxi_info.passenger_count > 0)
                                   & (taxi_info.RatecodeID.isin(valid_rateIDs))
                                   & (taxi_info.store_and_fwd_flag.isin(valid_flags))
                                   & (taxi_info.payment_type.isin(valid_payment_types))
                                   & (taxi_info.fare_amount > 0)
                                   & (taxi_info.extra.isin(valid_extras))
                                   & (taxi_info.tolls_amount >= 0)
                                   & (taxi_info.total_amount >= 0)
                                   & (taxi_info.improvement_surcharge >= 0))
somewhat_clean_taxi_info_count = somewhat_clean_taxi_info.count()
somewhat_clean_taxi_info_count

12736405
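
When a combined filter removes this many rows (from 19817583 down to 12736405), it helps to know which rule is responsible. A minimal plain-Python sketch of that idea follows: each rule is a named predicate, and a counter records the first rule each rejected row fails. The rule subset, `RULES`, `first_failed_rule`, and the sample rows are all hypothetical; on the real data the same breakdown would come from counting each filter separately.

```python
from collections import Counter
from typing import Callable, Dict, Optional

Row = Dict[str, float]

# A subset of the notebook's rules, expressed as named predicates.
RULES: Dict[str, Callable[[Row], bool]] = {
    "tip_amount >= 0": lambda r: r["tip_amount"] >= 0,
    "trip_distance > 0": lambda r: r["trip_distance"] > 0,
    "passenger_count > 0": lambda r: r["passenger_count"] > 0,
    "fare_amount > 0": lambda r: r["fare_amount"] > 0,
}


def first_failed_rule(row: Row) -> Optional[str]:
    """Return the name of the first rule the row violates, or None if it passes all."""
    for name, predicate in RULES.items():
        if not predicate(row):
            return name
    return None


rows = [
    {"tip_amount": 3.6, "trip_distance": 3.8, "passenger_count": 2.0, "fare_amount": 14.5},
    {"tip_amount": -410.0, "trip_distance": 1.2, "passenger_count": 1.0, "fare_amount": 9.0},
    {"tip_amount": 0.0, "trip_distance": 0.0, "passenger_count": 1.0, "fare_amount": 5.0},
    {"tip_amount": 1.0, "trip_distance": 2.0, "passenger_count": 0.0, "fare_amount": 7.0},
]

rejections = Counter(rule for rule in map(first_failed_rule, rows) if rule is not None)
clean_rows = [row for row in rows if first_failed_rule(row) is None]
print(dict(rejections), len(clean_rows))
```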

#### Removing outliers

All numeric columns contain extreme values. We remove every value beyond Q3 + 1.5*IQR; the cells below apply this rule to `tip_amount`.

In [29]:
# Tip
res = somewhat_clean_taxi_info.approxQuantile("tip_amount", [0.25, 0.75], 0.01)
q1 = res[0]
q3 = res[1]
iqr = q3 - q1

iqr

2.16

In [34]:
clean_taxi_info_rdd = somewhat_clean_taxi_info.rdd.filter(lambda x: x["tip_amount"] < q3 + 1.5 * iqr)

In [35]:
clean_taxi_info_count = clean_taxi_info_rdd.count()
clean_taxi_info_count

12075741
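
The upper-fence rule used above (keep values below Q3 + 1.5*IQR) can be checked by hand on a small sample. The sketch uses exact quartiles from the standard library's `statistics.quantiles`, whereas the notebook uses Spark's approximate `approxQuantile`, so the two will not agree digit for digit on real data. `upper_fence`, `drop_high_outliers`, and the sample tips are hypothetical.

```python
import statistics
from typing import List, Sequence


def upper_fence(values: Sequence[float], k: float = 1.5) -> float:
    """Return Q3 + k * IQR using exact (inclusive) quartiles."""
    q1, _median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    return q3 + k * (q3 - q1)


def drop_high_outliers(values: Sequence[float]) -> List[float]:
    """Keep only the values strictly below the upper fence."""
    fence = upper_fence(values)
    return [value for value in values if value < fence]


tips = [0.0, 1.0, 2.0, 2.5, 3.0, 3.5, 4.0, 50.0]  # illustrative tip amounts
fence = upper_fence(tips)
kept = drop_high_outliers(tips)
print(fence, kept)
```

Only the upper fence is applied here, matching the notebook; tips cannot fall below 0 after the earlier cleanup, so a lower fence would add little.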

# STUDIES

With a clean dataset in place, we can now run the planned studies and compare their execution times.