## Importar las librerias

In [0]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.functions import avg
import pandas as pd
from pyspark.sql.functions import col

## Crear la sesion

In [0]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Ejercicio 3") \
    .getOrCreate()

## Crear los dataframes en pandas y spark

In [0]:
data = [
    Row(PeliculaID=1, Critico='Critico1', Puntuacion=3.2),
    Row(PeliculaID=2, Critico='Critico2', Puntuacion=3.4),
    Row(PeliculaID=3, Critico='Critico3', Puntuacion=4.8),
    Row(PeliculaID=4, Critico='Critico1', Puntuacion=4.0),
    Row(PeliculaID=5, Critico='Critico2', Puntuacion=4.9),
    Row(PeliculaID=6, Critico='Critico3', Puntuacion=2.0),
    Row(PeliculaID=7, Critico='Critico1', Puntuacion=1.5),
    Row(PeliculaID=8, Critico='Critico2', Puntuacion=5.0),
    Row(PeliculaID=9, Critico='Critico3', Puntuacion=1.0),
    Row(PeliculaID=10, Critico='Critico0', Puntuacion=5.0),
    Row(PeliculaID=1, Critico='Critico1', Puntuacion=4.2),
    Row(PeliculaID=2, Critico='Critico2', Puntuacion=2.4),
    Row(PeliculaID=3, Critico='Critico3', Puntuacion=1.8),
    Row(PeliculaID=4, Critico='Critico1', Puntuacion=2.0),
    Row(PeliculaID=5, Critico='Critico2', Puntuacion=3.9),
    Row(PeliculaID=6, Critico='Critico3', Puntuacion=4.0),
    Row(PeliculaID=7, Critico='Critico1', Puntuacion=4.5),
    Row(PeliculaID=8, Critico='Critico2', Puntuacion=3.0),
    Row(PeliculaID=9, Critico='Critico3', Puntuacion=2.0),
    Row(PeliculaID=10, Critico='Critico0', Puntuacion=3.0),
    Row(PeliculaID=1, Critico='Critico1', Puntuacion=3.2),
    Row(PeliculaID=2, Critico='Critico2', Puntuacion=4.4),
    Row(PeliculaID=3, Critico='Critico3', Puntuacion=2.8),
    Row(PeliculaID=4, Critico='Critico1', Puntuacion=3.0),
    Row(PeliculaID=5, Critico='Critico2', Puntuacion=3.9),
    Row(PeliculaID=6, Critico='Critico3', Puntuacion=5.0),
    Row(PeliculaID=7, Critico='Critico1', Puntuacion=3.5),
    Row(PeliculaID=8, Critico='Critico2', Puntuacion=2.0),
    Row(PeliculaID=9, Critico='Critico3', Puntuacion=4.0),
    Row(PeliculaID=10, Critico='Critico0', Puntuacion=5.0)
]

df = spark.createDataFrame(data)
df.show()

+----------+--------+----------+
|PeliculaID| Critico|Puntuacion|
+----------+--------+----------+
|         1|Critico1|       3.2|
|         2|Critico2|       3.4|
|         3|Critico3|       4.8|
|         4|Critico1|       4.0|
|         5|Critico2|       4.9|
|         6|Critico3|       2.0|
|         7|Critico1|       1.5|
|         8|Critico2|       5.0|
|         9|Critico3|       1.0|
|        10|Critico0|       5.0|
|         1|Critico1|       4.2|
|         2|Critico2|       2.4|
|         3|Critico3|       1.8|
|         4|Critico1|       2.0|
|         5|Critico2|       3.9|
|         6|Critico3|       4.0|
|         7|Critico1|       4.5|
|         8|Critico2|       3.0|
|         9|Critico3|       2.0|
|        10|Critico0|       3.0|
+----------+--------+----------+
only showing top 20 rows



