# Initialize Truveta SDK

In [1]:
# These are some commonly used R Packages.  
# The arrow package makes loading data with spark faster. 
library(readr, warn.conflicts = FALSE)
library(arrow, warn.conflicts = FALSE)
library(magrittr, warn.conflicts = FALSE)
library(stringr, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
library(rlang, warn.conflicts = FALSE)
library(data.table, warn.conflicts = FALSE)
library(lubridate, warn.conflicts = FALSE)
library(tidyr, warn.conflicts = FALSE)
library(truveta.notebook.study)
library(sparklyr)
library(ggplot2)
library(reshape2)

In [2]:
# Connect to the Truveta platform, then resolve the study, the
# "PancreaticMainPop" population, its latest snapshot and that snapshot's
# table list. The snapshot and tables are echoed for inspection.
print("load snapshot")
con <- create_connection()
study <- get_study(con)

# NOTE(review): population_id is not referenced in this section; the
# population below is looked up by title instead.
population_id <- "ps-3ormi7swwukuhhu6kcqrqw4mue"
population <- get_population(con, study, title = "PancreaticMainPop")

snapshot <- get_latest_snapshot(con, population)
snapshot

tables <- get_tables(con, snapshot)
tables

In [3]:
# Render a ggplot object inline in the notebook as an embedded PNG.
#
# The plot is written to a temporary PNG file, base64-encoded into a data
# URI, and emitted through displayHTML() as an <img> tag.
#
# x:   a ggplot object.
# dpi: resolution passed to ggplot2::ggsave() (default "screen").
# ...: further arguments forwarded to ggplot2::ggsave() (e.g. width, height).
# Returns whatever displayHTML() returns.
display_plot <- \(x, dpi = "screen", ...) {
  file <- tempfile(fileext = ".png")
  # Remove the temporary file even if saving or encoding fails.
  on.exit(unlink(file), add = TRUE)
  ggplot2::ggsave(file, device = "png", plot = x, dpi = dpi, ...)
  uri <- base64enc::dataURI(file = file, mime = "image/png")
  displayHTML(paste0('<img src="', uri, '">'))
}

In [4]:
# Resolve the study's output directory. Per the platform convention noted
# here, fs = TRUE is used when files are read/written locally (e.g. with
# read.csv()/write.csv()).
output_path_local <- get_output_path(con, study, fs = TRUE)
output_path_local

In [9]:
# Load a previously exported lab result table from the study output folder
# into an R data frame.
t1 <- paste0(output_path_local, "/lab_test_result.csv.r")
lab_test_result <- read.csv(t1)

In [6]:
# Convert R DataFrame to Spark DataFrame
# Converts the in-memory R data frame `lab_data` to a Spark DataFrame and
# registers it as the temp view "lab_data" so SQL queries can reference it.
# NOTE(review): the lab-extraction cell already calls
# load_sql_table(..., view = "lab_data"); confirm this re-registration is
# still needed.
# NOTE(review): as.DataFrame()/createOrReplaceTempView() look like SparkR
# functions while other cells use sparklyr output - confirm SparkR is attached.
temp_data <- as.DataFrame(lab_data)
createOrReplaceTempView(temp_data, "lab_data")

In [19]:
# Export the case-vs-control lab comparison table (table_lab) as a CSV file,
# without row names, into the study output directory.
file_to_write <- paste0(output_path_local, "/table_lab.csv.r")
write.csv(table_lab, file_to_write, row.names = FALSE)

In [11]:
# Truveta status concepts ("self" only, no descendants) used to exclude lab
# rows. The view "AnnulLabStatusNullCodes" is referenced by the
# lab-extraction SQL, which drops rows whose StatusConceptId is in this set.
LabStatusNullCodes <- codeset(
  con, snapshot,
  "Truveta", "self",
  "1065712", "1065714"
)
create_view(LabStatusNullCodes, "AnnulLabStatusNullCodes")

In [19]:
# Extract baseline lab results for the cohort. load_sql_table() is given
# view = "lab_data", and the result is also collected into the R data frame
# `lab_data`.
#
# CTE pipeline:
#   temp  - rows of SearchResult_def5LabCodes with a non-null status that is
#           not in AnnulLabStatusNullCodes, and non-null value, date and unit;
#           CodeConceptId is exploded (explode_outer) into one row per code.
#   temp2 - pulls Code.Id out as CodeConceptId.
#   temp3 - joins Concept to get the lab test name (LabTestName).
#   temp4 - joins Concept to get the unit name; keeps values >= 0 dated 2018+.
#   temp5 - keeps results dated from 360 days before to 30 days after PanDt
#           for persons present in the `data` view.
#   temp6 - de-duplicates rows and maps placeholder unit names to 'Unknown'.
# Rows whose unit ended up 'Unknown' are dropped in the final SELECT.
# NOTE(review): confirm the -360 day baseline window is intended (vs -365).
# The `data` view and its PanDt column are defined outside this section.
sql <-  "

-- select all necessary required variables and keep only values with valid status, non null date and time

WITH temp as (
    SELECT PersonId, EncounterId, EffectiveDateTime, NormalizedValueUOMConceptId, NormalizedValueNumeric, NormalizedValueConceptId,
    explode_outer(CodeConceptId) as Code
    FROM SearchResult_def5LabCodes 
        WHERE (StatusConceptId IS NOT NULL AND StatusConceptId NOT IN (SELECT ConceptId FROM AnnulLabStatusNullCodes)) AND
        NormalizedValueNumeric IS NOT NULL AND
        EffectiveDateTime IS NOT NULL AND NormalizedValueUOMConceptId IS NOT NULL
),

-- Join with Concept to get the name of ConceptId and Unit

temp2 as (
  SELECT *, Code.Id as CodeConceptId
   FROM temp
),

temp3 as (
    SELECT p.*, c.ConceptName as LabTestName
    from temp2 p join Concept c on p.CodeConceptId = c.ConceptId
),

-- Join with Concept to get the name of  Unit

temp4 as (
  SELECT t.*, c.ConceptName as UnitOfMeasurement
  from temp3 t join Concept c on t.NormalizedValueUOMConceptId = c.ConceptId
  WHERE t.NormalizedValueNumeric >= 0 AND year(t.EffectiveDateTime) >= 2018
),

-- Since only lab values within baseline are required add the time frame

temp5 as (
    SELECT t.* from temp4 t INNER JOIN data p
    ON t.PersonId = p.PersonId
    WHERE t.EffectiveDateTime BETWEEN DATEADD(day, -360, p.PanDt) AND DATEADD(day,30,p.PanDt)
),

-- Consider only distinct values from all columns and create a update unitofmeasurement column

temp6 as (
  SELECT DISTINCT 
       PersonId, LabTestName, CodeConceptId, EncounterId, EffectiveDateTime, NormalizedValueNumeric, NormalizedValueUOMConceptId,
  CASE 
   WHEN UnitOfMeasurement IN (
     'No Information', 
     'Field has not been mapped', 
     'Invalid', 
     'Field is not present in source'
   ) THEN 'Unknown'
   ELSE UnitOfMeasurement
 END AS UnitOfMeasurement
 FROM temp5
)

SELECT * FROM temp6 WHERE UnitOfMeasurement != 'Unknown'
"
lab_data <- load_sql_table(con,snapshot,query = sql,view = "lab_data", output_mode = "sparklyr") %>% collect()

In [21]:
# Preview the extracted baseline lab rows.
display_df(lab_data,con=con)

In [7]:
# LOINC code sets, each expanded with descendants, one per lab of interest.
# Each set is later passed to lab_flag() with a matching lab name.

# Liver function
DirectBilirubin <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "35672-5", "1968-7", "34543-9", "14629-0", "43820-0", "56905-3",
  "14152-3", "48623-3", "56906-1", "25564-6", "35191-6"
)

