# S07_SparkSQL

In [1]:
import pyspark
print(pyspark.__version__)

3.5.3


In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sepomex").getOrCreate()

locatelTiempo = "locateltiempo.cvs"
df = spark.read.csv(locatelTiempo, header = True, inferSchema = True, sep="|")
df.printSchema()


from pyspark.sql.types import StructType, StructField, StringType, IntegerType
 
schema = StructType([
    StructField("d_codigo", StringType(), True),
    StructField("d_asenta", StringType(), True),
    StructField("d_tipo_asenta", StringType(), True),
    StructField("D_mnpio", StringType(), True),
    StructField("d_estado", StringType(), True),
    StructField("d_ciudad", StringType(), True),
    StructField("d_CP", StringType(), True),
    StructField("c_estado", StringType(), True),
    StructField("c_oficina", StringType(), True),
    StructField("c_CP", StringType(), True),
    StructField("c_tipo_asenta", StringType(), True),
    StructField("c_mnpio", StringType(), True),
    StructField("id_asenta_cpcons", StringType(), True),
    StructField("d_zona", StringType(), True),
    StructField("c_cve_ciudad", StringType(), True)
])

sepomex_df = spark.read.option("header","true")\
    .option("delimiter","|")\
    .option("skipRows","1")\
    .option("encoding","UTF-8")\
    .schema(schema)\
    .csv("sepomex.csv")
sepomex_df.count()

root
 |-- 2024-01-30,2024-10-19T09:17:32.000-06:00,QUEJA DE TRANSPORTE PUBLICO,FEMENINO,,CERRADO,NA,NA,,: string (nullable = true)



113147

In [7]:
strql = """SELECT
MIN(edad) AS min_edad, MAX(edad) AS max_edad, AVG(edad) avg_edad,
STDDEV(edad) as stddev_edad, COUNT(edad) AS nr
PERCERTILE_CONT(0.5) WITH GROUP(ORDER BY edad) AS mediana_edad
FROM locatel
"""

spark.sql(strql).show()

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'PERCERTILE_CONT'.(line 4, pos 0)

== SQL ==
SELECT
MIN(edad) AS min_edad, MAX(edad) AS max_edad, AVG(edad) avg_edad,
STDDEV(edad) as stddev_edad, COUNT(edad) AS nr
PERCERTILE_CONT(0.5) WITH GROUP(ORDER BY edad) AS mediana_edad
^^^
FROM locatel


In [None]:
spark.sql("SELECT COUNT(*) FROM LOCATEL")

In [None]:
strql = """SELECT edad, COUNT(*) nr 
FROM locatel 
GROUP BY edad 
ORDER BY edad DESC"""

spark.sql(strql).show(200)

In [None]:
from pyspark.sql.functions import col, when, avg

edad_promedio = df.filter(col("edad") != 16090.0).agg(avg("edad")).first()[0]
edad_promedio

In [None]:
#Reemplazar el valor por edad promedio
df = df.withColumn(
    "edad", when(col("edad") == 16090.0, edad_promedio).otherwise(col("edad"))
)

df.createOrReplaceTempView("locatel")

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
df_edad = spark.sql("SELECT edad FROM locatel WHERE edad IS NOT NULL")
#.toPandas()

plt.figure(figsize=(8,6))
sns.boxplot(df_edad('edad'),orient='h')

plt.title("Boxplot de edad")
plt.xlabel("Edad")

plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12,6))
sns.histplot(data=df_edad,x="edad",kde=True)
plt.xlabel("Edad")
plt.ylabel("Frecuencia")
plt.show()

## Clase 23/10/2024

In [None]:
strql = """SELECT d_estado, COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
GROUP BY d_estado
HAVING nr <= 100
ORDER BY 2 DESC,1
"""

spark.sql(strql).show(40, truncate=False)

In [None]:
strql = """SELECT d_estado, COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
GROUP BY d_estado
HAVING nr <= 100
ORDER BY 2 DESC}
LIMIT 5
"""

spark.sql(strql).show(40, truncate=False)

In [None]:
strql = """SELECT d_ciudad, COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
GROUP BY s.d_ciudad
HAVING nr <= 100
ORDER BY 2 DESC,1 ASC
"""

