<img src="https://www.dii.uchile.cl/wp-content/uploads/2021/06/Magi%CC%81ster-en-Ciencia-de-Datos.png" width=350 />


# Clase 4: PySpark 🗄️
**MDS7202: Laboratorio de Programación Científica para Ciencia de Datos**

### Objetivos de la clase

- **Aprender los conceptos básicos de Spark:** Aprender que es spark y que funcionalidades y capacidades nos ofrece este framework.

- **Operaciones en DataFrames de Spark:** Entender y aprovechar las capacidades de Spark para la computación paralela, con el objetivo de realizar operaciones sobre grandes colecciones de datos de manera eficiente en un entorno distribuido.


# Motivación


> **Pregunta ❓**: ¿Qué son los procesos productivos?

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/putting-machine-learning-into-production.png?raw=true" width=600/>

## Extract-Transform-Load (ETL)


Una ETL es como una cocina de un restaurante donde llegan ingredientes de varios lugares (la "extracción"), se limpian y preparan (la "transformación"), y luego se combinan para crear platos que se sirven en el comedor o se envían a domicilio (la "carga"). En el mundo de los datos, ETL es el proceso de tomar datos de diferentes lugares, limpiarlos y organizarlos para que tengan sentido juntos, y finalmente ponerlos en una base de datos o sistema donde las personas puedan usarlos fácilmente para tomar decisiones o entender mejor algo.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/etl.png?raw=true" width=500 />

> **Pregunta ❓**: ¿Comó podriamos tratar diferentes conjuntos de datos tabulares y realizar transformaciones y merges sobre ellos?

`Pandas` es una libreria diseñada para el manejo de datos de tipo tabulares, que facilita el trabajo con estructuras de datos conocidas como `DataFrames`, proporcionando un medio intuitivo para manipular datos a través de operaciones optimizadas.

> **Pregunta ❓**: ¿Cuál es la desventaja de utilizar pandas en escenarios con grandes volúmenes de datos?

## Problemas que puede presentar Pandas en producción

1. **Gestión de Memoria:** Pandas carga todos los datos en memoria RAM, lo que significa que si tengo un archivo de 10GB, necesitaremos al menos 10GB de RAM para manejardo. Además Pandas emplea una `Eager Evaluation`, lo que significa que ejecuta operaciones inmediatamente después de que se llaman y esto puede resultar en una gestión ineficiente de la memoria, que se agrava cuando se manejan grandes conjuntos de datos, donde el consumo excesivo de memoria puede ser un problema.

2. **Desempeño con Grandes Volumenes de Datos:** Pandas es excelente en el análisis de conjuntos de datos de tamaño moderado, pero su desempeño puede verse comprometido al trabajar con volúmenes de datos más grandes. Esta limitación surge en parte debido a su dependencia de NumPy, que, aunque potente, no está optimizado para operar con grandes cantidades de datos ni para ejecutar operaciones de forma paralela o distribuida.

3. **Capacidad para Operaciones Paralelas y Distribuidas:** De forma inherente, Pandas no fue diseñado para soportar computación paralela o distribuida. Enfrentar desafíos con conjuntos de datos que superan la capacidad de memoria de una sola máquina implica explorar alternativas y soluciones adicionales. Herramientas como Dask ofrecen paralelización, mientras que la transición a sistemas especializados en procesamiento distribuido, como Apache Spark, puede ser necesaria para manejar eficientemente estas situaciones.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/Pandas-vs-Spark-2.png?raw=true" width=350/>

<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/PandasVsPySpark1.png?raw=true" width=350/>

> **Nota 📝:** Una importante limitación al utilizar Pandas en ambientes de producción es su manejo de grandes conjuntos de datos. Cuando estos exceden la capacidad de memoria disponible, puede provocar que el código falle debido a errores de memoria insuficiente. Esta característica subraya la importancia de considerar otras herramientas o estrategias para el manejo de datos a gran escala.

# Spark + Python = Pyspark

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/pyspark.jpeg?raw=true" width=500 />

**Apache Spark** es un potente motor unificado, diseñado para el procesamiento de datos a gran escala, tanto en centros de datos físicos como en entornos de nube. Se distingue por ofrecer almacenamiento en memoria para cálculos intermedios, lo que acelera significativamente sus operaciones en comparación con Hadoop MapReduce. Además, su API integra bibliotecas avanzadas para aprendizaje automático, consultas SQL interactivas, streaming de datos y procesamiento de grafos mediante GraphX.

Para superar las limitaciones de Hadoop, Spark se fundamenta en cuatro pilares esenciales en su diseño:

- **Velocidad:** Aprovecha los avances en hardware, como el incremento en gigabytes y núcleos de procesador, para maximizar el uso de recursos. Mejora la velocidad mediante la construcción de cálculos como Grafos Acíclicos Dirigidos (DAGs), que optimizan y programan las tareas para una ejecución paralela eficiente.

- **Facilidad de uso:** Introduce el Resilient Distributed Dataset (RDD), una estructura de datos que simplifica la manipulación de información tabular de forma inmutable.

- **Modularidad:** Compatible con múltiples lenguajes de programación y diferentes modos de procesamiento, por Batch, Streaming, SQL.

- **Extensibilidad:** Mientras se enfoca en el paralelismo para el procesamiento veloz, permite la separación entre cómputo y almacenamiento, lo que habilita la lectura de diversos tipos de datos (como Amazon S3 y Azure) para su procesamiento en memoria.

Esta filosofía de diseño convierte a Spark en una solución avanzada y flexible para el procesamiento de grandes volúmenes de datos, adaptándose a las necesidades de proyectos modernos de big data y análisis.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/sparkframework.png?raw=true" width=500>


> **Nota 📝:** ¿Como empezo esto? En 2009, un equipo de investigadores de la Universidad de Berkeley, previamente involucrados en el desarrollo de MapReduce, inició un proyecto conocido como Spark. Su objetivo era crear un framework que fuera simple, rápido y fácil de usar. Desde sus inicios, Spark demostró ser significativamente más rápido que Hadoop, superando su velocidad de procesamiento en un factor de 10 a 20 veces, una mejora que ha aumentado aún más con el tiempo. Para 2013, Spark había ganado popularidad y sus creadores lo donaron a la Apache Software Foundation (ASF), estableciendo también la compañía Databricks. El proyecto alcanzó otro hito importante en 2014 con el lanzamiento de Apache Spark 1.0.

## Ejecución Distribuida de Apache Spark

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/sparkdriver.png?raw=true" width=500>

En términos generales, la arquitectura de Spark está diseñada alrededor de una aplicación de Spark, que se compone de un programa conductor. Este programa tiene la tarea fundamental de coordinar de manera paralela las diversas operaciones que se llevan a cabo en el clúster de Spark. La comunicación del conductor con las componentes distribuidas del sistema (como los ejecutores y el gestor del clúster) se facilita mediante una `SparkSession`.

**Spark Driver:** Es el corazón de una aplicación de Spark, encargado de instanciar la `SparkSession`. Sus principales funciones incluyen comunicarse con el gestor del clúster para solicitar recursos (CPU, memoria, entre otros) que serán utilizados por los ejecutores de Spark, y transformar las operaciones definidas en el código en gráficos acíclicos dirigidos (DAGs), que permiten una ejecución eficiente de las tareas.

**SparkSession:** Actúa como el punto de entrada para la programación con Spark, ofreciendo una interfaz unificada para diversas operaciones y manipulación de datos dentro de Spark. A través de la `SparkSession`, se pueden crear entornos de ejecución JVM, definir DataFrames y DataSets, leer fuentes de datos, acceder a metadatos, y ejecutar consultas SQL.

**Cluster Manager:** Es el encargado de administrar y asignar los recursos disponibles en el clúster de nodos donde se ejecuta la aplicación de Spark. Su rol es crucial para asegurar que los recursos se utilicen de manera eficiente y que las tareas se distribuyan adecuadamente entre los nodos disponibles.

**Spark Executor:** Opera en cada nodo trabajador dentro del clúster, comunicándose directamente con el programa conductor. Los ejecutores son responsables de llevar a cabo las tareas asignadas por el conductor en los nodos de trabajo, procesando datos y devolviendo los resultados al conductor.

Esta estructura proporciona a Spark la flexibilidad y escalabilidad necesarias para procesar grandes volúmenes de datos de manera eficiente, permitiendo a las aplicaciones aprovechar al máximo los recursos computacionales disponibles.

### Datos Distribuidos y particiones

Los datos se distribuyen físicamente a través de sistemas de almacenamiento, como HDFS o servicios de almacenamiento en la nube, organizados en particiones. Spark aborda estas particiones utilizando conceptos de alto nivel para procesar los datos de manera eficiente.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/partitions.png?raw=true" width=500 />