TotalBilirubin <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "1975-2", "42719-5", "59827-6", "59828-4"
)

# Alkaline phosphatase
ALP <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "6768-6", "6769-4", "1783-0", "77141-0"
)

# Aspartate aminotransferase
AST <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "1920-8", "30239-8", "88112-8"
)

# Alanine aminotransferase
ALT <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "1742-6", "1743-4", "1744-2", "76625-3", "77144-4"
)

LiverFunctionPanel <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "24325-3", "24324-6"
)

Albumin <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "1751-7", "2862-1", "54347-0", "61151-7", "61152-5", "62234-0",
  "62235-7", "77148-5"
)

TotalProtein <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "2885-2", "2884-5"
)

# Pancreas (exocrine)
# NOTE(review): LOINC 72272-8 appears in both SerumLipase and SerumPanAmylase;
# confirm it belongs to only one of them.
SerumLipase <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "3040-3", "2572-6", "72272-8", "100743-4"
)

SerumPanAmylase <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "100898-6", "24331-1", "49541-6", "95126-9", "2569-2", "1805-1",
  "1809-3", "72272-8", "72571-3", "25312-0"
)

ProthrombinT <- codeset(
  con, snapshot, "LOINC", "selfAndDescendants",
  "5902-2"
)

In [38]:
# Build the Spark SQL returning, per person, the most recent result of one
# lab (from the "lab_data" view, restricted to the "concept_codes" view),
# converted to a standard unit.
#
# lab_name: label used both as the ConceptName that drives the conversion CASE
#           and as the output column names (<lab_name>, <lab_name>Unit). It is
#           spliced into the SQL text, so it must be a plain identifier.
# Returns a single SQL string. Errors if lab_name is not a valid identifier.
.lab_flag_sql <- function(lab_name) {
  if (!is.character(lab_name) || length(lab_name) != 1L ||
      !grepl("^[A-Za-z_][A-Za-z0-9_]*$", lab_name)) {
    stop("`lab_name` must be a single SQL identifier, got: ",
         paste(deparse(lab_name), collapse = " "), call. = FALSE)
  }

  # NOTE: this string is a sprintf() format; do not add percent signs to it
  # other than the three placeholders.
  sql <- "
    WITH filtered_labs AS (
      SELECT 
        l.PersonId,
        l.NormalizedValueNumeric,
        l.UnitOfMeasurement,
        l.EffectiveDateTime,
        '%s' as ConceptName
      FROM 
        lab_data l
      INNER JOIN 
        concept_codes c ON l.CodeConceptId = c.ConceptId
    ),

    converted_labs AS (
      SELECT 
        PersonId,
        EffectiveDateTime,

        -- Value conversion based on lab type
        CASE 
          -- Bilirubin: mg/dL is standard
          WHEN ConceptName IN ('DirectBilirubin', 'TotalBilirubin') THEN
            CASE 
              WHEN UnitOfMeasurement = 'milligram per deciliter' THEN NormalizedValueNumeric
              WHEN UnitOfMeasurement = 'gram per liter' THEN NormalizedValueNumeric * 100   -- 1 g/L = 100 mg/dL
              -- values reported in percent are not a concentration and cannot be expressed in mg/dL
              ELSE NULL
            END

          -- AST, ALT, ALP: IU/L is standard
          WHEN ConceptName IN ('AST','ALT','ALP') THEN
            CASE 
              WHEN UnitOfMeasurement IN ('unit per liter', 'international unit per liter', 'enzyme unit per liter') THEN NormalizedValueNumeric
              WHEN UnitOfMeasurement = 'enzyme unit per deciliter' THEN NormalizedValueNumeric * 10   -- 1 U/dL = 10 U/L
              WHEN UnitOfMeasurement = 'milligram per deciliter' THEN NULL  -- not convertible
              ELSE NULL
            END

          -- Albumin, total protein: g/dL is standard
          WHEN ConceptName IN ('Albumin', 'TotalProtein') THEN
            CASE 
              WHEN UnitOfMeasurement = 'gram per deciliter' THEN NormalizedValueNumeric
              WHEN UnitOfMeasurement = 'gram per liter' THEN NormalizedValueNumeric / 10   -- 1 g/L = 0.1 g/dL
              WHEN UnitOfMeasurement = 'milligram per deciliter' THEN NormalizedValueNumeric / 1000   -- 1 mg/dL = 0.001 g/dL
              ELSE NULL
            END

          -- Amylase, lipase: U/L is standard
          WHEN ConceptName IN ('SerumPanAmylase', 'SerumLipase') THEN
            CASE 
              WHEN UnitOfMeasurement IN ('unit per liter', 'international unit per liter', 'enzyme unit per liter') THEN NormalizedValueNumeric
              WHEN UnitOfMeasurement = 'enzyme unit per deciliter' THEN NormalizedValueNumeric * 10   -- 1 U/dL = 10 U/L
              ELSE NULL
            END

          -- Prothrombin time: seconds are standard
          WHEN ConceptName IN ('ProthrombinT') THEN
            CASE 
              WHEN UnitOfMeasurement IN ('second') THEN NormalizedValueNumeric
              WHEN UnitOfMeasurement = 'minute' THEN NormalizedValueNumeric * 60
              ELSE NULL
            END

          ELSE NULL
        END AS StandardValue,

        -- Standardized unit label (only for units converted above)
        CASE 
          WHEN ConceptName IN ('DirectBilirubin', 'TotalBilirubin') AND UnitOfMeasurement IN ('milligram per deciliter', 'gram per liter') THEN 'mg/dL'
          WHEN ConceptName IN ('AST','ALT','ALP') AND UnitOfMeasurement IN ('unit per liter', 'international unit per liter', 'enzyme unit per liter', 'enzyme unit per deciliter') THEN 'IU/L'
          WHEN ConceptName IN ('Albumin', 'TotalProtein') AND UnitOfMeasurement IN ('gram per deciliter', 'gram per liter', 'milligram per deciliter') THEN 'g/dL'
          WHEN ConceptName IN ('SerumPanAmylase', 'SerumLipase') AND UnitOfMeasurement IN ('unit per liter', 'international unit per liter', 'enzyme unit per liter', 'enzyme unit per deciliter') THEN 'U/L'
          WHEN ConceptName IN ('ProthrombinT') AND UnitOfMeasurement IN ('second','minute') THEN 's'
          ELSE NULL
        END AS Unit

      FROM 
        filtered_labs
    ),

    latest_lab AS (
     SELECT *, 
       -- Latest result per person; StandardValue breaks ties on the same
       -- timestamp so the selected row does not vary between runs.
       ROW_NUMBER() OVER (PARTITION BY PersonId ORDER BY EffectiveDateTime DESC, StandardValue DESC) AS rn
      FROM converted_labs
        WHERE StandardValue IS NOT NULL AND Unit IS NOT NULL
        -- records without a convertible value or unit are removed
    )

    SELECT 
      PersonId, 
      ROUND(StandardValue, 3) AS %s,
      Unit AS %sUnit
    FROM latest_lab
    WHERE rn=1
  "

  sprintf(sql, lab_name, lab_name, lab_name)
}

