In [5]:
#4_FDM_PrimaryCare_UnitTest_v1.3.ipynb
#Runs a series of sql statements against a built FDM.
#First Part runs against standard tables.
#Then each test iterates through tables containing the correctly formatted fields.
#You will need to manually amend the target dataset for this script

library(bigrquery)
library(lubridate) # required for datetime

# Project ID handed to every bq_project_query() call in this script
project_id <- "yhcr-prd-phm-bia-core"

# Fully qualified FDM dataset under test (amend manually before each run).
# Any spaces are stripped so the name can be spliced directly into SQL text.
targetdb <- 'yhcr-prd-phm-bia-core.CB_FDM_PrimaryCare_V7'
targetdb <- gsub(' ', '', targetdb)
# print(targetdb)

# Checklist of tests
#   - event dates after dod + 42 days
#   - observation period check
#   - person_id missing from data
#   - person_id in data missing from person table
#   - data tables contain date_from / date_to fields: the table names need
#     loading into a temp table (built with dynamic SQL from the schema,
#     minus the standard tables, which are hardcoded) and are then
#     looped through

# Banner confirming which dataset is targeted. One hour is added to the
# system clock reading - NOTE(review): presumably a UTC -> local time shift;
# confirm, since a fixed offset will not follow daylight-saving changes.
eric <- paste0("variables for ", targetdb, " set ", Sys.time() + hours(1))
print(eric)

# Build the results table ----
# Drop whatever an earlier run left behind, then recreate the table empty.
# The unit tests below insert rows into it.

sql1 <- paste0('drop table if  exists ', targetdb, '.tmp_unit_test_results;')
tb1 <- bq_project_query(project_id, sql1)

sql2 <- paste0(
  'create table if not exists ', targetdb, '.tmp_unit_test_results 
(person_id int64, 
testno int64,
 test_desc string,
 test_result string,
 test_date datetime
);'
)
tb2 <- bq_project_query(project_id, sql2)

# Progress message stamped with system time plus one hour
fred <- paste0('unit test results table built ', Sys.time() + hours(1))
print(fred)


# Build tmp_unit_test_tables: the distinct names of every table in the target
# dataset that has a person_id column, leaving out the standard tables listed
# in the query and anything whose name starts with "tmp".
sql3 <- paste0('drop table if  exists ', targetdb, '.tmp_unit_test_tables;')
tb3 <- bq_project_query(project_id, sql3)

sql4 <- paste0(
  'create table if not exists ', targetdb,
  '.tmp_unit_test_tables  as select distinct table_name from ', targetdb,
  '.INFORMATION_SCHEMA.COLUMNS
where column_name like "person_id" 
and table_name not in ("person", "care_site" ,"observation_period", "provider", "cb_change_log") 
and table_name not like "tmp%" 
order by table_name '
)
tb4 <- bq_project_query(project_id, sql4)
# print(sql3)

# Bring the query result back into R; uncomment `table` to display it
table <- bq_table_download(tb4)
# table

fred <- paste0('unit test table built ', Sys.time() + hours(1))
print(fred)

# Unit test 1 ----
# Records a "Fail" row for each person_id that is present in the person table
# but has no matching row in the observation_period table.

sql5 <- paste0(
  'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct per.person_id,1, "test 1 - person_id in person table not in observation period table" ,"Fail" , CURRENT_DATE()
 from ', targetdb, '.person per 
left join ', targetdb, '.observation_period obs
on per.person_id = obs.person_id where per.person_id is not null and obs.person_id is null '
)
tb5 <- bq_project_query(project_id, sql5)
# print(sql3)
# To display the results of the query:
# table <- bq_table_download(tb5)
# table

