# Ejercicio aplicado de DataFrames y Spark SQL

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import findspark, pyspark
findspark.init()

In [3]:
spark_core = SparkSession.builder.getOrCreate()

In [18]:
data = [(1, "AAA", "dept1", 1000),
        (2, "BBB", "dept1", 1100),
        (3, "CCC", "dept1", 3000),
        (4, "DDD", "dept1", 1500),
        (5, "EEE", "dept2", 8000),
        (6, "FFF", "dept2", 7200),
        (7, "GGG", "dept3", 7100),
        (8, "HHH", "dept3", 3700),
        (9, "III", "dept3", 4500),
        (10, "JJJ", "dept5", 3400),
        (11, "KKK", "dept5", 3100),
        (12, "KFK", "dept5", 3100),
        (13, "KKF", "dept5", 3100),
        (14, "KBV", "dept6", 4100)]

dept = [("dept1", "Departament - 1"),
        ("dept2", "Departament - 2"),
        ("dept3", "Departament - 3"),
        ("dept4", "Departament - 4")]

df = spark_core.createDataFrame(data, ["id", "name", "dept", "salary"])
df2 = spark_core.createDataFrame(dept, ["id","name"])

In [5]:
df.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
|  8| HHH|dept3|  3700|
|  9| III|dept3|  4500|
| 10| JJJ|dept5|  3400|
+---+----+-----+------+



In [6]:
df2.show()

+-----+---------------+
|   id|           name|
+-----+---------------+
|dept1|Departament - 1|
|dept2|Departament - 2|
|dept3|Departament - 3|
|dept4|Departament - 4|
+-----+---------------+



