# Source to Stage

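This notebook (R kernel) loads every `.rda` file from the `pharmaversesdtm` source folder, writes each data frame it contains as a table in the stage DuckDB database, and saves the dataset and column labels to a metadata text file.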

In [None]:
library(yaml)
library(DBI)
library(duckdb)
suppressWarnings(suppressMessages(library(dplyr)))
library(textclean)

## Get Configuration

In [None]:
# Load the pipeline configuration. The relative file name is resolved against
# the current working directory.
process_yaml <- yaml::read_yaml("process.yaml")

In [None]:
# Remember the working directory; the configured paths are joined onto it.
current_path <- getwd()

In [None]:
# Resolve the configured locations relative to the working directory.
source_folder <- file.path(current_path, process_yaml$config$connection$sdtm$path)
duckdb_file <- file.path(current_path, process_yaml$config$connection$stage$path)
metadata_file <- file.path(current_path, process_yaml$config$connection$stage$meta_path)

# file.path() returns character(0) when a config key is missing (NULL), which
# would otherwise surface later as an opaque "invalid 'path' argument" error.
if (any(lengths(list(source_folder, duckdb_file, metadata_file)) != 1L)) {
  stop(
    "process.yaml must define config$connection$sdtm$path, ",
    "config$connection$stage$path and config$connection$stage$meta_path",
    call. = FALSE
  )
}

# Create the output directories up front so that neither the database nor the
# metadata file fails to be written because its parent folder is missing.
dir.create(dirname(duckdb_file), recursive = TRUE, showWarnings = FALSE)
dir.create(dirname(metadata_file), recursive = TRUE, showWarnings = FALSE)

## Set up local functions

In [None]:
# Cleans the character columns of a data frame so they contain plain ASCII.
#
# Assumes the input strings are encoded as Windows-1252.
#
# Arguments:
#   df  A data frame (or tibble).
# Returns:
#   `df` with every character column converted to UTF-8 and then passed
#   through textclean::replace_non_ascii(). Non-character columns are left
#   untouched; a data frame without character columns is returned unchanged.
make_clean_strings <- function(df) {
  # vapply() always yields a logical vector, even for a zero-column data frame,
  # where sapply() would return an empty list that cannot be used as an index.
  chr_cols <- vapply(df, is.character, logical(1))
  if (!any(chr_cols)) {
    return(df)
  }

  df[chr_cols] <- lapply(df[chr_cols], function(col) {
    # Step 1: convert from Windows-1252 to UTF-8; bytes that cannot be
    # converted are dropped (sub = "") instead of turning the value into NA.
    utf8_col <- iconv(col, from = "WINDOWS-1252", to = "UTF-8", sub = "")
    # Step 2: replace fancy punctuation with ASCII equivalents.
    textclean::replace_non_ascii(utf8_col)
  })

  df
}

## Do conversion

In [None]:
# Connect to the stage DuckDB database stored at `duckdb_file`.
dcon <- dbConnect(duckdb::duckdb(), dbdir = duckdb_file)

In [None]:
# Accumulates one text entry per table written to the database; the entries
# are written to `metadata_file` at the end of the notebook.
metadata <- list()

In [None]:
# Full paths of all files ending in ".rda" directly inside the source folder.
rda_files <- list.files(path = source_folder, pattern = "\\.rda$", full.names = TRUE)

# list.files() returns character(0) for a missing or empty folder, so without
# this check a wrong source path would silently produce an empty database.
if (length(rda_files) == 0L) {
  warning("No .rda files found in ", source_folder, call. = FALSE)
}

In [None]:
# Returns the "label" attribute of `x` as a single string, or "(none)" when it
# is absent or empty. exact = TRUE stops attr() from partially matching a
# differently named attribute (e.g. "labels") when there is no "label".
label_or_none <- function(x) {
  label <- attr(x, "label", exact = TRUE)
  if (is.null(label) || length(label) == 0L) {
    return("(none)")
  }
  as.character(label)[[1L]]
}

# Load every .rda file, copy each data frame it contains into the stage
# database and record its dataset and column labels in `metadata`.
for (rda_file in rda_files) {
  # Load into a private environment so that objects stored in the file cannot
  # overwrite notebook variables such as `dcon`, `metadata` or `rda_files`.
  rda_env <- new.env(parent = emptyenv())
  loaded_objects <- load(rda_file, envir = rda_env)

  for (obj_name in loaded_objects) {
    obj <- get(obj_name, envir = rda_env, inherits = FALSE)

    # Only process data frames / tibbles
    if (!is.data.frame(obj)) {
      write(paste0("⚠ Skipping non-dataframe object:", obj_name), stdout())
      flush(stdout())
      next
    }

    table_name <- obj_name
    write(paste0("➡ Writing table:", table_name), stdout())
    flush(stdout())

    # overwrite = TRUE replaces an existing table of the same name;
    # temporary = FALSE creates a permanent rather than a temporary table.
    cleaned_obj <- make_clean_strings(obj)
    copy_to(dest = dcon, df = cleaned_obj, name = table_name, overwrite = TRUE, temporary = FALSE)

    # Labels are read from the original object, not from the cleaned copy.
    column_lines <- vapply(
      names(obj),
      function(col_name) {
        paste0("- ", col_name, ": ", label_or_none(obj[[col_name]]), "\n")
      },
      character(1)
    )

    # One metadata entry per table: file, object name, dataset label, columns.
    entry <- paste0(
      "File: ", basename(rda_file),
      "\nObject: ", obj_name,
      "\nLabel: ", label_or_none(obj),
      "\n",
      "Columns:\n",
      paste(column_lines, collapse = "")
    )
    metadata[[length(metadata) + 1]] <- entry
  }
}

## List tables of the database

In [None]:
# Show the names of all tables in the `main` schema of the stage database.
DBI::dbGetQuery(
  conn = dcon,
  statement = "
  SELECT table_name 
  FROM information_schema.tables 
  WHERE table_schema = 'main'
"
)

In [None]:
# Close the connection to the stage database.
# NOTE(review): `shutdown = TRUE` is passed on to duckdb's disconnect method,
# presumably to shut down the database instance as well; confirm against the
# installed duckdb version.
dbDisconnect(dcon, shutdown = TRUE)

In [None]:
# Write the collected metadata to a text file.
# unlist() of an empty list is NULL, which writeLines() rejects; coercing to a
# character vector lets a run that wrote no tables still produce an empty file.
writeLines(as.character(unlist(metadata)), con = metadata_file)