**Fundamento de apache spark**

**Nombre:** Victor Cabrera

**Materia:** Marcos de referencia para Big Data

**Docente:** Veronica Chimbo

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=0ab08415c7e14deba39051ce57e4f285627fe5d8fd33128393ffeca431b0ddd1
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import findspark
findspark.init()

In [4]:
import pandas as pd
import pyspark

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [6]:
spark = SparkSession.builder.appName("example").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()


In [7]:
spark

# Crear el dataframe

In [8]:
emp = [(1, "AAA", "dept1", 1000),
(2, "BBB", "dept1", 1100),
(3, "CCC", "dept1", 3000),
(4, "DDD", "dept1", 1500),
(5, "EEE", "dept2", 8000),
(6, "FFF", "dept2", 7200),
(7, "GGG", "dept3", 7100),
(8, "HHH", "dept3", 3700),
(9, "III", "dept3", 4500),
(10, "JJJ", "dept5", 3400)]

dept=[("dept1", "Deparment-1"),
("dept2", "Deparment-2"),
("dept3", "Deparment-3"),
("dept4", "Deparment-4")]
df=spark.createDataFrame(emp,["id", "name", "dept", "salary"])
deptdf=spark.createDataFrame(dept, ["id", "name"])


In [9]:
df.show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
|  8| HHH|dept3|  3700|
|  9| III|dept3|  4500|
| 10| JJJ|dept5|  3400|
+---+----+-----+------+



Operaciones basicas

Cuenta el numero de filas

In [10]:
df.count()

10

In [11]:
deptdf.count()

4

Ver columnas de nuestro dataframe

In [12]:
df.columns

['id', 'name', 'dept', 'salary']

In [13]:
deptdf.columns

['id', 'name']

Ver el tipo de dato

In [14]:
df.dtypes

[('id', 'bigint'),
 ('name', 'string'),
 ('dept', 'string'),
 ('salary', 'bigint')]

In [15]:
deptdf.dtypes

[('id', 'string'), ('name', 'string')]

Schema

Comprueba como spark almacena el esquema del DataFRame

In [16]:
df.schema

StructType([StructField('id', LongType(), True), StructField('name', StringType(), True), StructField('dept', StringType(), True), StructField('salary', LongType(), True)])

In [17]:
deptdf.schema

StructType([StructField('id', StringType(), True), StructField('name', StringType(), True)])

print schema

In [18]:
df.printSchema

In [19]:
deptdf.printSchema

Select

Seleccionamos clumnas del DataFrame


In [20]:
df.select("id", "name").show()

+---+----+
| id|name|
+---+----+
|  1| AAA|
|  2| BBB|
|  3| CCC|
|  4| DDD|
|  5| EEE|
|  6| FFF|
|  7| GGG|
|  8| HHH|
|  9| III|
| 10| JJJ|
+---+----+



In [21]:
deptdf.select("name").show()

+-----------+
|       name|
+-----------+
|Deparment-1|
|Deparment-2|
|Deparment-3|
|Deparment-4|
+-----------+



Filter



*   Filtrar las filas
*   Intentamos encontrar filas con id=1.
*   Hay diferentes formas de especificar la condicion.



In [22]:
df.filter(df["id"]==1).show()
df.filter(df.id==1).show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+



In [23]:
deptdf.filter(deptdf.id=="dept3").show()

+-----+-----------+
|   id|       name|
+-----+-----------+
|dept3|Deparment-3|
+-----+-----------+



In [24]:
df.filter(col("id")==1).show()
df.filter("id==1").show()

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
+---+----+-----+------+



drop

Elimina una columna

In [25]:
newdf=df.drop("id")
newdf.show(2)

+----+-----+------+
|name| dept|salary|
+----+-----+------+
| AAA|dept1|  1000|
| BBB|dept1|  1100|
+----+-----+------+
only showing top 2 rows



In [26]:
newdf=deptdf.drop("id")
newdf.show(2)

+-----------+
|       name|
+-----------+
|Deparment-1|
|Deparment-2|
+-----------+
only showing top 2 rows



Usamos la funcion groupby para agrupar lo datos

In [27]:
from pyspark.sql.functions import count, sum, avg, max, min

In [28]:
(df.groupBy("dept") .agg(
      count("salary").alias("count"),
      sum("salary").alias("sum"),
      avg("salary").alias("avg"),
      max("salary").alias("max"),
      min("salary").alias("min") ).show())

