From 6181952987874ca1f096506cef6d5a5666c5a4f4 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Thu, 4 Jan 2024 14:41:04 -0800 Subject: [PATCH 01/25] Revert "Revert "Fixes to support big int cohorts ids"" This reverts commit de27d57497220e8601d7a3db6af8ba87789452f6. --- R/Analyses.R | 11 +++++------ R/DataLoadingSaving.R | 6 +++--- R/RunAnalyses.R | 4 ++-- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/R/Analyses.R b/R/Analyses.R index 539a1204..cd2f7a64 100644 --- a/R/Analyses.R +++ b/R/Analyses.R @@ -210,19 +210,18 @@ createOutcome <- function(outcomeId, riskWindowEnd = NULL, endAnchor = NULL) { errorMessages <- checkmate::makeAssertCollection() - checkmate::assertInt(outcomeId, add = errorMessages) + checkmate::assertNumeric(outcomeId, add = errorMessages) checkmate::assertLogical(outcomeOfInterest, add = errorMessages) checkmate::assertNumeric(trueEffectSize, len = 1, null.ok = TRUE, add = errorMessages) checkmate::assertInt(riskWindowStart, null.ok = TRUE, add = errorMessages) checkmate::assertInt(riskWindowEnd, null.ok = TRUE, add = errorMessages) checkmate::reportAssertions(collection = errorMessages) if (!is.null(startAnchor) && !grepl("start$|end$", startAnchor, ignore.case = TRUE)) { - stop("startAnchor should have value 'cohort start' or 'cohort end'") + stop("startAnchor should have value \'cohort start\' or \'cohort end\'") } if (!is.null(riskWindowEnd) && !grepl("start$|end$", endAnchor, ignore.case = TRUE)) { - stop("endAnchor should have value 'cohort start' or 'cohort end'") + stop("endAnchor should have value \'cohort start\' or \'cohort end\'") } - outcome <- list() for (name in names(formals(createOutcome))) { outcome[[name]] <- get(name) @@ -259,8 +258,8 @@ createTargetComparatorOutcomes <- function(targetId, excludedCovariateConceptIds = c(), includedCovariateConceptIds = c()) { errorMessages <- checkmate::makeAssertCollection() - checkmate::assertInt(targetId, add = errorMessages) - checkmate::assertInt(comparatorId, add = errorMessages) + checkmate::assertNumeric(targetId, add = errorMessages) + checkmate::assertNumeric(comparatorId, add = errorMessages) checkmate::assertList(outcomes, min.len = 1, add = errorMessages) for (i in seq_along(outcomes)) { checkmate::assertClass(outcomes[[i]], "outcome", add = errorMessages) diff --git a/R/DataLoadingSaving.R b/R/DataLoadingSaving.R index a8f690c3..3c031d80 100644 --- a/R/DataLoadingSaving.R +++ b/R/DataLoadingSaving.R @@ -127,9 +127,9 @@ getDbCohortMethodData <- function(connectionDetails, checkmate::assertClass(connectionDetails, "ConnectionDetails", add = errorMessages) checkmate::assertCharacter(cdmDatabaseSchema, len = 1, add = errorMessages) checkmate::assertCharacter(tempEmulationSchema, len = 1, null.ok = TRUE, add = errorMessages) - checkmate::assertInt(targetId, add = errorMessages) - checkmate::assertInt(comparatorId, add = errorMessages) - checkmate::assertIntegerish(outcomeIds, add = errorMessages) + checkmate::assertNumeric(targetId, add = errorMessages) + checkmate::assertNumeric(comparatorId, add = errorMessages) + checkmate::assertNumeric(outcomeIds, add = errorMessages) checkmate::assertCharacter(studyStartDate, len = 1, add = errorMessages) checkmate::assertCharacter(studyEndDate, len = 1, add = errorMessages) checkmate::assertCharacter(exposureDatabaseSchema, len = 1, add = errorMessages) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index 3b9f90ff..1c133c74 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -749,7 +749,7 @@ getPs <- function(psFile) { } doCreateCmDataObject <- function(params) { - ParallelLogger::logDebug(sprintf("Calling getDbCohortMethodData() for targetId %d, comparatorId %d", + ParallelLogger::logDebug(sprintf("Calling getDbCohortMethodData() for targetId %s, comparatorId %s", params$args$targetId, params$args$comparatorId)) cohortMethodData <- do.call("getDbCohortMethodData", params$args) @@ -761,7 +761,7 @@ doCreateStudyPopObject <- function(params) { cohortMethodData <- getCohortMethodData(params$cohortMethodDataFile) args <- params$args args$cohortMethodData <- cohortMethodData - ParallelLogger::logDebug(sprintf("Calling createStudyPopulation() using %s for outcomeId %d", + ParallelLogger::logDebug(sprintf("Calling createStudyPopulation() using %s for outcomeId %s", params$cohortMethodDataFile, args$outcomeId)) studyPop <- do.call("createStudyPopulation", args) From e8ba028af328df969920c51334b9cda84a5690ab Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Mon, 8 Jan 2024 13:02:37 -0800 Subject: [PATCH 02/25] Description to denote fork --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 5887ab8d..c729222f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: CohortMethod Type: Package Title: New-User Cohort Method with Large Scale Propensity and Outcome Models -Version: 5.2.0 +Version: 5.2.1.99999 Date: 2023-12-21 Authors@R: c( person("Martijn", "Schuemie", , "schuemie@ohdsi.org", role = c("aut", "cre")), From dfbbd0f11e58e91781b0be4d0eca09047177f20f Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 16 Feb 2024 08:59:40 -0800 Subject: [PATCH 03/25] Change to support big int in outcomeIds --- R/StudyPopulation.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/StudyPopulation.R b/R/StudyPopulation.R index 200e711e..1543adf0 100644 --- a/R/StudyPopulation.R +++ b/R/StudyPopulation.R @@ -101,7 +101,7 @@ createStudyPopulation <- function(cohortMethodData, errorMessages <- checkmate::makeAssertCollection() checkmate::assertClass(cohortMethodData, "CohortMethodData", add = errorMessages) checkmate::assertDataFrame(population, null.ok = TRUE, add = errorMessages) - checkmate::assertInt(outcomeId, null.ok = TRUE, add = errorMessages) + checkmate::assertNumeric(outcomeId, null.ok = TRUE, add = errorMessages) checkmate::assertLogical(firstExposureOnly, len = 1, add = errorMessages) checkmate::assertLogical(restrictToCommonPeriod, len = 1, add = errorMessages) checkmate::assertInt(washoutPeriod, lower = 0, add = errorMessages) From 1ddfc9ca7498e0b44bbba084da5a6a99a432d8d9 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 23 Feb 2024 07:53:51 -0800 Subject: [PATCH 04/25] Error handling weird rds save issues --- R/RunAnalyses.R | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index 1c133c74..54d2db33 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -815,7 +815,11 @@ addPsToStudyPopForSubset <- function(subset, outputFolder) { refRow <- subset[i, ] studyPop <- readRDS(file.path(outputFolder, refRow$studyPopFile)) studyPop <- addPsToStudyPopulation(studyPop, ps) + tryCatch({ saveRDS(studyPop, file.path(outputFolder, refRow$psFile)) + }, error = function(err) { + ParallelLogger::logError(paste("error saving ", refRow$psFile)) + }) return(NULL) } plyr::l_ply(1:nrow(subset), addToStudyPop) From b7257e0a32bf37e0106d1294b058dd4364cb1beb Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 23 Feb 2024 07:58:32 -0800 Subject: [PATCH 05/25] Description --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index c729222f..94d5c0ec 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: CohortMethod Type: Package Title: New-User Cohort Method with Large Scale Propensity and Outcome Models -Version: 5.2.1.99999 +Version: 5.2.1 Date: 2023-12-21 Authors@R: c( person("Martijn", "Schuemie", , "schuemie@ohdsi.org", role = c("aut", "cre")), From 30e9e89441a093882cc006ff1df034f097485ae9 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Mon, 26 Feb 2024 08:08:29 -0800 Subject: [PATCH 06/25] Missing files --- R/RunAnalyses.R | 251 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 167 insertions(+), 84 deletions(-) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index 54d2db33..3252d001 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -279,13 +279,14 @@ runCmAnalyses <- function(connectionDetails, saveRDS(targetComparatorOutcomesList, file.path(outputFolder, "targetComparatorOutcomesList.rds")) # Create cohortMethodData objects ----------------------------- - subset <- referenceTable[!duplicated(referenceTable$cohortMethodDataFile), ] - subset <- subset[subset$cohortMethodDataFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$cohortMethodDataFile)), ] + subset <- referenceTable[!duplicated(referenceTable$cohortMethodDataFile),] + subset <- subset[subset$cohortMethodDataFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$cohortMethodDataFile)),] if (nrow(subset) != 0) { message("*** Creating cohortMethodData objects ***") + createCmDataTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -333,6 +334,7 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } + objectsToCreate <- lapply(1:nrow(subset), createCmDataTask) cluster <- ParallelLogger::makeCluster(min(length(objectsToCreate), multiThreadingSettings$getDbCohortMethodDataThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -341,13 +343,14 @@ runCmAnalyses <- function(connectionDetails, } # Create study populations -------------------------------------- - subset <- referenceTable[!duplicated(referenceTable$studyPopFile), ] - subset <- subset[subset$studyPopFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$studyPopFile)), ] + subset <- referenceTable[!duplicated(referenceTable$studyPopFile),] + subset <- subset[subset$studyPopFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$studyPopFile)),] if (nrow(subset) != 0) { message("*** Creating study populations ***") + createStudyPopTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -393,6 +396,7 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } + objectsToCreate <- lapply(1:nrow(subset), createStudyPopTask) cluster <- ParallelLogger::makeCluster(min(length(objectsToCreate), multiThreadingSettings$createStudyPopThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -402,13 +406,14 @@ runCmAnalyses <- function(connectionDetails, # Fit propensity models --------------------------------------- if (refitPsForEveryOutcome) { - subset <- referenceTable[!duplicated(referenceTable$psFile), ] - subset <- subset[subset$psFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)), ] + subset <- referenceTable[!duplicated(referenceTable$psFile),] + subset <- subset[subset$psFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)),] if (nrow(subset) != 0) { message("*** Fitting propensity models ***") + createPsTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -434,13 +439,14 @@ runCmAnalyses <- function(connectionDetails, ParallelLogger::stopCluster(cluster) } } else { - subset <- referenceTable[!duplicated(referenceTable$sharedPsFile), ] - subset <- subset[subset$sharedPsFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$sharedPsFile)), ] + subset <- referenceTable[!duplicated(referenceTable$sharedPsFile),] + subset <- subset[subset$sharedPsFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$sharedPsFile)),] if (nrow(subset) != 0) { message("*** Fitting shared propensity models ***") + createSharedPsTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -459,6 +465,7 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } + modelsToFit <- lapply(1:nrow(subset), createSharedPsTask) cluster <- ParallelLogger::makeCluster(min(length(modelsToFit), multiThreadingSettings$createPsThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -466,9 +473,9 @@ runCmAnalyses <- function(connectionDetails, ParallelLogger::stopCluster(cluster) } - subset <- referenceTable[!duplicated(referenceTable$psFile), ] - subset <- subset[subset$psFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)), ] + subset <- referenceTable[!duplicated(referenceTable$psFile),] + subset <- subset[subset$psFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)),] if (nrow(subset) != 0) { message("*** Adding propensity scores to study population objects ***") tasks <- split(subset, subset$sharedPsFile) @@ -480,25 +487,30 @@ runCmAnalyses <- function(connectionDetails, } # Trimming/Matching/Stratifying ----------------------------------------- - subset <- referenceTable[!duplicated(referenceTable$strataFile), ] - subset <- subset[subset$strataFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$strataFile)), ] + subset <- referenceTable[!duplicated(referenceTable$strataFile),] + subset <- subset[subset$strataFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$strataFile)),] if (nrow(subset) != 0) { message("*** Trimming/Matching/Stratifying ***") + + tasks <- list() createTrimMatchStratTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) )[[1]] - task <- list( - psFile = file.path(outputFolder, refRow$psFile), - args = analysisRow, - strataFile = file.path(outputFolder, refRow$strataFile) - ) - return(task) + + if (file.exists(file.path(outputFolder, refRow$psFile))) { + tasks[[length(tasks) + 1]] <<- list( + psFile = file.path(outputFolder, refRow$psFile), + args = analysisRow, + strataFile = file.path(outputFolder, refRow$strataFile) + ) + } } - tasks <- lapply(1:nrow(subset), createTrimMatchStratTask) + + lapply(1:nrow(subset), createTrimMatchStratTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$trimMatchStratifyThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -507,13 +519,14 @@ runCmAnalyses <- function(connectionDetails, } # Computing shared covariate balance ---------------------------------- - subset <- referenceTable[!duplicated(referenceTable$sharedBalanceFile), ] - subset <- subset[subset$sharedBalanceFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$sharedBalanceFile)), ] + subset <- referenceTable[!duplicated(referenceTable$sharedBalanceFile),] + subset <- subset[subset$sharedBalanceFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$sharedBalanceFile)),] if (nrow(subset) != 0) { message("*** Computing shared covariate balance ***") + createSharedBalanceTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -530,6 +543,7 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } + tasks <- lapply(1:nrow(subset), createSharedBalanceTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$computeSharedBalanceThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -538,13 +552,14 @@ runCmAnalyses <- function(connectionDetails, } # Filtering covariates for computing covariate balance ------------------------------ - subset <- referenceTable[!duplicated(referenceTable$filteredForbalanceFile), ] - subset <- subset[subset$filteredForbalanceFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$filteredForbalanceFile)), ] + subset <- referenceTable[!duplicated(referenceTable$filteredForbalanceFile),] + subset <- subset[subset$filteredForbalanceFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$filteredForbalanceFile)),] if (nrow(subset) != 0) { message("*** Filtering covariates for computing covariate balance ***") + createFilterForCovariateBalanceTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -558,6 +573,7 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } + tasks <- lapply(1:nrow(subset), createFilterForCovariateBalanceTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$prefilterCovariatesThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -566,13 +582,14 @@ runCmAnalyses <- function(connectionDetails, } # Computing covariate balance (per outcome) ----------------------- - subset <- referenceTable[!duplicated(referenceTable$balanceFile), ] - subset <- subset[subset$balanceFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$balanceFile)), ] + subset <- referenceTable[!duplicated(referenceTable$balanceFile),] + subset <- subset[subset$balanceFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$balanceFile)),] if (nrow(subset) != 0) { message("*** Computing covariate balance (per outcome) ***") + createBalanceTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -592,6 +609,7 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } + tasks <- lapply(1:nrow(subset), createBalanceTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$computeBalanceThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -600,13 +618,14 @@ runCmAnalyses <- function(connectionDetails, } # Prefiltering covariates for outcome models ------------------------- - subset <- referenceTable[!duplicated(referenceTable$prefilteredCovariatesFile), ] - subset <- subset[subset$prefilteredCovariatesFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$prefilteredCovariatesFile)), ] + subset <- referenceTable[!duplicated(referenceTable$prefilteredCovariatesFile),] + subset <- subset[subset$prefilteredCovariatesFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$prefilteredCovariatesFile)),] if (nrow(subset) != 0) { message("*** Prefiltering covariates for outcome models ***") + createPrefilterTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -624,6 +643,7 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } + tasks <- lapply(1:nrow(subset), createPrefilterTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$prefilterCovariatesThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -632,13 +652,13 @@ runCmAnalyses <- function(connectionDetails, } # Fitting outcome models -------------------------- - subset <- referenceTable[referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)), ] + subset <- referenceTable[referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)),] if (nrow(subset) != 0) { message("*** Fitting outcome models for outcomes of interest ***") createOutcomeModelTask <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -664,6 +684,7 @@ runCmAnalyses <- function(connectionDetails, outcomeModelFile = file.path(outputFolder, refRow$outcomeModelFile) )) } + modelsToFit <- lapply(1:nrow(subset), createOutcomeModelTask) cluster <- ParallelLogger::makeCluster(min(length(modelsToFit), multiThreadingSettings$fitOutcomeModelThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -671,13 +692,13 @@ runCmAnalyses <- function(connectionDetails, ParallelLogger::stopCluster(cluster) } - subset <- referenceTable[!referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "", ] - subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)), ] + subset <- referenceTable[!referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "",] + subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)),] if (nrow(subset) != 0) { message("*** Fitting outcome models for other outcomes ***") createArgs <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -691,7 +712,7 @@ runCmAnalyses <- function(connectionDetails, params <- list( cohortMethodDataFile = file.path(outputFolder, refRow$cohortMethodDataFile), prefilteredCovariatesFile = prefilteredCovariatesFile, - psFile = file.path(outputFolder, refRow$psFile), + psFile = file.path(outputFolder, refRow$psFile), sharedPsFile = file.path(outputFolder, refRow$sharedPsFile), refitPsForEveryOutcome = refitPsForEveryOutcome, args = analysisRow, @@ -699,6 +720,7 @@ runCmAnalyses <- function(connectionDetails, ) return(params) } + modelsToFit <- lapply(1:nrow(subset), createArgs) cluster <- ParallelLogger::makeCluster(min(length(modelsToFit), multiThreadingSettings$fitOutcomeModelThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -812,23 +834,24 @@ addPsToStudyPopForSubset <- function(subset, outputFolder) { ps <- readRDS(file.path(outputFolder, subset$sharedPsFile[1])) addToStudyPop <- function(i) { - refRow <- subset[i, ] + refRow <- subset[i,] studyPop <- readRDS(file.path(outputFolder, refRow$studyPopFile)) studyPop <- addPsToStudyPopulation(studyPop, ps) tryCatch({ - saveRDS(studyPop, file.path(outputFolder, refRow$psFile)) + saveRDS(studyPop, file.path(outputFolder, refRow$psFile)) }, error = function(err) { ParallelLogger::logError(paste("error saving ", refRow$psFile)) }) return(NULL) } + plyr::l_ply(1:nrow(subset), addToStudyPop) } addPsToStudyPopulation <- function(studyPopulation, ps) { # Merge meta-data newMetaData <- attr(studyPopulation, "metaData") - psMetaData <- attr(ps, "metaData") + psMetaData <- attr(ps, "metaData") missingColumns <- setdiff(names(psMetaData), names(newMetaData)) newMetaData <- append(newMetaData, psMetaData[missingColumns]) attr(studyPopulation, "metaData") <- newMetaData @@ -1013,7 +1036,9 @@ doComputeSharedBalance <- function(params) { doFilterForCovariateBalance <- function(params) { cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) - covariateFilter <- params$computeCovariateBalanceArgs$covariateFilter + covariateFilter <- params$ + computeCovariateBalanceArgs$ + covariateFilter covariates <- filterCovariates(cohortMethodData$covariates, cohortMethodData$covariateRef, covariateFilter) filteredCohortMethodData <- Andromeda::andromeda( cohorts = cohortMethodData$cohorts, @@ -1062,10 +1087,12 @@ createReferenceTable <- function(cmAnalysisList, analysisFolder = sprintf("Analysis_%d", analysis$analysisId) ) } + analyses <- bind_rows(lapply(cmAnalysisList, convertAnalysisToTable)) foldersToCreate <- file.path(outputFolder, analyses$analysisFolder) foldersToCreate <- foldersToCreate[!dir.exists(foldersToCreate)] sapply(foldersToCreate, dir.create) + convertOutcomeToTable <- function(outcome) { tibble( outcomeId = outcome$outcomeId, @@ -1074,6 +1101,7 @@ createReferenceTable <- function(cmAnalysisList, ) %>% return() } + convertTcosToTable <- function(tcos) { lapply(tcos$outcomes, convertOutcomeToTable) %>% bind_rows() %>% @@ -1085,6 +1113,7 @@ createReferenceTable <- function(cmAnalysisList, ) %>% return() } + tcos <- bind_rows(lapply(targetComparatorOutcomesList, convertTcosToTable)) referenceTable <- analyses %>% cross_join(tcos) @@ -1099,6 +1128,7 @@ createReferenceTable <- function(cmAnalysisList, } }, list, object))) } + loadingArgsList <- unique(ParallelLogger::selectFromList( cmAnalysisList, "getDbCohortMethodDataArgs" @@ -1193,21 +1223,23 @@ createReferenceTable <- function(cmAnalysisList, studyPopArgsList <- lapply(studyPopArgsList, function(x) { return(x[[1]]) }) + equivalent <- function(studyPopArgs1, studyPopArgs2) { if (identical(studyPopArgs1, studyPopArgs2)) { return(TRUE) } if (studyPopArgs1$firstExposureOnly != studyPopArgs2$firstExposureOnly || - studyPopArgs1$restrictToCommonPeriod != studyPopArgs2$restrictToCommonPeriod || - studyPopArgs1$washoutPeriod != studyPopArgs2$washoutPeriod || - studyPopArgs1$removeDuplicateSubjects != studyPopArgs2$removeDuplicateSubjects || - studyPopArgs1$minDaysAtRisk != studyPopArgs2$minDaysAtRisk || - studyPopArgs1$minDaysAtRisk != 0) { + studyPopArgs1$restrictToCommonPeriod != studyPopArgs2$restrictToCommonPeriod || + studyPopArgs1$washoutPeriod != studyPopArgs2$washoutPeriod || + studyPopArgs1$removeDuplicateSubjects != studyPopArgs2$removeDuplicateSubjects || + studyPopArgs1$minDaysAtRisk != studyPopArgs2$minDaysAtRisk || + studyPopArgs1$minDaysAtRisk != 0) { return(FALSE) } else { return(TRUE) } } + findFirstEquivalent <- function(studyPopArgsList, studyPopArgs) { for (i in 1:length(studyPopArgsList)) { if (equivalent(studyPopArgsList[[i]], studyPopArgs)) { @@ -1215,6 +1247,7 @@ createReferenceTable <- function(cmAnalysisList, } } } + studyPopArgsEquivalentId <- sapply( cmAnalysisList, function(cmAnalysis, studyPopArgsList) { @@ -1260,9 +1293,11 @@ createReferenceTable <- function(cmAnalysisList, "stratifyByPsArgs", "stratifyByPsAndCovariatesArgs" ) + normStrataArgs <- function(strataArgs) { return(strataArgs[args][!is.na(names(strataArgs[args]))]) } + strataArgsList <- unique(ParallelLogger::selectFromList(cmAnalysisList, args)) strataArgsList <- strataArgsList[sapply( strataArgsList, @@ -1308,9 +1343,11 @@ createReferenceTable <- function(cmAnalysisList, if (refitPsForEveryOutcome) { referenceTable$sharedBalanceFile <- "" } else { + normBalanceArgs <- function(balanceArgs) { return(balanceArgs[args][!is.na(names(balanceArgs[args]))]) } + balanceArgsList <- unique(ParallelLogger::selectFromList(cmAnalysisList, args)) balanceArgsList <- balanceArgsList[sapply( balanceArgsList, @@ -1347,9 +1384,11 @@ createReferenceTable <- function(cmAnalysisList, # Add covariate balance files (per target-comparator-analysis-outcome) args <- "computeCovariateBalanceArgs" + normBalanceArgs <- function(balanceArgs) { return(balanceArgs[args][!is.na(names(balanceArgs[args]))]) } + balanceArgsList <- unique(ParallelLogger::selectFromList(cmAnalysisList, args)) balanceArgsList <- balanceArgsList[sapply( balanceArgsList, @@ -1374,7 +1413,9 @@ createReferenceTable <- function(cmAnalysisList, balanceIdsRequiringFiltering <- sapply( 1:length(balanceArgsList), - function(i) ifelse(is.null(balanceArgsList[[i]]$computeCovariateBalanceArgs$covariateFilter), NA, i) + function(i) ifelse(is.null(balanceArgsList[[i]]$ + computeCovariateBalanceArgs$ + covariateFilter), NA, i) ) } idx <- referenceTable$balanceId %in% balanceIdsRequiringFiltering @@ -1406,25 +1447,43 @@ createReferenceTable <- function(cmAnalysisList, cmAnalysisList, c("getDbCohortMethodDataArgs", "fitOutcomeModelArgs") )) + needsFilter <- function(loadingFittingArgs) { if (!"fitOutcomeModelArgs" %in% names(loadingFittingArgs)) { return(NULL) } - keep <- (loadingFittingArgs$fitOutcomeModelArgs$useCovariates & (length(loadingFittingArgs$fitOutcomeModelArgs$excludeCovariateIds) != 0 | - length(loadingFittingArgs$fitOutcomeModelArgs$includeCovariateIds) != 0)) | - length(loadingFittingArgs$fitOutcomeModelArgs$interactionCovariateIds) != 0 + keep <- (loadingFittingArgs$ + fitOutcomeModelArgs$ + useCovariates & (length(loadingFittingArgs$ + fitOutcomeModelArgs$ + excludeCovariateIds) != 0 | + length(loadingFittingArgs$ + fitOutcomeModelArgs$ + includeCovariateIds) != 0)) | + length(loadingFittingArgs$ + fitOutcomeModelArgs$ + interactionCovariateIds) != 0 if (keep) { loadingFittingArgs$relevantFields <- list( - useCovariates = loadingFittingArgs$fitOutcomeModelArgs$useCovariates, - excludeCovariateIds = loadingFittingArgs$fitOutcomeModelArgs$excludeCovariateIds, - includeCovariateIds = loadingFittingArgs$fitOutcomeModelArgs$includeCovariateIds, - interactionCovariateIds = loadingFittingArgs$fitOutcomeModelArgs$interactionCovariateIds + useCovariates = loadingFittingArgs$ + fitOutcomeModelArgs$ + useCovariates, + excludeCovariateIds = loadingFittingArgs$ + fitOutcomeModelArgs$ + excludeCovariateIds, + includeCovariateIds = loadingFittingArgs$ + fitOutcomeModelArgs$ + includeCovariateIds, + interactionCovariateIds = loadingFittingArgs$ + fitOutcomeModelArgs$ + interactionCovariateIds ) return(loadingFittingArgs) } else { return(NULL) } } + loadingFittingArgsList <- plyr::compact(lapply(loadingFittingArgsList, needsFilter)) referenceTable$prefilteredCovariatesFile <- "" if (length(loadingFittingArgsList) != 0) { @@ -1513,7 +1572,7 @@ createReferenceTable <- function(cmAnalysisList, referenceTable$targetId, referenceTable$comparatorId, referenceTable$outcomeId - ), ] + ),] # Remove non-essential files for outcomes not of interest: idx <- !referenceTable$outcomeOfInterest @@ -1708,9 +1767,13 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac oneSidedP <- NA } else { p <- EmpiricalCalibration::computeTraditionalP(logRr = coefficient, - seLogRr = outcomeModel$outcomeModelTreatmentEstimate$seLogRr) + seLogRr = outcomeModel$ + outcomeModelTreatmentEstimate$ + seLogRr) oneSidedP <- EmpiricalCalibration::computeTraditionalP(logRr = coefficient, - seLogRr = outcomeModel$outcomeModelTreatmentEstimate$seLogRr, + seLogRr = outcomeModel$ + outcomeModelTreatmentEstimate$ + seLogRr, twoSided = FALSE, upper = TRUE) } @@ -1726,7 +1789,7 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac # Assuming we're interest in the attrition of the target population only. Could change to depend on type # of adjustment (e.g IPTW ATE should use target + comparator): attritionFraction <- 1 - (attrition$targetExposures[nrow(attrition)] / attrition$targetExposures[1]) - result <- subset[i, ] %>% + result <- subset[i,] %>% select( "analysisId", "targetId", @@ -1755,7 +1818,9 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac p = !!p, oneSidedP = !!oneSidedP, logRr = if (is.null(coefficient)) NA else coefficient, - seLogRr = if (is.null(coefficient)) NA else outcomeModel$outcomeModelTreatmentEstimate$seLogRr, + seLogRr = if (is.null(coefficient)) NA else outcomeModel$ + outcomeModelTreatmentEstimate$ + seLogRr, llr = if (is.null(coefficient)) NA else outcomeModel$outcomeModelTreatmentEstimate$llr, mdrr = !!mdrr, targetEstimator = outcomeModel$targetEstimator @@ -1765,17 +1830,33 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac if (!is.null(outcomeModel$outcomeModelInteractionEstimates)) { for (j in seq_len(nrow(outcomeModel$outcomeModelInteractionEstimates))) { - z <- outcomeModel$outcomeModelInteractionEstimates$logRr[j] / outcomeModel$outcomeModelInteractionEstimates$seLogRr[j] + z <- outcomeModel$ + outcomeModelInteractionEstimates$ + logRr[j] / outcomeModel$ + outcomeModelInteractionEstimates$ + seLogRr[j] p <- 2 * pmin(pnorm(z), 1 - pnorm(z)) interActionResults[[length(interActionResults) + 1]] <- result %>% mutate( - interactionCovariateId = outcomeModel$outcomeModelInteractionEstimates$covariateId[j], - rr = exp(outcomeModel$outcomeModelInteractionEstimates$logRr[j]), - ci95Lb = exp(outcomeModel$outcomeModelInteractionEstimates$logLb95[j]), - ci95Ub = exp(outcomeModel$outcomeModelInteractionEstimates$logUb95[j]), + interactionCovariateId = outcomeModel$ + outcomeModelInteractionEstimates$ + covariateId[j], + rr = exp(outcomeModel$ + outcomeModelInteractionEstimates$ + logRr[j]), + ci95Lb = exp(outcomeModel$ + outcomeModelInteractionEstimates$ + logLb95[j]), + ci95Ub = exp(outcomeModel$ + outcomeModelInteractionEstimates$ + logUb95[j]), p = !!p, - logRr = outcomeModel$outcomeModelInteractionEstimates$logRr[j], - seLogRr = outcomeModel$outcomeModelInteractionEstimates$seLogRr[j], + logRr = outcomeModel$ + outcomeModelInteractionEstimates$ + logRr[j], + seLogRr = outcomeModel$ + outcomeModelInteractionEstimates$ + seLogRr[j], targetEstimator = outcomeModel$targetEstimator ) } @@ -1819,8 +1900,10 @@ calibrateEstimates <- function(results, calibrationThreads, interactions = FALSE # group = groups[[1]] calibrateGroup <- function(group) { - ncs <- group[group$trueEffectSize == 1 & !is.na(group$seLogRr), ] - pcs <- group[!is.na(group$trueEffectSize) & group$trueEffectSize != 1 & !is.na(group$seLogRr), ] + ncs <- group[group$trueEffectSize == 1 & !is.na(group$seLogRr),] + pcs <- group[!is.na(group$trueEffectSize) & + group$trueEffectSize != 1 & + !is.na(group$seLogRr),] if (nrow(ncs) >= 5) { null <- EmpiricalCalibration::fitMcmcNull(logRr = ncs$logRr, seLogRr = ncs$seLogRr) ease <- EmpiricalCalibration::computeExpectedAbsoluteSystematicError(null) From c8c10a6eb8ed45d52e2fca6311f885da3e2c18bb Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Mon, 26 Feb 2024 08:51:08 -0800 Subject: [PATCH 07/25] Error handling for tasks to continue --- R/RunAnalyses.R | 284 +++++++++++++++++++++++++++--------------------- 1 file changed, 159 insertions(+), 125 deletions(-) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index 3252d001..e3edff8f 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -493,7 +493,6 @@ runCmAnalyses <- function(connectionDetails, if (nrow(subset) != 0) { message("*** Trimming/Matching/Stratifying ***") - tasks <- list() createTrimMatchStratTask <- function(i) { refRow <- subset[i,] analysisRow <- ParallelLogger::matchInList( @@ -502,15 +501,17 @@ runCmAnalyses <- function(connectionDetails, )[[1]] if (file.exists(file.path(outputFolder, refRow$psFile))) { - tasks[[length(tasks) + 1]] <<- list( + task <- list( psFile = file.path(outputFolder, refRow$psFile), args = analysisRow, strataFile = file.path(outputFolder, refRow$strataFile) ) + + return(task) } } - lapply(1:nrow(subset), createTrimMatchStratTask) + tasks <- lapply(1:nrow(subset), createTrimMatchStratTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$trimMatchStratifyThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -909,97 +910,114 @@ doTrimMatchStratify <- function(params) { ps <- getPs(params$psFile) ParallelLogger::logDebug(sprintf("Performing matching etc., using %s", params$psFile)) - ps <- applyTrimMatchStratify(ps, params$args) - saveRDS(ps, params$strataFile) + + tryCatch({ + ps <- applyTrimMatchStratify(ps, params$args) + saveRDS(ps, params$strataFile) + }, error = function(err) { + ParallelLogger::logError(sprintf("Unable to save %s - %s", params$strataFile, err)) + }) return(NULL) } doPrefilterCovariates <- function(params) { - cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) - covariates <- cohortMethodData$covariates - if (nrow_temp(covariates) > 0) { - if (params$args$useCovariates) { - covariatesToInclude <- params$args$includeCovariateIds - covariatesToExclude <- params$args$excludeCovariateIds - } else { - covariatesToInclude <- c() - covariatesToExclude <- c() - } - covariatesToInclude <- unique(c(covariatesToInclude, params$args$interactionCovariateIds)) - if (length(covariatesToInclude) != 0) { - covariates <- covariates %>% - filter(.data$covariateId %in% covariatesToInclude) - } - if (length(covariatesToExclude) != 0) { - covariates <- covariates %>% - filter(!.data$covariateId %in% covariatesToExclude) + tryCatch({ + cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) + covariates <- cohortMethodData$covariates + if (nrow_temp(covariates) > 0) { + if (params$args$useCovariates) { + covariatesToInclude <- params$args$includeCovariateIds + covariatesToExclude <- params$args$excludeCovariateIds + } else { + covariatesToInclude <- c() + covariatesToExclude <- c() + } + covariatesToInclude <- unique(c(covariatesToInclude, params$args$interactionCovariateIds)) + if (length(covariatesToInclude) != 0) { + covariates <- covariates %>% + filter(.data$covariateId %in% covariatesToInclude) + } + if (length(covariatesToExclude) != 0) { + covariates <- covariates %>% + filter(!.data$covariateId %in% covariatesToExclude) + } } - } - filteredCohortMethodData <- Andromeda::andromeda( - cohorts = cohortMethodData$cohorts, - outcomes = cohortMethodData$outcomes, - covariates = covariates, - covariateRef = cohortMethodData$covariateRef, - analysisRef = cohortMethodData$analysisRef - ) - attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") - class(filteredCohortMethodData) <- "CohortMethodData" - attr(class(filteredCohortMethodData), "package") <- "CohortMethod" - saveCohortMethodData(filteredCohortMethodData, params$prefilteredCovariatesFile) + filteredCohortMethodData <- Andromeda::andromeda( + cohorts = cohortMethodData$cohorts, + outcomes = cohortMethodData$outcomes, + covariates = covariates, + covariateRef = cohortMethodData$covariateRef, + analysisRef = cohortMethodData$analysisRef + ) + attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") + class(filteredCohortMethodData) <- "CohortMethodData" + attr(class(filteredCohortMethodData), "package") <- "CohortMethod" + saveCohortMethodData(filteredCohortMethodData, params$prefilteredCovariatesFile) + }, error = function(err) { + ParallelLogger::logError(err) + }) return(NULL) } doFitOutcomeModel <- function(params) { - if (params$prefilteredCovariatesFile == "") { - cohortMethodDataFile <- params$cohortMethodDataFile - } else { - cohortMethodDataFile <- params$prefilteredCovariatesFile - } - cohortMethodData <- getCohortMethodData(cohortMethodDataFile) - studyPop <- readRDS(params$studyPopFile) - args <- list(cohortMethodData = cohortMethodData, population = studyPop) - args <- append(args, params$args) - ParallelLogger::logDebug(sprintf("Calling fitOutcomeModel() using %s and %s", - cohortMethodDataFile, - params$studyPopFile)) - outcomeModel <- do.call("fitOutcomeModel", args) - saveRDS(outcomeModel, params$outcomeModelFile) + tryCatch({ + if (params$prefilteredCovariatesFile == "") { + cohortMethodDataFile <- params$cohortMethodDataFile + } else { + cohortMethodDataFile <- params$prefilteredCovariatesFile + } + cohortMethodData <- getCohortMethodData(cohortMethodDataFile) + studyPop <- readRDS(params$studyPopFile) + args <- list(cohortMethodData = cohortMethodData, population = studyPop) + args <- append(args, params$args) + ParallelLogger::logDebug(sprintf("Calling fitOutcomeModel() using %s and %s", + cohortMethodDataFile, + params$studyPopFile)) + outcomeModel <- do.call("fitOutcomeModel", args) + saveRDS(outcomeModel, params$outcomeModelFile) + }, error = function(err) { + ParallelLogger::logError(err) + }) return(NULL) } doFitOutcomeModelPlus <- function(params) { - if (params$prefilteredCovariatesFile == "") { - cohortMethodDataFile <- params$cohortMethodDataFile - } else { - cohortMethodDataFile <- params$prefilteredCovariatesFile - } - cohortMethodData <- getCohortMethodData(cohortMethodDataFile) + tryCatch({ + if (params$prefilteredCovariatesFile == "") { + cohortMethodDataFile <- params$cohortMethodDataFile + } else { + cohortMethodDataFile <- params$prefilteredCovariatesFile + } + cohortMethodData <- getCohortMethodData(cohortMethodDataFile) - ParallelLogger::logDebug(sprintf("Calling createStudyPopulation(), performing matching etc., and calling fitOutcomeModel() using %s for outcomeID %d", - cohortMethodDataFile, - params$args$createStudyPopArgs$outcomeId)) + ParallelLogger::logDebug(sprintf("Calling createStudyPopulation(), performing matching etc., and calling fitOutcomeModel() using %s for outcomeID %d", + cohortMethodDataFile, + params$args$createStudyPopArgs$outcomeId)) - # Create study pop - args <- params$args$createStudyPopArgs - args$cohortMethodData <- cohortMethodData - studyPop <- do.call("createStudyPopulation", args) + # Create study pop + args <- params$args$createStudyPopArgs + args$cohortMethodData <- cohortMethodData + studyPop <- do.call("createStudyPopulation", args) - if (!is.null(params$args$createPsArgs)) { - if (params$refitPsForEveryOutcome) { - ps <- getPs(params$psFile) + if (!is.null(params$args$createPsArgs)) { + if (params$refitPsForEveryOutcome) { + ps <- getPs(params$psFile) + } else { + ps <- getPs(params$sharedPsFile) + ps <- addPsToStudyPopulation(studyPop, ps) + } } else { - ps <- getPs(params$sharedPsFile) - ps <- addPsToStudyPopulation(studyPop, ps) + ps <- studyPop } - } else { - ps <- studyPop - } - ps <- applyTrimMatchStratify(ps, params$args) - args <- params$args$fitOutcomeModelArgs - args$population <- ps - args$cohortMethodData <- cohortMethodData - outcomeModel <- do.call("fitOutcomeModel", args) - saveRDS(outcomeModel, params$outcomeModelFile) + ps <- applyTrimMatchStratify(ps, params$args) + args <- params$args$fitOutcomeModelArgs + args$population <- ps + args$cohortMethodData <- cohortMethodData + outcomeModel <- do.call("fitOutcomeModel", args) + saveRDS(outcomeModel, params$outcomeModelFile) + }, error = function(err) { + ParallelLogger::logError(err) + }) return(NULL) } @@ -1011,65 +1029,81 @@ doComputeSharedBalance <- function(params) { # Create study pop message("Computing covariate balance across all outcomes (ignore messages about 'no outcome specified')") - args <- params$args$createStudyPopArgs - args$cohortMethodData <- cohortMethodData - studyPop <- do.call("createStudyPopulation", args) + tryCatch({ + args <- params$args$createStudyPopArgs + args$cohortMethodData <- cohortMethodData + studyPop <- do.call("createStudyPopulation", args) - if (!is.null(params$args$createPsArgs)) { - # Add PS - ps <- getPs(params$sharedPsFile) - idx <- match(studyPop$rowId, ps$rowId) - studyPop$propensityScore <- ps$propensityScore[idx] - studyPop$iptw <- ps$iptw[idx] - ps <- studyPop - } else { - ps <- studyPop - } - ps <- applyTrimMatchStratify(ps, params$args) - args <- params$args$computeSharedCovariateBalanceArgs - args$population <- ps - args$cohortMethodData <- cohortMethodData - balance <- do.call("computeCovariateBalance", args) - saveRDS(balance, params$sharedBalanceFile) + if (!is.null(params$args$createPsArgs)) { + # Add PS + ps <- getPs(params$sharedPsFile) + idx <- match(studyPop$rowId, ps$rowId) + studyPop$propensityScore <- ps$propensityScore[idx] + studyPop$iptw <- ps$iptw[idx] + ps <- studyPop + } else { + ps <- studyPop + } + + + ps <- applyTrimMatchStratify(ps, params$args) + args <- params$args$computeSharedCovariateBalanceArgs + args$population <- ps + args$cohortMethodData <- cohortMethodData + balance <- do.call("computeCovariateBalance", args) + saveRDS(balance, params$sharedBalanceFile) + }, + error = function(err) { + ParallelLogger::logError(err) + }) return(NULL) } doFilterForCovariateBalance <- function(params) { - cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) - covariateFilter <- params$ - computeCovariateBalanceArgs$ - covariateFilter - covariates <- filterCovariates(cohortMethodData$covariates, cohortMethodData$covariateRef, covariateFilter) - filteredCohortMethodData <- Andromeda::andromeda( - cohorts = cohortMethodData$cohorts, - outcomes = cohortMethodData$outcomes, - covariates = covariates, - covariateRef = cohortMethodData$covariateRef, - analysisRef = cohortMethodData$analysisRef - ) - attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") - class(filteredCohortMethodData) <- "CohortMethodData" - attr(class(filteredCohortMethodData), "package") <- "CohortMethod" - saveCohortMethodData(filteredCohortMethodData, params$filteredForbalanceFile) + tryCatch({ + cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) + covariateFilter <- params$ + computeCovariateBalanceArgs$ + covariateFilter + covariates <- filterCovariates(cohortMethodData$covariates, cohortMethodData$covariateRef, covariateFilter) + filteredCohortMethodData <- Andromeda::andromeda( + cohorts = cohortMethodData$cohorts, + outcomes = cohortMethodData$outcomes, + covariates = covariates, + covariateRef = cohortMethodData$covariateRef, + analysisRef = cohortMethodData$analysisRef + ) + attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") + class(filteredCohortMethodData) <- "CohortMethodData" + attr(class(filteredCohortMethodData), "package") <- "CohortMethod" + saveCohortMethodData(filteredCohortMethodData, params$filteredForbalanceFile) + }, error = function(err) { + ParallelLogger::logError(err) + }) return(NULL) } doComputeBalance <- function(params) { - if (params$filteredForbalanceFile == "") { - cohortMethodData <- getCohortMethodData(params$cohortMethodDataFile) - } else { - cohortMethodData <- getCohortMethodData(params$filteredForbalanceFile) - } - strataPop <- readRDS(params$strataFile) + tryCatch({ + if (params$filteredForbalanceFile == "") { + cohortMethodData <- getCohortMethodData(params$cohortMethodDataFile) + } else { + cohortMethodData <- getCohortMethodData(params$filteredForbalanceFile) + } + strataPop <- readRDS(params$strataFile) - args <- params$computeCovariateBalanceArgs - args$population <- strataPop - args$cohortMethodData <- cohortMethodData - ParallelLogger::logDebug(sprintf("Computing balance balance using %s and %s", - params$cohortMethodDataFile, - params$strataFile)) - balance <- do.call("computeCovariateBalance", args) - saveRDS(balance, params$balanceFile) + args <- params$computeCovariateBalanceArgs + args$population <- strataPop + args$cohortMethodData <- cohortMethodData + ParallelLogger::logDebug(sprintf("Computing balance balance using %s and %s", + params$cohortMethodDataFile, + params$strataFile)) + balance <- do.call("computeCovariateBalance", args) + saveRDS(balance, params$balanceFile) + }, + error = function(err) { + ParallelLogger::logError(err) + }) return(NULL) } From 09c47d889ec10e3af1c7e4167027b34130302829 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Mon, 26 Feb 2024 08:56:33 -0800 Subject: [PATCH 08/25] Error handling for tasks to continue --- R/RunAnalyses.R | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index e3edff8f..eb60e263 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -907,11 +907,12 @@ applyTrimMatchStratify <- function(ps, arguments) { } doTrimMatchStratify <- function(params) { - ps <- getPs(params$psFile) - ParallelLogger::logDebug(sprintf("Performing matching etc., using %s", - params$psFile)) - tryCatch({ + ps <- getPs(params$psFile) + ParallelLogger::logDebug(sprintf("Performing matching etc., using %s", + params$psFile)) + + ps <- applyTrimMatchStratify(ps, params$args) saveRDS(ps, params$strataFile) }, error = function(err) { From 981083593af15ff737da6f7bc42b3b7734a278c7 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Thu, 7 Mar 2024 11:55:14 -0800 Subject: [PATCH 09/25] Only summarize results for outcome files that exist to stop crash --- R/RunAnalyses.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index eb60e263..9aa76c1d 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -1789,7 +1789,8 @@ getInteractionResultsSummary <- function(outputFolder) { summarizeResults <- function(referenceTable, outputFolder, mainFileName, interactionsFileName, calibrationThreads = 1) { subset <- referenceTable %>% - filter(.data$outcomeModelFile != "") + filter(.data$outcomeModelFile != "", file.exists(.data$outcomeModelFile)) + mainResults <- vector("list", nrow(subset)) interActionResults <- list() pb <- txtProgressBar(style = 3) From 7555d98d17b24f4baef3bbbcf4c044260ac69e5d Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Wed, 20 Mar 2024 08:21:30 -0700 Subject: [PATCH 10/25] Fix path in outcome file find --- R/RunAnalyses.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index 9aa76c1d..dbfd5842 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -1789,7 +1789,7 @@ getInteractionResultsSummary <- function(outputFolder) { summarizeResults <- function(referenceTable, outputFolder, mainFileName, interactionsFileName, calibrationThreads = 1) { subset <- referenceTable %>% - filter(.data$outcomeModelFile != "", file.exists(.data$outcomeModelFile)) + filter(.data$outcomeModelFile != "", file.exists(file.path(outputFolder, .data$outcomeModelFile))) mainResults <- vector("list", nrow(subset)) interActionResults <- list() From dfd29f4e56a8166869185d00d9863ad931cc316d Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 11:36:41 -0700 Subject: [PATCH 11/25] Added migrations script --- .../migrations/Migration_2-v5.2.2.sql | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql diff --git a/inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql b/inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql new file mode 100644 index 00000000..ea3a0db3 --- /dev/null +++ b/inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql @@ -0,0 +1,41 @@ +-- Generated SQL statements to alter columns to BIGINT data type + +-- Alter columns in cm_attrition table +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN exposure_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN outcome_id BIGINT; + +-- Alter columns in cm_follow_up_dist table +ALTER TABLE @database_schema.@table_prefixcm_follow_up_dist ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_follow_up_dist ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_follow_up_dist ALTER COLUMN outcome_id BIGINT; + +-- Alter columns in cm_result table +ALTER TABLE @database_schema.@table_prefixcm_result ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_result ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_result ALTER COLUMN outcome_id BIGINT; + +-- Alter columns in cm_interaction_result table +ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN outcome_id BIGINT; + +-- Alter columns in cm_target_comparator_outcome table +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN comparator_id BIGINT; + +-- Alter columns in cm_kaplan_meier_dist table +ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN outcome_id BIGINT; + +-- Alter columns in cm_likelihood_profile table +ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN outcome_id BIGINT; + +-- Alter columns in cm_shared_covariate_balance table +ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN comparator_id BIGINT; From 7ef04ee2877a2b234553abc208009221916c0b6b Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 12:05:15 -0700 Subject: [PATCH 12/25] Postgres specific migration --- .../postgresql/Migration_2-v5.2.2.sql | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 inst/sql/sql_server/postgresql/Migration_2-v5.2.2.sql diff --git a/inst/sql/sql_server/postgresql/Migration_2-v5.2.2.sql b/inst/sql/sql_server/postgresql/Migration_2-v5.2.2.sql new file mode 100644 index 00000000..83e7902c --- /dev/null +++ b/inst/sql/sql_server/postgresql/Migration_2-v5.2.2.sql @@ -0,0 +1,41 @@ +-- Generated SQL statements to alter columns to BIGINT data type + +-- Alter columns in cm_attrition table +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN exposure_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_attrition ALTER COLUMN outcome_id TYPE BIGINT; + +-- Alter columns in cm_follow_up_dist table +ALTER TABLE @database_schema.@table_prefixcm_follow_up_dist ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_follow_up_dist ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_follow_up_dist ALTER COLUMN outcome_id TYPE BIGINT; + +-- Alter columns in cm_result table +ALTER TABLE @database_schema.@table_prefixcm_result ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_result ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_result ALTER COLUMN outcome_id TYPE BIGINT; + +-- Alter columns in cm_interaction_result table +ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN outcome_id TYPE BIGINT; + +-- Alter columns in cm_target_comparator_outcome table +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN comparator_id TYPE BIGINT; + +-- Alter columns in cm_kaplan_meier_dist table +ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN outcome_id TYPE BIGINT; + +-- Alter columns in cm_likelihood_profile table +ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN outcome_id TYPE BIGINT; + +-- Alter columns in cm_shared_covariate_balance table +ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN comparator_id TYPE BIGINT; From 648d11c15d4954bb2829a832b52c0525c5bac12d Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 12:09:50 -0700 Subject: [PATCH 13/25] Moved migration to correct path --- .../sql_server/postgresql/{ => migrations}/Migration_2-v5.2.2.sql | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename inst/sql/sql_server/postgresql/{ => migrations}/Migration_2-v5.2.2.sql (100%) diff --git a/inst/sql/sql_server/postgresql/Migration_2-v5.2.2.sql b/inst/sql/sql_server/postgresql/migrations/Migration_2-v5.2.2.sql similarity index 100% rename from inst/sql/sql_server/postgresql/Migration_2-v5.2.2.sql rename to inst/sql/sql_server/postgresql/migrations/Migration_2-v5.2.2.sql From 93378da1608d9af5818e58ab3e0984f88b5701d0 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 12:13:13 -0700 Subject: [PATCH 14/25] Moved migration to correct path --- .../{sql_server => }/postgresql/migrations/Migration_2-v5.2.2.sql | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename inst/sql/{sql_server => }/postgresql/migrations/Migration_2-v5.2.2.sql (100%) diff --git a/inst/sql/sql_server/postgresql/migrations/Migration_2-v5.2.2.sql b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql similarity index 100% rename from inst/sql/sql_server/postgresql/migrations/Migration_2-v5.2.2.sql rename to inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql From 9d43b7c420897ab434072d4a9148e75e4384089f Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 13:13:22 -0700 Subject: [PATCH 15/25] Missing columns --- .../migrations/Migration_2-v5.2.2.sql | 17 +++++++++++++++++ .../migrations/Migration_2-v5.2.2.sql | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql index 83e7902c..67c899f7 100644 --- a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql +++ b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql @@ -21,10 +21,20 @@ ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN tar ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN comparator_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN outcome_id TYPE BIGINT; + +ALTER TABLE @database_schema.@table_prefixcm_covariate_balance ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_covariate_balance ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_covariate_balance ALTER COLUMN outcome_id TYPE BIGINT; + +ALTER TABLE @database_schema.@table_prefixcm_diagnostics_summary ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_diagnostics_summary ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_diagnostics_summary ALTER COLUMN outcome_id TYPE BIGINT; + -- Alter columns in cm_target_comparator_outcome table ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN target_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN comparator_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_of_interest BIGINT; -- Alter columns in cm_kaplan_meier_dist table ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN target_id TYPE BIGINT; @@ -36,6 +46,13 @@ ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN tar ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN comparator_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN outcome_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_preficm_propensity_model ALTER COLUMN comparator_id TYPE BIGINT + +ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN target_id TYPE BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN comparator_id TYPE BIGINT + + -- Alter columns in cm_shared_covariate_balance table ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN target_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN comparator_id TYPE BIGINT; diff --git a/inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql b/inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql index ea3a0db3..bf15cd17 100644 --- a/inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql +++ b/inst/sql/sql_server/migrations/Migration_2-v5.2.2.sql @@ -21,10 +21,20 @@ ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN tar ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN comparator_id BIGINT; ALTER TABLE @database_schema.@table_prefixcm_interaction_result ALTER COLUMN outcome_id BIGINT; + +ALTER TABLE @database_schema.@table_prefixcm_covariate_balance ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_covariate_balance ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_covariate_balance ALTER COLUMN outcome_id BIGINT; + +ALTER TABLE @database_schema.@table_prefixcm_diagnostics_summary ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_diagnostics_summary ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_diagnostics_summary ALTER COLUMN outcome_id BIGINT; + -- Alter columns in cm_target_comparator_outcome table ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_id BIGINT; ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN target_id BIGINT; ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN comparator_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_of_interest BIGINT; -- Alter columns in cm_kaplan_meier_dist table ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN target_id BIGINT; @@ -36,6 +46,13 @@ ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN tar ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN comparator_id BIGINT; ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN outcome_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_preficm_propensity_model ALTER COLUMN comparator_id BIGINT + +ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN target_id BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN comparator_id BIGINT + + -- Alter columns in cm_shared_covariate_balance table ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN target_id BIGINT; ALTER TABLE @database_schema.@table_prefixcm_shared_covariate_balance ALTER COLUMN comparator_id BIGINT; From 7139e1eb627da74c7b121dc6d9846d19ec178958 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 13:20:06 -0700 Subject: [PATCH 16/25] Missing kw --- inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql index 67c899f7..aa6d1d40 100644 --- a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql +++ b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql @@ -34,7 +34,7 @@ ALTER TABLE @database_schema.@table_prefixcm_diagnostics_summary ALTER COLUMN ou ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN target_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN comparator_id TYPE BIGINT; -ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_of_interest BIGINT; +ALTER TABLE @database_schema.@table_prefixcm_target_comparator_outcome ALTER COLUMN outcome_of_interest TYPE BIGINT; -- Alter columns in cm_kaplan_meier_dist table ALTER TABLE @database_schema.@table_prefixcm_kaplan_meier_dist ALTER COLUMN target_id TYPE BIGINT; From b5cdb420614f0381d10308ef18e4cf2840297eb8 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 13:27:17 -0700 Subject: [PATCH 17/25] Typo --- inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql index aa6d1d40..1d2da22b 100644 --- a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql +++ b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql @@ -47,7 +47,7 @@ ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN com ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN outcome_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN target_id TYPE BIGINT; -ALTER TABLE @database_schema.@table_preficm_propensity_model ALTER COLUMN comparator_id TYPE BIGINT +ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN comparator_id TYPE BIGINT ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN target_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN comparator_id TYPE BIGINT From dacd00ccf59acdcac313e9d0b34ebe124c2aef38 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 13:30:13 -0700 Subject: [PATCH 18/25] Typo --- inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql index 1d2da22b..9ef476b5 100644 --- a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql +++ b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql @@ -47,7 +47,7 @@ ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN com ALTER TABLE @database_schema.@table_prefixcm_likelihood_profile ALTER COLUMN outcome_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN target_id TYPE BIGINT; -ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN comparator_id TYPE BIGINT +ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN comparator_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN target_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN comparator_id TYPE BIGINT From ce5b9a9e60682c960d6de15c708c798918bb0ba2 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 13:30:32 -0700 Subject: [PATCH 19/25] Typo --- inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql index 9ef476b5..623d9731 100644 --- a/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql +++ b/inst/sql/postgresql/migrations/Migration_2-v5.2.2.sql @@ -50,7 +50,7 @@ ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN targe ALTER TABLE @database_schema.@table_prefixcm_propensity_model ALTER COLUMN comparator_id TYPE BIGINT; ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN target_id TYPE BIGINT; -ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN comparator_id TYPE BIGINT +ALTER TABLE @database_schema.@table_prefixcm_preference_score_dist ALTER COLUMN comparator_id TYPE BIGINT; -- Alter columns in cm_shared_covariate_balance table From 8b8dc4dd5484fd6ef4e18cffd1d04042530fa7d6 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Fri, 22 Mar 2024 13:38:44 -0700 Subject: [PATCH 20/25] Updated spec to match ddl --- inst/csv/resultsDataModelSpecification.csv | 68 +++++++++++----------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/inst/csv/resultsDataModelSpecification.csv b/inst/csv/resultsDataModelSpecification.csv index 853b7f47..3bc0b5a2 100644 --- a/inst/csv/resultsDataModelSpecification.csv +++ b/inst/csv/resultsDataModelSpecification.csv @@ -2,15 +2,15 @@ table_name,column_name,data_type,is_required,primary_key,min_cell_count,deprecat cm_attrition,sequence_number,int,Yes,Yes,No,No,The place in the sequence of steps defining the final analysis cohort. 1 indicates the original exposed population without any inclusion criteria. cm_attrition,description,varchar,Yes,No,No,No,"A description of the last restriction, e.g. ""Removing persons with the outcome prior"". " cm_attrition,subjects,int,Yes,No,Yes,No,The number of subjects in the cohort. -cm_attrition,exposure_id,int,Yes,Yes,No,No,The identifier of the exposure cohort to which the attrition applies. Can be either the target or comparator cohort ID. -cm_attrition,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_attrition,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_attrition,exposure_id,bigint,Yes,Yes,No,No,The identifier of the exposure cohort to which the attrition applies. Can be either the target or comparator cohort ID. +cm_attrition,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_attrition,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. cm_attrition,analysis_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. -cm_attrition,outcome_id,int,Yes,Yes,No,No,Foreign key referencing the cm_analysis table. +cm_attrition,outcome_id,bigint,Yes,Yes,No,No,Foreign key referencing the cm_analysis table. cm_attrition,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. -cm_follow_up_dist,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_follow_up_dist,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. -cm_follow_up_dist,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_follow_up_dist,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_follow_up_dist,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_follow_up_dist,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_follow_up_dist,analysis_id,int,Yes,Yes,No,No,Foreign key referencing the cm_analysis table. cm_follow_up_dist,target_min_days,float,No,No,No,No,The minimum number of observation days for a person. cm_follow_up_dist,target_p_10_days,float,No,No,No,No,The 10^th^ percentile of number of observation days for a person in the target group. @@ -35,9 +35,9 @@ cm_analysis,analysis_id,int,Yes,Yes,No,No,A unique identifier for an analysis. cm_analysis,description,varchar,Yes,No,No,No,"A description for an analysis, e.g. 'On-treatment'." cm_analysis,definition,varchar,Yes,No,No,No,A CohortMethod JSON object specifying the analysis. cm_result,analysis_id,int,Yes,Yes,No,No,Foreign key referencing the cm_analysis table. -cm_result,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_result,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. -cm_result,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_result,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_result,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_result,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_result,rr,float,No,No,No,No,The estimated relative risk (e.g. the hazard ratio). cm_result,ci_95_lb,float,No,No,No,No,The lower bound of the 95% confidence interval of the relative risk. cm_result,ci_95_ub,float,No,No,No,No,The upper bound of the 95% confidence interval of the relative risk. @@ -62,9 +62,9 @@ cm_result,calibrated_se_log_rr,float,No,No,No,No,The standard error of the log o cm_result,target_estimator,varchar,Yes,No,No,No,"The target estimator, for example ""att"", ""ate"", ""atu"" or ""ato""." cm_result,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. cm_interaction_result,analysis_id,int,Yes,Yes,No,No,Foreign key referencing the cm_analysis table. -cm_interaction_result,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_interaction_result,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. -cm_interaction_result,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_interaction_result,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_interaction_result,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_interaction_result,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_interaction_result,interaction_covariate_id,int,Yes,Yes,No,No,Foreign key referencing the cm_covariate table. cm_interaction_result,rr,float,Yes,No,No,No,The estimated relative risk (e.g. the ratio of hazard ratios). cm_interaction_result,ci_95_lb,float,Yes,No,No,No,The lower bound of the 95% confidence interval of the relative risk. @@ -95,9 +95,9 @@ cm_covariate_analysis,covariate_analysis_id,int,Yes,Yes,No,No,A unique identifie cm_covariate_analysis,covariate_analysis_name,varchar,Yes,No,No,No,"A name for a covariate analysis, e.g. 'Demographics: age group'." cm_covariate_analysis,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. cm_covariate_balance,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. -cm_covariate_balance,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_covariate_balance,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. -cm_covariate_balance,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_covariate_balance,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_covariate_balance,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_covariate_balance,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_covariate_balance,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. cm_covariate_balance,covariate_id,bigint,Yes,Yes,No,No,A foreign key referencing the cm_covariate table. cm_covariate_balance,target_mean_before,float,No,No,Yes,No,The mean value of the covariate in the target cohort before propensity score adjustment. @@ -112,9 +112,9 @@ cm_covariate_balance,target_std_diff,float,No,No,No,No,The standardized differen cm_covariate_balance,comparator_std_diff,float,No,No,No,No,The standardized difference of the means before and after propensity score adjustment in the comparator cohort. cm_covariate_balance,target_comparator_std_diff,float,No,No,No,No,The standardized difference of the means before and after propensity score adjustment in the union of the target and comparator cohorts. cm_diagnostics_summary,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. -cm_diagnostics_summary,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_diagnostics_summary,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. -cm_diagnostics_summary,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_diagnostics_summary,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_diagnostics_summary,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_diagnostics_summary,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_diagnostics_summary,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. cm_diagnostics_summary,max_sdm,float,No,No,No,No,The maximum absolute standardized difference of mean. cm_diagnostics_summary,shared_max_sdm,float,No,No,No,No,The maximum absolute standardized difference of mean of the shared balance (shared across outcomes). @@ -132,11 +132,11 @@ cm_diagnostics_summary,generalizability_diagnostic,varchar(20),Yes,No,No,No,Pass cm_diagnostics_summary,ease_diagnostic,varchar(20),Yes,No,No,No,Pass / warning / fail classification of the EASE diagnostic. cm_diagnostics_summary,unblind,int,Yes,No,No,No,"Is unblinding the result recommended? (1 = yes, 0 = no)" cm_diagnostics_summary,unblind_for_evidence_synthesis,int,Yes,No,No,No,"Is unblinding the result for inclusion in evidence synthesis recommended? This ignores the MDRR diagnostic. (1 = yes, 0 = no)" -cm_target_comparator_outcome,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_target_comparator_outcome,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_target_comparator_outcome,outcome_of_interest,int,Yes,No,No,No,"Is the outcome of interest (1 = yes, 0 = no)" cm_target_comparator_outcome,true_effect_size,float,No,No,No,No,The true effect size for the target-comparator-outcome. For negatitive controls this equals 1. -cm_target_comparator_outcome,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_target_comparator_outcome,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_target_comparator_outcome,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_target_comparator_outcome,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. cm_kaplan_meier_dist,time_day,int,Yes,Yes,No,No,Time in days since cohort start. cm_kaplan_meier_dist,target_survival,float,Yes,No,No,No,The estimated survival fraction in the target cohort. cm_kaplan_meier_dist,target_survival_lb,float,Yes,No,No,No,The lower bound of the 95% confidence interval of the survival fraction in the target cohort. @@ -146,34 +146,34 @@ cm_kaplan_meier_dist,comparator_survival_lb,float,Yes,No,No,No,The lower bound o cm_kaplan_meier_dist,comparator_survival_ub,float,Yes,No,No,No,The upper bound of the 95% confidence interval of the survival fraction in the comparator cohort. cm_kaplan_meier_dist,target_at_risk,int,No,No,Yes,No,The number of subjects still at risk in the target cohort. cm_kaplan_meier_dist,comparator_at_risk,int,No,No,Yes,No,The number of subjects still at risk in the comparator cohort. -cm_kaplan_meier_dist,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_kaplan_meier_dist,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. -cm_kaplan_meier_dist,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_kaplan_meier_dist,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_kaplan_meier_dist,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_kaplan_meier_dist,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_kaplan_meier_dist,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. cm_kaplan_meier_dist,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. cm_likelihood_profile,log_rr,float,Yes,Yes,No,No,The log of the relative risk where the likelihood is sampled. cm_likelihood_profile,log_likelihood,float,Yes,No,No,No,The normalized log likelihood. -cm_likelihood_profile,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_likelihood_profile,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. -cm_likelihood_profile,outcome_id,int,Yes,Yes,No,No,The identifier for the outcome cohort. +cm_likelihood_profile,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_likelihood_profile,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_likelihood_profile,outcome_id,bigint,Yes,Yes,No,No,The identifier for the outcome cohort. cm_likelihood_profile,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. cm_likelihood_profile,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. cm_preference_score_dist,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. -cm_preference_score_dist,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_preference_score_dist,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_preference_score_dist,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_preference_score_dist,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. cm_preference_score_dist,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. cm_preference_score_dist,preference_score,float,Yes,Yes,No,No,A preference score value. cm_preference_score_dist,target_density,float,Yes,No,No,No,The distribution density for the target cohort at the given preference score. cm_preference_score_dist,comparator_density,float,Yes,No,No,No,The distribution density for the comparator cohort at the given preference score. -cm_propensity_model,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_propensity_model,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_propensity_model,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_propensity_model,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. cm_propensity_model,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. cm_propensity_model,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. cm_propensity_model,covariate_id,bigint,Yes,Yes,No,No,Foreign key referencing the cm_covariate table. 0 is reserved for the intercept. cm_propensity_model,coefficient,float,Yes,No,No,No,The coefficient (beta) for the covariate in the propensity model. cm_shared_covariate_balance,database_id,varchar,Yes,Yes,No,No,Foreign key referencing the database. -cm_shared_covariate_balance,target_id,int,Yes,Yes,No,No,The identifier for the target cohort. -cm_shared_covariate_balance,comparator_id,int,Yes,Yes,No,No,The identifier for the comparator cohort. +cm_shared_covariate_balance,target_id,bigint,Yes,Yes,No,No,The identifier for the target cohort. +cm_shared_covariate_balance,comparator_id,bigint,Yes,Yes,No,No,The identifier for the comparator cohort. cm_shared_covariate_balance,analysis_id,int,Yes,Yes,No,No,A foreign key referencing the cm_analysis table. cm_shared_covariate_balance,covariate_id,bigint,Yes,Yes,No,No,A foreign key referencing the cm_covariate table. cm_shared_covariate_balance,mean_before,float,No,No,Yes,No,The mean value of the covariate in the union of the target and comparator cohort before propensity score adjustment. From 8181fdf23fb3b008274ad54505230b8dbbc42f8e Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Tue, 2 Apr 2024 18:38:30 -0700 Subject: [PATCH 21/25] checks to allow big integers that overflow Integerish but are still integers --- R/Analyses.R | 2 ++ R/DataLoadingSaving.R | 1 + R/StudyPopulation.R | 1 + 3 files changed, 4 insertions(+) diff --git a/R/Analyses.R b/R/Analyses.R index cd2f7a64..e272a4a3 100644 --- a/R/Analyses.R +++ b/R/Analyses.R @@ -211,6 +211,7 @@ createOutcome <- function(outcomeId, endAnchor = NULL) { errorMessages <- checkmate::makeAssertCollection() checkmate::assertNumeric(outcomeId, add = errorMessages) + checkmate::asserTRUE(all(outcomeId %% 1 == 0), add = errorMessages) checkmate::assertLogical(outcomeOfInterest, add = errorMessages) checkmate::assertNumeric(trueEffectSize, len = 1, null.ok = TRUE, add = errorMessages) checkmate::assertInt(riskWindowStart, null.ok = TRUE, add = errorMessages) @@ -260,6 +261,7 @@ createTargetComparatorOutcomes <- function(targetId, errorMessages <- checkmate::makeAssertCollection() checkmate::assertNumeric(targetId, add = errorMessages) checkmate::assertNumeric(comparatorId, add = errorMessages) + checkmate::assertTRUE(all(c(targetId, comparatorId) %% 1 == 0), add = errorMessages) checkmate::assertList(outcomes, min.len = 1, add = errorMessages) for (i in seq_along(outcomes)) { checkmate::assertClass(outcomes[[i]], "outcome", add = errorMessages) diff --git a/R/DataLoadingSaving.R b/R/DataLoadingSaving.R index 3c031d80..1c5aaac5 100644 --- a/R/DataLoadingSaving.R +++ b/R/DataLoadingSaving.R @@ -130,6 +130,7 @@ getDbCohortMethodData <- function(connectionDetails, checkmate::assertNumeric(targetId, add = errorMessages) checkmate::assertNumeric(comparatorId, add = errorMessages) checkmate::assertNumeric(outcomeIds, add = errorMessages) + checkmate::assertTRUE(all(c(targetId, comparatorId, outcomeIds) %% 1 == 0), add = errorMessages) checkmate::assertCharacter(studyStartDate, len = 1, add = errorMessages) checkmate::assertCharacter(studyEndDate, len = 1, add = errorMessages) checkmate::assertCharacter(exposureDatabaseSchema, len = 1, add = errorMessages) diff --git a/R/StudyPopulation.R b/R/StudyPopulation.R index 1543adf0..204bc15c 100644 --- a/R/StudyPopulation.R +++ b/R/StudyPopulation.R @@ -102,6 +102,7 @@ createStudyPopulation <- function(cohortMethodData, checkmate::assertClass(cohortMethodData, "CohortMethodData", add = errorMessages) checkmate::assertDataFrame(population, null.ok = TRUE, add = errorMessages) checkmate::assertNumeric(outcomeId, null.ok = TRUE, add = errorMessages) + if (!is.null(outcomeId)) checkmate::asserTRUE(all(outcomeId %% 1 == 0), add = errorMessages) checkmate::assertLogical(firstExposureOnly, len = 1, add = errorMessages) checkmate::assertLogical(restrictToCommonPeriod, len = 1, add = errorMessages) checkmate::assertInt(washoutPeriod, lower = 0, add = errorMessages) From 1fbd8dd810e31ec8d44c0964240ad6950c0d929e Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Thu, 4 Apr 2024 12:39:30 -0700 Subject: [PATCH 22/25] Fixed type --- R/Analyses.R | 2 +- R/StudyPopulation.R | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/R/Analyses.R b/R/Analyses.R index 89c14505..c7be7b9e 100644 --- a/R/Analyses.R +++ b/R/Analyses.R @@ -211,7 +211,7 @@ createOutcome <- function(outcomeId, endAnchor = NULL) { errorMessages <- checkmate::makeAssertCollection() checkmate::assertNumeric(outcomeId, add = errorMessages) - checkmate::asserTRUE(all(outcomeId %% 1 == 0), add = errorMessages) + checkmate::assertTRUE(all(outcomeId %% 1 == 0), add = errorMessages) checkmate::assertLogical(outcomeOfInterest, add = errorMessages) checkmate::assertNumeric(trueEffectSize, len = 1, null.ok = TRUE, add = errorMessages) checkmate::assertInt(riskWindowStart, null.ok = TRUE, add = errorMessages) diff --git a/R/StudyPopulation.R b/R/StudyPopulation.R index 94af1717..90b56427 100644 --- a/R/StudyPopulation.R +++ b/R/StudyPopulation.R @@ -102,7 +102,7 @@ createStudyPopulation <- function(cohortMethodData, checkmate::assertClass(cohortMethodData, "CohortMethodData", add = errorMessages) checkmate::assertDataFrame(population, null.ok = TRUE, add = errorMessages) checkmate::assertNumeric(outcomeId, null.ok = TRUE, add = errorMessages) - if (!is.null(outcomeId)) checkmate::asserTRUE(all(outcomeId %% 1 == 0), add = errorMessages) + if (!is.null(outcomeId)) checkmate::assertTRUE(all(outcomeId %% 1 == 0), add = errorMessages) checkmate::assertLogical(firstExposureOnly, len = 1, add = errorMessages) checkmate::assertLogical(restrictToCommonPeriod, len = 1, add = errorMessages) checkmate::assertInt(washoutPeriod, lower = 0, add = errorMessages) From 8feeca53a668353547af288e14f58763e044a0a6 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Mon, 8 Apr 2024 07:16:49 -0700 Subject: [PATCH 23/25] Removed changes from testing branch --- R/RunAnalyses.R | 524 +++++++++++++++++++----------------------------- 1 file changed, 203 insertions(+), 321 deletions(-) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index 3e19c0e0..e1ea1f12 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -292,14 +292,13 @@ runCmAnalyses <- function(connectionDetails, saveRDS(file.path(outputFolder, "outcomeModelReference.rds")) # Create cohortMethodData objects ----------------------------- - subset <- referenceTable[!duplicated(referenceTable$cohortMethodDataFile),] - subset <- subset[subset$cohortMethodDataFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$cohortMethodDataFile)),] + subset <- referenceTable[!duplicated(referenceTable$cohortMethodDataFile), ] + subset <- subset[subset$cohortMethodDataFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$cohortMethodDataFile)), ] if (nrow(subset) != 0) { message("*** Creating cohortMethodData objects ***") - createCmDataTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -356,14 +355,13 @@ runCmAnalyses <- function(connectionDetails, } # Create study populations -------------------------------------- - subset <- referenceTable[!duplicated(referenceTable$studyPopFile),] - subset <- subset[subset$studyPopFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$studyPopFile)),] + subset <- referenceTable[!duplicated(referenceTable$studyPopFile), ] + subset <- subset[subset$studyPopFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$studyPopFile)), ] if (nrow(subset) != 0) { message("*** Creating study populations ***") - createStudyPopTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -409,7 +407,6 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } - objectsToCreate <- lapply(1:nrow(subset), createStudyPopTask) cluster <- ParallelLogger::makeCluster(min(length(objectsToCreate), multiThreadingSettings$createStudyPopThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -452,14 +449,13 @@ runCmAnalyses <- function(connectionDetails, ParallelLogger::stopCluster(cluster) } } else { - subset <- referenceTable[!duplicated(referenceTable$sharedPsFile),] - subset <- subset[subset$sharedPsFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$sharedPsFile)),] + subset <- referenceTable[!duplicated(referenceTable$sharedPsFile), ] + subset <- subset[subset$sharedPsFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$sharedPsFile)), ] if (nrow(subset) != 0) { message("*** Fitting shared propensity models ***") - createSharedPsTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -486,9 +482,9 @@ runCmAnalyses <- function(connectionDetails, ParallelLogger::stopCluster(cluster) } - subset <- referenceTable[!duplicated(referenceTable$psFile),] - subset <- subset[subset$psFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)),] + subset <- referenceTable[!duplicated(referenceTable$psFile), ] + subset <- subset[subset$psFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)), ] if (nrow(subset) != 0) { message("*** Adding propensity scores to study population objects ***") tasks <- split(subset, subset$sharedPsFile) @@ -500,30 +496,24 @@ runCmAnalyses <- function(connectionDetails, } # Trimming/Matching/Stratifying ----------------------------------------- - subset <- referenceTable[!duplicated(referenceTable$strataFile),] - subset <- subset[subset$strataFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$strataFile)),] + subset <- referenceTable[!duplicated(referenceTable$strataFile), ] + subset <- subset[subset$strataFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$strataFile)), ] if (nrow(subset) != 0) { message("*** Trimming/Matching/Stratifying ***") - createTrimMatchStratTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) )[[1]] - - if (file.exists(file.path(outputFolder, refRow$psFile))) { - task <- list( - psFile = file.path(outputFolder, refRow$psFile), - args = analysisRow, - strataFile = file.path(outputFolder, refRow$strataFile) - ) - - return(task) - } + task <- list( + psFile = file.path(outputFolder, refRow$psFile), + args = analysisRow, + strataFile = file.path(outputFolder, refRow$strataFile) + ) + return(task) } - tasks <- lapply(1:nrow(subset), createTrimMatchStratTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$trimMatchStratifyThreads)) @@ -533,9 +523,9 @@ runCmAnalyses <- function(connectionDetails, } # Computing shared covariate balance ---------------------------------- - subset <- referenceTable[!duplicated(referenceTable$sharedBalanceFile),] - subset <- subset[subset$sharedBalanceFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$sharedBalanceFile)),] + subset <- referenceTable[!duplicated(referenceTable$sharedBalanceFile), ] + subset <- subset[subset$sharedBalanceFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$sharedBalanceFile)), ] if (nrow(subset) != 0) { message("*** Computing shared covariate balance ***") @@ -557,7 +547,6 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } - tasks <- lapply(1:nrow(subset), createSharedBalanceTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$computeSharedBalanceThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -566,14 +555,13 @@ runCmAnalyses <- function(connectionDetails, } # Filtering covariates for computing covariate balance ------------------------------ - subset <- referenceTable[!duplicated(referenceTable$filteredForbalanceFile),] - subset <- subset[subset$filteredForbalanceFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$filteredForbalanceFile)),] + subset <- referenceTable[!duplicated(referenceTable$filteredForbalanceFile), ] + subset <- subset[subset$filteredForbalanceFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$filteredForbalanceFile)), ] if (nrow(subset) != 0) { message("*** Filtering covariates for computing covariate balance ***") - createFilterForCovariateBalanceTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -596,14 +584,13 @@ runCmAnalyses <- function(connectionDetails, } # Computing covariate balance (per outcome) ----------------------- - subset <- referenceTable[!duplicated(referenceTable$balanceFile),] - subset <- subset[subset$balanceFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$balanceFile)),] + subset <- referenceTable[!duplicated(referenceTable$balanceFile), ] + subset <- subset[subset$balanceFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$balanceFile)), ] if (nrow(subset) != 0) { message("*** Computing covariate balance (per outcome) ***") - createBalanceTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -623,7 +610,6 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } - tasks <- lapply(1:nrow(subset), createBalanceTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$computeBalanceThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -632,14 +618,13 @@ runCmAnalyses <- function(connectionDetails, } # Prefiltering covariates for outcome models ------------------------- - subset <- referenceTable[!duplicated(referenceTable$prefilteredCovariatesFile),] - subset <- subset[subset$prefilteredCovariatesFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$prefilteredCovariatesFile)),] + subset <- referenceTable[!duplicated(referenceTable$prefilteredCovariatesFile), ] + subset <- subset[subset$prefilteredCovariatesFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$prefilteredCovariatesFile)), ] if (nrow(subset) != 0) { message("*** Prefiltering covariates for outcome models ***") - createPrefilterTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -657,7 +642,6 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } - tasks <- lapply(1:nrow(subset), createPrefilterTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$prefilterCovariatesThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -666,13 +650,13 @@ runCmAnalyses <- function(connectionDetails, } # Fitting outcome models -------------------------- - subset <- referenceTable[referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)),] + subset <- referenceTable[referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)), ] if (nrow(subset) != 0) { message("*** Fitting outcome models for outcomes of interest ***") createOutcomeModelTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -698,7 +682,6 @@ runCmAnalyses <- function(connectionDetails, outcomeModelFile = file.path(outputFolder, refRow$outcomeModelFile) )) } - modelsToFit <- lapply(1:nrow(subset), createOutcomeModelTask) cluster <- ParallelLogger::makeCluster(min(length(modelsToFit), multiThreadingSettings$fitOutcomeModelThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -706,13 +689,13 @@ runCmAnalyses <- function(connectionDetails, ParallelLogger::stopCluster(cluster) } - subset <- referenceTable[!referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)),] + subset <- referenceTable[!referenceTable$outcomeOfInterest & referenceTable$outcomeModelFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$outcomeModelFile)), ] if (nrow(subset) != 0) { message("*** Fitting outcome models for other outcomes ***") createArgs <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -726,7 +709,7 @@ runCmAnalyses <- function(connectionDetails, params <- list( cohortMethodDataFile = file.path(outputFolder, refRow$cohortMethodDataFile), prefilteredCovariatesFile = prefilteredCovariatesFile, - psFile = file.path(outputFolder, refRow$psFile), + psFile = file.path(outputFolder, refRow$psFile), sharedPsFile = file.path(outputFolder, refRow$sharedPsFile), refitPsForEveryOutcome = refitPsForEveryOutcome, args = analysisRow, @@ -734,7 +717,6 @@ runCmAnalyses <- function(connectionDetails, ) return(params) } - modelsToFit <- lapply(1:nrow(subset), createArgs) cluster <- ParallelLogger::makeCluster(min(length(modelsToFit), multiThreadingSettings$fitOutcomeModelThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -785,7 +767,7 @@ getPs <- function(psFile) { } doCreateCmDataObject <- function(params) { - ParallelLogger::logDebug(sprintf("Calling getDbCohortMethodData() for targetId %s, comparatorId %s", + ParallelLogger::logDebug(sprintf("Calling getDbCohortMethodData() for targetId %d, comparatorId %d", params$args$targetId, params$args$comparatorId)) cohortMethodData <- do.call("getDbCohortMethodData", params$args) @@ -797,7 +779,7 @@ doCreateStudyPopObject <- function(params) { cohortMethodData <- getCohortMethodData(params$cohortMethodDataFile) args <- params$args args$cohortMethodData <- cohortMethodData - ParallelLogger::logDebug(sprintf("Calling createStudyPopulation() using %s for outcomeId %s", + ParallelLogger::logDebug(sprintf("Calling createStudyPopulation() using %s for outcomeId %d", params$cohortMethodDataFile, args$outcomeId)) studyPop <- do.call("createStudyPopulation", args) @@ -848,24 +830,19 @@ addPsToStudyPopForSubset <- function(subset, outputFolder) { ps <- readRDS(file.path(outputFolder, subset$sharedPsFile[1])) addToStudyPop <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] studyPop <- readRDS(file.path(outputFolder, refRow$studyPopFile)) studyPop <- addPsToStudyPopulation(studyPop, ps) - tryCatch({ - saveRDS(studyPop, file.path(outputFolder, refRow$psFile)) - }, error = function(err) { - ParallelLogger::logError(paste("error saving ", refRow$psFile)) - }) + saveRDS(studyPop, file.path(outputFolder, refRow$psFile)) return(NULL) } - plyr::l_ply(1:nrow(subset), addToStudyPop) } addPsToStudyPopulation <- function(studyPopulation, ps) { # Merge meta-data newMetaData <- attr(studyPopulation, "metaData") - psMetaData <- attr(ps, "metaData") + psMetaData <- attr(ps, "metaData") missingColumns <- setdiff(names(psMetaData), names(newMetaData)) newMetaData <- append(newMetaData, psMetaData[missingColumns]) attr(studyPopulation, "metaData") <- newMetaData @@ -920,118 +897,100 @@ applyTrimMatchStratify <- function(ps, arguments) { } doTrimMatchStratify <- function(params) { - tryCatch({ - ps <- getPs(params$psFile) - ParallelLogger::logDebug(sprintf("Performing matching etc., using %s", - params$psFile)) - - - ps <- applyTrimMatchStratify(ps, params$args) - saveRDS(ps, params$strataFile) - }, error = function(err) { - ParallelLogger::logError(sprintf("Unable to save %s - %s", params$strataFile, err)) - }) + ps <- getPs(params$psFile) + ParallelLogger::logDebug(sprintf("Performing matching etc., using %s", + params$psFile)) + ps <- applyTrimMatchStratify(ps, params$args) + saveRDS(ps, params$strataFile) return(NULL) } doPrefilterCovariates <- function(params) { - tryCatch({ - cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) - covariates <- cohortMethodData$covariates - if (nrow_temp(covariates) > 0) { - if (params$args$useCovariates) { - covariatesToInclude <- params$args$includeCovariateIds - covariatesToExclude <- params$args$excludeCovariateIds - } else { - covariatesToInclude <- c() - covariatesToExclude <- c() - } - covariatesToInclude <- unique(c(covariatesToInclude, params$args$interactionCovariateIds)) - if (length(covariatesToInclude) != 0) { - covariates <- covariates %>% - filter(.data$covariateId %in% covariatesToInclude) - } - if (length(covariatesToExclude) != 0) { - covariates <- covariates %>% - filter(!.data$covariateId %in% covariatesToExclude) - } + cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) + covariates <- cohortMethodData$covariates + if (nrow_temp(covariates) > 0) { + if (params$args$useCovariates) { + covariatesToInclude <- params$args$includeCovariateIds + covariatesToExclude <- params$args$excludeCovariateIds + } else { + covariatesToInclude <- c() + covariatesToExclude <- c() } - filteredCohortMethodData <- Andromeda::andromeda( - cohorts = cohortMethodData$cohorts, - outcomes = cohortMethodData$outcomes, - covariates = covariates, - covariateRef = cohortMethodData$covariateRef, - analysisRef = cohortMethodData$analysisRef - ) - attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") - class(filteredCohortMethodData) <- "CohortMethodData" - attr(class(filteredCohortMethodData), "package") <- "CohortMethod" - saveCohortMethodData(filteredCohortMethodData, params$prefilteredCovariatesFile) - }, error = function(err) { - ParallelLogger::logError(err) - }) + covariatesToInclude <- unique(c(covariatesToInclude, params$args$interactionCovariateIds)) + if (length(covariatesToInclude) != 0) { + covariates <- covariates %>% + filter(.data$covariateId %in% covariatesToInclude) + } + if (length(covariatesToExclude) != 0) { + covariates <- covariates %>% + filter(!.data$covariateId %in% covariatesToExclude) + } + } + filteredCohortMethodData <- Andromeda::andromeda( + cohorts = cohortMethodData$cohorts, + outcomes = cohortMethodData$outcomes, + covariates = covariates, + covariateRef = cohortMethodData$covariateRef, + analysisRef = cohortMethodData$analysisRef + ) + attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") + class(filteredCohortMethodData) <- "CohortMethodData" + attr(class(filteredCohortMethodData), "package") <- "CohortMethod" + saveCohortMethodData(filteredCohortMethodData, params$prefilteredCovariatesFile) return(NULL) } doFitOutcomeModel <- function(params) { - tryCatch({ - if (params$prefilteredCovariatesFile == "") { - cohortMethodDataFile <- params$cohortMethodDataFile - } else { - cohortMethodDataFile <- params$prefilteredCovariatesFile - } - cohortMethodData <- getCohortMethodData(cohortMethodDataFile) - studyPop <- readRDS(params$studyPopFile) - args <- list(cohortMethodData = cohortMethodData, population = studyPop) - args <- append(args, params$args) - ParallelLogger::logDebug(sprintf("Calling fitOutcomeModel() using %s and %s", - cohortMethodDataFile, - params$studyPopFile)) - outcomeModel <- do.call("fitOutcomeModel", args) - saveRDS(outcomeModel, params$outcomeModelFile) - }, error = function(err) { - ParallelLogger::logError(err) - }) + if (params$prefilteredCovariatesFile == "") { + cohortMethodDataFile <- params$cohortMethodDataFile + } else { + cohortMethodDataFile <- params$prefilteredCovariatesFile + } + cohortMethodData <- getCohortMethodData(cohortMethodDataFile) + studyPop <- readRDS(params$studyPopFile) + args <- list(cohortMethodData = cohortMethodData, population = studyPop) + args <- append(args, params$args) + ParallelLogger::logDebug(sprintf("Calling fitOutcomeModel() using %s and %s", + cohortMethodDataFile, + params$studyPopFile)) + outcomeModel <- do.call("fitOutcomeModel", args) + saveRDS(outcomeModel, params$outcomeModelFile) return(NULL) } doFitOutcomeModelPlus <- function(params) { - tryCatch({ - if (params$prefilteredCovariatesFile == "") { - cohortMethodDataFile <- params$cohortMethodDataFile - } else { - cohortMethodDataFile <- params$prefilteredCovariatesFile - } - cohortMethodData <- getCohortMethodData(cohortMethodDataFile) + if (params$prefilteredCovariatesFile == "") { + cohortMethodDataFile <- params$cohortMethodDataFile + } else { + cohortMethodDataFile <- params$prefilteredCovariatesFile + } + cohortMethodData <- getCohortMethodData(cohortMethodDataFile) - ParallelLogger::logDebug(sprintf("Calling createStudyPopulation(), performing matching etc., and calling fitOutcomeModel() using %s for outcomeID %d", - cohortMethodDataFile, - params$args$createStudyPopArgs$outcomeId)) + ParallelLogger::logDebug(sprintf("Calling createStudyPopulation(), performing matching etc., and calling fitOutcomeModel() using %s for outcomeID %d", + cohortMethodDataFile, + params$args$createStudyPopArgs$outcomeId)) - # Create study pop - args <- params$args$createStudyPopArgs - args$cohortMethodData <- cohortMethodData - studyPop <- do.call("createStudyPopulation", args) + # Create study pop + args <- params$args$createStudyPopArgs + args$cohortMethodData <- cohortMethodData + studyPop <- do.call("createStudyPopulation", args) - if (!is.null(params$args$createPsArgs)) { - if (params$refitPsForEveryOutcome) { - ps <- getPs(params$psFile) - } else { - ps <- getPs(params$sharedPsFile) - ps <- addPsToStudyPopulation(studyPop, ps) - } + if (!is.null(params$args$createPsArgs)) { + if (params$refitPsForEveryOutcome) { + ps <- getPs(params$psFile) } else { - ps <- studyPop + ps <- getPs(params$sharedPsFile) + ps <- addPsToStudyPopulation(studyPop, ps) } - ps <- applyTrimMatchStratify(ps, params$args) - args <- params$args$fitOutcomeModelArgs - args$population <- ps - args$cohortMethodData <- cohortMethodData - outcomeModel <- do.call("fitOutcomeModel", args) - saveRDS(outcomeModel, params$outcomeModelFile) - }, error = function(err) { - ParallelLogger::logError(err) - }) + } else { + ps <- studyPop + } + ps <- applyTrimMatchStratify(ps, params$args) + args <- params$args$fitOutcomeModelArgs + args$population <- ps + args$cohortMethodData <- cohortMethodData + outcomeModel <- do.call("fitOutcomeModel", args) + saveRDS(outcomeModel, params$outcomeModelFile) return(NULL) } @@ -1043,81 +1002,63 @@ doComputeSharedBalance <- function(params) { # Create study pop message("Computing covariate balance across all outcomes (ignore messages about 'no outcome specified')") - tryCatch({ - args <- params$args$createStudyPopArgs - args$cohortMethodData <- cohortMethodData - studyPop <- do.call("createStudyPopulation", args) - - if (!is.null(params$args$createPsArgs)) { - # Add PS - ps <- getPs(params$sharedPsFile) - idx <- match(studyPop$rowId, ps$rowId) - studyPop$propensityScore <- ps$propensityScore[idx] - studyPop$iptw <- ps$iptw[idx] - ps <- studyPop - } else { - ps <- studyPop - } - + args <- params$args$createStudyPopArgs + args$cohortMethodData <- cohortMethodData + studyPop <- do.call("createStudyPopulation", args) - ps <- applyTrimMatchStratify(ps, params$args) - args <- params$args$computeSharedCovariateBalanceArgs - args$population <- ps - args$cohortMethodData <- cohortMethodData - balance <- do.call("computeCovariateBalance", args) - saveRDS(balance, params$sharedBalanceFile) - }, - error = function(err) { - ParallelLogger::logError(err) - }) + if (!is.null(params$args$createPsArgs)) { + # Add PS + ps <- getPs(params$sharedPsFile) + idx <- match(studyPop$rowId, ps$rowId) + studyPop$propensityScore <- ps$propensityScore[idx] + studyPop$iptw <- ps$iptw[idx] + ps <- studyPop + } else { + ps <- studyPop + } + ps <- applyTrimMatchStratify(ps, params$args) + args <- params$args$computeSharedCovariateBalanceArgs + args$population <- ps + args$cohortMethodData <- cohortMethodData + balance <- do.call("computeCovariateBalance", args) + saveRDS(balance, params$sharedBalanceFile) return(NULL) } doFilterForCovariateBalance <- function(params) { - tryCatch({ - cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) - covariateFilter <- params$ - computeCovariateBalanceArgs$ - covariateFilter - covariates <- filterCovariates(cohortMethodData$covariates, cohortMethodData$covariateRef, covariateFilter) - filteredCohortMethodData <- Andromeda::andromeda( - cohorts = cohortMethodData$cohorts, - outcomes = cohortMethodData$outcomes, - covariates = covariates, - covariateRef = cohortMethodData$covariateRef, - analysisRef = cohortMethodData$analysisRef - ) - attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") - class(filteredCohortMethodData) <- "CohortMethodData" - attr(class(filteredCohortMethodData), "package") <- "CohortMethod" - saveCohortMethodData(filteredCohortMethodData, params$filteredForbalanceFile) - }, error = function(err) { - ParallelLogger::logError(err) - }) + cohortMethodData <- loadCohortMethodData(params$cohortMethodDataFile) + covariateFilter <- params$computeCovariateBalanceArgs$covariateFilter + covariates <- filterCovariates(cohortMethodData$covariates, cohortMethodData$covariateRef, covariateFilter) + filteredCohortMethodData <- Andromeda::andromeda( + cohorts = cohortMethodData$cohorts, + outcomes = cohortMethodData$outcomes, + covariates = covariates, + covariateRef = cohortMethodData$covariateRef, + analysisRef = cohortMethodData$analysisRef + ) + attr(filteredCohortMethodData, "metaData") <- attr(cohortMethodData, "metaData") + class(filteredCohortMethodData) <- "CohortMethodData" + attr(class(filteredCohortMethodData), "package") <- "CohortMethod" + saveCohortMethodData(filteredCohortMethodData, params$filteredForbalanceFile) return(NULL) } doComputeBalance <- function(params) { - tryCatch({ - if (params$filteredForbalanceFile == "") { - cohortMethodData <- getCohortMethodData(params$cohortMethodDataFile) - } else { - cohortMethodData <- getCohortMethodData(params$filteredForbalanceFile) - } - strataPop <- readRDS(params$strataFile) + if (params$filteredForbalanceFile == "") { + cohortMethodData <- getCohortMethodData(params$cohortMethodDataFile) + } else { + cohortMethodData <- getCohortMethodData(params$filteredForbalanceFile) + } + strataPop <- readRDS(params$strataFile) - args <- params$computeCovariateBalanceArgs - args$population <- strataPop - args$cohortMethodData <- cohortMethodData - ParallelLogger::logDebug(sprintf("Computing balance balance using %s and %s", - params$cohortMethodDataFile, - params$strataFile)) - balance <- do.call("computeCovariateBalance", args) - saveRDS(balance, params$balanceFile) - }, - error = function(err) { - ParallelLogger::logError(err) - }) + args <- params$computeCovariateBalanceArgs + args$population <- strataPop + args$cohortMethodData <- cohortMethodData + ParallelLogger::logDebug(sprintf("Computing balance balance using %s and %s", + params$cohortMethodDataFile, + params$strataFile)) + balance <- do.call("computeCovariateBalance", args) + saveRDS(balance, params$balanceFile) return(NULL) } @@ -1135,12 +1076,10 @@ createReferenceTable <- function(cmAnalysisList, analysisFolder = sprintf("Analysis_%d", analysis$analysisId) ) } - analyses <- bind_rows(lapply(cmAnalysisList, convertAnalysisToTable)) foldersToCreate <- file.path(outputFolder, analyses$analysisFolder) foldersToCreate <- foldersToCreate[!dir.exists(foldersToCreate)] sapply(foldersToCreate, dir.create) - convertOutcomeToTable <- function(outcome) { tibble( outcomeId = outcome$outcomeId, @@ -1149,7 +1088,6 @@ createReferenceTable <- function(cmAnalysisList, ) %>% return() } - convertTcosToTable <- function(tcos) { lapply(tcos$outcomes, convertOutcomeToTable) %>% bind_rows() %>% @@ -1161,7 +1099,6 @@ createReferenceTable <- function(cmAnalysisList, ) %>% return() } - tcos <- bind_rows(lapply(targetComparatorOutcomesList, convertTcosToTable)) referenceTable <- analyses %>% cross_join(tcos) @@ -1176,7 +1113,6 @@ createReferenceTable <- function(cmAnalysisList, } }, list, object))) } - loadingArgsList <- unique(ParallelLogger::selectFromList( cmAnalysisList, "getDbCohortMethodDataArgs" @@ -1271,23 +1207,21 @@ createReferenceTable <- function(cmAnalysisList, studyPopArgsList <- lapply(studyPopArgsList, function(x) { return(x[[1]]) }) - equivalent <- function(studyPopArgs1, studyPopArgs2) { if (identical(studyPopArgs1, studyPopArgs2)) { return(TRUE) } if (studyPopArgs1$firstExposureOnly != studyPopArgs2$firstExposureOnly || - studyPopArgs1$restrictToCommonPeriod != studyPopArgs2$restrictToCommonPeriod || - studyPopArgs1$washoutPeriod != studyPopArgs2$washoutPeriod || - studyPopArgs1$removeDuplicateSubjects != studyPopArgs2$removeDuplicateSubjects || - studyPopArgs1$minDaysAtRisk != studyPopArgs2$minDaysAtRisk || - studyPopArgs1$minDaysAtRisk != 0) { + studyPopArgs1$restrictToCommonPeriod != studyPopArgs2$restrictToCommonPeriod || + studyPopArgs1$washoutPeriod != studyPopArgs2$washoutPeriod || + studyPopArgs1$removeDuplicateSubjects != studyPopArgs2$removeDuplicateSubjects || + studyPopArgs1$minDaysAtRisk != studyPopArgs2$minDaysAtRisk || + studyPopArgs1$minDaysAtRisk != 0) { return(FALSE) } else { return(TRUE) } } - findFirstEquivalent <- function(studyPopArgsList, studyPopArgs) { for (i in 1:length(studyPopArgsList)) { if (equivalent(studyPopArgsList[[i]], studyPopArgs)) { @@ -1295,7 +1229,6 @@ createReferenceTable <- function(cmAnalysisList, } } } - studyPopArgsEquivalentId <- sapply( cmAnalysisList, function(cmAnalysis, studyPopArgsList) { @@ -1341,11 +1274,9 @@ createReferenceTable <- function(cmAnalysisList, "stratifyByPsArgs", "stratifyByPsAndCovariatesArgs" ) - normStrataArgs <- function(strataArgs) { return(strataArgs[args][!is.na(names(strataArgs[args]))]) } - strataArgsList <- unique(ParallelLogger::selectFromList(cmAnalysisList, args)) strataArgsList <- strataArgsList[sapply( strataArgsList, @@ -1391,11 +1322,9 @@ createReferenceTable <- function(cmAnalysisList, if (refitPsForEveryOutcome) { referenceTable$sharedBalanceFile <- "" } else { - normBalanceArgs <- function(balanceArgs) { return(balanceArgs[args][!is.na(names(balanceArgs[args]))]) } - balanceArgsList <- unique(ParallelLogger::selectFromList(cmAnalysisList, args)) balanceArgsList <- balanceArgsList[sapply( balanceArgsList, @@ -1432,11 +1361,9 @@ createReferenceTable <- function(cmAnalysisList, # Add covariate balance files (per target-comparator-analysis-outcome) args <- "computeCovariateBalanceArgs" - normBalanceArgs <- function(balanceArgs) { return(balanceArgs[args][!is.na(names(balanceArgs[args]))]) } - balanceArgsList <- unique(ParallelLogger::selectFromList(cmAnalysisList, args)) balanceArgsList <- balanceArgsList[sapply( balanceArgsList, @@ -1461,9 +1388,7 @@ createReferenceTable <- function(cmAnalysisList, balanceIdsRequiringFiltering <- sapply( 1:length(balanceArgsList), - function(i) ifelse(is.null(balanceArgsList[[i]]$ - computeCovariateBalanceArgs$ - covariateFilter), NA, i) + function(i) ifelse(is.null(balanceArgsList[[i]]$computeCovariateBalanceArgs$covariateFilter), NA, i) ) } idx <- referenceTable$balanceId %in% balanceIdsRequiringFiltering @@ -1495,43 +1420,25 @@ createReferenceTable <- function(cmAnalysisList, cmAnalysisList, c("getDbCohortMethodDataArgs", "fitOutcomeModelArgs") )) - needsFilter <- function(loadingFittingArgs) { if (!"fitOutcomeModelArgs" %in% names(loadingFittingArgs)) { return(NULL) } - keep <- (loadingFittingArgs$ - fitOutcomeModelArgs$ - useCovariates & (length(loadingFittingArgs$ - fitOutcomeModelArgs$ - excludeCovariateIds) != 0 | - length(loadingFittingArgs$ - fitOutcomeModelArgs$ - includeCovariateIds) != 0)) | - length(loadingFittingArgs$ - fitOutcomeModelArgs$ - interactionCovariateIds) != 0 + keep <- (loadingFittingArgs$fitOutcomeModelArgs$useCovariates & (length(loadingFittingArgs$fitOutcomeModelArgs$excludeCovariateIds) != 0 | + length(loadingFittingArgs$fitOutcomeModelArgs$includeCovariateIds) != 0)) | + length(loadingFittingArgs$fitOutcomeModelArgs$interactionCovariateIds) != 0 if (keep) { loadingFittingArgs$relevantFields <- list( - useCovariates = loadingFittingArgs$ - fitOutcomeModelArgs$ - useCovariates, - excludeCovariateIds = loadingFittingArgs$ - fitOutcomeModelArgs$ - excludeCovariateIds, - includeCovariateIds = loadingFittingArgs$ - fitOutcomeModelArgs$ - includeCovariateIds, - interactionCovariateIds = loadingFittingArgs$ - fitOutcomeModelArgs$ - interactionCovariateIds + useCovariates = loadingFittingArgs$fitOutcomeModelArgs$useCovariates, + excludeCovariateIds = loadingFittingArgs$fitOutcomeModelArgs$excludeCovariateIds, + includeCovariateIds = loadingFittingArgs$fitOutcomeModelArgs$includeCovariateIds, + interactionCovariateIds = loadingFittingArgs$fitOutcomeModelArgs$interactionCovariateIds ) return(loadingFittingArgs) } else { return(NULL) } } - loadingFittingArgsList <- plyr::compact(lapply(loadingFittingArgsList, needsFilter)) referenceTable$prefilteredCovariatesFile <- "" if (length(loadingFittingArgsList) != 0) { @@ -1620,7 +1527,7 @@ createReferenceTable <- function(cmAnalysisList, referenceTable$targetId, referenceTable$comparatorId, referenceTable$outcomeId - ),] + ), ] # Remove non-essential files for outcomes not of interest: idx <- !referenceTable$outcomeOfInterest @@ -1802,8 +1709,7 @@ getInteractionResultsSummary <- function(outputFolder) { summarizeResults <- function(referenceTable, outputFolder, mainFileName, interactionsFileName, calibrationThreads = 1) { subset <- referenceTable %>% - filter(.data$outcomeModelFile != "", file.exists(file.path(outputFolder, .data$outcomeModelFile))) - + filter(.data$outcomeModelFile != "") mainResults <- vector("list", nrow(subset)) interActionResults <- list() pb <- txtProgressBar(style = 3) @@ -1816,13 +1722,9 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac oneSidedP <- NA } else { p <- EmpiricalCalibration::computeTraditionalP(logRr = coefficient, - seLogRr = outcomeModel$ - outcomeModelTreatmentEstimate$ - seLogRr) + seLogRr = outcomeModel$outcomeModelTreatmentEstimate$seLogRr) oneSidedP <- EmpiricalCalibration::computeTraditionalP(logRr = coefficient, - seLogRr = outcomeModel$ - outcomeModelTreatmentEstimate$ - seLogRr, + seLogRr = outcomeModel$outcomeModelTreatmentEstimate$seLogRr, twoSided = FALSE, upper = TRUE) } @@ -1838,7 +1740,7 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac # Assuming we're interest in the attrition of the target population only. Could change to depend on type # of adjustment (e.g IPTW ATE should use target + comparator): attritionFraction <- 1 - (attrition$targetExposures[nrow(attrition)] / attrition$targetExposures[1]) - result <- subset[i,] %>% + result <- subset[i, ] %>% select( "analysisId", "targetId", @@ -1867,9 +1769,7 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac p = !!p, oneSidedP = !!oneSidedP, logRr = if (is.null(coefficient)) NA else coefficient, - seLogRr = if (is.null(coefficient)) NA else outcomeModel$ - outcomeModelTreatmentEstimate$ - seLogRr, + seLogRr = if (is.null(coefficient)) NA else outcomeModel$outcomeModelTreatmentEstimate$seLogRr, llr = if (is.null(coefficient)) NA else outcomeModel$outcomeModelTreatmentEstimate$llr, mdrr = !!mdrr, targetEstimator = outcomeModel$targetEstimator @@ -1879,33 +1779,17 @@ summarizeResults <- function(referenceTable, outputFolder, mainFileName, interac if (!is.null(outcomeModel$outcomeModelInteractionEstimates)) { for (j in seq_len(nrow(outcomeModel$outcomeModelInteractionEstimates))) { - z <- outcomeModel$ - outcomeModelInteractionEstimates$ - logRr[j] / outcomeModel$ - outcomeModelInteractionEstimates$ - seLogRr[j] + z <- outcomeModel$outcomeModelInteractionEstimates$logRr[j] / outcomeModel$outcomeModelInteractionEstimates$seLogRr[j] p <- 2 * pmin(pnorm(z), 1 - pnorm(z)) interActionResults[[length(interActionResults) + 1]] <- result %>% mutate( - interactionCovariateId = outcomeModel$ - outcomeModelInteractionEstimates$ - covariateId[j], - rr = exp(outcomeModel$ - outcomeModelInteractionEstimates$ - logRr[j]), - ci95Lb = exp(outcomeModel$ - outcomeModelInteractionEstimates$ - logLb95[j]), - ci95Ub = exp(outcomeModel$ - outcomeModelInteractionEstimates$ - logUb95[j]), + interactionCovariateId = outcomeModel$outcomeModelInteractionEstimates$covariateId[j], + rr = exp(outcomeModel$outcomeModelInteractionEstimates$logRr[j]), + ci95Lb = exp(outcomeModel$outcomeModelInteractionEstimates$logLb95[j]), + ci95Ub = exp(outcomeModel$outcomeModelInteractionEstimates$logUb95[j]), p = !!p, - logRr = outcomeModel$ - outcomeModelInteractionEstimates$ - logRr[j], - seLogRr = outcomeModel$ - outcomeModelInteractionEstimates$ - seLogRr[j], + logRr = outcomeModel$outcomeModelInteractionEstimates$logRr[j], + seLogRr = outcomeModel$outcomeModelInteractionEstimates$seLogRr[j], targetEstimator = outcomeModel$targetEstimator ) } @@ -1949,10 +1833,8 @@ calibrateEstimates <- function(results, calibrationThreads, interactions = FALSE # group = groups[[1]] calibrateGroup <- function(group) { - ncs <- group[group$trueEffectSize == 1 & !is.na(group$seLogRr),] - pcs <- group[!is.na(group$trueEffectSize) & - group$trueEffectSize != 1 & - !is.na(group$seLogRr),] + ncs <- group[group$trueEffectSize == 1 & !is.na(group$seLogRr), ] + pcs <- group[!is.na(group$trueEffectSize) & group$trueEffectSize != 1 & !is.na(group$seLogRr), ] if (nrow(ncs) >= 5) { null <- EmpiricalCalibration::fitMcmcNull(logRr = ncs$logRr, seLogRr = ncs$seLogRr) ease <- EmpiricalCalibration::computeExpectedAbsoluteSystematicError(null) From 545c75570ca9867955a4f46a0dc6b4f495c3863d Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Mon, 8 Apr 2024 07:27:25 -0700 Subject: [PATCH 24/25] Removed changes from testing branch --- R/Analyses.R | 5 +++-- R/RunAnalyses.R | 14 +++++--------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/R/Analyses.R b/R/Analyses.R index c7be7b9e..afe3e8fe 100644 --- a/R/Analyses.R +++ b/R/Analyses.R @@ -218,11 +218,12 @@ createOutcome <- function(outcomeId, checkmate::assertInt(riskWindowEnd, null.ok = TRUE, add = errorMessages) checkmate::reportAssertions(collection = errorMessages) if (!is.null(startAnchor) && !grepl("start$|end$", startAnchor, ignore.case = TRUE)) { - stop("startAnchor should have value \'cohort start\' or \'cohort end\'") + stop("startAnchor should have value 'cohort start' or 'cohort end'") } if (!is.null(riskWindowEnd) && !grepl("start$|end$", endAnchor, ignore.case = TRUE)) { - stop("endAnchor should have value \'cohort start\' or \'cohort end\'") + stop("endAnchor should have value 'cohort start' or 'cohort end'") } + outcome <- list() for (name in names(formals(createOutcome))) { outcome[[name]] <- get(name) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index e1ea1f12..00f97c0c 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -346,7 +346,6 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } - objectsToCreate <- lapply(1:nrow(subset), createCmDataTask) cluster <- ParallelLogger::makeCluster(min(length(objectsToCreate), multiThreadingSettings$getDbCohortMethodDataThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -416,14 +415,13 @@ runCmAnalyses <- function(connectionDetails, # Fit propensity models --------------------------------------- if (refitPsForEveryOutcome) { - subset <- referenceTable[!duplicated(referenceTable$psFile),] - subset <- subset[subset$psFile != "",] - subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)),] + subset <- referenceTable[!duplicated(referenceTable$psFile), ] + subset <- subset[subset$psFile != "", ] + subset <- subset[!file.exists(file.path(outputFolder, subset$psFile)), ] if (nrow(subset) != 0) { message("*** Fitting propensity models ***") - createPsTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) @@ -474,7 +472,6 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } - modelsToFit <- lapply(1:nrow(subset), createSharedPsTask) cluster <- ParallelLogger::makeCluster(min(length(modelsToFit), multiThreadingSettings$createPsThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod") @@ -528,9 +525,8 @@ runCmAnalyses <- function(connectionDetails, subset <- subset[!file.exists(file.path(outputFolder, subset$sharedBalanceFile)), ] if (nrow(subset) != 0) { message("*** Computing shared covariate balance ***") - createSharedBalanceTask <- function(i) { - refRow <- subset[i,] + refRow <- subset[i, ] analysisRow <- ParallelLogger::matchInList( cmAnalysisList, list(analysisId = refRow$analysisId) From 9675062ba4fc9d1776e59d60daaac77e069aeae4 Mon Sep 17 00:00:00 2001 From: Jamie Gilbert Date: Mon, 8 Apr 2024 07:29:12 -0700 Subject: [PATCH 25/25] Whitespace --- R/RunAnalyses.R | 1 - 1 file changed, 1 deletion(-) diff --git a/R/RunAnalyses.R b/R/RunAnalyses.R index 00f97c0c..1899a3a8 100644 --- a/R/RunAnalyses.R +++ b/R/RunAnalyses.R @@ -571,7 +571,6 @@ runCmAnalyses <- function(connectionDetails, ) return(task) } - tasks <- lapply(1:nrow(subset), createFilterForCovariateBalanceTask) cluster <- ParallelLogger::makeCluster(min(length(tasks), multiThreadingSettings$prefilterCovariatesThreads)) ParallelLogger::clusterRequire(cluster, "CohortMethod")