In [0]:
displayHTML("<h1>Sesión 2: Extracción de datos </h1> <img src='https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/1200px-Apache_Spark_logo.svg.png' alt='RDD Linage' height='300' width='550'> <ul><strong>Objetivos:</strong> <li>Diseñando un proceso ETL</li> <li>Conectando Spark con Azure Blob Storage y Amazon S3</li> <li>Conectando Spark con una Base de Datos Relacional</li> <li>Manejando Esquemas y errores al leer</li><li>Productivizando proceso de carga</li></ul>")

In [0]:
displayHTML("Recordemos qué es Apache Spark <a href='https://youtu.be/hbWEPNuPOs4m'>YouTube</a>")

-sandbox
Un proceso de Extraer, Transformar y Cargar permite mover la data desde un lugar, transformarlo agregando una estructura y luego dejarlo en una data storage. 

Dentro del desarrollo de un ETL debemos considerar:

* Determinar el tipo de dato y la conexión
* Definir el esquema de la data
* Manejar y limpiear la data
* Automatizar los flujos o workloads.

Veamos el siguiente gráfico acerca de Cómo Databricks plantea el desarrollo de ETLs

<img src="https://files.training.databricks.com/images/eLearning/ETL-Part-1/ETL-overview.png" style="border: 1px solid #aaa; border-radius: 10px 10px 10px 10px; box-shadow: 5px 5px 5px #aaa"/>

-sandbox
### Spark como engine de procesamiento

Una de las ventajas de Spark es conectarse a una infinidad de fuentes y poder dejar la data transformada en otra infinidad de destinos. 

Esto permite que Spark pueda ser el corazón de un pipeline de datos: <br><br>

1. Databricks pone en el centro a Spark como montor único.
2. Spark aquí puede ser **a procesar terabytes de información en cientos de nodos**.  
 - El mismo código que escribimos para procesar 1mb puede ser reutilizado para procesar 1tb.
2. Spark se enforca solo en el procesamiento, lo que deja al almacenamiento a preferencia de la empresa.  
 - Puede manejar conexiones como S3, Azure Blob Storage, Redshift y Kafka.  
 - Podemos crear cluster solo por el tiempo que dure el procesamiento y luego descartarlos opteniendo un ahorro en los costos y permitiendonos siempre usar la última versión de Spark.
 
<img src="https://files.training.databricks.com/images/eLearning/ETL-Part-1/Workload_Tools_2-01.png" style="border: 1px solid #aaa; border-radius: 10px 10px 10px 10px; box-shadow: 5px 5px 5px #aaa"/>

### Primer ETL

Empecemos leyendo datos desde Azure Blob Storage, con la data de ventas de un supermercado

In [0]:
spark

In [0]:
#Primero creamos nuestro Blob Storage, cargamos nuestra data, generamos nuestra autenticación y escrimos el código
spark.conf.set(
  "fs.azure.sas.spark.datahack.blob.core.windows.net", "DefaultEndpointsProtocol=https;AccountName=datalakedatahack;AccountKey=r0HzJXAU7QWGtpMuvYz436Tn9KHetGlTMhnRfTl+vzpO3m30cBvt2/JIcVDqh4UyYTRRVbreppz1FzcZ9l/esA==;EndpointSuffix=core.windows.net")

In [0]:
#Leemos un dataframe

# Leer un CSV 
simpleDF = (spark
  .read
  .option("header", True)
  .csv("wasbs://spark@datahack.blob.core.windows.net/2010-12-01.csv")
  .sample(withReplacement=False, fraction=0.3, seed=3) # cargamos solo una muestra de la data
           )

# Mostrando el contenido de la data
print(spark.read)
display(simpleDF)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850.0,United Kingdom
536367,21754,HOME BUILDING BLOCK WORD,3,2010-12-01 08:34:00,5.95,13047.0,United Kingdom
536367,48187,DOORMAT NEW ENGLAND,4,2010-12-01 08:34:00,7.95,13047.0,United Kingdom
536368,22912,YELLOW COAT RACK PARIS FASHION,3,2010-12-01 08:34:00,4.95,13047.0,United Kingdom
536370,22726,ALARM CLOCK BAKELIKE GREEN,12,2010-12-01 08:45:00,3.75,12583.0,France
536370,21724,PANDA AND BUNNIES STICKER SHEET,12,2010-12-01 08:45:00,0.85,12583.0,France
536370,10002,INFLATABLE POLITICAL GLOBE,48,2010-12-01 08:45:00,0.85,12583.0,France
536370,22540,MINI JIGSAW CIRCUS PARADE,24,2010-12-01 08:45:00,0.42,12583.0,France