+-----+-----+-----+------+----+----+
| dept|count|  sum|   avg| max| min|
+-----+-----+-----+------+----+----+
|dept1|    4| 6600|1650.0|3000|1000|
|dept2|    2|15200|7600.0|8000|7200|
|dept5|    1| 3400|3400.0|3400|3400|
|dept3|    3|15300|5100.0|7100|3700|
+-----+-----+-----+------+----+----+



In [29]:
df.sort("salary").show(5)

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  4| DDD|dept1|  1500|
|  3| CCC|dept1|  3000|
| 10| JJJ|dept5|  3400|
+---+----+-----+------+
only showing top 5 rows



In [30]:
# Sort the data in descending order.
df.sort(desc("salary")).show(5)



+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
|  9| III|dept3|  4500|
|  8| HHH|dept3|  3700|
+---+----+-----+------+
only showing top 5 rows



Columnas derivadas

Usaos la funcion "withcolum" para derivar

In [31]:
df.withColumn("bonus", col("salary")* .5).show()

+---+----+-----+------+------+
| id|name| dept|salary| bonus|
+---+----+-----+------+------+
|  1| AAA|dept1|  1000| 500.0|
|  2| BBB|dept1|  1100| 550.0|
|  3| CCC|dept1|  3000|1500.0|
|  4| DDD|dept1|  1500| 750.0|
|  5| EEE|dept2|  8000|4000.0|
|  6| FFF|dept2|  7200|3600.0|
|  7| GGG|dept3|  7100|3550.0|
|  8| HHH|dept3|  3700|1850.0|
|  9| III|dept3|  4500|2250.0|
| 10| JJJ|dept5|  3400|1700.0|
+---+----+-----+------+------+



Joins

In [32]:
# Inner JOIN.
df.join(deptdf, df["dept"] == deptdf["id"]).show()

+---+----+-----+------+-----+-----------+
| id|name| dept|salary|   id|       name|
+---+----+-----+------+-----+-----------+
|  1| AAA|dept1|  1000|dept1|Deparment-1|
|  2| BBB|dept1|  1100|dept1|Deparment-1|
|  3| CCC|dept1|  3000|dept1|Deparment-1|
|  4| DDD|dept1|  1500|dept1|Deparment-1|
|  5| EEE|dept2|  8000|dept2|Deparment-2|
|  6| FFF|dept2|  7200|dept2|Deparment-2|
|  7| GGG|dept3|  7100|dept3|Deparment-3|
|  8| HHH|dept3|  3700|dept3|Deparment-3|
|  9| III|dept3|  4500|dept3|Deparment-3|
+---+----+-----+------+-----+-----------+



right_outer

In [33]:
df.join(deptdf, df["dept"] == deptdf["id"], "right_outer").show()

+----+----+-----+------+-----+-----------+
|  id|name| dept|salary|   id|       name|
+----+----+-----+------+-----+-----------+
|   4| DDD|dept1|  1500|dept1|Deparment-1|
|   3| CCC|dept1|  3000|dept1|Deparment-1|
|   2| BBB|dept1|  1100|dept1|Deparment-1|
|   1| AAA|dept1|  1000|dept1|Deparment-1|
|   6| FFF|dept2|  7200|dept2|Deparment-2|
|   5| EEE|dept2|  8000|dept2|Deparment-2|
|   9| III|dept3|  4500|dept3|Deparment-3|
|   8| HHH|dept3|  3700|dept3|Deparment-3|
|   7| GGG|dept3|  7100|dept3|Deparment-3|
|NULL|NULL| NULL|  NULL|dept4|Deparment-4|
+----+----+-----+------+-----+-----------+



left_outer

In [34]:
df.join(deptdf, df["dept"] == deptdf["id"], "left_outer").show()



+---+----+-----+------+-----+-----------+
| id|name| dept|salary|   id|       name|
+---+----+-----+------+-----+-----------+
|  1| AAA|dept1|  1000|dept1|Deparment-1|
|  2| BBB|dept1|  1100|dept1|Deparment-1|
|  3| CCC|dept1|  3000|dept1|Deparment-1|
|  4| DDD|dept1|  1500|dept1|Deparment-1|
|  5| EEE|dept2|  8000|dept2|Deparment-2|
| 10| JJJ|dept5|  3400| NULL|       NULL|
|  7| GGG|dept3|  7100|dept3|Deparment-3|
|  8| HHH|dept3|  3700|dept3|Deparment-3|
|  9| III|dept3|  4500|dept3|Deparment-3|
|  6| FFF|dept2|  7200|dept2|Deparment-2|
+---+----+-----+------+-----+-----------+



