Merged — changes from all commits (23 commits)
2 changes: 1 addition & 1 deletion Makefile
@@ -86,7 +86,7 @@ dashboard:
Rscript scripts/dashboard.R

update_site:
Rscript -e "source('R/utils.R'); update_site()"
Rscript -e "suppressPackageStartupMessages(source(here::here('R', 'load_all.R'))); update_site()"

netlify:
netlify deploy --dir=reports --prod
91 changes: 91 additions & 0 deletions R/aux_data_utils.R
@@ -617,3 +617,94 @@ process_nhsn_data <- function(raw_nhsn_data) {
mutate(version = fixed_version) %>%
relocate(geo_value, disease, time_value, version)
}


# Extract the version timestamp from filenames of the form nhsn_data_2024-11-19_16-29-43.191649.rds
get_version_timestamp <- function(filename) ymd_hms(str_match(filename, "[0-9]{4}-..-.._..-..-..\\.[^.^_]*"))
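A quick check of the parsing (a sketch; lubridate and stringr are assumed attached, as elsewhere in this repo — the fractional seconds survive in the returned POSIXct even though the default print drops them):

get_version_timestamp("nhsn_data_2024-11-19_16-29-43.191649.rds")
#> [1] "2024-11-19 16:29:43 UTC"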

#' All-in-one function to get and cache an NHSN archive from raw files
#' @description
#' This takes in all of the raw data files for the NHSN data, creates a
#' quasi-archive (it keeps one example per version-day, rather than one per
#' change), and puts it in the bucket. The stored value has the columns
#' geo_value, time_value, disease, endpoint (either basic or prelim), version,
#' version_timestamp (to enable keeping the most recent value), and value.
#' The returned value, on the other hand, is an actual epi_archive containing
#' only the data for `disease_name`.
create_nhsn_data_archive <- function(disease_name) {
if (aws.s3::head_object("archive_timestamped.parquet", bucket = "forecasting-team-data")) {
aws.s3::save_object("archive_timestamped.parquet", bucket = "forecasting-team-data", file = here::here("cache/archive_timestamped.parquet"))
previous_archive <- qs::qread(here::here("cache/archive_timestamped.parquet")) # nb: serialized with qs despite the .parquet name
last_timestamp <- max(previous_archive$version_timestamp)
} else {
# there is no remote archive yet, so start from scratch
previous_archive <- NULL
last_timestamp <- as.Date("1000-01-01")
}
new_data <- aws.s3::get_bucket_df(bucket = "forecasting-team-data", prefix = "nhsn_data_") %>%
filter(get_version_timestamp(Key) > last_timestamp) %>%
pull(Key) %>%
lapply(
function(filename) {
version_timestamp <- get_version_timestamp(filename)
res <- NULL
tryCatch(
{
s3load(object = filename, bucket = "forecasting-team-data")
if (grepl("prelim", filename)) {
res <- epi_data_raw_prelim
endpoint_val <- "prelim"
} else {
res <- epi_data_raw
endpoint_val <- "basic"
}
res <- res %>%
process_nhsn_data() %>%
select(geo_value, disease, time_value, value) %>%
mutate(version_timestamp = version_timestamp, endpoint = endpoint_val)
},
error = function(cond) {} # skip any file that fails to load or process
)
res
}
)
# bind the new snapshots together, then drop rows that merely repeat the previous version's values
compactified <-
new_data %>%
bind_rows()
if (nrow(compactified) == 0) {
one_per_day <- previous_archive
} else {
compactified <-
compactified %>%
arrange(geo_value, time_value, disease, endpoint, version_timestamp) %>%
mutate(version = as.Date(version_timestamp)) %>%
filter(if_any(
c(everything(), -endpoint, -version_timestamp), # all non-version, non-endpoint columns
~ !epiprocess:::is_locf(., .Machine$double.eps^0.5)
))
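# In words (a gloss on epiprocess:::is_locf, an internal helper that may
# change): after the arrange, a row is kept only if at least one non-version,
# non-endpoint column differs from the row directly above it, up to a small
# floating-point tolerance; runs that just carry the same observation forward
# across version_timestamps collapse to their first row.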

unchanged <- previous_archive %>% filter(!(version %in% unique(compactified$version)))
# only keep the last value for a given version (i.e. the latest version_timestamp);
# we only need to do this for the versions present in compactified, since the
# other versions can't possibly have changed
one_per_day <-
previous_archive %>%
filter(version %in% unique(compactified$version)) %>%
bind_rows(compactified) %>%
group_by(geo_value, disease, time_value, version) %>%
arrange(version_timestamp) %>%
filter(row_number() == n()) %>%
ungroup() %>%
bind_rows(unchanged)
qs::qsave(one_per_day, here::here("cache/archive_timestamped.parquet"))
aws.s3::put_object(here::here("cache/archive_timestamped.parquet"), "archive_timestamped.parquet", bucket = "forecasting-team-data")
}
one_per_day %>%
filter(disease == disease_name) %>%
select(-version_timestamp, -endpoint, -disease) %>%
mutate(
geo_value = ifelse(geo_value == "usa", "us", geo_value)
) %>%
filter(geo_value != "mp") %>%
as_epi_archive(compactify = TRUE)
}
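Usage is then a single call per disease, and the result supports the usual archive operations. A minimal sketch (the "covid" label is an assumption — the actual disease values depend on what process_nhsn_data() writes into the disease column):

# refresh the cached archive and extract one disease as an epi_archive
covid_archive <- create_nhsn_data_archive(disease_name = "covid")
# e.g. snapshot the data as it would have appeared on a given date
epiprocess::epix_as_of(covid_archive, as.Date("2024-11-20"))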
32 changes: 17 additions & 15 deletions R/forecasters/forecaster_climatological.R
@@ -1,18 +1,18 @@
climate_linear_ensembled <- function(epi_data,
outcome,
extra_sources = "",
ahead = 7,
trainer = parsnip::linear_reg(),
quantile_levels = covidhub_probs(),
filter_source = "",
filter_agg_level = "",
scale_method = c("quantile", "std", "none"),
center_method = c("median", "mean", "none"),
nonlin_method = c("quart_root", "none"),
drop_non_seasons = FALSE,
residual_tail = 0.99,
residual_center = 0.35,
...) {
outcome,
extra_sources = "",
ahead = 7,
trainer = parsnip::linear_reg(),
quantile_levels = covidhub_probs(),
filter_source = "",
filter_agg_level = "",
scale_method = c("quantile", "std", "none"),
center_method = c("median", "mean", "none"),
nonlin_method = c("quart_root", "none"),
drop_non_seasons = FALSE,
residual_tail = 0.99,
residual_center = 0.35,
...) {
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)
@@ -49,7 +49,9 @@ climate_linear_ensembled <- function(epi_data,
}
learned_params <- calculate_whitening_params(season_data, outcome, scale_method, center_method, nonlin_method)
epi_data %<>% data_whitening(outcome, learned_params, nonlin_method)
epi_data <- epi_data %>% select(geo_value, source, time_value, season, value = !!outcome) %>% mutate(epiweek = epiweek(time_value))
epi_data <- epi_data %>%
select(geo_value, source, time_value, season, value = !!outcome) %>%
mutate(epiweek = epiweek(time_value))
pred_climate <- climatological_model(epi_data, ahead) %>% mutate(forecaster = "climate")
pred_geo_climate <- climatological_model(epi_data, ahead, geo_agg = FALSE) %>% mutate(forecaster = "climate_geo")
pred_linear <- forecaster_baseline_linear(epi_data, ahead, residual_tail = residual_tail, residual_center = residual_center) %>% mutate(forecaster = "linear")
4 changes: 2 additions & 2 deletions R/forecasters/forecaster_flusion.R
@@ -40,7 +40,7 @@ flusion <- function(epi_data,
adding_source <- FALSE
if (!("source" %in% names(epi_data))) {
adding_source <- TRUE
epi_data$source <- c("none")
epi_data$source <- c("nhsn")
attributes(epi_data)$metadata$other_keys <- "source"
}
if ("season_week" %nin% names(epi_data) | "season" %nin% names(epi_data)) {
@@ -146,7 +146,7 @@
preproc %<>%
add_role(all_of(starts_with("slide_value")), new_role = "pre-predictor")
# one-hot encoding of the data source
if (all(levels(epi_data$source) != "none") && dummy_source) {
if (all(levels(epi_data$source) != "nhsn") && dummy_source) {
preproc %<>% step_dummy(source, one_hot = TRUE, keep_original_cols = TRUE, role = "pre-predictor")
}
# one-hot encoding of location
11 changes: 7 additions & 4 deletions R/forecasters/forecaster_no_recent_outcome.R
@@ -42,7 +42,7 @@ no_recent_outcome <- function(epi_data,
adding_source <- FALSE
if (!("source" %in% names(epi_data))) {
adding_source <- TRUE
epi_data$source <- c("none")
epi_data$source <- c("nhsn")
attributes(epi_data)$metadata$other_keys <- "source"
}
if ("season_week" %nin% names(epi_data)) {
@@ -59,10 +59,13 @@
args_input[["quantile_levels"]] <- quantile_levels
args_list <- do.call(default_args_list, args_input)
# if you want to hardcode particular predictors in a particular forecaster
predictors <- extra_sources[[1]]
# TODO: Partial match quantile_level coming from here (on Dmitry's machine)
predictors <- c(outcome, extra_sources[[1]])
c(args_list, tmp_pred, trainer) %<-% sanitize_args_predictors_trainer(epi_data, outcome, predictors, trainer, args_list)

if (extra_sources[[1]] == "") {
predictors <- character()
} else {
predictors <- extra_sources[[1]]
}
# end of the copypasta
# finally, any other pre-processing (e.g. smoothing) that isn't performed by
# epipredict
2 changes: 1 addition & 1 deletion R/forecasters/forecaster_scaled_pop.R
@@ -54,7 +54,7 @@ scaled_pop <- function(epi_data,
scale_method = c("none", "quantile", "std"),
center_method = c("median", "mean", "none"),
nonlin_method = c("quart_root", "none"),
trainer = parsnip::linear_reg(),
trainer = epipredict::quantile_reg(),
quantile_levels = covidhub_probs(),
filter_source = "",
filter_agg_level = "",
2 changes: 1 addition & 1 deletion R/forecasters/forecaster_scaled_pop_seasonal.R
@@ -48,7 +48,7 @@ scaled_pop_seasonal <- function(epi_data,
seasonal_backward_window = 5 * 7,
seasonal_forward_window = 3 * 7,
train_residual = FALSE,
trainer = parsnip::linear_reg(),
trainer = epipredict::quantile_reg(),
quantile_levels = covidhub_probs(),
filter_source = "",
filter_agg_level = "",
2 changes: 1 addition & 1 deletion R/forecasters/forecaster_smoothed_scaled.R
@@ -94,7 +94,7 @@ smoothed_scaled <- function(epi_data,
adding_source <- FALSE
if (!("source" %in% names(epi_data))) {
adding_source <- TRUE
epi_data$source <- c("none")
epi_data$source <- c("nhsn")
attributes(epi_data)$metadata$other_keys <- "source"
}
args_input[["ahead"]] <- ahead
38 changes: 28 additions & 10 deletions R/utils.R
@@ -172,18 +172,19 @@ data_substitutions <- function(dataset, disease, forecast_generation_date) {
}

parse_prod_weights <- function(filename = here::here("covid_geo_exclusions.csv"),
gen_forecast_date) {
forecast_date_int, forecaster_fns) {
forecast_date <- as.Date(forecast_date_int)
all_states <- c(
unique(readr::read_csv("https://raw.githubusercontent.com/cmu-delphi/covidcast-indicators/refs/heads/main/_delphi_utils_python/delphi_utils/data/2020/state_pop.csv", show_col_types = FALSE)$state_id),
"usa", "us"
)
all_prod_weights <- readr::read_csv(filename, comment = "#", show_col_types = FALSE)
# if we haven't set specific weights, use the overall defaults
useful_prod_weights <- filter(all_prod_weights, forecast_date == gen_forecast_date)
useful_prod_weights <- filter(all_prod_weights, forecast_date == forecast_date)
if (nrow(useful_prod_weights) == 0) {
useful_prod_weights <- all_prod_weights %>%
filter(forecast_date == min(forecast_date)) %>%
mutate(forecast_date = gen_forecast_date)
mutate(forecast_date = forecast_date)
}
useful_prod_weights %>%
mutate(
Expand All @@ -196,7 +197,7 @@ parse_prod_weights <- function(filename = here::here("covid_geo_exclusions.csv")
unnest_longer(forecaster) %>%
group_by(forecast_date, forecaster, geo_value) %>%
summarize(weight = min(weight), .groups = "drop") %>%
mutate(forecast_date = as.Date(forecast_date)) %>%
mutate(forecast_date = as.Date(forecast_date_int)) %>%
group_by(forecast_date, geo_value) %>%
mutate(weight = ifelse(near(weight, 0), 0, weight / sum(weight)))
}
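The final mutate renormalizes, within each forecast date and location, whatever weights survive the exclusions so that they sum to 1, while leaving exact zeros at zero. A small illustration of the arithmetic (hypothetical weights, not values from covid_geo_exclusions.csv):

# weights c(2, 1, 1) across three forecasters for one geo_value
# become c(0.50, 0.25, 0.25); c(0, 1, 3) becomes c(0, 0.25, 0.75)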
@@ -280,7 +281,8 @@ write_submission_file <- function(pred, forecast_reference_date, submission_dire
#' Utility to get the reference date for a given date. This is the last day of
#' the epiweek that the date falls in.
get_forecast_reference_date <- function(date) {
MMWRweek::MMWRweek2Date(epiyear(date), epiweek(date)) + 6
date <- as.Date(date)
MMWRweek::MMWRweek2Date(lubridate::epiyear(date), lubridate::epiweek(date)) + 6
}
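For example (relying only on the MMWR convention that epiweeks run Sunday through Saturday):

# 2024-11-20 is a Wednesday; its epiweek runs Sun 2024-11-17 through Sat 2024-11-23
get_forecast_reference_date("2024-11-20")
#> [1] "2024-11-23"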

update_site <- function() {
@@ -303,15 +305,31 @@ update_site <- function() {
stop("Template file does not exist.")
}
report_md_content <- readLines(template_path)

# Get the list of files in the reports directory
report_files <- dir_ls(reports_dir, regexp = ".*_prod.html")
report_files <- dir_ls(reports_dir, regexp = ".*_prod_on_.*.html")
report_table <- tibble(
filename = report_files,
dates = str_match_all(filename, "[0-9]{4}-..-..")
) %>%
unnest_wider(dates, names_sep = "_") %>%
rename(forecast_date = dates_1, generation_date = dates_2) %>%
mutate(
forecast_date = ymd(forecast_date),
generation_date = ymd(generation_date),
disease = str_match(filename, "flu|covid")
)
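# As a hypothetical example of the pattern above, a report named
# "covid_2024-11-23_prod_on_2024-11-20.html" yields
# forecast_date = 2024-11-23 (the first date in the name),
# generation_date = 2024-11-20 (the second), and disease = "covid".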

# Extract dates and sort files by date in descending order
report_files <- report_files[order(as.Date(str_extract(report_files, "\\d{4}-\\d{2}-\\d{2}")), decreasing = FALSE)]
# for each forecast date, use the most recently generated report, then sort
# ascending on the forecast date
used_reports <- report_table %>%
group_by(forecast_date, disease) %>%
arrange(generation_date) %>%
filter(generation_date == max(generation_date)) %>%
ungroup() %>%
arrange(forecast_date)

# Process each report file
for (report_file in report_files) {
for (report_file in used_reports$filename) {
file_name <- path_file(report_file)
file_parts <- str_split(fs::path_ext_remove(file_name), "_", simplify = TRUE)
date <- file_parts[1]
2 changes: 1 addition & 1 deletion scripts/covid_hosp_explore.R
@@ -9,7 +9,7 @@ if (!exists("ref_time_values_")) {
start_date <- as.Date("2023-10-04")
end_date <- as.Date("2024-04-24")
date_step <- 7L
#ref_time_values_ <- as.Date(c("2023-11-08", "2023-11-22"))
# ref_time_values_ <- as.Date(c("2023-11-08", "2023-11-22"))
}
time_value_adjust <- 3 # this moves the week marker from Saturday to Wednesday
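# e.g. shifting a Saturday reference date such as 2024-11-23 back 3 days lands
# on Wednesday 2024-11-20 (dates here are illustrative)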