In [0]:
#Leemos un dataframe

# Leer un CSV 
simpleDFGCS = (spark
  .read
  .option("header", True)
  .csv("gs://sparkdatapath/2010-12-01.csv") # cargamos solo una muestra de la data
           )
# Mostrando el contenido de la data
#display(simpleDF)

### Data Validation

Dentro de un ETL es necesario validar si estamos cargando la data correcta. Algunos indicadores que puedo considerar:

* Si estoy recibiendo la misma cantidad de data que el origen
* Si todo los campos que esperaba están disponibles
* No existe valores missing inesperados

Filtremos para ver los productos con un monto menor a 10 libras y una cantidad mayor que 2

In [0]:
from pyspark.sql.functions import col

possibleErrorDF = (simpleDF
  .filter((col("UnitPrice") <= 10) & (col("Quantity") > 2))
  .select("Quantity", "Country")                 
  .groupBy("Country")
  .count()
  .orderBy("Country")
)
#Tipos de Operaciones en Spark
#Spark no procesa hasta que se ejecuté una acción
#Transformaciones y #Acciones

display(possibleErrorDF)

Country,count
EIRE,4
France,4
Germany,3
Netherlands,1
Norway,22
United Kingdom,399


Observamos que hay una gran cantidad de ventas en UK, que puede deberse a aregistros duplicados.

### Guardando en DBFS

DBFS es el sistema de archivos distribuidos de Databricks. Para cerrar este flujo escribiremos nuestro DataFrame en DBFS en formato **<a href="http://parquet.apache.org/presentations/" target="_blank">Parquet</a>**, dicho formato es columnar y es el más compatible y recomendado para trabajar con Spark.


<img src="https://datos.gob.es/sites/default/files/styles/blog_image/public/blog/image/logo_formato_parquet.jpg?itok=CT-UucXj"/>

Para guardar usamos el método `.write`.

* Los cluster que creamos cuenta con una capacidad limitada de almacenamiento en la carpeta `/tmp/`.  Esto nos servirá para poner nuestros archivos transformados.  
* Si nos quedamos si espacio podemos ejecutar `dbutils.fs.rm("/tmp/<mi directorio>", True)` para remover nuestros datos.

In [0]:
dbutils.fs.rm("/data20220407/possibleErrorDF.parquet", True)
savePath = "/data20220407/possibleErrorDF.parquet"

(possibleErrorDF
  .write
  .mode("overwrite") # overwrites a file if it already exists
  .parquet(savePath)
)
print(possibleErrorDF.write)

<img src="https://docs.microsoft.com/es-es/azure/databricks/_static/images/data-import/dbfs-and-local-file-paths.png"/>

-sandbox
### Detallando nuestro ETL

Veamos todo lo que usamos para hacer nuestro primer ETL con Spark.

| Code | Stage |
|------|-------|
| `simpleDF = (spark`                                                                          | Extraer |
| &nbsp;&nbsp;&nbsp;&nbsp;`.read`                                                           | Extraer |
| &nbsp;&nbsp;&nbsp;&nbsp;`.option("header", True)`                                         | Extraer |
| &nbsp;&nbsp;&nbsp;&nbsp;`.csv(<origen>)`                                                  | Extraer |
| `)`                                                                                       | Extraer |
| `possibleErrorDF = (simpleDF`                                                                  | Transformar |
| &nbsp;&nbsp;&nbsp;&nbsp;`.filter((col("UnitPrice") <= 10) & (col("Quantity") > 2))`             | Transformar |
| &nbsp;&nbsp;&nbsp;&nbsp;`.select("Quantity", "Country")`                    | Transformar |
| `)`                                                                                       | Transformar |
| `(possibleErrorDF.write`                                                                 | Cargar |
| &nbsp;&nbsp;&nbsp;&nbsp;`.parquet(<destino>))`                                      | Cargar |

