# Load

This notebook demonstrates data ingestion; refer to cell #2 for configuration values.

In [1]:
# Initialization block
from IPython.display import display, HTML, Image , Markdown
from snowflake.snowpark.session import Session
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import os ,configparser ,json ,logging

# Import the commonly defined utility scripts using
# dynamic path include
import sys
sys.path.append('../python/lutils')
import sflk_base as L

display(Markdown("### Initialization"))
logging.basicConfig(stream=sys.stdout, level=logging.ERROR)

# Source various helper functions
%run ./scripts/notebook_helpers.py

# Define the project home directory, this is used for locating the config.ini file
PROJECT_HOME_DIR = '../../'
config = L.get_config(PROJECT_HOME_DIR)
sp_session = L.connect_to_snowflake(PROJECT_HOME_DIR)

if(sp_session == None):
    raise Exception(f'Unable to connect to snowflake. Validate connection information ')

sp_session.use_role(f'''{config['APP_DB']['role']}''')
sp_session.use_schema(f'''{config['APP_DB']['database']}.{config['APP_DB']['schema']}''')
sp_session.use_warehouse(f'''{config['SNOW_CONN']['warehouse']}''')

df = sp_session.sql('select current_user() ,current_role() ,current_database() ,current_schema();').to_pandas()
display(df)

### Initialization

Unnamed: 0,CURRENT_USER(),CURRENT_ROLE(),CURRENT_DATABASE(),CURRENT_SCHEMA()
0,SOLNDEMOUSR,PUBLIC,INDSOL_CMSGOV_PRICING,PUBLIC


In [40]:
# Parameter initialization

import pandas as pd
import os
from datetime import datetime
from datetime import timedelta
import time
import re

# Show full cell contents (e.g. long file paths / URLs) instead of truncating them.
pd.options.display.max_colwidth = None

def get_file_from_download_url(p_fl_url: str) -> str:
    """Return the file-name component of a download URL.

    Any query string or fragment (e.g. pre-signed URL parameters) is ignored, so
    ``https://host/dir/file.zip?sig=abc`` yields ``file.zip`` rather than
    ``file.zip?sig=abc``.

    :param p_fl_url: URL (or slash-separated path) of the file to download.
    :returns: the text after the last '/' of the URL path ('' if the path ends with '/').
    """
    from urllib.parse import urlsplit

    url_path = urlsplit(p_fl_url).path
    return url_path.rsplit('/', 1)[-1]

def get_basename_of_datafile(p_datafile:str) -> str:
    """Strip directory components and the final extension from a data-file path.

    Only the last extension is removed, e.g. ``dir/a.tar.gz`` -> ``a.tar``.
    """
    file_name = os.path.basename(p_datafile)
    stem, _extension = os.path.splitext(file_name)
    return stem

def get_cleansed_file_basename(p_datafile):
    """Return the data file's base name (directory and extension removed) with every
    run of non-alphanumeric characters collapsed into a single '_'.

    Example: ``dir/a-b.c.zip`` -> ``a_b_c``.
    """
    return re.sub('[^0-9a-zA-Z]+', '_', get_basename_of_datafile(p_datafile))

# Source data file to ingest; every derived name below is computed from this URL.
DATA_FILE_URL = 'https://priorityhealthtransparencymrfs.s3.amazonaws.com/2023_03_01_priority_health_HMO_in-network-rates.zip'
DATA_FILE = get_file_from_download_url(DATA_FILE_URL)
DATA_FILE_BASENAME = get_basename_of_datafile(DATA_FILE)
# Identifier-safe variant (non-alphanumerics -> '_'); used to name the generated DAG tasks.
DATA_FILE_BASENAME_CLEANSED = get_cleansed_file_basename(DATA_FILE)

# Stage (and folder) that holds the raw input file. The stage name is hard-coded;
# config['APP_DB']['ext_stage'] is used for the parsed output below instead.
INPUT_DATA_STAGE = 'data_stg'
DATA_STAGE_FOLDER = config['APP_DB']['folder_data']

# Stage (and folder) where the parsed output is written.
TARGET_DATA_STAGE = config['APP_DB']['ext_stage']
TARGET_FOLDER = config['APP_DB']['folder_parsed']

