In [None]:
knitr::opts_chunk$set(echo = TRUE)

This Rmarkdown document contains the code to run the PIONEER Treatment Patterns Study. The code has been largely adapted from the [PIONEERmetastaticTreatment](https://github.com/bdemeulder/PIONEERmetastaticTreatment) study. It has been refactored into an RMarkdown document to facilitate readability.

The study can be executed by rendering this documnent all at once or by running each code block one at a time.

# Setup

In [None]:
# This only needs to be done once
install.packages("devtools")
install.packages("survminer")
devtools::install_github("OHDSI/CohortDiagnostics")
devtools::install_github("OHDSI/CohortGenerator")

Change the parameters in the code chunck below to match your database. This is the only code that needs to be edited to run the study.

In [None]:
library(DatabaseConnector)
library(dplyr, warn.conflicts = FALSE)
library(here)
library(glue)
# connectionDetails <- createConnectionDetails("postgresql", user = "postgres", password = "", server = "localhost/covid")
# con <- connect(cd)
# dbGetQuery(con, "select * from cdm5.person limit 8")
# disconnect(con)
# 
# connectionDetails <- createConnectionDetails(dbms = "postgresql",
#                                              server = "testnode.arachnenetwork.com/synpuf_110k",
#                                              user = "ohdsi",
#                                              password = Sys.getenv("ODYS_DB_PASSWORD"),
#                                              port = "5441")

connectionDetails <- createConnectionDetails(dbms = "postgresql",
                                             server = "localhost/synpuf100k",
                                             user = "postgres",
                                             password = "")

connection = NULL
# cdmDatabaseSchema = "cdm5"
cdmDatabaseSchema = "cdm531"
oracleTempSchema = NULL
cohortDatabaseSchema = "scratch"
# cohortDatabaseSchema = "adam_black_results"
cohortStagingTable = "cohort_stg"
cohortTable = "cohort"
featureSummaryTable = "cohort_smry"
cohortIdsToExcludeFromExecution = c()
cohortIdsToExcludeFromResultsExport = NULL
# cohortGroups = getUserSelectableCohortGroups()
exportFolder = here::here("export")
databaseId = "Synpuf"
databaseName = databaseId
databaseDescription = "Synthetic medicare claims 100k person sample"
useBulkCharacterization = FALSE
minCellCount = 5
minimumSubjectCountForCharacterization = 140

In [None]:
# Source R code files in this project
purrr::walk(list.files(here("R"), full.names = TRUE), source)

if (!file.exists(exportFolder)) {
  dir.create(exportFolder, recursive = TRUE)
}

ParallelLogger::addDefaultFileLogger(file.path(exportFolder, "PioneerTreatmentPathways.txt"))
ParallelLogger::logInfo(.systemInfo())

## Check connection to database

Confirm that the database connection works and that we have write access to the `cohortDatabaseSchema`.

In [None]:
conn <- connect(connectionDetails)

# check query access
df <- renderTranslateQuerySql(conn, 
                              "select count(*) as n_persons from @cdmDatabaseSchema.person",
                              cdmDatabaseSchema = cdmDatabaseSchema) %>% 
  rename_all(tolower)

message(glue::glue("Number of rows in the person table: {df$n_persons}"))

insertTable(conn, cohortDatabaseSchema, "iris_temp", iris)

df <- renderTranslateQuerySql(conn, 
                              "select count(*) as n from @cohortDatabaseSchema.iris_temp",
                              cohortDatabaseSchema = cohortDatabaseSchema) %>% 
  rename_all(tolower)

renderTranslateExecuteSql(conn, 
                          "drop table @cohortDatabaseSchema.iris_temp",
                          cohortDatabaseSchema = cohortDatabaseSchema,
                          progressBar = FALSE)

if (df$n != nrow(iris)) {
  rlang::abort("Error with database write access check")
} else {
  message("Write access confirmed")
}

disconnect(conn)

# Save database metadata

In [None]:
ParallelLogger::logInfo("Saving database metadata")

con <- connect(connectionDetails)

vocabInfo <- renderTranslateQuerySql(
  con,
  glue("SELECT vocabulary_version FROM {cdmDatabaseSchema}.vocabulary WHERE vocabulary_id = 'None';"))

database <- data.frame(databaseId = databaseId,
                       databaseName = databaseName,
                       description = databaseDescription,
                       vocabularyVersion = vocabInfo[[1]],
                       isMetaAnalysis = 0)

readr::write_csv(database, file.path(exportFolder, "database.csv"))

andrData <- Andromeda::andromeda()
andrData$database <- database

disconnect(con)

# Generate Study Cohorts

In [None]:
start <- Sys.time()

# Instantiate cohorts -----------------------------------------------------------------------
cohortDefinitionSet <- readr::read_csv(file.path("inst", "settings", "CohortsToCreate.csv"), 
                                       show_col_types = FALSE) %>% 
  mutate(cohortName = name,
         sqlPath = file.path("inst", "sql", paste0(name, ".sql")),
         sql = purrr::map_chr(sqlPath, readr::read_file))

cohortTableNames <- CohortGenerator::getCohortTableNames(cohortStagingTable)

if (!file.exists(here(exportFolder, "incremental", "GeneratedCohorts.csv"))) {
  CohortGenerator::createCohortTables(
    connectionDetails = connectionDetails,
    cohortDatabaseSchema = cohortDatabaseSchema,
    cohortTableNames = cohortTableNames,
    incremental = TRUE
  )
}

ParallelLogger::logInfo("**********************************************************")
ParallelLogger::logInfo("  ---- Creating cohorts ---- ")
ParallelLogger::logInfo("**********************************************************")

CohortGenerator::generateCohortSet(
  connectionDetails = connectionDetails,
  cdmDatabaseSchema = cdmDatabaseSchema,
  cohortDatabaseSchema = cohortDatabaseSchema,
  cohortDefinitionSet = cohortDefinitionSet,
  cohortTableNames = cohortTableNames,
  incremental = TRUE,
  incrementalFolder = file.path(exportFolder, "incremental")
)

cohortCounts <- CohortGenerator::getCohortCounts(
    connectionDetails = connectionDetails,
    cohortDatabaseSchema = cohortDatabaseSchema,
    cohortTable = cohortStagingTable) %>%  
  left_join(select(cohortDefinitionSet, cohortId, cohortName = atlasName, group), ., by = "cohortId") %>% 
  mutate(cohortEntries = coalesce(cohortEntries, 0),
         cohortSubjects = coalesce(cohortSubjects, 0),
         databaseId = databaseId)

readr::write_csv(cohortCounts, file.path("export", paste0(databaseId, "CohortCounts.csv")))

if (all(filter(cohortCounts, group == "Target") %>% pull(cohortEntries) == 0)) {
  stop("All target cohorts are empty. You cannot execute this study.")
}
delta <- Sys.time() - start
msg <- paste("Generating cohorts took", signif(delta, 3), attr(delta, "units"))
ParallelLogger::logInfo(msg)

Censor cohorts that have counts lower than min cell

In [None]:
ParallelLogger::logInfo(" ---- Copy cohorts to main table ---- ")

con <- connect(connectionDetails)

# Idendtify feasable analyses
# Target count >= minCellCount
# Target with strata (TwS) >= minCellCount AND Target with strata (TwoS) >= minCellCount

# targetStrataXref is a table with all combinations of target cohorts with and without strata
targetStrataXref <- readr::read_csv(file.path("inst", "settings", "targetStrataXref.csv"), 
                                    show_col_types = FALSE)

df <- targetStrataXref %>% 
  select(targetId, strataId, cohortId, cohortType) %>% 
  inner_join(select(cohortCounts, targetId = cohortId, targetCount = cohortEntries), by = "targetId")

# TODO decide how to filter these


CohortGenerator::createCohortTables(connection = con,
                                    cohortDatabaseSchema = cohortDatabaseSchema,
                                    cohortTableNames = CohortGenerator::getCohortTableNames(cohortTable))

sql <- c("
  INSERT INTO @cohort_database_schema.@cohort_table 
  SELECT 
    cohort_definition_id,
    subject_id,
    cohort_start_date,
    cohort_end_date
  FROM @cohort_database_schema.@cohort_staging_table
  WHERE cohort_definition_id IN (@feasable_cohort_defition_ids);
   
  CREATE INDEX IDX_@cohort_table 
  ON @cohort_database_schema.@cohort_table (cohort_definition_id, subject_id, cohort_start_date);") %>% 
  SqlRender::render(
    cohort_database_schema = cohortDatabaseSchema,
    cohort_staging_table = cohortStagingTable,
    cohort_table = cohortTable,
    feasable_cohort_defition_ids = df$targetId) %>% 
  assertNoParameters() %>% 
  SqlRender::translate(targetDialect = attr(connection, "dbms"))
  
ParallelLogger::logInfo("Copy and censor cohorts to main analysis table")
executeSql(con, sql)
disconnect(con)
  

In [None]:
con <- DatabaseConnector::connect(connectionDetails)

n1 <- renderTranslateQuerySql(con, glue::glue("select count(*) as n from {cohortDatabaseSchema}.{cohortStagingTable}")) %>%
  rename_all(tolower) %>% 
  pull(n)

n2 <- renderTranslateQuerySql(con, glue::glue("select count(*) as n from {cohortDatabaseSchema}.{cohortTable}")) %>%
  rename_all(tolower) %>% 
  pull(n)

DatabaseConnector::disconnect(con)
message(glue("cohort staging table created with {n1} rows."))
message(glue("cohort table created with {n2} rows."))


In [None]:
CohortGenerator::getCohortCounts(
    connectionDetails = connectionDetails,
    cohortDatabaseSchema = cohortDatabaseSchema,
    cohortTable = cohortTable) 

In [None]:
# Compute the features
ParallelLogger::logInfo("**********************************************************")
ParallelLogger::logInfo(" ---- Create feature proportions ---- ")
ParallelLogger::logInfo("**********************************************************")
createFeatureProportions(connectionDetails = connectionDetails,
                         cohortDatabaseSchema = cohortDatabaseSchema,
                         cohortStagingTable = cohortStagingTable,
                         cohortTable = cohortTable,
                         featureSummaryTable = featureSummaryTable)

# Count staging cohorts

In [None]:
ParallelLogger::logInfo("Counting staging cohorts")

counts <- CohortGenerator::getCohortCounts(connectionDetails = connectionDetails,
                                           cohortDatabaseSchema = cohortDatabaseSchema,
                                           cohortTable = cohortStagingTable)

if (nrow(counts) > 0) {
  counts$databaseId <- databaseId
  counts <- enforceMinCellValue(counts, "cohortEntries", minCellCount)
  counts <- enforceMinCellValue(counts, "cohortSubjects", minCellCount)
}

allStudyCohorts <- bind_rows(
  readr::read_csv(here("inst", "settings", "CohortsToCreate.csv"), 
                  col_select = c("name" = "atlasName", "cohortId"),
                  show_col_types = FALSE),
  
  readr::read_csv(here("inst", "settings", "targetStrataXref.csv"), 
                  col_select = c("name", "cohortId"),
                  col_types = "cd",
                  show_col_types = FALSE)
  )

allCounts <- dplyr::left_join(allStudyCohorts, counts, by = "cohortId") %>% 
  mutate(across(c(cohortEntries, cohortSubjects), ~tidyr::replace_na(., 0))) %>% 
  mutate(databaseId = databaseId)

andrData$cohort_staging_count = allCounts

# Generate survival info 

In [None]:

ParallelLogger::logInfo("Generating time to event data")
start <- Sys.time()

targetStrataXref <- readr::read_csv(here("inst", "settings", "targetStrataXref.csv"), show_col_types = FALSE)
cohortsToCreate <- readr::read_csv(here("inst", "settings", "cohortsToCreate.csv"), show_col_types = FALSE)

targetCohortIds <- cohortsToCreate %>% 
  filter(group == "Target") %>% 
  pull(cohortId)

targetIds <- c(targetCohortIds, targetStrataXref$cohortId)

events <- readr::read_csv(here("inst", "settings", "TimeToEvent.csv"), show_col_types = F) %>% 
  mutate(outcomeCohortIds = stringr::str_replace_all(outcomeCohortIds, ";", ","))

connection <- connect(connectionDetails)

eventIds <- events %>% dplyr::pull(eventId)
  
# .eventId = 1
timeToEvent <- purrr::map_dfr(eventIds, function(.eventId) {
  outcomeIds <- events %>% 
    dplyr::filter(eventId == .eventId) %>% 
    dplyr::pull(outcomeCohortIds)
    
  sql <- SqlRender::render("
    SELECT cohort_definition_id AS target_id, 
       row_number() OVER (PARTITION BY cohort_definition_id) AS id,
       DATEDIFF(day, cohort_start_date, event_date) AS time_to_event,
       event
    FROM (
         SELECT t.cohort_definition_id, t.cohort_start_date,
                coalesce(min(o.cohort_start_date), max(t.cohort_end_date)) AS event_date,
                CASE WHEN min(o.cohort_start_date) IS NULL THEN 0 ELSE 1 END AS event
         FROM @cohort_database_schema.@cohort_table t
             LEFT JOIN (
                SELECT subject_id, MIN (cohort_start_date) AS cohort_start_date
                FROM @cohort_database_schema.@cohort_table
                WHERE cohort_definition_id IN (@outcome_ids)
                GROUP BY subject_id
              ) o
             ON t.subject_id = o.subject_id
                AND o.cohort_start_date >= t.cohort_start_date
                AND o.cohort_start_date <= t.cohort_end_date
         WHERE t.cohort_definition_id IN (@target_ids)
         GROUP BY t.cohort_definition_id, t.subject_id, t.cohort_start_date
         ) tab;
    ",
    cohort_database_schema = cohortDatabaseSchema,
    cohort_table = cohortTable,
    outcome_ids = outcomeIds, 
    target_ids = paste(targetIds, collapse = ', ')) %>% 
  assertNoParameters() %>% 
  SqlRender::translate(dbms(connection))
    
  km_grouped <- DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = T)
  
  # .targetId = 1
  purrr::map_dfr(targetIds, function(.targetId){
    
    km <- km_grouped %>% dplyr::filter(targetId == .targetId)
    
    if (nrow(km) < 30 | length(km$event[km$event == 1]) < 1) {return(NULL)}

    # TODO: Change to Cyclops
    surv_info <- survival::survfit(survival::Surv(timeToEvent, event) ~ 1, data = km)
    surv_info <- survminer::surv_summary(surv_info)
    data.frame(targetId = .targetId, eventId = .eventId, time = surv_info$time, surv = surv_info$surv, 
               n.censor = surv_info$n.censor, n.event = surv_info$n.event, n.risk = surv_info$n.risk,
               lower = surv_info$lower, upper = surv_info$upper, databaseId = databaseId)
  })
  
})

andrData$cohort_time_to_event <- timeToEvent

disconnect(connection)

delta <- Sys.time() - start
msg <- paste("Generating time to event data took", signif(delta, 3), attr(delta, "units"))
print(msg)
ParallelLogger::logInfo(msg)

# Generate time to treatment switch info 

In [None]:
ParallelLogger::logInfo("Generating time to treatment switch data")
ParallelLogger::logInfo("Create treatment tables")

start <- Sys.time()
connection <- connect(connectionDetails)

sql <- c("
  DROP TABLE IF EXISTS @cohort_database_schema.drug_codesets;
  CREATE TABLE @cohort_database_schema.drug_codesets
  (
      codeset_tag VARCHAR NOT NULL,
      concept_id BIGINT NOT NULL
  );
  
  
  INSERT INTO @cohort_database_schema.drug_codesets (codeset_tag, concept_id)
  SELECT codeset_tag, concept_id
  FROM (
       --ADT
       SELECT 'ADT' AS codeset_tag, c.concept_id
       FROM (
            SELECT DISTINCT I.concept_id
            FROM (
                 SELECT concept_id
                 FROM @cdm_database_schema.CONCEPT
                 WHERE concept_id IN (19058410, 739471, 35834903, 1343039, 19089810, 1366310, 1351541, 1344381, 19010792,
                                      1356461, 1500211, 1300978, 1315286, 35807385, 35807349)
                 UNION
                 SELECT C.concept_id
                 FROM @cdm_database_schema.CONCEPT C
                     JOIN @cdm_database_schema.CONCEPT_ANCESTOR ca
                 ON C.concept_id = ca.descendant_concept_id
                     AND ca.ancestor_concept_id IN (19058410, 739471, 35834903, 1343039, 19089810, 1366310, 1351541, 1344381,
                                                    19010792, 1356461, 1500211, 1300978, 1315286, 35807385, 35807349)
                     AND C.invalid_reason IS NULL
                 ) I
            ) C
  
       UNION
       
       -- ARTA
       SELECT 'ARTA' AS codeset_tag, c.concept_id
       FROM (
            SELECT DISTINCT I.concept_id
            FROM (
                 SELECT concept_id
                 FROM @cdm_database_schema.CONCEPT
                 WHERE concept_id IN (40239056, 963987, 42900250, 1361291)
                 UNION
                 SELECT C.concept_id
                 FROM @cdm_database_schema.CONCEPT C
                     JOIN @cdm_database_schema.CONCEPT_ANCESTOR ca
                 ON C.concept_id = ca.descendant_concept_id
                     AND ca.ancestor_concept_id IN (40239056, 963987, 42900250, 1361291)
                     AND C.invalid_reason IS NULL
                 ) I
            ) C
  
       UNION
       
       --Chemo
       SELECT 'Chemo' AS codeset_tag, c.concept_id
       FROM (
            SELECT DISTINCT I.concept_id
            FROM (
                 SELECT concept_id
                 FROM @cdm_database_schema.CONCEPT
                 WHERE concept_id IN (1315942, 40222431, 1378382)
                 UNION
                 SELECT C.concept_id
                 FROM @cdm_database_schema.CONCEPT C
                     JOIN @cdm_database_schema.CONCEPT_ANCESTOR ca
                 ON C.concept_id = ca.descendant_concept_id
                     AND ca.ancestor_concept_id IN (1315942, 40222431, 1378382)
                     AND C.invalid_reason IS NULL
                 ) I
            ) C
  
       UNION
       
       --PARP
       SELECT 'PARP' AS codeset_tag, c.concept_id
       FROM (
            SELECT DISTINCT I.concept_id
            FROM (
                 SELECT concept_id
                 FROM @cdm_database_schema.CONCEPT
                 WHERE concept_id IN (45892579, 1718850)
                 UNION
                 SELECT C.concept_id
                 FROM @cdm_database_schema.CONCEPT C
                     JOIN @cdm_database_schema.CONCEPT_ANCESTOR ca
                 ON C.concept_id = ca.descendant_concept_id
                     AND ca.ancestor_concept_id IN (45892579, 1718850)
                     AND C.invalid_reason IS NULL
                 ) I
            ) C
  
       UNION
       
       --Immunotherapy
       SELECT 'Immuno' AS codeset_tag, c.concept_id
       FROM (
            SELECT DISTINCT I.concept_id
            FROM (
                 SELECT concept_id
                 FROM @cdm_database_schema.CONCEPT
                 WHERE concept_id IN (45775965, 40224095)
                 UNION
                 SELECT C.concept_id
                 FROM @cdm_database_schema.CONCEPT C
                     JOIN @cdm_database_schema.CONCEPT_ANCESTOR ca
                 ON C.concept_id = ca.descendant_concept_id
                     AND ca.ancestor_concept_id IN (45775965, 40224095)
                     AND C.invalid_reason IS NULL
                 ) I
            ) C
  
       UNION
       
       --lutetium
       SELECT 'lutetium' AS codeset_tag, c.concept_id
       FROM (
            SELECT DISTINCT I.concept_id
            FROM (
                 SELECT concept_id
                 FROM @cdm_database_schema.CONCEPT
                 WHERE concept_id IN (44816340)
                 UNION
                 SELECT C.concept_id
                 FROM @cdm_database_schema.CONCEPT C
                     JOIN @cdm_database_schema.CONCEPT_ANCESTOR ca
                 ON C.concept_id = ca.descendant_concept_id
                     AND ca.ancestor_concept_id IN (44816340)
                     AND C.invalid_reason IS NULL
                 ) I
            ) C
  
       UNION
       
       --radium223
       SELECT 'radium' AS codeset_tag, c.concept_id
       FROM (
            SELECT DISTINCT I.concept_id
            FROM (
                 SELECT concept_id
                 FROM @cdm_database_schema.CONCEPT
                 WHERE concept_id IN (45775578, 902727, 43526934)
                 UNION
                 SELECT C.concept_id
                 FROM @cdm_database_schema.CONCEPT C
                     JOIN @cdm_database_schema.CONCEPT_ANCESTOR ca
                 ON C.concept_id = ca.descendant_concept_id
                     AND ca.ancestor_concept_id IN (45775578, 902727, 43526934)
                     AND C.invalid_reason IS NULL
                 ) I
            LEFT JOIN
                (
                SELECT concept_id
                FROM @cdm_database_schema.CONCEPT
                WHERE concept_id IN (41267222)
  
                ) E
                ON I.concept_id = E.concept_id
            WHERE E.concept_id IS NULL
            ) C
       ) tab
  ;
  
  
  -- assign drug tag to each drug_exposure instead of individual drug concept_ids 
  -- TODO add analysis for number of patients less that 183 days in cohort
  DROP TABLE IF EXISTS @cohort_database_schema.treatment_tagged;
  
  CREATE TABLE @cohort_database_schema.treatment_tagged(
    cohort_definition_id int, 
    person_id bigint,
    codeset_tag varchar,
    drug_exposure_start_date date,
    cohort_start_date date,
    cohort_end_date date
  );
  
  WITH tab AS(
    SELECT coh.cohort_definition_id, de.person_id,
           cs.codeset_tag, de.drug_exposure_start_date,
           coh.cohort_start_date,
           coh.cohort_end_date
    FROM @cdm_database_schema.drug_exposure de
    JOIN @cohort_database_schema.@cohort_table coh
        ON de.person_id = coh.subject_id
    JOIN @cohort_database_schema.drug_codesets cs
        ON cs.concept_id = de.drug_concept_id
        )
  INSERT INTO @cohort_database_schema.treatment_tagged
  SELECT * 
  FROM tab 
  WHERE cohort_definition_id IN (@treatment_cohort_ids)
    AND cohort_end_date >= drug_exposure_start_date
    AND cohort_start_date <= drug_exposure_start_date
    AND DATEDIFF(day, cohort_start_date, cohort_end_date) > 183
  ORDER BY cohort_definition_id, person_id, drug_exposure_start_date;
  
  
  -- collect initial treatment patterns info
  DROP TABLE IF EXISTS @cohort_database_schema.treatment_pat;
  CREATE TABLE @cohort_database_schema.treatment_pat AS
  SELECT DENSE_RANK() OVER(PARTITION BY cohort_definition_id ORDER BY person_id) AS id,
         cohort_definition_id, person_id, before_codeset_tag, first_exposure, 
         after_codeset_tag, after_first_exposure, cohort_start_date, cohort_end_date
  FROM (
        SELECT COALESCE(before_cohort_id, after_cohort_id) AS cohort_definition_id, 
               COALESCE(before_person_id, after_person_id) AS person_id, before_codeset_tag, first_exposure,
               after_codeset_tag, COALESCE(after_first_exposure, first_exposure) AS after_first_exposure, 
               COALESCE(before.cohort_start_date, after.cohort_start_date) AS cohort_start_date,
               COALESCE(before.cohort_end_date, after.cohort_end_date) AS cohort_end_date
        FROM (
              -- first six months treatment
              SELECT cohort_definition_id AS before_cohort_id, 
                     person_id AS before_person_id, 
                     codeset_tag AS before_codeset_tag,
                     MIN(drug_exposure_start_date) AS first_exposure, 
                     cohort_start_date, 
                     cohort_end_date
              FROM @cohort_database_schema.treatment_tagged tp
              WHERE drug_exposure_start_date <= DATEADD(day, 183, cohort_start_date)
              GROUP BY cohort_definition_id, person_id, codeset_tag, cohort_start_date, cohort_end_date
              ) before
        FULL JOIN (
              -- post six months treatment
              SELECT cohort_definition_id AS after_cohort_id, person_id AS after_person_id, codeset_tag AS after_codeset_tag,
                     MIN(drug_exposure_start_date) AS after_first_exposure, cohort_start_date, cohort_end_date
              FROM @cohort_database_schema.treatment_tagged tp
              WHERE drug_exposure_start_date > DATEADD(day, 183, cohort_start_date)
              GROUP BY cohort_definition_id, person_id, codeset_tag, cohort_start_date, cohort_end_date
              ) after
        ON before.before_person_id = after.after_person_id
          AND before.before_codeset_tag = after.after_codeset_tag
          AND before_cohort_id = after_cohort_id
        ORDER BY after.after_person_id, after_first_exposure
        ) tab
  ORDER BY cohort_definition_id;
  ") %>% 
  SqlRender::render(
    warnOnMissingParameters = TRUE,
    cohort_database_schema = cohortDatabaseSchema,
    cdm_database_schema = cdmDatabaseSchema,
    treatment_cohort_ids = paste(targetIds, collapse = ', '),
    cohort_table = cohortStagingTable) %>% 
  assertNoParameters() %>% 
  SqlRender::translate(dbms(connection))

DatabaseConnector::executeSql(connection, sql)

In [None]:
ParallelLogger::logInfo("Get time to treatment switch")

deathCohortId <- events %>% dplyr::filter(name == 'Time to Death') %>% dplyr::pull(outcomeCohortIds)

sql <- c("
  SELECT cohort_definition_id, 
         ROW_NUMBER() OVER (PARTITION BY cohort_definition_id) AS id,
         DATEDIFF(day, cohort_start_date, event_date) AS time_to_event,
         event
  FROM (
        SELECT DISTINCT 
               tp.cohort_definition_id, tp.cohort_start_date, 
               COALESCE(trt.after_first_exposure, coh.cohort_start_date, tp.cohort_end_date) AS event_date,
               CASE WHEN trt.after_first_exposure IS NULL AND 
                         coh.cohort_start_date IS NULL 
                    THEN 0 ELSE 1 END AS event
        FROM @cohort_database_schema.treatment_pat tp
        LEFT JOIN (
            SELECT cohort_definition_id, person_id, 
                   MIN(after_first_exposure) AS after_first_exposure
            FROM @cohort_database_schema.treatment_pat
            WHERE first_exposure is NULL
            GROUP BY cohort_definition_id, person_id
            ) trt
        ON tp.cohort_definition_id = trt.cohort_definition_id
          AND tp.person_id = trt.person_id
        LEFT JOIN (
            SELECT subject_id, cohort_start_date 
            FROM @cohort_database_schema.@cohort_table 
            WHERE cohort_definition_id = @death_cohort_id
            ) coh
        ON tp.person_id = coh.subject_id
       ) tab
  ORDER BY cohort_definition_id, id;") %>% 
  SqlRender::render(
    cohort_database_schema = cohortDatabaseSchema,
    cohort_table = cohortStagingTable,
    death_cohort_id = deathCohortId
  ) %>% 
  assertNoParameters() %>% 
  SqlRender::translate(dbms(connection))

data <- DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = T)

In [None]:
#calculate locality metrics for Time to Treatment Switch

metrics <- data %>%
  dplyr::filter(event == 1) %>% 
  dplyr::select(cohortDefinitionId, timeToEvent) %>% 
  dplyr::group_by(cohortDefinitionId) %>% 
  dplyr::summarise(minimum = min(timeToEvent),
                     q1 = quantile(timeToEvent, 0.25),
                     median = median(timeToEvent),
                     q3 = quantile(timeToEvent, 0.75),
                     maximum = max(timeToEvent)) %>% 
  dplyr::mutate(iqr = q3 - q1, analysisName = "Time to Treatment Switch") %>% 
  dplyr::relocate(iqr, .before = minimum)
  
timeToTreatmentSwitch <- purrr::map_df(targetIds, function(targetId){
  data <- data %>% dplyr::filter(cohortDefinitionId == targetId) %>% dplyr::select(id, timeToEvent, event)
  if (nrow(data) < 30 | length(data$event[data$event == 1]) < 1) {return(NULL)}
  surv_info <- survival::survfit(survival::Surv(timeToEvent, event) ~ 1, data = data)
  surv_info <- survminer::surv_summary(surv_info)
  
  data.frame(targetId = targetId, time = surv_info$time, surv = surv_info$surv, 
             n.censor = surv_info$n.censor, n.event = surv_info$n.event, n.risk = surv_info$n.risk,
             lower = surv_info$lower, upper = surv_info$upper, databaseId = databaseId)
})
andrData$cohort_time_to_treatment_switch <- timeToTreatmentSwitch

In [None]:
ParallelLogger::logInfo("Get sankey data")

sql <- c("
  SELECT pat.cohort_definition_id AS cohort_id, pat.id, 
         pat.before_codeset_tag, pat.after_codeset_tag
  FROM  @cohort_database_schema.treatment_pat pat
  LEFT JOIN (
      SELECT cohort_definition_id, person_id, codeset_tag, 
             MAX(drug_exposure_start_date) AS last_exposure
      FROM @cohort_database_schema.treatment_tagged
      GROUP BY cohort_definition_id, person_id, codeset_tag
      ) le
  ON pat.cohort_definition_id = le.cohort_definition_id
    AND pat.person_id = le.person_id
    AND pat.before_codeset_tag = le.codeset_tag
  LEFT JOIN (
      -- take the earliest date of new drug exposure
      SELECT  cohort_definition_id, person_id, MIN(after_first_exposure) first_switch
      FROM @cohort_database_schema.treatment_pat
      WHERE before_codeset_tag IS NULL
      GROUP BY cohort_definition_id, person_id
      ) fs
  ON pat.cohort_definition_id = fs.cohort_definition_id
    AND pat.person_id = fs.person_id
  WHERE DATEDIFF(day, pat.cohort_start_date, pat.cohort_end_date) >= 183
    AND first_exposure IS NOT NULL OR 
        DATEADD(day, @second_line_treatment_gap, first_switch) >= after_first_exposure
  ORDER BY pat.cohort_definition_id, pat.person_id;") %>% 
  SqlRender::render(
    cohort_database_schema = cohortDatabaseSchema,
    second_line_treatment_gap = '0'
  ) %>% 
  assertNoParameters() %>% 
  SqlRender::translate(dbms(connection))

data <- DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = T)

data <- data %>% 
  group_by(cohortId, id) %>% 
  mutate(first_line = paste(beforeCodesetTag, collapse = " "),
         second_line = paste(afterCodesetTag, collapse = " ")) %>%
  select(id, cohortId, first_line, second_line) %>%
  distinct() %>%
  mutate(first_line = formatPattern(first_line),
         second_line = formatPattern(second_line))

treatmentPatternMap <- data.frame(name = unique(data$first_line))
treatmentPatternMap$code <- 1:nrow(treatmentPatternMap)

sankeyData <- dplyr::inner_join(data, treatmentPatternMap, by = c('first_line' = 'name')) %>% 
  dplyr::rename('sourceId' = 'code') %>%
  dplyr::rename('sourceName' = 'first_line')

rowCount <- nrow(treatmentPatternMap)
treatmentPatternMap <- data.frame(name = unique(data$second_line))
treatmentPatternMap$code <- (rowCount + 1):(nrow(treatmentPatternMap) + rowCount)

sankeyData <- dplyr::inner_join(sankeyData, treatmentPatternMap, by = c('second_line' = 'name')) %>%
  rename('targetId' = 'code') %>%
  rename('targetName' = 'second_line')

sankeyData <- sankeyData %>%
  group_by(cohortId, sourceName, targetName, sourceId, targetId) %>%
  summarise(value = dplyr::n(), .groups = "drop") %>%
  filter(sourceName != 'discontinued') %>% 
  select_all()

sankeyData$databaseId = databaseId
andrData$treatment_sankey <- sankeyData

delta <- Sys.time() - start
ParallelLogger::logInfo(paste("Generating time to treatment switch data took",
                              signif(delta, 3),
                              attr(delta, "units")))

disconnect(connection)

# Locality estimation of some time periods 

In [None]:
# median follow-up time
ParallelLogger::logInfo("Time periods locality estimation")

connection <- connect(connectionDetails)

start <- Sys.time()
sql <- c("
  WITH init_data AS (
    SELECT cohort_definition_id, DATEDIFF(day, cohort_start_date, cohort_end_date) AS value
    FROM @cohort_database_schema.@cohort_table
    WHERE cohort_definition_id IN (@target_ids)
  ),
  details AS (
    SELECT 
      cohort_definition_id,
      value,
      ROW_NUMBER() OVER (PARTITION BY cohort_definition_id ORDER BY value) AS row_number,
      SUM(1) OVER (PARTITION BY cohort_definition_id) AS total
    FROM init_data
  ),
  quartiles AS (
    SELECT cohort_definition_id,
      value,
      AVG(CASE
              WHEN row_number >= (FLOOR(total / 2.0) / 2.0)
                  AND row_number <= (FLOOR(total / 2.0) / 2.0) + 1
                  THEN value / 1.0
          END
          ) OVER (PARTITION BY cohort_definition_id) AS q1,
      AVG(CASE
              WHEN row_number >= (total / 2.0)
                  AND row_number <= (total / 2.0) + 1
                  THEN value / 1.0
          END
          ) OVER (PARTITION BY cohort_definition_id) AS median,
      AVG(CASE
              WHEN row_number >= (CEILING(total / 2.0) + (FLOOR(total / 2.0) / 2.0))
                  AND row_number <= (CEILING(total / 2.0) + (FLOOR(total / 2.0) / 2.0) + 1)
                  THEN value / 1.0
          END
          ) OVER (PARTITION BY cohort_definition_id) AS q3
    FROM details
  )
  SELECT cohort_definition_id,
         AVG(q3) - AVG(q1) AS IQR,
         MIN(value) AS minimum,
         AVG(q1) AS q1,
         AVG(median) AS median,
         AVG(q3) AS q3,
         MAX(value) AS maximum,
         'Follow up Time' AS analysis_name
  FROM quartiles
  GROUP BY cohort_definition_id;") %>%
  SqlRender::render(
    cohort_database_schema = cohortDatabaseSchema,
    cohort_table = cohortTable,
    target_ids = paste(targetIds, collapse = ",")) %>% 
  assertNoParameters() %>% 
  SqlRender::translate(dbms(connection))

df <- DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = T)

metrics <- rbind(metrics, df)
disconnect(connection)

In [None]:

connection <- connect(connectionDetails)

sql <- c("
  WITH tab as (
    SELECT cohort_definition_id, cohort_start_date, min(drug_exposure_start_date) as drug_exposure_date
    FROM @cohort_database_schema.@cohort_table coh
    JOIN @cdm_database_schema.drug_exposure de
        ON coh.subject_id = de.person_id
            AND de.drug_concept_id IN (
                                      SELECT concept_id
                                      FROM @cohort_database_schema.drug_codesets
                                      )
            AND de.drug_exposure_start_date >= coh.cohort_start_date
            AND de.drug_exposure_start_date <= coh.cohort_end_date
    WHERE coh.cohort_definition_id IN (@target_ids)
    GROUP BY cohort_definition_id, subject_id, cohort_start_date
    ORDER BY cohort_definition_id
  ),
  init_data AS(
    SELECT cohort_definition_id,
           DATEDIFF(day, cohort_start_date, drug_exposure_date) AS value
    FROM tab
  ),
  details AS (
    SELECT 
      cohort_definition_id,
      value,
      ROW_NUMBER() OVER (PARTITION BY cohort_definition_id ORDER BY value) AS row_number,
      SUM(1) OVER (PARTITION BY cohort_definition_id) AS total
    FROM init_data
  ),
  quartiles AS (
    SELECT cohort_definition_id,
      value,
      AVG(CASE
              WHEN row_number >= (FLOOR(total / 2.0) / 2.0)
                  AND row_number <= (FLOOR(total / 2.0) / 2.0) + 1
                  THEN value / 1.0
          END
          ) OVER (PARTITION BY cohort_definition_id) AS q1,
      AVG(CASE
              WHEN row_number >= (total / 2.0)
                  AND row_number <= (total / 2.0) + 1
                  THEN value / 1.0
          END
          ) OVER (PARTITION BY cohort_definition_id) AS median,
      AVG(CASE
              WHEN row_number >= (CEILING(total / 2.0) + (FLOOR(total / 2.0) / 2.0))
                  AND row_number <= (CEILING(total / 2.0) + (FLOOR(total / 2.0) / 2.0) + 1)
                  THEN value / 1.0
          END
          ) OVER (PARTITION BY cohort_definition_id) AS q3
    FROM details
  )
  SELECT cohort_definition_id,
         AVG(q3) - AVG(q1) AS IQR,
         MIN(value) AS minimum,
         AVG(q1) AS q1,
         AVG(median) AS median,
         AVG(q3) AS q3,
         MAX(value) AS maximum,
         'Time to Treatment' AS analysis_name
  FROM quartiles
  GROUP BY cohort_definition_id;") %>%
  SqlRender::render(
    cdm_database_schema = cdmDatabaseSchema,
    cohort_database_schema = cohortDatabaseSchema,
    cohort_table = cohortTable,
    target_ids = paste(targetIds, collapse = ",")) %>% 
  assertNoParameters() %>% 
  SqlRender::translate(dbms(connection))

df <- DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = T)
metrics <- rbind(metrics, df)
andrData$metrics_distribution <- metrics

disconnect(connection)

In [None]:
# drop treatment complementary tables
connection <- connect(connectionDetails)
sql <- c("
  DROP TABLE IF EXISTS @cohort_database_schema.drug_codesets;
  DROP TABLE IF EXISTS @cohort_database_schema.treatment_tagged;
  DROP TABLE IF EXISTS @cohort_database_schema.treatment_pat;
  DROP TABLE IF EXISTS @cohort_database_schema.metastasis_date;") %>% 
  SqlRender::render(cohort_database_schema = cohortDatabaseSchema) %>% 
  SqlRender::translate(dbms(connection))

DatabaseConnector::executeSql(connection, sql)

disconnect(connection)

# Counting cohorts 

In [None]:
ParallelLogger::logInfo("Counting cohorts")

counts <- CohortGenerator::getCohortCounts(
  connectionDetails = connectionDetails,
  cohortDatabaseSchema = cohortDatabaseSchema,
  cohortTable = cohortTable)

if (nrow(counts) > 0) {
  counts$databaseId <- databaseId
  counts <- enforceMinCellValue(counts, "cohortEntries", minCellCount)
  counts <- enforceMinCellValue(counts, "cohortSubjects", minCellCount)
}

readr::write_csv(counts, file.path(exportFolder, "cohort_count.csv"))
andrData$cohort_count <- counts

# Read in the cohort counts
# counts <- readr::read_csv(file.path(exportFolder, "cohort_count.csv"))
# colnames(counts) <- SqlRender::snakeCaseToCamelCase(colnames(counts))

# Export the cohorts from the study
# cohortsForExport <- loadCohortsForExportFromPackage(cohortIds = counts$cohortId)
# andrData$cohort <- cohortsForExport

# Extract feature counts 

In [None]:
ParallelLogger::logInfo("Extract feature counts")

connection <- connect(connectionDetails)

sql <- c("
  SELECT 
    c.cohort_definition_id, 
    f.feature_cohort_definition_id,
    f.window_id,
    c.total_count,
    f.feature_count,
    1.0*f.feature_count/c.total_count AS mean,
    sqrt(1.0*(total_count*f.feature_count - f.feature_count*f.feature_count)/(c.total_count*(c.total_count - 1))) AS sd
  FROM (
    SELECT cohort_definition_id, COUNT_BIG(DISTINCT subject_id) total_count
    FROM @cohort_database_schema.@cohort_table 
    GROUP BY cohort_definition_id
  ) c
  INNER JOIN @cohort_database_schema.@feature_summary_table f -- Feature Count
    ON c.cohort_definition_id = f.cohort_definition_id
    AND c.total_count > 1 -- Prevent divide by zero;
  ") %>% 
  SqlRender::render(
    cohort_database_schema = cohortDatabaseSchema,
    cohort_table = cohortTable,
    feature_summary_table = featureSummaryTable) %>% 
  SqlRender::translate(dbms(connection))
  
data <- DatabaseConnector::querySql(connection, sql) 
names(data) <- SqlRender::snakeCaseToCamelCase(names(data))
  
# formatFeatureProportions ----
featureTimeWindows <- readr::read_csv(here::here("inst", "settings", "featureTimeWindows.csv"), 
                                      show_col_types = FALSE)

featureCohorts <- readr::read_csv(here::here("inst", "settings","CohortsToCreate.csv"), show_col_types = FALSE) %>% 
  dplyr::filter(group %in% c("Stratification", "Outcome")) %>% 
  dplyr::select(name, cohortId)

data <- merge(data, featureTimeWindows, by = "windowId")
data <- merge(data, featureCohorts, by.x = "featureCohortDefinitionId", by.y = "cohortId")
names(data)[names(data) == 'name'] <- 'featureName'
names(data)[names(data) == 'cohortDefinitionId'] <- 'cohortId'

data$covariateId <- data$featureCohortDefinitionId * 1000 + data$windowId

if (nrow(data) != 0) {
  data$covariateName <- paste0("Cohort during day ", 
                               data$windowStart, " through ", 
                               data$windowEnd, " days ", 
                               data$windowType, " the index: ", 
                               data$featureName)
  
  data$analysisId <- 10000
  
} else {
  # Add empty columns
  data <- data.frame(data, matrix(nrow = 0, ncol = 2))
  data <- tibble::tibble(data) %>%
    dplyr::rename(covariateName=X1, analysisId=X2) %>% 
    as.data.frame()
}

# format output
if (nrow(featureProportions) > 0) {
  featureProportions$databaseId <- databaseId
  featureProportions <- enforceMinCellValue(featureProportions, "featureCount", minCellCount)
  featureProportions <- featureProportions[featureProportions$totalCount >= minimumSubjectCountForCharacterization, ]
}

features <- formatCovariates(featureProportions)

  if (nrow(data) > 0) {
    data <- data[round(data$mean, 4) != 0, ]
    covariates <- unique(data.table::setDT(data[, c("covariateId", "covariateName", "analysisId")]))
    colnames(covariates)[[3]] <- "covariateAnalysisId"
  } else {
    covariates <- data.table::data.table("covariateId" = integer(), "covariateName" = character(), "covariateAnalysisId" = integer())
  }
  return(covariates)

andrData$covariate <- features

if (nrow(features) > 0) {
  features$databaseId <- databaseId
  features <- merge(features, counts[, c("cohortid", "cohortentries")])
  features <- enforceMinCellValue(features, "mean", minCellCount/features$cohortEntries)
  features$sd[features$mean < 0] <- NA
  features$cohortEntries <- NULL
  features$mean <- round(features$mean, 3)
  features$sd <- round(features$sd, 3)
}
  
featureValues <- featureValues[,c("cohortId", "covariateId", "mean", "sd", "databaseId")]
andrData$covariate_value <- featureValues
# Also keeping a raw output for debugging
# writeToCsv(featureProportions, file.path(exportFolder, "feature_proportions.csv"))

DatabaseConnector::disconnect(connection)

# Cohort characterization

In [None]:
# Note to package maintainer: If any of the logic to this changes, you'll need to revist
# the function createBulkCharacteristics
runCohortCharacterization <- function(cohortId, cohortName, covariateSettings, windowId, curIndex, totalCount) {
  ParallelLogger::logInfo("- (windowId=", windowId, ", ", curIndex, " of ", totalCount, ") Creating characterization for cohort: ", cohortName)
  data <- getCohortCharacteristics(connection = connection,
                                   cdmDatabaseSchema = cdmDatabaseSchema,
                                   oracleTempSchema = oracleTempSchema,
                                   cohortDatabaseSchema = cohortDatabaseSchema,
                                   cohortTable = cohortTable,
                                   cohortId = cohortId,
                                   covariateSettings = covariateSettings)
  if (nrow(data) > 0) {
    data$cohortId <- cohortId
  }
  
  data$covariateId <- data$covariateId * 10 + windowId
  return(data)
}

# Subset the cohorts to the target/strata for running feature extraction
# that are >= 140 per protocol to improve efficency
featureExtractionCohorts <-  loadCohortsForExportWithChecksumFromPackage(counts[counts$cohortSubjects >= minimumSubjectCountForCharacterization, c("cohortId")]$cohortId)
# Bulk approach ----------------------
if (useBulkCharacterization) {
  ParallelLogger::logInfo("********************************************************************************************")
  ParallelLogger::logInfo("Bulk characterization of all cohorts for all time windows")
  ParallelLogger::logInfo("********************************************************************************************")
  createBulkCharacteristics(connection, 
                            oracleTempSchema, 
                            cohortIds = featureExtractionCohorts$cohortId, 
                            cdmDatabaseSchema, 
                            cohortDatabaseSchema, 
                            cohortTable)
  writeBulkCharacteristics(connection, oracleTempSchema, counts, minCellCount, databaseId, exportFolder, andrData)
} else {
  # Sequential Approach --------------------------------
  if (incremental) {
    recordKeepingFile <- file.path(incrementalFolder, "CreatedAnalyses.csv")
  }
  featureTimeWindows <- readr::read_csv(here::here("inst", "settings", "featureTimeWindows.csv"), 
                                          show_col_types = FALSE)
  
  for (i in 1:nrow(featureTimeWindows)) {
    windowStart <- featureTimeWindows$windowStart[i]
    windowEnd <- featureTimeWindows$windowEnd[i]
    windowId <- featureTimeWindows$windowId[i]
    ParallelLogger::logInfo("********************************************************************************************")
    ParallelLogger::logInfo(paste0("Characterize concept features for start: ", windowStart, ", end: ", windowEnd, " (windowId=", windowId, ")"))
    ParallelLogger::logInfo("********************************************************************************************")
    createDemographics <- (i == 1)
    covariateSettings <- FeatureExtraction::createCovariateSettings(useDemographicsGender = createDemographics,
                                                                    useDemographicsAgeGroup = createDemographics,
                                                                    useConditionGroupEraShortTerm = TRUE,
                                                                    useDrugGroupEraShortTerm = TRUE,
                                                                    shortTermStartDays = windowStart,
                                                                    endDays = windowEnd)
    task <- paste0("runCohortCharacterizationWindowId", windowId)
    if (incremental) {
      subset <- subsetToRequiredCohorts(cohorts = featureExtractionCohorts,
                                        task = task,
                                        incremental = incremental,
                                        recordKeepingFile = recordKeepingFile)
    } else {
      subset <- featureExtractionCohorts
    }
    
    if (nrow(subset) > 0) {
      for (j in 1:nrow(subset)) {
        data <- runCohortCharacterization(cohortId = subset$cohortId[j],
                                          cohortName = subset$cohortName[j],
                                          covariateSettings = covariateSettings,
                                          windowId = windowId,
                                          curIndex = j,
                                          totalCount = nrow(subset))
        covariates <- formatCovariates(data)
        writeToCsv(covariates, file.path(exportFolder, "covariate.csv"), incremental = incremental, covariateId = covariates$covariateId)
        data <- formatCovariateValues(data, counts, minCellCount, databaseId)
        writeToCsv(data, file.path(exportFolder, "covariate_value.csv"), incremental = incremental, cohortId = data$cohortId, data$covariateId)
        if (incremental) {
          recordTasksDone(cohortId = subset$cohortId[j],
                          task = task,
                          checksum = subset$checksum[j],
                          recordKeepingFile = recordKeepingFile,
                          incremental = incremental)
        }
      }
    }
  }
}

In [None]:
# Format results -----------------------------------------------------------------------------------
ParallelLogger::logInfo("********************************************************************************************")
ParallelLogger::logInfo("Formatting Results")
ParallelLogger::logInfo("********************************************************************************************")
# Ensure that the covariate_value is free of any duplicate values. This can happen after more than
# one run of the package.
andrData$covariate_value <- andrData$covariate_value %>% dplyr::distinct()


# # Export to zip file -------------------------------------------------------------------------------
# exportResults(exportFolder, databaseId, cohortIdsToExcludeFromResultsExport)
# delta <- Sys.time() - start
# ParallelLogger::logInfo(paste("Running study took",
#                               signif(delta, 3),
#                               attr(delta, "units")))
Andromeda::saveAndromeda(andrData, file.path(exportFolder, "study_results.zip"))

ParallelLogger::unregisterLogger("DEFAULT")
