# Conexión con el cluster de spark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

spark.sparkContext.setLogLevel("OFF")

# Importación de datos

In [None]:
# Para leer los datos necesitas la ruta del archivo e indicar si tienen una cabecera o no, que en este caso si la tienen.
datos_yellow = spark.read.csv("../data/yellow_tripdata_2019-01.csv", header=True) #Es la ruta relativ a la ubicación relativa en el docker usado 
# Para visualizar los datos que leiste debes usar la función show e indicar el número de filas, en este caso 5.
datos_yellow.show(n=5)

In [None]:
datos_fhv = spark.read.parquet("../data/tu_archivo.parquet")
datos_fhv.show(n=5)

### Ejercicio 1. Contar la cantidad total de viajes 


# Modificación de codigo para la lectura de archivos parquet y csv

RDD

In [None]:
# Importa las funciones necesarias
from operator import add

# Ruta de entrada para los archivos CSV
ruta_csv = "../data/yellow_tripdata_2019-01.csv"

# Ruta de entrada para los archivos Parquet
ruta_parquet = "../data/*.parquet"

# Columnas que deseas seleccionar de los archivos CSV
columnas_csv = ["PULocationID", "passenger_count"]  # Reemplaza con los nombres de tus columnas CSV

# Columnas que deseas seleccionar de los archivos Parquet
columnas_parquet = ["pickup_datetime", "PULocationID"]  # Reemplaza con los nombres de tus columnas Parquet

# 1. Carga todos los archivos CSV y selecciona las columnas deseadas
archivos_csv = spark.read.format("csv").option("header", "true").load(ruta_csv).select(columnas_csv)

# 2. Carga todos los archivos Parquet y selecciona las columnas deseadas
archivos_parquet = spark.read.format("parquet").load(ruta_parquet).select(columnas_parquet)

# 3. Convierte los DataFrames en RDD y realiza las transformaciones
rdd_csv = archivos_csv.rdd \
    .filter(lambda linea: len(linea) > 1) \
    .map(lambda linea: ("yellow", 1) if len(linea) > 1 else ("fhv", 1))

rdd_parquet = archivos_parquet.rdd \
    .map(lambda linea: ("parquet", 1))

# 4. Une los RDD resultantes
rdd_unido = rdd_csv.union(rdd_parquet)

# 5. Aplica reduceByKey
resultado = rdd_unido.reduceByKey(add)

# Muestra el resultado
resultado.collect()

DF

In [None]:
# Vamos a usar la función count para contar la cantidad total de viajes de cada uno de los DataFrames.
cantidad_yellow = datos_yellow.count()
cantidad_fhv = datos_fhv.count()

# Luego, usamos print para visualizar los resultados
print("Cantidad de viajes de tipo yellow: ", cantidad_yellow)
print("Cantidad de viajes de tipo fhv: ", cantidad_fhv)

### Ejercicio 2. Contar las cantidad total de viajes por zona de recogida
Para este ejercicio vas a contar la cantidad total de viajes para cada uno de las zonas de recogida.

In [None]:
archivos_csv.show(5)

In [None]:
archivos_parquet.show(6)

In [None]:
# Verifica si la columna 'mi_columna' contiene al menos un valor no nulo
if archivos_parquet.filter(archivos_parquet.PULocationID.isNotNull()).count() > 0:
    print("La columna 'mi_columna' contiene al menos un valor no nulo.")
else:
    print("La columna 'mi_columna' está completamente compuesta por valores nulos.")


RDD

In [None]:
# Convierte el DataFrame 'archivos_csv' en un RDD 'rdd_csv'
rdd_csv = archivos_csv.rdd

# Ahora puedes aplicar tus transformaciones en 'rdd_csv'
resultado_map_reduce_2 = rdd_csv.map(lambda linea: linea if isinstance(linea, str) else linea[0]).filter(lambda linea: isinstance(linea, str)).map(lambda linea: linea.split(","))


In [None]:
# Mapea cada línea a una tupla con (PULocationID, 1) para contarlos
conteo_por_PULocationID = rdd_csv.map(lambda linea: (linea["PULocationID"], 1))

# Realiza la reducción por clave para sumar o contar los valores
resultado_map_reduce = conteo_por_PULocationID.reduceByKey(lambda a, b: a + b)

# Muestra el resultado
resultado_map_reduce.collect()

In [None]:
# Ordena los resultados en orden descendente según la cantidad de registros
resultado_ordenado_descendente = resultado_map_reduce.sortBy(lambda x: x[1], ascending=False)

# Muestra el resultado ordenado descendente
resultado_ordenado_descendente.collect()