In [0]:
# 1. Instala la librería de Kaggle (solo la primera vez)
%pip install kaggle

In [0]:
%pip install openpyxl

In [0]:
%restart_python

In [0]:
import os
from datetime import datetime, timezone
from pathlib import Path
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
dbutils.widgets.removeAll()

In [0]:
# Definir widgets para parametrizar notebook
dbutils.widgets.text("dataset", "jillanisofttech/flight-price-prediction-dataset", "Kaggle Dataset (owner/name)")
dbutils.widgets.text("mount_point", "/mnt/storeintelligence", "Mount Point Path")
dbutils.widgets.text("container_csv", "raw", "Container CSV Files")
dbutils.widgets.text("table_name", "flight_booking", "Table Name")

In [0]:
dataset         = dbutils.widgets.get("dataset")
mount_point     = dbutils.widgets.get("mount_point")
container_csv   = dbutils.widgets.get("container_csv")
table           = dbutils.widgets.get("table_name")

In [0]:
# Static configuration that is not exposed as a widget
secret_scope    = "keyvault-secrets"   # secret scope holding the Kaggle credentials
fmt             = "csv"                # file extension loaded into Spark later
catalog         = "store_intelligence"
database        = "default"
# Local (FUSE) folder that receives the converted CSVs. It must be the same
# folder Spark reads from later ({mount_point}/{container_csv}), so derive it
# from the container widget instead of hard-coding "raw".
target_folder   = f"/dbfs{mount_point}/{container_csv}"
tmp_folder      = f"/dbfs{mount_point}/tmp"   # local path for the Kaggle download
dbfs_tmp        = f"dbfs:{mount_point}/tmp"   # same folder, as a dbutils.fs URI

In [0]:
# Recuperar las credenciales desde Azure Key Vault
username = dbutils.secrets.get(scope=secret_scope, key="kaggle-username")
key      = dbutils.secrets.get(scope=secret_scope, key="kaggle-key")

In [0]:
# Configurar las variables de entorno para que Kaggle API las reconozca
os.environ["KAGGLE_USERNAME"] = username
os.environ["KAGGLE_KEY"]      = key

In [0]:
# Importar y autenticar la API de Kaggle
from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()

In [0]:
dbutils.fs.rm(dbfs_tmp, recurse=True)

In [0]:
dbutils.fs.mkdirs(dbfs_tmp)

In [0]:
# Descargar dataset
api.dataset_download_files(
    dataset=dataset,
    path=(tmp_folder),
    unzip=True
)

In [0]:
# Convert every downloaded Excel workbook into a CSV file in target_folder.
# pandas.read_excel reads only the first sheet of each workbook by default.
# NOTE(review): files that are already .csv are NOT copied and are removed
# together with the temp folder afterwards — confirm the dataset ships Excel only.
# NOTE(review): if Excel stores Date_of_Journey as real dates, pandas writes
# them as datetimes rather than day/month/year text — verify against the data.

# to_csv fails when the destination folder does not exist yet
os.makedirs(target_folder, exist_ok=True)

# File suffix -> pandas engine able to read it
excel_engines = {".xlsx": "openpyxl", ".xls": "xlrd"}

for file_path in sorted(Path(tmp_folder).rglob("*")):
    engine = excel_engines.get(file_path.suffix.lower())
    # Skip non-Excel files and directories that merely carry an Excel suffix
    if engine is None or not file_path.is_file():
        continue

    excel_df = pd.read_excel(str(file_path), engine=engine)

    # Workbooks with the same stem in different sub-folders overwrite each other
    csv_file = os.path.join(target_folder, f"{file_path.stem}.csv")
    excel_df.to_csv(csv_file, header=True, index=False, encoding="utf-8", mode="w")


In [0]:
dbutils.fs.rm(dbfs_tmp, recurse=True)

In [0]:
csv_path = f"{mount_point}/{container_csv}/*.{fmt}"

