    # Clase - Computación Distribuida

    ## Pyspark Hands-on 
    #### Marcelo Medel Vergara - Diplomado Data Engineer USACH

## Instalación de librerías necesarias

Si utilizas Anaconda para administrar ambientes de desarrollo la mejor vía para asegurar que funcionen correctamente las librerías es instalando directo desde Conda. Adicionalmente se instalan todas las librerías que necesita Spark.
- `conda create -n pyspark-DE`
- `conda activate pyspark-DE`
- `conda install -c conda-forge pyspark python=3.10`

De igual forma se puede instalar a través de PIP, pero sin asegurar funcionamiento correcto ni la instalación de dependencias. Adicionalmente será necesario tener instalado Pandas.
- `conda create -n pyspark-DE python=3.10`
- `conda activate pyspark-DE`
- `pip install pyspark`
- `pip install pandas`.

Para Google Colab:
- `!pip install pyspark`

## SparkSession

SparkSession es una clase en PySpark que existe desde la versión 2.0 (2016) que simplifica la forma de trabajar con Spark, tanto en las configuraciones como en la manipulación de datos estructurados. 


#### Funcionalidades principales de SparkSession:

1. **Configura Spark**: Para profundizar en las configuraciones posibles de Spark, visite https://spark.apache.org/docs/latest/configuration.html.
    - `SparkSession.builder.appName("some name").**config("some config key,value")**.getOrCreate()`
    - `spark.conf.get("some config key")`

2. **Crear DataFrames**: permite leer y escribir (Input/Output) diversas fuentes de datos y crear DataFrames para la manipulación de datos.
    - `spark.createDataFrame(data [, schema])`
    - `spark.read.json("path to some json")`

3. **Ejecutar SQL**: facilita la ejecución de consultas en SQL sobre los DataFrames.
    - `spark.sql("query to some view")`
    
4. **Gestiona contexto de Spark**: facilita la configuración y acceso a diferentes componentes y funcionalidades de Spark
    - `spark.sql.shuffle.partitions`
    - `spark.executor.memory`
    - `spark.catalog.listTables()`
    - `spark.catalog.listColumns("someTable")`
    - `spark.udf.register("someName", someUdf)`

In [1]:
from pyspark.sql import SparkSession

