# Apache Spark Fundamentals: SQL/DataFrames

**Spark SQL works with DataFrames**. A DataFrame is a **relational representation of the data**. It provides functions with SQL-like capabilities and also lets us write **SQL-style queries** for data analysis.

DataFrames are similar to relational tables or to DataFrames in Python / R, although with many optimizations that run behind the scenes, hidden from the user. There are several ways to create DataFrames: from collections, Hive tables, relational tables, and RDDs.

In [1]:
import findspark
findspark.init()

import pandas as pd
import pyspark

In [2]:
from pyspark.sql import SparkSession
# Note: the wildcard import below shadows Python built-ins such as sum, max, and min
from pyspark.sql.functions import *

In [16]:
spark = SparkSession.builder.config("spark.jars", "/postgresql-42.7.4.jar").getOrCreate()

In [4]:
emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (8, "HHH", "dept3", 3700),
    (9, "III", "dept3", 4500),
    (10, "JJJ", "dept5", 3400)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")]

# Create DataFrames from the in-memory collections
df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])

deptdf = spark.createDataFrame(dept, ["id", "name"]) 

In [13]:
# Create a DataFrame from a Hive table (fails here because `tbl_name` does not exist)
df = spark.table("tbl_name")

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `tbl_name` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.;
'UnresolvedRelation [tbl_name], [], false


In [6]:
df.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
|  8| HHH|dept3|  3700|
|  9| III|dept3|  4500|
| 10| JJJ|dept5|  3400|
+---+----+-----+------+



In [9]:
# Show the contents of a DataFrame
deptdf.show()

+-----+--------------+
|   id|          name|
+-----+--------------+
|dept1|Department - 1|
|dept2|Department - 2|
|dept3|Department - 3|
|dept4|Department - 4|
+-----+--------------+



In [8]:
# Total number of records
df.count()

10

In [22]:
# List the columns
df.columns

['id', 'name', 'dept', 'salary']

In [24]:
# Show the data type of each column
df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('dept', 'string'),
 ('salary', 'bigint')]

In [41]:
# Show the schema Spark uses to describe the DataFrame
print(df.schema)

print()
# Same information, printed as a more readable tree
df.printSchema()

StructType([StructField('id', LongType(), True), StructField('name', StringType(), True), StructField('dept', StringType(), True), StructField('salary', LongType(), True)])

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: long (nullable = true)



In [42]:
# Select specific columns

df.select("id","name").show()

+---+----+
| id|name|
+---+----+
|  1| AAA|
|  2| BBB|
|  3| CCC|
|  4| DDD|
|  5| EEE|
|  6| FFF|
|  7| GGG|
|  8| HHH|
|  9| III|
| 10| JJJ|
+---+----+



In [50]:
# Several equivalent ways to write a filter condition

df.filter(df["name"] == 'AAA').show()

df.filter(df.id ==1 ).show()

df.filter(col("id")  == 1).show()

# A SQL expression string works just like a SQL WHERE clause
df.filter("salary = 8000 or id = 2").show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  2| BBB|dept1|  1100|
|  5| EEE|dept2|  8000|
+---+----+-----+------+



In [52]:
df = df.drop("id")
df.show()

+----+-----+------+
|name| dept|salary|
+----+-----+------+
| AAA|dept1|  1000|
| BBB|dept1|  1100|
| CCC|dept1|  3000|
| DDD|dept1|  1500|
| EEE|dept2|  8000|
| FFF|dept2|  7200|
| GGG|dept3|  7100|
| HHH|dept3|  3700|
| III|dept3|  4500|
| JJJ|dept5|  3400|
+----+-----+------+



In [51]:
# Aggregations
# * Use groupBy to group the data, then agg to compute aggregates over each group.
(df.groupBy("dept")
     .agg(
         count("salary").alias("count"),
         sum("salary").alias("sum"),
         max("salary").alias("max"),
         min("salary").alias("min"),
         avg("salary").alias("avg")
     ).show()
    
)

+-----+-----+-----+----+----+------+
| dept|count|  sum| max| min|   avg|
+-----+-----+-----+----+----+------+
|dept1|    4| 6600|3000|1000|1650.0|
|dept2|    2|15200|8000|7200|7600.0|
|dept3|    3|15300|7100|3700|5100.0|
|dept5|    1| 3400|3400|3400|3400.0|
+-----+-----+-----+----+----+------+



In [63]:
# Sort the data in ascending order
df.sort("salary").show()

# Sort the data in descending order
df.sort(desc("salary")).show()

+----+-----+------+
|name| dept|salary|
+----+-----+------+
| AAA|dept1|  1000|
| BBB|dept1|  1100|
| DDD|dept1|  1500|
| CCC|dept1|  3000|
| JJJ|dept5|  3400|
| HHH|dept3|  3700|
| III|dept3|  4500|
| GGG|dept3|  7100|
| FFF|dept2|  7200|
| EEE|dept2|  8000|
+----+-----+------+

+----+-----+------+
|name| dept|salary|
+----+-----+------+
| EEE|dept2|  8000|
| FFF|dept2|  7200|
| GGG|dept3|  7100|
| III|dept3|  4500|
| HHH|dept3|  3700|
| JJJ|dept5|  3400|
| CCC|dept1|  3000|
| DDD|dept1|  1500|
| BBB|dept1|  1100|
| AAA|dept1|  1000|
+----+-----+------+



In [71]:
# Derived columns: withColumn returns a new DataFrame and does not modify df,
# so the new columns exist only in the result of this expression.
# A column added earlier in the chain (bonus) can be used by later steps (neto).
df.withColumn("bonus", col("salary")*.1).withColumn("neto",col("bonus")+col("salary")).show()


+----+-----+------+-----+------+
|name| dept|salary|bonus|  neto|
+----+-----+------+-----+------+
| AAA|dept1|  1000|100.0|1100.0|
| BBB|dept1|  1100|110.0|1210.0|
| CCC|dept1|  3000|300.0|3300.0|
| DDD|dept1|  1500|150.0|1650.0|
| EEE|dept2|  8000|800.0|8800.0|
| FFF|dept2|  7200|720.0|7920.0|
| GGG|dept3|  7100|710.0|7810.0|
| HHH|dept3|  3700|370.0|4070.0|
| III|dept3|  4500|450.0|4950.0|
| JJJ|dept5|  3400|340.0|3740.0|
+----+-----+------+-----+------+



## JOINS

In [72]:
# INNER JOIN
df.join(deptdf, df["dept"] == deptdf["id"]).show()

+----+-----+------+-----+--------------+
|name| dept|salary|   id|          name|
+----+-----+------+-----+--------------+
| AAA|dept1|  1000|dept1|Department - 1|
| BBB|dept1|  1100|dept1|Department - 1|
| CCC|dept1|  3000|dept1|Department - 1|
| DDD|dept1|  1500|dept1|Department - 1|
| EEE|dept2|  8000|dept2|Department - 2|
| FFF|dept2|  7200|dept2|Department - 2|
| GGG|dept3|  7100|dept3|Department - 3|
| HHH|dept3|  3700|dept3|Department - 3|
| III|dept3|  4500|dept3|Department - 3|
+----+-----+------+-----+--------------+



In [77]:
# LEFT OUTER
df.join(deptdf, df["dept"] == deptdf["id"], "left_outer").show()

+----+-----+------+-----+--------------+
|name| dept|salary|   id|          name|
+----+-----+------+-----+--------------+
| AAA|dept1|  1000|dept1|Department - 1|
| BBB|dept1|  1100|dept1|Department - 1|
| CCC|dept1|  3000|dept1|Department - 1|
| DDD|dept1|  1500|dept1|Department - 1|
| EEE|dept2|  8000|dept2|Department - 2|
| FFF|dept2|  7200|dept2|Department - 2|
| GGG|dept3|  7100|dept3|Department - 3|
| HHH|dept3|  3700|dept3|Department - 3|
| III|dept3|  4500|dept3|Department - 3|
| JJJ|dept5|  3400| NULL|          NULL|
+----+-----+------+-----+--------------+



In [86]:
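# RIGHT OUTER
# Caution: "NULL" here is a string, not a null check. The comparison evaluates to
# null for the unmatched dept4 row, so the filter drops it; use isNotNull() to be explicit.
df.join(deptdf, df["dept"] == deptdf["id"], "right_outer").filter(df["name"] != "NULL").show()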

+----+-----+------+-----+--------------+
|name| dept|salary|   id|          name|
+----+-----+------+-----+--------------+
| AAA|dept1|  1000|dept1|Department - 1|
| BBB|dept1|  1100|dept1|Department - 1|
| CCC|dept1|  3000|dept1|Department - 1|
| DDD|dept1|  1500|dept1|Department - 1|
| EEE|dept2|  8000|dept2|Department - 2|
| FFF|dept2|  7200|dept2|Department - 2|
| GGG|dept3|  7100|dept3|Department - 3|
| HHH|dept3|  3700|dept3|Department - 3|
| III|dept3|  4500|dept3|Department - 3|
+----+-----+------+-----+--------------+



In [36]:
# FULL OUTER
df.join(deptdf, df["dept"] == deptdf["id"], "outer").show()

+----+----+-----+------+-----+--------------+
|  id|name| dept|salary|   id|          name|
+----+----+-----+------+-----+--------------+
|   1| AAA|dept1|  1000|dept1|Department - 1|
|   2| BBB|dept1|  1100|dept1|Department - 1|
|   3| CCC|dept1|  3000|dept1|Department - 1|
|   4| DDD|dept1|  1500|dept1|Department - 1|
|   5| EEE|dept2|  8000|dept2|Department - 2|
|   6| FFF|dept2|  7200|dept2|Department - 2|
|   7| GGG|dept3|  7100|dept3|Department - 3|
|   8| HHH|dept3|  3700|dept3|Department - 3|
|   9| III|dept3|  4500|dept3|Department - 3|
|NULL|NULL| NULL|  NULL|dept4|Department - 4|
|  10| JJJ|dept5|  3400| NULL|          NULL|
+----+----+-----+------+-----+--------------+



## SQL Queries
+ To manipulate DataFrames with SQL, we first need to register them as temporary views.

In [91]:
df.createOrReplaceTempView("temp_table")

spark.sql("SELECT * FROM temp_table LIMIT 1").show()

+----+-----+------+
|name| dept|salary|
+----+-----+------+
| AAA|dept1|  1000|
+----+-----+------+



## Create a DataFrame from a relational table

In [47]:
jdbc_properties = {
    "user": "kafka_dev",
    "password": "kafka_dev123",
    "driver": "org.postgresql.Driver"
}

In [46]:
# Load a database table into a DataFrame
relational_df = spark.read.jdbc(url = 'jdbc:postgresql://postgres:5432/kafka', 
                                table = 'test.users',
                                properties = jdbc_properties)

In [43]:
relational_df.filter(relational_df['age'] < 24).show()

+---+-------+---+--------------------+
| id|   name|age|         insdatetime|
+---+-------+---+--------------------+
|  1|Gerardo| 23|2024-08-29 17:28:...|
|  3|     Lu| 23|2024-08-29 17:28:...|
|  4|  Naomi| 23|2024-08-29 17:28:...|
|  8| Carlos| 17|2024-08-29 17:28:...|
+---+-------+---+--------------------+



In [45]:
# Save a DataFrame to a database table
deptdf.write.jdbc(url='jdbc:postgresql://postgres:5432/kafka', table='test.departments',
                  properties=jdbc_properties, mode='overwrite')


### Save the DataFrame as an external Hive table

In [None]:
# Replace the placeholder with the storage location of the external table
df.write.saveAsTable("DB_NAME.TBL_NAME", path="<location_of_external_table>")

### Create a DataFrame from a CSV file
* We can create a DataFrame from a CSV file and specify options such as the separator, header, schema, and inferSchema, among others.

In [None]:
df = spark.read.csv("path_to_csv_file", sep="|", header=True, inferSchema=True)

### Save a DataFrame as a CSV file

In [None]:
df.write.csv("path_to_CSV_File", sep="|", header=True, mode="overwrite")