# Return one row per person holding that person's most recent lab value for
# the given code set, converted to a standard unit.
#
# codes:    a code set (as returned by codeset()); it is registered as the view
#           "concept_codes" (replacing any previous one) and inner-joined to
#           the "lab_data" view on CodeConceptId.
# lab_name: one of 'DirectBilirubin', 'TotalBilirubin', 'AST', 'ALT', 'ALP',
#           'Albumin', 'TotalProtein', 'SerumPanAmylase', 'SerumLipase',
#           'ProthrombinT'. Any other name matches no conversion branch and
#           yields no rows. Must be a plain SQL identifier.
# Uses the global `con` and `snapshot`.
# Returns a collected data frame with columns PersonId, <lab_name> (rounded to
# 3 decimals) and <lab_name>Unit.
lab_flag <- function(codes = "codes", lab_name = "name") {
  # Build (and validate) the query before replacing the concept_codes view.
  sql1 <- .lab_flag_sql(lab_name)
  create_view(codes, "concept_codes")

  load_sql_table(con, snapshot, query = sql1, output_mode = "sparklyr") %>%
    collect()
}


In [42]:
# Extract each person's latest baseline value for every lab, one table per
# lab. Each lab_flag() call replaces the "concept_codes" view with its code set.

DirectBilirubin_tb <- lab_flag(codes = DirectBilirubin, lab_name = "DirectBilirubin")

