In [1]:
import uuid
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, sum as sum_, max as max_, min as min_, avg, count
import os

In [2]:
# Create (or reuse) the Spark session used to process the data.
def create_spark_session():
    """Return the active SparkSession, creating one if none exists yet.

    The session is named "Data Processing with Spark" and puts the
    PostgreSQL JDBC driver jar on its classpath so DataFrames can later be
    written through the ``jdbc`` data source.
    """
    builder = SparkSession.builder.appName("Data Processing with Spark")
    # NOTE(review): relative jar path -- depends on where the notebook is
    # launched from; confirm before running elsewhere.
    builder = builder.config("spark.jars", "../driver/postgresql-42.7.3.jar")
    return builder.getOrCreate()

In [3]:
# List, in sorted order, the CSV files of a directory whose names do not contain 'validation'.
def get_csv_files(directory):
    """Return the sorted paths of the ``.csv`` files directly inside ``directory``.

    Any file whose name contains ``'validation'`` is skipped, so the
    validation file can be loaded on its own. Subdirectories are not
    searched. Each returned path is ``directory`` joined with the file name.
    """
    selected = []
    for entry in os.listdir(directory):
        if not entry.endswith('.csv'):
            continue
        if 'validation' in entry:
            continue
        selected.append(os.path.join(directory, entry))
    selected.sort()
    return selected

In [4]:
# Load one CSV file, tag it with load metadata and append it to PostgreSQL.
def load_data_and_update_stats(file_name, spark, batch_uuid):
    """Read a CSV file, add load-metadata columns and append it to PostgreSQL.

    Three columns are added to every row before the write: ``source_file``
    (the file's base name), ``batch_uuid`` and ``file_size`` (the file's size
    on disk, in bytes).

    Args:
        file_name: Path of the CSV file to load (read with a header row and
            schema inference).
        spark: Active SparkSession used to read the file.
        batch_uuid: Identifier stamped on every row of this load.

    Returns:
        The tagged DataFrame. It is returned even when the database write
        fails (that failure is only printed), so callers that aggregate the
        returned frames may count rows that were never persisted.

    Raises:
        Exception: Whatever ``os.path.getsize`` or Spark raises when the file
            cannot be read; the error is printed first and then re-raised.
    """
    # Connection settings may be supplied through environment variables so the
    # password does not have to live in the notebook. The fallbacks are the
    # previous hard-coded values.
    # TODO: drop the password fallback once the environment always provides it.
    jdbc_options = {
        "url": os.environ.get("PIPELINE_JDBC_URL", "jdbc:postgresql://localhost:5434/pipeline"),
        "driver": "org.postgresql.Driver",
        "dbtable": os.environ.get("PIPELINE_JDBC_TABLE", "transaction_data_spark"),
        "user": os.environ.get("PIPELINE_DB_USER", "postgres"),
        "password": os.environ.get("PIPELINE_DB_PASSWORD", "developer"),
    }

    try:
        file_size = os.path.getsize(file_name)
        data = (
            spark.read.csv(file_name, header=True, inferSchema=True)
            .withColumn("source_file", lit(os.path.basename(file_name)))
            .withColumn("batch_uuid", lit(batch_uuid))
            .withColumn("file_size", lit(file_size))
        )
    except Exception as e:
        print(f"Failed to load data from {file_name}. Error: {str(e)}")
        # Without a DataFrame there is nothing to return; previously this fell
        # through to `return data` and raised a misleading UnboundLocalError.
        raise

    try:
        data.write.format("jdbc").options(**jdbc_options).mode('append').save()
        print(f"Loaded {file_name} with file size {file_size} bytes and batch UUID {batch_uuid}")
    except Exception as e:
        # Best effort, as before: report the failed write and keep the run going.
        print(f"Failed to load data from {file_name}. Error: {str(e)}")
    return data


In [5]:
# Aggregate price statistics from the data and fetch the result.
def aggregate_stats(data):
    """Compute summary statistics of the ``price`` column of ``data``.

    Returns the single result Row, with fields ``sum_price``, ``avg_price``,
    ``min_price``, ``max_price`` and ``count``. The ``collect()`` call
    triggers a Spark job.
    """
    price = "price"
    aggregations = [
        sum_(price).alias("sum_price"),
        avg(price).alias("avg_price"),
        min_(price).alias("min_price"),
        max_(price).alias("max_price"),
        # Counting a literal counts every row, including rows whose price is null.
        count(lit(1)).alias("count"),
    ]
    rows = data.agg(*aggregations).collect()
    return rows[0]