outer

In [35]:
df.join(deptdf, df["dept"] == deptdf["id"], "outer").show()



+----+----+-----+------+-----+-----------+
|  id|name| dept|salary|   id|       name|
+----+----+-----+------+-----+-----------+
|   1| AAA|dept1|  1000|dept1|Deparment-1|
|   2| BBB|dept1|  1100|dept1|Deparment-1|
|   3| CCC|dept1|  3000|dept1|Deparment-1|
|   4| DDD|dept1|  1500|dept1|Deparment-1|
|   5| EEE|dept2|  8000|dept2|Deparment-2|
|   6| FFF|dept2|  7200|dept2|Deparment-2|
|   7| GGG|dept3|  7100|dept3|Deparment-3|
|   8| HHH|dept3|  3700|dept3|Deparment-3|
|   9| III|dept3|  4500|dept3|Deparment-3|
|NULL|NULL| NULL|  NULL|dept4|Deparment-4|
|  10| JJJ|dept5|  3400| NULL|       NULL|
+----+----+-----+------+-----+-----------+



In [36]:
# Register DataFrame as Temporary Table
df.createOrReplaceTempView("temp_table")
# Execute SQL-Like query.
spark.sql("select * from temp_table where id = 3").show()



+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  3| CCC|dept1|  3000|
+---+----+-----+------+



In [37]:
spark.sql("select distinct id from temp_table").show(10)



+---+
| id|
+---+
|  5|
|  1|
|  3|
|  2|
|  4|
|  7|
|  6|
|  9|
| 10|
|  8|
+---+



In [38]:
spark.sql("select * from temp_table where salary >= 1500").show(10)

+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
|  8| HHH|dept3|  3700|
|  9| III|dept3|  4500|
| 10| JJJ|dept5|  3400|
+---+----+-----+------+



Guardar un dataframe como un archivo CSV

In [39]:
df.coalesce(1).write.format("csv").option("header", "true").option("delimiter", ";").save("clientes1")

# Practica 1

In [40]:
from pyspark.sql.types import StructField, StringType, IntegerType

Creammos los esquemas o las tablas

In [41]:
# define el esquema de las tablas
persona_schema = StructType([
StructField("idpersona", IntegerType(), nullable=False),
StructField("nombre", StringType(), nullable=False),
StructField("apellido", StringType(), nullable=False),
StructField("cedula", StringType(), nullable=False),
StructField("celular", StringType(), nullable=False),
])

materia_schema = StructType([
StructField("idmateria", IntegerType(), nullable=False),
StructField("nombre", StringType(), nullable=False),
StructField("num_horas", StringType(), nullable=False)
])

curso_schema = StructType([
StructField("idcurso", IntegerType(), nullable=False),
StructField("idpersona", IntegerType(), nullable=False),
StructField("idmateria", IntegerType(), nullable=False),
StructField("nombre", StringType(), nullable=False),
StructField("nota", StringType(), nullable=False)])

Insertamos datos necesarios en cada una de las tablas

In [42]:
persona_data = [
(1, "Victor", "Cabrera", "0107973687", "0993118236"),
(2, "Maicol", "Lojano", "1401416993", "0994806071"),
(3, "Ariel", "Saquicela", "0107973451", "0993328231"),
(4, "Paul", "Rodriguez", "0103975687", "0983118245"),
(5, "Danny", "Auquilla", "0107993682", "0973118229"),
]

In [43]:
materia_data = [
(1, "Aprendizaje Profundo", "3"),
(2, "Marcos de Referencia para Big Data", "4"),
(3, "Mineria de Datos", "2"),
(4, "Inteligencia de Negocios", "2"),
(5, "Etica Profesional", "1"),
(6, "Emprendimiento", "2")
]