TotalBilirubin_tb <- lab_flag(codes = TotalBilirubin, lab_name = "TotalBilirubin")

ALP_tb <- lab_flag(codes = ALP, lab_name = "ALP")

AST_tb <- lab_flag(codes = AST, lab_name = "AST")

ALT_tb <- lab_flag(codes = ALT, lab_name = "ALT")

# LiverFunctionPanel is not extracted: lab_flag() has no unit conversion for
# that name, so it would return no rows.
#LiverFunctionPanel_tb <- lab_flag(codes = LiverFunctionPanel, lab_name = "LiverFunctionPanel")

# The code set object is `Albumin` (there is no `defAlbumin`).
Albumin_tb <- lab_flag(codes = Albumin, lab_name = "Albumin")

TotalProtein_tb <- lab_flag(codes = TotalProtein, lab_name = "TotalProtein")

SerumLipase_tb <- lab_flag(codes = SerumLipase, lab_name = "SerumLipase")

SerumPanAmylase_tb <- lab_flag(codes = SerumPanAmylase, lab_name = "SerumPanAmylase")

ProthrombinT_tb <- lab_flag(codes = ProthrombinT, lab_name = "ProthrombinT")

In [35]:
# Assemble the analysis table: start from the cohort columns (PersonId, AVAL,
# indicator) and left-join each per-lab table on PersonId, in the order
# listed. LiverFunctionPanel is not joined.
lab_test_result <- Reduce(
  function(acc, lab_tb) left_join(acc, lab_tb, by = "PersonId"),
  list(
    DirectBilirubin_tb,
    TotalBilirubin_tb,
    ALP_tb,
    AST_tb,
    ALT_tb,
    Albumin_tb,
    TotalProtein_tb,
    SerumLipase_tb,
    SerumPanAmylase_tb,
    ProthrombinT_tb
  ),
  cases_control_allvar %>% select(PersonId, AVAL, indicator)
)


