In [None]:
suppressPackageStartupMessages(library(tidyverse))
library(lubridate)
library(glue)
library(furrr)

In [None]:
# Use all but one of the available cores, but never fewer than one worker:
# on a single-core machine `availableCores() - 1` would be 0, which is not a
# usable worker count for the `plan(multisession, workers = no_cores)` calls
# made later in this notebook.
no_cores <- max(1L, availableCores() - 1L)
no_cores

## Cohort number
This is refactored code. Enter the cohort (grouping) number below.

In [None]:
# Grouping to process in this run of the notebook.
grouping_number <- 3 # This notebook will need to be run 3 times, adjusting the grouping to 1, 2 or 3 (which includes 4)

# Length of the follow-up window, in hours.
follow_up_time <- 72

# Suffix used in the names of the input/output .rds files: empty for the
# default 72-hour window, "_<hours>" for any other window.
follow_up_time_str <- if (follow_up_time != 72) {
  glue::glue("_{follow_up_time}")
} else {
  ""
}

In [None]:
# Start from an empty scratch directory. manage_split() writes one .rds file
# per chunk into data/walk_files, and saveRDS() cannot create a missing
# directory, so make sure it exists before clearing its contents.
dir.create("data/walk_files", recursive = TRUE, showWarnings = FALSE)
unlink('data/walk_files/*')

# Helper functions

In [None]:
date_diff_fn <- function(df, p_id, endtime, follow_up_time = 72) {
  # Select the rows of `df` that belong to person `p_id` and whose `start`
  # lies between 0 and `follow_up_time` hours after `endtime`.
  #
  # df             data frame with (at least) `person_id` and `start` columns
  # p_id           person identifier to match against `person_id`
  # endtime        reference time: 111 consultation finish time (or start
  #                time in the sensitivity analysis)
  # follow_up_time width of the window in hours
  #
  # Returns a one-element list holding the matching rows, with an extra
  # `call_time_diff` column (hours from `endtime` to `start`). Wrapping the
  # data frame in a list lets the caller store it in a list-column.

  person_rows <- dplyr::filter(df, person_id == p_id)

  # difftime(time1, time2) is time1 - time2, i.e. positive when the event
  # starts after the reference time.
  timed_rows <- mutate(
    person_rows,
    call_time_diff = difftime(start, endtime, units = "hours")
  )

  # Drop rows with no usable time difference and keep only events that
  # happened inside the follow-up window.
  in_window <- filter(
    timed_rows,
    !is.na(call_time_diff),
    call_time_diff >= 0,
    call_time_diff <= follow_up_time
  )

  list(in_window)
}


calc_min_callback <- function(df) {
  # Return the smallest non-missing value of the `call_time_diff` column.
  #
  # df  data frame with a `call_time_diff` column
  #
  # Stops with an error when the column is absent. `na.rm = TRUE` is spelled
  # out because `T` is an ordinary variable that can be reassigned.
  if (!"call_time_diff" %in% names(df)) {
    stop("`df` has no `call_time_diff` column", call. = FALSE)
  }
  min(df[["call_time_diff"]], na.rm = TRUE)
}

manage_split <- function(df, iter, df2, filename, uniq_id_to_remove, follow_up_time = 72) { # default is end of 111 call
  # For every row (index call) of the chunk `df`, collect the events in `df2`
  # for the same person that start within `follow_up_time` hours after the
  # call's `end`, count them, and write the result to
  # data/walk_files/<filename>_<iter>.rds.
  #
  # df                 chunk of index calls; uses `unique_id`, `person_id`,
  #                    `start` and `end`
  # iter               chunk number, used only in the output file name
  # df2                candidate follow-up events; uses `unique_id`,
  #                    `person_id`, `start` and `end`
  # filename           prefix for the generated column names and output file
  # uniq_id_to_remove  data frame of `unique_id`s to exclude from `df2`
  # follow_up_time     follow-up window in hours
  #
  # Called for its side effect (the .rds file).
  visit_col <- glue("{filename}_visit_{follow_up_time}_df")

  min_date <- min(df$start)
  max_date <- max(df$end)

  # Shrink the candidate events to the period this chunk can reach before the
  # per-row matching below.
  # NOTE(review): the upper bound tests the event's `end`, whereas
  # date_diff_fn() only looks at `start`; an event that starts inside the
  # window but ends after `max_date + follow_up_time` is dropped here.
  # Confirm that this is intended.
  df3 <- df2 %>%
    anti_join(uniq_id_to_remove, by = "unique_id") %>%
    filter(
      start >= min_date,
      end <= max_date + hours(follow_up_time)
    )

  # rowwise() makes `person_id` and `end` scalars for each index call and
  # exposes the list-column created on the first line as the data frame it
  # holds, so nrow() on the second line counts that call's matched events.
  # `follow_up_time` is forwarded explicitly (via `.env` so a same-named
  # column can never shadow it); without it date_diff_fn() would silently
  # fall back to its 72-hour default.
  df1 <- df %>%
    rowwise() %>%
    mutate(
      "{filename}_visit_{follow_up_time}_df" := date_diff_fn(
        df3, person_id, end, follow_up_time = .env$follow_up_time
      ), # change end to start
      "{filename}_calls_in_{follow_up_time}_hrs" := nrow(!!sym(visit_col))
    ) %>%
    select(unique_id, person_id, starts_with(filename))

  saveRDS(df1, paste0('data/walk_files/', filename, '_', iter, '.rds'))
}