* Importante entender que si bien trabajamos con una muestra de datos, el código es fácilmente escalable.

## Ejercicio 1: ¿Cómo le fue a Apple en la bolsa?

Es momento de que pongas en marcha lo aprendido. Tu objetivo es descubrir que día la acción de Apple superó los 100 dólares (usemos el valor "close"). Puedes descargar la data de forma manual desde aquí https://datahack.blob.core.windows.net/spark/AAPL.csv y cargarla en DBFS.

In [0]:
# Leer un CSV 
appleDF = (spark
  .read
  .option("header", True)
  .option("inferSchema", True)
  .csv("/FileStore/tables/AAPL.csv") #Importante aqui utilizaremos la ruta del DBFS
) # Importante aqui trabajaremos con toda la data, recuerda quitar el monto

display(appleDF)

Date,Open,High,Low,Close,Adj Close,Volume
2019-10-28,61.855,62.3125,61.68,62.262501,61.65081,96572800
2019-10-29,62.2425,62.4375,60.642502,60.822498,60.224953,142839600
2019-10-30,61.189999,61.325001,60.302502,60.814999,60.217525,124522000
2019-10-31,61.810001,62.2925,59.314999,62.189999,61.579021,139162000
2019-11-01,62.384998,63.982498,62.290001,63.955002,63.326683,151125200
2019-11-04,64.332497,64.462502,63.845001,64.375,63.742554,103272000
2019-11-05,64.262497,64.547501,64.080002,64.282501,63.65097,79897600
2019-11-06,64.192497,64.372498,63.842499,64.309998,63.678192,75864400
2019-11-07,64.684998,65.087502,64.527496,64.857498,64.413116,94940400
2019-11-08,64.672501,65.110001,64.212502,65.035004,64.589409,69986400


### Tipos de datos en Spark

#### Booleans

In [0]:
df = (spark
  .read
  .option("header", True)
  .option("inferSchema", "true")
  .csv("wasbs://spark@datahack.blob.core.windows.net/2010-12-01.csv")
  )

In [0]:
df.show()

In [0]:
from pyspark.sql.functions import col

df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5, False)

In [0]:
#Otra forma es hacer el filtrado en el predicado
df.where("InvoiceNo <> 536365").show(5, False)#False es definido para que no corte los string que tienen mas de 20 caracteres

In [0]:
from pyspark.sql.functions import instr

priceFilter = col("UnitPrice") > 600

descripFilter = instr(df.Description, "POSTAGE") >= 1

df.where(priceFilter| descripFilter).show()

df.where((col("UnitPrice") > 600) | (instr(df.Description, "POSTAGE") >= 1)).show()

In [0]:
#Podemos especificar una columna boolean
from pyspark.sql.functions import instr

DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1

df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
  .show(5)

df.withColumn("isExpensive", (col("StockCode") == "DOT") & ((col("UnitPrice") > 600) | (instr(col("Description"), "POSTAGE") >= 1)))\
  .show(5)

In [0]:
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
  .where("isExpensive")\
  .select("unitPrice", "isExpensive").show(5)

df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
  .where(col("isExpensive") == True)\
  .select("unitPrice", "isExpensive").show(5)

isExpensive = col("isExpensive") == True
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
  .where(isExpensive)\
  .select("unitPrice", "isExpensive").show(5)

In [0]:
(
  spark
    .read
    .option("header", True)
    .option("inferSchema", "true")
    .csv("wasbs://spark@datahack.blob.core.windows.net/2010-12-01.csv") # EXTRACCION (E)
    .withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
    .withColumn("isNotExpensive", ~col("isExpensive"))
    .where("isExpensive")\
    .select("unitPrice", "isExpensive") # TRANSFORMACION (T)
    .write
    .mode("overwrite")
    .parquet("/data20220407/possibleErrorDF2022/") # CARGA (L)
) 

In [0]:
df1 = df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))
df2 = df1.where("isExpensive")
df3 = df2.select("unitPrice", "isExpensive")
df3.show(5)

In [0]:
from pyspark.sql.functions import expr
df.where("Description = 'hello'")

In [0]:
#Manera para asegurar que no compare nulos

df.where(col("Description").eqNullSafe("hello")).show()

#### Numérico

