<a href="https://colab.research.google.com/github/etarazonav/650044-ABD-ULIMA/blob/main/Notebooks/ABB_SparkSQ3_Operaciones_Basicas_DataFrames.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# <img style="float: left; padding: 0px 10px 0px 0px;" src="https://upload.wikimedia.org/wikipedia/commons/thumb/a/a0/Universidad_de_Lima_logo.png/220px-Universidad_de_Lima_logo.png"  width="120" /> Parte 3: Operaciones Básicas con DataFrames
**Profesor:** Enver G. Tarazona Vargas <br>
**Curso:** Analítica con Big Data <br>
**FACULTAD DE INGENIERÍA - CARRERA DE INGENIERÍA DE SISTEMAS**<br>

In [None]:
!pip install -q pyspark

In [None]:
# Crear una sesión de Spark (si se corre usando spark-submit o con Google Colab)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
# Carga de archivos
!wget -q https://raw.githubusercontent.com/etarazonav/650044-ABD-ULIMA/refs/heads/main/Datos/remuneracion/remuneracion_bruta_2021.csv
!wget -q https://raw.githubusercontent.com/etarazonav/650044-ABD-ULIMA/refs/heads/main/Datos/ventas.csv

Los datos que se va a utilizar provienen de la plataforma de [datos abiertos](https://www.datosabiertos.gob.pe/dataset/promedio-mensual-de-remuneraciones-brutas-soles-s-de-trabajadores-en-el-sector-privado-por-0) de Perú y representan el promedio mensual de remuneraciones brutas (en soles) de trabajadores en el sector privado por situación educativa, según distritos, para el año 2021.

In [None]:
# Lectura de los datos:
df_full = spark.read.csv('remuneracion_bruta_2021.csv', inferSchema=True, header=True)

# Mostrar el esquema inferido
# df_full.printSchema()

In [None]:
# Mostrar las 5 primeras filas
df_full.show(5)

## 1.&nbsp;Preprocesamiento

### 1.1. Selección de Columnas

Dado que el conjunto de datos posee varias columnas, por facilidad se trabajará solo con algunas de ellas. Se seleccionará solo algunas columnas representativas.

In [None]:
df = df_full.select(['DISTRITOS', "EDUCACIÓN SECUNDARIA COMPLETA", 'GRADO DE BACHILLER', "TITULADO", "GRADO DE MAESTRÍA", ])
df.show(5)

### 1.2. Conversión de tipo de datos

Si uno revisa el esquema de los datos, las columnas numéricas han sido inferidas como `strings`. Para convertir estas columnas a entero se utilizará `withColumn`, para crear una nueva columna, y `cast("int")` para que dicha nueva columna tenga el tipo de datos deseado (entero).

In [None]:
from pyspark.sql.functions import col

In [None]:
# Nota: esto no dará error pero ignorará los miles (donde hay espacio)

# df = df.withColumn("secundaria", col("EDUCACIÓN SECUNDARIA COMPLETA").cast("int"))
# df.show(5)

Debido a que en este conjunto de datos se tiene los miles separados por espacio, para poder convertir adecuadamente primero se debe eliminar dicho espacio. Para esto se puede utilizar la función `regex_replace` y luego aplicar el `cast`.

In [None]:
from pyspark.sql.functions import regexp_replace, initcap

df.withColumn("secundaria", regexp_replace("EDUCACIÓN SECUNDARIA COMPLETA", " ", "").cast("int")).show(5)

Se continuará la aplicación para todas las columnas.

In [None]:
# Crear columnas numéricas (notar que "titulado" se sobrescribe)
df = df.withColumn("secundaria", regexp_replace("EDUCACIÓN SECUNDARIA COMPLETA", " ", "").cast("int"))
df = df.withColumn("bachiller", regexp_replace("GRADO DE BACHILLER", " ", "").cast("int"))
df = df.withColumn("titulado", regexp_replace("TITULADO", " ", "").cast("int"))
df = df.withColumn("maestria", regexp_replace("GRADO DE MAESTRÍA", " ", "").cast("int"))

# Cambiar las mayúsculas de los distritos a solo mayúscula la primera letra
df = df.withColumn("distritos", initcap(col("DISTRITOS")))

df.show(5)

Se seleccionará solo las columnas convertidas a entero.

In [None]:
df = df.select(["distritos", "secundaria", "bachiller", "titulado", "maestria"])
df.show(5)

## 2.&nbsp;Ordenamiento de resultados: orderBy

Se utiliza `orderBy` con el nombre de una columna. Por defecto realiza un ordenamiento ascendente.

In [None]:
df.orderBy("titulado").show(5)

En el caso anterior no se muestra adecuadamente debido a los `NULL` existentes en la columna deseada. Para evitar mostrar estos valores nulos, se puede agregar las siguientes opciones:
* `asc_nulls_last`: para order ascendente (mostrando nulos al final)
* `desc_nulls_last`: para orden descendente (mostrando nulos al final)

Si no hubiesen nulos se podría usar directamente las opciones `asc` y `desc`.

In [None]:
df.orderBy(df["titulado"].asc_nulls_last()).show(5)

In [None]:
df.orderBy(df["titulado"].desc_nulls_last()).show(5)

## 3.&nbsp;Filtraje de datos

### 3.1. Filtraje con una sola condición (una sola columna)

In [None]:
df.filter("bachiller > 10000").show(5)

In [None]:
df.filter(df["bachiller"] > 10000).show(5)

In [None]:
df.filter(df["distritos"] == "San Isidro").show()

In [None]:
df.filter(df["distritos"].isin(["San Isidro", "Miraflores", "Barranco"]) ).show()

In [None]:
df.filter(df["distritos"].startswith("S")).show(4, truncate=False)

In [None]:
df.filter(df["distritos"].like("San %")).show(5, truncate=False)

In [None]:
# Ordenar el resultado
df.filter(df["bachiller"] > 10000)\
  .orderBy("bachiller").show(5)

In [None]:
# Mostrar solo algunas columnas
df.filter(df["bachiller"] > 10000) \
  .select(["distritos", "bachiller"]).show(5)

### 3.2. Filtraje con varias condiciones

Para  varias condiciones se puede utilizar los operadores lógicos `&`, `|`.

In [None]:
# Dos condiciones usando Y lógico

df.filter((df["secundaria"]>10000) &
          (df["bachiller"]>10000)).show(5)

In [None]:
# 2 condiciones usando OR lógico y NOT lógico

df.filter((df["secundaria"]>10000) |
          ~(df["maestria"]<10000)).show(5)

## 4.&nbsp;Mezcla de datos

### 4.1. Apilamiento de filas

Si se tiene dos o más DataFrames que poseen igual estructura, estos pueden ser apilados para formar un solo DataFrame usando `unionAll`.

In [None]:
from pyspark.sql import Row
row = Row("nombre", "mascota", "cantidad")

In [None]:
# Ejemplos de dataframes con similar estructura
df1 = spark.createDataFrame([row("Susana", "gato", 6),
                             row("Carlos", "perro", 1),
                             row("Alberto", "pez", 5)
                             ])

df2 = spark.createDataFrame([row("Pedro", "gato", 2),
                             row("Carla", "tortuga", 1),
                             row("Marcos", "hamster", 3)
                             ])

In [None]:
df = df1.unionAll(df2)
df.show()

### 4.2. Joins

In [None]:
# Ejemplo de dataframes
row1 = Row("nombre", "mascota1", "cuenta1")
df1 = spark.createDataFrame([row1("Susana", "gato", 6),
                             row1("Carlos", "perro", 1),
                             row1("Roberto", "pez", 5),
                             row1("Liliana", "caballo", 1)
                             ])

row2 = Row("nombre", "mascota2", "cuenta2")
df2 = spark.createDataFrame([row2("Susana", "loro", 2),
                             row2("Carlos", "tortuga", 1),
                             row2("Roberto", "hamster", 3),
                             row2("Fernando", "pez", 12)
                             ])

df1.show()
df2.show()

**Inner Join**

Un "inner join" realiza la mezcla de filas que tienen correspondencia en ambos DataFrames y elimina todas las otras filas. Esta es la forma por defecto de realizar el join en Spark.

En este ejemplo, se realizará el "inner join" usando la columna `nombre`.

In [None]:
df1.join(df2, 'nombre', how='inner').show()

Si los datasets no tuviesen el mismo nombre de columna, se puede especificar explícitamente las columnas para las cuales se usará un join. En este caso se mantendrá cada columna por separado (en este ejemplo habrá 2 columnas `nombre`, cada una correspondiendo a un dataframe distinto)

In [None]:
df1.join(df2, df1["nombre"]==df2["nombre"], how='inner').show()

**Outer Join**

 USa todas las filas (registros) de ambos DataFrames, independientemente de si hay correspondencias o no, y completa los valores faltantes con nulos.

In [None]:
df1.join(df2, 'nombre', how='outer').show()

**Left Join**

Usa todas las claves del DataFrame de la izquierda. Los datos del DataFrame de la derecha solo aparecen si existe alguna coincidencia con los de la izquierda.

En este ejemplo, el DataFrame de la izquierda es el `df1`.

In [None]:
df1.join(df2, 'nombre', how='left').show()

## 5.&nbsp;Agregación de datos

En esta parte se trabajará con los siguientes datos.

In [None]:
df = spark.read.csv('ventas.csv', inferSchema=True, header=True)
df.show()

### 5.1. Agregación por columnas: agg

Una forma de realizar agregación de datos por columnas es utilizando `agg` con alguna de las dos sintaxis mostradas a continuación:
* Usando un diccionario (no requiere importar funciones adicionales)
* Usando funciones disponibles en `pyspark.sql.functions` (requiere importar las funciones)

In [None]:
# Forma 1: usando un diccionario

df.agg({'Ventas':'sum'}).show()
# df.agg({'Ventas':'max'}).show()

In [None]:
# Forma 2: usando funciones específicas
from pyspark.sql.functions import sum

df.agg(sum("Ventas")).show()

Suele ser conveniente utilizar `alias` para modificar el nombre de la columna resultante

In [None]:
df.agg(sum("Ventas").alias("Ventas totales")).show()

Otras agregaciones usuales son las siguientes (se puede revisar las funciones disponibles en: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html


In [None]:
from pyspark.sql.functions import count, avg, min, max, stddev, variance

df.agg(count("Ventas").alias("cuenta"),
       avg("Ventas").alias("promedio"),
       min("Ventas").alias("mínimo"),
       max("Ventas").alias("máximo"),
       stddev("Ventas").alias("desviación"),
       variance("Ventas").alias("varianza")
).show()

Se puede redondear el resultado usando la función `round`.

In [None]:
from pyspark.sql.functions import round

df.agg(count("Ventas").alias("cuenta"),
       round(avg("Ventas"),2).alias("promedio"),
       min("Ventas").alias("mínimo"),
       max("Ventas").alias("máximo"),
       round(stddev("Ventas"),2).alias("desviación"),
       round(variance("Ventas"),2).alias("varianza")
).show()

Si se desea recuperar el valor del resultado se puede utilizar `collect`

In [None]:
v = df.agg({'Ventas':'max'}).collect()
print("Resultado: ", v[0][0])

### 5.2. Agrupamiento usando GroupBy

Una alternativa para trabajar con datos agrupados es utilizar `groupBy` y algunas funciones usuales que son provistas, como `count`, `mean`, `max`, `min`, `sum`, etc.

In [None]:
# Aplicación a todas las columnas
df.groupBy("Compania") \
  .sum().show()

In [None]:
# df.groupBy("Compania").max().show()
# df.groupBy("Compania").min().show()
# df.groupBy("Compania").count().show()
# df.groupBy("Compania").mean().show()

In [None]:
# Aplicación a columnas específicas
df.groupBy("Compania") \
  .sum("Meses").show()

Alternativamente, se puede utilizar `agg` sobre un DataFrame con datos agrupados.

In [None]:
df.groupBy("Compania") \
  .agg({"Ventas":'max'}).show()

In [None]:
df.groupBy("Compania") \
  .agg(max("Ventas")).show()

In [None]:
df.groupBy("Compania") \
  .agg(sum("Ventas").alias("ventas_totales"), \
       round(avg("Meses"),2).alias("meses_promedio")
       ) \
  .show()

Se puede añadir condiciones usando `where`

In [None]:
df.groupBy("Compania") \
  .agg(sum("Ventas").alias("ventas_totales"), \
       round(avg("Meses"),2).alias("meses_promedio")
       ) \
  .where(col("ventas_totales") >= 1000) \
  .show()