# Abandoned calls

In [None]:
# Read the nested abandoned-call data for the configured follow-up window
# (the file name carries the `follow_up_time_str` suffix) and print a
# structural overview of it.
ab_df <- readRDS(
  glue::glue("data/final_abandoned_call_nest_df{follow_up_time_str}.rds")
)
glimpse(ab_df)

# Final nested dataframe
final_nest_df includes abandoned calls where more than one person is associated with the call (about 2,753 out of 30,579, 9%)

In [None]:
# Read the nested data frame for the configured follow-up window; the file
# name carries the `follow_up_time_str` suffix.
final_nest_df <- readRDS(glue::glue("data/final_nest_df{follow_up_time_str}.rds"))

In [None]:
# Print the columns and their types
final_nest_df %>% glimpse()

In [None]:
# Rows per `delete_flag` value (trailing figures are presumably counts recorded from earlier runs)
final_nest_df %>% count(delete_flag) # 0 30579 1 2753 v2 0 30450 1 2882

In [None]:
# Keep only the rows where `made_contact` is 1
final_nest_df1 <- final_nest_df %>%
    filter(made_contact == 1)

In [None]:
# Number of rows left after the filter
final_nest_df1 %>% count() # 9722 v2 10567

In [None]:
process_nest <- function(nest_df_name, df) {
  # List the distinct `unique_id` values stored inside one nested list-column.
  #
  # nest_df_name  name (string) of the list-column of data frames to unpack
  # df            data frame containing that list-column
  #
  # Prints a progress line and returns a one-column data frame of distinct
  # `unique_id`s taken from the unnested column.
  print(glue::glue("Processing {nest_df_name}"))

  nested_only <- select(df, !!nest_df_name)
  unpacked <- unnest(nested_only, !!nest_df_name)

  distinct(unpacked, unique_id)
}

# Cohort data 

In [None]:
# Note use of final_nest_df and not final_nest_df1 since we are looking backwards
# Pick the rows that make up the requested grouping. Any grouping_number
# other than 1 or 2 falls through to the last branch and keeps every row.
group_df <- if (grouping_number == 1) {
  print('Grouping 1: Abandoned calls with previous 111 triage')
  filter(final_nest_df, contacted_iuc_b4_ac == 1)
} else if (grouping_number == 2) {
  print('Grouping 2: Abandoned calls with no previous 111 triage')
  filter(final_nest_df, contacted_iuc_b4_ac == 0)
} else {
  print('Grouping 3: All abandoned calls irrespective of previous 111 triage')
  final_nest_df
}


In [None]:
# Number of rows in the selected grouping
group_df %>% count() # Grouping 1: 2027 Grouping 2: 31305 Grouping 3: 33332 v2 Grouping 1: 2823 Grouping 2: 30509 Grouping 3: 33332

In [None]:
# For each nested follow-up column, collect the distinct `unique_id`s it
# contains for this grouping and save them to disk.
# NOTE(review): the saved file names include the grouping number but not
# `follow_up_time_str`, so runs with different follow-up windows write to the
# same files — confirm this is intended.
group_unique_id_GP_df <- process_nest(glue::glue("gp_nest_visit_{follow_up_time}_df"), group_df)
saveRDS(group_unique_id_GP_df, glue::glue("data/grouping{grouping_number}_unique_id_GP_df.rds"))

In [None]:
# Number of distinct GP `unique_id`s
group_unique_id_GP_df %>% count() # Cohort 1: 852 Cohort 2: 4260 Cohort 3: 5112 v2 Grouping 1: 1123  GRouping 2: 3989 Grouping 3 5112