spark = (
        SparkSession.builder
        .appName("hands-on-pyspark")
        .config("spark.executor.memory","2g")
        .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR") #ALL, OFF, ERROR, DEBUG, INFO, WARN
spark.active()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/09 20:36:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark.conf.get("spark.executor.memory")

'2g'

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.app.name', 'hands-on-pyspark'),
 ('spark.executor.memory', '2g'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.app.startTime', '17

## DataFrame en Spark

Un DataFrame es una estructura de datos bidimensional similar a cualquier tabla en una base de datos estructurada. Algunas de las características que tiene un DataFrame en PySpark son:
- **Distribuido**: Los datos están distribuidos en un clúster de nodos, lo que permite el procesamiento paralelo.

- **Inmutable**: Cada transformación produce un nuevo DataFrame.

- **SQL**: Permite operaciones tipo SQL y ofrece una interfaz similar a Pandas pero a gran escala.

- **Conexiones diversas**: Puede leer datos de múltiples fuentes (ej: JSON, CSV, Parquet, JDBC).

- **Optimización Automática**: Utiliza *Catalyst Optimizer* para optimizar automáticamente las consultas.

### Creación de un DataFrame

In [4]:
from datetime import date

data = [
    {
    "nombre":"Marcelo",
    "nacimiento": date(1987,10,7),
    "mail":"marcelo@gmail.com",
    "x":7990,
    "y":15.26,
    "active":True
    },
    {
    "nombre":"Andrea",
    "nacimiento": date(1994,6,1),
    "mail":"andrea@gmail.com",
    "x":15860,
    "y":-10.15,
    "active":False
    },{
    "nombre":"Juan",
    "nacimiento": date(2000,1,1),
    "mail":"juan@gmail.com",
    "x":8520,
    "y":26.15,
    "active":True
    },
]

In [8]:
df1 = spark.createDataFrame(data)

In [9]:
df1

DataFrame[active: boolean, mail: string, nacimiento: date, nombre: string, x: bigint, y: double]

In [10]:
df1.show()

                                                                                

+------+-----------------+----------+-------+-----+------+
|active|             mail|nacimiento| nombre|    x|     y|
+------+-----------------+----------+-------+-----+------+
|  true|marcelo@gmail.com|1987-10-07|Marcelo| 7990| 15.26|
| false| andrea@gmail.com|1994-06-01| Andrea|15860|-10.15|
|  true|   juan@gmail.com|2000-01-01|   Juan| 8520| 26.15|
+------+-----------------+----------+-------+-----+------+



In [11]:
df1.collect()

[Row(active=True, mail='marcelo@gmail.com', nacimiento=datetime.date(1987, 10, 7), nombre='Marcelo', x=7990, y=15.26),
 Row(active=False, mail='andrea@gmail.com', nacimiento=datetime.date(1994, 6, 1), nombre='Andrea', x=15860, y=-10.15),
 Row(active=True, mail='juan@gmail.com', nacimiento=datetime.date(2000, 1, 1), nombre='Juan', x=8520, y=26.15)]

In [12]:
from pyspark.sql import Row

df2 = spark.createDataFrame(
    [
        Row(nombre="Marcelo", nacimiento=date(1987,10,1), mail="marcelo@gmail.com", x=2356, y=15.69, active=True),
        Row(nombre="Andrea", nacimiento=date(2000,1,1), mail="andrea@gmail.com", x=5982, y=15.69, active=True),
        Row(nombre="Juan", nacimiento=date(1994,11,1), mail="juan@gmail.com", x=146584, y=-58.69, active=False),
    ]
)
df2.show()

+-------+----------+-----------------+------+------+------+
| nombre|nacimiento|             mail|     x|     y|active|
+-------+----------+-----------------+------+------+------+
|Marcelo|1987-10-01|marcelo@gmail.com|  2356| 15.69|  true|
| Andrea|2000-01-01| andrea@gmail.com|  5982| 15.69|  true|
|   Juan|1994-11-01|   juan@gmail.com|146584|-58.69| false|
+-------+----------+-----------------+------+------+------+



In [13]:
df1.printSchema()

root
 |-- active: boolean (nullable = true)
 |-- mail: string (nullable = true)
 |-- nacimiento: date (nullable = true)
 |-- nombre: string (nullable = true)
 |-- x: long (nullable = true)
 |-- y: double (nullable = true)



In [14]:
df2.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- nacimiento: date (nullable = true)
 |-- mail: string (nullable = true)
 |-- x: long (nullable = true)
 |-- y: double (nullable = true)
 |-- active: boolean (nullable = true)



### Data Types

In [18]:
from pyspark.sql.types import (
    StructField, # un campo estructurado
    StringType, # texto
    DateType, # datetime.date
    IntegerType, # enteros
    FloatType, # 2.23
    BooleanType, # true/false
    StructType, # colección (array) de campos estructurados
    DecimalType, # DecimalType(10,2)
    ArrayType, # lista, tupla, ...
    LongType, 
    DoubleType,
    MapType
)

cols = [
    StructField("nombre", StringType(), False),
    StructField("nombre_completo", StructType([
        StructField("nombre", StringType(),False),
        StructField("apellido", StringType(),False),
    ]) ),
    StructField("nacimiento", DateType(), False),
    StructField("mail", StringType(), False),
    StructField("x", IntegerType(), False),
    StructField("y", FloatType(), False),
    StructField("active", BooleanType(), False),
]

schema = StructType(cols)

df3 = spark.createDataFrame([],schema)
df3.show()
df3.printSchema()

+------+---------------+----------+----+---+---+------+
|nombre|nombre_completo|nacimiento|mail|  x|  y|active|
+------+---------------+----------+----+---+---+------+
+------+---------------+----------+----+---+---+------+

root
 |-- nombre: string (nullable = false)
 |-- nombre_completo: struct (nullable = true)
 |    |-- nombre: string (nullable = false)
 |    |-- apellido: string (nullable = false)
 |-- nacimiento: date (nullable = false)
 |-- mail: string (nullable = false)
 |-- x: integer (nullable = false)
 |-- y: float (nullable = false)
 |-- active: boolean (nullable = false)



In [19]:
from datetime import date

data = [
    {
    "nombre":"Marcelo",
    "nombre_completo":["Marcelo","Medel"],
    "nacimiento": date(1987,10,7),
    "mail":"marcelo@gmail.com",
    "x":7990,
    "y":15.26,
    "active":True
    },
    {
    "nombre":"Andrea",
    "nombre_completo":["Andrea","Vergara"],
    "nacimiento": date(1994,6,1),
    "mail":"andrea@gmail.com",
    "x":15860,
    "y":-10.15,
    "active":False
    },{
    "nombre":"Juan",
    "nombre_completo":["Juan","Bautista"],
    "nacimiento": date(2000,1,1),
    "mail":"juan@gmail.com",
    "x":8520,
    "y":26.15,
    "active":True
    },
]

df = spark.createDataFrame(data, schema)
df.show()
df.printSchema()

+-------+-----------------+----------+-----------------+-----+------+------+
| nombre|  nombre_completo|nacimiento|             mail|    x|     y|active|
+-------+-----------------+----------+-----------------+-----+------+------+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|
|   Juan| {Juan, Bautista}|2000-01-01|   juan@gmail.com| 8520| 26.15|  true|
+-------+-----------------+----------+-----------------+-----+------+------+

root
 |-- nombre: string (nullable = false)
 |-- nombre_completo: struct (nullable = true)
 |    |-- nombre: string (nullable = false)
 |    |-- apellido: string (nullable = false)
 |-- nacimiento: date (nullable = false)
 |-- mail: string (nullable = false)
 |-- x: integer (nullable = false)
 |-- y: float (nullable = false)
 |-- active: boolean (nullable = false)



## Operaciones básicas sobre un DataFrame

### Select, Columns y Expressions

***Select*** permite seleccionar un set de columnas y también puede ser usado para renombrar o aplicar *expresiones* a las columnas.

***Columns*** en Spark hace referencia a cualquier columna conocida en algun RDBMS, planilla de excel, pandas dataframes, etc. Estas columnas pueden ser seleccionadas, manipuladas o removidas de un DataFrame, por lo que estas operaciones son representadas como ***expressions***

-  `col` y `column` son utilizadas para referencias a columnas en un DataFrame
- `expr` es utilizado para crear expresiones mas complejas, recibe un string con SQL para ejecutar la operación.


In [25]:
from pyspark.sql.functions import col, column, expr

df.select('x'
          ,df.nacimiento
          ,col("y")
          ,column("nombre_completo")
          ,expr("nombre_completo")
          ).show()

+-----+----------+------+-----------------+-----------------+
|    x|nacimiento|     y|  nombre_completo|  nombre_completo|
+-----+----------+------+-----------------+-----------------+
| 7990|1987-10-07| 15.26| {Marcelo, Medel}| {Marcelo, Medel}|
|15860|1994-06-01|-10.15|{Andrea, Vergara}|{Andrea, Vergara}|
| 8520|2000-01-01| 26.15| {Juan, Bautista}| {Juan, Bautista}|
+-----+----------+------+-----------------+-----------------+



In [26]:
df.select('nombre_completo.*'
          ,'nombre_completo.apellido'
          ,col('nombre_completo.apellido')
          ,expr('nombre_completo.nombre')
          ).show()

+-------+--------+--------+--------+-------+
| nombre|apellido|apellido|apellido| nombre|
+-------+--------+--------+--------+-------+
|Marcelo|   Medel|   Medel|   Medel|Marcelo|
| Andrea| Vergara| Vergara| Vergara| Andrea|
|   Juan|Bautista|Bautista|Bautista|   Juan|
+-------+--------+--------+--------+-------+



In [34]:
df.select(
    expr("*"),
    expr("round(x*1.19)+1500 as valor_iva"),
    expr("split(mail,'@') as asd")
).show()

+-------+-----------------+----------+-----------------+-----+------+------+---------+--------------------+
| nombre|  nombre_completo|nacimiento|             mail|    x|     y|active|valor_iva|                 asd|
+-------+-----------------+----------+-----------------+-----+------+------+---------+--------------------+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|    11008|[marcelo, gmail.com]|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|    20373| [andrea, gmail.com]|
|   Juan| {Juan, Bautista}|2000-01-01|   juan@gmail.com| 8520| 26.15|  true|    11639|   [juan, gmail.com]|
+-------+-----------------+----------+-----------------+-----+------+------+---------+--------------------+



In [37]:
df2 = df.select(
    expr("*"),
    expr("round(x*1.19)+1500 as valor_iva"),
    expr("split(mail,'@') as asd")
)

In [35]:
df.selectExpr("*","round(x*1.19)+1500 as valor_iva").show()

+-------+-----------------+----------+-----------------+-----+------+------+---------+
| nombre|  nombre_completo|nacimiento|             mail|    x|     y|active|valor_iva|
+-------+-----------------+----------+-----------------+-----+------+------+---------+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|    11008|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|    20373|
|   Juan| {Juan, Bautista}|2000-01-01|   juan@gmail.com| 8520| 26.15|  true|    11639|
+-------+-----------------+----------+-----------------+-----+------+------+---------+



In [44]:
df.selectExpr("*","round(x*1.19)+1500 as valor_iva").selectExpr("avg(valor_iva)", "max(valor_iva)", "min(valor_iva)").show()

+--------------+--------------+--------------+
|avg(valor_iva)|max(valor_iva)|min(valor_iva)|
+--------------+--------------+--------------+
|    14340.0000|         20373|         11008|
+--------------+--------------+--------------+



### Literals, withColumn, withColumnRenamed y alias

In [46]:
from pyspark.sql.functions import lit

df.select(expr("*"), lit(1).alias("flag")).show()

df.selectExpr("*", "1 as flag").show()

+-------+-----------------+----------+-----------------+-----+------+------+----+
| nombre|  nombre_completo|nacimiento|             mail|    x|     y|active|flag|
+-------+-----------------+----------+-----------------+-----+------+------+----+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|   1|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|   1|
|   Juan| {Juan, Bautista}|2000-01-01|   juan@gmail.com| 8520| 26.15|  true|   1|
+-------+-----------------+----------+-----------------+-----+------+------+----+

+-------+-----------------+----------+-----------------+-----+------+------+----+
| nombre|  nombre_completo|nacimiento|             mail|    x|     y|active|flag|
+-------+-----------------+----------+-----------------+-----+------+------+----+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|   1|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|   1|
|   Juan| {Juan

In [52]:
df3 = df.selectExpr("*", expr("round(x*1.19) as valor_iva"))
df3.show()

PySparkTypeError: [NOT_ITERABLE] Column is not iterable.

In [49]:
#df.withColumn("flag",lit(2)).show()
df2 = df.withColumn("valor_iva",expr("round(x*1.19)"))
df2.show()

+-------+-----------------+----------+-----------------+-----+------+------+---------+
| nombre|  nombre_completo|nacimiento|             mail|    x|     y|active|valor_iva|
+-------+-----------------+----------+-----------------+-----+------+------+---------+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|     9508|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|    18873|
|   Juan| {Juan, Bautista}|2000-01-01|   juan@gmail.com| 8520| 26.15|  true|    10139|
+-------+-----------------+----------+-----------------+-----+------+------+---------+



In [54]:
df2.withColumnRenamed("nombre_completo","nombre_2").select("nombre_2.nombre").show()
df2 = df2.withColumnRenamed("nombre_completo","nombre_2")
df2.show() 

+-------+
| nombre|
+-------+
|Marcelo|
| Andrea|
|   Juan|
+-------+

+-------+-----------------+----------+-----------------+-----+------+------+---------+
| nombre|         nombre_2|nacimiento|             mail|    x|     y|active|valor_iva|
+-------+-----------------+----------+-----------------+-----+------+------+---------+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|     9508|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|    18873|
|   Juan| {Juan, Bautista}|2000-01-01|   juan@gmail.com| 8520| 26.15|  true|    10139|
+-------+-----------------+----------+-----------------+-----+------+------+---------+



In [56]:
df2 = df2.withColumnsRenamed(
            {"nombre_completo":"nombre2",
                        "x":"pago",
                        "y":'uso'}
                       )
df2.show()

+-------+-----------------+----------+-----------------+-----+------+------+---------+
| nombre|         nombre_2|nacimiento|             mail| pago|   uso|active|valor_iva|
+-------+-----------------+----------+-----------------+-----+------+------+---------+
|Marcelo| {Marcelo, Medel}|1987-10-07|marcelo@gmail.com| 7990| 15.26|  true|     9508|
| Andrea|{Andrea, Vergara}|1994-06-01| andrea@gmail.com|15860|-10.15| false|    18873|
|   Juan| {Juan, Bautista}|2000-01-01|   juan@gmail.com| 8520| 26.15|  true|    10139|
+-------+-----------------+----------+-----------------+-----+------+------+---------+



### Filter, where, drop y cast

In [None]:
df

### Pandas DataFrame to Spark DataFrame

![spark-pandas](./files/spark_pandas.png)