fred <- paste0('unit test 1 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 2 ----
# Records a "Fail" row for each person_id that is present in the
# observation_period table but has no matching row in the person table.
#
# The failing id is taken from obs, not per: the WHERE clause only keeps rows
# where per.person_id is NULL, so selecting per.person_id would store NULL
# and lose the identity of the orphaned record.

sql5a <- paste0(
  'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct obs.person_id,2 , "test 2 - person_id in observation period table exist in person table" ,"Fail" , CURRENT_DATE()
from  ', targetdb, '.observation_period obs
left join ', targetdb, '.person per 
on obs.person_id = per.person_id where obs.person_id is not null and per.person_id is null  '
)
tb5a <- bq_project_query(project_id, sql5a)
# print(sql5a)
# To display the results of the query:
# table <- bq_table_download(tb5a)
# table

fred <- paste0('unit test 2 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 3 ----
# Records a "Fail" row for each person with a death_datetime whose
# observation_period_end_date is later than death_datetime + 42 days.

sql7 <- paste0(
  'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct per.person_id,3, "test 3 - does the observation period end date exceed the death date + 42" ,"Fail" , CURRENT_DATE()
from ', targetdb, ' .person per
 , ', targetdb, '.observation_period obs
 where per.death_datetime is not null 
 and per.person_id = obs.person_id
 and obs.observation_period_end_date > date_add(per.death_datetime, INTERVAL 42 DAY)  '
)
tb7 <- bq_project_query(project_id, sql7)
# print(sql7)
# To display the results of the query:
# table <- bq_table_download(tb7)
# table

fred <- paste0('unit test 3 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 4 ----
# Records a "Fail" row for each person with a birth_datetime whose
# observation_period_start_date falls before that birth_datetime.

sql8 <- paste0(
  'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct per.person_id,4, "test 4 - is the observation_period start date before the birthdate " ,"Fail" , CURRENT_DATE()
from  ', targetdb, ' .person per
 , ', targetdb, '.observation_period obs
 where per.birth_datetime is not null 
 and per.person_id = obs.person_id
 and obs.observation_period_start_date < per.birth_datetime '
)
tb8 <- bq_project_query(project_id, sql8)
# print(sql8)
# To display the results of the query:
# table <- bq_table_download(tb8)
# table

fred <- paste0('unit test 4 completed ', Sys.time() + hours(1))
print(fred)

# Reference list for the looped tests ----
# Loads into the data frame `table` the names of the tables that have a
# column named <table_name>_start_date (observation_period excluded).
# Only the _start_date column is checked here; the looped tests that read
# <table_name>_end_date rely on that column existing as well.

sql3 <- paste0(
  'select distinct table_name from ', targetdb, '.INFORMATION_SCHEMA.COLUMNS 
where column_name like CONCAT(table_name,"_start_date") and table_name !="observation_period" order by table_name '
)
tb3 <- bq_project_query(project_id, sql3)
# print(sql3)
table <- bq_table_download(tb3)

# table

fred <- paste0('Reference table for loop built ', Sys.time() + hours(1))
print(fred)

# Unit test 5 (data outside obs periods - sanity check) ----
# For every table named in `table`, records a "Fail" row for each person_id
# whose <table>_start_date (cast to date) is later than an
# observation_period_end_date for the same person.
#
# The loop covers every row of `table`, starting at row 1 (it previously
# started at row 2 and silently skipped the first table). seq_len() makes the
# loop a no-op when `table` has no rows; the old 2:nrow(table) form counted
# backwards and errored with an out-of-range index for 0 or 1 rows.
# testno is recorded as 5 to match the test number.

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,5, "test 5 - does the data in ', data_table, ' start after the observation period end date" ,"Fail" , CURRENT_DATE()
from ', targetdb, '.', data_table, ' a 
 , ', targetdb, '.observation_period obs
 where a.person_id = obs.person_id 
and cast(a.', data_table, '_start_date as date) > obs.observation_period_end_date '
  )
  # print(temp_sql_query)
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 5 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 6 (data outside obs periods) ----
# For every table named in `table`, records a "Fail" row for each person_id
# whose <table>_end_date (cast to date) is later than an
# observation_period_end_date for the same person.
#
# seq_len() makes the loop a no-op when `table` has no rows (1:nrow(table)
# would iterate over c(1, 0) and fail on the subscript). testno is recorded
# as 6 to match the test number (it was hard-coded as 1).

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,6, "test 6 - does the data in ', data_table, ' end after the observation period end date" ,"Fail" , CURRENT_DATE()
from ', targetdb, '.', data_table, '  a
 , ', targetdb, '.observation_period obs
 where a.person_id = obs.person_id
 and cast(a.', data_table, '_end_date as date) > obs.observation_period_end_date  '
  )
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 6 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 7 (data outside obs periods) ----
# For every table named in `table`, records a "Fail" row for each person_id
# whose <table>_start_date (cast to date) is earlier than an
# observation_period_start_date for the same person.
#
# seq_len() makes the loop a no-op when `table` has no rows (1:nrow(table)
# would iterate over c(1, 0) and fail on the subscript). testno is recorded
# as 7 to match the test number (it was hard-coded as 1).

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,7, "test 7 - does the data in ', data_table, ' start before the observation period start date" ,"Fail" , CURRENT_DATE()
from  ', targetdb, '.', data_table, '  a
 , ', targetdb, '.observation_period obs
 where a.person_id = obs.person_id
 and cast(a.', data_table, '_start_date as date)   < obs.observation_period_start_date  '
  )
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 7 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 8 (data outside obs periods - sanity check) ----
# For every table named in `table`, records a "Fail" row for each person_id
# whose <table>_end_date (cast to date) is earlier than an
# observation_period_start_date for the same person.
#
# seq_len() makes the loop a no-op when `table` has no rows (1:nrow(table)
# would iterate over c(1, 0) and fail on the subscript). testno is recorded
# as 8 to match the test number (it was hard-coded as 1).

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,8, "test 8 - does the data in ', data_table, ' end before the observation period start date" ,"Fail" , CURRENT_DATE()
from  ', targetdb, '.', data_table, '  a
 , ', targetdb, '.observation_period obs
 where a.person_id = obs.person_id
 and cast(a.', data_table, '_end_date as date)  < obs.observation_period_start_date '
  )
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 8 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 9 ----
# For every table named in `table`, records a "Fail" row for each non-null
# person_id in that table that has no matching row in the person table.
#
# NOTE(review): the test description says "person_id in person_table exist in
# <table>", but the query checks the opposite direction (ids in the data table
# that are missing from person). The query is left as written - confirm which
# direction is intended before relying on this test.
#
# seq_len() makes the loop a no-op when `table` has no rows (1:nrow(table)
# would iterate over c(1, 0) and fail on the subscript). testno is recorded
# as 9 to match the test number (it was hard-coded as 1).

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,9, "test 9 - does the person_id in person_table exist in ', data_table, ' table " ,"Fail" , CURRENT_DATE()
from  ', targetdb, '.', data_table, ' a 
left join ', targetdb, '.person per 
on a.person_id = per.person_id 
where a.person_id is not null and per.person_id is null '
  )
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 9 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 10 ----
# For every table named in `table`, records a "Fail" row for each non-null
# person_id in that table that has no matching row in the person table.
#
# The filter is "a.person_id is not null and per.person_id is null". The
# previous filter (a.person_id is null and per.person_id is not null) could
# never be true: a row with a NULL a.person_id cannot satisfy the join
# condition, so per.person_id is always NULL for it and the test never
# reported anything.
#
# seq_len() makes the loop a no-op when `table` has no rows (1:nrow(table)
# would iterate over c(1, 0) and fail on the subscript). testno is recorded
# as 10 to match the test number (it was hard-coded as 1).

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,10, "test 10 - does the person_id in ', data_table, ' exist in the person table " ,"Fail" , CURRENT_DATE()
from ', targetdb, '.', data_table, '  a 
left join ', targetdb, '.person per 
on a.person_id = per.person_id where a.person_id is not null 
and per.person_id is null  '
  )
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 10 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 11 ----
# For every table named in `table`, records a "Fail" row for each person_id
# whose <table>_start_date or <table>_end_date is later than the person's
# death_datetime + 42 days.
#
# The filter is "a.person_id is not null". The previous filter
# (a.person_id is null) meant no row could ever join to person, so
# death_datetime was always NULL, both comparisons evaluated to NULL and the
# test never reported anything.
#
# seq_len() makes the loop a no-op when `table` has no rows (1:nrow(table)
# would iterate over c(1, 0) and fail on the subscript). testno is recorded
# as 11 to match the test number (it was hard-coded as 7).

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,11, "test 11 - does data in ', data_table, ' exceed the death +42 date " ,"Fail" , CURRENT_DATE()
from ', targetdb, '.', data_table, ' a 
left join ', targetdb, '.person per 
on a.person_id = per.person_id 
where a.person_id is not null 
and (a.', data_table, '_start_date > date_add(per.death_datetime, INTERVAL 42 DAY)
or a.', data_table, '_end_date > date_add(per.death_datetime, INTERVAL 42 DAY) )'
  )
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 11 completed ', Sys.time() + hours(1))
print(fred)