In [0]:
#Tomar la candidad y precio unitario de cada elemento y aplicarle una función
from pyspark.sql.functions import expr, pow
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5

df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)

df.select(expr("CustomerId"), (pow(col("Quantity") * col("UnitPrice"), 2) + 5).alias("realQuantity")).show(2)

In [0]:
#Podemos multiplicar y dividir elementos de la columna así mismo sumar y restar siempre que estos sean del mismo tipo
df.selectExpr("CustomerId",  "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(10)

In [0]:
#Redondear
from pyspark.sql.functions import lit, round, bround
df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)
# Expresiones equivalentes
df.selectExpr("(round(UnitPrice, 1)) as rounded", "UnitPrice").show(5)

In [0]:
#Redondear hacia abajo
from pyspark.sql.functions import lit, round, bround
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
# SELECT 2.5, 2.5 FROM VENTAS LIMIT 2

In [0]:
#Correlación de Pearson entre dos columnas
from pyspark.sql.functions import corr
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()

In [0]:
#Descripción breve de las columnas numéricas
df.select("Quantity").describe().show()

In [0]:
# Si lo necesitas por separado también puedes extraerlo
from pyspark.sql.functions import count, mean, stddev_pop, min, max
#Tambien pueden revisar la librería stat (df.stat)

In [0]:
#Quantiles
colName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile("UnitPrice", quantileProbs, relError) 

In [0]:
#Puedes realizar crosstabulation
df.stat.crosstab("StockCode", "Quantity").show()

In [0]:
#Frecuencia
dfFrec = df.stat.freqItems(["StockCode", "Quantity"])
display(dfFrec)

StockCode_freqItems,Quantity_freqItems
"List(90214E, 20728, 20755, 21703, 22113, 22524, 22041, 72803A, 72798C, 90181B, 21756, 22694, 90206C, 20970, 21624, 90209C, 84744, 82494L, 22952, 20682, 22583, 21705, 20679, 22220, 90177E, 90214A, 22448, 90214S, 22121, 22802, 84970L, 72818, 90192, 90200C, 22910, 21380, 90211A, 21137, 35271S, 84926A, 20765, 22384, 21524, 22165, 22366, 21221, 21704, 22519, 85035C, 21967, 22114, 22909, 22900, 22447, 21577, 21877, 20726, 85034A, DOT, 84658, 21472, 22804, 22222, 72802C, 21739, 22467, 90214H, 22785, 22446, 22197, 20665, 21733, 22731, 21709, 22086, 40001, 85123A)","List(200, 128, 23, 32, 50, 600, 8, 17, 80, -1, -10, 11, 56, 47, 20, -7, 2, 5, 480, -4, 14, 432, 100, 64, 40, 13, 4, -5, 22, 16, -2, 7, 70, 384, 25, 34, 10, 1, 288, 216, 28, 252, 19, 120, 192, 60, 96, 72, 144, 36, 27, 9, 18, 48, 21, 12, 3, -6, -24, 30, 15, 33, 6, 24, -12, -3)"


In [0]:
#Podemos agregar un ID único a cada elemento de un Dataframe
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(10)

#### Strings

In [0]:
#initcap que capitaliza cada letra
from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show()

In [0]:
#Podemos poner mayúsculas y minúsculas
from pyspark.sql.functions import lower, upper
df.select(col("Description"),
    lower(col("Description")),
    upper(lower(col("Description")))).show(2)

In [0]:
#Agregar o remover espacios en blanco
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
    ltrim(lit("    HELLO    ")).alias("ltrim"),
    rtrim(lit("    HELLO    ")).alias("rtrim"),
    trim(lit("    HELLO    ")).alias("trim"),
    lpad(lit("HELLO"), 3, " ").alias("lp"),
    rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

df.select(col("UnitPrice") + lit(1), col("UnitPrice")).show(10)
df.withColumn("isCleaned", lit(False)).show(10)

In [0]:
#Expresiones regulares
#Busca las palabras BLACK WHITE y las reemplaza con COLOR
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(
  regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),
  col("Description")).show(2, False)

In [0]:
#Reemplazar caracteres - translate
from pyspark.sql.functions import translate
df2 = df.select(translate(col("Description"), "LEET", "1337"),col("Description"))

