### INSTALL Packages

In [0]:
# Restore the project's package library from renv.lock, then install the
# development version of sparklyr from GitHub.
# renv / remotes are installed only when missing, so re-running is cheap.
if (!requireNamespace("renv", quietly = TRUE)) {
  install.packages("renv")
}
renv::restore() # TBD: fix R version issue
if (!requireNamespace("remotes", quietly = TRUE)) {
  install.packages("remotes")
}
remotes::install_github("sparklyr/sparklyr")
# Manual per-package installs (kept for reference):
# install.packages("dplyr")
# install.packages("purrr")
# install.packages("here")
# install.packages("haven")
# install.packages("labelled")
# install.packages("sjlabelled")
# install.packages("sjmisc")
# install.packages("https://cran.r-project.org/src/contrib/Archive/maditr/maditr_0.8.3.tar.gz", repos = NULL, type = "source")
# install.packages("expss", repos = "https://cloud.r-project.org")

### Check Scala version

In [0]:
%scala
// Spark runtime version of the attached cluster
spark.version
// Scala library version string
util.Properties.versionString

### Check R version and library paths if running into errors

In [0]:
# Diagnostics: package search path, running R version, and the packages
# present in the cluster-wide site library.
print(.libPaths())
print(R.version.string)
print(list.files("/usr/local/lib/R/site-library"))


### Load libraries, connect to Spark, and list the IR/MR views

In [0]:
library(haven)
library(labelled)
library(expss)
library(sparklyr)
library(dplyr)
library(stringr)
library(DBI)
library(purrr)
library(here)
library(arrow)
options(sparklyr.log.console = TRUE)
source("Chap07_FP/run_indicators_spark.R")
# NOTE(review): warn = -1 suppresses every warning for the rest of the
# session (Spark, dplyr and the indicator code alike); consider narrowing it.
options(warn = -1)
# Connect to Databricks Spark (adjust config as needed)
sc <- spark_connect(method = "databricks")

# Catalog that holds the DHS recode tables and views.
recode_catalog <- "idm_dhs_recode_dev_01"

# Fully qualified names (catalog.schema.table) of every table and view in the
# recode catalog, skipping its information_schema.
sql_query <- sprintf(
  "SELECT
  CONCAT(table_catalog, '.', table_schema, '.', table_name) AS view
FROM
  %s.information_schema.tables
WHERE
  table_schema NOT IN ('information_schema')
ORDER BY
  table_schema,
  table_name",
  recode_catalog
)
all_views <- dbGetQuery(sc, sql_query)
view_names <- all_views[["view"]]

# Classify on the table name only (text after the last dot) so a catalog or
# schema name can never produce a false IR/MR match.
table_names <- sub("^.*\\.", "", view_names)
ir_views <- view_names[grepl("ir[0-9]+", table_names, ignore.case = TRUE)]
mr_views <- view_names[grepl("mr[0-9]+", table_names, ignore.case = TRUE)]

ir_views
mr_views

### Bundle the chapter scripts and copy them to DBFS for the Spark workers

In [0]:
# Bundle the chapter's R scripts plus renv.lock and required_col.csv into
# data.zip and upload it to dbfs:/tmp/data.zip, where spark_apply() workers
# pick it up. (Shell equivalent for the scripts: zip -r Chap07_FP.zip Chap07_FP)
dbutils.fs.rm("dbfs:/tmp/data.zip")
r_files <- list.files("Chap07_FP", pattern = "\\.R$", full.names = TRUE)
r_files
files_to_zip <- c(
  r_files,
  "renv.lock",
  "required_col.csv"
)
files_to_zip
# "cherry-pick" stores each file without its parent directory, so workers
# can source("FP_KNOW.R") etc. straight from the unzip directory.
zip::zip("data.zip", files = files_to_zip, mode = "cherry-pick", root = ".")
# Upload the archive that was just written to the working directory rather
# than a hard-coded user workspace path, so any user or checkout works.
local_zip <- paste0("file:", normalizePath("data.zip", mustWork = TRUE))
dbutils.fs.cp(local_zip, "dbfs:/tmp/data.zip")
dbutils.fs.ls("dbfs:/tmp/data.zip")