In [0]:
pd_df = pd.DataFrame({
    "ID":[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "Titulo":["Pelicula1", "Pelicula2", "Pelicula3", "Pelicula4", "Pelicula5", "Pelicula6", "Pelicula7", "Pelicula8", "Pelicula9","Pelicula10"],
    "Anio":[2020, 2021, 2022, 2019, 2018, 2017, 2023, 2024, 2025, 2019]
})
pd_df

Unnamed: 0,ID,Titulo,Anio
0,1,Pelicula1,2020
1,2,Pelicula2,2021
2,3,Pelicula3,2022
3,4,Pelicula4,2019
4,5,Pelicula5,2018
5,6,Pelicula6,2017
6,7,Pelicula7,2023
7,8,Pelicula8,2024
8,9,Pelicula9,2025
9,10,Pelicula10,2019


## Aplicar un esquema para que se integren los tipos de datos

In [0]:
df = df.withColumn("PeliculaID", col("PeliculaID").cast(IntegerType()))\
    .withColumn("Critico", col("Critico").cast(StringType()))\
    .withColumn("Puntuacion", col("Puntuacion").cast(FloatType()))
df.printSchema()

root
 |-- PeliculaID: integer (nullable = true)
 |-- Critico: string (nullable = true)
 |-- Puntuacion: float (nullable = true)



In [0]:
pd_df = pd_df.astype({"ID":'int64', "Titulo":str, "Anio":'int64'}) 
pd_df = pd_df.rename(columns={"ID":"PeliculaID", "Titulo":"Titulo", "Anio":"Anio"})
pd_df.dtypes

Out[15]: PeliculaID     int64
Titulo        object
Anio           int64
dtype: object

## Integracion de dataframes a Spark

In [0]:
sp_df = spark.createDataFrame(pd_df)
sp_df.show()

+----------+----------+----+
|PeliculaID|    Titulo|Anio|
+----------+----------+----+
|         1| Pelicula1|2020|
|         2| Pelicula2|2021|
|         3| Pelicula3|2022|
|         4| Pelicula4|2019|
|         5| Pelicula5|2018|
|         6| Pelicula6|2017|
|         7| Pelicula7|2023|
|         8| Pelicula8|2024|
|         9| Pelicula9|2025|
|        10|Pelicula10|2019|
+----------+----------+----+



In [0]:
df_pelicula_critica = df.join(sp_df, df["PeliculaID"] == sp_df["PeliculaID"])\
    .select(
        df["PeliculaID"],
        sp_df["Titulo"],
        sp_df["Anio"],
        df["Critico"],
        df["Puntuacion"]
    )
df_pelicula_critica.show()

+----------+----------+----+--------+----------+
|PeliculaID|    Titulo|Anio| Critico|Puntuacion|
+----------+----------+----+--------+----------+
|         1| Pelicula1|2020|Critico1|       3.2|
|         2| Pelicula2|2021|Critico2|       3.4|
|         3| Pelicula3|2022|Critico3|       4.8|
|         4| Pelicula4|2019|Critico1|       4.0|
|         5| Pelicula5|2018|Critico2|       4.9|
|         6| Pelicula6|2017|Critico3|       2.0|
|         7| Pelicula7|2023|Critico1|       1.5|
|         8| Pelicula8|2024|Critico2|       5.0|
|         9| Pelicula9|2025|Critico3|       1.0|
|        10|Pelicula10|2019|Critico0|       5.0|
|         1| Pelicula1|2020|Critico1|       4.2|
|         2| Pelicula2|2021|Critico2|       2.4|
|         3| Pelicula3|2022|Critico3|       1.8|
|         4| Pelicula4|2019|Critico1|       2.0|
|         5| Pelicula5|2018|Critico2|       3.9|
|         6| Pelicula6|2017|Critico3|       4.0|
|         7| Pelicula7|2023|Critico1|       4.5|
|         8| Pelicul

In [0]:
df_pelicula_critica.\
    groupBy("PeliculaID", "Titulo", "Anio").\
    agg(avg("Puntuacion").alias("PuntuacionPromedio"))\
    .show()

+----------+----------+----+------------------+
|PeliculaID|    Titulo|Anio|PuntuacionPromedio|
+----------+----------+----+------------------+
|         2| Pelicula2|2021|3.4000000953674316|
|         1| Pelicula1|2020|3.5333333015441895|
|         3| Pelicula3|2022| 3.133333365122477|
|         6| Pelicula6|2017|3.6666666666666665|
|         4| Pelicula4|2019|               3.0|
|         5| Pelicula5|2018| 4.233333428700765|
|         9| Pelicula9|2025|2.3333333333333335|
|         8| Pelicula8|2024|3.3333333333333335|
|         7| Pelicula7|2023|3.1666666666666665|
|        10|Pelicula10|2019| 4.333333333333333|
+----------+----------+----+------------------+

