# Fundamentos de Apache Spark

### Crear el DataFrame

In [0]:
# Sample product catalogue: one (id, name, cat, price) tuple per product.
# NOTE(review): the first row uses "dept1" while every other row uses a
# "catN" value -- confirm whether that is intentional.
prod = [
    (1, "AAA", "dept1", 100),
    (2, "BBB", "cat1", 300),
    (3, "CCC", "cat1", 320),
    (4, "DDD", "cat1", 350),
    (5, "EEE", "cat2", 80),
    (6, "FFF", "cat2", 77),
    (7, "GGG", "cat3", 70),
    (8, "HHH", "cat3", 78),
    (9, "III", "cat3", 44),
    (10, "JJJ", "cat4", 54),
]

# Build the products DataFrame; the list gives the column names in tuple order.
df_prod = spark.createDataFrame(prod, schema=["id", "name", "cat", "price"])



In [0]:
# Preview the first five products.
df_prod.show(n=5)

### Leer desde Cloud Storage

In [0]:
# Load the transactions CSV from Cloud Storage. The first line is the header.
# inferSchema makes Spark derive column types from the data (at the cost of an
# extra pass over the file) instead of reading every column as a string, so
# sorting and min/max on columns such as transaction_amount compare numbers
# rather than text.
# NOTE: this changes the column types compared with an all-string load; a table
# previously saved from this DataFrame with string columns will no longer match.
df_trx = spark.read.format("csv").load(
    "gs://test-nh/data/transaction/transaction_20220101.csv",
    header=True,
    inferSchema=True,
)

# Operaciones básicas en DataFrames

### count
* Cuenta el número de filas

In [0]:
# Number of rows in the transactions DataFrame (triggers a Spark job).
df_trx.count()

### columns

In [0]:
# Column names of the transactions DataFrame.
df_trx.columns

### dtypes
* Muestra los tipos de datos de las columnas

In [0]:
# Data type of each column in the transactions DataFrame.
df_trx.dtypes

### printSchema
* Muestra el esquema del DataFrame

In [0]:
# Print the schema of the transactions DataFrame.
df_trx.printSchema()

### select
* Selecciona columnas del DataFrame

In [0]:
# Project only the id and date columns and preview five rows.
df_trx.select(["transaction_id", "transaction_date"]).show(n=5)

### filter

* Filtrar las filas según alguna condición.

In [0]:
# Keep only the rows whose transaction_id equals 1.
df_trx.filter(df_trx.transaction_id == 1).show()

### drop
* Elimina una columna en particular

In [0]:
# drop() returns a new DataFrame without the column; the result is kept in
# df_new and df_trx is not reassigned.
df_new = df_trx.drop("transaction_id")
df_new.show(n=2)

### Sorting

* Ordena los datos

In [0]:
# Order by transaction_amount, ascending, and preview five rows.
df_trx.sort("transaction_amount", ascending=True).show(n=5)

### Spark SQL

* Complementa las funciones de análisis de datos en los DataFrames

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

### Aggregations


In [0]:
# Per-payment-method summary of transaction_amount: row count, total, maximum,
# minimum and average.
# The aggregate functions are referenced through the `F` namespace so the cell
# does not depend on the wildcard import having replaced Python's built-in
# sum/max/min with the Spark column functions of the same name.
(
    df_trx.groupBy("payment_method")
    .agg(
        F.count("transaction_amount").alias("count"),
        F.sum("transaction_amount").alias("sum"),
        F.max("transaction_amount").alias("max"),
        F.min("transaction_amount").alias("min"),
        F.avg("transaction_amount").alias("avg"),
    )
    .show()
)

### Columnas derivadas
* Podemos usar la función `withColumn` para derivar una nueva columna a partir de las columnas existentes.

In [0]:
# Add a derived column holding transaction_amount multiplied by 1.1.
(
    df_trx
    .withColumn(
        "transaction_amount_place",
        col("transaction_amount") * 1.1,
    )
    .show()
)

### Joins

* Podemos realizar varios tipos de combinaciones entre múltiples DataFrames.

In [0]:
# Inner join: keep only transactions whose product_id matches a product id.
df_trx.join(
    df_prod,
    on=df_trx["product_id"] == df_prod["id"],
    how="inner",
).show()

### Left Outer Join

In [0]:
# Left outer join: every transaction is kept; product columns are null when
# no product id matches.
df_trx.join(
    df_prod,
    on=df_trx["product_id"] == df_prod["id"],
    how="left_outer",
).show()

### Right Outer Join

In [0]:
# Right outer join: every product is kept; transaction columns are null when
# no transaction references the product.
df_trx.join(
    df_prod,
    on=df_trx["product_id"] == df_prod["id"],
    how="right_outer",
).show()

### Full Outer Join

In [0]:
# Full outer join: rows from both sides are kept, with nulls wherever the
# other side has no match.
df_trx.join(
    df_prod,
    on=df_trx["product_id"] == df_prod["id"],
    how="outer",
).show()

In [0]:
# Write the transactions to Cloud Storage as Parquet, replacing any existing
# output at that path.
df_trx.write.save(
    "gs://test-nh/data/transaction-output",
    format="parquet",
    mode="overwrite",
)

### Delta Lake

In [0]:
# Database to create (if missing) and switch to.
db = "deltadb"

# Create the database on first run, then make it the current one.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")

# Disable the Delta Lake format check, i.e. the validation of whether the
# files at a given location match the expected Delta Lake format.
spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")

# Enable automatic write optimization by default when data is written to
# Delta tables.
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

In [0]:
# Save the transactions as the Delta table `transaction`, replacing any
# previous contents. overwriteSchema lets the overwrite also replace the table
# schema, so re-running the cell does not fail with a schema-mismatch error
# when df_trx's column names or types differ from an existing table.
(
    df_trx.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("transaction")
)

In [0]:
# Row count of the `transaction` table, queried through Spark SQL.
spark.sql("select count(*) from transaction").show()


In [0]:
# Preview three rows of the `transaction` table through Spark SQL.
spark.sql("select * from transaction").show(n=3)

### Versiones en Delta Lake

In [0]:
%sql
-- Delete the row(s) with transaction_id = 1 from the current table.
DELETE FROM transaction
WHERE transaction_id = 1;

In [0]:
%sql
-- Look up transaction_id = 1 in the current version of the table.
SELECT *
FROM transaction
WHERE transaction_id = 1;

In [0]:
%sql
-- Time travel: run the same lookup against table version 0.
SELECT *
FROM transaction VERSION AS OF 0
WHERE transaction_id = 1;


### Restaurar una versión

In [0]:
%sql
-- Roll the table back to the state it had at version 0.
RESTORE TABLE transaction TO VERSION AS OF 0

In [0]:
%sql
-- Look up transaction_id = 1 again in the current version of the table.
SELECT *
FROM transaction
WHERE transaction_id = 1;