In [6]:
# Process every file in a directory, then load and check a separate validation file.
def process_files_and_validate(directory, validation_file):
    """Load all CSVs in ``directory``, then ``validation_file``, printing price stats.

    Every file is loaded through ``load_data_and_update_stats`` with one
    shared batch UUID (that helper also appends each file to PostgreSQL).
    Price statistics are computed three times: over the regular files, over
    the validation file alone, and over both combined.

    Args:
        directory: Directory scanned by ``get_csv_files`` (files whose name
            contains 'validation' are skipped there).
        validation_file: Path of the validation CSV, loaded last.

    Returns:
        ``(in_memory_stats, validation_stats, final_db_stats)`` as Spark Rows.
        ``final_db_stats`` is computed from the DataFrames loaded in this run,
        not by querying the database.

    Raises:
        ValueError: If ``directory`` contains no eligible CSV files.
    """
    spark = create_spark_session()
    batch_uuid = str(uuid.uuid4())
    csv_files = get_csv_files(directory)
    if not csv_files:
        # Previously this surfaced later as an AttributeError on None.agg(...).
        raise ValueError(f"No CSV files to process in {directory!r}")

    all_data = None
    for file in csv_files:
        data = load_data_and_update_stats(file, spark, batch_uuid)
        # Combine by column name rather than position so that files whose
        # headers list the same columns in a different order are not misaligned.
        all_data = data if all_data is None else all_data.unionByName(data)

    in_memory_stats = aggregate_stats(all_data)
    print("Current in-memory stats:", in_memory_stats.asDict())

    print("\nProcessing validation file...")
    validation_data = load_data_and_update_stats(validation_file, spark, batch_uuid)
    validation_stats = aggregate_stats(validation_data)
    print("Validation file stats:", validation_stats.asDict())

    final_data = all_data.unionByName(validation_data)
    final_db_stats = aggregate_stats(final_data)
    print("Final database stats after loading validation.csv:", final_db_stats.asDict())

    return in_memory_stats, validation_stats, final_db_stats

In [7]:
# Input directory and the validation file stored inside it.
directory = '../input'
# Derived from `directory` (same os.path.join style as get_csv_files) so the
# two paths cannot drift apart when the input location changes.
validation_file = os.path.join(directory, 'validation.csv')


In [8]:
# Run the pipeline. NOTE: the first returned value holds the stats of the
# non-validation files (called `in_memory_stats` inside the function).
final_stats, validation_stats, final_db_stats = process_files_and_validate(
    directory=directory,
    validation_file=validation_file,
)

24/05/12 21:00:57 WARN Utils: Your hostname, mugen resolves to a loopback address: 127.0.1.1; using 192.168.68.113 instead (on interface wlp2s0)
24/05/12 21:00:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/12 21:00:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Loaded ../input/2012-1.csv with file size 374 bytes and batch UUID 36b7c0bd-0be3-4a86-aaf8-96df1d004bda
Loaded ../input/2012-2.csv with file size 482 bytes and batch UUID 36b7c0bd-0be3-4a86-aaf8-96df1d004bda
Loaded ../input/2012-3.csv with file size 513 bytes and batch UUID 36b7c0bd-0be3-4a86-aaf8-96df1d004bda
Loaded ../input/2012-4.csv with file size 495 bytes and batch UUID 36b7c0bd-0be3-4a86-aaf8-96df1d004bda
Loaded ../input/2012-5.csv with file size 513 bytes and batch UUID 36b7c0bd-0be3-4a86-aaf8-96df1d004bda
Current in-memory stats: {'sum_price': 8046, 'avg_price': 57.884892086330936, 'min_price': 10, 'max_price': 100, 'count': 143}

Processing validation file...
Loaded ../input/validation.csv with file size 145 bytes and batch UUID 36b7c0bd-0be3-4a86-aaf8-96df1d004bda
Validation file stats: {'sum_price': 334, 'avg_price': 41.75, 'min_price': 11, 'max_price': 92, 'count': 8}
Final database stats after loading validation.csv: {'sum_price': 8380, 'avg_price': 57.006802721088434, 'm