In [1]:
#Qmak_BDCT_FDM_V1.5.ipynb
##last modified 20230309 

#Builds the NCMP FDM
#Assumes that the targetdb below exists
#Needs code to build it if it doesn't

#You will need to manually amend the target dataset for this script

library(bigrquery)
library(lubridate) 

# Store the project ID
project_id = "yhcr-prd-phm-bia-core"

sourcedb <-"yhcr-prd-phm-bia-core.CB_STAGING_DATABASE_FDM_Format"
targetdb <-'yhcr-prd-phm-bia-core.CB_FDM_PACT'
targetdb <-gsub(' ','',targetdb)
print (sourcedb) 
print (targetdb)



Attaching package: ‘lubridate’


The following objects are masked from ‘package:base’:

    date, intersect, setdiff, union




[1] "yhcr-prd-phm-bia-core.CB_STAGING_DATABASE_FDM_Format"
[1] "yhcr-prd-phm-bia-core.CB_FDM_PACT"


In [2]:
#New section to build user table loop 

#Your loop references this table
#You run this to load the list of tables into the dataframe "table"

sql3 <-paste('select distinct table_name  from ' ,sourcedb,'.INFORMATION_SCHEMA.COLUMNS where table_name like \'tbl_PACT%\' and column_name = \'person_id\' order by table_name ;', sep = "")
tb3 <- bq_project_query(project_id, sql3)

table <- bq_table_download(tb3)
table 

print("Done builder table") 


table_name
<chr>
tbl_PACT_evaluation_contacts
tbl_PACT_evaluation_detail


[1] "Done builder table"


In [4]:
#This then loops through the data frame until the end
# uses the two variables table[[i, 1]] and table[[i, 2]]
# in the query

sql4 <-paste('drop table if exists ',targetdb,'.tmp_persons;', sep = "")
tb4 <-bq_project_query(project_id, sql4)


