# Examen PySpark

Instrucciones: Lea cuidadosamente las preguntas, escriba el código correspondiente y ejecútelo para mostrar sus resultados.

#### Importante: Todos los ejercicios deberán realizarse con funciones de NumPy, Pandas o PySpark (no podrán crearse vistas temporales para realizarse en SQL, salvo que se indique lo contrario).

## Bloque 1: Spark Core

1.1 Utilizando NumPy, construya un arreglo con 50 elementos aleatorios distribuidos de forma normal con media 50 y desviación estándar 10. Imprima el arreglo.

In [1]:
import numpy as np

In [2]:
media = 50
desv_estandar = 10
n = 50

arreglo = np.random.normal(loc=media, scale=desv_estandar, size=n)
print(arreglo)

[51.41211895 41.30382136 49.40267228 43.74319984 44.9896942  48.86832578
 35.35454696 48.39321791 61.81990937 57.16836919 51.26872697 48.16641319
 41.04109791 35.2360609  61.51464773 68.49301323 49.94368585 50.27991755
 79.61585714 50.11499132 52.26859163 38.16070342 22.67376466 68.72406406
 27.64495173 19.75590009 43.71880852 36.57553935 64.08030082 39.46789899
 53.28395941 26.58124005 63.12519008 57.1151237  43.0041912  42.57461162
 60.11752267 62.15897516 53.85281622 49.03915124 74.47144248 46.5119436
 43.30367002 71.66917706 49.88421323 55.88142623 35.71516196 33.00220337
 42.27604202 48.06667009]


1.2. Construya el objeto de Spark (Core) que le permita trabajar con objetos RDD.

In [3]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=da989016e11554faaff370a03137829cc5f5cbc9cc50ce4a16e0dd50b45cde7d
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [4]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [5]:
sc = SparkContext("local", "testSpark")

In [6]:
#sc.stop()

1.3. Convierta el arreglo de NumPy a un RDD con 2 particiones. Muestre los
primeros 5 elementos.

In [7]:
# Convertir el arreglo de NumPy en un RDD con 2 particiones
rdd_2 = sc.parallelize(arreglo, 2)
print(rdd_2.take(5))


[51.41211895005393, 41.30382135624631, 49.40267228330854, 43.74319983860883, 44.98969420089373]


1.4. Suponiendo que los datos de la lista miden grados Fahrenheit, aplique una función lambda al RDD que convierta las mediciones a grados Centígrados. Muestre los primeros 5 elementos.

C = (F - 32) * 5 / 9

In [8]:
f_to_c = lambda f: (f - 32) * 5/9

rdd_celsius = rdd_2.map(f_to_c)
print(rdd_celsius.take(5))

[10.784510527807738, 5.168789642359061, 9.668151268504744, 6.523999910338239, 7.216496778274294]


1.5. Utilice una función Lambda para mostrar únicamente las temperaturas mayores a 15 grados Centigrados.

In [9]:
# lambda para filtrar las temperaturas mayores a 15 grados Celsius
t_mayores_15C = rdd_celsius.filter(lambda temp: temp > 15)
print(t_mayores_15C.collect())

[16.56661631657016, 16.39702651869861, 20.27389623848664, 26.453253967414, 20.402257812256305, 17.822389343971327, 17.29177226854383, 15.620845928891644, 16.75498619925976, 23.59524582369334, 22.038431701485734]


1.6. Calcule la temperatura media en grados Centígrados.

In [10]:
# calcular la temperatura media ¡
t_media_celsius = rdd_celsius.mean()
print(t_media_celsius)

9.36483935913651


1.7. Obtenga las 3 temperaturas más altas en grados Centígrados.

In [11]:
t_mas_altas = rdd_celsius.top(3)
print(t_mas_altas)

[26.453253967414, 23.59524582369334, 22.038431701485734]


## Bloque 2: Spark SQL

2.1. Utilizando Numpy, construya un arreglo con 50 números enteros entre 1 y 3 (1 y 3 incluidos).

In [12]:
arreglo_2 = np.random.randint(1, 4, 50)
print(arreglo_2)

[3 1 2 1 2 1 2 3 1 2 3 2 1 2 3 2 3 3 3 2 1 3 3 3 2 2 3 2 2 2 3 2 1 2 3 2 2
 2 3 3 3 1 3 1 2 2 3 1 1 1]