spark.sql(strql).show(40, truncate=False)

In [None]:
strql = """SELECT d_ciudad, COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
WHERE s.d_ciudad IS NOT NULL AND s.d_ciudad != "Ciudad de México"
"""

spark.sql(strql).show(40, truncate=False)

In [None]:
strql = """SELECT d_estadoo, s.d_ciudad, s.d_mnpio, COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
WHERE s.d_estado IN ('Ciudad de México', 'México')
GROUP BY s.d_estado,s.d_ciudad, s.d_mnpio
ORDER BY s.d_estado,s.d_ciudad, s.d_mnpio
"""

spark.sql(strql).show(100, truncate=False)

In [None]:
strql = """SELECT COALESCE(NULLIF(TRIM(s.d_mnpio), ''), 'No Disponible') AS municipio, 
COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
WHERE s.d_estado IN ('Ciudad de México')
GROUP BY municipio
ORDER BY nr DESC
"""

spark.sql(strql).show(100, truncate=False)

In [None]:
strql = """SELECT COALESCE(NULLIF(TRIM(s.d_ciudad), ''), 'No Disponible') AS ciudad, 
COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
WHERE s.d_estado NOT IN ('Ciudad de México')
GROUP BY ciudad
ORDER BY nr DESC
LIMIT 10
"""

df_pandas=spark.sql(strql).toPandas()

plt.figure(figsize=(10,6))
plt.bar(df_pandas['ciudad'],df_pandas['nr'], color='blue')
plt.xlabel('Ciudad')
plt.ylabel('Total de registros')
plt.title('Ciudades con mayor numero de llamadas')
plt.xticks(rotation=45)

In [None]:
strql = """SELECT COALESCE(NULLIF(TRIM(s.d_mnpio), ''), 'No Disponible') AS municipio, 
COUNT(*) nr 
FROM locatel l
LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
WHERE s.d_estado NOT IN ('Ciudad de México')
GROUP BY ciudad
ORDER BY nr DESC
LIMIT 10
"""

df_pandas=spark.sql(strql).toPandas()

plt.figure(figsize=(10,6))
plt.bar(df_pandas['ciudad'],df_pandas['nr'], color='blue')
plt.xlabel('Ciudad')
plt.ylabel('Total de registros')
plt.title('Ciudades con mayor numero de llamadas')
plt.xticks(rotation=45)

In [None]:
strql = """SELECT municipio, SUM(CASE WHEN d_estado IS NOT NULL AND estado != 'Ciudad de México' 
    THEN municipio ELSE 'Otro' END AS municipio,
    SUM(CASE WHEN d_estado IS NOT NULL AND d_estado != 'Ciudad de México' 
    THEN nr ELSE 0 END) nr 
    FROM( 
    SELECT d_estado,COALESCE(NULLIF(TRIM(s.d_mnpio), ''), 'No Disponible') AS municipio,
    COUNT(*) nr 
    FROM locatel l
    LEFT JOIN sepomex s ON(l.d_codigo = s.d_codigo)
    GROUP BY d_estado, municipio
    ) x
    GROUP BY municipio
    ORDER BY nr DESC
"""

In [None]:
df_pandas=spark.sql(strql).toPandas()

plt.figure(figsize=(10,6))
plt.pie(df_pandas['nr'], labels=df_pandas['ciudad'], autopct='%1.1f%%',
        startangle=90,color=plt.Paired.colors)
#plt.bar(df_pandas['ciudad'],df_pandas['nr'], color='blue')
#plt.xlabel('Ciudad')
#plt.ylabel('Total de registros')
plt.title('Ciudades con mayor numero de llamadas fuera de la CDMX')
plt.tight_layout()
plt.show()

## clase 28/10/2024

##### Operadores de refinamiento
- roll up: dimensiones generales
- drill down: dimensiones específicas
- pivot:reorientacin de las dimensiones en el informe
- Slice: vertical
- Dice: horizontal, vemos un total

In [None]:
import seaborn as sns
sns.set(style="whitegrid")
strql="""SELECT COALESCE(NULLIF(TRIM(s.d_mnpio), ''), 'No disponible') AS municipio,
COUNT (*) as nr
FROM locatel l
JOIN sepomex s ON (l.d_codigo = s.d_codigo)
GROUP BY municipio
ORDER BY nr DESC
"""

