In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col,md5
import matplotlib.pyplot as plt


# Crear SparkSession
spark = SparkSession.builder.appName("Grandata_Test").getOrCreate()
spark.range(10).show()



# Creo las variables con la ruta de los archivos de datos 
ruta_eventos_csv="/home/jovyan/data/events.csv.gz"
ruta_destinso_csv="/home/jovyan/data/free_sms_destinations.csv.gz"


# Leer el archivo CSV como DataFrame
df_eventos = spark.read.csv(ruta_eventos_csv, header=True, inferSchema=True)
df_destinos = spark.read.csv(ruta_destinso_csv, header=True, inferSchema=True)

# Mostrar las primeras filas del DataFrame para verificar
df_eventos.show()
df_destinos.show()

# Creo temporales     
df_eventos.createOrReplaceTempView("eventos")
df_destinos.createOrReplaceTempView("destinos_gratuitos")




In [None]:

# Con la query calculamos la el costo total a facturar de la empresa.
df_costo_total = spark.sql(f""" SELECT 
                            SUM( CASE 
                                    WHEN CAST(region AS INT) BETWEEN 1 AND 5 THEN 1.5
                                    WHEN CAST(region AS INT) BETWEEN 6 AND 9 THEN 2
                                    ELSE 0 
                                END ) AS costo_total
                            FROM eventos
                            LEFT JOIN destinos_gratuitos ON eventos.id_destination = destinos_gratuitos.id
                            WHERE destinos_gratuitos.id IS NULL 
                            AND id_destination IS NOT NULL 
                            AND id_source IS NOT NULL
                                        """)


df_costo_total.show()




In [None]:
# Agrupamos por id_source, y sumamos sus costos, ordenamos y nos quedamos con los primero 100
# Dejo fuera de juego aquellos con id null y tambien los que no tienen costo el envio de msj 

df_costo_usuario = spark.sql(f""" SELECT md5(id_source) AS ID_SOURCE,
                            SUM( CASE 
                                    WHEN CAST(region AS INT) BETWEEN 1 AND 5 THEN 1.5
                                    WHEN CAST(region AS INT) BETWEEN 6 AND 9 THEN 2
                                    ELSE 0 
                                END ) AS costo
                            FROM eventos
                            LEFT JOIN destinos_gratuitos ON eventos.id_destination = destinos_gratuitos.id
                            WHERE destinos_gratuitos.id IS NULL 
                            AND id_destination IS NOT NULL 
                            AND id_source IS NOT NULL
                     GROUP BY id_source
                     ORDER BY costo desc 
                     LIMIT 100
                                        """)



df_costo_usuario.show()                  

# Escribimos el parques con compresion gzip
df_costo_usuario.write.mode("overwrite").parquet("/home/jovyan/data/parquet", compression="gzip")


In [1]:
# Agrupo las llamadas por hora
llamadas_por_hora_df = df_eventos.groupBy("hour").count()

# Convertir a Pandas DataFrame para graficar
pd_df = llamadas_por_hora_df.toPandas()

# Ordenar por hora
pd_df.sort_values("hour", inplace=True)

# Graficar
plt.figure(figsize=(10,6))
plt.bar(pd_df["hour"], pd_df["count"], color='blue')
plt.xlabel('Hora del día')
plt.ylabel('Cantidad de llamadas')
plt.title('Histograma de llamadas por hora del día')
plt.savefig('/home/jovyan/data/histograma_llamadas_por_hora.png')

NameError: name 'df_eventos' is not defined