In [1]:
from io import BytesIO
from os import remove
from os.path import join
from pathlib import Path
from zipfile import ZipFile

import pandas as pd
from loguru import logger
from requests import get


def get_daily_temp_history(
    input_path: str,
    station_id: int,
    *,
    timeout: float = 60.0,
) -> None:
    """Download the daily temperature history archive for a given station.

    The zip archive served by climatologia.meteochile.gob.cl for the station
    is fetched into memory and its first member is extracted as
    ``{input_path}/stations/{station_id}/daily_temp_history.csv``.

    Parameters
    ----------
    input_path : str
        Path to the directory where the data will be saved.
    station_id : int
        Chilean national code for the station.
    timeout : float, optional
        Seconds to wait for the server before giving up; forwarded to
        ``requests.get``. Defaults to 60.

    Returns
    -------
    None
        The function only writes the extracted file to disk. The CSV
        preprocessing that follows in the function body is currently
        commented out, so no DataFrame is produced.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status code.
    ValueError
        If the downloaded archive contains no members.
    """
    logger.info(f"getting/updating daily temperature history for station {station_id}...")

    # download the zip file into memory
    zip_file_url = f"https://climatologia.meteochile.gob.cl/application/datos/getDatosSaclim/{station_id}_XXXX_DiarioTs_"  # noqa: E501
    response = get(zip_file_url, timeout=timeout)
    # fail with a clear HTTP error instead of handing an error page to ZipFile
    response.raise_for_status()

    extract_path = f"{input_path}/stations/{station_id}"

    # the context manager guarantees the archive is closed even if extraction fails
    with ZipFile(BytesIO(response.content)) as zip_file:
        members = zip_file.infolist()
        if not members:
            raise ValueError(f"downloaded archive for station {station_id} is empty")

        zip_info = members[0]
        # overriding the member's filename makes extract() write it under a fixed name
        zip_info.filename = "daily_temp_history.csv"
        zip_file.extract(zip_info, path=extract_path)

    # # replace ',' with ';' in .csv file to allow correct column separation
    # history_path = f"{input_path}/stations/{station_id}/{zip_info.filename}"

    # with open(history_path, "r") as f:
    #     lines = f.readlines()
    #     lines = map(lambda x: x.replace(",", ";"), lines)

    # with open(history_path, "w") as f:
    #     f.writelines(lines)

    # # read and preprocess history data
    # h_data = pd.read_csv(history_path, sep=";")

    # col_names = {
    #     "momento": "date",
    #     "MediaCli_Valor": "cond_mean_temp",
    #     "MediaAri_Valor": "mean_temp",
    #     "NumDatos_Valor": "hourly_data_count",
    #     "Ts00_Valor": "00_temp",
    #     "Ts12_Valor": "12_temp",
    #     "Maxima_Valor": "max_temp",
    #     "FechaMax_Valor": "max_temp_date",
    #     "Minima_Valor": "min_temp",
    #     "FechaMin_Valor": "min_temp_date",
    #     "FechaPro_Valor": "process_date",
    # }
    # reordered_cols = [
    #     "min_temp",
    #     "max_temp",
    #     "mean_temp",
    #     "cond_mean_temp",
    #     "hourly_data_count",
    #     "00_temp",
    #     "12_temp",
    #     "min_temp_date",
    #     "max_temp_date",
    #     "process_date",
    # ]

    # # parse 'date' column
    # h_data["momento"] = pd.to_datetime(h_data["momento"], format="%d-%m-%Y %H:%M:%S")

    # h_data = (
    #     h_data.drop(columns=["CodigoNacional"])
    #     .rename(columns=col_names)
    #     .set_index("date")
    #     .sort_index()[reordered_cols]
    # )
    # h_data.index = h_data.index.normalize()

    # # save history data
    # max_date = h_data.index.max().strftime("%Y%m%d")
    # file_path = f"{input_path}/stations/{station_id}/daily_temp_history/{max_date}_update.parquet"
    # h_data.to_parquet(file_path)

    # # remove old .csv and .parquet files
    # remove(history_path)
    # for file in Path(f"{input_path}/stations/{station_id}/daily_temp_history").glob("*.parquet"):
    #     if file.name != f"{max_date}_update.parquet":
    #         remove(file)

    # logger.info(f"daily temperature history for station {station_id} successfully saved")


# fetch the daily temperature archive of station 330020 into the local "data" directory
get_daily_temp_history(
    input_path="data",
    station_id=330020,
)


[32m2023-10-29 20:36:06.473[0m | [1mINFO    [0m | [36m__main__[0m:[36mget_daily_temp_history[0m:[36m30[0m - [1mgetting/updating daily temperature history for station 330020...[0m
