In [1]:
import psycopg2
import pandas as pd
import sys 
sys.path.append('H:/uth_helpers') 
from db_utils import get_dsn, io_copy_from


In [2]:
connection = psycopg2.connect(get_dsn()+' keepalives=1 keepalives_idle=30 keepalives_interval=10')
connection.autocommit = True

In [None]:
with connection.cursor() as cursor:
    query = '''
create table dw_staging.pharmacy_claims 
(like data_warehouse.pharmacy_claims including defaults) 
with (
		appendonly=true, 
		orientation=column, 
		compresstype=zlib, 
		compresslevel=5 
	 )
distributed by (uth_member_id)
partition by list(data_source)
 (
  partition optz values ('optz'),
  partition optd values ('optd'),
  partition truc values ('truc'),
  partition trum values ('trum'),
  partition mdcd values ('mdcd'),
  partition mhtw values ('mhtw'),
  partition mcpp values ('mcpp'),
  partition mcrt values ('mcrt'),
  partition mcrn values ('mcrn'),
  partition iqva values ('iqva')
 )
;

insert into dw_staging.pharmacy_claims
select *
from data_warehouse.pharmacy_claims
;'''

    cursor.execute(query)
    print(cursor.rowcount)

2147483647


In [3]:
tables = [
    'member_enrollment_monthly',
    'member_enrollment_yearly',
    'member_enrollment_fiscal_yearly',
    'claim_header',
    'claim_detail',
    'claim_diag',
    'claim_icd_proc',
    'pharmacy_claims'
    ]

data_sources = [
    'iqva',
    'mcrt',
    'mcrn',
    'mdcd',
    'mhtw',
    'mcpp',
    'truc',
    'trum',
    'optd',
    'optz'
]

In [None]:
with connection.cursor() as cursor:
    for table in tables[2:]:
        query = f'alter table data_warehouse.{table} set schema staging_clean;'

        cursor.execute(query)
        if table != 'member_enrollment_fiscal_yearly':
            for data_source in data_sources[1:]:
                query = f'alter table data_warehouse.{table}_1_prt_{data_source} set schema staging_clean;'
                cursor.execute(query)
        else:
            for data_source in ['mdcd', 'mhtw', 'mcpp']:
                query = f'alter table data_warehouse.{table}_1_prt_{data_source} set schema staging_clean;'
                cursor.execute(query)

        print(table)

        query = f'alter table dw_staging.{table} set schema data_warehouse;'
        cursor.execute(query)

        for data_source in data_sources:
            print(data_source)
            query = f'alter table dw_staging.{table}_1_prt_{data_source} set schema data_warehouse;'
            cursor.execute(query)

        query = f'alter table data_warehouse.{table} owner to uthealth_dev;'
        cursor.execute(query)

        query = f'grant select on data_warehouse.{table} to uthealth_analyst;'
        cursor.execute(query)

        query = f'grant select on data_warehouse.{table} to apcd_uthealth_analyst;'
        cursor.execute(query)

member_enrollment_fiscal_yearly
iqva
mcrt
mcrn
mdcd
mhtw
mcpp
truc
trum
optd
optz
claim_header
iqva
mcrt
mcrn
mdcd
mhtw
mcpp
truc
trum
optd
optz
claim_detail
iqva
mcrt
mcrn
mdcd
mhtw
mcpp
truc
trum
optd
optz
claim_diag
iqva
mcrt
mcrn
mdcd
mhtw
mcpp
truc
trum
optd
optz
claim_icd_proc
iqva
mcrt
mcrn
mdcd
mhtw
mcpp
truc
trum
optd
optz
pharmacy_claims
iqva
mcrt
mcrn
mdcd
mhtw
mcpp
truc
trum
optd
optz


In [None]:
with connection.cursor() as cursor:
    for table in tables:
        print(table)
        cursor.execute(f'vacuum analyze data_warehouse.{table};')
        cursor.execute(f'vacuum analyze staging_clean.{table};')

member_enrollment_monthly
member_enrollment_yearly
member_enrollment_fiscal_yearly
claim_header
claim_detail
claim_diag
claim_icd_proc
pharmacy_claims


In [None]:
df = pd.DataFrame()
for table in tables:
    query = f'''
    select a.*, b.row_count, a.row_count - b.row_count
    from (
        select '{table}' table_name, count(*) row_count
        from data_warehouse.{table}
    ) a
    join (
        select '{table}' table_name, count(*) row_count
        from staging_clean.{table}
    ) b
    on a.table_name = b.table_name;
    '''

    df = pd.concat([df, pd.read_sql(query, con=connection)])

df



Unnamed: 0,table_name,row_count,row_count.1,?column?
0,member_enrollment_monthly,11143757210,11143757210,0
0,member_enrollment_yearly,1112294353,1112294353,0
0,member_enrollment_fiscal_yearly,68608011,68608011,0
0,claim_header,13281887224,13281887224,0
0,claim_detail,32794177147,32794177147,0
0,claim_diag,33602060193,33602060193,0
0,claim_icd_proc,349337742,349337742,0
0,pharmacy_claims,11264331646,11264331646,0