In [44]:
curso_data = [
    (1, 1, 1, "M3A", "9.06"),
    (2, 2, 2, "M3A", "8.27"),
    (3, 3, 3, "M3A", "7.59"),
    (4, 4, 4, "M3A", "6.30"),
    (5, 5, 5, "M3A", "7.21"),
    (6, 6, 6, "M3A", "8.20"),
    (7, 1, 1, "M3A", "7.06"),
    (8, 2, 2, "M3A", "6.27"),
    (9, 3, 3, "M3A", "8.59"),
    (10, 4, 4, "M3A", "9.30"),
    (11, 5, 5, "M3A", "7.21"),
    (12, 6, 6, "M3A", "8.20"),
    (13, 1, 1, "M3A", "6.06"),
    (14, 2, 2, "M3A", "7.27"),
    (15, 3, 3, "M3A", "8.59"),
    (16, 4, 4, "M3A", "9.30"),
    (17, 5, 5, "M3A", "6.21"),
    (18, 6, 6, "M3A", "7.20"),
    (19, 1, 1, "M3A", "8.06"),
    (20, 2, 2, "M3A", "7.27"),
    (21, 3, 3, "M3A", "8.59"),
    (22, 4, 4, "M3A", "7.30"),
    (23, 5, 5, "M3A", "8.21"),
    (24, 6, 6, "M3A", "7.20"),
    (25, 1, 1, "M3A", "8.06"),
    (26, 2, 2, "M3A", "9.27"),
    (27, 3, 3, "M3A", "8.59"),
    (28, 4, 4, "M3A", "7.30"),
    (29, 5, 5, "M3A", "9.21"),
    (30, 6, 6, "M3A", "8.20"),
]

In [45]:
df_persona = spark.createDataFrame(persona_data, schema=persona_schema)
df_materia = spark.createDataFrame(materia_data, schema=materia_schema)
df_curso = spark.createDataFrame(curso_data, schema=curso_schema)

In [46]:
df_persona.printSchema()
df_persona.show()

root
 |-- idpersona: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- apellido: string (nullable = false)
 |-- cedula: string (nullable = false)
 |-- celular: string (nullable = false)

+---------+------+---------+----------+----------+
|idpersona|nombre| apellido|    cedula|   celular|
+---------+------+---------+----------+----------+
|        1|Victor|  Cabrera|0107973687|0993118236|
|        2|Maicol|   Lojano|1401416993|0994806071|
|        3| Ariel|Saquicela|0107973451|0993328231|
|        4|  Paul|Rodriguez|0103975687|0983118245|
|        5| Danny| Auquilla|0107993682|0973118229|
+---------+------+---------+----------+----------+



In [47]:
df_persona.count()

5

In [48]:
df_materia.count()

6

In [49]:
df_curso.count()

30

In [50]:
df_materia.printSchema()
df_materia.show()

root
 |-- idmateria: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- num_horas: string (nullable = false)

+---------+--------------------+---------+
|idmateria|              nombre|num_horas|
+---------+--------------------+---------+
|        1|Aprendizaje Profundo|        3|
|        2|Marcos de Referen...|        4|
|        3|    Mineria de Datos|        2|
|        4|Inteligencia de N...|        2|
|        5|   Etica Profesional|        1|
|        6|      Emprendimiento|        2|
+---------+--------------------+---------+



In [51]:
df_curso.printSchema()
df_curso.show()

root
 |-- idcurso: integer (nullable = false)
 |-- idpersona: integer (nullable = false)
 |-- idmateria: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- nota: string (nullable = false)

+-------+---------+---------+------+----+
|idcurso|idpersona|idmateria|nombre|nota|
+-------+---------+---------+------+----+
|      1|        1|        1|   M3A|9.06|
|      2|        2|        2|   M3A|8.27|
|      3|        3|        3|   M3A|7.59|
|      4|        4|        4|   M3A|6.30|
|      5|        5|        5|   M3A|7.21|
|      6|        6|        6|   M3A|8.20|
|      7|        1|        1|   M3A|7.06|
|      8|        2|        2|   M3A|6.27|
|      9|        3|        3|   M3A|8.59|
|     10|        4|        4|   M3A|9.30|
|     11|        5|        5|   M3A|7.21|
|     12|        6|        6|   M3A|8.20|
|     13|        1|        1|   M3A|6.06|
|     14|        2|        2|   M3A|7.27|
|     15|        3|        3|   M3A|8.59|
|     16|        4|        4|   M3A

In [52]:
df_persona.dtypes

[('idpersona', 'int'),
 ('nombre', 'string'),
 ('apellido', 'string'),
 ('cedula', 'string'),
 ('celular', 'string')]

In [53]:
df_materia.dtypes

