# Initialize Truveta SDK

In [1]:
library(readr, warn.conflicts = FALSE)
library(arrow, warn.conflicts = FALSE)
library(magrittr, warn.conflicts = FALSE)
library(stringr, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
library(rlang, warn.conflicts = FALSE)
library(data.table, warn.conflicts = FALSE)
library(lubridate, warn.conflicts = FALSE)
library(tidyr, warn.conflicts = FALSE)
library(truveta.notebook.study)

In [2]:
# Open the SDK connection that every later cell passes to the Truveta helpers.
# Use only one statement below and comment out whichever you are not using.
# NOTE(review): later cells call sparklyr/dplyr helpers (e.g. sdf_nrow, collect),
# so they presumably depend on the "sparklyr" mode chosen here -- confirm before switching.
# con <- create_connection(output_mode = "sparkr")
con <- create_connection(output_mode = "sparklyr")

In [3]:
# Resolve the study bound to this connection.
study <- get_study(con)

# Select the population either by title or by id; keep exactly one call active.
# population <- get_population(con, study, title = "MCI")
population <- get_population(
  con,
  id = "p-an3klyy7kz2u7hq3fngqv7v63i"
)

# Uncomment to print the population object.
# population

In [4]:
# Get latest completed active snapshot.
snapshot <- get_latest_snapshot(con, population)
# Bare expression: echoes the snapshot object in the cell output.
snapshot

In [None]:
# Show tables in the snapshot.
# The result is not assigned; the call is here only for its printed output.
get_tables(con, snapshot)

## Valid Diagnosis

In [39]:
#' Derive an encounter-condition data frame from a Condition data frame.
#'
#' Keeps the condition rows that (a) do not carry an annulled verification or
#' clinical status, (b) are not in the "past condition" category code set
#' (problem list / medical history) and (c) have an EncounterId, then
#' inner-joins them to the Encounter table restricted to encounters whose
#' status is not annulled and whose StartDateTime is present.
#'
#' For backward compatibility the function also accepts the argument order
#' (df, con, snapshot): when the first argument is a Spark data frame the
#' three arguments are rotated into place.
#'
#' @param con the connection of the snapshot
#' @param snapshot the snapshot used to resolve the code sets
#'   (codeset_from_prose) and to load the Encounter table
#' @param df the condition data frame to derive; a sparklyr `tbl_spark` or a
#'   SparkR `SparkDataFrame` exposing VerificationStatusConceptId,
#'   ClinicalStatusConceptId, CategoryConceptId and EncounterId
#' @return the filtered data frame (same flavour as `df`) with the extra
#'   columns EncounterStartDateTime, EncounterEndDateTime, EnctClassConceptId
#'   and EnctTypeConceptId
to_encounter_condition <- \(con, snapshot, df) {
   if (inherits(con, c("tbl_spark", "SparkDataFrame"))) {
      # assume parameter order is df, con, snapshot
      rotated <- list(con = snapshot, snapshot = df, df = con)
      con <- rotated$con
      snapshot <- rotated$snapshot
      df <- rotated$df
   }
   stopifnot(inherits(df, c("tbl_spark", "SparkDataFrame")))

   # Resolve a library code set into a comma-separated ConceptId list that can
   # be spliced into a SQL `IN (...)` clause.
   to_codes <- \(url, variable) {
      codes <- truveta.notebook.study::codeset_from_prose(
         con, snapshot,
         url = url, variable_name = variable, output_mode = "sparkr"
      ) |>
         SparkR::select("ConceptId") |>
         SparkR::collect() |>
         dplyr::pull(ConceptId)
      # An empty list would render as `NOT IN ()` and an NA as the bare token
      # `NA`; both are invalid SQL that only fails later, far from the cause.
      codes <- codes[!is.na(codes)]
      if (length(codes) == 0L) {
         stop(
            "Code set '", variable, "' from ", url,
            " returned no ConceptId values; cannot build the SQL filter.",
            call. = FALSE
         )
      }
      paste0(codes, collapse = ",")
   }

   annulled_codes_str <- to_codes("https://library.truveta.com/o/truveta/d/annulled-status-code-set", "codes")
   category_codes_str <- to_codes("https://library.truveta.com/o/truveta/d/encounter-condition-set", "pastConditionCategoryCodes")

   # make it effective by filtering out annulled
   # remove ProblemList and MedicalHistory
   # require EncounterId
   filter_expr <- glue::glue("(
      VerificationStatusConceptId NOT IN ({annulled_codes_str}) AND
      ClinicalStatusConceptId NOT IN ({annulled_codes_str}) AND
      CategoryConceptId NOT IN ({category_codes_str}) AND
      EncounterId IS NOT NULL
   )")
   # Encounters must be non-annulled and dated; shared by both back ends.
   encounter_filter <- glue::glue(
      "(StatusConceptId NOT IN ({annulled_codes_str}) AND StartDateTime IS NOT NULL)"
   )

   if (inherits(df, "tbl_spark")) {
      df <- df |> dplyr::filter(dplyr::sql(filter_expr))
      df_e <- truveta.notebook.study::load_table(con, snapshot, "Encounter", output_mode = "sparklyr") |>
         dplyr::filter(dplyr::sql(encounter_filter)) |>
         dplyr::select(
            EncounterId = Id,
            EncounterStartDateTime = StartDateTime,
            EncounterEndDateTime = EndDateTime,
            EnctClassConceptId = ClassConceptId,
            EnctTypeConceptId = TypeConceptId
         )
      df <- df |>
         dplyr::inner_join(df_e, by = dplyr::join_by(EncounterId))
   } else { # SparkR
      df <- df |> SparkR::filter(as.character(filter_expr))
      # The encounter key is renamed to EncounterId_y so the join expression is
      # unambiguous; the helper column is dropped again after the join.
      df_e <- truveta.notebook.study::load_table(con, snapshot, "Encounter", output_mode = "sparkr") |>
         SparkR::filter(as.character(encounter_filter)) |>
         SparkR::select(c("Id", "StartDateTime", "EndDateTime", "ClassConceptId", "TypeConceptId")) |>
         SparkR::withColumnRenamed("Id", "EncounterId_y") |>
         SparkR::withColumnRenamed("StartDateTime", "EncounterStartDateTime") |>
         SparkR::withColumnRenamed("ClassConceptId", "EnctClassConceptId") |>
         SparkR::withColumnRenamed("TypeConceptId", "EnctTypeConceptId") |>
         SparkR::withColumnRenamed("EndDateTime", "EncounterEndDateTime")
      df <- df |>
         SparkR::join(df_e, joinExpr = df$EncounterId == df_e$EncounterId_y, joinType = "inner") |>
         SparkR::drop("EncounterId_y")
   }
   df
}

### condition of certain diagnosis

In [23]:
# Resolve the "mci_codes" code set and load the matching Condition rows.
index_codes <- codeset_from_prose(con, snapshot, variable_name = "mci_codes")

index_conditions <- load_filtered_table(
  con,
  snapshot,
  "Condition",
  codes = index_codes,
  select_columns = c(
    "Id",
    "PersonId",
    "RecordedDateTime",
    "CodeConceptId",
    "EncounterId",
    "VerificationStatusConceptId",
    "ClinicalStatusConceptId",
    "PrimaryDiagnosisConceptId",
    "CategoryConceptId"
  ),
  return_concept_id = TRUE,
  apply_distinct = TRUE,
  view_name = "tbl_index_conditions",
  apply_annulled_filter = TRUE
)

# Preview the first rows of index_conditions, then report its row count.
display_df(index_conditions, 5)
sdf_nrow(index_conditions)


In [20]:
# Reduce the index conditions to encounter-based diagnoses and expose them
# to SQL cells under the view name "mci_diag".
index_encounters <- to_encounter_condition(
  con = con,
  snapshot = snapshot,
  df = index_conditions
)
create_view(index_encounters, "mci_diag")

Decode concepts here

In [None]:
# Decode the listed concept-id columns and register the result as the
# `index_diagnosis` view.
# NOTE(review): this cell uses Python syntax (list literal, method calls on
# `snapshot` / `study`) while the surrounding cells are R, and it has no
# execution count -- it needs porting before it can run in this notebook.
# NOTE(review): `valid_cond_enct` is not defined in any visible cell; presumably
# the R-side equivalent is `index_encounters` -- confirm.
# NOTE(review): 'ClassConceptId' / 'TypeConceptId' are listed here, but the R
# helper to_encounter_condition names those columns EnctClassConceptId /
# EnctTypeConceptId -- verify the column names against the actual input.
id_cols = [ 'PrimaryDiagnosisConceptId','ClassConceptId', 'TypeConceptId','CategoryConceptId','CodeConceptId']
decoded_diag = snapshot.decode_dataframe_concept_ids(
    df = valid_cond_enct,
    concept_id_columns=id_cols
)

study.create_view(decoded_diag, view_name = 'index_diagnosis')

### Valid Condition Summary

In [None]:
# Per-person summary of the `index_diagnosis` view: distinct condition ids,
# distinct encounter ids, and the earliest / latest RecordedDateTime.
# NOTE(review): Python syntax (triple-quoted string, method call on `snapshot`)
# in an otherwise R notebook; the later R save cells read `idx_diagnosis`, so
# this presumably needs an R equivalent (e.g. load_sql_table(con, snapshot, ...)).
sql = """
    WITH condition_tb AS (
        SELECT 
            cv.PersonId, 
            COUNT(DISTINCT cv.Id) AS CondCount, 
            COUNT(DISTINCT cv.EncounterId) AS EnctCount,
            MIN(cv.RecordedDateTime) AS FirstCondDateTime,
            MAX(cv.RecordedDateTime) AS LastCondDateTime

        FROM 
            index_diagnosis cv
        GROUP BY PersonId
)

SELECT *
FROM condition_tb
"""

idx_diagnosis = snapshot.load_sql_table(sql)

In [None]:
# Persist the per-person diagnosis summary (idx_diagnosis) as study artifacts.
# The "xxxx" file names are placeholders to be replaced before running.

# saving as a parquet
# (fixed: the study argument was misspelled `sutdy`, an undefined name)
path <- file.path("xxxx.parquet")
save_artifacts_data(con, study, idx_diagnosis, path)

# saving as a csv
path <- file.path("xxxx.csv")
save_artifacts_data(con, study, idx_diagnosis, path, data_type = "csv")

# saving as a csv.r
path <- file.path("xxxx.csv.r")
save_artifacts_data(con, study, idx_diagnosis, path, data_type = "csv")


## condition without diagnosis restriction

In [27]:
# Load every Condition row whose code map points at one of two source concepts,
# that has at least one of OnsetDateTime / RecordedDateTime, and whose
# verification / clinical status is not in the excluded lists. The result is
# also registered as the view "all_cond".
# NOTE(review): the meaning of the hard-coded concept ids is not visible in
# this notebook -- confirm them against the concept tables.
# The non-null date predicate used to appear twice (once with unqualified
# column names, once qualified with `c.`); only the qualified form is kept so
# the columns cannot be ambiguous in the join.
sql <- "
WITH cond as (
SELECT
    DISTINCT c.*
    FROM Condition c JOIN ConditionCodeConceptMap cm
    ON c.CodeConceptMapId = cm.Id
    WHERE (cm.SourceConceptId = 2703595 OR cm.SourceConceptId = 2703594)
      AND (c.OnsetDateTime IS NOT NULL OR c.RecordedDateTime IS NOT NULL)
      AND c.VerificationStatusConceptId NOT IN (1065198,1065195)
      AND c.ClinicalStatusConceptId NOT IN (1065184,1065178,1065183)
)

SELECT *
FROM cond
"

a <- load_sql_table(con, snapshot, query = sql, view_name = "all_cond")
head(a)

In [40]:
# Derive encounter-level rows from the unrestricted condition set `a` and
# register them as the view 'diagnosis_enct' read by the QC queries below.
# NOTE: this rebinds `index_encounters`, replacing the value assigned from
# index_conditions in the earlier cell.
index_encounters <- to_encounter_condition(con, snapshot, a)
create_view(index_encounters, 'diagnosis_enct')

In [41]:
# List the column names of the derived encounter-condition data frame.
colnames(index_encounters)

In [None]:
# Preview the derived encounter-condition rows.
display_df(index_encounters)

## Diagnosis Encounter QC tables

In [42]:
# Bucket every row of the diagnosis_enct view into a coarse EncounterType based
# on EnctClassConceptId, and register the result as the view "encounterF" that
# the following QC queries read.
# NOTE(review): CASE returns the first matching WHEN, so two branches below can
# never fire as written:
#   - 1065225 is already listed under 'Inpatient', so 'LabImaging' is unreachable;
#   - 1065216 is already listed under 'Outpatient', so 'Virtual' is unreachable.
# Confirm which bucket each id belongs to and remove it from the other list.
sql2 = "
WITH encounterF as (
    SELECT *,
    CASE 
    -- emergency
	  WHEN EnctClassConceptId in (1065217) then 'Emergency'
      -- inpatient acute and inpatient encounter
      WHEN EnctClassConceptId in (1065215,1065220, 1065225) then 'Inpatient' 
      --observation encounter
      WHEN EnctClassConceptId in (1065225) then 'LabImaging'
      --ambulatory, short stay, home health
      WHEN EnctClassConceptId in (1065216,1065227,1065230) then 'Outpatient'
      --home health and virtual
      WHEN EnctClassConceptId in (1065216) then 'Virtual'
      --unknown types 
      WHEN EnctClassConceptId in (1067555, 2703598, 1065218, 1067558,2649591,0,1067561) then 'Unknown'
      else 'Other'
    end as EncounterType
  FROM diagnosis_enct 
)

SELECT *
FROM encounterF
"
diag_enct = load_sql_table(con,snapshot,query = sql2, view_name="encounterF")

In [44]:
# For each person keep the row(s) of encounterF with the earliest and the
# latest EncounterStartDateTime, flag them 'First' / 'Last', and collect the
# result into a local data frame.
# Notes on the query as written:
#   - the WHERE clause keeps only rows with row_num1 == 1 or row_num2 == 1, so
#     the 'Not Used' label can never appear in the output;
#   - a row that is both first and last (person with a single row) is labelled
#     'First' only, because that WHEN is tested first.
# NOTE(review): TotEnctCnt = GREATEST(row_num1, row_num2) equals the partition
# size only when the ASC and DESC numberings are exact mirrors; rows that tie on
# EncounterStartDateTime can break that. It also counts encounterF rows, which
# presumably repeat an encounter once per condition -- confirm whether a
# distinct-encounter count is intended.
sql2 = "
WITH enct_cnt as (
    SELECT PersonId,EncounterId, EncounterStartDateTime as EncounterDT,
     EncounterType, ROW_NUMBER() OVER (PARTITION BY PersonId ORDER BY EncounterStartDateTime ASC) as row_num1,
    ROW_NUMBER() OVER (PARTITION BY PersonId ORDER BY EncounterStartDateTime DESC) as row_num2
    FROM encounterF
),

temp1 as (
    SELECT *, 
    CASE 
    WHEN row_num1 == 1 THEN 'First'
    WHEN row_num2 == 1 THEN 'Last'
    ELSE 'Not Used' END AS EnctDTFL,
    GREATEST(row_num1, row_num2) AS TotEnctCnt
    FROM enct_cnt
    WHERE row_num1 == 1 OR row_num2 == 1
)

SELECT *
FROM temp1
 "

enct_type = load_sql_table(con,snapshot, query = sql2,output_mode = "sparklyr") %>% collect() 

# Peek at the first collected rows.
head(enct_type)

In [45]:
# Count distinct encounters per person and encounter type from the encounterF
# view, then pull the result into local memory.
sql <- "
SELECT PersonId,EncounterType, count(distinct EncounterId) as Cnt
    FROM encounterF
    GROUP BY PersonId, EncounterType
 "

enct_typecnt <- collect(
  load_sql_table(con, snapshot, query = sql, output_mode = "sparklyr")
)

In [84]:
# Number of distinct persons seen under each encounter type.
enct_typecnt %>%
  group_by(EncounterType) %>%
  summarise(pts = n_distinct(PersonId))

In [46]:
# Reshape to one row per person with one count column per encounter type.
enct_pivoted <- enct_typecnt %>%
  pivot_wider(
    id_cols = "PersonId",
    names_from = EncounterType,
    values_from = Cnt
  )

In [48]:
# Attach the per-type encounter counts to each first/last encounter row.
combined <- left_join(enct_type, enct_pivoted, by = "PersonId")

In [80]:
# Column-wise summary of the combined encounter QC table.
summary(combined)

In [49]:
# Write the combined encounter QC table as a csv artifact with a ".csv.r" name.
path <- file.path("DIAG_ENCT_QC.csv.r")
save_artifacts_data(
  con,
  study,
  combined,
  path,
  data_type = "csv"
)

In [50]:
# Per-person medication dispense QC, collected into a local data frame:
#   DSSTDT / DSENDT  - earliest / latest DispenseDateTime
#   TotNumDisRec     - number of distinct dispense timestamps
#   DISSDAYDIFF      - DATEDIFF between the latest and earliest dispense
# Rows without a DispenseDateTime, or whose StatusConceptId is in the excluded
# list, are dropped first.
# NOTE(review): the excluded status ids are hard-coded and their meaning is not
# visible in this cell -- confirm against the concept tables.
sql_ex2 <- "
WITH med_dispense as 
(
    SELECT DISTINCT PersonId, DispenseDateTime
    FROM MedicationDispense
    WHERE DispenseDateTime IS NOT NULL AND
    StatusConceptId NOT IN (2989063, 2989065,2989060,2989064)

),


tb3 as
(
    SELECT PersonId, min(DispenseDateTime) as DSSTDT, max(DispenseDateTime) as DSENDT,
    -- changing distinct dispense Id to distinct dispense date time
     count(distinct DispenseDateTime) as TotNumDisRec
    FROM med_dispense
    GROUP BY PersonId
),

tb4 as (
   SELECT *,DATEDIFF(DSENDT,DSSTDT) as DISSDAYDIFF
   FROM  tb3 
)

SELECT *
FROM tb4
"
ex2 <- load_sql_table(con,snapshot,query = sql_ex2, output_mode = "sparklyr") %>% collect()
display_df(ex2)

In [51]:
# Write the dispense QC table as a csv artifact with a ".csv.r" name.
path <- file.path("MED_DIS_QC.csv.r")
save_artifacts_data(
  con,
  study,
  ex2,
  path,
  data_type = "csv"
)

In [52]:
# Per-person medication request QC, collected into a local data frame:
#   RequestFirstDT / RequestLastDT - earliest / latest AuthoredOnDateTime
#   TotNumReqRec                   - number of distinct AuthoredOnDateTime values
#   ReqDiffDAYS                    - DATEDIFF between the latest and earliest request
# NOTE(review): the first SQL comment inside the query says "dispense", but the
# table filtered is MedicationRequest.
# NOTE(review): the excluded StatusConceptId list is the same one used for
# MedicationDispense in the previous query -- confirm it applies to requests too.
sql_exqc <- "
--Remove error, canceled,declined or stopped dispense 
WITH med_base as 
(SELECT *
FROM MedicationRequest
WHERE StatusConceptId NOT IN (2989063, 2989065,2989060,2989064) AND
AuthoredOnDateTime IS NOT NULL

),

-- EXQC: subject level data
tb4 as
(
   SELECT PersonId, min(AuthoredOnDateTime) as RequestFirstDT, max(AuthoredOnDateTime) as RequestLastDT, 
   count(DISTINCT AuthoredOnDateTime ) as TotNumReqRec
   FROM med_base
   GROUP BY PersonId
)

SELECT *, DATEDIFF(RequestLastDT, RequestFirstDT) as ReqDiffDAYS
FROM tb4
"

exqc<- load_sql_table(con, snapshot, query = sql_exqc, output_mode = "sparklyr") %>% collect()

display_df(exqc)

In [53]:
# Row count of the request QC table (the query groups by PersonId).
nrow(exqc)

In [54]:
# Row count of the dispense QC table (the query groups by PersonId).
nrow(ex2)

In [None]:
# Load the demographic table and register it as the view "demo".
demo <- get_demographic(con, snapshot)
create_view(demo, "demo")

In [69]:
# Full join keeps persons that appear in either the request or the dispense QC
# table.
med_qc <- full_join(exqc, ex2, by = "PersonId")

In [70]:
# Write the merged medication QC table as a csv artifact with a ".csv.r" name.
path <- file.path("MED_QC.csv.r")
save_artifacts_data(
  con,
  study,
  med_qc,
  path,
  data_type = "csv"
)

In [60]:
library(ggplot2)

In [71]:
# Classify each person by which medication record types they have, then count
# persons per class.
a <- med_qc %>%
  mutate(
    MEDFL = case_when(
      TotNumDisRec > 0 & TotNumReqRec > 0 ~ "MR and MD",
      is.na(TotNumDisRec) & TotNumReqRec > 0 ~ "MR Only",
      is.na(TotNumReqRec) & TotNumDisRec > 0 ~ "MD Only",
      TRUE ~ "Unknown"
    )
  ) %>%
  group_by(MEDFL) %>%
  summarise(cnt = n())

display_df(a)

In [79]:
# Scatter of per-person dispense vs. request record counts with a linear fit
# (blue) and the y = x reference line (dashed red).
# Fixed: the smoother argument was misspelled `methods`, which ggplot2 ignores
# as an unknown parameter, so the intended linear model was never applied.
# Note: xlim()/ylim() drop points outside the limits before the fit is computed.
scatter_plot <- ggplot(med_qc, aes(x = TotNumDisRec, y = TotNumReqRec)) +
  geom_point() +
  labs(
    title = "Scatter Plot of MD and MR Records",
    x = "Dispense Records",
    y = "Request Records"
  ) +
  geom_smooth(method = "lm", color = "blue") +
  xlim(c(0, 2000)) +
  ylim(c(0, 2100)) +
  geom_abline(slope = 1, intercept = 0, linetype = "dashed", color = "red")
# Display the plot
show_plot(scatter_plot)

In [74]:
# Inspect persons with more than 2000 distinct request timestamps.
display_df(filter(med_qc, TotNumReqRec > 2000))

In [78]:
# Distribution of the per-person difference between request and dispense counts.
med_qc %>%
  mutate(req_dis_diff = TotNumReqRec - TotNumDisRec) %>%
  select(req_dis_diff) %>%
  summary()

In [None]:
# Persist the QC outputs as study artifacts.
# The "xxxx" file names are placeholders to be replaced before running.

# saving as a parquet
# (fixed: the study argument was misspelled `sutdy`, an undefined name)
path <- file.path("xxxx.parquet")
save_artifacts_data(con, study, idx_diagnosis, path)

# saving as a csv
path <- file.path("xxxx.csv")
save_artifacts_data(con, study, idx_diagnosis, path, data_type = "csv")

# saving as a csv.r
path <- file.path("DIAG_ENCT_QC.csv.r")
save_artifacts_data(con, study, combined, path, data_type = "csv")