## Trial

In [39]:
# Ad-hoc re-run of the amylase extraction on its own (trial check of lab_flag()).
SerumPanAmylase_tb <- lab_flag(codes = SerumPanAmylase, lab_name = "SerumPanAmylase") 

In [41]:
# Quick distribution check of the extracted amylase values.
summary(SerumPanAmylase_tb)

In [43]:
# Preview the assembled lab table (the second argument is presumably a row
# limit - confirm against display_df()).
display_df(lab_test_result,10)

In [44]:
library(dplyr)
library(tidyr)
library(purrr)

# Lab value columns: everything except PersonId, AVAL, indicator and the
# per-lab "<lab>Unit" label columns.
lab_cols <- lab_test_result %>%
  select(-PersonId, -AVAL, -indicator) %>%
  select(!ends_with("Unit")) %>%
  colnames()

# One row of descriptive statistics per lab, computed over non-missing values.
# A lab with no non-missing values gets NA statistics rather than -Inf/Inf
# (with warnings) from max()/min(); rows with a missing indicator are not
# counted in either group.
lab_summary <- map_dfr(lab_cols, function(lab) {

  df <- lab_test_result %>%
    select(indicator, value = all_of(lab)) %>%
    filter(!is.na(value))  # keep only non-null values

  has_values <- nrow(df) > 0

  tibble(
    Lab = lab,
    Count_indicator_1 = sum(df$indicator == 1, na.rm = TRUE),
    Count_indicator_0 = sum(df$indicator == 0, na.rm = TRUE),
    Total_non_null = nrow(df),
    Max = if (has_values) max(df$value) else NA_real_,
    Min = if (has_values) min(df$value) else NA_real_,
    Median = if (has_values) median(df$value) else NA_real_,
    Mean = if (has_values) mean(df$value) else NA_real_
  )
})

In [48]:
# Show the per-lab descriptive summary (second argument presumably a row limit).
display_df(lab_summary,15)

In [39]:
library(ggplot2)
library(tidyr)
library(dplyr)

# Long format: one row per (person, lab) with a non-missing lab value.
# all_of() makes it explicit that lab_cols is an external character vector;
# dropping NA values avoids geom_density() "removed rows" warnings.
lab_long <- lab_test_result %>%
  pivot_longer(
    cols = all_of(lab_cols),
    names_to = "LabTest",
    values_to = "Value",
    values_drop_na = TRUE
  )

# Per-lab density of values, cases (indicator 1) vs controls (indicator 0),
# with an independent x/y scale for each lab panel.
lab_plot <- ggplot(lab_long, aes(x = Value, fill = factor(indicator))) +
  geom_density(alpha = 0.5, linewidth = 0.8) +
  facet_wrap(~ LabTest, scales = "free", ncol = 3) +
  scale_fill_manual(
    values = c("0" = "steelblue", "1" = "tomato"),
    labels = c("Control", "Case")
  ) +
  labs(
    title = "Density Plot of Lab Tests by Group",
    x = "Lab Value",
    y = "Density",
    fill = "Group"
  ) +
  theme_minimal(base_size = 12) +
  theme(
    strip.text = element_text(face = "bold", size = 10),
    legend.position = "top"
  )

# Display the plot built above (`plot` would be the base graphics function).
display_plot(lab_plot)

