In [7]:
# Tablas para completar con PSD y Atterberg

# Project, Program y Location deben ser cargados casi completamente manualmente para PSD

# Location -> Podemos tener el TP-1 pero eso no es global y deberia ser usado
# en conjunto con programa y/o proyecto para ser utilizados como ID

# Sample -> Como ID podemos tener un UUID o utilizar el Sample ID que viene en el pdf
# Otra vez, el valor del PDF no el global por lo que deberiamos utilizar un ID compuesto
# Por otro lado el ensayo anterior utiliza UUIDs para eso, lo que me parece mejor
# From y To salen de los datos

# LabTest -> LabTestID (SRK-123-1234), el resto de los datos no estan en el informe


StatementMeta(, c7eb8af5-3089-4721-b463-0e678b71a636, 8, Finished, Available, Finished)

In [8]:
##
# Construye las tablas si es que no existen - VERSIÓN CORREGIDA
##
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# PSD
psd_schema = StructType([
    StructField("LabTestID", StringType(), nullable=False),
    StructField("PSDType", StringType(), nullable=True),
    StructField("SieveSize_mm", StringType(), nullable=False),
    StructField("PercentPassing", DoubleType(), nullable=True),  # CAMBIO: DoubleType y nullable=True
    StructField("createdAt", TimestampType(), nullable=False)
])

