In [8]:
#pip install pyspark findspark
#pip install google.cloud

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType
from datetime import datetime, timedelta
import os

## Environment variables and connection configuration

In [2]:
# Local-machine defaults. setdefault keeps any value already exported in the
# environment, so the notebook also runs on machines with a different layout.
os.environ.setdefault("JAVA_HOME", r"C:\Program Files\Java\jdk-11")
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    # Path to the GCP service-account JSON key
    r"D:\GAMIC\PORTFOLIO\CLOTHING RETAIL - PRS POWERED BY LLM\credentials.json",
)

# PostgreSQL connection settings; each one can be overridden with an
# environment variable of the same name.
PG_HOST = os.environ.get("PG_HOST", "localhost")
PG_PORT = os.environ.get("PG_PORT", "5432")
PG_DATABASE = os.environ.get("PG_DATABASE", "FASHION_STORE")
PG_USER = os.environ.get("PG_USER", "postgres")
# Never commit the database password: export PG_PASSWORD before running.
PG_PASSWORD = os.environ.get("PG_PASSWORD", "")
if not PG_PASSWORD:
    print("WARNING: PG_PASSWORD is not set; the PostgreSQL connection will likely fail.")

# BigQuery destination
BQ_PROJECT_ID = "clothing-retail-prs-llm"
BQ_DATASET = "fashion_retail_dataset"

## Initialize the Spark session with the PostgreSQL and BigQuery connectors

In [3]:
# JVM packages Spark resolves at startup: the PostgreSQL JDBC driver and the
# BigQuery connector (Scala 2.12 build).
SPARK_PACKAGES = [
    "org.postgresql:postgresql:42.5.1",
    "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.28.0",
]

spark = (
    SparkSession.builder
    .appName("PhysicalStoreETL")
    .config("spark.jars.packages", ",".join(SPARK_PACKAGES))
    # GCS staging bucket for the BigQuery connector (used by its indirect write method)
    .config("temporaryGcsBucket", "bucket_fashion_retail")
    .getOrCreate()
)

## Define the tables to process (table name → incremental date column)

In [4]:
# Source tables to replicate, each mapped to the date column used by the
# incremental filter in the ETL cell.
TABLES = dict(
    customers="registration_date",
    transactions="purchase_date",
    customer_segments="update_date",
    inventory_history="date",
)


## Define the incremental extraction start date

In [5]:
# Inclusive lower bound (YYYY-MM-DD) for the incremental extract.
# Defaults to a fixed backfill date; set ETL_LAST_RUN_DATE to override it, e.g.
# with yesterday: (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d").
# Parsing and re-formatting rejects malformed values (ValueError) and normalizes
# the string, which matters because it is interpolated into the SQL query.
last_run_date = datetime.strptime(
    os.environ.get("ETL_LAST_RUN_DATE", "2025-01-01"), "%Y-%m-%d"
).strftime("%Y-%m-%d")

## ETL pipeline for each table

In [6]:
# Extract each table incrementally from PostgreSQL and append it to BigQuery.
# A failure on one table is reported and the remaining tables are still
# processed; once every table has been attempted, the cell raises if any table
# failed, so "Run All" does not go on to print the success message.
jdbc_url = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
failed_tables = []

for table, date_field in TABLES.items():
    print(f"\nProcesando: {table.upper()}")

    # The identifiers come from the TABLES constant and last_run_date is a
    # YYYY-MM-DD string; never feed untrusted input into this f-string.
    query = f"SELECT * FROM {table} WHERE {date_field} >= '{last_run_date}'"
    print("Consulta:", query)

    pg_df = None
    try:
        # Read data from PostgreSQL
        pg_df = (
            spark.read
            .format("jdbc")
            .option("url", jdbc_url)
            .option("query", query)
            .option("user", PG_USER)
            .option("password", PG_PASSWORD)
            .option("driver", "org.postgresql.Driver")
            .load()
        )

        # Table-specific transformation
        if table == "customers":
            pg_df = pg_df.withColumn("data_source", lit("physical_store"))

        # Cache so the emptiness check, the BigQuery write and the reported
        # count share a single JDBC read. Otherwise every Spark action re-runs
        # the query, and the printed count could differ from the rows written.
        pg_df = pg_df.cache()
        row_count = pg_df.count()

        if row_count == 0:
            print(f"Sin datos nuevos en {table}")
            continue

        # Load into BigQuery
        (
            pg_df.write
            .format("bigquery")
            .option("table", f"{BQ_PROJECT_ID}.{BQ_DATASET}.{table}")
            .option("writeMethod", "direct")
            .mode("append")
            .save()
        )

        print(f"Cargados {row_count} registros en BigQuery para la tabla {table}")

    except Exception as e:
        print(f"Error procesando la tabla {table}: {e}")
        failed_tables.append(table)

    finally:
        # Release cached blocks; harmless if the frame was never cached.
        if pg_df is not None:
            pg_df.unpersist()

if failed_tables:
    raise RuntimeError(f"ETL failed for tables: {', '.join(failed_tables)}")



Procesando: CUSTOMERS
Consulta: SELECT * FROM customers WHERE registration_date >= '2025-01-01'
Cargados 5 registros en BigQuery para la tabla customers

Procesando: TRANSACTIONS
Consulta: SELECT * FROM transactions WHERE purchase_date >= '2025-01-01'
Cargados 3 registros en BigQuery para la tabla transactions

Procesando: CUSTOMER_SEGMENTS
Consulta: SELECT * FROM customer_segments WHERE update_date >= '2025-01-01'
Cargados 30 registros en BigQuery para la tabla customer_segments

Procesando: INVENTORY_HISTORY
Consulta: SELECT * FROM inventory_history WHERE date >= '2025-01-01'
Cargados 150 registros en BigQuery para la tabla inventory_history


## Close the Spark session

In [7]:
# Stop the Spark session now that every table has been processed.
spark.stop()
print("\n¡ETL completed successfully!")


¡ETL completede succefully!
