In [None]:
import boto3
import pandas as pd
import awswrangler as wr
import io
from datetime import date, datetime


s3 = boto3.client("s3")

response = s3.list_object_versions(Bucket="cchc-dw-qa-raw", Prefix="boletin_concursal/")

versions = response["Versions"]

path = "s3://cchc-dw-dev-staging/boletin_concursal/insolvencias/"

In [None]:
for index, version in enumerate(versions):
    object = s3.get_object(Bucket="cchc-dw-qa-raw", VersionId=version["VersionId"], Key=version["Key"])
    df = pd.read_parquet(io.BytesIO(object["Body"].read()))

    particion = version["LastModified"]
    df["extraction_date"] = particion.strftime("%Y-%m-%d %H:%M:%S")
    df["year"] = particion.year
    df["month"] = str(particion.month).zfill(2)
    df["day"] = str(particion.day).zfill(2)

    wr.s3.to_parquet(
        df=df,
        path="s3://cchc-dw-dev-raw/_bolcon/insolvencias/",
        compression="gzip",
        partition_cols=["year", "month", "day"],
        mode="overwrite_partitions",
        dataset=True
    )


In [None]:
df = wr.s3.read_parquet("s3://cchc-dw-dev-raw/_bolcon/insolvencias/year=2023/month=02/day=03/")
staging = wr.s3.read_parquet("s3://cchc-dw-dev-staging/bolcon/insolvencias/")

In [None]:
df.rut.nunique()

In [None]:
anterior = s3.get_object(Bucket="cchc-dw-qa-raw", Key="boletin_concursal/insolvencia/01114390402146c5836f92357d21968a.gz.parquet", VersionId="r4DGYAh4Hb3Ak2LwAS14pLXQ6zBhNbGv")
actual = s3.get_object(Bucket="cchc-dw-qa-raw", Key="boletin_concursal/insolvencia/287a9570080c414baccf6a8244bb2ab6.gz.parquet", VersionId="AzY86YBCKZ.BqM_0IX7XhPxpfIzLGRJx")

In [None]:
anterior_df = pd.read_parquet(io.BytesIO(anterior["Body"].read()))
actual_df = pd.read_parquet(io.BytesIO(actual["Body"].read()))

In [None]:
def get_partitions() -> list:
    client = boto3.client("s3")

    response = client.list_objects(Bucket=f"cchc-dw-dev-raw", Prefix=f"_bolcon/insolvencias/")

    p_keys = []
    for k in response["Contents"]:
        p_keys.append(k["Key"].split("/")[2:-1])

    return p_keys

particiones = get_partitions()

In [None]:
to_use = [datetime(int(x[0].split("=")[1]),  int(x[1].split("=")[1]), int(x[2].split("=")[1])).date() for x in particiones]

to_use = set(to_use)
to_use = list(to_use)
to_use.sort()
to_use
#to_use[-1]

In [None]:
# Primera ejecución

df = wr.s3.read_parquet("s3://cchc-dw-dev-raw/_bolcon/insolvencias/year=2022/month=11/day=07/", dataset=True)
df["fecha_publicacion"] = pd.to_datetime(df.fecha_publicacion, format="%d/%m/%Y")

df["fecha_ejecucion"] = pd.NA
df["proceso_finalizado"] = pd.NA

wr.s3.to_parquet(
    df=df,
    path="s3://cchc-dw-dev-staging/boletin_concursal/insolvencias/",
    dataset=True,
    mode="overwrite",
    table="bolcon_insolvencias_historico",
    database="staging_dev",
    schema_evolution=True,
    dtype={
        "fecha_ejecucion" : "string",
        "proceso_finalizado" : "boolean"
    }
)

In [None]:
def staging(fecha: date):
    _year, _month, _day = fecha.year, str(fecha.month).zfill(2), str(fecha.day).zfill(2)

    nuevos_df = wr.s3.read_parquet(f"s3://cchc-dw-dev-raw/_bolcon/insolvencias/year={_year}/month={_month}/day={_day}/", dataset=True)
    nuevos_df["fecha_publicacion"] = pd.to_datetime(nuevos_df.fecha_publicacion, format="%d/%m/%Y")

    anterior_df = wr.s3.read_parquet(path)

    ruts_anterior = anterior_df[anterior_df.proceso_finalizado.isnull()].rut.fillna("CORRECCION:" + anterior_df.rol).drop_duplicates().to_list()
    ruts_nuevos = nuevos_df.rut.fillna("CORRECCION:" + nuevos_df.rol).drop_duplicates().to_list()


    #Buscar los ruts en el dataframe antiguos que no existan en el nuevo dataframe para marcarlos como finalizados, o sea, ya no son insolventes
    ruts_finalizados = [x for x in ruts_anterior if x not in ruts_nuevos]

    anterior_df.loc[ anterior_df.rut.isin(ruts_finalizados), "fecha_ejecucion"] = fecha.strftime("%Y-%m-%d")
    anterior_df.loc[ anterior_df.rut.isin(ruts_finalizados),  "proceso_finalizado"] = True
    
    ruts_nuevos = [x for x in ruts_nuevos if x not in ruts_anterior]
    nuevos_procesos = nuevos_df[nuevos_df.rut.fillna("CORRECCION:" + nuevos_df.rol).isin(ruts_nuevos)].copy()

    
    df = pd.concat([anterior_df, nuevos_procesos])

    print("Agregados", len(ruts_nuevos), "registros")
    print(ruts_nuevos)
    print("Marcados", len(ruts_finalizados), "como finalizados")

    wr.s3.to_parquet(
        df=df,
        path=path,
        dataset=True,
        mode="overwrite",
        table="bolcon_insolvencias_historico",
        database="staging_dev",
        schema_evolution=True,
        dtype={
            "fecha_ejecucion" : "string",
            "proceso_finalizado" : "boolean"
        }
    )

#staging(to_use[-1])

In [None]:
from IPython.display import clear_output

In [None]:

for fecha in to_use[1:]:
    clear_output(wait=True)
    print("Ejecutando fecha", fecha)
    staging(fecha)

In [None]:
df = anterior_df.copy()
df.rut = df.rut.fillna("CORRECCION:" + df.rol)

df[df.rol == 'C-18204-2020']

In [None]:
# 1. Ver ruts del día actual que no están en el día anterior, por ende, están finalizado
# 2. Ver ruts del día anterior que no estén en el actual, por ende, append
ruts_anterior = anterior_df.rut.drop_duplicates().to_list()
ruts_actuales = actual_df.rut.drop_duplicates().to_list()

ruts_finalizados = [x for x in ruts_anterior if x not in ruts_actuales]

procesos_finalizados = anterior_df[anterior_df.rut.isin(ruts_finalizados)][["rut"]].drop_duplicates().copy()

procesos_finalizados["fecha_ejecucion"] = "2023-02-03"
procesos_finalizados["proceso_finalizado"] = True

# Punto 2

ruts_nuevos = [x for x in ruts_actuales if x not in ruts_anterior]

nuevos_procesos = anterior_df[anterior_df.rut.isin(ruts_nuevos)].copy()


# final = pd.concat([actual_df, procesos_finalizados])

# final

anterior_df.loc[anterior_df.rut.isin(ruts_finalizados), "fecha_ejecucion"] = "2023-02-03"
anterior_df.loc[ anterior_df.rut.isin(ruts_finalizados),  "proceso_finalizado"] = True

anterior_df[anterior_df.fecha_ejecucion.notnull()]