In [None]:
# Save the lab density plot as a PNG in the "study_images" subfolder of the
# study's artifacts directory, creating that subfolder on first use.
library(ggplot2)

artifacts_path_local <- get_artifacts_path(con, study, fs = TRUE)
artifacts_images_dir <- file.path(artifacts_path_local, "study_images")

if (!dir.exists(artifacts_images_dir)) {
  message("Creating directory: ", artifacts_images_dir)
  dir.create(artifacts_images_dir)
}

image_path <- file.path(artifacts_images_dir, "my_pyplot_r.png")
ggsave(image_path, plot = lab_plot)

In [13]:
library(dplyr)
library(purrr)

# Lab value columns: everything except PersonId, AVAL, indicator and the
# per-lab "<lab>Unit" label columns.
lab_cols <- lab_test_result %>%
  select(-PersonId, -AVAL, -indicator) %>%
  select(!ends_with("Unit")) %>%
  colnames()

# Bucket a scalar p-value for reporting; NULL/NA become "NA".
.format_p_bucket <- function(p) {
  if (is.null(p) || length(p) == 0L || is.na(p)) {
    "NA"
  } else if (p < 0.0001) {
    "<0.0001"
  } else if (p < 0.001) {
    "<0.001"
  } else if (p < 0.01) {
    "<0.01"
  } else if (p < 0.05) {
    "<0.05"
  } else {
    "NS"
  }
}

# "median (IQR)" of x, each rounded to 2 decimals.
.median_iqr <- function(x) {
  paste0(round(median(x, na.rm = TRUE), 2), " (", round(IQR(x, na.rm = TRUE), 2), ")")
}

# One row per lab comparing cases (indicator 1) with controls (indicator 0):
# counts, median (IQR), Wilcoxon rank-sum p-value bucket, and the
# Hodges-Lehmann location-shift estimate with its 95% CI.
lab_summary <- map(lab_cols, function(lab) {

  df <- lab_test_result %>%
    select(indicator, value = all_of(lab)) %>%
    filter(!is.na(value))

  Cases <- df %>% filter(indicator == 1)
  Control <- df %>% filter(indicator == 0)

  # Wilcoxon test with Hodges-Lehmann estimate; NULL if the test errors
  # (e.g. one group has no observations).
  wilcox_result <- tryCatch(
    wilcox.test(Cases$value, Control$value, conf.int = TRUE, exact = FALSE),
    error = function(e) NULL
  )

  # A failed test must yield an NA row rather than abort the whole table
  # (round(NULL) errors and a zero-length p-value breaks data.frame()).
  if (is.null(wilcox_result)) {
    p_value <- NA_real_
    estimate <- NA_real_
    hl_ci <- c(NA_real_, NA_real_)
  } else {
    p_value <- wilcox_result$p.value
    estimate <- unname(round(wilcox_result$estimate, 2))
    hl_ci <- round(wilcox_result$conf.int, 2)
  }

  data.frame(
    Lab_Test = lab,
    Cases_Count = nrow(Cases),
    Control_Count = nrow(Control),
    Cases_Median_IQR = .median_iqr(Cases$value),
    Control_Median_IQR = .median_iqr(Control$value),
    p_value = .format_p_bucket(p_value),
    Hodges_Lehmann_Estimate = estimate,
    HL_95CI = paste0("(", hl_ci[1], ", ", hl_ci[2], ")")
  )
})

table_lab <- bind_rows(lab_summary)


In [16]:
# Show the case-vs-control lab comparison table.
table_lab

In [None]:
# Explore Lab: preview 20 raw rows of the lab search-result table.
# (The select list previously ended in a trailing comma - "SELECT *," - which
# is a SQL syntax error.)
sql1 <- "
  SELECT *
  FROM SearchResult_def5LabCodes
  LIMIT 20
"
lb <- load_sql_table(con, snapshot, sql1, view_name = "lb", output_mode = "sparklyr")
display_df(lb)

In [None]:
# Count how many distinct normalized units each lab code is reported in;
# codes with CountUnit > 1 need unit harmonization. Group by CodeConceptId
# only - also grouping by the unit itself would make every CountUnit 1.
sql <- "
SELECT CodeConceptId, count(distinct NormalizedValueUOMConceptId) as CountUnit
FROM lab_data
group by CodeConceptId
order by CountUnit desc
LIMIT 20
"
labunit <- load_sql_table(con, snapshot, sql, view_name = "labunit", output_mode = "sparklyr")
display_df(labunit)