# Segments processed per task. This will need to be updated based on provider:
#   Priority Health ~ 500
#   CIGNA           ~ 15000
SEGMENTS_PER_TASK = 500

# Warehouses used to run the DAG tasks in parallel. `warehouses` is the
# comma-separated list handed to the DAG builder; it may instead be set to an
# explicit comma-separated list of existing warehouse names.
create_warehouses = True
warehouses_count = 10
warehouses_lst = [f'INDSOL_PRICE_TRANS_TASK_{i}_WH' for i in range(warehouses_count)]
warehouses = ','.join(warehouses_lst)

# Size applied to each task warehouse before the DAG runs.
# XSMALL | SMALL | MEDIUM | LARGE | XLARGE | XXLARGE | XXXLARGE | X4LARGE | X5LARGE | X6LARGE
warehouse_size = 'MEDIUM'


In [41]:
# Download file and upload to stage

from urllib import request

# Create local download folder
DOWNLOAD_FOLDER = os.path.join(PROJECT_HOME_DIR, 'temp')
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
download_file_path = os.path.join(DOWNLOAD_FOLDER, DATA_FILE)

# Download only when the file itself is missing. Checking the parent directory
# (just created above) would always skip the download.
if not os.path.exists(download_file_path):
    print(f'Downloading file to local: {download_file_path}')
    # Write to a temporary name and rename on success, so an interrupted download
    # is not mistaken for a complete file on the next run.
    partial_path = download_file_path + '.part'
    request.urlretrieve(DATA_FILE_URL, partial_path)
    os.replace(partial_path, download_file_path)

In [42]:
# Upload data file to stage, if not present

# Directory-table lookup for the data file in the input stage.
sql_stmt = f'''select relative_path
from directory(@{INPUT_DATA_STAGE})
where relative_path like '%{DATA_FILE}%'
;'''

# Built once as an f-string so that both refreshes below target the real stage name.
refresh_stage_stmt = f'alter stage {INPUT_DATA_STAGE} refresh;'

sp_session.sql(refresh_stage_stmt).collect()
df = sp_session.sql(sql_stmt).to_pandas()
if df.empty:
    print(f'Uploading to stage {INPUT_DATA_STAGE} ...')
    sp_session.file.put(
        local_file_name = download_file_path
        ,stage_location = f'{INPUT_DATA_STAGE}/{DATA_STAGE_FOLDER}'
        ,auto_compress=False ,overwrite=True)

    # Refresh again so the directory table lists the file just uploaded.
    sp_session.sql(refresh_stage_stmt).collect()
    df = sp_session.sql(sql_stmt).to_pandas()

display(df)

Unnamed: 0,RELATIVE_PATH
0,data/2023_03_01_priority_health_HMO_in-network-rates.zip


In [43]:
# Create warehouses for parallelism

if create_warehouses:
    whs = warehouses.split(',')
    print(f'Creating {len(whs)} warehouses ..')
    rl = config['APP_DB']['role']

    # Warehouses are created as SYSADMIN. Always switch back to the app role,
    # even if a statement fails, so later cells do not silently run as SYSADMIN.
    sp_session.sql('use role sysadmin;').collect()
    try:
        for wh_name in whs:
            print(f'    - {wh_name}')
            sql_stmt = f'''
                create warehouse if not exists {wh_name} with
                    WAREHOUSE_SIZE = 'XSMALL'
                    AUTO_RESUME = TRUE
                    AUTO_SUSPEND = 300
                    COMMENT = 'warehouse created as part of pricing transperancy industry solution usecase.'
                ;
            '''
            sp_session.sql(sql_stmt).collect()
            sp_session.sql(f'grant ALL PRIVILEGES on warehouse {wh_name} to role {rl};').collect()
    finally:
        sp_session.use_role(f'''{config['APP_DB']['role']}''')

