# Marketing Data Foundation Setup Script

## Project Setup using Notebook

### *Pre-requisites:*

1. A role with privileges to create compute pools, warehouses, and databases
2. An environment that can run notebooks, such as VS Code or JupyterLab
3. Docker Desktop installed
4. The Snowflake CLI (`snow`) installed and configured to deploy the application in your account


### Config files
The project contains config files that must be updated with values from your own environment:
1. [app.config.json](app.config.json)

## Set up local environment

In [None]:
! python3.9 -m venv .venv 
! source .venv/bin/activate 
! pip install --upgrade pip && pip install -r requirements.txt

### Load App Configurations

In [None]:
#Update Account Locator
import os
import json
from scripts.update_file_variables import file_replace
from scripts.executeStatement import executeStatement

# Resolve "<organization>-<account>" for the current connection and substitute it
# for the <account_registry> placeholder in app.config.json (file edited in place).
app_settings = 'app.config.json'
account_query = "SELECT CONCAT(CURRENT_ORGANIZATION_NAME(),'-',CURRENT_ACCOUNT_NAME()) as ACCOUNT"
result = executeStatement(account_query, "--format json")
account_name_json = json.loads(result)
account_name = account_name_json[0].get('ACCOUNT')

# Both '_' and '.' are normalised to '-' before substitution.
account_registry = account_name
for char in ("_", "."):
    account_registry = account_registry.replace(char, "-")

replace_map = {"<account_registry>": account_registry}
file_replace(app_settings, replace_map)

In [None]:
from scripts.app_config import get_app_config

# Load the application settings from app.config.json; later cells read their
# keys from `app_config`.
app_config_file = 'app.config.json'
app_config = get_app_config(app_config_file)
print('Success')

## Create Role


In [None]:
from scripts.auth import get_conn_obj
from scripts.executeStatement import executeStatement
import os

user = os.environ['USER']
# Project role, named after the configured database: <database>_ROLE.
role = f"{app_config['database']}_ROLE"

# Create the role, then grant it the account-level privileges below, in order.
role_setup = (
    f"CREATE ROLE IF NOT EXISTS {role};",
    f"GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE {role};",
    f"GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE {role};",
    f"GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE {role};",
    f"GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE {role};",
)
for role_sql in role_setup:
    print(executeStatement(role_sql))
print('Success')

### Replace application configuration files keys

In [None]:
import os

from scripts.update_file_variables import file_replace

app_config_f = 'app/src/manifest.yml'
snowflake_f = 'app/snowflake.yml'
fullstack_config_f = 'app/src/fullstack.yaml'
makefile_f = 'Makefile'

# Placeholder -> value; image paths come from environment variables.
replace_map = {
    "<image_repository_path>": os.environ["IMAGE_REPOSITORY"],
    "<image_repo_short_path>": os.environ["IMAGE_REPO_SHORT"],
    "<role>": role,
}
# Apply the same substitutions to each file (edited in place), in this order.
for target_file in (snowflake_f, app_config_f, fullstack_config_f, makefile_f):
    file_replace(target_file, replace_map)
print('Success')

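# Clear resources (optional)

Run the next two cells only to remove a previous installation. They tear down the application and drop the demo databases, warehouses, container service, compute pool, and image repository.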

In [None]:
# Tear down the native application project in app/ using the Snowflake CLI.
! cd app && snow app teardown

In [None]:
from scripts.executeStatement import executeStatement

# Drop the objects created by this notebook. Every statement uses IF EXISTS.
compute_pool_name = app_config['compute_pool']
# Fully qualified, matching how the image repository is created.
image_repository = f"{app_config['database']}.{app_config['schema']}.{app_config['image_stage']}"

teardown_statements = [
    "DROP DATABASE IF EXISTS FIVETRAN_CONNECTOR_DEMO",
    "DROP DATABASE IF EXISTS OMNATA_CONNECTOR_DEMO",
    # Same config key the notebook uses to create the data-quality database.
    f"DROP DATABASE IF EXISTS {app_config['data_quality_database']}",
    "DROP DATABASE IF EXISTS LLM_DEMO",
    f"DROP WAREHOUSE IF EXISTS {app_config['dynamic_table_warehouse']}",
    # Stop the pool's services before dropping the service and the pool itself.
    f"ALTER COMPUTE POOL IF EXISTS {compute_pool_name} STOP ALL;",
    # NOTE(review): unqualified name, resolved against the session's current
    # database/schema — confirm it targets the app's service.
    f"DROP SERVICE IF EXISTS {app_config['container_service']}",
    f"DROP COMPUTE POOL IF EXISTS {compute_pool_name}",
    f"DROP IMAGE REPOSITORY IF EXISTS {image_repository};",
    f"DROP DATABASE IF EXISTS {app_config['database']};",
    f"DROP WAREHOUSE IF EXISTS {app_config['warehouse']};",
    f"DROP DATABASE IF EXISTS {app_config['sample_db']};",
]
for sql in teardown_statements:
    print(executeStatement(sql))

## Set up Snowflake

### Create database, schema, image repository and warehouse

In [None]:
app_database = app_config['database']
app_schema_fqn = f"{app_database}.{app_config['schema']}"

create_app_package = f"CREATE DATABASE IF NOT EXISTS {app_database};"
create_app_schema = f"CREATE SCHEMA IF NOT EXISTS {app_schema_fqn};"
create_image_repo = f"CREATE IMAGE REPOSITORY IF NOT EXISTS {app_schema_fqn}.{app_config['image_stage']};"
grant_database = f"GRANT ALL PRIVILEGES ON DATABASE {app_database} TO ROLE {role}"
grant_schema = f"GRANT ALL PRIVILEGES ON SCHEMA {app_schema_fqn} TO ROLE {role};"
grant_tables = f"GRANT SELECT ON ALL TABLES IN SCHEMA {app_schema_fqn} TO ROLE {role};"
create_dedicated_warehouse = (
    f"CREATE WAREHOUSE IF NOT EXISTS {app_config['warehouse']} "
    "WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE INITIALLY_SUSPENDED = TRUE;"
)

# Objects first, then the grants on them, then the warehouse.
for ddl in (
    create_app_package,
    create_app_schema,
    create_image_repo,
    grant_database,
    grant_schema,
    grant_tables,
    create_dedicated_warehouse,
):
    print(executeStatement(ddl))

### Build and Upload Docker Images

In [None]:
# Run the Makefile's `all` target (builds and uploads the Docker images, per this section's heading).
! make all

### Deploy Native Application

In [None]:
# Deploy the native application project in app/ with the Snowflake CLI.
! cd app && snow app run

### Resolve Application Name

In [None]:
import subprocess
import os

# Application name used by the following cells; it ends with the upper-cased OS user name.
# NOTE(review): presumably this must match the name `snow app run` gave the
# installed app — confirm against app/snowflake.yml.
current_user = os.environ['USER'].upper()
appName = "MARKETING_DATA_FOUNDATION_STARTER_V3_" + current_user
print(appName)

### Load Predefined Data Models

### Upload sample notebooks

In [None]:
dq_database = app_config['data_quality_database']
dq_schema_fqn = f"{dq_database}.{app_config['schema']}"

create_app_package = f"CREATE DATABASE IF NOT EXISTS {dq_database};"
create_app_schema = f"CREATE SCHEMA IF NOT EXISTS {dq_schema_fqn};"
# Stage (with a directory table) that receives the demo notebooks and sample data.
create_app_stage = (
    f"CREATE STAGE IF NOT EXISTS {dq_schema_fqn}.{app_config['code_stage']} "
    "    DIRECTORY = (ENABLE = TRUE) "
    "    COMMENT = 'Used for holding data quality demo notebooks';"
)
# CREATE OR REPLACE: re-running recreates this warehouse.
create_wh = f"CREATE OR REPLACE WAREHOUSE {app_config['dynamic_table_warehouse']} WITH WAREHOUSE_SIZE= MEDIUM;"

for ddl in (create_app_package, create_app_schema, create_app_stage, create_wh):
    print(executeStatement(ddl))

In [None]:
from scripts.upload_files import upload_files_stage

database = app_config['data_quality_database']
schema = app_config['schema']
stage = app_config['code_stage']
native_app_dir = './notebooks'

import os
import subprocess

# Directories to ignore (substring match against each walked directory path)
dirs_ignore = ['/streamlit/frontend', 'pycache', 'tests']


def is_ignore(path):
    """Return True when ``path`` contains any entry of ``dirs_ignore``."""
    return any(ignore in path for ignore in dirs_ignore)


def upload_files_stage(database: str, schema: str, stage: str, app_dir: str) -> None:
    """Upload every non-hidden file under ``app_dir`` to ``@database.schema.stage``.

    The directory layout below ``app_dir`` is mirrored on the stage: a file at
    ``app_dir/a/b.ipynb`` is copied to ``@database.schema.stage/a``, and a file
    directly in ``app_dir`` to the stage root. Directories matching
    ``dirs_ignore`` and files whose name starts with '.' are skipped. The stage is
    refreshed once after all uploads.

    NOTE: this definition shadows the ``upload_files_stage`` imported above; it is
    the one called at the end of this cell.
    """
    stage_fqn = f"{database}.{schema}.{stage}"
    for path, _subdirs, files in os.walk(app_dir):
        if is_ignore(path):
            continue
        # Directory relative to app_dir with '/' separators; '' for app_dir itself.
        # (relpath avoids the '//sub' double slash a prefix string-replace produced.)
        rel_dir = os.path.relpath(path, app_dir)
        rel_dir = '' if rel_dir == os.curdir else rel_dir.replace(os.sep, '/')
        stage_location = f'@{stage_fqn}/{rel_dir}'
        for file in files:
            if file.startswith('.'):
                continue
            local_file = os.path.join(path, file)
            print(f'{local_file} => {stage_location}')
            subprocess.run(['snow', 'stage', 'copy', local_file, stage_location])
    # One refresh after all copies, instead of one per file.
    executeStatement(f'alter stage {stage_fqn} refresh; ')


upload_files_stage(database, schema, stage, native_app_dir)

print('Success')

In [None]:
#LOAD SAMPLE DATA
# Create the demo source databases/schemas and give the application USAGE on them.
create_sample_db_facebook = "CREATE DATABASE IF NOT EXISTS FIVETRAN_CONNECTOR_DEMO"
create_sample_db_linkedin = "CREATE DATABASE IF NOT EXISTS OMNATA_CONNECTOR_DEMO"
create_sample_schema_facebook = "CREATE SCHEMA IF NOT EXISTS FIVETRAN_CONNECTOR_DEMO.FACEBOOK_RAW"
create_sample_schema_linkedin = "CREATE SCHEMA IF NOT EXISTS OMNATA_CONNECTOR_DEMO.LINKEDIN_RAW"
create_llm_demo_db = "CREATE DATABASE IF NOT EXISTS LLM_DEMO"
create_llm_schema = "CREATE SCHEMA IF NOT EXISTS LLM_DEMO.DEMO"
grant_db_facebook = f"GRANT USAGE ON DATABASE FIVETRAN_CONNECTOR_DEMO TO APPLICATION {appName}"
grant_db_linkedin = f"GRANT USAGE ON DATABASE OMNATA_CONNECTOR_DEMO TO APPLICATION {appName}"
grant_schema_facebook = f"GRANT USAGE ON SCHEMA FIVETRAN_CONNECTOR_DEMO.FACEBOOK_RAW TO APPLICATION {appName}"
grant_schema_linkedin = f"GRANT USAGE ON SCHEMA OMNATA_CONNECTOR_DEMO.LINKEDIN_RAW TO APPLICATION {appName}"

# Databases before their schemas, both before the grants on them.
for sql in (
    create_sample_db_facebook,
    create_sample_db_linkedin,
    create_sample_schema_facebook,
    create_sample_schema_linkedin,
    grant_db_facebook,
    grant_db_linkedin,
    grant_schema_facebook,
    grant_schema_linkedin,
    create_llm_demo_db,
    create_llm_schema,
):
    print(executeStatement(sql))

# Register the three data-quality demo notebooks. The stage is built from the same
# config keys used to create that stage and upload the notebooks, instead of a
# hard-coded name that could drift from the configuration.
notebook_stage = f"@{app_config['data_quality_database']}.{app_config['schema']}.{app_config['code_stage']}"
for demo_number in (1, 2, 3):
    notebook_name = f"LLM_DEMO.DEMO.DATA_QUALITY_DEMO_{demo_number}"
    print(executeStatement(f"""CREATE OR REPLACE NOTEBOOK {notebook_name}
    FROM '{notebook_stage}'
    MAIN_FILE = 'data_quality_demo_{demo_number}.ipynb'
    QUERY_WAREHOUSE = '{app_config['warehouse']}';"""))
    print(executeStatement(f"ALTER NOTEBOOK {notebook_name} ADD LIVE VERSION FROM LAST"))

#### Load Demo Tables

In [None]:
import os
import subprocess

data_dir = "./data"
print("Loading Tables")

def executeMultilineStatement(statement):
    """Run ``statement`` with ``snow sql -q`` and return the CompletedProcess.

    The command is passed as an argument list (no shell), so quote characters in
    the SQL reach the CLI unchanged.
    """
    return subprocess.run(['snow', 'sql', '-q', statement])


database = app_config['data_quality_database']
schema = app_config['schema']
stage = app_config['code_stage']
stage_location = f'@{database}.{schema}.{stage}/sample_data'
# Capture the file format's fully qualified name now, because `database` and
# `schema` are reassigned per file inside the loop below.
csv_format_name = f"{database}.{schema}.csv_format"

csv_file_format = f"""
CREATE OR REPLACE FILE FORMAT {csv_format_name}
  TYPE = csv
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  PARSE_HEADER = true;
"""
# Collapse to one line, joining with a space so tokens on adjacent lines never fuse.
statement = ' '.join(line.strip() for line in csv_file_format.splitlines() if line.strip())
executeMultilineStatement(statement)

for path, currentDirectory, files in os.walk(data_dir):
    for file in files:
        # Skip hidden files such as .DS_Store; they are not data tables.
        if file.startswith('.'):
            continue
        database = appName.upper()
        file_path = os.path.join(path, file)
        print("Loading File:" + file_path)
        table_name = file.split(".")[0].upper()
        # Target location is derived from the path below data_dir:
        #   <data_dir>/<SCHEMA>/<file>            -> <appName>.<SCHEMA>
        #   <data_dir>/<DATABASE>/<SCHEMA>/<file> -> <DATABASE>.<SCHEMA>
        # DIM_PLATFORM always goes to <appName>.CAMPAIGN_INTELLIGENCE_COMBINED.
        if table_name == "DIM_PLATFORM":
            schema = "CAMPAIGN_INTELLIGENCE_COMBINED"
        else:
            schema = file_path.replace(data_dir+"/","").replace("/"+file,"").upper()
            schema = schema.lstrip("/")
        if len(schema.split("/")) > 1:
            database, schema = [i.upper() for i in schema.split("/")]
        subprocess.run(['snow', 'stage', 'copy', file_path, stage_location])
        # Create the table (if missing) with columns inferred from the staged CSV header.
        createTable = f"""
            CREATE TABLE IF NOT EXISTS {database}.{schema}.{table_name}
            USING TEMPLATE (
                SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
                FROM TABLE(
                    INFER_SCHEMA(
                    LOCATION=>'{stage_location}/{file}',
                    FILE_FORMAT => '{csv_format_name}'
                    )
                ));
        """
        executeMultilineStatement(createTable)
        copyTo = f"""COPY INTO {database}.{schema}.{table_name} FROM {stage_location}/{file} FILE_FORMAT = (FORMAT_NAME= '{csv_format_name}')  MATCH_BY_COLUMN_NAME="CASE_INSENSITIVE"; """
        executeStatement(copyTo)
        if database == "OMNATA_CONNECTOR_DEMO":
            # Convert RECORD_DATA from text to VARIANT: add a VARIANT column, fill it
            # from PARSE_JSON, drop the original, rename the new one.
            # NOTE(review): on a re-run RECORD_DATA is already VARIANT when passed to
            # PARSE_JSON — confirm this step is safe to repeat.
            executeStatement(f"ALTER TABLE {database}.{schema}.{table_name} ADD COLUMN RECORD_DATA_VARIANT VARIANT;")
            executeStatement(f"UPDATE {database}.{schema}.{table_name} SET RECORD_DATA_VARIANT = TO_VARIANT(PARSE_JSON(RECORD_DATA));")
            executeStatement(f"ALTER TABLE {database}.{schema}.{table_name} DROP COLUMN RECORD_DATA;")
            executeStatement(f"ALTER TABLE {database}.{schema}.{table_name} RENAME COLUMN RECORD_DATA_VARIANT to RECORD_DATA;")
        changeTracking = f"ALTER TABLE {database}.{schema}.{table_name} SET CHANGE_TRACKING = TRUE;"
        executeStatement(changeTracking)


# Grant SELECT now that the tables exist (ON ALL TABLES covers existing tables only).
grant_select_db_linkedin = f"GRANT SELECT ON ALL TABLES IN SCHEMA OMNATA_CONNECTOR_DEMO.LINKEDIN_RAW TO APPLICATION {appName}"
grant_select_db_facebook = f"GRANT SELECT ON ALL TABLES IN SCHEMA FIVETRAN_CONNECTOR_DEMO.FACEBOOK_RAW TO APPLICATION {appName}"
print(executeStatement(grant_select_db_facebook))
print(executeStatement(grant_select_db_linkedin))
print('Success')

In [None]:
from scripts.load_models import load_models
from scripts.update_file_variables import file_replace

models_folder = "backend/predefined_models"
app_schema = f"{appName}.{app_config['schema']}"

# Stage named TEMP in the application's schema; passed to load_models below.
stage = f"{app_schema}.TEMP"
create_sample_stage = f"CREATE STAGE IF NOT EXISTS {stage};"
print(executeStatement(create_sample_stage))

json_file_format = f"""
CREATE OR REPLACE FILE FORMAT {app_schema}.json_format
  TYPE = JSON;
"""
print(executeStatement(json_file_format))

# Fill the <DB>/<SCHEMA> placeholders in the raw-models SQL script (edited in place).
replace_map = {"<DB>": appName, "<SCHEMA>": app_config['schema']}
file_replace("scripts/models_raw.sql", replace_map)

load_models(appName, models_folder, f"@{stage}")

In [None]:
# Execute scripts/models_raw.sql with the Snowflake CLI (its <DB>/<SCHEMA> placeholders are filled by file_replace beforehand).
! snow sql -f scripts/models_raw.sql

In [None]:
from scripts.executeStatement import executeStatement

# Resolve the pool name here. Previously it was only assigned in the optional
# clean-up cell, so this cell raised NameError on a fresh kernel when that cell
# was skipped.
compute_pool_name = app_config['compute_pool']

create_compute_pool_sql = f"CREATE COMPUTE POOL IF NOT EXISTS {compute_pool_name} for application {appName}\
    MIN_NODES = 1 \
    MAX_NODES = 1 \
    AUTO_SUSPEND_SECS = 120 \
    INSTANCE_FAMILY = CPU_X64_S;"
grant_usage_sql = f"GRANT USAGE, MONITOR ON COMPUTE POOL {compute_pool_name} \
    TO application {appName};"

grant_usage_wh = f"GRANT USAGE, MONITOR ON WAREHOUSE {app_config['warehouse']} \
    TO application {appName};"

grant_bind_service = f"GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO application {appName};"

# Same grant as issued when the project role was created; repeating it is harmless.
grant_cortex = f"GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE {role};"

for sql in (create_compute_pool_sql, grant_usage_sql, grant_usage_wh, grant_bind_service, grant_cortex):
    print(executeStatement(sql))

In [None]:
# Helper stored procedures in {appName}.METADATA, each granted back to the application.
#
# The SQL text contains a literal backslash before each `$` of the `$$` body
# delimiters. It is written as `\\$` (a valid escape); the former `\$` produced the
# same characters but is an invalid escape sequence that Python warns about.
# NOTE(review): presumably executeStatement passes the text through a shell, where
# the backslash stops `$$` from being expanded — confirm before removing it.

# CREATE_DYNAMIC_TABLE(query): runs `query` via session.sql(query).collect().
create_procedure_commands = f"""
CREATE OR REPLACE PROCEDURE {appName}.METADATA.CREATE_DYNAMIC_TABLE(query VARCHAR)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'creator'
PACKAGES = ('snowflake-snowpark-python')
AS
\\$\\$
def creator(session, query):
    return session.sql(query).collect()
\\$\\$;
"""
grant_procedure_usage = f"GRANT USAGE ON PROCEDURE {appName}.METADATA.CREATE_DYNAMIC_TABLE(VARCHAR) TO APPLICATION {appName};"
print(executeStatement(create_procedure_commands))
print(executeStatement(grant_procedure_usage))


# Create grants procedure
# GRANTER(appName, tables): grants SELECT on each name in `tables` to application
# `appName`. The doubled braces survive this f-string as single braces, so the
# procedure's own f-string interpolates them when it is called.
create_procedure_commands = f"""
CREATE OR REPLACE PROCEDURE {appName}.METADATA.GRANTER(appName VARCHAR, tables VARIANT)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'granter'
PACKAGES = ('snowflake-snowpark-python')
AS
\\$\\$

def granter(session, appName, tables):
    for table_name in tables:
        session.sql(f'GRANT SELECT ON TABLE {{table_name}} TO APPLICATION {{appName}}').collect()
\\$\\$;
"""
grant_procedure_usage = f"GRANT USAGE ON PROCEDURE {appName}.METADATA.GRANTER(VARCHAR, VARIANT) TO APPLICATION {appName};"
print(executeStatement(create_procedure_commands))
print(executeStatement(grant_procedure_usage))

#### Upload Semantic Model Configuration


In [None]:
# subprocess is imported here so the cell does not depend on an earlier cell's import.
import subprocess

from scripts.upload_files import upload_files_stage
from scripts.update_file_variables import file_replace

database = appName
schema = 'LLM'
semantic_models_stage = 'SEMANTIC_MODEL'
llm_config_stage = 'CONFIGURATION'
semantic_model_file = './assistant/semantic_models/UnifiedMarketingModel_CAMPAIGN_PERF.yaml'

# Point the semantic model at the application database (file edited in place).
replace_map = {
    "<target_database>": appName
}
file_replace(semantic_model_file, replace_map)

# Local file -> target stage, uploaded in this order.
uploads = (
    ('./assistant/config/assistant_config.yaml', f"@{database}.{schema}.{llm_config_stage}"),
    (semantic_model_file, f"@{database}.{schema}.{semantic_models_stage}"),
)
for local_file, target_stage in uploads:
    completed = subprocess.run(['snow', 'stage', 'copy', local_file, target_stage])
    # Report failed copies instead of letting the final 'Success' hide them.
    if completed.returncode != 0:
        print(f'Upload failed: {local_file} -> {target_stage}')

print('Success')

### Customer 360 Demo Upload


In [None]:
from scripts.update_file_variables import file_replace
from scripts.executeStatement import executeCopyToStage, executeStatement

sample_db = app_config['sample_db']
sample_schema_fqn = f"{sample_db}.{app_config['sample_schema']}"

create_sample_db = f"CREATE DATABASE IF NOT EXISTS {sample_db};"
create_sample_schema = f"CREATE SCHEMA IF NOT EXISTS {sample_schema_fqn};"
create_sample_stage = f"CREATE STAGE IF NOT EXISTS {sample_schema_fqn}.{app_config['sample_stage']};"
for ddl in (create_sample_db, create_sample_schema, create_sample_stage):
    print(executeStatement(ddl))


sample_stage = f"@{sample_schema_fqn}.{app_config['sample_stage']}"
ga_data = "c360demo/data/ga_data/"
sf_data = "c360demo/data/sf_data/"
worldcities = "c360demo/data/worldcities.csv"

# Local source -> stage destination, copied in this order.
for source, destination in (
    (ga_data, f"{sample_stage}/data/ga_data/"),
    (sf_data, f"{sample_stage}/data/sf_data/"),
    (worldcities, f"{sample_stage}/data"),
):
    print(executeCopyToStage(source, destination))

# Fill the <DB>/<SCHEMA>/<STAGE> placeholders in the sample-build script (edited in place).
data_script_f = 'scripts/build_raw_samples.sql'
replace_map = {
    "<DB>": app_config['sample_db'],
    "<SCHEMA>": app_config['sample_schema'],
    "<STAGE>": app_config['sample_stage'],
}
file_replace(data_script_f, replace_map)
print('Success')

In [None]:
# Give the application USAGE on the C360 sample database/schema and SELECT on its tables.
# NOTE: SELECT ON ALL TABLES only covers tables that exist when this runs.
c360_schema = f"{app_config['sample_db']}.{app_config['sample_schema']}"
grant_db_c360 = f"GRANT USAGE ON DATABASE {app_config['sample_db']} TO APPLICATION {appName}"
grant_schema_360 = f"GRANT USAGE ON SCHEMA {c360_schema} TO APPLICATION {appName}"
grant_select_360 = f"GRANT SELECT ON ALL TABLES IN SCHEMA {c360_schema} TO APPLICATION {appName}"
for grant in (grant_db_c360, grant_schema_360, grant_select_360):
    print(executeStatement(grant))

In [None]:
! snow sql -f scripts/build_raw_samples.sql

## Start container service

In [None]:
# Start the application's container service on the configured compute pool and
# warehouse. The pool name is read from config here so this cell does not depend
# on another cell having defined compute_pool_name.
compute_pool_name = app_config['compute_pool']
service_query = f"call {appName}.app_public.start_app('{compute_pool_name}','{app_config['warehouse']}')"
executeStatement(service_query)

### Show container endpoint

In [None]:
from scripts.endpoint_provider import get_public_url_na
from scripts.executeStatement import executeStatement

# Look up the application's public endpoint (via scripts.endpoint_provider);
# the return value is displayed as the cell output.
get_public_url_na(appName, executeStatement)