In [1]:
setwd(fs::path_abs("~/Local_Workspace/TesiMag"))
# library(sf, warn.conflicts = FALSE)
# library(vroom, warn.conflicts = FALSE)
# library(tsibble, warn.conflicts = FALSE)
# library(tidyr, warn.conflicts = FALSE)
library(ggplot2, warn.conflicts = FALSE)
# library(forcats, warn.conflicts = FALSE)
library(arrow, warn.conflicts = FALSE)
# library(data.table, warn.conflicts = FALSE)
# library(dtplyr, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
library(lubridate, warn.conflicts = FALSE)
source("src/paths/paths.R")
# source("src/load/load.R")
# source("src/pairing/utils.R")


In [2]:
# Locations of the ARPA Lombardia inputs.
# `path.ds` is not defined in this view — presumably it comes from the
# sourced "src/paths/paths.R"; TODO confirm.
path.lom <- file.path(path.ds, "ARPA", "LOMBARDIA")
path.md <- file.path(path.lom, "Stazioni_Meteorologiche.tsv")

# Sensor/station metadata, restricted to temperature sensors.
# The result is stored in `metadata` and used by `load_data()` and
# `load_data.arrow()` to keep only observations of these sensors.
metadata <- read_tsv_arrow(path.md,
    # Explicit Arrow types for the columns that are kept.
    col_types = schema(
        IdSensore = uint32(),
        IdStazione = uint32(),
        Tipologia = dictionary(),
        Provincia = dictionary(),
        DataStart = timestamp(unit = "s"),
        DataStop = timestamp(unit = "s"),
        Quota = float(),
        lng = float(),
        lat = float()
    ),
    # DataStart / DataStop are written as day/month/year in the file.
    timestamp_parsers = "%d/%m/%Y",
    # Keep the result as an Arrow object instead of an R data frame.
    as_data_frame = FALSE,
    # Only these columns are read; every other column in the TSV is ignored.
    col_select = c("IdSensore", "IdStazione", "Tipologia", "Provincia", "DataStart", "DataStop", "Quota", "lng", "lat")
) |>
    # Keep temperature sensors only; the column is then constant, so drop it.
    filter(Tipologia == "Temperatura") |>
    select(-Tipologia) |>
    # The columns were parsed as timestamps; cast them to plain dates.
    mutate(across(starts_with("Data"), ~ cast(., date32())))
# filter(Tipologia == "Temperatura") |>
# select(!c(Tipologia, starts_with("UTM"), ends_with("Misura"), location, Storico)) |>
# mutate(DataStart = as.IDate(DataStart, format = "%d/%m/%Y"), DataStop = as.IDate(DataStop, format = "%d/%m/%Y"))


In [3]:
# Strings that the CSV readers below (`read_data.vroom`, `read_data.fread`,
# `read_data.arrow`) treat as missing values.
na_strings <- c("", "NA", "9999.0", "-9999.0", "-9999", "NV", "NC", "ND")

# Read one raw observation file with vroom.
#
# Arguments:
#   file  Base name (without the ".csv" extension) of a file inside `path.lom`.
#   ...   Further arguments forwarded to `vroom::vroom()`.
#
# Returns the table read by vroom with `idOperatore` renamed to `Variabile`
# and `Data` renamed to `Time`, i.e. the same column names that
# `read_data.arrow()` produces. Column types are guessed by vroom
# (`col_types = cols()`), and the values in `na_strings` are read as NA.
#
# vroom is called through its namespace because `library(vroom)` is not
# attached in this file.
read_data.vroom <- function(file, ...) {
    vroom::vroom(
        file.path(path.lom, paste0(file, ".csv")),
        delim = ",",
        col_types = vroom::cols(),
        na = na_strings,
        ...
    ) |> rename(Variabile = idOperatore, Time = Data)
}

# Read one raw observation file with data.table.
#
# Arguments:
#   file  Base name (without the ".csv" extension) of a file inside `path.lom`.
#   ...   Further arguments forwarded to `data.table::fread()`.
#
# Returns the table read by fread with the original column names
# (`IdSensore`, `Data`, `Valore`, `Stato`, `idOperatore`). `Data` is kept as
# character and `idOperatore` as character, which is what `load_data()` parses.
# The values in `na_strings` are read as NA.
#
# fread is called through its namespace because `library(data.table)` is not
# attached in this file.
read_data.fread <- function(file, ...) {
    data.table::fread(
        file.path(path.lom, paste0(file, ".csv")),
        sep = ",",
        colClasses = c(
            "IdSensore" = "integer",
            "Data" = "character",
            "Valore" = "double",
            "Stato" = "factor",
            "idOperatore" = "character"
        ),
        na.strings = na_strings,
        ...
    )
}

# Read one raw observation file with arrow, keeping it as an Arrow object.
#
# Arguments:
#   file  Base name (without the ".csv" extension) of a file inside `path.lom`.
#   ...   Further arguments forwarded to `read_csv_arrow()`.
#
# Returns the result of `read_csv_arrow()` with `idOperatore` renamed to
# `Variabile` and `Data` renamed to `Time`.
read_data.arrow <- function(file, ...) {
    csv_path <- file.path(path.lom, paste0(file, ".csv"))

    # Explicit Arrow types for the five columns of the raw file.
    column_types <- schema(
        IdSensore = uint32(),
        Data = timestamp(unit = "s"),
        Valore = float(),
        Stato = dictionary(),
        idOperatore = uint8()
    )

    # Timestamps appear both with and without a trailing ".000".
    time_formats <- c("%d/%m/%Y %H:%M:%S", "%d/%m/%Y %H:%M:%S.000")

    raw_table <- read_csv_arrow(
        csv_path,
        schema = column_types,
        timestamp_parsers = time_formats,
        na = na_strings,
        col_names = TRUE,
        read_options = list(use_threads = TRUE, skip_rows = 0),
        as_data_frame = FALSE,
        ...
    )

    rename(raw_table, Variabile = idOperatore, Time = Data)
}

