In [1]:
#Qmak_Calderdale_FDM_FULL_V1.0.ipynb
##last modified 20230309 

#Builds the Airedale SUS FDM
#Assumes that the targetdb below exists
#Needs code to build it if it doesn't

#You will need to manually amend the target dataset for this script

library(bigrquery)
library(lubridate) # required for datetime

# Store the project ID
project_id = "yhcr-prd-phm-bia-core"

sourcedb <-"yhcr-prd-phm-bia-core.CB_STAGING_DATABASE_FDM_Format"
targetdb <-'yhcr-prd-phm-bia-core.CB_FDM_AdultSocialCare'

targetdb <-gsub(' ','',targetdb)
print (targetdb)


Attaching package: ‘lubridate’


The following objects are masked from ‘package:base’:

    date, intersect, setdiff, union




[1] "yhcr-prd-phm-bia-core.CB_FDM_AdultSocialCare"


In [5]:
#New section to build user table loop 

#Your loop references this table
#You run this to load the list of tables into the dataframe "table"

sql3 <-paste('select distinct table_name  from ' ,sourcedb,'.INFORMATION_SCHEMA.COLUMNS where table_name like \'tbl_adultsocial%\' and column_name = \'person_id\' order by table_name ;', sep = "")
tb3 <- bq_project_query(project_id, sql3)

table <- bq_table_download(tb3)
table 


eric <- paste("Done builder tables "  ,Sys.time() + hours(1) ,sep = "")
print(eric)


#This then loops through the data frame until the end
# uses the two variables table[[i, 1]] and table[[i, 2]]
# in the query

sql4 <-paste('drop table if exists ',targetdb,'.tmp_persons;', sep = "")
tb4 <-bq_project_query(project_id, sql4)