El particionamiento de datos es clave para optimizar el rendimiento, ya que promueve el **paralelismo**. Al distribuir los datos en segmentos o particiones, se facilita que los ejecutores (executors) de Spark procesen datos que se encuentran físicamente más próximos a ellos, reduciendo así el uso del ancho de banda necesario para la transferencia de datos. De este modo, a cada núcleo de procesamiento de un ejecutor se le asigna una partición específica sobre la cual trabajar, maximizando la eficiencia del procesamiento y minimizando el tiempo de ejecución.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/partitionandexecutors.png?raw=true" width=500 />


Pregunta ❓: ¿Cómo se visualizan los datos particionados dentro de las carpetas?

```sql
data/
├── year=2023/
│   ├── month=01/
│   │   ├── day=01/
│   │   │   ├── datafile1.parquet
│   │   │   ├── datafile2.parquet
│   │   ├── day=02/
│   │   │   ├── datafile1.parquet
│   │   │   ├── datafile2.parquet
│   │   ...
│   ├── month=02/
│   │   ├── day=01/
│   │   │   ├── datafile1.parquet
│   │   │   ├── datafile2.parquet
│   │   ...
│   ...
├── year=2024/
│   ├── month=01/
│   │   ├── day=01/
│   │   │   ├── datafile1.parquet
│   │   │   ├── datafile2.parquet
│   │   ...
│   ...
```

### Spark Jobs

Durante una sesión interactiva de Spark, el nodo maestro (driver) descompone tu aplicación en uno o varios trabajos de Spark. Un Job en Spark es el bloque de ejecución completo que se genera cuando llamas a una acción y que desencadena la ejecución del Gráfico Acíclico Dirigido (DAG) construido de las transformaciones previas.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/sparkjobs.png?raw=true" width=500 />

**Etapas en Spark:** Dentro de los nodos del DAG, las etapas se establecen en función de las operaciones que pueden ejecutarse de manera secuencial o en paralelo. Sin embargo, dado que no todas las operaciones pueden completarse en una sola etapa, estas pueden subdividirse en múltiples etapas que facilitan la transferencia de datos entre los ejecutores.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/sparksstages.png?raw=true" width=500 />

**Tareas en Spark:** Cada etapa incluye varias tareas, y cada tarea se asigna a un núcleo de procesador, operando exclusivamente sobre una partición de datos. Este aspecto es crucial, ya que un ejecutor con 12 núcleos puede gestionar 12 o más tareas simultáneamente sobre 12 o más particiones de datos en paralelo. Este diseño permite que Spark alcance un alto grado de paralelismo y eficiencia en el procesamiento de datos.

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/sparktask.png?raw=true" width=500 />

Pregunta ❓: ¿Cómo se vería esto desglosada en Jobs, stages y tasks?

```python
# 1. Leer un archivo CSV
df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)

# 2. Filtrar las filas donde la edad es mayor a 30
filtered_df = df.filter(df["edad"] > 30)

# 3. Seleccionar sólo la columna con los ruts
selected_df = filtered_df.select('rut')

# 4. Obtener los elementos únicos del dataset
distinct_df = selected_df.distinct()

# 5. Contar la cantidad de elementos únicos
distinct_df.count()
```

Job: Toda la secuencia desde leer el archivo hasta contar la cantidad de ruts únicos se considera un `Job` en Spark. Un job comienza con una acción, en este caso, `count()`. Cada vez que se solicita **una acción en Spark**, se crea un nuevo **job**.

Stage: Los `stages` se dividen en función de las operaciones que requieren que los datos sean enviados a través del clúster (**shuffle**). En nuestro ejemplo simple, podríamos tener dos stages:

  - Stage 1: Leer el archivo CSV, filtrar las filas y seleccionar la columna `rut`. Este stage no requiere un shuffle porque tanto la lectura del archivo como el filtrado pueden realizarse en paralelo en cada partición del archivo sin necesidad de intercambio de datos entre particiones.

  - Stage 2: Obtener los elementos únicos del dataset con `distinct`. Este stage se consideraría por separado ya que el obtener los elementos únicos implicará un shuffle de los datos.

Task: Las tareas son las unidades de trabajo más pequeñas que se envían a los ejecutores. Cada tarea trabaja en una partición de los datos y realiza las operaciones necesarias. Por ejemplo, en el primer stage, habría una tarea por partición del archivo CSV que primero leería su sección del archivo, luego aplicaría el filtro y seleccionaría la columna rut. La cantidad de tareas está determinada por el número de particiones en los datos.

## Estructuras de Datos en Spark

### RDD

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/sparkrdd.png?raw=true" width=500 />



Un RDD (Resilient Distributed Dataset) es una colección distribuida e inmutable de elementos que constituye el pilar fundamental de Apache Spark. Este modelo permite que Spark realice operaciones de computación en paralelo de manera eficiente, distribuyendo los datos a través del clúster y ejecutando operaciones en paralelo. **Los RDD son esenciales para entender la arquitectura y la ejecución de tareas en Spark**. Alguna de las caracteristicas vitales de RDD son:

- Inmutabilidad
- Dependencias
- Particiones

A medida que Spark ha evolucionado, especialmente desde mediados de 2019, ha habido un cambio en el enfoque de manipulación de datos hacia estructuras más avanzadas como los DataFrames y los DataSets. Estas son abstracciones de nivel superior sobre los RDD, ofreciendo una interfaz más rica y optimizaciones automáticas, pero internamente, Spark sigue fundamentándose en el concepto de RDD.

#### Creación de RDD

Un RDD puede estar compuesto por distintos tipos de datos, incluyendo objetos de Python, Java, o Scala, y hasta clases definidas por el usuario. La distribución de estos datos se organiza en particiones, que pueden ser procesadas concurrentemente en varios nodos del clúster, aprovechando así la arquitectura distribuida de Spark.

Existen dos enfoques principales para crear RDDs:

1. **Distribuyendo una colección de objetos existentes**, como una lista o un set en el programa, lo cual se puede realizar mediante métodos como `sc.parallelize()`.
2. **Cargando un conjunto de datos desde un origen externo**, lo cual se logra mediante el uso de métodos como `sc.textFile()`, permitiendo la lectura de archivos de texto y su conversión directa en RDD.

### Instalación

A diferencia de otras librerías, la instalación de pyspark no es 100% manejada por pip, ya que también depende de la correcta configuración e instalación de ambientes de java como openjdk en versiones específicas. Su instalación podría diferir bastante dependiendo del sistema operativo, ya que en windows no existe nativamente un manejador de paquetes como ´´´apt´´´ o ´´´apt-get´´´ que estandaricen estas instalaciones, y se debe instalar manualmente con instalador descargado.

#### Linux

En linux, se puede realizar todo desde librería de comandos.

Es posible que comandos como "sudo" sean más dificiles de ejecutar desde jupyter. Si se usan desde linea de comandos hay que tener cuidado de realizarlo dentro del ambiente virtual a utilizar