In [None]:
# Same extraction for the IUC nested column
group_unique_id_IUC_df <- process_nest(glue::glue("iuc_nest_visit_{follow_up_time}_df"), group_df)
saveRDS(group_unique_id_IUC_df, glue::glue("data/grouping{grouping_number}_unique_id_IUC_df.rds"))

In [None]:
# Same extraction for the CAD nested column
group_unique_id_CAD_df <- process_nest(glue::glue("cad_nest_visit_{follow_up_time}_df"), group_df)
saveRDS(group_unique_id_CAD_df, glue::glue("data/grouping{grouping_number}_unique_id_CAD_df.rds"))

In [None]:
# Number of distinct CAD `unique_id`s
group_unique_id_CAD_df %>% count() # Grouping 1: 180 Grouping 2: 793 Grouping 3: 973 v2 Grouping 1: 202 grouping 2: 771 grouping 3: 973

In [None]:
# Same extraction for the hospital ED nested column
group_unique_id_ED_df <- process_nest(glue::glue("hosp_ed_nest_visit_{follow_up_time}_df"), group_df)
saveRDS(group_unique_id_ED_df, glue::glue("data/grouping{grouping_number}_unique_id_ED_df.rds"))

In [None]:
# Number of distinct ED `unique_id`s
group_unique_id_ED_df %>% count() # Grouping 1: 417 Grouping 2: 2941 Grouping 3: 3410 v2 Grouping 1 611 Grouping 2 2473 grouping 3: 3084

In [None]:
# Same extraction for the hospital in-patient nested column
group_unique_id_IP_df <- process_nest(glue::glue("hosp_ip_nest_visit_{follow_up_time}_df"), group_df)
saveRDS(group_unique_id_IP_df, glue::glue("data/grouping{grouping_number}_unique_id_IP_df.rds"))

# IUC call data

In [None]:
# Read the IUC call data set from disk
iuc_df <- readRDS('data/iuc_df.rds')

In [None]:
# Total number of rows
iuc_df %>% count() # 193526 v2 272193

In [None]:
# Print the columns and their types
iuc_df %>% glimpse()

## First step is to filter out all 111 calls in abandoned call dataset

In [None]:
# Remove every IUC call whose `unique_id` already appears in the abandoned-call
# grouping. `group_unique_id_IUC_df` only has a `unique_id` column, so naming
# the key gives the same rows as the implicit natural join while silencing the
# "Joining with ..." message and matching the anti_join() in manage_split().
iuc_df1 <- iuc_df %>% anti_join(group_unique_id_IUC_df, by = "unique_id")
iuc_df1 %>% count() # Grouping 1: 192496 Grouping 2: 189109 grouping 3: 188079 v2 Grouping 1: 270755 Grouping 2: 266083 Grouping 3: 264645


In [None]:
index_event_fn <- function(person_id, row_num, end, follow_up_time = 72) {
  # Label each call of one person as an index event or a linked follow-up.
  #
  # person_id, row_num, end  equal-length vectors describing the person's
  #                          calls; `row_num` identifies the row and `end` is
  #                          the call end time
  # follow_up_time           gap, in hours, above which a call starts a new
  #                          index event
  #
  # Calls are ordered by `end`. The first call is an index event, and so is
  # any call that ends more than `follow_up_time` hours after the call before
  # it; every other call is linked to the most recent index event.
  #
  # Returns a two-column data frame, in `end` order:
  #   index_event  1 for an index event, 0 otherwise
  #   index_ref    NA for an index event, otherwise the `row_num` of the index
  #                event the call is linked to
  #
  # The same code path covers zero, one or many calls, so no special case is
  # needed for a person with a single call.
  df <- tibble(person_id = person_id, row_num = row_num, end = end) %>%
    arrange(end) %>%
    mutate(
      index_event = case_when(
        row_number() == 1 ~ 1,
        as.numeric(difftime(end, lag(end), units = "hours")) > follow_up_time ~ 1,
        TRUE ~ 0
      )
    )

  # cumsum() numbers the index events in order, so indexing the index events'
  # row numbers by it carries each one forward over the calls that follow it.
  # The first row is always an index event, so the running count is >= 1.
  is_index <- df$index_event == 1
  index_ref <- df$row_num[is_index][cumsum(is_index)]
  index_ref[is_index] <- NA
  df$index_ref <- index_ref

  df %>% select(index_event, index_ref)
}