# Joins
![Join](https://miro.medium.com/v2/resize:fit:446/0*-Y8lKRffc5K6Cvxx.png) ![Join2](https://miro.medium.com/v2/resize:fit:1200/1*Lb3WTGX-N6HunApw-jT16g.png)


##### Inner Join

In [7]:
df.join(df2, df["dept"] == df2["id"]).show()

+---+----+-----+------+-----+---------------+
| id|name| dept|salary|   id|           name|
+---+----+-----+------+-----+---------------+
|  1| AAA|dept1|  1000|dept1|Departament - 1|
|  2| BBB|dept1|  1100|dept1|Departament - 1|
|  3| CCC|dept1|  3000|dept1|Departament - 1|
|  4| DDD|dept1|  1500|dept1|Departament - 1|
|  5| EEE|dept2|  8000|dept2|Departament - 2|
|  6| FFF|dept2|  7200|dept2|Departament - 2|
|  7| GGG|dept3|  7100|dept3|Departament - 3|
|  8| HHH|dept3|  3700|dept3|Departament - 3|
|  9| III|dept3|  4500|dept3|Departament - 3|
+---+----+-----+------+-----+---------------+



##### Left Outer Join

In [8]:
df.join(df2, df["dept"] == df2["id"], "left_outer").show()

+---+----+-----+------+-----+---------------+
| id|name| dept|salary|   id|           name|
+---+----+-----+------+-----+---------------+
|  1| AAA|dept1|  1000|dept1|Departament - 1|
|  2| BBB|dept1|  1100|dept1|Departament - 1|
|  3| CCC|dept1|  3000|dept1|Departament - 1|
|  4| DDD|dept1|  1500|dept1|Departament - 1|
|  5| EEE|dept2|  8000|dept2|Departament - 2|
| 10| JJJ|dept5|  3400| null|           null|
|  7| GGG|dept3|  7100|dept3|Departament - 3|
|  8| HHH|dept3|  3700|dept3|Departament - 3|
|  9| III|dept3|  4500|dept3|Departament - 3|
|  6| FFF|dept2|  7200|dept2|Departament - 2|
+---+----+-----+------+-----+---------------+



##### Right Outer Join

In [9]:
df.join(df2, df["dept"] == df2["id"], "right_outer").show()

+----+----+-----+------+-----+---------------+
|  id|name| dept|salary|   id|           name|
+----+----+-----+------+-----+---------------+
|   4| DDD|dept1|  1500|dept1|Departament - 1|
|   3| CCC|dept1|  3000|dept1|Departament - 1|
|   2| BBB|dept1|  1100|dept1|Departament - 1|
|   1| AAA|dept1|  1000|dept1|Departament - 1|
|   6| FFF|dept2|  7200|dept2|Departament - 2|
|   5| EEE|dept2|  8000|dept2|Departament - 2|
|   9| III|dept3|  4500|dept3|Departament - 3|
|   8| HHH|dept3|  3700|dept3|Departament - 3|
|   7| GGG|dept3|  7100|dept3|Departament - 3|
|null|null| null|  null|dept4|Departament - 4|
+----+----+-----+------+-----+---------------+



##### Full Outer Join

In [10]:
df.join(df2, df["dept"] == df2["id"], "outer").show()

+----+----+-----+------+-----+---------------+
|  id|name| dept|salary|   id|           name|
+----+----+-----+------+-----+---------------+
|   1| AAA|dept1|  1000|dept1|Departament - 1|
|   2| BBB|dept1|  1100|dept1|Departament - 1|
|   3| CCC|dept1|  3000|dept1|Departament - 1|
|   4| DDD|dept1|  1500|dept1|Departament - 1|
|   5| EEE|dept2|  8000|dept2|Departament - 2|
|   6| FFF|dept2|  7200|dept2|Departament - 2|
|   7| GGG|dept3|  7100|dept3|Departament - 3|
|   8| HHH|dept3|  3700|dept3|Departament - 3|
|   9| III|dept3|  4500|dept3|Departament - 3|
|null|null| null|  null|dept4|Departament - 4|
|  10| JJJ|dept5|  3400| null|           null|
+----+----+-----+------+-----+---------------+



##### Left Join

In [19]:
df.join(df2, df["dept"] == df2["id"], "left").show()

+---+----+-----+------+-----+---------------+
| id|name| dept|salary|   id|           name|
+---+----+-----+------+-----+---------------+
|  7| GGG|dept3|  7100|dept3|Departament - 3|
|  1| AAA|dept1|  1000|dept1|Departament - 1|
|  2| BBB|dept1|  1100|dept1|Departament - 1|
|  3| CCC|dept1|  3000|dept1|Departament - 1|
|  4| DDD|dept1|  1500|dept1|Departament - 1|
|  5| EEE|dept2|  8000|dept2|Departament - 2|
|  6| FFF|dept2|  7200|dept2|Departament - 2|
| 10| JJJ|dept5|  3400| null|           null|
| 11| KKK|dept5|  3100| null|           null|
| 12| KFK|dept5|  3100| null|           null|
| 13| KKF|dept5|  3100| null|           null|
|  8| HHH|dept3|  3700|dept3|Departament - 3|
|  9| III|dept3|  4500|dept3|Departament - 3|
| 14| KBV|dept6|  4100| null|           null|
+---+----+-----+------+-----+---------------+



##### Right Join

In [20]:
df.join(df2, df["dept"] == df2["id"], "right").show()

+----+----+-----+------+-----+---------------+
|  id|name| dept|salary|   id|           name|
+----+----+-----+------+-----+---------------+
|   4| DDD|dept1|  1500|dept1|Departament - 1|
|   3| CCC|dept1|  3000|dept1|Departament - 1|
|   2| BBB|dept1|  1100|dept1|Departament - 1|
|   1| AAA|dept1|  1000|dept1|Departament - 1|
|   6| FFF|dept2|  7200|dept2|Departament - 2|
|   5| EEE|dept2|  8000|dept2|Departament - 2|
|   9| III|dept3|  4500|dept3|Departament - 3|
|   8| HHH|dept3|  3700|dept3|Departament - 3|
|   7| GGG|dept3|  7100|dept3|Departament - 3|
|null|null| null|  null|dept4|Departament - 4|
+----+----+-----+------+-----+---------------+



##### Consultas SQL
- Ejecucion de consulta de tipo SQL
- Tambien podemos realizar analisis de datos escribiendo consultas similares a SQL. Para realizar consultas similares a SQL, necesitamos registrar el dataframe como una vista temporal

In [13]:
# Register of Dataframe as Temporary Table (temp_table)
df.createOrReplaceTempView("temp_table")

# Execute SQL like query
spark_core.sql("select * from temp_table where id = 1").show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+



In [14]:
spark_core.sql("select distinct id from temp_table").show()

+---+
| id|
+---+
|  5|
|  1|
|  3|
|  2|
|  4|
|  7|
|  6|
|  9|
| 10|
|  8|
+---+



In [16]:
spark_core.sql("select * from temp_table where salary >= 1500").show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
|  8| HHH|dept3|  3700|
|  9| III|dept3|  4500|
| 10| JJJ|dept5|  3400|
+---+----+-----+------+



In [17]:
spark_core.sql("select * from temp_table where salary >= 7000").show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+



##### Leyendo la tabla HIVE como Dataframe

In [None]:
# DB_NAME : Name of the HIVE Database
# TBL_Name : Name of the HIVE Table

df = spark_core.table("DB_NAME", "TBL_NAME")

##### Guardar el Dataframe como una tabla externa HIVE

In [None]:
df.write.saveAsTable("DB_NAME.TBL_NAME", path=<location_of_external_table>)

##### Crea un Dataframe a partir de un archivo CSV

In [None]:
df = spark_core.read.csv("path_to_csv_file", sep="|", header= True, inferSchema=True)

##### Guarda un Dataframe como un archivo CSV

In [None]:
df.write.csv("path_to_csv_file", sep="|", header= True, mode="overwrite")

##### Crea un Dataframe a partir de una tabla relacional

In [None]:
# url : JDBC URL of the form jdbc:subprotocol:subname
# TBL_NAME : Name of the relational table
# USER_NAME : User name to connect to Database
# PASSWORD: Password to connect to Database

relational_df = spark_core.read.format('jdbc') /
    .options(url = url, dbtable = <TBL_NAME>, user = <USER_NAME>, password = <PASSWORD>) /
    .load()

##### Guardar el Dataframe como una tabla relacional
- Podemos guardar el Dataframe como una tabla relacional usando una URL JDBC

In [None]:
relational_df.write.format('jdbc') \
    .options(url = url, dbtable = <TBL_NAME>, user = <USER_NAME>, password = <PASSWORD>) \
    .mode('overwrite') \
    .save()