In [None]:
# Install dependencies
%pip install --upgrade pip 
%pip install pandas pyarrow fastparquet

In [None]:
import pandas as pd
import json
from datetime import *

In [None]:
data_dir = "../data"

In [None]:
dfs = []

In [None]:
for file in ["postgres2_datos-diarios-antiguo_999999999.json","postgres2_datos-diarios-nuevo-2024_999999999.json","postgres2_ultimas-x-horas-antiguo_9999999.json","postgres2_ultimas-x-horas-nuevo_9999999.json"]:
    df = pd.read_json(f"{data_dir}/{file}")
    metadata = f"metadata-{file}"
    metadata = json.load(open(f"{data_dir}/{metadata}"))
    df["source"] = metadata["source"]
    df["fecha_hora_extraccion"] = metadata["timestamp"]

    dfs.append(df)

In [None]:
for file in ["postgres2_datos.json"]:
    df = pd.read_json(f"{data_dir}/{file}").T
    df = df.reset_index()
    df.rename(columns={"index": "estacion"}, inplace=True)

    metadata = f"metadata-{file}"
    metadata = json.load(open(f"{data_dir}/{metadata}"))
    df["source"] = metadata["source"]
    df["fecha_hora_extraccion"] = metadata["timestamp"]

    dfs.append(df)

In [None]:
df = pd.concat(dfs, ignore_index=True)

In [None]:
df = df.drop(columns=["fecha_formateada","hora_formateada"])

In [None]:
def obtener_estacion(source):
    if "antiguo" in source:
        return "antiguo"
    elif "nuevo" in source:
        return "nuevo"

In [None]:
df["estacion"] = df[["source", "estacion"]].apply(lambda x: x["estacion"] if pd.notnull(x["estacion"]) else obtener_estacion(x["source"]), axis=1)

In [None]:
def obtener_modalidad(source):
    if "horas" in source:
        return "horas"
    elif "diarios" in source:
        return "diarios"
    return "datos"

In [None]:
df["modalidad"] = df["source"].apply(obtener_modalidad)

In [None]:
df["id_parametro"] = df["id_parametro"].astype(str) + "_" + df["estacion"]

In [None]:
columns = {
    "id_parametro": str,
    "valor_original": float,
    "valor_ica": float,
    "promedio_24hrs": float,
    "valor_calculado_x2": float,
    "valor_medido": float,
    "observaciones": str,
    "modalidad": str,
    "estacion": str,
    "fecha_hora_extraccion": lambda x: (
        datetime.fromisoformat(x.replace("Z", "+00:00"))
        if pd.notnull(x) and isinstance(x, str)
        else x
    ),
    "fecha_hora_registro": lambda x: (
        pd.to_datetime(x, utc=True)
        if pd.notnull(x) and isinstance(x, str)
        else x
    ),
    "fecha_hora_calculo": lambda x: (
        datetime.fromisoformat(x.replace("Z", "+00:00"))
        if pd.notnull(x) and isinstance(x, str)
        else x
    ),
}

In [None]:
for col_name, conv in columns.items():
    if callable(conv) and not isinstance(conv, type):
        df[col_name] = df[col_name].apply(conv)
    else:
        df[col_name] = df[col_name].astype(conv)

In [None]:
df = df[list(columns.keys())]

In [None]:
df

In [None]:
df.to_parquet(f"{data_dir}/sensores.parquet", index=False)