df_pandas = spark.sql(strql).toPandas()
umbral=1000
df_pandas.loc[df_pandas['nr']<umbral,'municipio'] = 'Otros'
df_grouped = df_pandas.groupby('municipio').sum().reset_index()
df_grouped

In [None]:
import seaborn as sns
import matplotlib as plt

sns.set(style="whitegrid")
plt.figure(figsize=(10,8))
sns.barplot(x='nr',y='municipio',data=df_grouped,palette='Paired', hue='municipio')

plt.title('Numero de llamadas por municipio')
plt.xlabel('Numero de llamadas')
plt.ylabel('Municipio')
plt.tight_layout()

plt.show()



In [None]:
import seaborn as sns
import matplotlib as plt

strql="""SELECT YEAR(l.fecha_solicitud) AS anio,
AVG(HOUR(l.hora_solicitud)) hora_promedio,
AVG(l.edad) AS edad_promedio
COUNT (*) as nr
FROM locatel l
JOIN sepomex s ON (l.d_codigo = s.d_codigo)
GROUP BY anio
ORDER BY anio
"""

sns.set(style="whitegrid")
plt.figure(figsize=(10,6))
sns.plot(df_pandas['anio'],df_pandas['nr'],marker='o',linestyle='-',color='b')

plt.title('Numero de llamadas por Año')
plt.xlabel('Numero de llamadas')
plt.ylabel('Año')
plt.tight_layout()

plt.show()

In [None]:
import seaborn as sns
import matplotlib as plt

strql="""SELECT YEAR(l.fecha_solicitud) AS anio,
AVG(HOUR(l.hora_solicitud)) hora_promedio,
AVG(l.edad) AS edad_promedio
COUNT (*) as nr
FROM locatel l
JOIN sepomex s ON (l.d_codigo = s.d_codigo)
WHERE YEAR (fecha_solicitud) = 2021
GROUP BY hora
ORDER BY hora
"""

sns.set(style="whitegrid")
plt.figure(figsize=(10,6))
sns.plot(df_pandas['anio'],df_pandas['nr'],marker='o',linestyle='-',color='b')

plt.title('Numero de llamadas por Hora')
plt.xlabel('Numero de llamadas')
plt.ylabel('Hora')
plt.tight_layout()

plt.show()

In [None]:
import seaborn as sns
import matplotlib as plt

strql="""SELECT YEAR(l.fecha_solicitud) AS anio,
AVG(HOUR(l.hora_solicitud)) hora_promedio,
AVG(l.edad) AS edad_promedio
COUNT (*) as nr
FROM locatel l
JOIN sepomex s ON (l.d_codigo = s.d_codigo)
WHERE YEAR (fecha_solicitud) = 2019 
AND s.D_mnpio = 'Milpa Alta'
GROUP BY hora
ORDER BY hora
"""

sns.set(style="whitegrid")
plt.figure(figsize=(10,6))
sns.plot(df_pandas['anio'],df_pandas['nr'],marker='o',linestyle='-',color='b')

plt.title('Numero de llamadas por Hora')
plt.xlabel('Numero de llamadas')
plt.ylabel('Hora')
plt.tight_layout()

plt.show()

In [None]:
import seaborn as sns
import matplotlib as plt

strql="""
SELECT CASE WHEN nr < 5000 THEN 'Otro' ELSE tema_solicitud END tema,
SUM(nr) nr
FROM (
    SELECT l.tema_solicitud,
    COUNT(*) as num_reg
    FROM locatel l
    JOIN sepomex s ON (l.d_codigo = s.d_codigo)
    GROUP BY l.tema_solicitu
)
COUNT (*) as nr
FROM locatel l
JOIN sepomex s ON (l.d_codigo = s.d_codigo)
GROUP BY tema_solicitu
ORDER BY tema_solicitu
"""

plt.figure(figsize=(12,6))
plt.bar(df_pandas['temas'],df_pandas['nr'], color='pink')
plt.xticks(rotation=90)

plt.title('Numero de llamadas por Tema')
plt.xlabel('Numero de llamadas')
plt.ylabel('Tema')
plt.tight_layout()

plt.show()