In [None]:
# Linux
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # might need sudo. Do in terminal
# Might try sudo apt install openjdk-17-jdk -y

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-4.0.0.tar.gz (434.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.1/434.1 MB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting py4j==0.10.9.9 (from pyspark)
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m203.0/203.0 kB[0m [31m15.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (pyproject.toml) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-4.0.0-py2.py3-none-any.whl size=434741299 sha256=950d1bb857ecf254bf083eaa3916b042f164cd13e867be3d54f186806aa31c94
  Stored in directory: /home/dncortez/.cache/pip/whee

#### Windows

En windows estos son comandos que podrían funcionar, incluyendo instalar manualmente java desde https://adoptium.net/es/temurin/releases?version=11 . De todas formas esta instalación podría no funcionar. Se recomienda usar WSL y probar la opción con linux.

In [None]:
# Windows
!pip install pyspark
# Install java from https://adoptium.net/es/temurin/releases?version=11

import os, sys

os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-11.0.28.6-hotspot" #path/to/java_distribution_folder
os.environ["PATH"] = os.path.join(os.environ["JAVA_HOME"], "bin") + os.pathsep + os.environ["PATH"]

# Force Spark to use *this* Python (your venv interpreter)
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Force IPv4 (avoids Windows IPv6/IPv4 mismatch)
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

# Disable worker reuse (avoids stale socket reuse bug on Windows)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.python.worker.reuse=false pyspark-shell"

ERROR: THESE PACKAGES DO NOT MATCH THE HASHES FROM THE REQUIREMENTS FILE. If you have updated the package versions, please update the hashes. Otherwise, examine the package contents carefully; someone may have tampered with them.
    pyspark from https://files.pythonhosted.org/packages/9d/0e/5b38d51f1b1c2618cccfbf35093268665af9a3bdb493e5a3ecd991def633/pyspark-4.0.0.tar.gz:
        Expected sha256 38db1b4f6095a080d7605e578d775528990e66dc326311d93e94a71cfc24e5a5
             Got        3e6a031ffe6ec7f9b31d50552f996ee92ca2af70c7a7d8900fe5db1f4fea73a6


[notice] A new release of pip is available: 23.2.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Collecting pyspark
  Downloading pyspark-4.0.0.tar.gz (434.1 MB)
     ---------------------------------------- 0.0/434.1 MB ? eta -:--:--
     ---------------------------------------- 0.0/434.1 MB ? eta -:--:--
     ---------------------------------------- 0.0/434.1 MB ? eta -:--:--
     ---------------------------------------- 0.0/434.1 MB ? eta -:--:--
     -------------------------------------- 0.0/434.1 MB 217.9 kB/s eta 0:33:13
     -------------------------------------- 0.0/434.1 MB 196.9 kB/s eta 0:36:45
     -------------------------------------- 0.1/434.1 MB 252.2 kB/s eta 0:28:42
     -------------------------------------- 0.1/434.1 MB 252.2 kB/s eta 0:28:42
     -------------------------------------- 0.1/434.1 MB 400.9 kB/s eta 0:18:03
     -------------------------------------- 0.1/434.1 MB 400.9 kB/s eta 0:18:03
     -------------------------------------- 0.2/434.1 MB 436.9 kB/s eta 0:16:34
     -------------------------------------- 0.2/434.1 MB 509.6 kB/s eta 0:14:12
   

In [1]:
import pyspark
pyspark.__version__

'4.0.0'

### Nuestra primera SparkSession

Para iniciar con PySpark, es necesario realizar algunos pasos de configuración previos. Primero, instala la biblioteca pyspark junto con Java 8. Luego, ajusta las variables de entorno adecuadamente para asegurar que el sistema opere sin errores (Sección anterior).


Cuando trabajas con PySpark, iniciar el `SparkSession` es uno de los primeros pasos que debes realizar. El SparkSession es el corazón de cualquier aplicación Spark, ya que actúa como el principal punto de entrada a las funcionalidades de Spark. Permite a Spark acceder a los recursos del cluster y ejecutar operaciones en conjunto de datos distribuidos. Aquí tienes una explicación paso a paso de cómo se inicia el SparkSession usando el código proporcionado:

In [2]:
from pyspark.sql import SparkSession

# Inicializo el Spark Context
spark = SparkSession.builder.master("local[*]").appName("MiApp").getOrCreate()
print("Spark version:", spark.version)
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/25 01:09:36 WARN Utils: Your hostname, LAPTOP-LL8A51S5, resolves to a loopback address: 127.0.1.1; using 192.168.17.169 instead (on interface eth0)
25/08/25 01:09:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/25 01:09:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.0


Después de haber inicializado un SparkSession, ya podemos utilizar todos los métodos para crear y manipular RDDs, Dataframes, y Datasets que son las estructuras de datos presentes en Spark.

In [4]:
sc = spark.sparkContext
dataRDD = sc.parallelize([
    ("ignacio", 20),
    ("antonia", 31),
    ("juan", 30),
    ("sebastian", 29),
    ("ignacio", 25),
    ("juan", 40)
])

agesRDD = (
    dataRDD.map(lambda x: (x[0], (x[1], 1)))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .map(lambda x: (x[0], x[1][0]/x[1][1]))
)
agesRDD.collect()

                                                                                

[('juan', 35.0), ('sebastian', 29.0), ('ignacio', 22.5), ('antonia', 31.0)]

In [6]:
numbers = sc.parallelize([ 1, 2, 3, 4, 5, 6, 10, 10, 2, 3, 4])

In [7]:
?numbers.reduceByKey

[31mSignature:[39m
numbers.reduceByKey(
    func: Callable[[~V, ~V], ~V],
    numPartitions: Optional[int] = [38;5;28;01mNone[39;00m,
    partitionFunc: Callable[[~K], int] = <function portable_hash at [32m0x7f5f1a72efc0[39m>,
) -> [33m'RDD[Tuple[K, V]]'[39m
[31mDocstring:[39m
Merge the values for each key using an associative and commutative reduce function.

This will also perform the merging locally on each mapper before
sending results to a reducer, similarly to a "combiner" in MapReduce.

Output will be partitioned with `numPartitions` partitions, or
the default parallelism level if `numPartitions` is not specified.
Default partitioner is hash-partition.

.. versionadded:: 1.6.0

Parameters
----------
func : function
    the reduce function
numPartitions : int, optional
    the number of partitions in new :class:`RDD`
partitionFunc : function, optional, default `portable_hash`
    function to compute the partition index

Returns
-------
:class:`RDD`
    a :class:`RDD` co

In [8]:
print(agesRDD.toDebugString())

b'(12) PythonRDD[11] at collect at /tmp/ipykernel_942/1187016698.py:16 []\n |   MapPartitionsRDD[10] at mapPartitions at PythonRDD.scala:168 []\n |   ShuffledRDD[9] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(12) PairwiseRDD[8] at reduceByKey at /tmp/ipykernel_942/1187016698.py:13 []\n    |   PythonRDD[7] at reduceByKey at /tmp/ipykernel_942/1187016698.py:13 []\n    |   ParallelCollectionRDD[6] at readRDDFromFile at PythonRDD.scala:297 []'


El siguiente ejemplo ilustra la creación de un RDD a partir de un archivo externo utilizando el método `sc.textFile()`:

In [9]:
!wget https://raw.githubusercontent.com/MDS7202/MDS7202/main/clases/2024-01/imagenes_clase_7/Shrek-Script.txt

--2025-08-25 01:12:21--  https://raw.githubusercontent.com/MDS7202/MDS7202/main/clases/2024-01/imagenes_clase_7/Shrek-Script.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 72841 (71K) [text/plain]
Saving to: ‘Shrek-Script.txt.2’


2025-08-25 01:12:21 (4.68 MB/s) - ‘Shrek-Script.txt.2’ saved [72841/72841]



In [10]:
# Cargamos archivo de texto
text_file = sc.textFile("Shrek-Script.txt")

# Separo en palabras
words = text_file.flatMap(lambda line: line.split(" "))

# Transformo a clave-valor
key_value = words.map(lambda word: (word, 1))

# Agrupo por clave
wordCounts = key_value.reduceByKey(lambda a,b:a +b)

# Muestro el primero
wordCounts.first()

('(2001)', 1)

In [11]:
wordCounts.max(lambda x: x[1])

('the', 437)

In [12]:
wordCounts.filter(lambda x: x[1] > 100).sortBy(lambda x: x[1], ascending=False).collect()

[('the', 437),
 ('a', 285),
 ('SHREK', 266),
 ('and', 265),
 ('I', 263),
 ('to', 237),
 ('DONKEY', 224),
 ('you', 221),
 ('Shrek', 140),
 ('FIONA', 137),
 ('of', 133),
 ('is', 122)]

In [13]:
# Muestro todos
for x in wordCounts.collect():
     print(x)

# Mostramos la cantidad
wordCounts.count()

('(2001)', 1)
('Writer', 1)
('Credits:', 1)
('Steig', 1)
('Ted', 1)
('Terry', 1)
('S.H.', 1)
('(Screenplay);', 1)
('Cameron,', 1)
('Chris', 1)
('Vernon', 1)
('(Additional', 1)
('Dialogue).', 1)
('OPENING', 1)
('A', 15)
('FAIRYTALE', 1)
('BOOK', 1)
('Voiceover:', 2)
('But', 30)
('an', 21)
('her', 53)
('of', 133)
('fearful', 1)
('sort', 1)
('could', 6)
('by', 26)
('first', 12)
('She', 29)
('castle', 4)
('terrible', 1)
('fire-breathing', 2)
('dragon.', 5)
('knights', 5)
('to', 237)
('from', 36)
('this', 47)
('dreadful', 1)
('prison,', 1)
('but', 36)
('none', 1)
('prevailed.', 1)
('room', 2)
('tallest', 4)
('for', 52)
('true', 19)
('love', 19)
('and', 265)
('(Page', 1)
('ripped', 1)
('CUT', 47)
('TOILET.', 1)
('Like', 4)
("that's", 17)
('ever', 9)
('happen.', 2)
('What', 37)
('load', 1)
('-', 87)
('(Toilet', 1)
('flush,', 1)
('bursts', 2)
('plays:', 4)
('Smashmouth', 1)
('play.)', 1)
('Montage', 2)
('ogre..', 1)
('NIGHT.', 6)
('have', 53)
('torches', 2)
('are', 65)
('running', 7)
('swamp.'

3409

> **Nota 📝:** Mediante los RDD (Resilient Distributed Datasets), se llevan a cabo las operaciones fundamentales de Spark, tales como filtrado, mapeo, reducción y agrupación, proporcionando una base sólida para el procesamiento de datos. Posteriormente, estos mismos principios se aplican para realizar operaciones más avanzadas en DataFrames, permitiendo un manejo de datos más sofisticado sin que tengamos que intervenir directamente en la complejidad subyacente.

In [14]:
# Importante apagar la sesión interactiva una vez que terminemos
sc.stop()

> **Pregunta ❓**: ¿Cuál debería ser la razón de apagar esto?

### Dataframes

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/sparkDataframes.png?raw=true" width=500 />



Los DataFrames de Spark son estructuras de datos distribuidas, inmutables y esquematizadas, diseñadas para el procesamiento eficiente de grandes volúmenes de datos en clusters. Inspirados en los dataframes de R y Pandas, combinan la simplicidad de las APIs de alto nivel con la potencia del procesamiento paralelo distribuido. Cada DataFrame posee un esquema que define el nombre y tipo de sus columnas, permitiendo la manipulación de datos estructurados de manera intuitiva y eficaz. Además, los DataFrames facilitan la ejecución de consultas SQL y se integran con el ecosistema de Spark, ofreciendo una plataforma robusta para análisis de big data, con optimizaciones automáticas para mejorar el rendimiento y la eficiencia en el procesamiento de datos.

Como explicamos anteriormente, la SparkSession proporciona un punto de entrada único a las funcionalidades de Spark, simplificando la interacción del usuario con las capacidades de Spark. Algunos de los métodos mas relevantes son los siguientes:

* `.appName(name):` Define el nombre de la aplicación Spark, útil para identificar la aplicación en la interfaz de usuario de Spark y en los registros.
* `.master(url):` Especifica el modo de despliegue de la aplicación Spark (por ejemplo, local, local[4], spark://host:port, mesos://host:port, etc.).
* `.config(key, value):` Permite establecer configuraciones específicas de Spark, como límites de memoria, propiedades de ejecución, configuraciones de serialización, entre otros.

In [15]:
import pyspark
from pyspark.sql import SparkSession

sparksession = SparkSession.builder.master("local").appName("Ejemplo Dataframes").getOrCreate()

Luego comprobamos las definiciones entregadas a nuestra sesión de Spark:

In [16]:
sparksession

Trabajar con DataFrames en Spark comúnmente implica cargar datos desde una variedad de formatos, como Parquet, CSV, TXT, entre otros, dentro del entorno de Spark. Para ilustrar cómo se hace, cargaremos un archivo Parquet utilizando el método `read`.

In [17]:
%%capture
!wget https://gitlab.com/imezadelajara/datos_clase_7_mds7202/-/raw/main/fraudes.parquet

In [18]:
df = (
    sparksession.read
    .option("header",'True')
    #.option('delimiter', ',') # Formato del delimiter en el archivo de lectura
    .parquet("fraudes.parquet")
)
df.count()

                                                                                

600000


Un `Schema` en Spark define la estructura de los datos, especificando los nombres de las columnas, los tipos de datos de cada columna, y si una columna puede contener valores nulos. En el contexto de Spark SQL y DataFrames, el esquema es una descripción formal de la organización de los datos, lo que facilita el procesamiento, la manipulación y el análisis de grandes conjuntos de datos de manera eficiente.

In [19]:
df.printSchema()

root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Transaction Amount: double (nullable = true)
 |-- Transaction Date: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Customer Age: long (nullable = true)
 |-- Customer Location: string (nullable = true)
 |-- Device Used: string (nullable = true)
 |-- IP Address: string (nullable = true)
 |-- Shipping Address: string (nullable = true)
 |-- Billing Address: string (nullable = true)
 |-- Is Fraudulent: long (nullable = true)
 |-- Account Age Days: long (nullable = true)
 |-- Transaction Hour: long (nullable = true)



## Transformaciones, Acciones y Lazy evaluation

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/lazy.png?raw=true" width=500 />



En Spark, las operaciones se dividen en dos categorías principales: transformaciones y acciones.

- **Transformaciones**: Estas operaciones generan un nuevo DataFrame a partir de uno existente sin modificar el conjunto de datos original, aprovechando la característica de inmutabilidad de los DataFrames de Spark. Permiten realizar manipulaciones como filtrar, mapear y agrupar los datos.

- **Acciones**: Son las operaciones que desencadenan la ejecución de todas las transformaciones acumuladas. Las transformaciones se organizan y almacenan internamente hasta que se invoca una acción, momento en el cual Spark lleva a cabo las operaciones necesarias.

Es crucial destacar que Spark opera bajo el principio de `lazy evaluation` , lo que significa que las operaciones se planifican y almacenan sin ejecutarse inmediatamente. Esta evaluación perezosa permite a Spark optimizar el plan de ejecución de las queries, encadenando transformaciones y manteniendo un linaje de los datos para proporcionar tolerancia a fallos. Esta arquitectura contrasta significativamente con sistemas como Hadoop, donde la tolerancia a fallos y la optimización de ejecución no son tan intrínsecamente gestionadas, ofreciendo a Spark una ventaja considerable en eficiencia y resiliencia.

| Operación   | Tipo           |
| ----------- | -------------- |
| `orderBy()` | Transformación |
| `groupBy()` | Transformación |
| `filter()`  | Transformación |
| `select()`  | Transformación |
| `join()`    | Transformación |
| `show()`    | Acción         |
| `take()`    | Acción         |
| `count()`   | Acción         |
| `collect()` | Acción         |
| `save()`    | Acción         |

A partir de esto, podemos deducir que todos los métodos que consolidan información a partir de las tablas se clasifican como acciones, debido a que requieren ejecutar el linaje de comandos aplicados anteriormente para producir resultados.

**Analicemos la diferencia en tiempo de ejecución entre una transformación y una acción:**

In [20]:
%%time
df_group1 = df.groupBy('Product Category').agg(
    {
        'Customer Age':'median',
        'Product Category':'count',
        'Transaction Amount':'mean'
    }
)

CPU times: user 0 ns, sys: 5.04 ms, total: 5.04 ms
Wall time: 118 ms


In [21]:
%%time
df_group1.show(10)

[Stage 4:>                                                          (0 + 1) / 1]

+----------------+--------------------+-----------------------+-----------------------+
|Product Category|median(Customer Age)|avg(Transaction Amount)|count(Product Category)|
+----------------+--------------------+-----------------------+-----------------------+
| health & beauty|                35.0|     226.31680804387682|                 120340|
|    toys & games|                34.0|     227.39503040971658|                 119863|
|     electronics|                34.0|     226.74130695314096|                 119989|
|        clothing|                35.0|     226.21975282397648|                 119955|
|   home & garden|                35.0|     227.50232927002205|                 119853|
+----------------+--------------------+-----------------------+-----------------------+

CPU times: user 6.21 ms, sys: 0 ns, total: 6.21 ms
Wall time: 1.51 s


                                                                                

In [22]:
sparksession.stop()

## Operaciones sobre DataFrames con Spark

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/operationsDataframe.png?raw=true" width=500 />

Al igual que en Pandas, Spark nos permite realizar una amplia gama de operaciones sobre los DataFrames, facilitando la creación de nuevas estructuras de datos que se ajustan a las necesidades específicas de un problema. En esta sección, exploraremos diversos comandos que abordan las operaciones más habituales que podrías necesitar en tus proyectos de análisis de datos con Spark.

In [23]:
import pyspark
from pyspark.sql import SparkSession

sparksession = SparkSession.builder.master("local").appName("Ejemplo Dataframes").getOrCreate()

df = (
    sparksession.read
    .option("header",'True')
    .parquet("fraudes.parquet")

)

In [24]:
df

DataFrame[Transaction ID: string, Customer ID: string, Transaction Amount: double, Transaction Date: string, Payment Method: string, Product Category: string, Quantity: bigint, Customer Age: bigint, Customer Location: string, Device Used: string, IP Address: string, Shipping Address: string, Billing Address: string, Is Fraudulent: bigint, Account Age Days: bigint, Transaction Hour: bigint]

### Operaciones Básicas

Como ya habiamos visto, el método `.printSchema()` es extremadamente útil para entender rápidamente la estructura de nuestros datos, incluyendo los nombres de las columnas y sus tipos de datos, así como si alguna columna permite valores nulos.

In [25]:
# Revisemos el esquema del dataframe
df.printSchema()

root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Transaction Amount: double (nullable = true)
 |-- Transaction Date: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Customer Age: long (nullable = true)
 |-- Customer Location: string (nullable = true)
 |-- Device Used: string (nullable = true)
 |-- IP Address: string (nullable = true)
 |-- Shipping Address: string (nullable = true)
 |-- Billing Address: string (nullable = true)
 |-- Is Fraudulent: long (nullable = true)
 |-- Account Age Days: long (nullable = true)
 |-- Transaction Hour: long (nullable = true)



In [26]:
# Podemos ver el tipo de objeto que es un dataframe
type(df)

pyspark.sql.classic.dataframe.DataFrame

`.head(N)` retorna las N primeras filas del DataFrame df como un objeto Row. Es especialmente útil para obtener una rápida visión general de los datos y su formato sin necesidad de cargar todo el conjunto de datos en memoria. Es similar a take(N), pero más intuitivo para quienes provienen de un entorno de Pandas.



In [27]:
# Comprobar los N primeros elementos del dataframe
df.head(2)

                                                                                

[Row(Transaction ID='15d2e414-8735-46fc-9e02-80b472b2580f', Customer ID='d1b87f62-51b2-493b-ad6a-77e0fe13e785', Transaction Amount=58.09, Transaction Date='2024-02-20 05:58:41', Payment Method='bank transfer', Product Category='electronics', Quantity=1, Customer Age=17, Customer Location='Amandaborough', Device Used='tablet', IP Address='212.195.49.198', Shipping Address='Unit 8934 Box 0058\nDPO AA 05437', Billing Address='Unit 8934 Box 0058\nDPO AA 05437', Is Fraudulent=0, Account Age Days=30, Transaction Hour=5),
 Row(Transaction ID='0bfee1a0-6d5e-40da-a446-d04e73b1b177', Customer ID='37de64d5-e901-4a56-9ea0-af0c24c069cf', Transaction Amount=389.96, Transaction Date='2024-02-25 08:09:45', Payment Method='debit card', Product Category='electronics', Quantity=2, Customer Age=40, Customer Location='East Timothy', Device Used='desktop', IP Address='208.106.249.121', Shipping Address='634 May Keys\nPort Cherylview, NV 75063', Billing Address='634 May Keys\nPort Cherylview, NV 75063', Is F

`show(n)` imprime las primeras n filas del DataFrame df en una tabla ASCII fácil de leer, en este caso, las primeras 4 filas. Es una herramienta conveniente para una inspección rápida de los datos, incluyendo el formato de las columnas y los primeros valores, lo que ayuda a entender el contexto de los datos con los que estamos trabajando.

In [28]:
# Mostrar un ejemplo de 4 filas
df.show(4)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronics|       1|          17|    Amandaborough|     tablet| 212.195.49.198

El método `.sample()` genera una muestra aleatoria del DataFrame df, donde fraction determina la fracción aproximada de filas a retornar. En este ejemplo, se solicita un 10% de las filas. El parámetro seed asegura la reproducibilidad del muestreo. Es útil para trabajar con un subconjunto representativo de los datos cuando el conjunto completo es demasiado grande para procesar eficientemente.

In [29]:
# Realizar un muestreo con el dataframe
df.sample(fraction=0.1, seed=3).show()

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+
|1b46cabd-fa15-47b...|a748a557-cf3d-481...|            267.12|2024-03-09 00:04:37|   credit card|   home & garden|       2|          13|         West Amy|     tablet| 123.118.78.180

El método `select()` permite especificar un subconjunto de columnas para incluir en un nuevo DataFrame. Aquí, seleccionamos solo las columnas 'Transaction ID', 'Transaction Amount' y 'Is Fraudulent' de df. Este enfoque es útil para enfocar el análisis en ciertas características de interés o para reducir la carga de memoria al trabajar solo con las columnas relevantes.

In [30]:
# Seleccionar partes del dataframe
df.select(['Transaction ID','Transaction Amount', 'Is Fraudulent']).show()

+--------------------+------------------+-------------+
|      Transaction ID|Transaction Amount|Is Fraudulent|
+--------------------+------------------+-------------+
|15d2e414-8735-46f...|             58.09|            0|
|0bfee1a0-6d5e-40d...|            389.96|            0|
|e588eef4-b754-468...|            134.19|            0|
|4de46e52-60c3-49d...|            226.17|            0|
|074a76de-fe2d-443...|            121.53|            0|
|4e707452-7c8a-4cb...|            166.41|            0|
|7ed952fe-8ae1-4f1...|             92.88|            0|
|0b2fb5aa-7171-472...|            318.14|            0|
|1f52366c-7f40-439...|             47.92|            0|
|3f10dfde-9c4c-408...|            121.78|            0|
|75f19b14-516c-4f1...|            633.39|            0|
|0dae14e6-aca5-48b...|             56.31|            0|
|fb09ac9b-8c76-4ca...|            275.87|            0|
|3a25fa55-ec25-4b4...|            178.94|            0|
|c696ffee-f01d-444...|            374.04|       

`dtypes` retorna una lista de tuplas con los nombres de las columnas y sus tipos de datos correspondientes en el DataFrame df. Este atributo no es un método y se utiliza para obtener rápidamente el esquema de datos de un DataFrame en términos de tipos de Python, lo que es crucial para la validación de datos y la planificación de operaciones de transformación y análisis.

In [31]:
df.dtypes

[('Transaction ID', 'string'),
 ('Customer ID', 'string'),
 ('Transaction Amount', 'double'),
 ('Transaction Date', 'string'),
 ('Payment Method', 'string'),
 ('Product Category', 'string'),
 ('Quantity', 'bigint'),
 ('Customer Age', 'bigint'),
 ('Customer Location', 'string'),
 ('Device Used', 'string'),
 ('IP Address', 'string'),
 ('Shipping Address', 'string'),
 ('Billing Address', 'string'),
 ('Is Fraudulent', 'bigint'),
 ('Account Age Days', 'bigint'),
 ('Transaction Hour', 'bigint')]

> **Pregunta ❓**: ¿Qué pasa si quiero cambiar el tipo de dato de una columna?

```python
df[col_name].cast(new_type)
```

`.cast()` se utiliza para cambiar el tipo de datos de una columna específica en el DataFrame df. Aquí, col_name es el nombre de la columna que quieres modificar, y new_type es el nuevo tipo de datos que deseas aplicar a la columna. El método cast(new_type) es especialmente útil para asegurar que los tipos de datos de tus columnas se alineen con las operaciones analíticas que planeas realizar.

A diferencia de Pandas, donde las operaciones pueden aplicarse a múltiples columnas de manera simultánea, este método en PySpark se ejecuta individualmente en una sola columna. Por lo tanto, si necesitas aplicar cambios a varias columnas, sería conveniente definir una función como la siguiente para automatizar el proceso:

In [32]:
from pyspark.sql.types import StringType, IntegerType, FloatType


def cast_columns(df, cols_types):
    """
    Cambia el tipo de múltiples columnas en un DataFrame de Spark.

    Parámetros:
    - df: DataFrame de Spark.
    - cols_types: Diccionario con nombres de columnas como claves y tipos de datos de Spark como valores.

    Retorna:
    - DataFrame de Spark con tipos de columnas modificados.
    """
    for col_name, new_type in cols_types.items():
        df = df.withColumn(col_name, df[col_name].cast(new_type))
    return df

# Ejemplo de uso
cols_to_cast = {"Quantity": IntegerType(), "Is Fraudulent": FloatType()}
df_con_casteo = cast_columns(df, cols_to_cast)
df_con_casteo.printSchema()

root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Transaction Amount: double (nullable = true)
 |-- Transaction Date: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Customer Age: long (nullable = true)
 |-- Customer Location: string (nullable = true)
 |-- Device Used: string (nullable = true)
 |-- IP Address: string (nullable = true)
 |-- Shipping Address: string (nullable = true)
 |-- Billing Address: string (nullable = true)
 |-- Is Fraudulent: float (nullable = true)
 |-- Account Age Days: long (nullable = true)
 |-- Transaction Hour: long (nullable = true)



`.withColumn()` añade una nueva columna al DataFrame df, llamada 'Transaction Amount CLP', calculada multiplicando cada valor en la columna 'Transaction Amount' por 939.79. Este tipo de operación es útil para realizar conversiones de unidades o calcular nuevos valores basados en datos existentes dentro del DataFrame.

In [33]:
### Adding Columns in data frame
df=df.withColumn('Transaction Amount CLP',df['Transaction Amount']*939.79)

# Mostraro dos elementos para comprobar
df.show(2)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

                                                                                

Aquí, `drop('Transaction Amount CLP')` elimina la columna 'Transaction Amount CLP' del DataFrame df, y show(10) imprime las primeras filas del DataFrame resultante. Este comando es útil cuando necesitas limpiar tu DataFrame eliminando columnas que ya no son necesarias para tu análisis.

In [34]:
### Drop columns
df.drop('Transaction Amount CLP').show(10)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronics|       1|          17|    Amandaborough|     tablet| 212.195.49.198

`.withColumnRenamed()` cambia el nombre de la columna 'Customer ID' a 'User ID' en el DataFrame df. El cambio se refleja inmediatamente en el esquema del DataFrame, y show() permite visualizar las primeras filas para confirmar el cambio. Renombrar columnas puede ser crucial para clarificar el significado de los datos o para estandarizar los nombres de las columnas a través de múltiples DataFrames.

In [35]:
# Renombrar columna
df.withColumnRenamed('Customer ID','User ID').show()

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|             User ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

In [36]:
df_renombrada = df.select("*")
for col_name in df_renombrada.columns:
  df_renombrada = df_renombrada.withColumnRenamed(col_name, col_name.replace(" ", "_"))
df_renombrada.show(3)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction_ID|         Customer_ID|Transaction_Amount|   Transaction_Date|Payment_Method|Product_Category|Quantity|Customer_Age|Customer_Location|Device_Used|     IP_Address|    Shipping_Address|     Billing_Address|Is_Fraudulent|Account_Age_Days|Transaction_Hour|Transaction_Amount_CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

El método `.when()` dentro de un withColumn en PySpark se utiliza para realizar operaciones condicionales sobre los datos de un DataFrame. Funciona de manera similar a las declaraciones IF...ELSE en otros lenguajes de programación, permitiéndote especificar una condición y el valor que debe tomarse cuando esa condición es verdadera. También puedes encadenar múltiples condiciones usando .otherwise() para especificar un valor por defecto si las condiciones anteriores no se cumplen.

En este ejemplo, `when(rand() < umbral_nulo, None).otherwise(df["Transaction Amount"])` introduce valores nulos (None) de manera aleatoria en la columna 'Transaction Amount', basado en un umbral definido por umbral_nulo. La función rand() genera un número aleatorio entre 0 y 1, y si este número es menor que el umbral especificado, el valor en la columna se reemplaza por None (nulo); de lo contrario, se mantiene el valor original. Este enfoque puede ser útil para pruebas o simulaciones que requieren representar la ausencia de datos de manera aleatoria.

In [37]:
from pyspark.sql.functions import when, rand, col, sum

umbral_nulo = 0.8  # 80% de probabilidad de ser nulo

# Agregar nulos aleatoriamente a la columna "Edad"
df_nulos = (
    df
    .withColumn(
        "Transaction Amount",
        when(rand() < umbral_nulo, None).otherwise(df["Transaction Amount"])
    )
)
df_nulos = (
    df_nulos
    .withColumn(
        "Customer Age",
        when(rand() < umbral_nulo, None).otherwise(df_nulos["Customer Age"])
    )
)

In [38]:
df_nulos.show(4)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|              NULL|2024-02-20 05:58:41| bank transfer|     electronic

### Manejo de Nulos

A diferencia de pandas, Spark no posee un método que le permita calcular directamente los nulos, esto se hace un tanto más complejo:

In [39]:
exprs = [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_nulos.columns]

df_nulos.agg(*exprs).show()

[Stage 10:>                                                         (0 + 1) / 1]

+--------------+-----------+------------------+----------------+--------------+----------------+--------+------------+-----------------+-----------+----------+----------------+---------------+-------------+----------------+----------------+----------------------+
|Transaction ID|Customer ID|Transaction Amount|Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|IP Address|Shipping Address|Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------+-----------+------------------+----------------+--------------+----------------+--------+------------+-----------------+-----------+----------+----------------+---------------+-------------+----------------+----------------+----------------------+
|             0|          0|            480002|               0|             0|               0|       0|      480578|                0|          0|         0|               0|              0|            0|  

                                                                                

In [40]:
exprs

[Column<'sum(CASE WHEN isNull(Transaction ID) THEN 1 ELSE 0 END) AS Transaction ID'>,
 Column<'sum(CASE WHEN isNull(Customer ID) THEN 1 ELSE 0 END) AS Customer ID'>,
 Column<'sum(CASE WHEN isNull(Transaction Amount) THEN 1 ELSE 0 END) AS Transaction Amount'>,
 Column<'sum(CASE WHEN isNull(Transaction Date) THEN 1 ELSE 0 END) AS Transaction Date'>,
 Column<'sum(CASE WHEN isNull(Payment Method) THEN 1 ELSE 0 END) AS Payment Method'>,
 Column<'sum(CASE WHEN isNull(Product Category) THEN 1 ELSE 0 END) AS Product Category'>,
 Column<'sum(CASE WHEN isNull(Quantity) THEN 1 ELSE 0 END) AS Quantity'>,
 Column<'sum(CASE WHEN isNull(Customer Age) THEN 1 ELSE 0 END) AS Customer Age'>,
 Column<'sum(CASE WHEN isNull(Customer Location) THEN 1 ELSE 0 END) AS Customer Location'>,
 Column<'sum(CASE WHEN isNull(Device Used) THEN 1 ELSE 0 END) AS Device Used'>,
 Column<'sum(CASE WHEN isNull(IP Address) THEN 1 ELSE 0 END) AS IP Address'>,
 Column<'sum(CASE WHEN isNull(Shipping Address) THEN 1 ELSE 0 END) A

Sin embargo, métodos como `.dropna()` eliminan las filas del DataFrame que contienen valores nulos (None o NaN) en cualquier columna. Es útil cuando deseas asegurar que tu análisis o los modelos de machine learning no se vean afectados por datos incompletos.

In [41]:
df_nulos.count()

600000

In [42]:
df_nulos.dropna().count()

                                                                                

23883

In [43]:
# Drop si cualquiera de las columnas tiene un nulo
df_nulos.dropna(how="any").count()

                                                                                

23883

In [44]:
# Drop si todas las columnas tiene un nulo
df_nulos.dropna(how="all").count()

                                                                                

600000

In [45]:
# Subset
df_nulos.dropna(how="any",subset=['Transaction Amount']).count()

119998

Por otro lado, para rellenar los valores nulos en el DataFrame df con un valor específico que proporcionas (valor). Este método puede aplicarse a todo el DataFrame o especificarse para ciertas columnas, mejorando la calidad de los datos para el análisis al tratar con valores ausentes.

In [46]:
# Fill con un valor que desees
df_filled = df_nulos.fillna({'Customer Age': -18})
# Buscamos los nulos rellenados
df_filled.filter(df_filled['Customer Age']==-18).show(4)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|              NULL|2024-02-20 05:58:41| bank transfer|     electronic

> **Pregunta ❓**: ¿Existe algún método más inteligente para rellenar nulos?

In [43]:
!pip install numpy

Collecting numpy
  Downloading numpy-2.3.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.1/62.1 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading numpy-2.3.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (16.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.6/16.6 MB[0m [31m23.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: numpy
Successfully installed numpy-2.3.2


In [47]:
# Extra
from pyspark.ml.feature import Imputer

columns_to_impute = ['Transaction Amount', 'Customer Age']
imputer = Imputer(
  inputCols = columns_to_impute,
  outputCols = ["{}_imputed".format(c) for c in columns_to_impute],
  strategy = 'median'
)

In [48]:
imputer_model = imputer.fit(df_nulos)
df_imputed = imputer_model.transform(df_nulos)
(
    df_imputed
    .filter(
        # (df_imputed['Transaction Amount'].isNull())
        (df_imputed['Customer Age'].isNull())
    ).show(4)
)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+--------------------------+--------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|Transaction Amount_imputed|Customer Age_imputed|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+-----------------

### Manejo de Filtros

`.filter` filtra las filas del DataFrame df para incluir solo aquellas que cumplen con la condición especificada. La condicion puede ser una expresión lógica aplicada a las columnas del DataFrame. Es una herramienta esencial para refinar tus conjuntos de datos, seleccionando solo las partes de interés.

Una forma simple de hacer esto, es señalando a traves de un `str` la condición que quieres filtrar:

In [49]:
df.filter("Quantity<=3").show()

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

Por otro lado, hay una forma más similar a Pandas, esta forma utiliza el objeto `Column` de spark para realizar el filtro.

In [50]:
df.filter(df['Transaction Amount']>200).show()

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|0bfee1a0-6d5e-40d...|37de64d5-e901-4a5...|            389.96|2024-02-25 08:09:45|    debit card|     electronic

Para el caso quisieramos filtrar con más de una condición, deberiamos usar los simbolos `|` para añadir un "ó" y `&` para añadir un "y".

In [46]:
df.filter((df['Transaction Amount']<=200) & (df['Transaction Amount']>=50)).show()

[Stage 34:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+------------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age| Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+------------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electro

                                                                                

Finalmente, si deseamos la negación de estos filtros, tendremos que usar el simbolo `~`:


In [47]:
df.filter(~(df['Transaction Amount']>=100)).show()

[Stage 35:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

                                                                                

### Agregaciones

`.groupBy()` Agrupa el DataFrame df por una o más columnas, permitiendo realizar operaciones de agregación sobre los grupos resultantes. Este método es fundamental para el análisis de datos, ya que permite resumir y entender mejor las características distintas de diferentes segmentos o categorías en tus datos.

In [48]:
df.groupBy('Product Category').sum().show()

[Stage 36:>                                                         (0 + 1) / 1]

+----------------+-----------------------+-------------+-----------------+------------------+---------------------+---------------------+---------------------------+
|Product Category|sum(Transaction Amount)|sum(Quantity)|sum(Customer Age)|sum(Is Fraudulent)|sum(Account Age Days)|sum(Transaction Hour)|sum(Transaction Amount CLP)|
+----------------+-----------------------+-------------+-----------------+------------------+---------------------+---------------------+---------------------------+
| health & beauty|   2.7234964680000138E7|       360265|          4155478|              5980|             21646699|              1356016|       2.559514745661702E10|
|    toys & games|    2.725625052999986E7|       359408|          4132384|              6081|             21573251|              1351441|       2.561515168558887E10|
|     electronics|    2.720646268000043E7|       360161|          4137520|              6006|             21584165|              1355189|       2.556836156203693...|
|   

                                                                                

In [49]:
df.groupBy('Product Category').avg().show()

[Stage 39:>                                                         (0 + 1) / 1]

+----------------+-----------------------+------------------+------------------+--------------------+---------------------+---------------------+---------------------------+
|Product Category|avg(Transaction Amount)|     avg(Quantity)| avg(Customer Age)|  avg(Is Fraudulent)|avg(Account Age Days)|avg(Transaction Hour)|avg(Transaction Amount CLP)|
+----------------+-----------------------+------------------+------------------+--------------------+---------------------+---------------------+---------------------------+
| health & beauty|     226.31680804387682|2.9937261093568224| 34.53114508891474| 0.04969253780953964|   179.87949975070634|   11.268206747548613|         212690.27303155244|
|    toys & games|     227.39503040971658| 2.998489942684565| 34.47589331153066|0.050732920083762297|   179.98257176943676|    11.27488048855777|         213703.57562875008|
|     electronics|     226.74130695314096| 3.001616814874697| 34.48249422863763| 0.05005458833726425|   179.88453108201585|    11.

                                                                                

Muy similar al método anterior, `.agg()` realiza una operación de agregación sobre el DataFrame df, como max, min, avg, sum, etc., especificada por un diccionario que mapea nombres de columnas a funciones de agregación. Cuando se usa después de groupBy, aplica estas funciones de agregación a cada grupo.

In [50]:
(
    df
    .groupBy(['Product Category', 'Payment Method'])
    .agg({'Quantity':'sum', 'Transaction Amount':'mean'})
    .show()
)

[Stage 42:>                                                         (0 + 1) / 1]

+----------------+--------------+-----------------------+-------------+
|Product Category|Payment Method|avg(Transaction Amount)|sum(Quantity)|
+----------------+--------------+-----------------------+-------------+
| health & beauty|   credit card|      225.1705797437345|        90889|
|   home & garden|    debit card|     227.96262454273386|        90231|
|   home & garden|        PayPal|     227.29266382099075|        90194|
|        clothing|    debit card|     225.37682668183115|        89516|
|   home & garden| bank transfer|     226.55064501036057|        89239|
| health & beauty|        PayPal|      226.3843668195371|        89372|
|     electronics|   credit card|     227.04638911102143|        89870|
| health & beauty|    debit card|     226.62497625610175|        89748|
|    toys & games|    debit card|     225.43380700000031|        89587|
|    toys & games| bank transfer|     228.85355572682707|        89541|
|   home & garden|   credit card|      228.2026859476474|       

                                                                                

In [51]:
import pyspark.sql.functions as F
(
    df
    .groupBy(['Product Category', 'Payment Method'])
    .agg(
        F.mean(F.col('Transaction Amount')).alias('Avg Transaction Amount'),
        F.sum(F.col('Quantity')).alias('Total Quantity')
    )
    .show()
)

+----------------+--------------+----------------------+--------------+
|Product Category|Payment Method|Avg Transaction Amount|Total Quantity|
+----------------+--------------+----------------------+--------------+
| health & beauty|   credit card|     225.1705797437345|         90889|
|   home & garden|    debit card|    227.96262454273386|         90231|
|   home & garden|        PayPal|    227.29266382099075|         90194|
|        clothing|    debit card|    225.37682668183115|         89516|
|   home & garden| bank transfer|    226.55064501036057|         89239|
| health & beauty|        PayPal|     226.3843668195371|         89372|
|     electronics|   credit card|    227.04638911102143|         89870|
| health & beauty|    debit card|    226.62497625610175|         89748|
|    toys & games|    debit card|    225.43380700000031|         89587|
|    toys & games| bank transfer|    228.85355572682707|         89541|
|   home & garden|   credit card|     228.2026859476474|        

### Melt y Pivot

Al trabajar con grandes conjuntos de datos, a menudo nos encontramos con la necesidad de reestructurarlos para facilitar el análisis. En PySpark, esto se puede lograr mediante operaciones de "melt" y "pivot", que permiten transformar y reorganizar los datos de maneras específicas para adaptarse mejor a nuestras necesidades de análisis. Esta sección explorará cómo clasificar y aplicar estas dos operaciones poderosas dentro del entorno de PySpark.

Operación de `Pivot`.

La operación "pivot" permite transformar los datos de un formato largo a un formato ancho, creando tablas dinámicas que resumen información. Con pivot, puedes girar una columna específica de tu DataFrame para que sus valores únicos se conviertan en columnas individuales, permitiéndote realizar agregaciones en las restantes. Esta operación es esencial para resumir y analizar grandes volúmenes de datos, facilitando la comprensión de las relaciones y tendencias dentro de tus datos.

In [52]:
df.show(5)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|Customer Location|Device Used|     IP Address|    Shipping Address|     Billing Address|Is Fraudulent|Account Age Days|Transaction Hour|Transaction Amount CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

In [53]:
df_plot = (
    df.groupBy("Product Category")
    .pivot("Is Fraudulent")
    .mean('Transaction Amount')
)
df_plot.show()

+----------------+------------------+-----------------+
|Product Category|                 0|                1|
+----------------+------------------+-----------------+
| health & beauty|209.73250848198688|543.4707374581941|
|    toys & games|209.64851092439974|559.4513501068927|
|     electronics|209.98856776888175|544.6779487179508|
|        clothing|209.76507762613306|541.6669715823115|
|   home & garden|210.00183023032963| 551.278646273998|
+----------------+------------------+-----------------+



Operación de `Melt`
La operación "melt" es un proceso de transformación de datos que convierte columnas de tu DataFrame en filas, lo que resulta útil cuando necesitas desnormalizar tus datos o convertir datos anchos en datos largos para análisis o visualización.

In [54]:
df_plot = df_plot.melt(
    ids='Product Category',
    values=['0', '1'],
    variableColumnName='Is Fraudulent',
    valueColumnName='Avg Amount'
)
df_plot.show()

+----------------+-------------+------------------+
|Product Category|Is Fraudulent|        Avg Amount|
+----------------+-------------+------------------+
| health & beauty|            0|209.73250848198688|
| health & beauty|            1| 543.4707374581941|
|    toys & games|            0|209.64851092439974|
|    toys & games|            1| 559.4513501068927|
|     electronics|            0|209.98856776888175|
|     electronics|            1| 544.6779487179508|
|        clothing|            0|209.76507762613306|
|        clothing|            1| 541.6669715823115|
|   home & garden|            0|210.00183023032963|
|   home & garden|            1|  551.278646273998|
+----------------+-------------+------------------+



> **Pregunta ❓**: ¿Qué pasa si ploteamos directamente estos dataframes?

Creamos visualizaciones utilizando Plotly, una biblioteca de gráficos avanzada que simplifica la creación de visualizaciones interactivas en comparación con Matplotlib. Plotly destaca por su facilidad de uso y por ofrecer gráficos dinámicos que mejoran significativamente la experiencia de análisis de datos.

In [60]:
!pip install plotly
!pip install pandas

Collecting pandas
  Downloading pandas-2.3.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.3.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.0/12.0 MB[0m [31m81.6 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25hDownloading pytz-2025.2-py2.py3-none-any.whl (509 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m509.2/509.2 kB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tzdata-2025.2-py2.py3-none-any.whl (347 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m347.8/347.8 kB

In [63]:
!pip install nbformat

Collecting nbformat
  Downloading nbformat-5.10.4-py3-none-any.whl.metadata (3.6 kB)
Collecting fastjsonschema>=2.15 (from nbformat)
  Downloading fastjsonschema-2.21.2-py3-none-any.whl.metadata (2.3 kB)
Collecting jsonschema>=2.6 (from nbformat)
  Downloading jsonschema-4.25.1-py3-none-any.whl.metadata (7.6 kB)
Collecting attrs>=22.2.0 (from jsonschema>=2.6->nbformat)
  Downloading attrs-25.3.0-py3-none-any.whl.metadata (10 kB)
Collecting jsonschema-specifications>=2023.03.6 (from jsonschema>=2.6->nbformat)
  Downloading jsonschema_specifications-2025.4.1-py3-none-any.whl.metadata (2.9 kB)
Collecting referencing>=0.28.4 (from jsonschema>=2.6->nbformat)
  Downloading referencing-0.36.2-py3-none-any.whl.metadata (2.8 kB)
Collecting rpds-py>=0.7.1 (from jsonschema>=2.6->nbformat)
  Downloading rpds_py-0.27.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Collecting typing-extensions>=4.4.0 (from referencing>=0.28.4->jsonschema>=2.6->nbformat)
  Downloading t

In [55]:
import plotly.express as px

px.bar(
    df_plot,
    x='Is Fraudulent',
    y='Avg Amount',
    color='Product Category',
    template='simple_white',
    barmode='group'
)

Plotly podría mostrar el error `ValueError: DataFrame constructor not properly called!`, indicando que los DataFrames de Spark generalmente no se integran de manera directa con bibliotecas de visualización. Para solventar esto, es necesario convertir nuestros DataFrames de Spark a formato Pandas, lo que nos permitirá utilizar plenamente las capacidades gráficas de Plotly.

In [56]:
# Para transformar a pandas usamos el método toPandas()
df_pandas = df_plot.toPandas()
df_pandas.head(4)

Unnamed: 0,Product Category,Is Fraudulent,Avg Amount
0,health & beauty,0,209.732508
1,health & beauty,1,543.470737
2,toys & games,0,209.648511
3,toys & games,1,559.45135


In [57]:
px.bar(
    df_pandas,
    x='Is Fraudulent',
    y='Avg Amount',
    color='Product Category',
    template='simple_white',
    barmode='group'
)

>**Pregunta ❓**: Si al final voy a utilizar Pandas para ciertos análisis, ¿por qué no cargar directamente los datos en un DataFrame de Pandas desde el principio?

### Spark SQL

Spark SQL es una de las funcionalidades de alto nivel más destacadas de Spark, brindando a los desarrolladores la capacidad de emplear ANSI SQL:2003. Esto significa que puedes realizar consultas utilizando la sintaxis del SQL clásico. Para facilitar esta integración, Spark utiliza el optimizador Catalyst, que sigue varios pasos para procesar y optimizar las consultas:

- **Análisis**: Crea un Árbol de Sintaxis Abstracta (AST) para examinar la estructura de las tablas y las operaciones solicitadas en la consulta, como nombres de columnas, tipos de datos y funciones.

- **Optimización Lógica**: Genera múltiples planes de ejecución posibles, optimizando la consulta desde un punto de vista lógico para mejorar la eficiencia.

- **Planificación Física**: A partir del mejor plan lógico, selecciona los operadores físicos específicos que se utilizarán para ejecutar la consulta en el motor de Spark.

- **Generación de Código**: Produce código Java optimizado para ser ejecutado en cada nodo del clúster, garantizando una ejecución eficiente y distribuida.

Esta secuencia de pasos permite que Spark SQL ofrezca un rendimiento excepcional para la ejecución de consultas SQL, aprovechando la infraestructura distribuida de Spark y optimizando las consultas para acelerar el procesamiento de datos.

In [58]:
df_renombrada.show()

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction_ID|         Customer_ID|Transaction_Amount|   Transaction_Date|Payment_Method|Product_Category|Quantity|Customer_Age|Customer_Location|Device_Used|     IP_Address|    Shipping_Address|     Billing_Address|Is_Fraudulent|Account_Age_Days|Transaction_Hour|Transaction_Amount_CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

In [59]:
# Creamos vista temporal
# O puede ser tambien createOrReplaceTempView()
df_renombrada.createTempView("sql_query_test")

In [60]:
sparksession.sql("""
SELECT
  *
FROM sql_query_test
WHERE Is_Fraudulent = 0
  AND Transaction_Amount BETWEEN 40 AND 300
  AND Payment_Method = 'bank transfer'
  AND Product_Category = 'electronics'
  AND Customer_Age < 35
""").show()

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|      Transaction_ID|         Customer_ID|Transaction_Amount|   Transaction_Date|Payment_Method|Product_Category|Quantity|Customer_Age|Customer_Location|Device_Used|     IP_Address|    Shipping_Address|     Billing_Address|Is_Fraudulent|Account_Age_Days|Transaction_Hour|Transaction_Amount_CLP|
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+
|15d2e414-8735-46f...|d1b87f62-51b2-493...|             58.09|2024-02-20 05:58:41| bank transfer|     electronic

>**Pregunta ❓**: Pero... ¿si quiero crear una tabla que no se borre?


Este proceso es sencillo y se puede ejecutar mediante el método write. Además, para especificar el formato de almacenamiento y el modo de escritura, se utilizan las siguientes opciones:

- `.format("delta")`: Establece el formato de archivo que queires guardar la tabla. Un archivo muy común son las Delta Lake, un formato de almacenamiento avanzado que ofrece ACID transactions, versionado de datos y capacidad de auditoría.

- `.mode("append")`: Selecciona el modo de escritura en "append", lo que permite añadir los datos al conjunto existente sin sobrescribirlo, facilitando la acumulación de datos de manera incremental.

In [61]:
df_renombrada.write.partitionBy("Product_Category", "Payment_Method").saveAsTable(name='IM_example')

SparkRuntimeException: [LOCATION_ALREADY_EXISTS] Cannot name the managed table as `spark_catalog`.`default`.`im_example`, as its associated location 'file:/mnt/c/Diego/Clases/MDS7202%20-%202025%20-%20Primavera/Clases/spark-warehouse/im_example' already exists. Please pick a different table name, or remove the existing location first. SQLSTATE: 42710

In [86]:
sparksession.sql(
    "select * from IM_example"
).show(2)

+--------------------+--------------------+------------------+-------------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+----------------+--------------+
|      Transaction_ID|         Customer_ID|Transaction_Amount|   Transaction_Date|Quantity|Customer_Age|Customer_Location|Device_Used|     IP_Address|    Shipping_Address|     Billing_Address|Is_Fraudulent|Account_Age_Days|Transaction_Hour|Transaction_Amount_CLP|Product_Category|Payment_Method|
+--------------------+--------------------+------------------+-------------------+--------+------------+-----------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+----------------------+----------------+--------------+
|0bfee1a0-6d5e-40d...|37de64d5-e901-4a5...|            389.96|2024-02-25 08:09:45|       2|          40|     Eas

### ¿Cómo guardo los datos con Spark?

<center>
<img src="https://github.com/MDS7202/MDS7202/blob/main/clases/2024-01/imagenes_clase_7/savedata.png?raw=true" width=500 />

Guardar datos de manera eficiente en Spark es crucial para asegurar la integridad, accesibilidad y rendimiento óptimo de tus aplicaciones de análisis de datos. Aquí te presentamos algunas buenas prácticas para guardar datos en Spark, junto con ejemplos de código para ilustrarte mejor el proceso.

**Buenas Prácticas para Guardar Datos**

1. **Elegir el Formato Adecuado**
Dependiendo de tus necesidades de análisis y almacenamiento, elige el formato de archivo más adecuado. Parquet y ORC son excelentes para operaciones analíticas debido a su compresión y optimización de lectura. CSV y JSON son más adecuados para interoperabilidad con otras herramientas.

  - **Parquet**: Ideal para almacenamiento eficiente y operaciones analíticas.
  - **ORC**: Similar a Parquet con optimizaciones para ciertos casos de uso.
  - **CSV/JSON**: Buena opción para interoperabilidad y casos de uso más simples.

2. **Particionar los Datos**
Particionar tus datos al guardarlos puede mejorar significativamente el rendimiento de lectura. Considera particionar tus datos basándote en columnas que suelen usarse en filtros o consultas.

3. **Coalesce o Repartition**
Antes de guardar, ajusta el número de particiones usando `coalesce` (reduce el número de particiones sin provocar shuffle) o `repartition` (permite aumentar o disminuir el número de particiones, pero provoca shuffle).

  - **Coalesce**: Útil para reducir el número de archivos sin un shuffle completo.
  - **Repartition**: Usado cuando necesitas un número específico de archivos o redistribuir datos (por ejemplo, después de un filtrado masivo).

Guardar en Formato Parquet con Particionado

In [87]:
(
    df.write.partitionBy("Product Category", "Payment Method")
    .format("parquet")
    .save("/mnt/c/Diego/Clases/MDS7202 - 2025 - Primavera/parquets/example1/")
)

                                                                                

Uso de Coalesce para Reducir el Número de Particiones

In [89]:
df.coalesce(5).write.format("parquet").save("/mnt/c/Diego/Clases/MDS7202 - 2025 - Primavera/parquets/example2/")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/mnt/c/Diego/Clases/MDS7202 - 2025 - Primavera/parquets/example2 already exists. Set mode as "overwrite" to overwrite the existing path. SQLSTATE: 42K04

In [65]:
df.rdd.getNumPartitions()

1

Adoptar estas buenas prácticas y elegir las configuraciones adecuadas para tu caso de uso específico te ayudará a optimizar el almacenamiento de datos y el rendimiento de tus aplicaciones Spark. Recuerda siempre considerar las características de tus datos y los requisitos de análisis cuando elijas cómo y dónde guardar tus datos.

Como pueden observar, al utilizar estos comandos para guardar DataFrames, Spark genera múltiples archivos. Sin embargo, no es necesario cargar estos archivos individualmente; Spark maneja automáticamente este proceso en segundo plano:

In [90]:
df_load = sparksession.read.parquet("./parquets/example1/Product Category= /Payment Method=")
df_load.show(2)

25/08/24 04:29:46 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: ./parquets/example1/Product Category= /Payment Method=.
java.io.FileNotFoundException: File parquets/example1/Product Category= /Payment Method= does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/mnt/c/Diego/Clases/MDS7202 - 2025 - Primavera/Clases/parquets/example1/Product Category= /Payment Method=. SQLSTATE: 42K03

In [None]:
df_load.count()

600000

## Finalizamos Sesión

Finalmente, para apagar Spark, debemos llamar al método stop () del SparkContext, en nuestro caso la variable _sc_:

In [None]:
sparksession.stop()

## Referencias

1. Learning Spark - Holden Karau, Andy Konwinski, Patrick Wendell and Matei Zaharia.  O’Reilly Media, Inc. 2015. ISBN 978-1-449-35862-4
2. Spark Overview. http://spark.apache.org/docs/latest/
3. PySpark – Word Count Example. https://pythonexamples.org/pyspark-word-count-example/
