In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=c596d9c299223ee0d84ab66e7a43425bc650bfd2bd1e1e95dacdd7086bcfd7b8
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [49]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.22
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c
Url https://github.com/apache/spark
Type --help for more information.


In [50]:
#QUESTION 1
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark Test").getOrCreate()
spark.version

'3.5.1'

In [51]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col ,unix_timestamp, max

In [52]:
# Definimos el schema del csv fhv
schema = StructType([
    StructField("dispatching_base_num", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropOff_datetime", TimestampType(), True),
    StructField("PUlocationID", IntegerType(), True),
    StructField("DOlocationID", IntegerType(), True),
    StructField("SR_Flag", StringType(), True),
    StructField("Affiliated_base_number", StringType(), True)
])

In [53]:
file_path = "/content/fhv_tripdata_2019-10.csv"

In [54]:
# Leer el archivo con Spark
df = spark.read.csv(file_path, header=True, schema=schema)

df.printSchema()


root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [56]:
#QUESTION 1
import os

df_repartitioned = df.repartition(6)

# Carpeta de salida para archivos Parquet
parquet_output_path = "/content/parquet_repartition"

# Carpeta de salida para archivos Parquet
df_repartitioned.write.parquet(parquet_output_path, mode="overwrite")


In [57]:
parquet_files = [file for file in os.listdir(parquet_output_path) if file.endswith(".parquet")]

total_size = 0
for file in parquet_files:
    total_size += os.path.getsize(os.path.join(parquet_output_path, file))
    print(total_size)

# Imprimir el tamaño total
print(f"Tamaño total de archivos Parquet: {total_size / (1024 * 1024):.2f} MB")

# Calcular y imprimir el tamaño promedio de los archivos Parquet
average_size = total_size / len(parquet_files) if len(parquet_files) > 0 else 0
print(f"Tamaño promedio de archivos Parquet: {average_size / (1024 * 1024):.2f} MB")

6548003
13081421
19623186
26163076
32719544
39257481
Tamaño total de archivos Parquet: 37.44 MB
Tamaño promedio de archivos Parquet: 6.24 MB


In [58]:
# Save the repartitioned DataFrame to Parquet format
parquet_output_path = "/content/parquet_output_case"
df.write.parquet(parquet_output_path, mode="overwrite")

In [59]:
#QUESTION 3
# Leer los archivos Parquet en un DataFrame
df_parquet = spark.read.parquet(parquet_output_path)
df_parquet.show()
# Filtrar los viajes que comenzaron el 15 de octubre
trips_oct_15 = df_parquet.filter(col("pickup_datetime").cast("date") == "2019-10-15 00:00:00")

# Contar el número de viajes
count_oct_15 = trips_oct_15.count()

# Imprimir el resultado
print(f"Número de viajes en taxi el 15 de octubre: {count_oct_15}")

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [60]:
#QUESTION 4
df_parquet = spark.read.parquet(parquet_output_path)

# Calcular la duración del viaje en segundos
df_with_duration = df_parquet.withColumn(
    "trip_duration_seconds",
    (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime"))
)

# Encontrar la duración máxima del viaje
max_duration = df_with_duration.agg(max("trip_duration_seconds")).collect()[0][0]

# Convertir la duración máxima de segundos a horas
max_duration_hours = max_duration / 3600

# Imprimir el resultado
print(f"Duración del viaje más largo en horas: {max_duration_hours}")

Duración del viaje más largo en horas: 631152.5


In [61]:
#QUESTION 5
# Cargar los datos de búsqueda de zona en una vista temporal
zone_data_url = "/content/taxi_zone_lookup.csv"
zone_data = spark.read.option("header", "true").csv(zone_data_url)
zone_data.createOrReplaceTempView("zone_data")

# Ruta al archivo de datos FHV de octubre de 2019
fhv_data_url = "/content/fhv_tripdata_2019-10.csv"

# Leer los datos FHV en un DataFrame
fhv_data = spark.read.csv(fhv_data_url, header=True)

# Realizar un join entre los datos FHV y los datos de búsqueda de zona
joined_data = fhv_data.join(
    zone_data,
    fhv_data.PUlocationID == zone_data.LocationID,
    "left"
)

# Encontrar la zona de recogida menos frecuente
least_frequent_zone = joined_data.groupBy("Zone").count().orderBy(col("count")).first()

# Imprimir el nombre de la zona de recogida menos frecuente
print(f"Nombre de la zona de recogida menos frecuente: {least_frequent_zone['Zone']}")

Nombre de la zona de recogida menos frecuente: Jamaica Bay