In [None]:
# Order the calls by person and end time, give every row a running number,
# then let index_event_fn() add `index_event` / `index_ref` per person. The
# unnamed data frame it returns is spliced into the result as columns.
iuc_df2 <- iuc_df1 %>%
  arrange(person_id, end) %>%
  # person_id = bit64::as.integer.integer64(person_id)
  mutate(row_num = seq_len(dplyr::n())) %>%
  group_by(person_id) %>%
  mutate(
    index_event_fn(person_id, row_num, end, follow_up_time = follow_up_time)
  ) %>%
  ungroup()

In [None]:
# Save the labelled IUC calls for this grouping
saveRDS(iuc_df2, glue::glue("data/grouping{grouping_number}_iuc_df.rds"))

## Get index call for IUC cohort

In [None]:
# Keep only the calls flagged as index events
index_iuc_df <- iuc_df2 %>% filter(index_event == 1)

In [None]:
# Rows per `index_event` value
iuc_df2 %>% count(index_event) # 0 26425 1 166071 v2 0 36760 1 233995, 0 37197 1 22886, 0 35756 1 228889

In [None]:
# Print the columns and their types
index_iuc_df %>% glimpse()

In [None]:
# Sort the index calls by call start time and cut them into consecutive
# chunks of at most `n` rows; each chunk becomes one unit of parallel work.
n <- 250
index_iuc_df <- arrange(index_iuc_df, call_commenced_date_time)
nr <- nrow(index_iuc_df)

split_iuc_df <- split(
  index_iuc_df,
  rep(1:ceiling(nr / n), each = n, length.out = nr)
)

In [None]:
# Print the columns and types of the first chunk
glimpse(split_iuc_df[[1]])

# IUC calls

In [None]:
# Candidate follow-up contacts are the IUC calls that are not index events.
# Filter once up front rather than inside the lambda, where the same filter
# would be repeated for every chunk.
iuc_follow_up_df <- iuc_df2 %>% filter(index_event == 0)

plan(multisession, workers = no_cores)

# Process every chunk in parallel; manage_split() writes one .rds file per
# chunk. seq_along() ties the chunk number to the list actually being walked.
start <- Sys.time()
future_walk2(
  split_iuc_df,
  seq_along(split_iuc_df),
  ~manage_split(.x, .y, iuc_follow_up_df, "iuc_nest", group_unique_id_IUC_df, follow_up_time = follow_up_time)
)
end <- Sys.time()

In [None]:
# Elapsed time between the `start` and `end` timestamps taken around the IUC run
end-start # Grouping 1: 4.9 mins GRouping 2: 4.8 mins GRouping 3: 4.7 mins

# GP calls

In [None]:
# Read the GP data set from disk
gp_df <- readRDS('data/gp_df.rds')

In [None]:
plan(multisession, workers = no_cores)

# Match GP events to every chunk of index calls in parallel; manage_split()
# writes one .rds file per chunk. seq_along() ties the chunk number to the
# list actually being walked.
start <- Sys.time()
future_walk2(
  split_iuc_df,
  seq_along(split_iuc_df),
  ~manage_split(.x, .y, gp_df, "gp_nest", group_unique_id_GP_df, follow_up_time = follow_up_time)
)
end <- Sys.time()

In [None]:
# Elapsed time between the `start` and `end` timestamps taken around the GP run
end-start # Grouping 1: 9 mins Grouping 2: 6.2 mins GRouping 3: 8.8 mins

# CAD calls

In [None]:
# Read the CAD data set from disk
cad_df <- readRDS('data/cad_df.rds')

In [None]:
plan(multisession, workers = no_cores)

# Match CAD events to every chunk of index calls in parallel; manage_split()
# writes one .rds file per chunk. seq_along() ties the chunk number to the
# list actually being walked.
start <- Sys.time()
future_walk2(
  split_iuc_df,
  seq_along(split_iuc_df),
  ~manage_split(.x, .y, cad_df, "cad_nest", group_unique_id_CAD_df, follow_up_time = follow_up_time)
)
end <- Sys.time()

In [None]:
# Elapsed time between the `start` and `end` timestamps taken around the CAD run
end-start # Grouping 1: 4 mins Grouping 2: 2.9 mins Grouping 3: 4.0 mins

# ED datasets

In [None]:
# Read the hospital ED data set from disk
hosp_ed_df <- readRDS('data/hosp_ed_df.rds')

In [None]:
plan(multisession, workers = no_cores)

