In [None]:
# ================================================================================
# PASO 1: IMPORTACIÓN DE LIBRERÍAS
# ================================================================================
import os
import pandas as pd
import findspark
from datetime import datetime
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from sqlalchemy import create_engine
from dotenv import load_dotenv
import boto3
import logging

findspark.init()
spark = SparkSession.builder \
    .appName("Capa DIAMOND") \
    .getOrCreate()

load_dotenv("/home/jovyan/.env")

True

In [None]:
# ================================================================================
# PASO 2: CONFIGURACIÓN DE VARIABLES
# ================================================================================


BUCKET_SILVER     = os.getenv("MINIO_BUCKET_SILVER")
BUCKET_DIAMOND    = os.getenv("MINIO_BUCKET_DIAMOND")
MINIO_ENDPOINT    = os.getenv("MINIO_ENDPOINT")
MINIO_ACCESS_KEY  = os.getenv("MINIO_ROOT_USER")
MINIO_SECRET_KEY  = os.getenv("MINIO_ROOT_PASSWORD")
DB_URL            = os.getenv("DB_URL")

today = datetime.now().strftime("%Y%m%d")
dominio = "pacientes"
ruta_parquet = f"s3a://{BUCKET_SILVER}/LOCAL_{dominio.upper()}/pacientes_refinados_{today}*.parquet"


In [None]:
# ================================================================================
# PASO 3: DESCARGA DESDE MinIO CON BOTO3 Y LECTURA CON SPARK
# ================================================================================


# Configurar logging 
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

#Cliente Boto3 
s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY
)

# Buscar el archivo parquet más reciente del dia
today = datetime.now().strftime("%Y%m%d")
prefix = f"LOCAL_{dominio.upper()}/pacientes_refinados_{today}"

response = s3.list_objects_v2(Bucket=BUCKET_SILVER, Prefix=prefix)
archivos = sorted(
    [obj["Key"] for obj in response.get("Contents", []) if obj["Key"].endswith(".parquet")],
    reverse=True
)

if not archivos:
    raise FileNotFoundError(f"No se encontró ningún archivo Parquet con prefijo: {prefix}")

key_silver = archivos[0]
logging.info(f"Archivo Parquet encontrado: s3://{BUCKET_SILVER}/{key_silver}")

#Descargar el archivo a /tmp
ruta_local_parquet = f"/tmp/{os.path.basename(key_silver)}"
with open(ruta_local_parquet, "wb") as f:
    s3.download_fileobj(BUCKET_SILVER, key_silver, f)

#Leer con Spark desde disco local 
df_diamond = spark.read.parquet(f"file://{ruta_local_parquet}")


2025-05-09 18:33:21,896 - INFO - 📄 Archivo Parquet encontrado: s3://dev-silver/LOCAL_PACIENTES/pacientes_refinados_202505091724.parquet


In [29]:
df_diamond.printSchema()

root
 |-- id: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: string (nullable = true)
 |-- obra_social: string (nullable = true)
 |-- fecha_turno: date (nullable = true)



In [23]:
# Leer schema del archivo parquet ya cargado
schema = df_diamond.schema

In [28]:
print(schema)

StructType([StructField('id', StringType(), True), StructField('nombre', StringType(), True), StructField('edad', StringType(), True), StructField('obra_social', StringType(), True), StructField('fecha_turno', DateType(), True)])


In [24]:
# Mapeo directo de tipos Spark -> PostgreSQL
type_mapping = {
    "StringType": "TEXT",
    "IntegerType": "INTEGER",
    "LongType": "BIGINT",
    "ShortType": "SMALLINT",
    "DoubleType": "DOUBLE PRECISION",
    "FloatType": "REAL",
    "BooleanType": "BOOLEAN",
    "DateType": "DATE",
    "TimestampType": "TIMESTAMP",
    "DecimalType": "NUMERIC"
}

In [25]:
# Construcción comprimida de columnas
columnas_sql = [
    f"{field.name} {type_mapping.get(type(field.dataType).__name__, 'TEXT')}"
    for field in schema.fields
]

In [None]:
archivo = os.path.basename(key_silver)  
nombre_tabla = archivo.split("_")[0].lower() + "_diamond"


In [33]:
# Cargar variables del entorno
load_dotenv()

jdbc_url = os.getenv("DB_URL")
jdbc_properties = {
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD"),
    "driver": "org.postgresql.Driver"
}

# Insertar a PostgreSQL usando Spark
df_diamond.write \
    .mode("overwrite") \
    .jdbc(url=jdbc_url, table=nombre_tabla, properties=jdbc_properties)

print(f"✅ Datos insertados correctamente en la tabla '{nombre_tabla}'.")


Py4JJavaError: An error occurred while calling o68.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:766)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
