## Preliminaries

In [1]:
# install.packages("fpp3", repos = "https://cloud.r-project.org")

In [1]:
# Global settings: a large `scipen` penalty makes R prefer fixed over scientific
# notation when printing; PROJ_LIB / GDAL_DATA point at the conda data folders.
options(scipen = 999)
Sys.setenv(
  PROJ_LIB = "/opt/conda/share/proj",
  GDAL_DATA = "/opt/conda/share/gdal"
)

In [2]:
# Workspace layout: every folder below is derived from ROOT_PATH
ROOT_PATH <- "~/workspace"
CONFIG_PATH <- file.path(ROOT_PATH, "configuration")
CODE_PATH <- file.path(ROOT_PATH, "code")
DATA_PATH <- file.path(ROOT_PATH, "data")
# Folder that receives the files written by this notebook
OUTPUT_DATA_PATH <- file.path(DATA_PATH, "seasonality_rainfall")

In [3]:
# Load utils
# Sources the shared helper script from the code folder. Helpers used further down
# (e.g. log_msg(), install_and_load()) are not defined in this notebook and are
# presumably provided by this script - verify against snt_utils.r.
source(file.path(CODE_PATH, "snt_utils.r"))

In [4]:
# Packages this notebook relies on
required_packages <- c(
  "jsonlite", "data.table", "ggplot2", "fpp3", "arrow",
  "glue", "sf", "RColorBrewer", "httr", "reticulate"
)

# Hand the list to the project helper
install_and_load(required_packages)

            jsonlite           data.table              ggplot2 
    "jsonlite 1.8.9"  "data.table 1.17.8"      "ggplot2 3.5.2" 
                fpp3                arrow                 glue 
        "fpp3 1.0.2"       "arrow 18.1.0"         "glue 1.7.0" 
                  sf         RColorBrewer                 httr 
         "sf 1.0.20" "RColorBrewer 1.1.3"         "httr 1.4.7" 
          reticulate 
 "reticulate 1.44.1" 


In [5]:
# Point reticulate at the conda Python, display the interpreter it reports,
# then import the OpenHEXA SDK module.
Sys.setenv(RETICULATE_PYTHON = "/opt/conda/bin/python")
reticulate::py_config()[["python"]]
openhexa <- reticulate::import("openhexa.sdk")

In [6]:
# Load SNT config
# Reads the JSON configuration; on failure the underlying error is echoed and re-raised.
CONFIG_FILE_NAME <- "SNT_config.json"
config_json <- tryCatch(
  fromJSON(file.path(CONFIG_PATH, CONFIG_FILE_NAME)),
  error = function(e) {
    # keep a separator between the fixed prefix and the original condition message
    msg <- paste0("Error while loading configuration: ", conditionMessage(e))
    cat(msg, "\n", sep = "")
    stop(msg)
  }
)

msg <- paste0("SNT configuration loaded from  : ", file.path(CONFIG_PATH, CONFIG_FILE_NAME))
log_msg(msg)

# Set config variables
COUNTRY_CODE <- config_json$SNT_CONFIG$COUNTRY_CODE
# dataset identifiers used below to fetch the rainfall and shapes files
era5_dataset <- config_json$SNT_DATASET_IDENTIFIERS$ERA5_DATASET_CLIMATE
dhis2_dataset <- config_json$SNT_DATASET_IDENTIFIERS$DHIS2_DATASET_FORMATTED

print(paste("Country code: ", COUNTRY_CODE))

[1] "SNT configuration loaded from  : ~/workspace/configuration/SNT_config.json"
[1] "Country code:  COD"


## Globals and parameters

**Parameters**

In [7]:
# Data-sufficiency limits checked during preprocessing
minimum_periods <- 48L                           # fewest year-month periods accepted
maximum_proportion_missings_overall <- 0.1       # max share of missing values over the whole table
maximum_proportion_missings_per_district <- 0.2  # max share of missing periods for one admin unit

In [8]:
# Parameters
# Candidate seasonal block lengths (months) and the thresholds passed to the seasonality helpers
minimum_month_block_size <- 3L
maximum_month_block_size <- 5L
threshold_for_seasonality <- 0.6
threshold_proportion_seasonal_years <- 0.5

**Fixed routine formatting columns**

In [10]:
# Global variables
type_of_seasonality <- "rainfall"
data_source <- "ERA5"
original_values_col <- "MEAN"
# threshold rendered as a percentage label (0.6 -> "60%")
formatted_threshold_for_seasonality <- sprintf("%d%%", round(threshold_for_seasonality * 100))