# Unit test 12 ----
# For every table named in `table`, records a "Fail" row for each person_id
# whose <table>_start_date or <table>_end_date is earlier than the person's
# birth_datetime.
#
# The filter is "a.person_id is not null". The previous filter
# (a.person_id is null) meant no row could ever join to person, so
# birth_datetime was always NULL, both comparisons evaluated to NULL and the
# test never reported anything.
#
# seq_len() makes the loop a no-op when `table` has no rows (1:nrow(table)
# would iterate over c(1, 0) and fail on the subscript). testno is recorded
# as 12 to match the test number (it was hard-coded as 7).

for (i in seq_len(nrow(table))) {
  # Only column 1 (table_name) of `table` is used
  data_table <- table[[i, 1]]

  temp_sql_query <- paste0(
    'insert into ', targetdb, '.tmp_unit_test_results (
person_id,testno , test_desc , test_result , test_date )
select distinct a.person_id,12, "test 12 - does data in ', data_table, ' exist before birth date " ,"Fail" , CURRENT_DATE()
from ', targetdb, '.', data_table, ' a 
left join ', targetdb, '.person per 
on a.person_id = per.person_id 
where a.person_id is not null 
and (a.', data_table, '_start_date < per.birth_datetime
or a.', data_table, '_end_date < per.birth_datetime  )'
  )
  temp_queried_table <- bq_project_query(project_id, temp_sql_query)
}