sql5 <-paste('create table ',targetdb,'.tmp_persons 
(person_id int64);', sep = "")
tb5 <-bq_project_query(project_id, sql5)
#print(tb5)


#Loops through all the data tables to build a list of unique person_ids from all of them 

for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('insert into ',targetdb,'.tmp_persons select distinct cast(person_id as int64)  from ' ,sourcedb,'.',table[[i, 1]] ,';' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    temp_table <- bq_table_download(temp_queried_table)
    #print(paste0("done ", temp_table[[1, 1]]))
    #cat("\n")
}


eric <- paste("Done tmp persons "  ,Sys.time() + hours(1) ,sep = "")
print(eric)

#Now build the persons table 

sql11 <-paste('drop table if exists ' ,targetdb,'.person', sep = "")
tb11 <- bq_project_query(project_id, sql11)


sql12 <-paste('create table ' ,targetdb,'.person 
as 
SELECT distinct a.person_id
, gender_concept_id
, year_of_birth
, month_of_birth
, day_of_birth
, birth_datetime
, death_datetime
, race_concept_id
, ethnicity_concept_id
, location_id
, provider_id
, care_site_id
, person_source_value
, gender_source_value
, gender_source_concept_id
, race_source_value
, race_source_concept_id
, ethnicity_source_value
, ethnicity_source_concept_id 
FROM yhcr-prd-phm-bia-core.CB_FDM_MASTER.person a
,',targetdb,'.tmp_persons b where a.person_id =  b.person_id' , sep = "")

tb12 <- bq_project_query(project_id, sql12)
#print(sql12)

eric <- paste("Done persons "  ,Sys.time() + hours(1) ,sep = "")
print(eric)



#This query makes the observation period based on the persons table BUT adds in rules for this data extract.
#This query makes the observation period based on the persons table BUT adds in rules for this data extract.

#Qmak_PrimaryCareObservationPeriod_Part1

sql15 <-paste('drop table if exists ' ,targetdb,'.tmp_Eventdates' , sep = "")
sql16 <-paste('drop table if exists ' ,targetdb,'.tmp_EventdatesValid' , sep = "")
sql17 <-paste('truncate table ' ,targetdb,'.observation_period' , sep = "")

tb15 <-bq_project_query(project_id, sql15)
tb16 <-bq_project_query(project_id, sql16)
tb17 <-bq_project_query(project_id, sql17)

#This just builds a temp table for later use 
sql18 <-paste('create table ',targetdb,'.tmp_Eventdates 
(person_id int64
, EventDate Datetime) ' , sep = "")
tb18 <-bq_project_query(project_id, sql18)

eric <- paste("temp event dates built "  ,Sys.time() + hours(1) ,sep = "")
print(eric)

#Loop through the user tables
#selecting person_id and date from and date to for each of them.

#start_date_first
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('insert into ',targetdb,'.tmp_Eventdates select distinct cast(person_id as int64) as person_id, ',table[[i, 1]] ,'_start_date from ' ,sourcedb,'.',table[[i, 1]] ,' where person_id is not null;' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(temp_sql_query)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}

eric <- paste("Start dates built "  ,Sys.time() + hours(1) ,sep = "")
print(eric)

#Now do end date
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('insert into ',targetdb,'.tmp_Eventdates select distinct cast(person_id as int64) as person_id, ',table[[i, 1]] ,'_end_date from ' ,sourcedb,'.',table[[i, 1]] ,' where person_id is not null ;' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(temp_sql_query)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}

eric <- paste("End dates built "  ,Sys.time() + hours(1) ,sep = "")
print(eric)


sql24 <-paste('create table ',targetdb,'.tmp_EventdatesValid
as
select distinct a.person_id
,cast(a.EventDate as date) as EventDate
from ',targetdb,'.tmp_Eventdates a ,' 
,targetdb,'.person e 
where e.person_id = a.person_id 
and e.death_datetime is not null 
and a.EventDate >= e.birth_datetime
and a.EventDate <= date_add(e.death_datetime, INTERVAL 42 day) 
and a.EventDate <= (Select max(cast(updated_date as date)) from yhcr-prd-phm-bia-core.CB_FDM_Management.cb_source_data_list 
where tables like "src_bmbc_adultsocialcare_%")', sep = "")

tb24 <-bq_project_query(project_id, sql24)


#-- BUT this only includes persons with a death datetime
#-- So section below includes those with a null deathdatetime

sql25 <-paste('insert into ',targetdb,'.tmp_EventdatesValid
select distinct a.person_id
, cast(a.EventDate as date) as EventDate
from ',targetdb,'.tmp_Eventdates a
,' ,targetdb,'.person e 
where e.person_id = a.person_id 
and e.death_datetime is null 
and a.EventDate >= e.birth_datetime
and a.EventDate <= (Select  max(cast(updated_date as date)) from yhcr-prd-phm-bia-core.CB_FDM_Management.cb_source_data_list 
where tables like "src_bmbc_adultsocialcare_%")', sep = "")

tb25 <-bq_project_query(project_id, sql25)

print("Temp valid dates built")

#third part pushes these into the observation_period table

sql26 <-paste('insert into ' ,targetdb,'.observation_period
select distinct
ROW_NUMBER() over (Order by person_id) as observation_period_id 
, person_id
, min(EventDate) as observation_period_start_date
, max(EventDate) as observation_period_end_date
,null as period_type_concept_id
from ',targetdb,'.tmp_EventdatesValid
group by person_id', sep = "")


tb26 <-bq_project_query(project_id, sql26)

print ("observation period built")

# Now remove from person table where there is no observation period
# This happens where there is patient data but no actual records 
# or there is patient data but no dob 

sql26a <-paste(' delete from ' ,targetdb,'.person where person_id
not in (select person_id from ', targetdb, ' . observation_period)', sep = "")

tb26a <-bq_project_query(project_id, sql26a) 

print ("invalid people removed")


eric <- paste("obs period built invalid people removed "  ,Sys.time() + hours(1) ,sep = "")
print(eric)


# As the Warehouse tables are all fdm ready we can just copy them into the FDM space
# once we've dropped them - if they exist ! 
# So drop loop next 

#start drop them first
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('drop table if exists ',targetdb,'.', table[[i, 1]] ,';' , sep = "") 
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(temp_sql_query)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}

eric <- paste("User tables removed "  ,Sys.time() + hours(1) ,sep = "")
print(eric)

#Now build them 
for(i in 1:nrow(table)) 
{
# for-loop over columns
    #print(paste0("Table: ", table[[i, 1]]))
    temp_sql_query  <-paste('create table ',targetdb,'.',table[[i, 1]] ,' as select src.* from ' ,sourcedb,'.',table[[i, 1]],' src 
    , ',targetdb,'.observation_period obs where cast(src.person_id as int64)  = obs.person_id 
    and src.',table[[i, 1]],'_end_date <= obs.observation_period_end_date  
    and src.',table[[i, 1]],'_start_date >= obs.observation_period_start_date', sep = "")
    #print(temp_sql_query)
    temp_queried_table <- bq_project_query(project_id, temp_sql_query)
    #temp_table <- bq_table_download(temp_queried_table)
    #print(paste0("done ", temp_table[[i, 1]]))
    #cat("\n")
}


eric <- paste("User tables built "  ,Sys.time() + hours(1) ,sep = "")
print(eric)

#But this only works for those tables with a tbl_prefix containing person_id 
# so we need to manually mop up the ones that don't.


table_name
<chr>
tbl_adultsocialcare_assessments
tbl_adultsocialcare_contacts
tbl_adultsocialcare_services


[1] "Done builder tables 2024-05-08 15:21:49.463219"
[1] "Done tmp persons 2024-05-08 15:21:58.53083"
[1] "Done persons 2024-05-08 15:22:01.14761"
[1] "temp event dates built 2024-05-08 15:22:05.045789"
[1] "Start dates built 2024-05-08 15:22:10.597191"
[1] "End dates built 2024-05-08 15:22:23.356529"
[1] "Temp valid dates built"
[1] "observation period built"
[1] "invalid people removed"
[1] "obs period built invalid people removed 2024-05-08 15:22:31.953011"
[1] "User tables removed 2024-05-08 15:22:34.173299"
[1] "User tables built 2024-05-08 15:22:40.865605"


In [6]:
sql90 <-paste('drop table if exists ',targetdb,'.tmp_persons;', sep = "")
tb90 <-bq_project_query(project_id, sql90)

sql91 <-paste('drop table if exists ',targetdb,'.tmp_Eventdates;', sep = "")
tb91 <-bq_project_query(project_id, sql91)

sql92 <-paste('drop table if exists ',targetdb,'.tmp_EventdatesValid;', sep = "")
tb92 <-bq_project_query(project_id, sql92)

print("temp tables dropped")

eric <- paste('FDM Build of',targetdb,'finished'  ,Sys.time() + hours(1) ,sep = "")
print(eric)

[1] "temp tables dropped"
[1] "FDM Build ofyhcr-prd-phm-bia-core.CB_FDM_AdultSocialCarefinished2024-05-08 15:23:48.37768"


In [7]:
#section to update the CD FDM_Management details 

sql90 <-paste('update yhcr-prd-phm-bia-core.CB_FDM_Management.cb_fdm_details 
set fdm_build_date = "2024-05-08" 
where fdm_name = "CB_FDM_AdultSocialCare" ;', sep = "")
tb90 <-bq_project_query(project_id, sql90)


eric <- paste('Management updated '  ,Sys.time() + hours(1) ,sep = "")
print(eric)

[1] "Management updated 2024-05-08 15:23:53.03441"