In [0]:
#Retirar las palabras de la descripción
from pyspark.sql.functions import regexp_extract
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
df.select(
     regexp_extract(col("Description"), extract_str, 1).alias("color_clean"),
     col("Description")).show(2, False)

In [0]:
#Solo deseamos revisar si la palabra se encuentra dentro de una columna
from pyspark.sql.functions import instr
containsBlack = instr(col("Description"), "BLACK") >= 1 #Predicado en una variable
containsWhite = instr(col("Description"), "WHITE") >= 1

df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
  .where("hasSimpleColor")\
  .select("Description").show(3, False)


df.withColumn("hasSimpleColor", (instr(col("Description"), "BLACK") >= 1) | (instr(col("Description"), "WHITE") >= 1))\
  .where("hasSimpleColor")\
  .select("Description").show(3, False)


#### Date y Timestamps

In [0]:
from pyspark.sql.functions import current_date, current_timestamp

dateDF = spark.range(1)\
  .withColumn("today", current_date())\
  .withColumn("now", current_timestamp())
dateDF.show()

In [0]:
dateDF.printSchema()

In [0]:
#Agreguemos y restemos días
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

In [0]:
#Diferencia entre dos fechas
from pyspark.sql.functions import datediff, months_between, to_date
#Calculamos la fecha de hace 7 días y la restamis de la de hoy
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
  .select(datediff(col("week_ago"), col("today"))).show(1)

#Restamos dos fechas solo por meses SELECT FECHA, '2016-01-01' AS START FROM dateDF
dateDF.select(
    to_date(lit("2016-01-01")).alias("start"),
    to_date(lit("2017-05-22")).alias("end"))\
   .select(months_between(col("start"), col("end"))).show(1)

In [0]:
from pyspark.sql.functions import to_date, lit, col

# SELECT CAST("2021-02-08") AS DATE FROM DUAL LIMIT 5
spark.range(5).withColumn("date", lit("2021-02-08")).select(to_date(col("date"))).show(5)

In [0]:
#Podemos definir el formato de días basandonos en https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html
from pyspark.sql.functions import to_date, lit
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(  # SELECT CAST("2019-12-11") AS DATE AS date, CAST("2019-20-12") AS DATE AS date2 FROM DUAL
    to_date(lit("2019-12-11"), dateFormat).alias("date"),
    to_date(lit("2019-20-12"), dateFormat).alias("date2"))

cleanDateDF.createOrReplaceTempView("dateTable2")

In [0]:
%sql

select date2, 37878 as otracolumna from dateTable2

date2,otracolumna
2019-12-20,37878


In [0]:
#Trabajando con timestamp
from pyspark.sql.functions import to_timestamp
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

In [0]:
cleanDateDF.show()

In [0]:
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()

#### Nulls
Spark puede trabajar mejor con nulos que con campos en blanco

In [0]:
#Retornar el primer valor no nulo de una columna
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"), col("CustomerId")), col("Description"), col("CustomerId")).show()


In [0]:
#Dropear toda la fila si alguno de sus valores es nulo
df.na.drop()
df.na.drop("any") # PELIGROSO

In [0]:
#Remueve la fila si todos sus elementos son nulos
df.na.drop("all")

In [0]:
#También podemos remover las filas si dos de las columnas tienen nulos
df.na.drop("all", subset=["StockCode", "InvoiceNo"])

In [0]:
#Puedes cambiar los valores nulos con un valor predefinido
df.na.fill("All Null values become this string") #Para columnas string
df.na.fill(0)#Columnas integer
df.na.fill(float(5))#Columnas double

In [0]:
#Solo para los nulos de algunas columnas
df.na.fill("all", subset=["StockCode", "InvoiceNo"])

In [0]:
#Reemplazar nulos con tambien puede manejar un elemento tipo map
fill_cols_vals = {"StockCode": 5, "Description" : "No Value"}
df.na.fill(fill_cols_vals)

In [0]:
#Replace
#reemplazamos con UNKNOWS todos los valores vacíos de la columna Description, para optimizar el consumo con Nulls
df.na.replace([""], ["UNKNOWN"], "Description")

#### Tipos de Datos Complejos

In [0]:
#Creando
df.selectExpr("struct(Description, InvoiceNo) as complex").show(3, False)