sql5 <-paste('create table ',targetdb,'.tmp_persons 
(person_id int64);', sep = "")
tb5 <-bq_project_query(project_id, sql5)
#print(tb5)


#Loops through all the data tables to build a list of unique person_ids from all of them 

for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('insert into ',targetdb,'.tmp_persons select distinct cast(person_id as int64)  from ' ,sourcedb,'.',table[[i, 1]] ,';' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    temp_table <- bq_table_download(temp_queried_table)
    #print(paste0("done ", temp_table[[1, 1]]))
    #cat("\n")
}

fred <-paste('tmp person table built ' ,Sys.time() + hours(1), sep = "")
print (fred)

#Now build the persons table 

sql11 <-paste('drop table if exists ' ,targetdb,'.person', sep = "")
tb11 <- bq_project_query(project_id, sql11)


sql12 <-paste('create table ' ,targetdb,'.person 
as 
SELECT distinct a.person_id
, gender_concept_id
, year_of_birth
, month_of_birth
, day_of_birth
, birth_datetime
, death_datetime
, race_concept_id
, ethnicity_concept_id
, location_id
, provider_id
, care_site_id
, person_source_value
, gender_source_value
, gender_source_concept_id
, race_source_value
, race_source_concept_id
, ethnicity_source_value
, ethnicity_source_concept_id 
FROM yhcr-prd-phm-bia-core.CB_FDM_MASTER.person a
,',targetdb,'.tmp_persons b where a.person_id =  b.person_id' , sep = "")

tb12 <- bq_project_query(project_id, sql12)
#print(sql12)


fred <-paste('person table built ' ,Sys.time() + hours(1), sep = "")
print (fred)



#Ensure you have updated the lookup table ie
#sql14 <- "update `yhcr-prd-phm-bia-core.CY_LOOKUPS.tbl_Dataset_ExtractDateRef` set extract_date = '2022-08-23' where DatasetName = 'BDCT'"  

#tb14 <- bq_project_query(project_id, sql14)



#This query makes the observation period based on the persons table BUT adds in rules for this data extract.
#This query makes the observation period based on the persons table BUT adds in rules for this data extract.

#Qmak_BDCT_observation period_Part1

sql15 <-paste('drop table if exists ' ,targetdb,'.tmp_Eventdates' , sep = "")
sql16 <-paste('drop table if exists ' ,targetdb,'.tmp_EventdatesValid' , sep = "")
sql17 <-paste('truncate table ' ,targetdb,'.observation_period' , sep = "")

tb15 <-bq_project_query(project_id, sql15)
tb16 <-bq_project_query(project_id, sql16)
tb17 <-bq_project_query(project_id, sql17)

#This just builds a temp table for later use 
sql18 <-paste('create table ',targetdb,'.tmp_Eventdates 
(person_id int64
, EventDate Datetime) ' , sep = "")
tb18 <-bq_project_query(project_id, sql18)



fred <-paste('temp event dates built ' ,Sys.time() + hours(1), sep = "")
print (fred)


#Loop through the user tables
#selecting person_id and date from and date to for each of them.

#start_date_first
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('insert into ',targetdb,'.tmp_Eventdates select distinct cast(person_id as int64) as person_id, ',table[[i, 1]] ,'_start_date from ' ,sourcedb,'.',table[[i, 1]] ,' where person_id is not null;' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(temp_sql_query)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}
fred <-paste('Done start dates ' ,Sys.time() + hours(1), sep = "")
print (fred)


#Now do end date
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('insert into ',targetdb,'.tmp_Eventdates select distinct cast(person_id as int64) as person_id, ',table[[i, 1]] ,'_end_date from ' ,sourcedb,'.',table[[i, 1]] ,' where person_id is not null ;' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(temp_sql_query)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}


fred <-paste('Done end dates ' ,Sys.time() + hours(1), sep = "")
print (fred)


sql24 <-paste('create table ',targetdb,'.tmp_EventdatesValid
as
select distinct a.person_id
,cast(a.EventDate as date) as EventDate
from ',targetdb,'.tmp_Eventdates a ,' 
,targetdb,'.person e 
where e.person_id = a.person_id 
and e.death_datetime is not null 
and a.EventDate >= e.birth_datetime
and a.EventDate <= date_add(e.death_datetime, INTERVAL 42 day) 
and a.EventDate <= (Select max(updated_date) from yhcr-prd-phm-bia-core.CB_FDM_Management.cb_source_data_list where dataset = \'CB_STAGING_DATABASE_FDM_Format\')', sep = "")


#-- BUT this only includes persons with a death datetime
#-- So section below includes those with a null deathdatetime

sql25 <-paste('insert into ',targetdb,'.tmp_EventdatesValid
select distinct a.person_id
, cast(a.EventDate as date) as EventDate
from ',targetdb,'.tmp_Eventdates a
,' ,targetdb,'.person e 
where e.person_id = a.person_id 
and e.death_datetime is null 
and a.EventDate >= e.birth_datetime
and a.EventDate <= (Select max(updated_date) from yhcr-prd-phm-bia-core.CB_FDM_Management.cb_source_data_list where dataset = \'CB_STAGING_DATABASE_FDM_Format\')', sep = "")

fred <-paste('Temp valid dates built ' ,Sys.time() + hours(1), sep = "")
print (fred)



#third part pushes these into the observation_period table

sql26 <-paste('insert into ' ,targetdb,'.observation_period
select distinct
ROW_NUMBER() over (Order by person_id) as observation_period_id 
, person_id
, min(EventDate) as observation_period_start_date
, max(EventDate) as observation_period_end_date
,null as period_type_concept_id
from ',targetdb,'.tmp_EventdatesValid
group by person_id', sep = "")

#Finally Run these sql's


tb24 <-bq_project_query(project_id, sql24)
tb25 <-bq_project_query(project_id, sql25)
tb26 <-bq_project_query(project_id, sql26)

fred <-paste('observation period built ' ,Sys.time() + hours(1), sep = "")
print (fred)



# Now remove from person table where there is no observation period
# This happens where there is patient data but no actual records 
# or there is patient data but no dob 

sql26a <-paste(' delete from ' ,targetdb,'.person where person_id
not in (select person_id from ', targetdb, ' . observation_period)', sep = "")

tb26a <-bq_project_query(project_id, sql26a) 

fred <-paste('invalid people removed ' ,Sys.time() + hours(1), sep = "")
print (fred)





# As the tables are all fdm ready we can just copy them into the FDM space
# once we've dropped them - if they exist ! 
# So drop loop next 

#start drop them first
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('drop table if exists ',targetdb,'.', table[[i, 1]] ,';' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(temp_sql_query)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}
fred <-paste('Done - user tables removed ' ,Sys.time() + hours(1), sep = "")
print (fred)



#Now build them 
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('create table ',targetdb,'.',table[[i, 1]] ,' as select src.* except(digest) from ' ,sourcedb,'.',table[[i, 1]],' src 
    , ',targetdb,'.observation_period obs where cast(src.person_id as int64)  = obs.person_id 
    and src.',table[[i, 1]],'_end_date <= obs.observation_period_end_date  
    and src.',table[[i, 1]],'_start_date >= obs.observation_period_start_date', sep = "")
    #print(temp_sql_query)
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}

fred <-paste('Done - user tables built ' ,Sys.time() + hours(1), sep = "")
print (fred)




fred <-paste('FDM Build of',targetdb,'finished')
print (fred)

[1] "tmp person table built 2024-04-25 18:08:22.231074"
[1] "person table built 2024-04-25 18:08:24.888181"
[1] "temp event dates built 2024-04-25 18:08:28.587592"
[1] "Done start dates 2024-04-25 18:08:31.798583"
[1] "Done end dates 2024-04-25 18:08:41.465279"
[1] "Temp valid dates built 2024-04-25 18:08:41.472813"
[1] "observation period built 2024-04-25 18:08:47.246754"
[1] "invalid people removed 2024-04-25 18:08:49.21223"
[1] "Done - user tables removed 2024-04-25 18:08:50.74947"
[1] "Done - user tables built 2024-04-25 18:08:54.58587"
[1] "FDM Build of yhcr-prd-phm-bia-core.CB_FDM_PACT finished"