fred <- paste0('unit test 12 completed ', Sys.time() + hours(1))
print(fred)

# Summary table ----
# Rebuilds tmp_unit_test_summary: one row per test_desc found in
# tmp_unit_test_results, with the number of result rows as error_count.
sql3 <- paste0('drop table if  exists ', targetdb, '.tmp_unit_test_summary;')
tb3 <- bq_project_query(project_id, sql3)

sql4 <- paste0(
  'create table if not exists ', targetdb, '.tmp_unit_test_summary as 
SELECT test_desc , count(test_desc) as error_count
 FROM ', targetdb, '.tmp_unit_test_results 
 group by test_desc '
)
tb4 <- bq_project_query(project_id, sql4)
# Echoes the drop statement held in sql3 (not the create statement in sql4)
print(sql3)
# To display the results of the query:
# table <- bq_table_download(tb4)
# table

fred <- paste0('unit test summary table built ', Sys.time() + hours(1))
print(fred)

[1] "variables for yhcr-prd-phm-bia-core.CB_FDM_PrimaryCare_V7 set 2023-04-20 13:18:57"


Auto-refreshing stale OAuth token.



[1] "unit test results table built 2023-04-20 13:18:59"
[1] "unit test table built 2023-04-20 13:19:02"
[1] "unit test 1 completed 2023-04-20 13:19:04"
[1] "unit test 2 completed 2023-04-20 13:19:07"
[1] "unit test 3 completed 2023-04-20 13:19:09"
[1] "unit test 4 completed 2023-04-20 13:19:12"
[1] "Reference table for loop built 2023-04-20 13:19:12"
[1] "unit test 5 completed 2023-04-20 13:19:35"
[1] "unit test 6 completed 2023-04-20 13:20:03"
[1] "unit test 7 completed 2023-04-20 13:20:33"
[1] "unit test 8 completed 2023-04-20 13:20:59"
[1] "unit test 9 completed 2023-04-20 13:21:23"
[1] "unit test 10 completed 2023-04-20 13:21:47"
[1] "unit test 11 completed 2023-04-20 13:22:12"
[1] "unit test 12 completed 2023-04-20 13:22:42"
[1] "drop table if  exists yhcr-prd-phm-bia-core.CB_FDM_PrimaryCare_V7.tmp_unit_test_summary;"
[1] "unit test summary table built 2023-04-20 13:22:45"


In [6]:
# Display the error summary ----
# Reads tmp_unit_test_summary back, ordered by test_desc, and prints it.
sql9 <- paste0(
  'select * from  ', targetdb, '.tmp_unit_test_summary order by 	
test_desc ;'
)
tb9 <- bq_project_query(project_id, sql9)

table <- bq_table_download(tb9)
table

test_desc,error_count
<chr>,<int64>
