In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from datetime import datetime, timedelta
from datetime import datetime
import requests
import time
from pyspark.sql import SparkSession, types as T

In [None]:
def get_public_holidays(start_year, end_year, country_code):
    url = "https://date.nager.at/api/v3/publicholidays/"
    holidays_data = []

    for year in range(start_year, end_year + 1):
        api_url = f"{url}{year}/{country_code}"
        
        try:
            response = requests.get(api_url)
            if response.status_code == 200:
                data = response.json()
                # Convertir la fecha de string a datetime.date
                for holiday in data:
                    holiday_date = datetime.strptime(holiday["date"], "%Y-%m-%d").date()  # Conversión de fecha
                    holidays_data.append((holiday_date, holiday["localName"]))
            else:
                print(f"Failed to retrieve data for {year}, status code: {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
        
        time.sleep(1)  # Para evitar sobrecargar el servidor con solicitudes

    # Definir el esquema para el DataFrame de Spark
    schema = T.StructType([
        T.StructField("fecha", T.DateType(), True),
        T.StructField("nombre_del_festivo", T.StringType(), True)
    ])

    # Crear el DataFrame de Spark con los datos de festivos
    holidays_df = spark.createDataFrame(holidays_data, schema=schema)
    
    return holidays_df


In [None]:
def generar_calendario(anio_inicio, anio_fin):
    # Crear un rango de fechas
    fecha_inicio = datetime(anio_inicio, 1, 1)
    fecha_fin = datetime(anio_fin, 12, 31)
    num_dias = (fecha_fin - fecha_inicio).days + 1
    fechas = [(fecha_inicio + timedelta(days=x)) for x in range(num_dias)]
    
    # Crear un DataFrame de Spark con la lista de fechas
    calendario_df = spark.createDataFrame(fechas, DateType()).toDF("fecha")

    # Agregar columnas de año, mes, semana y día de la semana
    calendario_df = (calendario_df
                     .withColumn("año", F.year("fecha"))
                     .withColumn("mes", F.month("fecha"))
                     .withColumn("semana", F.weekofyear("fecha"))
                     .withColumn("dia_de_la_semana", F.dayofweek("fecha")))

    # Ajustar para que el lunes sea 1 y domingo 7
    calendario_df = calendario_df.withColumn("dia_de_la_semana", 
                                             F.when(F.col("dia_de_la_semana") == 1, 7)
                                             .otherwise(F.col("dia_de_la_semana") - 1))
    
    # Ordenar el DataFrame por fecha de menor a mayor
    calendario_df = calendario_df.orderBy("fecha")

    return calendario_df


In [None]:
# Ejecución de las funciones
public_holidays_data = get_public_holidays(start_year, end_year, country_code)
calendario = generar_calendario(start_year, end_year)

# Unir calendario y festivos en Spark
calendario = calendario.join(public_holidays_data, on="fecha", how="left")

# Agregar columna de festivo ('Sí' o 'No')
calendario = calendario.withColumn("festivo", F.when(F.col("nombre_del_festivo").isNotNull(), "Sí").otherwise("No"))

# Agregar columna de día hábil ('Sí' o 'No')
calendario = calendario.withColumn(
    "dia_habil",
    F.when((F.col("dia_de_la_semana") <= 5) & (F.col("festivo") == "No"), "Sí").otherwise("No")
)