# space and time columns
admin_level <- "ADM2"
admin_id_col <- paste0(admin_level, "_ID")
admin_name_col <- paste0(admin_level, "_NAME")
year_col <- "YEAR"
month_col <- "MONTH"
period_cols <- c(year_col, month_col)

In [11]:
# Every block length between the configured minimum and maximum, as integers
possible_month_block_sizes <- as.integer(seq(minimum_month_block_size, maximum_month_block_size))
# Percentage label of the seasonality threshold, echoed for the run log
formatted_threshold_for_seasonality <- sprintf("%d%%", round(threshold_for_seasonality * 100))
print(paste("Formatted threshold :", formatted_threshold_for_seasonality))

[1] "Formatted threshold : 60%"


## Load data

In [12]:
# Load spatial file from dataset
# File name is "<COUNTRY_CODE>_shapes.geojson"; it is fetched through the project helper
# using the DHIS2 formatted dataset identifier read from the configuration.
spatial_data_filename <- paste(COUNTRY_CODE, "shapes.geojson", sep = "_")
spatial_data <- get_latest_dataset_file_in_memory(dhis2_dataset, spatial_data_filename)
# NOTE(review): the message interpolates the dataset identifier, not a version label
log_msg(glue("File {spatial_data_filename} successfully loaded from dataset version: {dhis2_dataset}"))

[1] "File downloaded successfully from dataset version: SNT_COD_20251120_1045"
File COD_shapes.geojson successfully loaded from dataset version: snt-dhis2-formatted


In [13]:
# Load rainfall data from dataset
# File name is "<COUNTRY_CODE>_total_precipitation_monthly.parquet"; it is fetched through
# the project helper using the ERA5 climate dataset identifier read from the configuration.
rainfall_data_filename <- paste(COUNTRY_CODE, "total_precipitation_monthly.parquet", sep = "_")
original_dt <- get_latest_dataset_file_in_memory(era5_dataset, rainfall_data_filename)
# NOTE(review): the message interpolates the dataset identifier, not a version label
log_msg(glue("File {rainfall_data_filename} successfully loaded from dataset version: {era5_dataset}"))

[1] "File downloaded successfully from dataset version: COD_snt_20250731_1458"
File COD_total_precipitation_monthly.parquet successfully loaded from dataset version: snt-era5-climate


In [14]:
# Attribute table of the shapes (geometry dropped), converted to a data.table
admin_data <- setDT(st_drop_geometry(spatial_data))
common_cols <- names(admin_data)

# Names of the three result columns, each suffixed with the upper-cased seasonality type
seasonality_col <- glue("SEASONALITY", toupper(type_of_seasonality), .sep = "_")
season_duration_col <- glue("SEASONAL_BLOCK_DURATION", toupper(type_of_seasonality), .sep = "_")
season_start_month_col <- glue("SEASONAL_START_MONTH", toupper(type_of_seasonality), .sep = "_")

# Admin columns first, result columns last
final_table_cols <- c(common_cols, seasonality_col, season_duration_col, season_start_month_col)
print(final_table_cols)

[1] "ADM1_NAME"                        "ADM1_ID"                         
[3] "ADM2_NAME"                        "ADM2_ID"                         
[5] "SEASONALITY_RAINFALL"             "SEASONAL_BLOCK_DURATION_RAINFALL"


**Create the containers for the data**

In [15]:
# Placeholder result: the admin table with the three result columns set to NA,
# built on a copy so admin_data itself is not modified by reference.
seasonality_cols <- c(seasonality_col, season_duration_col, season_start_month_col)
empty_dt <- copy(admin_data)
empty_dt[, (seasonality_cols) := NA]

## Preprocess input data

In [16]:
# Convert the rainfall table to a data.table and make year/month integer
setDT(original_dt)
integer_cols <- c(year_col, month_col)
numeric_cols <- c(original_values_col)  # declared here; no coercion is applied to it in this cell
invisible(lapply(integer_cols, function(col_name) {
  # overwrite the column by reference with its integer version
  set(original_dt, j = col_name, value = as.integer(original_dt[[col_name]]))
}))
# head(original_dt)

In [17]:
# Keep only the admin/period keys and the value column, summing rows that share a key.
# NOTE(review): sum(..., na.rm = TRUE) turns an all-NA group into 0, so explicit NAs in the
# input are not counted as missing by the checks below - confirm this is intended.
original_dt <- original_dt[
  ,
  lapply(.SD, sum, na.rm = TRUE),
  by = c(admin_id_col, period_cols),
  .SDcols = original_values_col
]

