**Sample the 56 M denominator and count:**

- observed diseases with at least 10 cases in all ages and in ages 18 and over (minimum n cases = 10) in all samples 

- observed diseases with at least 100 cases in all ages and all samples 

- observed diseases with at least 1000 cases in all ages in all samples

- observed diseases with at least 10 deaths in all ages and in ages 18 and over (minimum n deaths = 10) in all samples

- observed diseases with at least 100 deaths in all ages and all samples

- observed diseases with at least 1000 deaths in all ages in all samples

This creates:

* temp views _ccu013_02_all_ids_ and _ccu013_02_all_ids_18plus_ from a query of the cohort table and HES 

* samples at different denominator sizes 

* the tables **ccu013_02_random_denominators** and **ccu013_02_random_denominators_18plus**, where the samples are saved

* temp view of diseases (_ccu013_02_disease_nodes_)

Functions: 

* _fun_sample_pop_n_disease_cases_ 

* _fun_sample_pop_n_disease_cases_18plus_

* _fun_sample_pop_n_disease_cases_100_

* _fun_sample_pop_n_disease_cases_1000_

* _fun_sample_pop_n_disease_deaths_ 

* _fun_sample_pop_n_disease_deaths_18plus_

* _fun_sample_pop_n_disease_deaths_100_

* _fun_sample_pop_n_disease_deaths_1000_


Output tables: 

* _ccu013_02_denominator_n_disease_cases_

* _ccu013_02_denominator_n_disease_cases_18plus_

* _ccu013_02_denominator_n_disease_cases_100_

* _ccu013_02_denominator_n_disease_cases_1000_

* _ccu013_02_denominator_n_disease_deaths_

* _ccu013_02_denominator_n_disease_deaths_18plus_

* _ccu013_02_denominator_n_disease_deaths_100_

* _ccu013_02_denominator_n_disease_deaths_1000_


These tables contain the diseases observed, with their counts, at each sample size.
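The counting logic described above can be illustrated on toy data without Spark. This is a minimal sketch only: `count_diseases_per_sample`, the toy IDs and the phecodes `"common"` and `"rare"` are hypothetical and are not part of the notebook.

```python
import random
from collections import defaultdict

def count_diseases_per_sample(person_ids, diagnoses, sample_sizes, min_cases=10, seed=0):
    """Return {sample_size: number of diseases with >= min_cases distinct people}."""
    rng = random.Random(seed)
    result = {}
    for size in sample_sizes:
        # Draw an independent random sample of people for each denominator size.
        sampled = set(rng.sample(person_ids, size))
        # Collect the distinct sampled people observed with each disease.
        people_by_disease = defaultdict(set)
        for person_id, phecode in diagnoses:
            if person_id in sampled:
                people_by_disease[phecode].add(person_id)
        # Keep only diseases that reach the minimum number of cases.
        result[size] = sum(
            1 for people in people_by_disease.values() if len(people) >= min_cases
        )
    return result

# Toy data (hypothetical): one disease in everyone, one disease in 5 people.
person_ids = list(range(1000))
diagnoses = [(pid, "common") for pid in person_ids] + [(pid, "rare") for pid in range(5)]

counts = count_diseases_per_sample(person_ids, diagnoses, [100, 1000])
print(counts)
```

With a threshold of 10 the rare disease never qualifies, whatever the sample size, which is the long-tail effect the tables are meant to quantify.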

In [0]:
%sql 
select * from dsa_391419_j3w9t_collab.ccu013_02_dp_skinny_patient_23_01_2020_phe_cohort_joined_1ymort_eth limit 10;

In [0]:
%sql 
select * from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep limit 10;

**Set temporary view of all NHS IDs (NHS_NUMBER_DEID) in cohort:**

In [0]:
spark.sql(f"""CREATE OR REPLACE TEMP VIEW ccu013_02_all_ids AS 
           select distinct(NHS_NUMBER_DEID), c.age_23_01_2020, c.death_flag
           from dsa_391419_j3w9t_collab.ccu013_02_dp_skinny_patient_23_01_2020_phe_cohort_joined_1ymort_eth c
           where c.age_23_01_2020 >= 0 and c.age_23_01_2020 <= 100 
           and c.sex in (1,2)
           and c.IMD_quintile not LIKE ('Unknown')""")

In [0]:

%sql
select count(*) from ccu013_02_all_ids ;

In [0]:
spark.sql(f"""CREATE OR REPLACE TEMP VIEW ccu013_02_all_ids_18plus AS  
        select distinct(NHS_NUMBER_DEID), c.age_23_01_2020, c.death_flag
        from dsa_391419_j3w9t_collab.ccu013_02_dp_skinny_patient_23_01_2020_phe_cohort_joined_1ymort_eth c
        where c.age_23_01_2020 >= 18 and c.age_23_01_2020 <= 100 
        and c.sex in (1,2)
        and c.IMD_quintile not LIKE ('Unknown') """)

In [0]:
%sql 
select min(age_23_01_2020)  from ccu013_02_all_ids_18plus;

**Set temporary view of all relevant diseases (nodes)**

In [0]:
spark.sql(f"""
          CREATE OR REPLACE TEMP VIEW ccu013_02_disease_nodes AS
          select distinct(phecode) 
          from dsa_391419_j3w9t_collab.ccu013_lkp_phev200_phedict
        where f_disease_node = 1""")

In [0]:
%sql 
select count(*) from ccu013_02_disease_nodes;

**Cohort and data**:

 _replicates previous counts:_

In [0]:
%sql

select phecode, count(distinct(person_id_deid)) from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
inner join dsa_391419_j3w9t_collab.ccu013_02_dp_skinny_patient_23_01_2020_phe_cohort_joined_1ymort_eth c
on p.person_id_deid = c.NHS_NUMBER_DEID
WHERE  c.age_23_01_2020 >= 0 and c.age_23_01_2020 <= 100
and c.sex in(1, 2)
and c.IMD_quintile not like ('Unknown')
group by phecode;


**Create table with IDs randomly sampled for different denominators** 

In [0]:
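def fun_sample_pop (sample_size = 500000):
  """Draw a random sample of `sample_size` people from ccu013_02_all_ids."""

  # The subquery shuffles the view with rand() and keeps the first
  # `sample_size` rows; the join back on NHS_NUMBER_DEID returns those people
  # tagged with the requested denominator size.
  query = f'''
  select p.NHS_NUMBER_DEID as NHS_NUMBER_DEID, 
  p.age_23_01_2020 as age_23_01_2020,
  p.death_flag as death_flag,
  {sample_size} as denominator_size
  from ccu013_02_all_ids p
  inner join (select * from ccu013_02_all_ids order by rand() limit {sample_size}) as s 
  on p.NHS_NUMBER_DEID = s.NHS_NUMBER_DEID
  '''

  df_out = spark.sql(query)

  return df_out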

In [0]:
def fun_sample_pop_18plus (sample_size = 500000):
    """Draw a random sample of `sample_size` people from ccu013_02_all_ids_18plus."""

    # Both sides of the join read the 18plus view created above
    # (ccu013_02_all_ids_18plus).
    query = f'''
    select p.NHS_NUMBER_DEID as NHS_NUMBER_DEID, 
    p.age_23_01_2020 as age_23_01_2020,
    p.death_flag as death_flag,
    {sample_size} as denominator_size
    from ccu013_02_all_ids_18plus p
    inner join (select * from ccu013_02_all_ids_18plus order by rand() limit {sample_size}) as s 
    on p.NHS_NUMBER_DEID = s.NHS_NUMBER_DEID
    '''
    df_out = spark.sql(query)

    return df_out
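The two sampling functions differ only in the source view, so one query builder could serve both. The sketch below is an assumption, not the notebook's code: `build_sample_query` and its `seed` parameter are hypothetical. It drops the self-join, which should be equivalent only if each NHS_NUMBER_DEID appears once in the view. Spark SQL's `rand` accepts an optional seed, but the result can still vary with data partitioning, so a seed alone may not make the sample fully reproducible.

```python
def build_sample_query(source_view, sample_size, seed=None):
    """Build the SQL that samples `sample_size` rows at random from `source_view`.

    Hypothetical helper: `seed` is passed to Spark SQL's rand() when given.
    """
    rand_call = "rand()" if seed is None else f"rand({int(seed)})"
    return f"""
    select NHS_NUMBER_DEID,
           age_23_01_2020,
           death_flag,
           {int(sample_size)} as denominator_size
    from {source_view}
    order by {rand_call}
    limit {int(sample_size)}
    """

# Build (but do not run) the query for the 18plus view.
query_18plus = build_sample_query("ccu013_02_all_ids_18plus", 100, seed=42)
print(query_18plus)
```

In the notebook the string would be passed to `spark.sql(...)` in the same way as the existing functions.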



In [0]:
df_out1 = fun_sample_pop (sample_size = 100)

display(df_out1)

In [0]:
df_out1_18plus  = fun_sample_pop_18plus(sample_size = 100)

display(df_out1_18plus)

In [0]:
denominator_samples = [500000, 1000000, 5000000, 10000000, 15000000, 20000000, 25000000, 30000000, 35000000, 40000000, 45000000, 50000000, 56949933]

In [0]:
from functools import reduce   

out_denos_df = reduce(lambda x,y: x.union(y), [fun_sample_pop(sample_size = s) for s in denominator_samples])


In [0]:
out_denos_df.write.mode("overwrite").option('overwriteschema','true').saveAsTable("dsa_391419_j3w9t_collab.ccu013_02_random_denominators")

In [0]:
%sql 
select count(*) from dsa_391419_j3w9t_collab.ccu013_02_random_denominators ;

In [0]:
%sql 
select count(distinct(NHS_NUMBER_DEID)) from dsa_391419_j3w9t_collab.ccu013_02_random_denominators;

In [0]:
%sql 
select distinct(denominator_size) from dsa_391419_j3w9t_collab.ccu013_02_random_denominators;

In [0]:
%sql 
select count(*) as n_rows, denominator_size
from dsa_391419_j3w9t_collab.ccu013_02_random_denominators 
group by denominator_size ; 
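The row counts returned by the query above can be checked against what the sampling should produce. This sketch assumes every requested size is no larger than the source view and that each person appears once in it; under those assumptions each `denominator_size` group should hold exactly that many rows. `check_sample_sizes` is a hypothetical helper, and the observed counts would be filled in from the query result.

```python
denominator_samples = [500000, 1000000, 5000000, 10000000, 15000000, 20000000,
                       25000000, 30000000, 35000000, 40000000, 45000000, 50000000,
                       56949933]

# Samples are drawn independently per size, so the saved table stacks them.
expected_total = sum(denominator_samples)

def check_sample_sizes(observed, expected_sizes):
    """Return {size: observed_rows} for every size whose row count is off."""
    return {
        size: observed.get(size, 0)
        for size in expected_sizes
        if observed.get(size, 0) != size
    }

print(expected_total)
```

An empty dictionary from `check_sample_sizes` means every sample has the requested number of rows; a shortfall in the largest sample would suggest the view holds fewer people than requested.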

In [0]:
from functools import reduce   

out_df_denos_18plus  = reduce(lambda x,y: x.union(y), [fun_sample_pop_18plus(sample_size = s) for s in denominator_samples])

In [0]:
out_df_denos_18plus.write.mode("overwrite").option("overwriteschema", "true").saveAsTable("dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus")

In [0]:
%sql 
select count(*) from dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus;

In [0]:
%sql 
select distinct(denominator_size) from dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus;

In [0]:
%sql 
select count(distinct(NHS_NUMBER_DEID)) as n_indiv, denominator_size from dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus
group by denominator_size;

In [0]:
%sql 
select count(*) as n_rows, denominator_size 
from dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus 
group by denominator_size;

**Long tail**

Function to count the number of diseases in each sample with at least 10 people

Returns diseases and N people per sample 

It creates the tables ccu013_02_denominator_n_disease_cases and ccu013_02_denominator_n_disease_cases_18plus

These hold all diseases and the N cases identified in every sample

In R, to export: 

-Count N diseases per sample (to summarise sample)

-diseases and counts for long tail in samples  


Expanding with a similar function that counts N diseases in each sample with at least 1000 cases

-Returns diseases and number of people per sample 

-It creates the table ccu013_02_denominator_n_disease_cases_1000
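The case-count functions that follow differ only in the threshold and in which denominator table they read, so they could be generated from one builder. This is a sketch under that assumption: `build_disease_count_query`, `min_cases` and `adults_only` are hypothetical names. It writes the threshold as `>= min_cases`, which for integer counts matches the `> 9`, `> 99` and `> 999` conditions used in the notebook.

```python
def build_disease_count_query(sample_size, min_cases=10, adults_only=False):
    """Build the SQL counting distinct people per phecode in one sample.

    Hypothetical helper: only diseases with at least `min_cases` people are kept.
    """
    denominators = "dsa_391419_j3w9t_collab.ccu013_02_random_denominators"
    if adults_only:
        denominators += "_18plus"
    return f"""
    select p.phecode as phecode,
           count(distinct p.person_id_deid) as n_indiv,
           {int(sample_size)} as denominator_size
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
    inner join ccu013_02_disease_nodes d
      on p.phecode = d.phecode
    inner join {denominators} s
      on p.person_id_deid = s.NHS_NUMBER_DEID
     and s.denominator_size = {int(sample_size)}
    group by p.phecode
    having count(distinct p.person_id_deid) >= {int(min_cases)}
    """

# Build (but do not run) the query for the 1000-case threshold.
query_1000 = build_disease_count_query(500000, min_cases=1000)
print(query_1000)
```

Because the join is an inner join, filtering on `denominator_size` in the `on` clause selects the same rows as filtering in a subquery first.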

In [0]:
def fun_sample_pop_n_disease_cases (sample_size = 500000):

    query = f'''
    select distinct(p.phecode) as phecode, 
    count(distinct(p.person_id_deid)) as n_indiv, 
    {sample_size} as denominator_size
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep  p 
    inner join ccu013_02_disease_nodes d
    on p.phecode = d.phecode
    inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators where denominator_size =  {sample_size}) as s 
    on p.person_id_deid = s.NHS_NUMBER_DEID
    group by p.phecode
    having count(distinct(p.person_id_deid)) > 9 
    '''

    df_out = spark.sql(query)

    return df_out


In [0]:
def fun_sample_pop_n_disease_cases_18plus (sample_size = 500000):

    query = f'''
    select distinct(p.phecode) as phecode, 
    count(distinct(p.person_id_deid)) as n_indiv,
    {sample_size} as denominator_size
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep  p 
    inner join ccu013_02_disease_nodes d
    on p.phecode = d.phecode
    inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus where denominator_size = {sample_size}) as s 
    on p.person_id_deid = s.NHS_NUMBER_DEID
    group by p.phecode
    having count(distinct(p.person_id_deid)) > 9
    '''
    df_out = spark.sql(query)

    return df_out

In [0]:
def fun_sample_pop_n_disease_cases_1000 (sample_size = 500000):

    query = f'''
    select distinct(p.phecode) as phecode, 
    count(distinct(p.person_id_deid)) as n_indiv,   
    {sample_size} as denominator_size
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep  p 
    inner join ccu013_02_disease_nodes d
    on p.phecode = d.phecode        
    inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators where denominator_size =  {sample_size}) as s 
    on p.person_id_deid = s.NHS_NUMBER_DEID
    group by p.phecode
    having count(distinct(p.person_id_deid)) > 999    
    '''

    df_out = spark.sql(query)

    return df_out

In [0]:
def fun_sample_pop_n_disease_cases_100 (sample_size = 500000):

    query = f'''
    select distinct(p.phecode) as phecode, 
    count(distinct(p.person_id_deid)) as n_indiv,   
    {sample_size} as denominator_size
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep  p 
    inner join ccu013_02_disease_nodes d
    on p.phecode = d.phecode        
    inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators where denominator_size =  {sample_size}) as s 
    on p.person_id_deid = s.NHS_NUMBER_DEID
    group by p.phecode
    having count(distinct(p.person_id_deid)) > 99    
    '''
    df_out = spark.sql(query)

    return df_out


**run all**:

In [0]:
denominator_samples

In [0]:
from functools import reduce

out_df = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_cases(sample_size = s) for s in denominator_samples])

In [0]:
out_df.write.mode('overwrite').option("overwriteSchema", "true").saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases')


**Older than 18:**

In [0]:
from functools import reduce

out_denos_df_18plus  = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_cases_18plus(sample_size = s) for s in denominator_samples])

In [0]:
out_denos_df_18plus.write.mode('overwrite').option("overwriteSchema", "true").saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases_18plus')

**Diseases with n cases > 99**

In [0]:
from functools import reduce

out_df_denos_100 = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_cases_100(sample_size = s) for s in denominator_samples])
out_df_denos_100.write.mode('overwrite').option("overwriteSchema", "true").saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases_100')

In [0]:
%sql 
select count(distinct(phecode)) as n_phecode, denominator_size from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases_100 
group by denominator_size;

**Diseases with n cases > 999**

In [0]:
from functools import reduce

out_df_denos_1000 = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_cases_1000(sample_size = s) for s in denominator_samples])

In [0]:
out_df_denos_1000.write.mode('overwrite').option("overwriteSchema", "true").saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases_1000')

In [0]:
%sql 
select * from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases_1000 limit 10;

**output 1:** Number of diseases in at least 10 people for each sample size (create output in R)

In [0]:
%sql 
select count(distinct(phecode)) as n_phecodes, 
denominator_size 
from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases
group by denominator_size ;

In [0]:
%sql 
select count(distinct(phecode)) as n_phecodes, 
denominator_size 
from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases_18plus
group by denominator_size;

In [0]:
%sql 
select count(distinct(phecode)) as n_phecodes, 
denominator_size 
from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_cases_1000
group by denominator_size; 



In [0]:
%sql 
select min(age_23_01_2020) from dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus;


**Deaths:**
- count N diseases with at least 10 deaths per sample in one year (23-01-2020 to 23-01-2021) in all ages at baseline 

Note: this uses death_flag, which is consistent with SMR and abs_mort and captures deaths within one year. 

The HR takes follow-up up to 31-11-2021, which is why the n events (n deaths) in the main results are higher for the HR than here or in SMR and abs mort.


Adding functions to count N diseases with N deaths > 99 and N deaths > 999
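Why a longer follow-up gives more deaths than the one-year death_flag can be shown with a toy calculation. Everything here is illustrative: the death dates are invented, the longer end date is a hypothetical value and not the study's end of follow-up, and treating the window as inclusive at both ends is an assumption.

```python
from datetime import date

BASELINE = date(2020, 1, 23)
ONE_YEAR_END = date(2021, 1, 23)
# Hypothetical later cut-off, used only to illustrate a longer follow-up.
HYPOTHETICAL_LONGER_END = date(2021, 10, 1)

def deaths_in_window(death_dates, start, end):
    """Count deaths with a date between `start` and `end`, inclusive."""
    return sum(1 for d in death_dates if d is not None and start <= d <= end)

# Toy data: None means the person was alive at the end of observation.
toy_death_dates = [date(2020, 3, 1), date(2020, 12, 31), date(2021, 5, 15), None]

one_year_deaths = deaths_in_window(toy_death_dates, BASELINE, ONE_YEAR_END)
longer_deaths = deaths_in_window(toy_death_dates, BASELINE, HYPOTHETICAL_LONGER_END)
print(one_year_deaths, longer_deaths)
```

The death on 2021-05-15 falls outside the one-year window but inside the longer one, so the longer follow-up counts three deaths where the one-year window counts two.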

In [0]:
sample_size = 500000

In [0]:

def fun_sample_pop_n_disease_deaths (sample_size = 500000):
  
  query = f'''
  select distinct(p.phecode) as phecode, 
  count(distinct(p.person_id_deid)) as n_deaths, 
  {sample_size} as denominator_size 
  from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
  inner join ccu013_02_disease_nodes d
  on p.phecode = d.phecode 
  inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators where denominator_size = {sample_size}) as s
  on p.person_id_deid = s.NHS_NUMBER_DEID
  where s.death_flag = 1
  group by p.phecode
  having count(distinct(p.person_id_deid)) > 9
  '''

  out_n_dis_death = spark.sql(query)

  return out_n_dis_death


In [0]:
def fun_sample_pop_n_disease_deaths_1000 (sample_size = 500000):

    query = f'''
    select
    distinct(p.phecode) as phecode, 
    count(distinct(p.person_id_deid)) as n_deaths,  
    {sample_size} as denominator_size 
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
    inner join ccu013_02_disease_nodes d
    on p.phecode = d.phecode 
    inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators where denominator_size = {sample_size}) as s
    on p.person_id_deid = s.NHS_NUMBER_DEID
    where s.death_flag = 1  
    group by p.phecode
    having count(distinct(p.person_id_deid)) > 999
    ''' 

    out_n_dis_death = spark.sql(query)

    return out_n_dis_death


In [0]:
def fun_sample_pop_n_disease_deaths_100 (sample_size = 500000):

    query = f'''
    select
    distinct(p.phecode) as phecode, 
    count(distinct(p.person_id_deid)) as n_deaths,
    {sample_size} as denominator_size 
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
    inner join ccu013_02_disease_nodes d
    on p.phecode = d.phecode 
    inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators where denominator_size = {sample_size}) as s
    on p.person_id_deid = s.NHS_NUMBER_DEID
    where s.death_flag = 1
    group by p.phecode
    having count(distinct(p.person_id_deid)) > 99
    ''' 

    out_n_dis_death = spark.sql(query)

    return out_n_dis_death  

In [0]:
    
def fun_sample_pop_n_disease_deaths_18plus (sample_size = 500000):

    query = f'''
    select
    distinct(p.phecode) as phecode, 
    count(distinct(p.person_id_deid)) as n_deaths,
    {sample_size} as denominator_size 
    from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
    inner join ccu013_02_disease_nodes d
    on p.phecode = d.phecode 
    inner join (select * from dsa_391419_j3w9t_collab.ccu013_02_random_denominators_18plus where denominator_size = {sample_size}) as s
    on p.person_id_deid = s.NHS_NUMBER_DEID
    where s.death_flag = 1
    group by p.phecode
    having count(distinct(p.person_id_deid)) > 9

    ''' 

    out_n_dis_death = spark.sql(query)

    return out_n_dis_death

In [0]:
from functools import reduce

out_df_death = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_deaths(sample_size = s) for s in denominator_samples])


In [0]:
out_df_death.write.mode('overwrite').saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths')

In [0]:
from functools import reduce

out_df_death_18plus = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_deaths_18plus(sample_size = s) for s in denominator_samples])

In [0]:
out_df_death_18plus.write.mode('overwrite').saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths_18plus')

In [0]:
denominator_samples

**Disease with n deaths > 99**:

In [0]:
from functools import reduce

out_df_death_100 = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_deaths_100(sample_size = s) for s in denominator_samples])
out_df_death_100.write.mode('overwrite').saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths_100')

In [0]:
%sql 
select count(distinct(phecode)) as n_phecodes, denominator_size from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths_100
group by denominator_size;
     

In [0]:
from functools import reduce

out_deaths_1000 = reduce(lambda x,y: x.union(y), [fun_sample_pop_n_disease_deaths_1000(sample_size = s) for s in denominator_samples])

In [0]:
out_deaths_1000.write.mode('overwrite').saveAsTable('dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths_1000')

In [0]:
%sql 
select count(distinct(phecode)) as n_phecodes, 
denominator_size 
from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths_1000
group by denominator_size;

In R, query these to export: 

In [0]:
%sql 
select count(distinct(phecode)), 
denominator_size 
from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths
group by denominator_size;


In [0]:
%sql 
select count(distinct(phecode)), 
denominator_size 
from dsa_391419_j3w9t_collab.ccu013_02_denominator_n_disease_deaths_18plus  
group by denominator_size;

One-off checks: 

In [0]:
%sql 
select p.person_id_deid, 
p.phecode,
c.death_flag
from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
inner join dsa_391419_j3w9t_collab.ccu013_02_dp_skinny_patient_23_01_2020_phe_cohort_joined_1ymort_eth c 
on p.person_id_deid = c.NHS_NUMBER_DEID
where c.death_flag = 1
and c.SEX in (1, 2)
and c.age_23_01_2020 >= 0 and c.age_23_01_2020 <= 100
and c.IMD_quintile not like ('Unknown')
and p.phecode = 'X665_0';

In [0]:
%sql 
select p.person_id_deid, 
p.phecode,
c.death_flag
from dsa_391419_j3w9t_collab.ccu013_phedatav200_unrolled_first_sex_dep p
inner join dsa_391419_j3w9t_collab.ccu013_02_dp_skinny_patient_23_01_2020_phe_cohort_joined_1ymort_eth c
on p.person_id_deid = c.NHS_NUMBER_DEID
where c.death_flag = 1
and c.SEX in (1, 2)
and c.age_23_01_2020 >= 0 and c.age_23_01_2020 <= 100
and c.IMD_quintile not like ('Unknown')
and p.phecode = ''; 