# BUENAS PRACTICAS

Durante mi experiencia trabajando con Apache Spark en entornos como Databricks, fui recopilando aprendizajes y buenas prácticas a partir del feedback de compañeros, líderes y también de mi propio trabajo diario.

Con el tiempo, esto me permitió desarrollar mi propio notebook de buenas prácticas, que hoy utilizo como base para optimizar y estandarizar procesos.

Gracias a estas mejoras, logré agilizar ejecuciones que antes tardaban más de una hora, reduciéndolas a solo 20-30 minutos, aportando eficiencia y valor al equipo.

## FORMATO DE LECTURA

In [0]:
try:
    registros_c = spark.sql(f"select * from {catalogo}.{schema_name}.{table_name} limit 1;")
    print("Carga Filtrada")
except:
    registros_c = False
    print("Carga Historica")

### Forma Correcta

In [0]:
pais = dbutils.widgets.get("pais")
paises = [p.strip() for p in pais.split(",")]

#filtramos el pais y las columnas
index = index.select("pais", "fecha", "columna3", "columna4") \
      .filter(
          F.col("pais").isin(paises) &
          (~F.col("columna3").rlike("(?i)P(R)?UEBA(S)?"))
      )

try:
    if registros_c.count() != 0:
        # 1. Obtener las últimos 3 fechas por país
        w = Window.partitionBy("pais").orderBy(F.desc("fecha"))
        top3 = index.select("pais", "fecha").distinct() \
            .withColumn("rank", F.row_number().over(w)) \
            .filter(F.col("rank") <= 3) \
            .drop("rank")

        # 4) Mantener solo las filas del índice que están dentro del top3 de su país
        index = index.join(top3, on=["pais", "fecha"], how="inner")

        print("Carga Filtrada")
    else:
        raise Exception("Carga Historica")
except:
  index = index
  print("Carga Historica")

## FORMATO DE ESCRITURA

### Forma Correcta

In [0]:
try:
    if registros_c.count() != 0:
        tuplas_df = df.select("pais", "fecha").distinct()

        rows = tuplas_df.select("pais", "fecha").dropna().collect()

        lista_de_tuplas = [((row["pais"]), row["fecha"]) for row in rows]

        # Generamos el string final
        cadena_in = ', '.join([str(t) for t in lista_de_tuplas])

        print(f"Cadena: {cadena_in}")
        replace_condition = f"(pais, fecha) IN ({cadena_in})"

        df.write \
            .mode("overwrite") \
                .option("replaceWhere", replace_condition) \
                    .saveAsTable(f"{catalogo}.{schema_name}.{table_name}")
        print("Se actualizo la tabla")
    else:
        raise Exception("Carga Histórica")
except:
    print("📦 Ejecutando carga histórica completa")
    df.write \
        .mode("append") \
            .saveAsTable(f"{catalogo}.{schema_name}.{table_name}")
    print("Se completo la carga historica")

### Forma Incorrecta 