[('idmateria', 'int'), ('nombre', 'string'), ('num_horas', 'string')]

In [54]:
df_curso.dtypes

[('idcurso', 'int'),
 ('idpersona', 'int'),
 ('idmateria', 'int'),
 ('nombre', 'string'),
 ('nota', 'string')]

In [70]:
df_persona.schema

StructType([StructField('idpersona', IntegerType(), False), StructField('nombre', StringType(), False), StructField('apellido', StringType(), False), StructField('cedula', StringType(), False), StructField('celular', StringType(), False)])

In [56]:
df_persona.printSchema

Agrupar

In [57]:
from pyspark.sql.functions import count, sum, avg, max, min

In [58]:
# Registrar DataFrame como tabla temporal
df_curso.createOrReplaceTempView("curso_temp")

# Ejecutar consulta SQL para calcular métricas sobre la columna 'nota' agrupadas por 'idcurso'
consulta_sql = """
    SELECT idcurso,
           COUNT(*) AS count,
           SUM(cast(nota as float)) AS sum_notas,
           AVG(cast(nota as float)) AS avg_notas,
           MAX(cast(nota as float)) AS max_nota,
           MIN(cast(nota as float)) AS min_nota
    FROM curso_temp
    GROUP BY idcurso
"""

# Ejecutar la consulta SQL y mostrar el resultado
resultado = spark.sql(consulta_sql)
resultado.show()


+-------+-----+-----------------+-----------------+--------+--------+
|idcurso|count|        sum_notas|        avg_notas|max_nota|min_nota|
+-------+-----+-----------------+-----------------+--------+--------+
|     12|    1|8.199999809265137|8.199999809265137|     8.2|     8.2|
|      1|    1|  9.0600004196167|  9.0600004196167|    9.06|    9.06|
|     13|    1|6.059999942779541|6.059999942779541|    6.06|    6.06|
|      6|    1|8.199999809265137|8.199999809265137|     8.2|     8.2|
|      3|    1|7.590000152587891|7.590000152587891|    7.59|    7.59|
|      5|    1|7.210000038146973|7.210000038146973|    7.21|    7.21|
|     15|    1| 8.59000015258789| 8.59000015258789|    8.59|    8.59|
|      9|    1| 8.59000015258789| 8.59000015258789|    8.59|    8.59|
|      4|    1|6.300000190734863|6.300000190734863|     6.3|     6.3|
|      8|    1|6.269999980926514|6.269999980926514|    6.27|    6.27|
|      7|    1|7.059999942779541|7.059999942779541|    7.06|    7.06|
|     10|    1|9.300

In [59]:
df_curso.sort("nota").show(5)

+-------+---------+---------+------+----+
|idcurso|idpersona|idmateria|nombre|nota|
+-------+---------+---------+------+----+
|     13|        1|        1|   M3A|6.06|
|     17|        5|        5|   M3A|6.21|
|      8|        2|        2|   M3A|6.27|
|      4|        4|        4|   M3A|6.30|
|      7|        1|        1|   M3A|7.06|
+-------+---------+---------+------+----+
only showing top 5 rows



In [60]:
# Sort the data in descending order.
df_curso.sort(desc("nota")).show(5)

+-------+---------+---------+------+----+
|idcurso|idpersona|idmateria|nombre|nota|
+-------+---------+---------+------+----+
|     10|        4|        4|   M3A|9.30|
|     16|        4|        4|   M3A|9.30|
|     26|        2|        2|   M3A|9.27|
|     29|        5|        5|   M3A|9.21|
|      1|        1|        1|   M3A|9.06|
+-------+---------+---------+------+----+
only showing top 5 rows



Joins

In [61]:
# Realizar el join entre los DataFrames
df_resultado = df_persona.join(df_curso, df_persona["idpersona"] == df_curso["idpersona"]) \
                         .join(df_materia, df_curso["idmateria"] == df_materia["idmateria"]) \
                         .select(df_curso["idcurso"], df_persona["idpersona"], df_materia["nombre"].alias("nombre_materia"),
                                 df_persona["nombre"], df_persona["apellido"], df_curso["nombre"].alias("nombre_curso"),
                                 df_curso["nota"]).show()