Creating 10 warehouses ..
    - INDSOL_PRICE_TRANS_TASK_0_WH
    - INDSOL_PRICE_TRANS_TASK_1_WH
    - INDSOL_PRICE_TRANS_TASK_2_WH
    - INDSOL_PRICE_TRANS_TASK_3_WH
    - INDSOL_PRICE_TRANS_TASK_4_WH
    - INDSOL_PRICE_TRANS_TASK_5_WH
    - INDSOL_PRICE_TRANS_TASK_6_WH
    - INDSOL_PRICE_TRANS_TASK_7_WH
    - INDSOL_PRICE_TRANS_TASK_8_WH
    - INDSOL_PRICE_TRANS_TASK_9_WH


In [44]:
# Cleanup block

# We will cleanup specific resources and artifacts from possible previous runs.
stmts = [
    f''' delete from segment_task_execution_status where data_file = '{DATA_FILE}'; '''
    ,f''' delete from task_to_segmentids where data_file = '{DATA_FILE}'; '''
    ,f''' delete from in_network_rates_file_header where data_file = '{DATA_FILE}'; '''
    ,f''' delete from in_network_rates_segment_header where data_file = '{DATA_FILE}'; '''
    ,f''' alter stage {INPUT_DATA_STAGE} refresh; '''
    # Parsed files are listed from the target stage's directory table below,
    # so that stage must be refreshed as well.
    ,f''' alter stage {TARGET_DATA_STAGE} refresh; '''
]

print(' truncating tables ...')
for stmt in stmts:
    sp_session.sql(stmt).collect()

print(f''' cleaning up files in external stage under path {TARGET_FOLDER}/{DATA_FILE_BASENAME}/ ...''')

# Parsed output is written under TARGET_FOLDER of the target stage (the DAG builder
# receives @{TARGET_DATA_STAGE}/{TARGET_FOLDER}), so match on that folder rather
# than on the input-data folder.
stmt = f''' select relative_path from directory(@{TARGET_DATA_STAGE}) where relative_path like '%{TARGET_FOLDER}/{DATA_FILE_BASENAME}/%'; '''
files = sp_session.sql(stmt).collect()
for r in files:
    stmt = f''' remove @{TARGET_DATA_STAGE}/{r['RELATIVE_PATH']}; '''
    sp_session.sql(stmt).collect()

 truncating tables ...
 cleaning up files in external stage under path raw_parsed/2023_03_01_priority_health_HMO_in-network-rates/ ...


In [46]:
# Cautious enablement, used during development for testing:
# calls the delete_dag_for_datafile procedure for this data file
# (presumably removes a DAG left behind by an earlier run).
dag_datafile_key = DATA_FILE_BASENAME_CLEANSED
print(f'Cleaning dags for datafile: {dag_datafile_key}')
sp_session.call('delete_dag_for_datafile', dag_datafile_key, False);

Cleaning dags for datafile: 2023_03_01_priority_health_HMO_in_network_rates


In [47]:
# Apply the desired concurrency level and size to every task warehouse.
task_warehouses = warehouses.split(',')
print(f'''No of warehouses: {len(task_warehouses)}''')
for wh in task_warehouses:
    for setting in ('max_concurrency_level = 8', f'warehouse_size = {warehouse_size}'):
        sp_session.sql(f''' alter warehouse {wh} set {setting}; ''').collect()


No of warehouses: 10


---
## Data loading
We will load the segments and the file header using a DAG of Snowflake tasks.

In [48]:
# Build the task DAG that parses the data file. The returned value is a status
# document (printed below), not a DataFrame. The two trailing 5s appear to define
# the task matrix (they are echoed back as "task_matrix_shape").
input_stage_path = f'{INPUT_DATA_STAGE}/{DATA_STAGE_FOLDER}'
output_stage_path = f"@{TARGET_DATA_STAGE}/{TARGET_FOLDER}"
df = sp_session.call(
    'in_network_rates_dagbuilder_matrix',
    input_stage_path,
    DATA_FILE,
    output_stage_path,
    SEGMENTS_PER_TASK,
    warehouses,
    5,
    5,
)

sp_session.sql(f''' alter stage {TARGET_DATA_STAGE} refresh; ''').collect()
print(' Status of execution')
print(df)

 Status of execution
{
  "data_file": "2023_03_01_priority_health_HMO_in-network-rates.zip",
  "root_task": "DAG_ROOT_2023_03_01_priority_health_HMO_in_network_rates",
  "status": true,
  "task_matrix_shape": [
    5,
    5
  ],
  "term_task": "TERM_tsk_2023_03_01_priority_health_HMO_in_network_rates"
}