# Build the admin x period grid once; element 1 is used as the number of periods,
# element 2 as the full set of rows injected into the data below.
admin_period_grid <- make_cartesian_admin_period(original_dt, admin_id_col, year_col, month_col)
num_periods <- admin_period_grid[[1]]
all_rows <- admin_period_grid[[2]]

# Stop early when the series is shorter than the configured minimum
if (num_periods < minimum_periods) {
    log_msg(glue("Data is not reliable: 
                    at least {minimum_periods} year-month periods of data are required for the case analysis; 
                    the data only contains {num_periods} periods. Abandoning analysis.")
           , level="error")
    stop("ERROR 1")
}

# inject the (possibly missing) rows into the data
original_dt <- make_full_time_space_data(
  input_dt = original_dt,
  full_rows_dt = all_rows,
  target_colname = original_values_col,
  admin_colname = admin_id_col,
  year_colname = year_col,
  month_colname = month_col
)

# Stop when the share of missing values over the whole table exceeds the allowed proportion
n_missing_values <- sum(is.na(original_dt[[original_values_col]]))
if (n_missing_values > (maximum_proportion_missings_overall * nrow(original_dt))) {
    log_msg("There are too many missing values in the data overall. Abandoning analysis.", level="error")
    stop("ERROR 2")
}

### Imputation of missings

**Remove impute files (if any)**

In [18]:
# Remove existing imputation files
# The expected file name is used as a case-insensitive pattern against the files
# currently present in the output folder.
filename_imputed_dt <- paste(COUNTRY_CODE, type_of_seasonality, 'imputed.csv', sep = '_')
files_in_folder <- list.files(OUTPUT_DATA_PATH, full.names = TRUE)
files_to_remove <- files_in_folder[grepl(filename_imputed_dt, basename(files_in_folder), ignore.case = TRUE)]

# file.remove() returns one logical per path; report only the files actually deleted.
# (str() prints and returns NULL, so it cannot be interpolated into a message.)
was_removed <- file.remove(files_to_remove)
deleted_files <- files_to_remove[was_removed]
deleted_label <- if (length(deleted_files) > 0) toString(basename(deleted_files)) else "none"
print(glue("Deleted files: {deleted_label}"))

 chr(0) 


In [19]:
# create the name of the column which will store the imputed/estimated values
imputed_col <- paste(original_values_col, 'EST', sep = '_')

# if any value of the target column is missing, impute it (SARIMA)
if (anyNA(original_dt[[original_values_col]])) {
    log_msg("There is missing data. Proceeding to impute them.", level="warning")

    # extract data on only the administrative units which have missing values for original_values_col
    missing_dt <- extract_dt_with_missings(original_dt, target_colname = original_values_col, id_colname = admin_id_col)
    # build the year-month period from the configured year/month columns
    missing_dt <- missing_dt[, PERIOD := make_yearmonth(year = get(year_col), month = get(month_col))]
    missing_dt <- missing_dt[, .SD, .SDcols = c(admin_id_col, 'PERIOD', original_values_col)]

    # how many rows missing for each administrative unit? if too many, then not good idea to impute
    missings_by_admin_unit <- missing_dt[, .(missing_count = sum(is.na(get(original_values_col)))), by = admin_id_col][order(-missing_count)]

    # if for any given admin unit, more than a given % of data is missing, there's too much to impute (maybe should be stricter - to discuss)
    if (missings_by_admin_unit[, max(missing_count)] > maximum_proportion_missings_per_district * num_periods) {
      log_msg("Some administrative units have too many missing values in the target data. Abandoning analysis.", level="error")
      stop("ERROR 3")
    }

    # split to list per admin_unit_id, to apply SARIMA imputation on each time series (per admin unit)
    missing_districts_list <- split(missing_dt, by = admin_id_col)

    # seasonal ARIMA to estimate missing values: apply the helper to each per-unit table, then stack the results
    filled_missings_dt <- rbindlist(
      lapply(
        missing_districts_list,
        fill_missing_cases_ts,
        original_values_colname = original_values_col,
        estimated_values_colname = imputed_col,
        admin_colname = admin_id_col,
        period_colname = 'PERIOD',
        threshold_for_missing = 0.0
      )
    )

    # add the imputed ("_EST") values to the original data
    # NOTE(review): this join needs year/month columns in filled_missings_dt, although only PERIOD
    # was passed in - presumably fill_missing_cases_ts() restores them; verify against the helper.
    imputed_dt <- merge.data.table(
      original_dt,
      filled_missings_dt[, .SD, .SDcols = !(original_values_col)],
      by = c(admin_id_col, year_col, month_col),
      all.x = TRUE
    )

    # copy from the districts without missings;
    # if data is large, this could be made faster by only copying from the districts which are not in the missing_dt
    imputed_dt[!is.na(get(original_values_col)), (imputed_col) := get(original_values_col)]

    # Save imputed file, only if it was computed..
    fwrite(imputed_dt, file = file.path(OUTPUT_DATA_PATH, filename_imputed_dt))

} else {
    # nothing to impute: the estimated column is a plain copy of the original values
    imputed_dt <- copy(original_dt)
    imputed_dt[, (imputed_col) := get(original_values_col)]
}

## Seasonality

In [None]:
# For every admin unit and every candidate block size, pick the calendar month that is
# flagged most often (value 1) in the matching "<INDICATOR>_<n>_MTH_ROW_SEASONALITY" column.
# Ties are resolved in favour of the earliest month.
#
# input_dt            : data.table holding the admin, month and row-seasonality columns
# indicator           : indicator name; upper-cased before building column names
# vector_of_durations : block sizes (in months) to process
# admin_colname       : name of the admin unit id column
# year_colname        : accepted for signature parity; not used in the body
# month_colname       : name of the month column
# proportion_seasonal_years_threshold : accepted for signature parity; not used in the body
#
# Returns one row per admin unit with one "START_MONTH_<INDICATOR>_<n>_MTH" column per
# processed block size (NA where the unit has no flagged month).
compute_start_month <- function(
  input_dt,
  indicator,
  vector_of_durations,
  admin_colname = 'ADM2_ID',
  year_colname = 'YEAR',
  month_colname = 'MONTH',
  proportion_seasonal_years_threshold = 0.5
){
  indicator_label <- toupper(indicator)

  # skeleton table: the distinct admin units, to which per-duration columns are joined
  output_dt <- setnames(data.table(unique(input_dt[[admin_colname]])), admin_colname)

  for (num_months in vector_of_durations) {

    # locate the row-level seasonality column of this block size
    regex_pattern <- paste(indicator_label, num_months, "MTH_ROW_SEASONALITY$", sep = '_')
    flag_colname <- names(input_dt)[grepl(regex_pattern, names(input_dt))]

    if (length(flag_colname) == 0L) {
      warning(paste("No column found matching pattern:", regex_pattern))
      next
    }

    # rows whose flag equals 1 (NA flags are left out)
    flagged_rows <- input_dt[which(input_dt[[flag_colname]] == 1)]

    # occurrences of each month among the flagged rows, per admin unit
    month_tally <- flagged_rows[, .N, by = c(admin_colname, month_colname)]

    # most frequent month first within each admin unit, earliest month breaking ties
    ranked_tally <- month_tally[order(get(admin_colname), -N, get(month_colname))]
    top_month_dt <- ranked_tally[, .SD[1], by = admin_colname]

    # keep the id and month only, and give the month its per-duration name
    top_month_dt <- top_month_dt[, c(admin_colname, month_colname), with = FALSE]
    start_month_colname <- paste('START_MONTH', indicator_label, num_months, 'MTH', sep = '_')
    setnames(top_month_dt, month_colname, start_month_colname)

    # left join so admin units without any flagged row keep an NA
    output_dt <- merge.data.table(output_dt, top_month_dt, by = admin_colname, all.x = TRUE)
  }

  output_dt
}

# Retrieve, for each row, the starting month that belongs to the smallest block size
# whose seasonality column equals `valid_value`.
#
# input_dt                   : data.table with one seasonality column and one start-month
#                              column per block size
# start_month_column_pattern : regex (case-insensitive) selecting the start-month columns
# seasonality_column_pattern : regex (case-insensitive) selecting the seasonality columns
# vector_of_possible_month_block_sizes : block sizes, in the same order as the matched
#                              columns appear in input_dt
# start_month_colname        : name of the column to create
# valid_value                : seasonality value that marks a block size as seasonal
#
# Returns a copy of input_dt with the integer column `start_month_colname` added
# (NA where no block size is seasonal). Stops when the number of block sizes,
# seasonality columns and start-month columns do not agree.
compute_seasonal_start_month <- function(
  input_dt,
  start_month_column_pattern,
  seasonality_column_pattern,
  vector_of_possible_month_block_sizes,
  start_month_colname,
  valid_value = 1
){
  # column names which match patterns
  start_month_cols <- grep(
    start_month_column_pattern,
    names(input_dt),
    ignore.case = TRUE,
    value = TRUE
  )

  seasonality_cols <- grep(
    seasonality_column_pattern,
    names(input_dt),
    ignore.case = TRUE,
    value = TRUE
  )

  # validate block sizes with columns
  if (length(vector_of_possible_month_block_sizes) != length(start_month_cols)) {
    stop("Input possible month block sizes should correspond to number of start month columns.")
  }

  if (length(seasonality_cols) != length(start_month_cols)) {
    stop("Number of seasonality columns should match number of start month columns.")
  }

  block_sizes <- as.integer(vector_of_possible_month_block_sizes)

  output_dt <- copy(input_dt)

  # Work column by column instead of apply()-ing over rows: no coercion of the table to a
  # single-type matrix, and each seasonality column is paired with its start-month column
  # by index rather than by position inside a row vector.
  selected_start_month <- rep(NA_integer_, nrow(output_dt))

  # Visit block sizes from largest to smallest so that, where several sizes are seasonal,
  # the value written last (and therefore kept) is the one of the smallest block.
  for (idx in order(block_sizes, decreasing = TRUE)) {
    is_seasonal <- output_dt[[seasonality_cols[idx]]] == valid_value
    seasonal_rows <- which(is_seasonal)  # which() drops NA comparisons
    if (length(seasonal_rows) > 0L) {
      selected_start_month[seasonal_rows] <-
        as.integer(output_dt[[start_month_cols[idx]]][seasonal_rows])
    }
  }

  output_dt[, (start_month_colname) := selected_start_month]

  output_dt
}

In [20]:
# Row-level seasonality (one row per period and admin unit) ------------------

row_seasonality_dt <- compute_month_seasonality(
  input_dt = imputed_dt,
  indicator = type_of_seasonality,
  values_colname = imputed_col,
  vector_of_durations = possible_month_block_sizes,
  admin_colname = admin_id_col,
  year_colname = year_col,
  month_colname = month_col,
  proportion_threshold = threshold_for_seasonality
)

# Seasonality per admin unit, irrespective of year ---------------------------

seasonality_source_dt <- process_seasonality(
  input_dt = row_seasonality_dt,
  indicator = type_of_seasonality,
  vector_of_durations = possible_month_block_sizes,
  admin_colname = admin_id_col,
  year_colname = year_col,
  month_colname = month_col,
  proportion_seasonal_years_threshold = threshold_proportion_seasonal_years
)

# Starting month per admin unit and block size --------------------------------

start_month_dt <- compute_start_month(
  input_dt = row_seasonality_dt,
  indicator = type_of_seasonality,
  vector_of_durations = possible_month_block_sizes,
  admin_colname = admin_id_col,
  year_colname = year_col,
  month_colname = month_col,
  proportion_seasonal_years_threshold = threshold_proportion_seasonal_years
)

# Attach the start months to the per-admin seasonality table
seasonality_source_dt <- merge.data.table(
  seasonality_source_dt,
  start_month_dt,
  by = admin_id_col,
  all.x = TRUE
)

# Patterns of the per-block-size columns, then keep only the admin id plus the
# columns matching either pattern (seasonality columns first, start months after)
check_pattern_seasonality <- paste("^SEASONALITY", toupper(type_of_seasonality), "[0-9]+_MTH$", sep = "_")
check_pattern_start_month <- paste("^START_MONTH", toupper(type_of_seasonality), "[0-9]+_MTH$", sep = "_")
seasonality_source_dt <- seasonality_source_dt[
  ,
  .SD,
  .SDcols = c(
    admin_id_col,
    grep(check_pattern_seasonality, names(seasonality_source_dt), value = TRUE),
    grep(check_pattern_start_month, names(seasonality_source_dt), value = TRUE)
  )
]

## Result file

### long

This format, until further notice, is not saved.

In [21]:
# Long format: one row per admin unit and block size. Every column that is not a
# per-block-size seasonality column is kept as an identifier.
seasonality_long_dt <- melt(
  seasonality_source_dt,
  id.vars = setdiff(
    names(seasonality_source_dt),
    grep(check_pattern_seasonality, names(seasonality_source_dt), value = TRUE)
  ),
  variable.name = "MONTH_BLOCK_SIZE",
  value.name = seasonality_col
)

In [22]:
# Replace each melted column name by the block size found at the same position
# in possible_month_block_sizes
seasonality_long_dt[
  ,
  MONTH_BLOCK_SIZE := possible_month_block_sizes[
    match(
      MONTH_BLOCK_SIZE,
      grep(check_pattern_seasonality, names(seasonality_source_dt), value = TRUE)
    )
  ]
]

# full join with the admin table so the remaining admin columns are attached
admin_seasonality_long_dt <- merge(admin_data, seasonality_long_dt, by = admin_id_col, all = TRUE)

In [23]:
# Admin columns first, then whatever the long table adds on top of them
specific_cols <- setdiff(names(admin_seasonality_long_dt), common_cols)
admin_seasonality_long_dt <- admin_seasonality_long_dt[, c(common_cols, specific_cols), with = FALSE]

In [24]:
# Keeping for now.
# # filename_admin_seasonality_long_dt <- paste(COUNTRY_CODE, data_source, admin_level, gsub("\\.", "", as.character(threshold_for_seasonality)), type_of_seasonality, 'seasonality_long.csv', sep = '_')
# filename_admin_seasonality_long_dt <- paste(COUNTRY_CODE, data_source, admin_level, type_of_seasonality, 'seasonality_long.csv', sep = '_')
# fwrite(admin_seasonality_long_dt, file.path(OUTPUT_DATA_PATH, filename_admin_seasonality_long_dt))

### Transform to wide format

In [None]:
# Wide result in two steps: the minimum seasonal block duration is computed first
# (inner call), then the starting month corresponding to that minimum block (outer call).
seasonality_wide_dt <- compute_seasonal_start_month(
    input_dt = compute_min_seasonality_block(
        input_dt = seasonality_source_dt,
        seasonality_column_pattern = check_pattern_seasonality,
        vector_of_possible_month_block_sizes = possible_month_block_sizes,
        seasonal_blocksize_colname = season_duration_col,
        valid_value = 1
    ),
    start_month_column_pattern = check_pattern_start_month,
    seasonality_column_pattern = check_pattern_seasonality,
    vector_of_possible_month_block_sizes = possible_month_block_sizes,
    start_month_colname = season_start_month_col,
    valid_value = 1
)

In [26]:
# Overall seasonality flag: 1 when at least one per-block-size seasonality column equals 1,
# 0 otherwise (a row containing an NA yields NA, as rowSums() propagates it)
seasonality_wide_dt[
  ,
  (seasonality_col) := as.numeric(rowSums(.SD == 1) > 0),
  .SDcols = grep(check_pattern_seasonality, names(seasonality_source_dt), value = TRUE)
]

# The per-block-size seasonality and start-month columns are no longer needed
seasonality_wide_dt[
  ,
  (c(
    grep(check_pattern_seasonality, names(seasonality_wide_dt), value = TRUE),
    grep(check_pattern_start_month, names(seasonality_wide_dt), value = TRUE)
  )) := NULL
]

In [27]:
# Full join with the admin table, then keep the admin columns followed by the three result columns
admin_seasonality_wide_dt <- merge(admin_data, seasonality_wide_dt, by = admin_id_col, all = TRUE)
output_cols <- c(seasonality_col, season_duration_col, season_start_month_col)
admin_seasonality_wide_dt <- admin_seasonality_wide_dt[, c(common_cols, output_cols), with = FALSE]
# head(admin_seasonality_wide_dt)

**Save output**

In [28]:
# Create the filename
file_stem <- paste(COUNTRY_CODE, type_of_seasonality, 'seasonality', sep = '_')
filename_csv <- glue("{file_stem}.csv")
filename_parquet <- glue("{file_stem}.parquet")

# Make sure the output folder exists before writing (no-op when it is already there)
dir.create(OUTPUT_DATA_PATH, recursive = TRUE, showWarnings = FALSE)

# Write the wide result in both CSV and Parquet formats
fwrite(admin_seasonality_wide_dt, file.path(OUTPUT_DATA_PATH, filename_csv))
write_parquet(admin_seasonality_wide_dt, file.path(OUTPUT_DATA_PATH, filename_parquet))
log_msg(paste0("Rainfall seasonality results saved in folder ", OUTPUT_DATA_PATH))

[1] "Rainfall seasonality results saved in folder ~/workspace/data/seasonality_rainfall"


In [29]:
# fwrite(row_seasonality_dt, file.path(OUTPUT_DATA_PATH, "row_seasonality.csv"))

In [30]:
# fwrite(seasonality_source_dt, file.path(OUTPUT_DATA_PATH, "processed_seasonality.csv"))