In [None]:
# Exploration: the 10 lab codes in SearchResult_def5LabCodes with the most
# records, with their concept name and distinct encounter / person counts.
# CodeConceptId is exploded (explode_outer) into one row per code first.
sql <- "
WITH temp as (
    SELECT *, explode_outer(CodeConceptId) as Code
     FROM SearchResult_def5LabCodes 
),
temp2 as (
  SELECT PersonId, EncounterId, Code.Id as CodeConceptId,
  NormalizedValueUOMConceptId, NormalizedValueConceptId, NormalizedValueNumeric,StatusConceptId
   FROM temp
  )
 
SELECT p.CodeConceptId, c.ConceptName, count(*) as Records,
count(distinct p.EncounterId) as EnctCount, count(distinct p.PersonId) as Pts
FROM temp2 p join Concept c on p.CodeConceptId = c.ConceptId
group by p.CodeConceptId, c.ConceptName
order by Records desc 
LIMIT 10
"

lab <- load_sql_table(con, snapshot, sql, view_name='lab',output_mode = "sparklyr") 
display_df(lab)

In [36]:
# check the unit for each lab test
# Lists the distinct (unit concept id, unit name) pairs recorded in lab_data
# for the LiverFunctionPanel codes. Note: this replaces the "concept_codes"
# view, which lab_flag() also uses.
# NOTE(review): `tb` is not displayed or used in this cell.
create_view(LiverFunctionPanel,"concept_codes")
sql <- "
SELECT DISTINCT l.NormalizedValueUOMConceptId, l.UnitOfMeasurement
FROM lab_data l
    INNER JOIN concept_codes c
        ON l.CodeConceptId = c.ConceptId
"
tb <- load_sql_table(con,snapshot, query = sql, output_mode = "sparklyr" )

In [53]:
# Number of persons with a direct bilirubin value (lab_flag() keeps one row per person).
nrow(DirectBilirubin_tb)

In [10]:
# First rows of the assembled lab table.
head(lab_test_result)

In [33]:
# Variable groups for the analysis extract, plus sequential subject IDs.

# Comorbidities
comorbidity <- c("CKD", "T2DM", "Hepatitis", "Hypertension", "CLD", "Hyperlipidemia", "osa", "COPD", "Anxiety", "Ischemic_Heart_Disease", "Depression", "Obesity_codes", "Cancer",
"Gastroesophageal_refluxdisease", "abdominal_pain", "Dyspnea", "Anemia", "FHOMND")

# Exposure
Exposure1 <- c("RadiationFl1", "SurgeryFl1", "AdvChemoMedFl1", "RadiationFl2", "SurgeryFl2", "AdvChemoMedFl2", "exposure")
Exposure2 <- c("Nortriptyline", "Venlafaxine", "Metronidazole", "tricyclic_antidepressant", "SNRI")

# Demographic and other
DemographicOth <- c("Age_Group", "AgeAtDiagnosis", "BMI_imputed", "Ethnicity", "Race", "Sex", "MaritalStatus", "Region", "OthMetFLBase", "PanLocation", "BMI_GROUP_impute")

# Lab variables ("TotalBilirubin" was listed twice; the duplicate is removed).
# NOTE(review): `lab` is not included in the select() below and does not list
# SerumPanAmylase - confirm whether lab values belong in this extract.
lab <- c("DirectBilirubin", "TotalBilirubin", "ALP", "Albumin", "AST", "ALT", "TotalProtein", "ProthrombinT", "SerumLipase")

# Analysis variables
analysis_var <- c("indicator", "DTHFL", "PanEncClass", "LivMetN", "DiffPanDthDays", "LivDtFl30", "LivDtFl", "LivDtFlBase", "PARAM", "CNSR", "AVAL", "FollowUpDiagEnc")

# Order by PersonId, assign sequential IDs subj0001, subj0002, ..., and keep
# the listed variables. all_of() makes the character vectors unambiguous
# (a bare vector name could otherwise be taken as a column name).
temp <- cases_control_allvar_new %>%
  arrange(PersonId) %>%
  mutate(subject_id = sprintf("subj%04d", row_number())) %>%
  select(
    all_of(c(comorbidity, Exposure1, Exposure2, DemographicOth, analysis_var)),
    subject_id
  )