In [0]:
# Sanity check of the upload: pull the archive back from DBFS onto the
# driver and show the names of the files it contains.
system("cp /dbfs/tmp/data.zip ./data.zip")
zip::zip_list("./data.zip")[["filename"]]
# unzip("./data.zip", exdir = "./temp_data")

In [0]:
# Chapter tag appended to every output file name.
chap_num <- "ch07"

# Decode a recode view name such as "catalog.schema.afir71" into its parts.
#
# @param view A single view name (string).
# @return NULL when `view` is NA / not length 1, or when no
#   <letters><ir|mr><digits> sequence starts right after a dot (or at the
#   start of the string). Otherwise a list with
#   - country_code: the letters before ir/mr,
#   - ir_or_mr: "ir" or "mr" (always lower case),
#   - version: the digits,
#   - view: the input unchanged.
decode_view <- function(view) {
  if (length(view) != 1L || is.na(view)) {
    return(NULL)
  }
  # The dot is escaped (an unescaped "." matches any character) and
  # [A-Za-z] is used instead of [A-z], which also covers "_", "[", "\", "]",
  # "^" and the backtick. Case is ignored to agree with the grepl() filters
  # used to build ir_views / mr_views.
  m <- regmatches(
    view,
    regexec("(^|\\.)([A-Za-z]*)([im]r)([0-9]+)", view, ignore.case = TRUE)
  )[[1]]
  if (length(m) == 0L) {
    return(NULL)
  }
  list(
    country_code = m[3],
    ir_or_mr = tolower(m[4]),
    version = m[5],
    view = view
  )
}

# Output directory for the per-view indicator CSVs.
# NOTE(review): Spark resolves this path against its own default filesystem,
# not the driver's /dbfs FUSE mount, and the preview cell below reads from
# /dbfs/tmp/variables — confirm the intended destination.
output_dir <- "/dbfs/volume/test_mewu"

# For each IR view: run the chapter's indicator scripts on every Spark
# partition via spark_apply() and write the combined result as one CSV.
# A failure on one view is reported and the loop continues with the next.
for (view in ir_views) {
  meta <- decode_view(view)
  if (is.null(meta)) next
  ir_tbl <- tbl(sc, sql(paste0("SELECT * FROM ", view)))
  n_rows <- ir_tbl %>% tally() %>% pull(n)
  message(paste0("process ", meta$country_code))
  message(paste0("retrieved ", n_rows))
  if (n_rows == 0) {
    message(paste0("skip empty view ", view))
    next
  }
  # spark_write_csv()'s second positional argument is `header`, so the file
  # name must be part of `path`; passing it positionally sent it to
  # `header` and made every view overwrite the same directory.
  out_path <- file.path(output_dir, paste0(view, "_", chap_num, ".csv"))

  tryCatch(
    {
      # The closure runs over the full data when the write below forces the
      # computation, so the write sits inside tryCatch too; otherwise a
      # worker error would abort the whole loop.
      spark_apply(ir_tbl, function(e) {
        # Runs on a Spark worker: unpack the scripts uploaded to DBFS into
        # a private scratch directory, source them, process this partition.
        tmp_dir <- tempfile("dhs_")
        dir.create(tmp_dir)
        old_wd <- getwd()
        # Restore the worker's cwd and remove the scratch dir even if
        # sourcing or run_indicators() fails.
        on.exit({
          setwd(old_wd)
          unlink(tmp_dir, recursive = TRUE)
        }, add = TRUE)
        # Copy into the scratch dir rather than ./data.zip in the worker's
        # cwd, so separate calls never share one archive file.
        zip_copy <- file.path(tmp_dir, "data.zip")
        if (!file.copy("/dbfs/tmp/data.zip", zip_copy, overwrite = TRUE)) {
          stop("cannot copy /dbfs/tmp/data.zip to ", tmp_dir)
        }
        unzip(zip_copy, exdir = tmp_dir)
        message(paste0("working dir:", tmp_dir))
        setwd(tmp_dir)
        source("FP_KNOW.R")
        source("FP_USE.R")
        source("FP_Need.R")
        source("FP_COMM.R")
        source("run_indicators_spark.R")
        run_indicators(IRdata = e)
      }) %>%
        sdf_coalesce(1) %>%
        spark_write_csv(path = out_path, mode = "overwrite")
      message(paste0("saved ", out_path))
    },
    error = function(e) {
      message(glue::glue("Error processing {view}: {e$message}"))
    }
  )
}

