## HCP Filtering
- Get data from a Cohort created from the Cohort Builder UI
- Filter Medical Events table to return only rows with a Modifier
- Upload final dataset to MapView
- Schedule refresh
- Export dataset to S3

#### Import Libraries & Connect to Snowflake

In [1]:
# create the Komodo client
from komodo.client import Client
from komodo.definitions.models.cohorts.cohort_create import CohortCreate
from dotenv import load_dotenv, set_key
import os
from dotenv import load_dotenv
import pandas as pd
from komodo.snowflake import get_snowflake_connection
from snowflake.connector.pandas_tools import pd_writer
from datetime import datetime
from komodo.dataset import upload_dataset_to_maplab
from time import sleep
 

now = datetime.now()
load_dotenv()
client = Client()


### Connect to Snowflake
print("--- Connecting to Snowflake ---")

account_id = os.getenv("KOMODO_ACCOUNT_ID")

conn = get_snowflake_connection(account_id)
curs = conn.cursor()
curs.execute("USE ROLE CUSTOMER_ROLE")
print("--- Success connecting to Snowflake ---")

  warn_incompatible_dep(


--- Connecting to Snowflake ---


DEBUG:komodo_connector.connection_creators.snowflake.connect:REST API object was created: f82bd78b-1a87-4b6c-a121-dd977d140a9d.snowflakecomputing.com:443


--- Success connecting to Snowflake ---


### Set Cohort Definition & Get Data

In [2]:
cohort_definition_id = "fltr_def_BMEBEGSBOFPMQLJT"  # replace this cohort definition ID

In [3]:
# create the JSON payload
cohort_payload = {
    "name": "Phompholyx Providers Cohort",
    "definition": {
        "cohort_definition": {
            "filters": [{
                "name": "filter_dfn", 
                "filter_definition": {"filter_definition_id": cohort_definition_id},
                "time_filter": {  # optional
                    "ranges": [
                        ["2024-01-01", "2024-01-31"]
                    ]
                }
            }],
            "entities": ["patient"],
            "source_filter": {
                "version": "release", 
                "include_rejected_claims": False
            },
        },
        # The below script will create tables in the MapLab UI as well
        "output_format": {
            "count_entities": False,
            "entities_to_count": [],
            "outputs": [
                {
                    "output_format": "snowflake-table",
                    "output_type": "plaid-medical-events",
                },
                {
                    "output_format": "snowflake-table",
                    "output_type": "plaid-pharmacy-events",
                },
                {
                    "output_format": "snowflake-table",
                    "output_type": "plaid-patient-demographics",
                },
                {
                    "output_format": "snowflake-table",
                    "output_type": "plaid-patient-geography",
                },
                {
                    "output_format": "snowflake-table",
                    "output_type": "plaid-plans",
                },
                {
                    "output_format": "snowflake-table",
                    "output_type": "plaid-providers",
                },
            ],
        },
    },
    "definition_schema_version": "1.0.0",
}

# create the CohortCreate instance with the JSON payload
cohort_create = CohortCreate.from_dict(cohort_payload)

In [4]:
### All of the available output formats
            # "outputs": [
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-medical-events",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-pharmacy-events",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-patient-demographics",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-patient-enrollment",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-patient-geography",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-plans",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-providers",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-patient-closed",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-patient-insurance",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-mortality",
            #     },
            #     {
            #         "output_format": "snowflake-table",
            #         "output_type": "plaid-patient-race-ethnicity",
            #     },
            # ],

In [5]:
# call the create_cohort operation
cohort_create_response = client.definitions.create_cohort(cohort_create)

# save the ID of the cohort
cohort_id = cohort_create_response.id

# print the cohort ID
cohort_id

# store the cohort ID as an environment variable that can be used across cookbook files
set_key('.env', 'cohort_id', cohort_id)

(True, 'cohort_id', 'cht_LMWPXYVMUZAAJRLG')

In [6]:
# cohort_id = "cht_XBWBXEDTIATCKZOK"  # replace this cohort ID if inputting manually

In [7]:
cohort_response = client.definitions.get_cohort(cohort_id)
 
while cohort_response.cohort_run.finished_at is None and cohort_response.cohort_run.error_message is None:
    cohort_response = client.definitions.get_cohort(cohort_id)
    print(f"Cohort status is {cohort_response.cohort_run.status.value}")
    if cohort_response.cohort_run.status == "FINISHED":
        break
    sleep(5)

Cohort status is QUEUED
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is RUNNING
Cohort status is FINISHED


In [8]:
def execute_snowflake_query(query, conn):
    """Execute a query against Snowflake and return results as a DataFrame"""
    try:
        return pd.read_sql(query, conn)
    except Exception as e:
        print(f"Query failed: {e}")
        return None

# Dictionary to store all datasets
datasets = {}

for index, item in enumerate(cohort_response.cohort_run.output.outputs):
    # Get dataset information
    cohort_output_dataset = item.dataset_id
    dataset = client.data_catalog.get_dataset(dataset_id=cohort_output_dataset)
    dataset_table = dataset.manifestations[0].fully_qualified_name
    
    # Extract the table type
    table_parts = dataset_table.split('_')
    table_type = '_'.join(table_parts[-2:])  # Get the last two parts
    
    # Create a query to fetch the data
    query = f"SELECT * FROM {dataset_table}"
    
    # Execute the query and store the result in a dataframe
    try:
        df = execute_snowflake_query(query, conn)
        
        if df is not None:
            datasets[table_type] = df
            print(f"Successfully created dataset for {table_type}")
        else:
            print(f"No data returned for {table_type}")
    except Exception as e:
        print(f"Error creating dataset for {table_type}: {e}")

# Print available datasets
print("\nAvailable datasets:")
for key in datasets.keys():
    print(f"- {key}: {len(datasets[key])} rows")

  return pd.read_sql(query, conn)


Successfully created dataset for MEDICAL_EVENTS
Successfully created dataset for PHARMACY_EVENTS
Successfully created dataset for PATIENT_DEMOGRAPHICS
Successfully created dataset for PATIENT_GEOGRAPHY
Successfully created dataset for PLAID_PLANS
Successfully created dataset for PLAID_PROVIDERS

Available datasets:
- MEDICAL_EVENTS: 13346 rows
- PHARMACY_EVENTS: 2376 rows
- PATIENT_DEMOGRAPHICS: 7 rows
- PATIENT_GEOGRAPHY: 8 rows
- PLAID_PLANS: 69 rows
- PLAID_PROVIDERS: 10 rows


### Preview Data

In [9]:
# Dictionary to store all dataset references and preview DataFrames
dataset_refs = {}
dataset_previews = {}

# First, collect all the dataset references
for index, item in enumerate(cohort_response.cohort_run.output.outputs):
    cohort_output_dataset = item.dataset_id
    dataset = client.data_catalog.get_dataset(dataset_id=cohort_output_dataset)
    dataset_table = dataset.manifestations[0].fully_qualified_name
    print(f"Dataset table: {dataset_table}")
    
    # Extract the table type
    table_parts = dataset_table.split('_')
    table_type = '_'.join(table_parts[-2:])
    
    # Store the reference
    dataset_refs[table_type] = dataset_table

# Now create preview DataFrames with limited rows
for table_type, table_name in dataset_refs.items():
    # Create a query that limits the number of rows
    preview_query = f"SELECT * FROM {table_name} LIMIT 100"  # Adjust the limit as needed
    
    try:
        # Execute the query to get a preview
        # Replace conn with your actual snowflake connection
        preview_df = pd.read_sql(preview_query, conn)
        
        # Store the preview DataFrame
        dataset_previews[table_type] = preview_df
        print(f"Successfully created preview for {table_type} ({len(preview_df)} rows)")
    except Exception as e:
        print(f"Error creating preview for {table_type}: {e}")

# Function to display pandas head previews
def display_pandas_previews(previews, rows=5):
    """
    Display pandas head() previews for each dataset
    
    Parameters:
    - previews: Dictionary mapping dataset names to pandas DataFrames
    - rows: Number of rows to show in preview
    """
    for name, df in previews.items():
        print(f"\n{'='*80}\n{name} PREVIEW:\n{'='*80}")
        print(f"\nShape: {df.shape[0]} rows × {df.shape[1]} columns")
        print(f"Columns: {', '.join(df.columns)}")
        print(f"\nFirst {rows} rows:")
        print(df.head(rows))  

# Display the previews
display_pandas_previews(dataset_previews)

Dataset table: COHORTS.PROD.COHORT_RUN_CHT_LMWPXYVMUZAAJRLG_0_PLAID_MEDICAL_EVENTS
Dataset table: COHORTS.PROD.COHORT_RUN_CHT_LMWPXYVMUZAAJRLG_1_PLAID_PHARMACY_EVENTS
Dataset table: COHORTS.PROD.COHORT_RUN_CHT_LMWPXYVMUZAAJRLG_2_PLAID_PATIENT_DEMOGRAPHICS
Dataset table: COHORTS.PROD.COHORT_RUN_CHT_LMWPXYVMUZAAJRLG_3_PLAID_PATIENT_GEOGRAPHY
Dataset table: COHORTS.PROD.COHORT_RUN_CHT_LMWPXYVMUZAAJRLG_4_PLAID_PLANS
Dataset table: COHORTS.PROD.COHORT_RUN_CHT_LMWPXYVMUZAAJRLG_5_PLAID_PROVIDERS


  preview_df = pd.read_sql(preview_query, conn)


Successfully created preview for MEDICAL_EVENTS (100 rows)
Successfully created preview for PHARMACY_EVENTS (100 rows)
Successfully created preview for PATIENT_DEMOGRAPHICS (7 rows)
Successfully created preview for PATIENT_GEOGRAPHY (8 rows)
Successfully created preview for PLAID_PLANS (69 rows)
Successfully created preview for PLAID_PROVIDERS (10 rows)

MEDICAL_EVENTS PREVIEW:

Shape: 100 rows × 25 columns
Columns: MEDICAL_EVENT_ID, PATIENT_ID, SERVICE_DATE, SERVICE_TO_DATE, PROCEDURE_CODE, PROCEDURE_CODE_TYPE, MODIFIERS, NDC11, UNITS, UNIT_TYPE, RENDERING_NPI, REFERRING_NPI, BILLING_NPI, PLACE_OF_SERVICE, REVENUE_CODE, BILL_TYPE_CODE, VISIT_TYPE, KH_PLAN_ID, DIAGNOSIS_CODES, EVENT_SOURCE, BILLING_NPI_CONFIDENCE, ROOT_SOURCE_ALIASES, ROOT_SOURCE_ALIASES_BITSET, ROOT_SOURCE_ALIASES_BITSET_GROUP, ROOT_SOURCE_ALIASES_GROUP

First 5 rows:
                                    MEDICAL_EVENT_ID PATIENT_ID SERVICE_DATE  \
0  454644bf30d90f225535230d3d0c2e58e07169045f4ccc...   ZG2KCMRF   2016-1

### Analysis

In [10]:
sql_query = f"""
SELECT * FROM COHORTS.PROD.COHORT_RUN_CHT_JCNGYWQIYLCDPJXT_0_PLAID_MEDICAL_EVENTS
    WHERE UPPER(MODIFIERS) IS NOT NULL
;
"""

# Execute the query and fetch results
final_dataset = pd.read_sql(sql_query, conn)

# Print the results
final_dataset.head()

  final_dataset = pd.read_sql(sql_query, conn)


Unnamed: 0,MEDICAL_EVENT_ID,PATIENT_ID,SERVICE_DATE,SERVICE_TO_DATE,PROCEDURE_CODE,PROCEDURE_CODE_TYPE,MODIFIERS,NDC11,UNITS,UNIT_TYPE,...,BILL_TYPE_CODE,VISIT_TYPE,KH_PLAN_ID,DIAGNOSIS_CODES,EVENT_SOURCE,BILLING_NPI_CONFIDENCE,ROOT_SOURCE_ALIASES,ROOT_SOURCE_ALIASES_BITSET,ROOT_SOURCE_ALIASES_BITSET_GROUP,ROOT_SOURCE_ALIASES_GROUP
0,1088e2baf76812a7b01d5cb74d06feb19ae4a4e71686ad...,1YS3BVSJ,2024-09-19,2024-09-19,82784,CPT,|91|,,2.0,,...,131.0,OUTPATIENT,5869.0,|C9000|D6481|D849|Z79899|G893|D801|Z5112|,INSTITUTIONAL,A - KNOWN,"[\n ""fleming""\n]",4194304,4194304,"[\n ""fleming""\n]"
1,fae7851cb73643204412bc8d3331885a9555d7fa3a838a...,1YS3BVSJ,2023-07-18,2023-07-18,J1442,HCPCS,|TB|JZ|,,300.0,1 MCG,...,131.0,OUTPATIENT,5869.0,|C9000|D759|D849|D801|Z9484|,INSTITUTIONAL,A - KNOWN,"[\n ""fleming-crick""\n]",128,128,"[\n ""fleming-crick""\n]"
2,bb2aa22fbf17eee882e774e8523604b51bf2eed18aec5b...,1YS3BVSJ,2023-06-26,2023-06-26,J2562,HCPCS,|JW|,,8.0,1 MG,...,131.0,OUTPATIENT,5869.0,|Z52011|C9000|Z79899|,INSTITUTIONAL,A - KNOWN,"[\n ""fleming-crick""\n]",128,160,"[\n ""rowley"",\n ""fleming-crick""\n]"
3,63f744e9d6ef3734ecb81b41dcca99ec25e898b8e6e067...,1YS3BVSJ,2023-07-10,2023-07-10,96367,CPT,|59|,,1.0,,...,131.0,OUTPATIENT,5869.0,|C9000|D61810|T451X5A|D849|D801|Z9484|Z5111|,INSTITUTIONAL,A - KNOWN,"[\n ""fleming-crick""\n]",128,128,"[\n ""fleming-crick""\n]"
4,81f201f90da970e744c91b8283bf6e0e8e7ba7ce59b827...,1YS3BVSJ,2023-06-05,2023-06-05,84165,CPT,|90|,,1.0,,...,,OTHER,5869.0,|C9000|,INSTITUTIONAL,B - HIGH,"[\n ""wallabega""\n]",32768,32768,"[\n ""wallabega""\n]"


### Save Final Dataset(s) to MapLab

In [11]:
# set the name of the dataset to be uploaded to the Komodo platform
# add the current date and time to the end of the dataset name to make it more distinct
final_dataset_datetime = now.strftime("%Y%m%d_%H%M%S")
final_dataset_dataset_name = "ABECMA_FINAL_DATASET_FROM_COHORT_RUN_" + final_dataset_datetime

# call the upload_dataset_to_maplab function
dataset_upload_dataset = upload_dataset_to_maplab(final_dataset, final_dataset_dataset_name)

# save the ID of the dataset
dataset_id = dataset_upload_dataset.id

# print the dataset ID
dataset_id

# store the dataset ID as an environment variable that can be used in subsequent cookbook files
from dotenv import load_dotenv, set_key

set_key(".env", "dataset_id", dataset_id)

DEBUG:komodo_connector.connection_creators.snowflake.connect:REST API object was created: f82bd78b-1a87-4b6c-a121-dd977d140a9d.snowflakecomputing.com:443


(True, 'dataset_id', 'de60b64d-aa42-4ca9-951a-f738652236aa')

In [12]:
print(dataset_id)

de60b64d-aa42-4ca9-951a-f738652236aa


### Export Dataset

In [13]:
# use this cell if you want to use the dataset ID that is stored in an environment variable

import os
from dotenv import load_dotenv
load_dotenv()

try:
    dataset_id = os.environ["dataset_id"]  # retrieve the dataset_id from the "3-retrieve-cohort-data.ipynb" cookbook file
except KeyError:
    print("Please set the variable `dataset_id` with value of your dataset id.")  # throw an error if no dataset ID can be retrieved

In [14]:
# # uncomment and use this cell if you do not have a dataset ID stored in an environment variable 
# # OR if you do not want to use the dataset ID from the the "3-retrieve-cohort-data.ipynb" cookbook file

# dataset_id = "*"  # replace this dataset ID

In [15]:
import pprint

# create the Komodo client
from komodo.client import Client
client = Client()

# retrieve list of Share requests
share_ids = client.connections.list_shares()
pprint.pprint(share_ids.shares)

[ShareMetadata(account_id='f82bd78b-1a87-4b6c-a121-dd977d140a9d', connection_string='arn:aws:iam::851851261022:role/PDI-s3-export', created_by='3009a19d-57af-47c1-a141-eec85b519c9f', created_time='2025-03-19 15:35:27.612861+00:00', customer_region='us-west-2', customer_s3_export_path='s3://kh-studio-test-pdi-s3-export/sandbox-maplab-enterprise', database_name=None, listing_name=None, output_file_format=<OutputFileFormatEnum.CSV: 'CSV'>, region='us-west-2', share_id='9bb1938d-3718-4e3c-9345-be89c73337de', share_name='sandboxmaplabenterprise_s3_external', share_type=<DataFormatEnum.S3_EXTERNAL: 'S3_EXTERNAL'>, updated_by='3009a19d-57af-47c1-a141-eec85b519c9f', updated_time='2025-03-19 15:35:27.612861+00:00', user_id='3009a19d-57af-47c1-a141-eec85b519c9f')]


In [21]:
share_id = "9bb1938d-3718-4e3c-9345-be89c73337de"  # replace this share ID

In [22]:
# retrieve the required IAM role policies
s3_details = client.connections.get_shares_aws_details()

In [23]:
# print and add the following IAM policy to your IAM role

s3_iam_policy = s3_details.iam_policy_json
print(s3_iam_policy)

# remembner to remove the last Resource line if you are using S3 managed keys

{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": ["s3:GetObject", "s3:GetObjectTagging", "s3:GetObjectVersion", "s3:GetObjectVersionTagging", "s3:ListBucket"], "Resource": ["arn:aws:s3:::pdi-export-bb376550-8b58-4a65-b03c-bb0e8e564a02-prod-uw2", "arn:aws:s3:::pdi-export-bb376550-8b58-4a65-b03c-bb0e8e564a02-prod-uw2/sds_dataset_shares/*"]}, {"Effect": "Allow", "Action": ["s3:DeleteObject", "s3:GetObject", "s3:GetObjectAcl", "s3:ListBucket", "s3:PutObject", "s3:PutObjectAcl"], "Resource": ["arn:aws:s3:::{customer_s3_bucket}/{customer_s3_path}/*", "arn:aws:s3:::{customer_s3_bucket}"]}, {"Effect": "Allow", "Action": ["kms:Decrypt", "kms:GenerateDataKey"], "Resource": ["arn:aws:kms:us-west-2:904233131998:key/mrk-54689dae89204a768488fe09f9968257", "arn:aws:kms:{customer_region}:{customer_aws_account_id}:key/{customer_bucket_key_id}"]}]}


In [24]:
# print and add the following Trust policy to your IAM role

s3_iam_trust_relationship = s3_details.iam_trust_relationship_json
print(s3_iam_trust_relationship)

{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"AWS": ["arn:aws:iam::904233131998:role/customer_account_role"]}, "Action": "sts:AssumeRole", "Condition": {}}]}


In [25]:
from komodo.data_deliveries.models.dataset_shares.create_dataset_share_request import CreateDatasetShareRequest

dataset_export_request = CreateDatasetShareRequest(dataset_ids=[dataset_id])
dataset_export_response = client.data_deliveries.create_dataset_share(share_id=share_id, create_dataset_share_request=dataset_export_request)
dataset_export_id = dataset_export_response.dataset_shares[0].dataset_share_id

dataset_export_id

ServiceException: (502)
Reason: Bad Gateway
HTTP response headers: HTTPHeaderDict({'Date': 'Fri, 18 Apr 2025 13:58:14 GMT', 'Content-Type': 'application/json', 'Content-Length': '112', 'Connection': 'keep-alive', 'x-request-id': 'f0600befe6164283917ce4f6fd301c76', 'Access-Control-Allow-Origin': '*'})
HTTP response body: {"message":"Error submitting the scan for dataset: de60b64d-aa42-4ca9-951a-f738652236aa: Internal Server Error"}


In [None]:
dataset_export_details = client.data_deliveries.get_dataset_share(share_id=share_id, dataset_share_id=dataset_export_id)
print(dataset_export_details.status)