In [None]:
import os

# Configura HADOOP_HOME y PATH en el entorno del cuaderno
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["HADOOP_HOME"], "bin")

# Verifica las variables
print("HADOOP_HOME:", os.getenv("HADOOP_HOME"))
print("PATH:", os.getenv("PATH"))

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
import os


# Inicia la sesión de Spark
spark = SparkSession.builder \
    .appName("IowaLiquorSales") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .config("spark.hadoop.fs.azure.enable.native-hadoop", "false") \
    .config("spark.hadoop.io.native.lib", "false") \
    .getOrCreate()


# Ruta al archivo CSV descargado
csv_path = "./Data/Iowa_Liquor_Sales.csv"

In [None]:
spark.stop()  # Detiene la sesión de Spark activa


In [4]:
# Leer el archivo CSV con opciones adicionales para evitar la desanidación y manejo de comas
df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("multiLine", "true") \
    .csv(csv_path)

# Mostrar las primeras filas para ver cómo quedó el dataframe
df.show(5)
df.printSchema()


+-------------------+----------+------------+--------------------+-------------------+------------+--------+--------------------+-------------+-------+--------+-------------+-------------+--------------------+-----------+--------------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|Invoice/Item Number|      Date|Store Number|          Store Name|            Address|        City|Zip Code|      Store Location|County Number| County|Category|Category Name|Vendor Number|         Vendor Name|Item Number|    Item Description|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|Volume Sold (Gallons)|
+-------------------+----------+------------+--------------------+-------------------+------------+--------+--------------------+-------------+-------+--------+-------------+-------------+--------------------+-----------+--------------------+----+-

In [5]:
from pyspark.sql.functions import regexp_replace

# Limpiar la columna 'Store Location' reemplazando las comas por un espacio (o cualquier otro carácter)
df = df.withColumn("Store Location", regexp_replace(df["Store Location"], ",", " "))

# Verificar que la columna 'Store Location' haya sido limpiada correctamente
df.show(5)

+-------------------+----------+------------+--------------------+-------------------+------------+--------+--------------------+-------------+-------+--------+-------------+-------------+--------------------+-----------+--------------------+----+------------------+-----------------+-------------------+------------+--------------+--------------------+---------------------+
|Invoice/Item Number|      Date|Store Number|          Store Name|            Address|        City|Zip Code|      Store Location|County Number| County|Category|Category Name|Vendor Number|         Vendor Name|Item Number|    Item Description|Pack|Bottle Volume (ml)|State Bottle Cost|State Bottle Retail|Bottles Sold|Sale (Dollars)|Volume Sold (Liters)|Volume Sold (Gallons)|
+-------------------+----------+------------+--------------------+-------------------+------------+--------+--------------------+-------------+-------+--------+-------------+-------------+--------------------+-----------+--------------------+----+-

In [None]:
# Ruta de salida simplificada
output_folder = "./Data/csv"
# os.makedirs(output_folder, exist_ok=True)

# Arreglar la columna de ubicación de la tienda (según lo que mencionaste)
df = df.withColumn("sale_date", to_date(df["Date"], "MM/dd/yyyy"))

# Filtrar los registros con fechas nulas
df = df.filter(df["sale_date"].isNotNull())

# Obtiene las fechas únicas en el dataset
unique_dates = [row["sale_date"] for row in df.select("sale_date").distinct().collect()]

# Itera por cada fecha y guarda los registros correspondientes en un archivo CSV usando Pandas
for date in unique_dates:
    date_str = date.strftime("%Y-%m-%d")  # Convierte la fecha a string con el formato solicitado
    output_file = os.path.join(output_folder, f"proyecto_{date_str}.csv")  # Ruta para el archivo CSV

    # Filtra los registros para la fecha actual
    daily_df = df.filter(df["sale_date"] == date)

    # Convertir el DataFrame de Spark a Pandas
    pandas_df = daily_df.toPandas()

    # Escribe el archivo CSV usando Pandas
    pandas_df.to_csv(output_file, index=False)
    print(f"Archivo generado: {output_file}")

print(f"Proceso completado. Archivos generados en {output_folder}")