# Prepare raw observations (original column names) for analysis.
#
# Arguments:
#   data  Table with at least `IdSensore`, `Data` (character,
#         day-month-year hour:minute:second) and `idOperatore` (character
#         code); this is the layout returned by `read_data.fread()`.
#   ...   Extra `new_name = "code"` pairs forwarded to
#         `forcats::fct_recode()` to label the `idOperatore` codes.
#
# Returns `data` restricted to the sensors listed in `metadata`, with `Data`
# parsed into `Time`, `idOperatore` turned into the factor `Variabile`
# (code "4" is always labelled "PRECIP"), and the two source columns dropped.
# Rows are ordered by sensor, time and variable.
#
# forcats is called through its namespace because `library(forcats)` is not
# attached in this file.
load_data <- function(data, ...) {
    data |>
        # Keep only observations of sensors present in the metadata table.
        semi_join(metadata, by = "IdSensore") |>
        # drop_na(Stato) |>
        mutate(
            Time = dmy_hms(Data),
            Variabile = forcats::fct(idOperatore, levels = c("2", "1", "3", "4")) |>
                forcats::fct_recode(..., PRECIP = "4"),
            .keep = "unused"
        ) |>
        arrange(IdSensore, Variabile, Time) |>
        # Within each sensor/variable series every record takes the timestamp
        # of the record before it; the first record of a series gets NA.
        # NOTE(review): presumably records are stamped at the end of their
        # interval — confirm. `dplyr::lag` is spelled out so that the
        # time-series `stats::lag` can never be picked up instead.
        mutate(Time = dplyr::lag(Time), .by = c(IdSensore, Variabile)) |>
        # drop_na(Time) |>
        arrange(IdSensore, Time, Variabile)
}

# Prepare observations that already use the renamed columns
# (`Variabile`, `Time`), e.g. the output of `read_data.arrow()`.
#
# Arguments:
#   data  Table or Arrow query with `IdSensore`, `Variabile` and `Time`.
#   ...   Currently unused.
#
# Returns `data` restricted to the sensors listed in `metadata`, with every
# timestamp moved forward by one second, ordered by sensor, time and
# variable. The result is still grouped by `IdSensore` and `Variabile`.
#
# NOTE(review): `load_data()` shifts each timestamp to the previous record
# instead of adding one second; confirm that the difference is intended.
load_data.arrow <- function(data, ...) {
    data |>
        # Keep only observations of sensors present in the metadata table.
        semi_join(metadata, by = "IdSensore") |>
        arrange(IdSensore, Variabile, Time) |>
        group_by(IdSensore, Variabile) |>
        # `units` is written in full rather than relying on partial matching
        # of the abbreviated `unit`.
        mutate(Time = Time + as.difftime(1, units = "secs")) |>
        arrange(IdSensore, Time, Variabile)
}

# Daily minimum and maximum of `Valore` for every sensor.
#
# Arguments:
#   data      Table with `IdSensore`, `Time` and `Valore`.
#   extremes  Currently unused. An earlier draft (removed here) used it to
#             choose between the T_MIN/T_MAX and the T_AVG series.
#
# Returns one row per sensor and calendar day (`date`, taken from `Time`)
# with `T_MIN` and `T_MAX`, ignoring missing values. Only the last grouping
# level is dropped, so the result is still grouped by `IdSensore`.
compute_daily_stats <- function(data, extremes) {
    by_sensor_day <- group_by(data, IdSensore, date = lubridate::date(Time))
    summarise(
        by_sensor_day,
        T_MIN = min(Valore, na.rm = TRUE),
        T_MAX = max(Valore, na.rm = TRUE),
        .groups = "drop_last"
    )
}


In [14]:
# Arrow column types for the raw Lombardia observation CSV files.
# Lists the same five columns as the inline schema in `read_data.arrow()`;
# in this view it is only referenced by the commented-out
# `open_csv_dataset()` call.
# NOTE(review): `float32()` here vs `float()` in `read_data.arrow()` —
# presumably the same Arrow type; confirm.
lom.schema <- schema(
    IdSensore = uint32(),
    Data = timestamp(unit = "s"),
    Valore = float32(),
    Stato = dictionary(),
    idOperatore = uint8()
)


In [9]:
# open_csv_dataset(fs::dir_ls(path.lom, glob = "*.csv"),
#     hive_style = FALSE,
#     schema = lom.schema,
#     timestamp_parsers = c("%d/%m/%Y %H:%M:%S", "%d/%m/%Y %H:%M:%S.000"),
#     na = na_strings,
#     col_names = TRUE,
#     skip = 1
# ) |>
#     semi_join(metadata, by = "IdSensore") |>
#     arrange(IdSensore, Data, idOperatore) |>
#     collect() |>
#     mutate(Stato = factor(Stato, levels = c("VA"))) |>
#     rename(Time = Data) |>
#     write_dataset(file.path(path.lom, "dataset"),
#         format = "ipc",
#         partitioning = "IdSensore"
#     )


In [6]:
# write_parquet(
#     ds,
#     file.path(path.lom, "full.parquet")
# )


In [20]:
# Open the Arrow IPC dataset stored under "<path.lom>/dataset".
# This is the directory and format that the commented-out `write_dataset()`
# call in this file writes to (partitioned by IdSensore).
data <- open_dataset(file.path(path.lom, "dataset"),
    format = "ipc"
)