+-------+---------+--------------------+------+---------+------------+----+
|idcurso|idpersona|      nombre_materia|nombre| apellido|nombre_curso|nota|
+-------+---------+--------------------+------+---------+------------+----+
|     25|        1|Aprendizaje Profundo|Victor|  Cabrera|         M3A|8.06|
|     19|        1|Aprendizaje Profundo|Victor|  Cabrera|         M3A|8.06|
|     13|        1|Aprendizaje Profundo|Victor|  Cabrera|         M3A|6.06|
|      7|        1|Aprendizaje Profundo|Victor|  Cabrera|         M3A|7.06|
|      1|        1|Aprendizaje Profundo|Victor|  Cabrera|         M3A|9.06|
|     27|        3|    Mineria de Datos| Ariel|Saquicela|         M3A|8.59|
|     21|        3|    Mineria de Datos| Ariel|Saquicela|         M3A|8.59|
|     15|        3|    Mineria de Datos| Ariel|Saquicela|         M3A|8.59|
|      9|        3|    Mineria de Datos| Ariel|Saquicela|         M3A|8.59|
|      3|        3|    Mineria de Datos| Ariel|Saquicela|         M3A|7.59|
|     26|   

right_outer

In [62]:
# Realizar el join right_outer entre los DataFrames
df_resultado = df_persona.join(df_curso, df_persona["idpersona"] == df_curso["idpersona"], "right_outer") \
                         .select(df_persona["idpersona"], df_persona["nombre"], df_persona["apellido"],
                                 df_curso["idcurso"], df_curso["idmateria"], df_curso["nombre"].alias("nombre_curso"),
                                 df_curso["nota"]).show()

+---------+------+---------+-------+---------+------------+----+
|idpersona|nombre| apellido|idcurso|idmateria|nombre_curso|nota|
+---------+------+---------+-------+---------+------------+----+
|        1|Victor|  Cabrera|      1|        1|         M3A|9.06|
|        1|Victor|  Cabrera|      7|        1|         M3A|7.06|
|        1|Victor|  Cabrera|     13|        1|         M3A|6.06|
|     NULL|  NULL|     NULL|      6|        6|         M3A|8.20|
|     NULL|  NULL|     NULL|     12|        6|         M3A|8.20|
|        3| Ariel|Saquicela|      3|        3|         M3A|7.59|
|        3| Ariel|Saquicela|      9|        3|         M3A|8.59|
|        3| Ariel|Saquicela|     15|        3|         M3A|8.59|
|        5| Danny| Auquilla|      5|        5|         M3A|7.21|
|        5| Danny| Auquilla|     11|        5|         M3A|7.21|
|        4|  Paul|Rodriguez|      4|        4|         M3A|6.30|
|        4|  Paul|Rodriguez|     10|        4|         M3A|9.30|
|        2|Maicol|   Loja

left_outer

In [63]:
# Realizar el join left_outer entre los DataFrames
df_resultado = df_persona.join(df_curso, df_persona["idpersona"] == df_curso["idpersona"], "left_outer") \
                         .select(df_persona["idpersona"], df_persona["nombre"], df_persona["apellido"],
                                 df_curso["idcurso"], df_curso["idmateria"], df_curso["nombre"].alias("nombre_curso"),
                                 df_curso["nota"]).show()

+---------+------+---------+-------+---------+------------+----+
|idpersona|nombre| apellido|idcurso|idmateria|nombre_curso|nota|
+---------+------+---------+-------+---------+------------+----+
|        1|Victor|  Cabrera|     25|        1|         M3A|8.06|
|        1|Victor|  Cabrera|     19|        1|         M3A|8.06|
|        1|Victor|  Cabrera|     13|        1|         M3A|6.06|
|        1|Victor|  Cabrera|      7|        1|         M3A|7.06|
|        1|Victor|  Cabrera|      1|        1|         M3A|9.06|
|        2|Maicol|   Lojano|     26|        2|         M3A|9.27|
|        2|Maicol|   Lojano|     20|        2|         M3A|7.27|
|        2|Maicol|   Lojano|     14|        2|         M3A|7.27|
|        2|Maicol|   Lojano|      8|        2|         M3A|6.27|
|        2|Maicol|   Lojano|      2|        2|         M3A|8.27|
|        3| Ariel|Saquicela|     27|        3|         M3A|8.59|
|        3| Ariel|Saquicela|     21|        3|         M3A|8.59|
|        3| Ariel|Saquice

Outer

