In [0]:
%pip install --upgrade snowflake-connector-python

path,name,size,modificationTime
s3://mibucketdecargasnowflakedani/credentials/snowflake.json,snowflake.json,45,1746167030000


In [0]:
 %restart_python

In [0]:
import json

# The Snowflake credentials live in a small JSON document on S3. Fetch the
# head of the file (capped at 4096 bytes) and parse it.
json_str = dbutils.fs.head(
    "s3://mibucketdecargasnowflakedani/credentials/snowflake.json",
    4096,
)
creds = json.loads(json_str)

# Options passed to the Spark "snowflake" data source.
sfOptions = dict(
    sfURL="lnwnuwk-uf09760.snowflakecomputing.com",
    sfUser=creds["user"],
    sfPassword=creds["password"],
    sfDatabase="KAGGLE_DB",
    sfSchema="BRONZE",
    sfWarehouse="NEWS_WH",
)

# Connectivity smoke test: run a trivial query through the connector and show
# the result. Any failure is reported on stdout instead of aborting the cell.
try:
    test_df = (
        spark.read.format("snowflake")
        .options(**sfOptions)
        .option("query", "SELECT 1 AS test_connection")
        .load()
    )
    test_df.show()
    print("Conectado")
except Exception as err:
    print("Error:", err, sep="\n")

+---------------+
|TEST_CONNECTION|
+---------------+
|              1|
+---------------+

Conectado


In [0]:
import re

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

volume_path = "/Volumes/workspace/bronze/volumecsv"

# Collect every CSV file in the volume (the extension check is case-insensitive).
files = [f.path for f in dbutils.fs.ls(volume_path) if f.name.lower().endswith(".csv")]

# Load each CSV into its own Delta table in workspace.bronze.
for path in files:
    # File name without its directory and without the trailing extension.
    fname = path.split("/")[-1]
    stem = fname.rsplit(".", 1)[0]

    # The stem is interpolated into a three-part table identifier below, so any
    # character outside [0-9A-Za-z_] (dots, spaces, hyphens, ...) would yield a
    # mis-parsed or invalid name. Collapse each such run into one underscore;
    # stems that are already valid identifiers are left untouched.
    # NOTE(review): two files whose stems sanitize to the same name would
    # overwrite each other's table - confirm the volume cannot contain such pairs.
    table_name = re.sub(r"[^0-9A-Za-z_]+", "_", stem)

    # Read the CSV, using the first row as header and inferring column types.
    df = (
        spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv(path)
    )

    # Replace the table (data and schema) with the contents of this file.
    (
        df.write
          .format("delta")
          .mode("overwrite")
          .option("overwriteSchema", "true")
          .saveAsTable(f"workspace.bronze.{table_name}")
    )
    print(f"workspace.bronze.{table_name} creada")


workspace.bronze.assessments creada
workspace.bronze.courses creada
workspace.bronze.studentAssessment creada
workspace.bronze.studentInfo creada
workspace.bronze.studentRegistration creada
workspace.bronze.studentVle creada
workspace.bronze.vle creada


In [0]:
import json
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas


def _snowflake_type(dtype):
    """Return the Snowflake column type used in the generated DDL for a pandas dtype.

    Integer dtypes map to NUMBER, float dtypes to FLOAT, boolean dtypes to
    BOOLEAN, datetime64 dtypes to TIMESTAMP_NTZ and everything else to TEXT.
    """
    if pd.api.types.is_integer_dtype(dtype):
        return "NUMBER"
    if pd.api.types.is_float_dtype(dtype):
        return "FLOAT"
    if pd.api.types.is_bool_dtype(dtype):
        return "BOOLEAN"
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return "TIMESTAMP_NTZ"
    return "TEXT"


# Load the Snowflake credentials from the bucket.
json_str = dbutils.fs.head("s3://mibucketdecargasnowflakedani/credentials/snowflake.json", 4096)
creds    = json.loads(json_str)

# Open the Snowflake connection.
cx = snowflake.connector.connect(
    user=creds["user"],
    password=creds["password"],
    account="lnwnuwk-uf09760",
    warehouse="NEWS_WH",
    database="KAGGLE_DB",
    schema="BRONZE"
)
try:
    cur = cx.cursor()
    try:
        # List the tables of workspace.bronze.
        tables = spark.sql("SHOW TABLES IN `workspace`.`bronze`").collect()

        for row in tables:
            # SHOW TABLES also reports session temporary views; they do not
            # live in workspace.bronze, so reading them by that name would fail.
            if row.isTemporary:
                continue

            table_name    = row.tableName
            snowflake_tbl = table_name.upper()

            # Read the table with Spark and materialise it as a pandas frame.
            # The whole table is collected on the driver.
            sdf = spark.table(f"workspace.bronze.{table_name}")
            pdf = sdf.toPandas()

            # Build and run CREATE TABLE IF NOT EXISTS with upper-cased,
            # quoted column names and types derived from the pandas dtypes.
            col_defs = [
                f'"{col.upper()}" {_snowflake_type(dtype)}'
                for col, dtype in pdf.dtypes.items()
            ]
            ddl = f"""
                CREATE TABLE IF NOT EXISTS KAGGLE_DB.BRONZE.{snowflake_tbl} (
                    {', '.join(col_defs)}
                )
            """
            cur.execute(ddl)
            print(f"BRONZE.{snowflake_tbl} existe")

            # Bulk-load the frame with write_pandas.
            # NOTE(review): the table is only created when missing and
            # write_pandas is called with its default arguments, so re-running
            # this cell presumably appends duplicate rows - confirm whether an
            # overwrite/truncate is wanted.
            success, nchunks, nrows, _ = write_pandas(
                cx,
                pdf,
                table_name=snowflake_tbl,
                schema="BRONZE",
                quote_identifiers=False
            )
            if success:
                print(f"{table_name} -> BRONZE.{snowflake_tbl}: {nrows} filas cargadas")
            else:
                print(f"Error cargando {table_name}")
    finally:
        # Release the cursor even when a table fails to load.
        cur.close()
finally:
    # Always close the session so a failed run does not leak the connection.
    cx.close()


BRONZE.ASSESSMENTS existe
assessments → BRONZE.ASSESSMENTS: 206 filas cargadas
BRONZE.COURSES existe
courses → BRONZE.COURSES: 22 filas cargadas
BRONZE.STUDENT_ASSESSMENT existe
student_assessment → BRONZE.STUDENT_ASSESSMENT: 173912 filas cargadas
BRONZE.STUDENT_INFO existe
student_info → BRONZE.STUDENT_INFO: 32593 filas cargadas
BRONZE.STUDENT_REGISTRATION existe
student_registration → BRONZE.STUDENT_REGISTRATION: 32593 filas cargadas
BRONZE.STUDENT_VLE existe
student_vle → BRONZE.STUDENT_VLE: 10655280 filas cargadas
BRONZE.VLE existe
vle → BRONZE.VLE: 6364 filas cargadas