In [0]:
# Input schema for flight_booking: every column is a nullable string except
# Price. Date_of_Journey is converted to DATE further below.
# NOTE: with an explicit schema, Spark's CSV reader maps columns by position
# (enforceSchema default), so this order must match the file's column order.
_flight_string_columns = [
    "Airline", "Date_of_Journey", "Source", "Destination", "Route",
    "Dep_Time", "Arrival_Time", "Duration", "Total_Stops", "Additional_Info",
]
flight_schema = StructType(
    [StructField(_column, StringType(), nullable=True) for _column in _flight_string_columns]
    + [StructField("Price", FloatType(), nullable=True)]
)

In [0]:
df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(flight_schema) \
    .load(csv_path)

In [0]:
# Eliminar duplicados
df_distinct = df.dropDuplicates()

In [0]:
# Convertir a date la columna Date_of_Journey
df_distinct = df_distinct.withColumn("Date_of_Journey", concat_ws("-",split(col("Date_of_Journey"),"/").getItem(2),split(col("Date_of_Journey"),"/").getItem(1),split(col("Date_of_Journey"),"/").getItem(0)).cast("date"))

In [0]:
# Look up the base URL of the external location and derive the "tables"
# folder under it, where the target table's data is stored.
# The location name contains hyphens, so it is back-quoted as a SQL identifier.
external_info = spark.sql(
    "DESCRIBE EXTERNAL LOCATION `exlt-adlsstoreintelligence-datalake`"
).collect()
location_url = external_info[0]["url"]
# Join with exactly one "/" whether or not the URL already ends with a slash
location_url = f"{location_url.rstrip('/')}/tables"

In [0]:
# Create the target Delta table if it does not exist yet, stored at an explicit
# path under the external location. The catalog, schema and table name come
# from the notebook configuration/widgets rather than being hard-coded, so the
# DDL targets the same table that the load steps write to.
sql_query = f"""
CREATE TABLE IF NOT EXISTS {catalog}.{database}.{table} (
    Airline STRING,
    Date_of_Journey DATE,
    Source STRING,
    Destination STRING,
    Route STRING,
    Dep_Time STRING,
    Arrival_Time STRING,
    Duration STRING,
    Total_Stops STRING,
    Additional_Info STRING,
    Price FLOAT
)
USING DELTA
LOCATION '{location_url}/{table}'
"""

spark.sql(sql_query)

In [0]:
# Fully qualified three-part name (catalog.schema.table) used by the load steps
table_name = ".".join((catalog, database, table))
print(table_name)

In [0]:
# Collect the distinct journey dates present in the incoming batch; rows with
# these dates are deleted from the target table before the batch is appended.
# DataFrame.collect() is used instead of .rdd, which is unsupported on Spark
# Connect and Databricks shared-access/serverless compute.
new_dates = [
    row["Date_of_Journey"]
    for row in df_distinct.select("Date_of_Journey").distinct().collect()
    # A NULL date would otherwise render as the SQL literal 'None'
    if row["Date_of_Journey"] is not None
]
# NOTE(review): rows whose Date_of_Journey is NULL never match the DELETE, so
# they are re-appended on every run — confirm whether that is intended.
dates_condition = ','.join(f"'{journey_date}'" for journey_date in new_dates)

In [0]:
# Delete the rows for the dates being reloaded so the append that follows
# replaces them instead of duplicating them. Skipped when the batch yields no
# dates, because "IN ()" with an empty list is not valid SQL.
if dates_condition:
    sql_query = f"""
DELETE FROM {table_name}
WHERE Date_of_Journey IN ({dates_condition})
"""
    display(spark.sql(sql_query))
else:
    print("No journey dates in the new batch; nothing to delete.")

In [0]:
display(spark.sql(f"SELECT COUNT(*) FROM {table_name}"))

In [0]:
# Insertar nuevos datos
df_distinct.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable(table_name)

In [0]:
display(spark.sql(f"SELECT COUNT(*) FROM {table_name}"))