In [64]:
# Realizar el join outer entre los DataFrames
df_resultado = df_persona.join(df_curso, df_persona["idpersona"] == df_curso["idpersona"], "outer") \
                         .select(df_persona["idpersona"], df_persona["nombre"], df_persona["apellido"],
                                 df_curso["idcurso"], df_curso["idmateria"], df_curso["nombre"].alias("nombre_curso"),
                                 df_curso["nota"]).show()

+---------+------+---------+-------+---------+------------+----+
|idpersona|nombre| apellido|idcurso|idmateria|nombre_curso|nota|
+---------+------+---------+-------+---------+------------+----+
|        1|Victor|  Cabrera|      1|        1|         M3A|9.06|
|        1|Victor|  Cabrera|      7|        1|         M3A|7.06|
|        1|Victor|  Cabrera|     13|        1|         M3A|6.06|
|        1|Victor|  Cabrera|     19|        1|         M3A|8.06|
|        1|Victor|  Cabrera|     25|        1|         M3A|8.06|
|        2|Maicol|   Lojano|      2|        2|         M3A|8.27|
|        2|Maicol|   Lojano|      8|        2|         M3A|6.27|
|        2|Maicol|   Lojano|     14|        2|         M3A|7.27|
|        2|Maicol|   Lojano|     20|        2|         M3A|7.27|
|        2|Maicol|   Lojano|     26|        2|         M3A|9.27|
|        3| Ariel|Saquicela|      3|        3|         M3A|7.59|
|        3| Ariel|Saquicela|      9|        3|         M3A|8.59|
|        3| Ariel|Saquice

In [65]:
# Registrar DataFrame como tabla temporal
df_persona.createOrReplaceTempView("persona_temp")
df_materia.createOrReplaceTempView("materia_temp")
df_curso.createOrReplaceTempView("curso_temp")

# Ejecutar consulta SQL-like
consulta_sql = """
    SELECT p.idpersona, p.nombre, p.apellido, c.idcurso, c.idmateria, c.nombre AS nombre_curso, c.nota
    FROM persona_temp p
    JOIN curso_temp c ON p.idpersona = c.idpersona
    WHERE p.idpersona = 3
"""

resultado = spark.sql(consulta_sql)

# Mostrar el resultado
resultado.show()


+---------+------+---------+-------+---------+------------+----+
|idpersona|nombre| apellido|idcurso|idmateria|nombre_curso|nota|
+---------+------+---------+-------+---------+------------+----+
|        3| Ariel|Saquicela|      3|        3|         M3A|7.59|
|        3| Ariel|Saquicela|      9|        3|         M3A|8.59|
|        3| Ariel|Saquicela|     15|        3|         M3A|8.59|
|        3| Ariel|Saquicela|     21|        3|         M3A|8.59|
|        3| Ariel|Saquicela|     27|        3|         M3A|8.59|
+---------+------+---------+-------+---------+------------+----+



In [66]:
# Registrar DataFrame como tabla temporal
df_persona.createOrReplaceTempView("persona_temp")
df_materia.createOrReplaceTempView("materia_temp")
df_curso.createOrReplaceTempView("curso_temp")

# Ejecutar consulta SQL para obtener valores distintos de 'id' de la tabla temporal
spark.sql("SELECT DISTINCT idcurso FROM curso_temp").show(10)



+-------+
|idcurso|
+-------+
|     12|
|      1|
|     13|
|      6|
|      3|
|      5|
|     15|
|      9|
|      4|
|      8|
+-------+
only showing top 10 rows



In [67]:
spark.sql("SELECT * FROM curso_temp WHERE nota >= 7").show(10)


+-------+---------+---------+------+----+
|idcurso|idpersona|idmateria|nombre|nota|
+-------+---------+---------+------+----+
|      1|        1|        1|   M3A|9.06|
|      2|        2|        2|   M3A|8.27|
|      3|        3|        3|   M3A|7.59|
|      5|        5|        5|   M3A|7.21|
|      6|        6|        6|   M3A|8.20|
|      7|        1|        1|   M3A|7.06|
|      9|        3|        3|   M3A|8.59|
|     10|        4|        4|   M3A|9.30|
|     11|        5|        5|   M3A|7.21|
|     12|        6|        6|   M3A|8.20|
+-------+---------+---------+------+----+
only showing top 10 rows



Guardar un dataframe como un archivo csv

In [68]:
df.coalesce(1).write.format("csv").option("header", "true").option("delimiter", ";").save("persona2")