<a href="https://colab.research.google.com/github/FernandoBRdgz/diplomado_ds_mod4/blob/main/4.5%20Herramientas%20para%20procesamiento%20y%20explotaci%C3%B3n%20de%20Big%20Data/4.5.2%20DataFrames%20de%20Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Fundamentos

Los Spark DataFrames son la piedra angular y la forma principal de trabajar con Spark y Python después de Spark 2.0.

Los DataFrames actúan como potentes facilitadores para el manejo de tablas, y manejan fácilmente grandes conjuntos de datos.

El cambio a DataFrames ofrece muchas ventajas:
* Una sintaxis mucho más simple
* Capacidad de usar SQL directamente en el marco de datos
* Las operaciones se distribuyen automáticamente entre los RDD

Si ha usado anteriormente la biblioteca de pandas de Python o incluso R, probablemente ya esté familiarizado con el concepto de DataFrames.

Los Spark DataFrames amplían muchos de estos conceptos, lo que le permite transferir ese conocimiento fácilmente al comprender la sintáxis simple.

Recuerde que la principal ventaja de usar Spark DataFrames frente a otras herramientas es que Spark puede manejar grandes conjuntos de datos que no cabrían en una sola computadora.

In [None]:
dbutils.fs.ls("/")

In [None]:
generation = "mod4gen<x>"

Para montar un Blob Storage en Azure Databricks:
```
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"fs.azure.account.key.<storage-account-name>.blob.core.windows.net": "<your-key>"})
```

In [None]:
dbutils.fs.mount(
  source = "wasbs://input@minidatalake.blob.core.windows.net",
  mount_point = f"/mnt/{generation}/input",
  extra_configs = {"fs.azure.account.key.minidatalake.blob.core.windows.net": "<your-key>"})

In [None]:
dbutils.fs.mount(
  source = "wasbs://output@minidatalake.blob.core.windows.net",
  mount_point = f"/mnt/{generation}/output",
  extra_configs = {"fs.azure.account.key.minidatalake.blob.core.windows.net": "<your-key>"})

In [None]:
# dbutils.fs.unmount(f"/mnt/{generation}/input")
# dbutils.fs.unmount(f"/mnt/{generation}/output")

In [None]:
dbutils.fs.ls(f"/mnt/{generation}/input")

In [None]:
df = spark.read.csv(f"/mnt/{generation}/input/appl_stock.csv", inferSchema=True, header=True)

In [None]:
df.show()

In [None]:
df.display()

Carga de datos semi-estructurados

In [None]:
df = spark.read.json(f"/mnt/{generation}/input/people.json")

In [None]:
# Observe cómo faltan datos
display(df)

In [None]:
df.printSchema()

In [None]:
df.columns

In [None]:
df.describe().display()

Algunos tipos de datos facilitan la inferencia del esquema (como formatos tabulares como CSV).

Sin embargo, a menudo se tiene que configurar un esquema manualmente si se está trabajando con un método `.read` que no tiene `inferSchema()` incorporado.

Spark tiene algunas funciones necesarias para realizar esta operación, solo se requiere una estructura más específica.

In [None]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [None]:
data_schema = [StructField("age", IntegerType(), True),StructField("name", StringType(), True)]

In [None]:
final_struc = StructType(fields=data_schema)

In [None]:
df = spark.read.json(f"/mnt/{generation}/input/people.json", schema=final_struc)

In [None]:
df.printSchema()

In [None]:
df['age']

In [None]:
type(df['age'])

In [None]:
df.select('age')

In [None]:
type(df.select('age'))

In [None]:
df.select('age').display()

In [None]:
# Devuelve la lista de objetos Row
df.head(2)

Múltiples columnas:

In [None]:
df.select(['age','name'])

In [None]:
df.select(['age','name']).display()

Creación de nuevas columnas

In [None]:
# Agregar una nueva columna con una copia simple
df.withColumn('newage', df['age']).display()

In [None]:
df.display()

In [None]:
# Renombrado simple
df.withColumnRenamed('age','supernewage').display()

Operaciones más complejas para crear nuevas columnas.

In [None]:
df.withColumn('doubleage', df['age']*2).display()

In [None]:
df.withColumn('add_one_age', df['age']+1).display()

In [None]:
df.withColumn('half_age', df['age']/2).display()

In [None]:
df.withColumn('half_age',df['age']/2)

### Uso de SQL

Para usar consultas SQL directamente con el dataframe, se debe registrar en una vista temporal

In [None]:
# Registra el DataFrame como una vista temporal de SQL
df.createOrReplaceTempView("people")

In [None]:
sql_results = spark.sql("SELECT * FROM people")

In [None]:
sql_results

In [None]:
sql_results.display()

In [None]:
spark.sql("SELECT * FROM people WHERE age=30").display()