The above operation defines the DAG in Snowflake, as shown below. The task names are specific to the data file being parsed.
![](../../doc/soln_images/task_dags.png)

In [49]:
# Next we invoke the DAG.
# EXECUTE TASK only triggers a run of the root task; the DAG then runs
# asynchronously, so the elapsed time below covers the trigger call only.
root_task_stmt = f''' execute task DAG_ROOT_{DATA_FILE_BASENAME_CLEANSED}; '''

start_time = time.time()
print(f'Started at: {datetime.now().strftime("%H:%M:%S")}')

print(root_task_stmt)
sp_session.sql(root_task_stmt).collect()

end_time = time.time()
print(f'Ended at: {datetime.now().strftime("%H:%M:%S")}')

elapsed_time = end_time - start_time
elapsed = str(timedelta(seconds=elapsed_time))
print(f'Elapsed: {elapsed}')

Started at: 23:03:23
 execute task DAG_ROOT_2023_03_01_priority_health_HMO_in_network_rates; 
Ended at: 23:03:24
Elapsed: 0:00:00.916954


---
## Inspection

In [50]:

print('Tasks to segments')
# Segment ranges assigned to each task, limited to the current data file.
this_file_only = F.col('DATA_FILE') == F.lit(DATA_FILE)
file_ingestion_df = (
    sp_session.table('TASK_TO_SEGMENTIDS')
    .filter(this_file_only)
    .to_pandas()
)
display(file_ingestion_df)

Tasks to segments


Unnamed: 0,BUCKET,DATA_FILE,ASSIGNED_TASK_NAME,FROM_IDX,TO_IDX,SEGMENTS_RECORD_COUNT,INSERTED_AT
0,0,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_0_500,0,500,500,2023-03-20 20:03:02.363
1,1,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_501_1000,501,1000,499,2023-03-20 20:03:02.363
2,2,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_1001_1500,1001,1500,499,2023-03-20 20:03:02.363
3,3,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_1501_2000,1501,2000,499,2023-03-20 20:03:02.363
4,4,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_2001_2500,2001,2500,499,2023-03-20 20:03:02.363
...,...,...,...,...,...,...,...
70,70,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_35001_35500,35001,35500,499,2023-03-20 20:03:02.363
71,71,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_35501_36000,35501,36000,499,2023-03-20 20:03:02.363
72,72,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_36001_36500,36001,36500,499,2023-03-20 20:03:02.363
73,73,2023_03_01_priority_health_HMO_in-network-rates.zip,T_2023_03_01_priority_health_HMO_in_network_rates_36501_37000,36501,37000,499,2023-03-20 20:03:02.363


In [53]:

print('Tasks ,warehouses and state')
# result_scan(last_query_id()) reads the output of the previous query in this
# session, so SHOW TASKS must be the statement executed right before the select.
# Run this cell before the Closeout cell, which closes the session.
show_tasks_stmt = f''' SHOW TASKS IN  DATABASE {config['APP_DB']['database']}; '''
task_name_fragment = DATA_FILE_BASENAME_CLEANSED.upper()
sp_session.sql(show_tasks_stmt).collect()
stmt = f'''
    select "name" as task_name
        ,"warehouse" as warehouse
        ,"state" as state
    from table(result_scan(last_query_id()))
    where "name" like '%{task_name_fragment}%'
       -- and state != 'suspended'
    order by state;
'''
df = sp_session.sql(stmt).to_pandas()
display(df)

Tasks ,warehouses and state


SnowparkSessionException: (1404): Cannot perform this operation because the session has been closed.

--- 
### Closeout

    With that, we have finished this section of the demo setup.

In [52]:
# Release the Snowpark session. Any cell that uses sp_session after this one
# (e.g. when cells are re-run out of order) fails with a SnowparkSessionException
# stating that the session has been closed.
sp_session.close()
print('Finished!!!')

Finished!!!