In [0]:

#   ir_tbl %>% spark_write_parquet(paste0("dbfs:/tmp/results/", meta$country_code,meta$ir_or_mr, meta$version), mode="overwrite")
  
#   output_path <- paste0("/dbfs/tmp/results/", meta$country_code, meta$ir_or_mr, meta$version)
#   parquet_files <- list.files(output_path, pattern = "\\.parquet$", full.names = TRUE)
#   # ir_df <- bind_rows(lapply(parquet_files, read_parquet))
#   # ir_df <- collect(ir_tbl)
  
#   processed_list <- list()
#   # Loop through files, read and process
#   for (file in parquet_files) {
    
#     ir_df <- read_parquet(file)  
#     if (nrow(ir_df) == 0) {
#       message(paste("Skipping empty file:", file))
#       next
#     }
#     processed_df <- tryCatch(
#       {      
#         message(paste0("run batch ", length(processed_list)))
#         message(paste0("filename: ", file))
#         message(paste0("data size:", nrow(ir_df)))
#         processed_df <- run_indicators(IRdata = ir_df, MRdata = NULL)
#       },
#       error = function(e) {
#         message("run_indicators() failed: ", e$message)
#         NULL
#       }
#     )
#     processed_list[[length(processed_list) + 1]] <- processed_df
#   }

#   out <- bind_rows(processed_list)
#   message(paste0("total data size:", nrow(out)))
#   message(paste0("save final result for ", meta$view))
#   if (!is.null(out)) {
#     out$filename <- view
#     out$country <- meta$country_code
#     out_path <- file.path("/dbfs/tmp/variables", paste0(view, "_", chap_num, ".csv"))
  
#     dir.create(dirname(out_path), recursive = TRUE, showWarnings = FALSE)
#     write.csv(out, out_path, row.names = FALSE)
#     } else {
#       message(paste0("No file Saved Error occured: ", meta$view))
#     }
# }



### Check if file exists

In [0]:
%scala
// List the exported per-view CSV files.
// NOTE(review): dbfs:/tmp/variables/ is where the commented-out write.csv
// export saved results; the active spark_apply loop writes under
// /dbfs/volume/test_mewu — confirm which location should be checked.
// dbutils.fs.ls("dbfs:/tmp/results/")
dbutils.fs.ls("dbfs:/tmp/variables/")
// Uncomment to delete the intermediate parquet results directory:
// dbutils.fs.rm("dbfs:/tmp/results/", recurse=true)

### Preview one micro-level output file

In [0]:
# Preview the first rows of one exported micro-level file.
csv_path <- "/dbfs/tmp/variables/dhs_recode.af.afir71_ch07.csv"
# Spark's CSV writer produces a directory of part files rather than a
# single CSV; when csv_path is such a directory, preview its first part file.
preview_file <- csv_path
if (dir.exists(csv_path)) {
  part_files <- list.files(
    csv_path,
    pattern = "^part-.*\\.csv$",
    full.names = TRUE
  )
  if (length(part_files) == 0L) {
    stop("No part-*.csv files found in ", csv_path, call. = FALSE)
  }
  preview_file <- part_files[[1]]
}
df_preview <- read.csv(preview_file, nrows = 1000)
head(df_preview)

