In [1]:
from pyspark.sql import SparkSession

def get_spark_session(app_name="Calcular Violaciones Sanitarias RED"):
    """
    Crea o reutiliza una SparkSession existente.
    
    :param app_name: Nombre de la aplicación Spark.
    :return: Instancia activa de SparkSession.
    """
    return SparkSession.builder.appName(app_name).getOrCreate()

def calculate_red_violations(data_source_path: str, spark: SparkSession = None):
    """
    Procesa datos de inspección y muestra los 10 restaurantes con más violaciones del tipo RED.

    :param data_source_path: Ruta al archivo CSV (por ejemplo, en S3: 's3://bucket/nombre.csv').
    :param spark: Sesión Spark (si no se proporciona, se crea una nueva).
    :return: DataFrame con el resultado de la consulta.
    """
    if spark is None:
        spark = get_spark_session()

    print("Leyendo datos desde:", data_source_path)
    df = spark.read.option("header", "true").csv(data_source_path)

    # Crear vista temporal para consultas SQL
    df.createOrReplaceTempView("restaurant_violations")

    # Ejecutar consulta SQL para encontrar los top 10
    print("Ejecutando consulta SQL para encontrar violaciones RED...")
    result_df = spark.sql("""
        SELECT name, COUNT(*) AS total_red_violations
        FROM restaurant_violations
        WHERE violation_type = 'RED'
        GROUP BY name
        ORDER BY total_red_violations DESC
        LIMIT 10
    """)

    # Mostrar resultados
    print("Resultado:")
    result_df.show(truncate=False)

    return result_df

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1749607942991_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Ruta S3 al archivo CSV
data_source = "s3://bucketuni2025/food_establishment_data.csv"

# Crear o reutilizar sesión Spark
spark = get_spark_session()

# Ejecutar análisis
result_df = calculate_red_violations(data_source, spark)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Leyendo datos desde: s3://bucketuni2025/food_establishment_data.csv
Ejecutando consulta SQL para encontrar violaciones RED...
Resultado:
+----------------------+--------------------+
|name                  |total_red_violations|
+----------------------+--------------------+
|SUBWAY                |322                 |
|T-MOBILE PARK         |315                 |
|WHOLE FOODS MARKET    |299                 |
|PCC COMMUNITY MARKETS |251                 |
|TACO TIME             |240                 |
|MCDONALD'S            |177                 |
|THAI GINGER           |153                 |
|SAFEWAY INC #1508     |143                 |
|TAQUERIA EL RINCONSITO|134                 |
|HIMITSU TERIYAKI      |128                 |
+----------------------+--------------------+