# Campaign Intelligence Starter Setup Script

## Project Setup using Notebook

### *Pre-requisites:*

1. You have *conda/miniconda* installed. If not, follow the installation instructions at [docs.conda.io](https://docs.conda.io/projects/miniconda/en/latest/); alternatively, you can use venv to create a virtual environment for this application.
2. A role with access to install Native Apps from the Marketplace, such as DATA_HARMONIZATION_DEV.
3. Any environment that can run notebooks, such as VS Code or JupyterLab.

### Update config files
The project contains some config files that should be updated with your own environment variables:
1. `app_config.json`

2. `connection_config.json`



## Setup local environment (only the first time running the notebook)

### Using conda

In [None]:
#%%capture
#### We have disabled the results of this block using %%capture but feel free to uncomment to debug

#! conda create --name data_harmonization python=3.8 -y
#! conda install -c anaconda ipykernel -y
#! python -m ipykernel install --user --name=data_harmonization


### Using venv

In [None]:
#! python3 -m venv data_harmonization 
#! source data_harmonization/bin/activate 

### Change the default Kernel to <span style="color:yellow">data_harmonization</span>

### Install package dependencies to kernel

In [None]:
# pip install -r requirements.txt

In [None]:
from scripts.app_config import get_app_config

app_config = get_app_config(is_local_installation=True)

In [None]:
import subprocess
import os
def executeStatement(statement):
    """Run one SQL statement through the Snowflake CLI.

    Invokes ``snow sql -q <statement>`` as a child process. The command is
    passed as an argument list, so the statement reaches the CLI as a single
    argument without going through a shell.

    Args:
        statement: SQL text handed to the ``-q`` option.

    Returns:
        The ``subprocess.CompletedProcess`` produced by ``subprocess.run``.
        Output is not captured and the exit status is not checked here.
    """
    command = ['snow', 'sql', '-q', statement]
    return subprocess.run(command)
    

# Clear resources (optional)

In [None]:
import subprocess
import os
# Optional cleanup: tear down the deployed Native App, then drop the demo
# databases and the dynamic-table warehouse named in app_config.
subprocess.run(['snow', 'app', 'teardown'])

for demo_database in (
    "FIVETRAN_CONNECTOR_DEMO",
    "OMNATA_CONNECTOR_DEMO",
    "DATA_QUALITY_NOTEBOOKS",
    "LLM_DEMO",
):
    print(executeStatement(f"DROP DATABASE IF EXISTS {demo_database}"))

print(executeStatement(f"DROP WAREHOUSE IF EXISTS {app_config['dynamic_table_warehouse']}"))



# Setup Snowflake 

### Deploy the Native App

In [None]:
!snow app run

### Upload Sample Data

#### Upload sample notebooks

In [None]:
# SQL for the database, schema and stage that receive the uploaded files.
# Every object name is read from app_config.
create_app_package = f"CREATE DATABASE IF NOT EXISTS {app_config['data_quality_database']};"
create_app_schema = f"CREATE SCHEMA IF NOT EXISTS {app_config['data_quality_database']}.{app_config['schema']};"
create_app_stage = (
    f"CREATE STAGE IF NOT EXISTS {app_config['data_quality_database']}.{app_config['schema']}.{app_config['nativeapp_stage']} "
    "    DIRECTORY = (ENABLE = TRUE) "
    "    COMMENT = 'Used for holding source code of native app.';"
)

# Unlike the statements above this one uses CREATE OR REPLACE, so it is issued
# on every run regardless of whether the warehouse already exists.
create_wh = f"CREATE OR REPLACE WAREHOUSE {app_config['dynamic_table_warehouse']} WITH WAREHOUSE_SIZE= MEDIUM;"

# Execute in dependency order: database, schema, stage, then the warehouse.
for setup_statement in (create_app_package, create_app_schema, create_app_stage, create_wh):
    print(executeStatement(setup_statement))



In [None]:
from scripts.upload_files import upload_files_stage


# Target stage coordinates come from app_config; the files to upload are read
# from the local ./notebooks directory.
database, schema, stage = (
    app_config[key] for key in ('data_quality_database', 'schema', 'nativeapp_stage')
)
native_app_dir = './notebooks'


import os
import subprocess


# Path fragments that exclude a directory from the upload
dirs_ignore = ['/streamlit/frontend', 'pycache', 'tests']


def is_ignore(path):
    """Return True when any entry of ``dirs_ignore`` is a substring of *path*."""
    return any(fragment in path for fragment in dirs_ignore)


def upload_files_stage(database: str, schema: str, stage: str, app_dir: str) -> None:
    """Copy the files under *app_dir* to a Snowflake stage, mirroring sub-directories.

    Walks *app_dir* with ``os.walk`` and uploads each file with
    ``snow stage copy``. Files whose name starts with ``.`` and directories for
    which ``is_ignore(path)`` is true are skipped. After at least one upload the
    stage's directory table is refreshed once.

    Args:
        database: Database that owns the stage.
        schema: Schema that owns the stage.
        stage: Stage name (without the leading ``@``).
        app_dir: Local directory to upload; it may be written with or without a
            leading ``./``.

    Returns:
        None. Progress is printed; CLI exit codes are not inspected.
    """
    staged_any = False
    for path, _subdirs, files in os.walk(app_dir):
        if not files or is_ignore(path):
            continue

        # Directory of these files relative to app_dir, expressed with forward
        # slashes. relpath yields "." for app_dir itself, which maps to the
        # stage root. Building the suffix this way avoids the "//" the previous
        # string replacement produced for sub-directories and does not depend
        # on app_dir starting with "./".
        relative_dir = os.path.relpath(path, app_dir)
        stage_dir = '' if relative_dir == os.curdir else relative_dir.replace(os.sep, '/')
        stage_location = f'@{database}.{schema}.{stage}/{stage_dir}'

        for file in files:
            if file.startswith('.'):
                continue
            local_file = os.path.join(path, file)
            print(f'{local_file} => @{stage}/{stage_dir}')
            print(local_file)
            print(stage_location)
            subprocess.run(['snow', 'stage', 'copy', local_file, stage_location])
            staged_any = True

    # One refresh after all copies is enough to register every new file.
    if staged_any:
        executeStatement(f'alter stage {database}.{schema}.{stage} refresh; ')

# Upload everything under native_app_dir to the configured stage, then report.
upload_files_stage(database=database, schema=schema, stage=stage, app_dir=native_app_dir)
print('Success')

In [None]:
# Ask the Snowflake CLI for the versions of the deployed application package.
# The package name is parsed out of the command's text output.
appDeployed = subprocess.run(['snow', 'app', 'version', 'list'], stdout=subprocess.PIPE)
result = appDeployed.stdout.decode("utf-8")
print(result)

package_marker = "Marketing_Data_Foundation"
name_start = result.find(package_marker)
if name_start == -1:
    raise ValueError(
        f"'{package_marker}' was not found in the output of `snow app version list`; "
        "check that the Native App has been deployed."
    )

# Look for the line end *after* the marker. Searching from the start of the
# output would return an empty name whenever the marker is not on line one.
name_end = result.find("\n", name_start)
if name_end == -1:
    name_end = len(result)

# strip() drops a trailing "\r" or padding that would corrupt later SQL.
appPackageName = result[name_start:name_end].strip()

# The application name is the package name with the "_pkg_" infix collapsed to "_".
appName = appPackageName.replace("_pkg_","_")
print(appName)

# Demo databases
create_sample_db_facebook = "CREATE DATABASE IF NOT EXISTS FIVETRAN_CONNECTOR_DEMO"
create_samle_db_linkedin = "CREATE DATABASE IF NOT EXISTS OMNATA_CONNECTOR_DEMO"
create_llm_demo_db = "CREATE DATABASE IF NOT EXISTS LLM_DEMO"

# Schemas inside those databases
create_sample_schema_facebook = "CREATE SCHEMA IF NOT EXISTS FIVETRAN_CONNECTOR_DEMO.FACEBOOK_RAW"
create_sample_schema_linkedin ="CREATE SCHEMA IF NOT EXISTS OMNATA_CONNECTOR_DEMO.LINKEDIN_RAW"
create_llm_schema = "CREATE SCHEMA IF NOT EXISTS LLM_DEMO.DEMO"

# USAGE grants that let the application reach the two connector databases
grant_db_facebook = f"GRANT USAGE ON DATABASE FIVETRAN_CONNECTOR_DEMO TO APPLICATION {appName}"
grant_db_linkedin = f"GRANT USAGE ON DATABASE OMNATA_CONNECTOR_DEMO TO APPLICATION {appName}"
grant_schema_facebook = f"GRANT USAGE ON SCHEMA FIVETRAN_CONNECTOR_DEMO.FACEBOOK_RAW TO APPLICATION {appName}"
grant_schema_linkedin = f"GRANT USAGE ON SCHEMA OMNATA_CONNECTOR_DEMO.LINKEDIN_RAW TO APPLICATION {appName}"

# Run in this order: connector databases, their schemas, the grants, and
# finally the LLM demo database and schema.
for demo_statement in (
    create_sample_db_facebook,
    create_samle_db_linkedin,
    create_sample_schema_facebook,
    create_sample_schema_linkedin,
    grant_db_facebook,
    grant_db_linkedin,
    grant_schema_facebook,
    grant_schema_linkedin,
    create_llm_demo_db,
    create_llm_schema,
):
    print(executeStatement(demo_statement))

# For each of the three data-quality demo notebooks: (re)create the notebook
# object from the staged .ipynb file, then add a live version from the last
# one. Statements are listed in the exact order they must run.
notebook_statements = [
    f"""CREATE OR REPLACE NOTEBOOK  LLM_DEMO.DEMO.DATA_QUALITY_DEMO_1
    FROM '@data_quality_notebooks.APP.CODE_STG'
    MAIN_FILE = 'data_quality_demo_1.ipynb'
    QUERY_WAREHOUSE = '{app_config['warehouse']}';""",
    "ALTER NOTEBOOK LLM_DEMO.DEMO.DATA_QUALITY_DEMO_1 ADD LIVE VERSION FROM LAST",
    f"""CREATE OR REPLACE NOTEBOOK LLM_DEMO.DEMO.DATA_QUALITY_DEMO_2
    FROM '@data_quality_notebooks.APP.CODE_STG'
    MAIN_FILE = 'data_quality_demo_2.ipynb'
    QUERY_WAREHOUSE = '{app_config['warehouse']}';""",
    "ALTER NOTEBOOK LLM_DEMO.DEMO.DATA_QUALITY_DEMO_2 ADD LIVE VERSION FROM LAST",
    f"""CREATE OR REPLACE NOTEBOOK LLM_DEMO.DEMO.DATA_QUALITY_DEMO_3
    FROM '@data_quality_notebooks.APP.CODE_STG'
    MAIN_FILE = 'data_quality_demo_3.ipynb'
    QUERY_WAREHOUSE = '{app_config['warehouse']}';""",
    "ALTER NOTEBOOK LLM_DEMO.DEMO.DATA_QUALITY_DEMO_3 ADD LIVE VERSION FROM LAST",
]

for notebook_statement in notebook_statements:
    executeStatement(notebook_statement)


### Load Demo Tables

In [None]:
import pandas as pd
from snowflake.snowpark.functions import parse_json

import os
print("Loading Tables")
data_dir = "./data"

# Sample files are staged under the "sample_data" folder of the configured stage.
database, schema, stage = (
    app_config['data_quality_database'],
    app_config['schema'],
    app_config['nativeapp_stage'],
)
stage_location = f'@{database}.{schema}.{stage}/sample_data'

# CSV file format used later when inferring table schemas and copying data.
csv_file_format = """
CREATE OR REPLACE FILE FORMAT DATA_QUALITY_NOTEBOOKS.APP.csv_format
  TYPE = csv
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  PARSE_HEADER = true;
"""
# Collapse the statement onto one line before handing it to the CLI; the
# leading spaces of each option line keep the tokens separated.
single_line_file_format = ''.join(csv_file_format.splitlines())
executeStatement(single_line_file_format)

# Load every file found under data_dir into its own table:
#   1. derive database / schema / table names from the file's location and name,
#   2. copy the file to stage_location,
#   3. create the table from the inferred CSV schema (if it does not exist),
#   4. COPY the staged file into the table,
#   5. enable change tracking on the table.
for path, currentDirectory, files in os.walk(data_dir):
    for file in files:
        # Skip macOS Finder metadata files.
        if file in [".DS_Store"]:
            continue
        else:
            # Default target database is the application itself; it may be
            # overridden below when the directory path names a database.
            database = appName.upper()
            file_path = os.path.join(path, file)
            print("Loading File:" + file_path)            
            # Table name = file name up to the first ".", upper-cased.
            table_name = file.split(".")[0].upper()
            if table_name == "DIM_PLATFORM":
                # DIM_PLATFORM always goes to this fixed schema, whatever
                # directory the file sits in.
                schema = "CAMPAIGN_INTELLIGENCE_COMBINED"
            else:
                # Schema = directory of the file relative to data_dir,
                # upper-cased (e.g. "./data/a/b/x.csv" -> "A/B").
                # NOTE(review): relies on "/" as the path separator, and a file
                # placed directly in data_dir would yield its own file name
                # here — confirm the data layout always has sub-directories.
                schema = file_path.replace(data_dir+"/","").replace("/"+file,"").upper()
                schema = schema.lstrip("/")
            # A path of the form "<DATABASE>/<SCHEMA>" overrides the database.
            # More than two components would make this unpacking raise ValueError.
            if len(schema.split("/")) > 1:
                database, schema = [i.upper() for i in schema.split("/")]
            fileName = os.path.basename(file_path)
            # Upload the file to the shared sample-data stage location.
            subprocess.run(['snow', 'stage', 'copy', file_path, stage_location])
            # Create the table with columns inferred from the staged file.
            createTable = f"""
                CREATE TABLE IF NOT EXISTS {database}.{schema}.{table_name}
                USING TEMPLATE (
                    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
                    FROM TABLE(
                        INFER_SCHEMA(
                        LOCATION=>'{stage_location}/{fileName}',
                        FILE_FORMAT => 'DATA_QUALITY_NOTEBOOKS.APP.csv_format'
                        )
                    ));
            """
            # Newlines are removed before execution; the indentation inside the
            # template keeps the SQL tokens separated once joined.
            executeStatement(''.join(createTable.splitlines()))
            # Load the staged file into the table, matching columns by name.
            copyTo = f"""COPY INTO {database}.{schema}.{table_name} FROM {stage_location}/{fileName} FILE_FORMAT = (FORMAT_NAME= 'DATA_QUALITY_NOTEBOOKS.APP.csv_format')  MATCH_BY_COLUMN_NAME="CASE_INSENSITIVE"; """
            executeStatement(copyTo)
            # Turn on change tracking for the freshly loaded table.
            changeTracking = f"ALTER TABLE {database}.{schema}.{table_name} SET CHANGE_TRACKING = TRUE;"
            executeStatement(changeTracking)
  

# Give the application SELECT on all tables of the two raw demo schemas.
grant_select_db_facebook = f"GRANT SELECT ON ALL TABLES IN SCHEMA FIVETRAN_CONNECTOR_DEMO.FACEBOOK_RAW TO APPLICATION {appName}"
grant_select_db_linkedin = f"GRANT SELECT ON ALL TABLES IN SCHEMA OMNATA_CONNECTOR_DEMO.LINKEDIN_RAW TO APPLICATION {appName}"

for select_grant in (grant_select_db_facebook, grant_select_db_linkedin):
    print(executeStatement(select_grant))

print('Success')


### Create Dynamic Table Stored Procedure

In [None]:
# All procedures in this cell are created inside the application's
# USER_SETTINGS schema and then granted back to the application.
application_name = appName

## Create Dynamic Table Procedure
# CREATE_DYNAMIC_TABLE(dynamic_table_name, query): its Python handler `creator`
# runs the SQL text passed in `query` via session.sql(query).collect().
# `dynamic_table_name` is accepted but not used by the handler, and the handler
# has no return statement.
create_procedure_commands = f"""
CREATE OR REPLACE PROCEDURE {application_name}.USER_SETTINGS.CREATE_DYNAMIC_TABLE(dynamic_table_name VARCHAR, query VARCHAR)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'creator'
PACKAGES = ('snowflake-snowpark-python')
AS
$$
def creator(session, dynamic_table_name, query):
    session.sql(query).collect()
$$;
"""
# Allow the application to call the procedure (signature: VARCHAR, VARCHAR).
grant_procedure_usage = f"GRANT USAGE ON PROCEDURE {application_name}.USER_SETTINGS.CREATE_DYNAMIC_TABLE(VARCHAR, VARCHAR) TO APPLICATION {application_name};"
# The procedure must exist before usage on it can be granted.
executeStatement(create_procedure_commands)
executeStatement(grant_procedure_usage)


# Create grants procedure
# GRANTER(application_name, schema, tables): its Python handler `granter` loops
# over `tables` and, for each entry, grants SELECT on
# <application_name>.<schema>.<table> to the application <application_name>.
# The doubled braces are f-string escapes: they emit single braces so that the
# inner f-string is evaluated inside the procedure, using the procedure's own
# arguments rather than this notebook's variables.
create_procedure_commands = f"""
CREATE OR REPLACE PROCEDURE {application_name}.USER_SETTINGS.GRANTER(application_name VARCHAR, schema VARCHAR, tables VARIANT)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'granter'
PACKAGES = ('snowflake-snowpark-python')
AS
$$

def granter(session, application_name, schema, tables):
    for table_name in tables:
        session.sql(f'GRANT SELECT ON TABLE {{application_name}}.{{schema}}.{{table_name}} TO APPLICATION {{application_name}}').collect()
$$;
"""
# Allow the application to call the procedure (signature: VARCHAR, VARCHAR, VARIANT).
grant_procedure_usage = f"GRANT USAGE ON PROCEDURE {application_name}.USER_SETTINGS.GRANTER(VARCHAR, VARCHAR, VARIANT) TO APPLICATION {application_name};"
# The procedure must exist before usage on it can be granted.
executeStatement(create_procedure_commands)
executeStatement(grant_procedure_usage)



## Create Show Tables Procedure
# SHOW_TABLES(database_name, schema_name): its Python handler runs
# SHOW TABLES IN SCHEMA <database_name>.<schema_name>, keeps the rows whose
# "change_tracking" column equals "OFF", and returns their names. A name that
# contains any lower-case character is wrapped in double quotes; other names
# are returned as-is.
# The doubled braces are f-string escapes, so the inner f-strings are evaluated
# inside the procedure with the procedure's own arguments.
get_tables_query = f"""
CREATE OR REPLACE PROCEDURE {application_name}.USER_SETTINGS.SHOW_TABLES(database_name VARCHAR, schema_name VARCHAR)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'get_non_change_tracking_tables'
PACKAGES = ('snowflake-snowpark-python')
AS
$$
from snowflake.snowpark import dataframe as df
def get_non_change_tracking_tables(session, database_name, schema_name):
    tables = session.sql(f"SHOW TABLES IN SCHEMA {{database_name}}.{{schema_name}}").select('"name"').where(df.col('"change_tracking"') == "OFF").collect()
    table_names = [row[0] if not any(c.islower() for c in row[0]) else f'"{{row[0]}}"' for row in tables]
    return table_names
$$;
"""
# Allow the application to call the procedure (signature: VARCHAR, VARCHAR).
grant_procedure_usage = f"GRANT USAGE ON PROCEDURE {application_name}.USER_SETTINGS.SHOW_TABLES(VARCHAR, VARCHAR) TO APPLICATION {application_name};"
# The procedure must exist before usage on it can be granted.
executeStatement(get_tables_query)
executeStatement(grant_procedure_usage)

### Upload assistant semantic model configuration

In [None]:
from scripts.upload_files import upload_files_stage
from scripts.update_file_variables import file_replace

# The assistant files are staged in the application's own LLM schema.
database, schema = appName, 'LLM'
semantic_models_stage = 'SEMANTIC_MODEL'
llm_config_stage = 'CONFIGURATION'

# Placeholder -> value mapping applied to the semantic model file.
# NOTE(review): file_replace presumably rewrites the YAML in place — confirm
# against scripts.update_file_variables.
replace_map = {"<target_database>": appName}
semantic_model_file = './assistant/semantic_models/UnifiedMarketingModel_CAMPAIGN_PERF.yaml'
file_replace(semantic_model_file, replace_map)

# Copy the assistant configuration first, then the semantic model, each to its
# own stage.
for local_file, target_stage in (
    ('./assistant/config/assistant_config.yaml', llm_config_stage),
    (semantic_model_file, semantic_models_stage),
):
    subprocess.run(['snow', 'stage', 'copy', local_file, f"@{database}.{schema}.{target_stage}"])

print('Success')