In [0]:
from pyspark.sql.functions import struct
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))


In [0]:
complexDF.select("complex.Description").show()

In [0]:
#Manipular data de un tipo de dato complejo hay varias opciones

complexDF.select(col("complex").getField("Description")).show()

In [0]:
#Otra opción
complexDF.select("complex.*").show()

In [0]:
#Generamos desde una columna un Array usando split
from pyspark.sql.functions import split
df.select(split(col("Description"), " ")).show(2, False)

In [0]:
#Podemos acceder a un elemento del array fácilmente
df.select(split(col("Description"), " ").alias("array_col"))\
  .selectExpr("array_col[0]").show(2)

In [0]:
#Largo del array
from pyspark.sql.functions import size
df.select(size(split(col("Description"), " "))).show(2)

In [0]:
#Saber si un elemento del array está contenido
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)


In [0]:
#Explode
#Toma una columna que es un Array y crea una fila por cada elemento del Array copiando todos los otros elementos del array original
from pyspark.sql.functions import split, explode

df.withColumn("splitted", split(col("Description"), " "))\
  .withColumn("exploded", explode(col("splitted")))\
  .select("Description", "InvoiceNo", "exploded").show(10, False)

In [0]:
#Maps
#Tipo clave valor
from pyspark.sql.functions import create_map
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .show(10,False)

In [0]:
#También puedes aplicarle explode para generar columnas
df.na.drop("any").select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .selectExpr("explode(complex_map)").show(2)


#### JSON

In [0]:
#Generando un JSON
jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
jsonDF.show()

In [0]:
#Query a JSON object con un diccionario usando get_json_object, puedes usar json_tuple si este objeto no tiene anidación
from pyspark.sql.functions import get_json_object, json_tuple
display(jsonDF.select(json_tuple(col("jsonString"), "myJSONKey")))

c0
"{""myJSONValue"":[1,2,3]}"


In [0]:
jsonDF.select(get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]")).show(2)

In [0]:
#También podemos convertir un Struct de dato a JSON
from pyspark.sql.functions import to_json
df2 = df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")))
df2.printSchema()

In [0]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
parseSchema = StructType((
  StructField("InvoiceNo",StringType(),True),
  StructField("Description",StringType(),True)))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")).alias("newJSON")).show(2,False)

In [0]:
#Podemos usar from_json para construir un array 
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
parseSchema = StructType((
  StructField("InvoiceNo",StringType(),True),
  StructField("Description",StringType(),True)))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")).alias("newJSON"))\
  .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2,False) #De Json a Array definiendo un esquema

### UDFs
Funciones definidas por el usuario

* Funciones definidas por el usuario (UDFs) 
* Permite definir tus propias trasnformaciones, incluso usando librerías externas
* Estas funciones son temporales para ser usadas en el SparkSession
* Debemos considerar el performance

In [0]:
#1. Creamos la función
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):#Nuestro input es específico e incluso debemos evitar nulos
  return double_value ** 3
power3(2.0)

Debemos registrarla en Spark para que pueda ser usada por todos nuestros workers. Spark serializará la función en el Driver y la transferirá por red a todos los executors

Si es en Scala o Java no hay mucha pérdida de performance
Si la función es en Python, Spark inicia un proceso Python en el worker y serializar toda la data a formato Python para que pueda ser entendida. Recordemos que Spark trabaja en el JVM de cada worker.

Finalmente ejecuta la función Python fila a fila y la retorna a la JVM y Spark
Es recomendable hacerlo en Scala porque caso contrario tendríamos a la JVM del worker y al proceso Python compitiendo por recursos

In [0]:
#Registro
from pyspark.sql.functions import udf
power3udf = udf(power3)

In [0]:
#Uso
from pyspark.sql.functions import col
udfExampleDF.select(power3udf(col("num"))).show(5)

In [0]:
#Debemos manejar el mismo tipo de dato en este caso la función espera Doubles pero recibe enteros por eso debemos actualizarla
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register("power3py", power3, IntegerType())

In [0]:
udfExampleDF.selectExpr("power3py(num)").show(5)

#### Read & Write

**Leyendo archivos**

esta basado en el atributo spark.read
Podemos customizarlo con:
* Formato
* Esquema
* Modo lectura
* Otras opciones

