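# GDW (Postgres) vs Snowflake data-quality comparison

Runs the same aggregate queries (requested in `DQ_format.csv`) against a GDW Postgres table and its Snowflake counterpart, then writes two CSVs: the whole-range metrics side by side, and the row counts per period.

### Importing libraries and data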

In [1]:
import os

import numpy as np
import pandas as pd
import psycopg2
import snowflake.connector as sfc

In [2]:
# Build one query expression per requested aggregate from DQ_format.csv.
# Assumed layout (TODO confirm): a 'GDW' column with the Postgres column name,
# a 'Snowflake' column with the matching Snowflake column name, then -- from
# the third column on -- one column per aggregate function, where any
# non-empty cell requests "<aggregate>(<column>)" for that row.
query_request = pd.read_csv('DQ_format.csv')
pg_cols = query_request['GDW'].values.tolist()
sf_cols = query_request['Snowflake'].values.tolist()
data_list = []

for row, (pg_name, sf_name) in enumerate(zip(pg_cols, sf_cols)):
    requested = query_request.iloc[row, 2:].notna()
    for agg in requested[requested].index:
        data_list.append([
            agg + '(' + pg_name + ')',  # Postgres/GDW expression
            agg + '(' + sf_name + ')',  # Snowflake expression
            agg + '_',                  # alias prefix, e.g. "count_" for a "count" column
            sf_name,                    # alias base name (Snowflake spelling)
        ])

# Labels must follow the row layout above: the Postgres expression goes under
# 'GDW' and the Snowflake one under 'Snowflake', because the setup cells read
# query_cols['GDW'] for Postgres and query_cols['Snowflake'] for Snowflake.
query_cols = pd.DataFrame(data_list, columns=['GDW', 'Snowflake', 'Out_Prefix', 'col_name'])

### Postgres setup

In [3]:
# Source table in the GDW (Postgres) warehouse.
pg_schema, pg_table = 'com_zx_courier', 'stg_order'

# Column used for date filtering/grouping, the compared window, and the
# unit passed to date_trunc() in the grouped query.
pg_date_col = 'created_date'
pg_start_date, pg_end_date = '2019-06-01', '2020-06-18'
pg_group_period = 'month'

# Expressions from the 'GDW' column of query_cols, used in the Postgres queries below.
pg_cols = query_cols['GDW'].tolist()

### Snowflake setup

In [4]:
# Snowflake counterpart of the Postgres table. Keep the date window and the
# grouping period in step with the Postgres cell so the results are comparable.
sf_schema, sf_table = 'BR_ZE', 'STR_ORDER'
sf_date_col = 'created_date'
sf_start_date, sf_end_date = '2019-06-01', '2020-06-18'
sf_group_period = 'month'  # unit passed to date_trunc() in the grouped query

# Expressions from the 'Snowflake' column of query_cols, used in the Snowflake queries below.
sf_cols = query_cols['Snowflake'].tolist()

### Output Table setup

In [5]:
# Each selected expression is aliased "<prefix><col_name>" in both databases,
# so the result columns of the two sides share the same labels.
out_cols_prefix = query_cols['Out_Prefix'].tolist()
col_names = query_cols['col_name'].tolist()

### Postgres connection and query

In [6]:
def gdw_query(query, params=None):
    """Run ``query`` against the GDW Postgres database and return a DataFrame.

    Connection settings come from the ``GDW_USER``, ``GDW_PASSWORD``,
    ``GDW_HOST``, ``GDW_PORT`` and ``GDW_DATABASE`` environment variables.
    Each falls back to the value previously hardcoded here, so existing runs
    behave the same while credentials can stay out of the notebook.

    Parameters
    ----------
    query : str
        SQL text to execute.
    params : sequence or dict, optional
        Bind parameters forwarded to ``pd.read_sql_query`` (psycopg2 uses
        ``%s`` / ``%(name)s`` placeholders). ``None`` keeps the old behaviour.

    Returns
    -------
    pandas.DataFrame
        The query result.
    """
    conn = psycopg2.connect(
        user=os.environ.get('GDW_USER', 'postgres'),
        password=os.environ.get('GDW_PASSWORD', ''),
        host=os.environ.get('GDW_HOST', 'localhost'),
        port=os.environ.get('GDW_PORT', '5432'),
        database=os.environ.get('GDW_DATABASE', 'gdwprod'),
    )
    try:
        return pd.read_sql_query(query, conn, params=params)
    finally:
        # Release the connection even when the query raises.
        conn.close()

In [7]:
# Whole-range aggregates for Postgres: every requested expression aliased as
# "<prefix><col_name>", plus a total row count. Both date bounds are
# exclusive (> start and < end).
select_list = ''.join(
    '{} {}{}, '.format(expr, prefix, name)
    for expr, prefix, name in zip(pg_cols, out_cols_prefix, col_names)
)
query = 'select ' + select_list + 'count(*) num_entries' + ' from {}.{} '.format(pg_schema, pg_table)
where_clause = "where date({0}) > '{1}' and date({0}) < '{2}';".format(
    pg_date_col, pg_start_date, pg_end_date)
metrics_query_pg = query + where_clause

In [8]:
# Per-period aggregates for Postgres: one row per date_trunc bucket, with the
# same aliased expressions and row count as the whole-range query.
# NOTE(review): unlike metrics_query_pg there is no date-range filter here, so
# this spans the whole table -- confirm that is intended.
bucket = "date_trunc('{period}', {date_col})".format(period=pg_group_period, date_col=pg_date_col)
select_list = ''.join(
    '{} {}{}, '.format(expr, prefix, name)
    for expr, prefix, name in zip(pg_cols, out_cols_prefix, col_names)
)
query = ('select ' + bucket + ' group_col, ' + select_list + 'count(*) num_entries'
         + ' from {schema}.{table} '.format(schema=pg_schema, table=pg_table))
# GROUP BY alone guarantees no row order; ORDER BY the bucket so rows come back
# chronologically and line up with the Snowflake result.
group_query_pg = query + 'group by {0} order by {0};'.format(bucket)

### Snowflake connection and query

In [9]:
def snow_query(query, params=None):
    """Run ``query`` against Snowflake and return the result as a DataFrame.

    Connection settings come from the ``SNOWFLAKE_USER``,
    ``SNOWFLAKE_PASSWORD``, ``SNOWFLAKE_ACCOUNT``, ``SNOWFLAKE_WAREHOUSE`` and
    ``SNOWFLAKE_DATABASE`` environment variables. Each falls back to the value
    previously hardcoded here, so personal credentials need not live in the
    notebook.

    Parameters
    ----------
    query : str
        SQL text to execute.
    params : sequence or dict, optional
        Bind parameters forwarded to ``pd.read_sql_query``. ``None`` keeps the
        old behaviour.

    Returns
    -------
    pandas.DataFrame
        The query result. Column labels are as reported by Snowflake.
    """
    conn = sfc.connect(
        user=os.environ.get('SNOWFLAKE_USER', 'sbhardwaj'),
        password=os.environ.get('SNOWFLAKE_PASSWORD', ''),
        account=os.environ.get('SNOWFLAKE_ACCOUNT', 'zxventures.us-east-1'),
        warehouse=os.environ.get('SNOWFLAKE_WAREHOUSE', 'WH_INTERACTIVE'),
        database=os.environ.get('SNOWFLAKE_DATABASE', 'OMNICHANNEL'),
    )
    try:
        return pd.read_sql_query(query, conn, params=params)
    finally:
        # Release the connection even when the query raises.
        conn.close()

In [10]:
# Whole-range aggregates for Snowflake, mirroring the Postgres query: aliased
# expressions plus a total row count, with exclusive date bounds.
select_list = ''.join(
    '{} {}{}, '.format(expr, prefix, name)
    for expr, prefix, name in zip(sf_cols, out_cols_prefix, col_names)
)
query = 'select ' + select_list + 'count(*) num_entries' + ' from {}.{} '.format(sf_schema, sf_table)
where_clause = "where date({0}) > '{1}' and date({0}) < '{2}';".format(
    sf_date_col, sf_start_date, sf_end_date)
metrics_query_sf = query + where_clause

In [11]:
# Per-period aggregates for Snowflake, mirroring the Postgres grouped query.
# NOTE(review): no date-range filter is applied here (unlike metrics_query_sf),
# so this spans the whole table -- confirm that is intended.
bucket = "date_trunc('{period}', {date_col})".format(period=sf_group_period, date_col=sf_date_col)
select_list = ''.join(
    '{} {}{}, '.format(expr, prefix, name)
    for expr, prefix, name in zip(sf_cols, out_cols_prefix, col_names)
)
query = ('select ' + bucket + ' group_col, ' + select_list + 'count(*) num_entries'
         + ' from {schema}.{table} '.format(schema=sf_schema, table=sf_table))
# GROUP BY alone guarantees no row order; ORDER BY the bucket so rows come back
# chronologically and line up with the Postgres result.
group_query_sf = query + 'group by {0} order by {0};'.format(bucket)

## Overall metrics for the configured date range

In [12]:
# Run the whole-range aggregate query on each side; with no GROUP BY each
# result is a single row.
pg1 = gdw_query(metrics_query_pg)
sf1 = snow_query(metrics_query_sf)
# Lower-case Snowflake's labels (presumably upper-cased unquoted aliases) so
# they match the Postgres ones.
sf1.columns = sf1.columns.str.lower()

In [17]:
# Turn each single-row result into one row per metric. Values are paired by
# position, which holds because both queries were generated from the same
# query_cols list in the same order.
str_order_metrics = pd.DataFrame(
    {'Postgres': pg1.iloc[0].values, 'Snowflake': sf1.iloc[0].values},
    index=sf1.columns.values,
)
# NaN == NaN is False, so a metric that is NULL on both sides (e.g. an
# aggregate over an all-NULL column) would be reported as a mismatch; treat
# paired nulls as identical.
pg_vals = str_order_metrics['Postgres']
sf_vals = str_order_metrics['Snowflake']
str_order_metrics['is_identical'] = (pg_vals == sf_vals) | (pg_vals.isna() & sf_vals.isna())

str_order_metrics.to_csv('metrics_{table_name}.csv'.format(table_name=sf_table))

## Group-wise metrics (row counts per period)

In [18]:
# Per-period Postgres aggregates, indexed by the parsed bucket timestamp.
pg3 = gdw_query(group_query_pg)
pg3 = pg3.set_index(pd.to_datetime(pg3['group_col']))

In [19]:
# Per-period Snowflake aggregates. Labels are lower-cased first so that
# 'group_col' resolves, then the frame is indexed by the parsed bucket timestamp.
sf3 = snow_query(group_query_sf)
sf3.columns = sf3.columns.str.lower()
sf3 = sf3.set_index(pd.to_datetime(sf3['group_col']))

In [20]:
def _as_naive(index):
    """Return ``index`` without timezone info (unchanged if it is already naive).

    One side may return tz-aware bucket timestamps while the other returns
    naive ones (NOTE(review): depends on the column types -- confirm);
    dropping the zone lets the period keys line up.
    """
    return index.tz_localize(None) if getattr(index, 'tz', None) is not None else index


# Pair the two sides on the period key rather than by row position. GROUP BY
# guarantees no row order, and a period present on only one side would shift
# every later row (or raise a length mismatch). A period missing on one side
# shows up as NaN there and compares as not identical.
pg_counts = pd.Series(pg3['num_entries'].values, index=_as_naive(pg3.index))
sf_counts = pd.Series(sf3['num_entries'].values, index=_as_naive(sf3.index))
str_order_monthwise = pd.DataFrame({'Postgres': pg_counts, 'Snowflake': sf_counts}).sort_index()
str_order_monthwise.index.name = 'Month'
# Only this flag column is written; the former placeholder 'is_identical'
# column was never filled and ended up all-NaN in the CSV.
str_order_monthwise['has_identical_entries'] = (
    str_order_monthwise['Postgres'] == str_order_monthwise['Snowflake']
)

In [21]:
# Persist the period-by-period comparison, named after the Snowflake table.
grouped_metrics_path = 'grouped_metrics_{}.csv'.format(sf_table)
str_order_monthwise.to_csv(grouped_metrics_path)