2.2. Construya un dataframe en Pandas utilizando los arreglos de 2.1 y 1.1. Asigne los nombres "dia" y "temp". Muestre los primeros 5 elementos.

In [13]:
import pandas as pd

In [14]:
df = pd.DataFrame({"dia": arreglo_2, "temp": arreglo})
print(df.head())

   dia       temp
0    3  51.412119
1    1  41.303821
2    2  49.402672
3    1  43.743200
4    2  44.989694


2.3. Construya el objeto de Spark (SQL) que le permita trabajar con los dataframes de Spark.

In [15]:
spark = SparkSession.builder.appName("testEx2").getOrCreate()

2.4. Convierta el dataframe de Pandas a un dataframe de Spark, definiendo explícitamente el esquema/estructura (utilice el tipo entero para el día y el tipo doble para la temperatura). Muestre los primeros 5 registros.

In [16]:
# convertir pandas df en un Spark df
df_spark = spark.createDataFrame(df)
df_spark.show(5)

+---+-----------------+
|dia|             temp|
+---+-----------------+
|  3|51.41211895005393|
|  1|41.30382135624631|
|  2|49.40267228330854|
|  1|43.74319983860883|
|  2|44.98969420089373|
+---+-----------------+
only showing top 5 rows



2.5. Partiendo del dataframe en Spark, construya un dataframe con el promedio de temperatura agrupado por día. El dataframe deberá contener únicamente las columnas "dia" y "temp_prom" (con esos nombres). Muestre la tabla resultante.

In [17]:
df_temp_pr_bc = df_spark.groupBy("dia").agg({"temp": "avg"})
df_temp_prom = df_temp_pr_bc.withColumnRenamed("avg(temp)", "temp_prom")
df_temp_prom.show()

+---+------------------+
|dia|         temp_prom|
+---+------------------+
|  1| 49.47468100835718|
|  3| 51.02088362811127|
|  2|46.538173245799854|
+---+------------------+



2.6. Repita el ejercicio anterior registrando una vista temporal y ejecutando el código SQL correspondiente. Muestre la tabla resultante.

In [18]:
from pyspark.sql import SparkSession

In [19]:
# registrar el df como una vista temporal
df_spark.createOrReplaceTempView("tabla_t")

#
sql = """
SELECT dia, AVG(temp) AS temp_pers
FROM tabla_t
GROUP BY dia
"""

consulta = spark.sql(sql)
consulta.show()


+---+------------------+
|dia|         temp_pers|
+---+------------------+
|  1| 49.47468100835718|
|  3| 51.02088362811127|
|  2|46.538173245799854|
+---+------------------+



2.7. Combine los valores del dataframe anterior con el original. El dataframe resultante no deberá contener columnas repetidas y tendrá que estar ordenado de forma ascendente por día y temperatura. Muestre los primeros 5 elementos.

In [20]:
from pyspark.sql.functions import col


In [21]:
# 1 df: df_temp_prom
# 2 df: consulta

joined_df = df_temp_prom.join(consulta, on="dia", how="inner").orderBy("dia")
joined_df.show()

+---+------------------+------------------+
|dia|         temp_prom|         temp_pers|
+---+------------------+------------------+
|  1| 49.47468100835718| 49.47468100835718|
|  2|46.538173245799854|46.538173245799854|
|  3| 51.02088362811127| 51.02088362811127|
+---+------------------+------------------+



2.8. Añada una columna adicicional con la diferencia entre la temperatura y su media. Asigne el nombre "resid". Muestre los primeros 5 elementos.

In [22]:
from pyspark.sql import functions as F

In [23]:
df_resid = joined_df.withColumn("resid", F.col("temp_prom") - F.col("temp_pers"))
df_resid.show()

+---+------------------+------------------+-----+
|dia|         temp_prom|         temp_pers|resid|
+---+------------------+------------------+-----+
|  1| 49.47468100835718| 49.47468100835718|  0.0|
|  2|46.538173245799854|46.538173245799854|  0.0|
|  3| 51.02088362811127| 51.02088362811127|  0.0|
+---+------------------+------------------+-----+



### 2.9. Guarde el dataframe resultante en formato JSON. En caso de que el archivo ya exista, deberá sobreescribirse.

In [24]:
df_resid.write.mode("overwrite").json("./temp_prom.json")