In [0]:
spark.read.format("csv")
  .option("mode", "FAILFAST")
  .option("inferSchema", "true")
  .option("path", "path/to/file(s)")
  .schema(someSchema)
  .load()

Modos de lectura
* permissive= Define como nulo todos los valores corruptos que encuentre y pone todos los datos corruptos en una columna string llamada called_corrupt_record 
* dropMalformed = Elimina toda la fila que contiene algun dato corrupto
* failFast = Falla inmediatamente si encuentra un dato corrupto

**Write**

In [0]:
#Esta basado en el atributo spark.write
df.write.format("csv")\
  .option("mode", "OVERWRITE")\
  .option("path", "/FileStore/tables/out.csv")\
  .save()

In [0]:
dread = spark.read.format("json")\
  .option("inferSchema", "true")\
  .load("/FileStore/tables/out.csv")
dread.show()

In [0]:
#Esta basado en el atributo spark.write
dataframe.write.format("avro")
  .option("mode", "OVERWRITE")
  .option("dateFormat", "yyyy-MM-dd")
  .option("path", "path/to/file(s)")
  .save()

**Modos de guardado**

* append = agrega la data resultante a la lista de archivos que ya existen en el directorio de destino
* overwrite = completamente escribe sobre el directorio de destino
* errorIfExist = Salta un error si algun archivo ya existe en el directorio
* ignore = Si la data o archivos existen no ha

### Ejercicio 2: Analisemos los vuelos en Latinoamérica

1. Leer el archivo vuelos.json https://datahack.blob.core.windows.net/spark/vuelos.json
2. Responder las siguientes preguntas
  2.1. Cual es el destino más visitado
  2.1. Cuantos vuelos parten de Argentina
3. Guardarlo en un CVS en DBFS
4. Guadarlo en parquet en DBFS

In [0]:
# Leer un CSV 
df = (spark
  .read
  .json("wasbs://spark@datahack.blob.core.windows.net/vuelos.json") 
) # Importante aqui trabajaremos con toda la data, recuerda quitar el monto
 

In [0]:
display(df)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
from pyspark.sql.functions import desc

df.orderBy(desc("count")).show(5)

In [0]:
topsDF = df.orderBy(desc("count"))

savePath = "/flights/tops.parquet"
 
(topsDF
  .write
  .mode("overwrite") # overwrites a file if it already exists
  .parquet(savePath)
)

In [0]:
# Leer un CSV 
appleDF = (spark
  .read
  .option("header", True)
  .csv("/FileStore/tables/AAPL.csv") #Importante aqui utilizaremos la ruta del DBFS
) # Importante aqui trabajaremos con toda la data, recuerda quitar el monto
 
display(appleDF)

Date,Open,High,Low,Close,Adj Close,Volume
2019-10-28,61.855,62.3125,61.68,62.262501,61.65081,96572800
2019-10-29,62.2425,62.4375,60.642502,60.822498,60.224953,142839600
2019-10-30,61.189999,61.325001,60.302502,60.814999,60.217525,124522000
2019-10-31,61.810001,62.2925,59.314999,62.189999,61.579021,139162000
2019-11-01,62.384998,63.982498,62.290001,63.955002,63.326683,151125200
2019-11-04,64.332497,64.462502,63.845001,64.375,63.742554,103272000
2019-11-05,64.262497,64.547501,64.080002,64.282501,63.65097,79897600
2019-11-06,64.192497,64.372498,63.842499,64.309998,63.678192,75864400
2019-11-07,64.684998,65.087502,64.527496,64.857498,64.413116,94940400
2019-11-08,64.672501,65.110001,64.212502,65.035004,64.589409,69986400


In [0]:
from pyspark.sql.functions import col
 
appleDFover100 = (appleDF
  .filter((col("close") > 100))
  .select("Date")
  .orderBy("Date")
)
 
display(appleDFover100)

Date
2020-07-31
2020-08-03
2020-08-04
2020-08-05
2020-08-06
2020-08-07
2020-08-10
2020-08-11
2020-08-12
2020-08-13


In [0]:
savePath = "/apple20220407/apple.csv"
 
(appleDFover100
  .write
  .mode("overwrite") # overwrites a file if it already exists
  .csv(savePath)
)