# Match hospital ED events to every chunk of index calls in parallel;
# manage_split() writes one .rds file per chunk. seq_along() ties the chunk
# number to the list actually being walked.
start <- Sys.time()
future_walk2(
  split_iuc_df,
  seq_along(split_iuc_df),
  ~manage_split(.x, .y, hosp_ed_df, "hosp_ed_nest", group_unique_id_ED_df, follow_up_time = follow_up_time)
)
end <- Sys.time()

In [None]:
# Elapsed time between the `start` and `end` timestamps taken around the ED run
end-start # Grouping 1: 4.5 mins Grouping 2: 5 mins Grouping 3: 4.47 mins

# In-patient data

In [None]:
# Read the hospital in-patient data set from disk
hosp_ip_df <- readRDS('data/hosp_ip_df.rds')

In [None]:
plan(multisession, workers = no_cores)

# Match hospital in-patient events to every chunk of index calls in parallel;
# manage_split() writes one .rds file per chunk. seq_along() ties the chunk
# number to the list actually being walked.
start <- Sys.time()
future_walk2(
  split_iuc_df,
  seq_along(split_iuc_df),
  ~manage_split(.x, .y, hosp_ip_df, "hosp_ip_nest", group_unique_id_IP_df, follow_up_time = follow_up_time)
)
end <- Sys.time()

In [None]:
# Elapsed time between the `start` and `end` timestamps taken around the in-patient run
end-start # Grouping 1: 4.7 mins Grouping 2: 4.6 mins Grouping 3: 4.4 mins

# Combine data

In [None]:
combine_rds <- function(file_path, filename) {
  # Read every .rds file in `file_path` whose name starts with `filename`
  # and stack the results into one data frame.
  #
  # file_path  directory to search
  # filename   file-name prefix; it is used inside a regular expression
  #            anchored at the start of the name
  #
  # Returns the row-bound contents of all matching files.
  rds_files <- list.files(
    path = file_path,
    pattern = glue("^{filename}"),
    full.names = TRUE
  )

  rds_files %>%
    map(readRDS) %>%
    bind_rows()
}

In [None]:
# Re-assemble the per-chunk files written by manage_split() into one data
# frame per data source.
iuc_nest_df <- combine_rds(file_path = "data/walk_files", filename = "iuc_nest")
gp_nest_df <- combine_rds(file_path = "data/walk_files", filename = "gp_nest")
cad_nest_df <- combine_rds(file_path = "data/walk_files", filename = "cad_nest")
hosp_ed_nest_df <- combine_rds(file_path = "data/walk_files", filename = "hosp_ed_nest")
hosp_ip_nest_df <- combine_rds(file_path = "data/walk_files", filename = "hosp_ip_nest")

In [None]:
# Attach the follow-up columns of every data source to the index calls, one
# left join at a time and in the same order: IUC, GP, CAD, ED, in-patient.
# Note that this reuses the name `final_nest_df`, replacing the data frame
# loaded near the top of the notebook.
final_nest_df <- reduce(
  list(iuc_nest_df, gp_nest_df, cad_nest_df, hosp_ed_nest_df, hosp_ip_nest_df),
  function(joined, nest_df) {
    left_join(joined, nest_df, by = c("unique_id", "person_id"))
  },
  .init = index_iuc_df
)

In [None]:
# File-name suffix: empty for the default 72-hour window, "_<hours>" otherwise
follow_up_time_str <- if (follow_up_time != 72) {
  glue::glue("_{follow_up_time}")
} else {
  ""
}

# Save the combined index-call data for this grouping and follow-up window
saveRDS(
  final_nest_df,
  glue::glue("data/grouping{grouping_number}_iuc_nest_df{follow_up_time_str}.rds")
)

In [None]:
# Print the columns and types of the combined data
glimpse(final_nest_df)

In [None]:
# Flag index calls with at least one follow-up contact: add up all the
# "..._in_<follow_up_time>_hrs" count columns per row and set `made_contact`
# to 1 when the total is above zero, 0 otherwise.
final_nest_df1 <- final_nest_df %>%
  mutate(
    made_contact = as.numeric(
      rowSums(across(ends_with(glue::glue("in_{follow_up_time}_hrs")))) > 0
    )
  )

In [None]:
# Rows per `made_contact` value
count(final_nest_df1, made_contact) # Grouping 1: 0 82866 1 83205 Grouping 2: 0 80553 1 81845 Grouping 3: 0 81106 1 81296
# v2 Grouping 1: 0 116942 1 117053 Grouping 2: 0 113698 1 115188 Grouping 3: 0 114504 1 114385