# Atterberg
atterberg_schema = StructType([
    StructField("LabTestID", StringType(), nullable=False),
    StructField("LiquidLimit", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("PlasticLimit", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("PlasticityIndex", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("ContractionPerc", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("Classification", StringType(), nullable=True),
    StructField("createdAt", TimestampType(), nullable=False)
])

# GrainSize
grain_size_schema = StructType([
    StructField("LabTestID", StringType(), nullable=False),
    StructField("CobblePercent", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("GravelPercent", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("SandPercent", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("FinePercent", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("SiltPercent", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("ClayPercent", DoubleType(), nullable=True),  # CAMBIO: DoubleType
    StructField("createdAt", TimestampType(), nullable=False)
])

# Crea las tablas si es que no existen ya
spark.createDataFrame([], psd_schema).write.format("delta").mode("ignore").saveAsTable("PSD")
spark.createDataFrame([], atterberg_schema).write.format("delta").mode("ignore").saveAsTable("Atterberg")
spark.createDataFrame([], grain_size_schema).write.format("delta").mode("ignore").saveAsTable("GrainSize")

StatementMeta(, c7eb8af5-3089-4721-b463-0e678b71a636, 9, Finished, Available, Finished)

In [9]:
##
# Leemos los datos que se van a procesar - VERSIÓN CORREGIDA
##
import pandas as pd
import numpy as np
from uuid import uuid4
from datetime import datetime

# Se leen desde la tabla de bronce todos los datos que corresponden a esta fuente de datos
PSD_SOURCE_NAME = "SRK-169-Summary Sheet.pdf"

psd_df = spark.read.table("bronze_shortcut.data_PSD").where(f"Data_filename = '{PSD_SOURCE_NAME}'").toPandas()
atter_df = spark.read.table("bronze_shortcut.data_atterberg").where(f"Data_filename = '{PSD_SOURCE_NAME}'").toPandas()

# Función para limpiar valores numéricos
def clean_numeric_value(value):
    """Convierte valores a numérico manejando casos especiales"""
    if pd.isna(value) or value in ['-', 'NP', '']:
        return np.nan
    try:
        return float(value)
    except (ValueError, TypeError):
        return np.nan

def build_psd_records(full_psd, row_i):
    """Toma una de las filas de la tabla de bronce y la convierte 
    en un conjunto de valores de la tabla de plata"""
    # Solo queremos los valores
    sieve_sizes = list(set(full_psd.columns) - set(['Data_filename', 'Depth_m', 'GM', 'Lab_No', 'Sample']))
    df = full_psd.loc[:, sieve_sizes].iloc[row_i]

    # Transponemos y movemos el indice a una columna
    df = df.T.reset_index()

    # Renombramos las columnas para que matcheen con la tabla de PSD
    df = df.rename(columns={"index": "SieveSize_mm", row_i: "PercentPassing"})
    
    # CAMBIO: Limpieza de valores numéricos
    df["PercentPassing"] = df["PercentPassing"].apply(clean_numeric_value)

    # Agregamos los valores que faltan
    df["LabTestID"] = full_psd.iloc[row_i]["Lab_No"]
    df["PSDType"] = ""
    df["createdAt"] = datetime.now()

    return df

def build_PSD_table(df):
    """Convierte todos los datos de PSD en registros de la tabla de plata"""
    all_psd = [
        build_psd_records(df, i)
        for i in range(len(df))
    ]
    
    return pd.concat(all_psd, ignore_index=True)

def build_atterberg_table(df):
    """Convierte todos los datos de Atterberg en registros de la tabla de plata"""
    # Solo las columnas que nos interesan
    att = df.loc[:, ["Lab_No", "LL_%", "PI_%", "LS_%"]].copy()

    # CAMBIO: Limpieza de valores numéricos
    for col in ['LL_%', 'PI_%', 'LS_%']:
        att[col] = att[col].apply(clean_numeric_value)

    # Son renombradas
    att = att.rename(columns={
        "Lab_No": "LabTestID", 
        "LL_%": "LiquidLimit", 
        "PI_%": "PlasticLimit", 
        "LS_%": "ContractionPerc"
    })

    # Agregamos las que faltan
    att["PlasticityIndex"] = 0
    att["Classification"] = ""
    att["createdAt"] = datetime.now()
    
    return att

def _calc_depths(depths):
    """Calcula las profundidades "From", "Middle" y "To", las devuelve en tuplas"""
    # Si solo tengo una profundidad; son iguales
    if "-" not in depths:
        try:
            depth = float(depths.strip())
            return depth, depth, depth
        except:
            return np.nan, np.nan, np.nan
    
    # Tengo dos profundidades, calculo la media
    try:
        profundidades = [float(d.strip()) for d in depths.split("-")]
        prof_media = sum(profundidades)/2
        return profundidades[0], prof_media, profundidades[1]
    except:
        return np.nan, np.nan, np.nan

def build_tables(df):
    """Construye las tablas "LabTest" y "Sample" para la capa de plata"""
    # Solo nos interesan ciertas columnas
    df = df.loc[:, ["Lab_No", "Sample", "Depth_m"]].rename(columns={"Lab_No": "LabTestID", "Sample": "Comment"}).copy()

    # Calculamos los valores que nos faltan
    df["SampleID"] = df.apply(lambda r: str(uuid4()), axis=1)
    df["uuid"] = df.apply(lambda r: str(uuid4()), axis=1)
    
    # CAMBIO: Manejo de errores en profundidades
    depths = df["Depth_m"].apply(_calc_depths)
    df["DepthFrom_m"] = depths.apply(lambda x: x[0])
    df["MiddleDepth_m"] = depths.apply(lambda x: x[1])
    df["DepthTo_m"] = depths.apply(lambda x: x[2])

    # Some empty but mandatory columns
    df["TestType"] = ""
    df["TestDate"] = None
    df["ReceivedDate"] = None
    df["LocationID"] = None
    df["SampleType"] = None
    df["MaterialType"] = None
    df["State"] = None
    df["Laboratory"] = None
    df["createdAt"] = datetime.now()

    labtest = df.loc[:, [
        "uuid", "LabTestID", "SampleID", "TestType", "Comment", 
        "TestDate", "ReceivedDate", "createdAt"
    ]]
    sample =  df.loc[:, [
        "SampleID", "DepthFrom_m", "MiddleDepth_m", "DepthTo_m", 
        "Comment", "LocationID", "SampleType", "MaterialType", 
        "State", "Laboratory", "createdAt"
    ]]

    return labtest, sample

PSD_table_data = build_PSD_table(psd_df)
Atterberg_table_data = build_atterberg_table(atter_df)

def verificar_esquema(df, tabla):
    df_columns = set(df.columns)
    tabla_columns = set(spark.table(tabla).columns)
    
    print(f"\nVerificación para {tabla}:")
    print(f"Columnas en DF faltantes en tabla: {df_columns - tabla_columns}")
    print(f"Columnas en tabla faltantes en DF: {tabla_columns - df_columns}")
    
    return df_columns == tabla_columns

verificar_esquema(Atterberg_table_data, "Atterberg")
verificar_esquema(PSD_table_data, "PSD")

labtest_table, sample_table = build_tables(psd_df)

StatementMeta(, c7eb8af5-3089-4721-b463-0e678b71a636, 10, Finished, Available, Finished)


Verificación para Atterberg:
Columnas en DF faltantes en tabla: {'ContractionPerc'}
Columnas en tabla faltantes en DF: {'uuid'}

Verificación para PSD:
Columnas en DF faltantes en tabla: set()
Columnas en tabla faltantes en DF: {'uuid'}


In [10]:
##
# Guardamos los datos en las tablas de plata
##
from delta.tables import DeltaTable
from pyspark.sql.functions import expr
from uuid import uuid4

def generate_uuid_with_dashes():
    """Función para generar UUIDs con guiones"""
    return str(uuid4())

# Registramos la UDF
spark.udf.register("generate_uuid_with_dashes", generate_uuid_with_dashes)

def guardar_datos(datos, nombre_tabla):
    """Guarda los datos en la tabla dada."""
    nuevos_datos = spark.createDataFrame(datos)
    
    if "uuid" not in nuevos_datos.columns:
        nuevos_datos = nuevos_datos.withColumn("uuid", expr("generate_uuid_with_dashes()"))
    
    columnas = nuevos_datos.columns

    # Condición de merge dinámica
    merge_condition = ("old.LabTestID = new.LabTestID" if nombre_tabla in ["PSD", "Atterberg", "LabTest"] 
                      else "old.SampleID = new.SampleID")

    DeltaTable.forName(spark, nombre_tabla).alias("old").merge(
        nuevos_datos.alias("new"),
        merge_condition
    ).whenNotMatchedInsertAll().execute()

# Procesamiento
guardar_datos(PSD_table_data, "PSD")
guardar_datos(Atterberg_table_data, "Atterberg")
guardar_datos(labtest_table, "LabTest")
guardar_datos(sample_table, "Sample")

StatementMeta(, c7eb8af5-3089-4721-b463-0e678b71a636, 11, Finished, Available, Finished)

In [14]:
%%sql
select * from psd limit 5

StatementMeta(, c7eb8af5-3089-4721-b463-0e678b71a636, 15, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 6 fields>