<h1>Marketing Data Foundation Setup Script</h1>

<h2>Project Setup using Notebook</h2>

<h3> <ins>Pre-requisites:</ins></h3>

1. <b>You have conda or Miniconda installed. If not, install it by following the [documentation](https://docs.conda.io/projects/miniconda/en/latest/).</b>
2. <b>A role with privileges to install NativeApps from the Marketplace, such as ACCOUNTADMIN.</b>
3. <b>An environment that can run notebooks, such as VS Code or JupyterLab.</b>


<h2> <ins>Process Flow</ins></h2>

<p> Before we start building the app, here is the high-level flow of the steps:</p>

![](./Images/Marketing_Data_Foundation_process_flow.png)

<h2> <ins>Local Environment Setup</ins> [skip this step if Setup_Campaign_Intelligence.ipynb or Setup_Customer360_Starter.ipynb has already been executed]</h2>

In [30]:
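%%capture
#### The output of this cell is suppressed with %%capture; remove that line to see it while debugging

! conda create --name campaign_intelligence python=3.8 -y
#### Install ipykernel into the new environment, not the one running this notebook
! conda install --name campaign_intelligence -c anaconda ipykernel -y
#### Register the new environment as a Jupyter kernel named campaign_intelligence
! conda run --name campaign_intelligence python -m ipykernel install --user --name=campaign_intelligence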

<h3> <ins>Kernel Setup and Package Installation</ins></h3>
<h4>PLEASE CHANGE THE DEFAULT KERNEL TO <span style="color:blue">campaign_intelligence </span>NOW</h4>

In [1]:
#### Verify that the campaign_intelligence environment was created
! conda env list

# conda environments:
#
base                  *  /Users/nthomas/anaconda3
campaign_intelligence     /Users/nthomas/anaconda3/envs/campaign_intelligence
cirruslink               /Users/nthomas/anaconda3/envs/cirruslink
cirruslink_naveen        /Users/nthomas/anaconda3/envs/cirruslink_naveen
dbt_base                 /Users/nthomas/anaconda3/envs/dbt_base
energy-forecast          /Users/nthomas/anaconda3/envs/energy-forecast
fsi_portfolio            /Users/nthomas/anaconda3/envs/fsi_portfolio
fsi_quant_research       /Users/nthomas/anaconda3/envs/fsi_quant_research
pricing_trans            /Users/nthomas/anaconda3/envs/pricing_trans
pysnowpark_dicom         /Users/nthomas/anaconda3/envs/pysnowpark_dicom
snowvation               /Users/nthomas/anaconda3/envs/snowvation
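`conda env list` marks with `*` the environment that is active in the shell spawned by `!`, which may be the one Jupyter was started from rather than the one the kernel runs in. A minimal sketch for checking the kernel itself, assuming conda's usual `<conda_root>/envs/<name>` layout; `active_conda_env` is a hypothetical helper, not a conda API:

```python
import os
import sys


def active_conda_env(prefix=sys.prefix):
    """Return the conda environment name for an interpreter prefix, or None.

    Assumes conda's standard layout, where named environments live in
    <conda_root>/envs/<name>. The base environment, or a non-conda
    interpreter, returns None.
    """
    parts = os.path.normpath(prefix).split(os.sep)
    if len(parts) >= 2 and parts[-2] == "envs":
        return parts[-1]
    return None


# Warn if this notebook's kernel is not the campaign_intelligence environment.
env_name = active_conda_env()
if env_name != "campaign_intelligence":
    print(f"Kernel is running from {sys.prefix}; switch the kernel to campaign_intelligence.")
```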



<h4>Package Installation</h4>

The output of the cell below is suppressed with %%capture; remove that line if you need to see it while debugging.

In [32]:
%%capture
# %pip installs into the environment of the running kernel
%pip install -r requirements.txt

<h4>PLEASE <span style="color:blue">RESTART KERNEL campaign_intelligence </span>NOW</h4>

In [1]:
#### Verify that the packages from requirements.txt are installed in the current environment (only packages whose names start with "snow" are listed here)
! conda list ^snow

# packages in environment at /Users/nthomas/anaconda3:
#
# Name                    Version                   Build  Channel
snowballstemmer           2.2.0              pyhd3eb1b0_0  
snowflake-connector-python 3.6.0                    pypi_0    pypi
snowflake-snowpark-python 1.11.1                   pypi_0    pypi
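`conda list` reports on a conda prefix, and the output above comes from `/Users/nthomas/anaconda3`, the base prefix. A sketch that instead asks the running interpreter which versions it can see, using the standard-library `importlib.metadata` (Python 3.8+); `installed_versions` is a hypothetical helper name:

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to the version installed for the running
    interpreter, or None if it is not installed."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for pkg, ver in installed_versions(
    ["snowflake-snowpark-python", "snowflake-connector-python"]
).items():
    print(f"{pkg}: {ver or 'NOT INSTALLED in this kernel'}")
```

Because it queries the kernel's own interpreter, a `None` here points at a kernel/environment mismatch even when `conda list` shows the package.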


<h2> <ins>Establish Snowflake Connection</ins></h2>

<h3> <ins>Configuration for the App</ins></h3>

<h4> Ensure that <span style="color:blue">connection_config.json</span> contains the credentials used to build the solution </h4>

<p> <span style="color:brown">In connection_config.json, provide an existing database, schema, and warehouse that you have access to. They are used only to verify that the connection works; the demo creates its own database as it proceeds.</span></p>

<b>
{
    "account": "account_locator"
    ,"user": "USER_NAME"
    ,"password": "PASSWORD"
    ,"database": "DATABASE"
    ,"schema": "SCHEMA"
    ,"warehouse": "WAREHOUSE_NAME"
    ,"role":"ROLE_NAME"
}</b>
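A small guard makes a missing entry fail with a clear message instead of a later connection error. A minimal sketch, assuming the key set shown in the template above; `load_connection_config` and `REQUIRED_KEYS` are hypothetical names:

```python
import json

# Keys listed in the connection_config.json template above.
REQUIRED_KEYS = {"account", "user", "password", "database", "schema", "warehouse", "role"}


def load_connection_config(path="connection_config.json"):
    """Load the connection JSON and fail early if any expected key is missing."""
    with open(path) as conn_f:
        config = json.load(conn_f)
    missing = REQUIRED_KEYS - set(config)
    if missing:
        raise ValueError(f"{path} is missing keys: {sorted(missing)}")
    return config
```

Only the presence of keys is checked; placeholder values such as "account_locator" still pass. `snow_conn_info = load_connection_config()` could stand in for the plain `json.load` in the Authentication cell.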

<h5><ins>Authentication</ins></h5>

In [155]:
# Snowpark initialization
import json

from snowflake.snowpark.functions import sproc
from snowflake.snowpark.session import Session
from snowflake.snowpark import version as v

import pandas as pd
import numpy as np
import datetime
import io
import os

snow_conn_flpath = 'connection_config.json'

# Load the connection parameters (account, user, password, database, schema, warehouse, role)
with open(snow_conn_flpath) as conn_f:
    snow_conn_info = json.load(conn_f)

sp_session = Session.builder.configs(snow_conn_info).create()
# Use the warehouse from connection_config.json rather than a hard-coded name
sp_session.use_warehouse(snow_conn_info['warehouse'])
print(sp_session.sql('select current_account(), current_warehouse(), current_database(), current_schema()').collect())

# Packages made available to UDFs and stored procedures created from this session
sp_session.add_packages('snowflake-snowpark-python', 'pandas', 'numpy', 'pydeck')


[Row(CURRENT_ACCOUNT()='MNB70701', CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='CUSTOMER72_DB', CURRENT_SCHEMA()='PUBLIC')]


The version of package 'pydeck' in the local environment is 0.8.1b0, which does not fit the criteria for the requirement 'pydeck'. Your UDF might not work when the package version is different between the server and your local environment.


In [156]:
app_config_flpath = 'marketing_data_foundation_config.json'

with open(app_config_flpath) as app_f:
    app_config = json.load(app_f)
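The cells below repeatedly build fully qualified names with `.format(app_config['database'], app_config['schema'])`. Two small hypothetical helpers (not part of the original project) that centralize this, assuming `app_config` provides the `database` and `schema` keys the notebook reads (it also reads `role` later on):

```python
def fq_name(config, object_name):
    """Return '<database>.<schema>.<object_name>' from the app config."""
    return "{0}.{1}.{2}".format(config["database"], config["schema"], object_name)


def stage_path(config, stage_name, subpath=""):
    """Return an '@<database>.<schema>.<stage>/<subpath>' location for PUT or COPY."""
    return "@" + fq_name(config, stage_name) + "/" + subpath


# Hypothetical values; the notebook reads these from marketing_data_foundation_config.json.
example_config = {"database": "MY_DB", "schema": "MY_SCHEMA"}
print(fq_name(example_config, "SIMPLEMAPS"))                    # MY_DB.MY_SCHEMA.SIMPLEMAPS
print(stage_path(example_config, "data_stg", "data/sf_data/"))  # @MY_DB.MY_SCHEMA.data_stg/data/sf_data/
```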

<h2> <ins>Snowflake Setup</ins></h2>

In [120]:
sp_session.sql("create or replace database {0} comment = 'used for demonstrating Snowflake for Marketing demo';".format(app_config['database'])).collect()
sp_session.sql("create or replace schema {0}.{1};".format(app_config['database'], app_config['schema'])).collect()
sp_session.sql("use schema {0}.{1};".format(app_config['database'], app_config['schema'])).collect()
sp_session.sql("create or replace stage lib_stg directory = ( enable = true ) comment = 'used for holding udfs and procs.';").collect()
sp_session.sql("create or replace stage data_stg comment = 'used for holding data.' DIRECTORY = (ENABLE = TRUE);  ").collect()
sp_session.sql("create or replace stage scripts_stg comment = 'used for holding scripts.';").collect()



[Row(status='Stage area SCRIPTS_STG successfully created.')]

In [121]:
sp_session.sql("USE DATABASE {0}".format(app_config['database'])).collect()
sp_session.sql("USE SCHEMA {0}.{1}".format(app_config['database'], app_config['schema'])).collect()

[Row(status='Statement executed successfully.')]

<h2> <ins> Sample Data Loading:</ins> </h2> 


In [122]:
sp_session.file.put('./data/worldcities.csv', '@{0}.{1}.data_stg/data/'.format(app_config['database'], app_config['schema']), auto_compress=False, overwrite=True)


[PutResult(source='worldcities.csv', target='worldcities.csv', source_size=4734682, target_size=4734688, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [123]:
sf_dir = './data/sf_data/'
for file_name in os.listdir(sf_dir):
    # Skip hidden files such as .DS_Store
    if not file_name.startswith('.'):
        print('Loading the file from {0}'.format(os.path.join(sf_dir, file_name)))
        sp_session.file.put(os.path.join(sf_dir, file_name), '@{0}.{1}.data_stg/data/sf_data/'.format(app_config['database'], app_config['schema']), auto_compress=False, overwrite=True)

Loading the file from ./data/sf_data/salesforce_sample_data_0_2_0.csv.gz
Loading the file from ./data/sf_data/salesforce_sample_data_0_3_0.csv.gz
Loading the file from ./data/sf_data/salesforce_sample_data_0_5_0.csv.gz
Loading the file from ./data/sf_data/salesforce_sample_data_0_4_0.csv.gz
Loading the file from ./data/sf_data/salesforce_sample_data_0_1_0.csv.gz
Loading the file from ./data/sf_data/salesforce_sample_data_0_0_0.csv.gz
Loading the file from ./data/sf_data/salesforce_sample_data_0_6_0.csv.gz
Loading the file from ./data/sf_data/salesforce_sample_data_0_7_0.csv.gz


In [124]:
ga_dir = './data/ga_data/'
for file_name in os.listdir(ga_dir):
    # Skip hidden files such as .DS_Store
    if not file_name.startswith('.'):
        print('Loading the file from {0}'.format(os.path.join(ga_dir, file_name)))
        sp_session.file.put(os.path.join(ga_dir, file_name), '@{0}.{1}.data_stg/data/ga_data/'.format(app_config['database'], app_config['schema']), auto_compress=False, overwrite=True)

Loading the file from ./data/ga_data/ga_sample_data_0_2_1.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_3_1.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_0_2.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_1_2.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_0_0.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_1_0.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_7_0.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_6_0.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_5_1.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_4_1.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_7_2.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_3_0.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_2_0.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_3_2.csv.gz
Loading the file from ./data/ga_data/ga_sample_data_0_2_2.csv.gz
Loading the file from ./d
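The two loops above differ only in the local folder and the stage sub-path. A sketch of a shared helper, assuming `put` is a callable with the shape of Snowpark's `session.file.put`; `upload_dir` is a hypothetical name. It sorts the listing (the `os.listdir` order shown above is arbitrary) and skips sub-directories as well as hidden files:

```python
import os


def upload_dir(put, local_dir, stage_location):
    """Upload every non-hidden regular file directly under local_dir.

    `put` is any callable shaped like Snowpark's session.file.put
    (local_file_name, stage_location, **options); passing it in keeps
    this helper testable without a Snowflake connection.
    """
    uploaded = []
    for file_name in sorted(os.listdir(local_dir)):
        local_path = os.path.join(local_dir, file_name)
        # Skip hidden files (e.g. .DS_Store) and sub-directories.
        if file_name.startswith(".") or not os.path.isfile(local_path):
            continue
        print(f"Loading the file from {local_path}")
        put(local_path, stage_location, auto_compress=False, overwrite=True)
        uploaded.append(local_path)
    return uploaded

# Usage in this notebook (not executed here):
# for sub in ("sf_data", "ga_data"):
#     upload_dir(sp_session.file.put, "./data/{0}/".format(sub),
#                "@{0}.{1}.data_stg/data/{2}/".format(app_config["database"], app_config["schema"], sub))
```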

<h3> <ins> Alternative to PUT:</ins> [if you do not have permission to run PUT through Snowpark for security reasons]</h3> 

![](./Images/view_stage.png)

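<h3>Click the Files button to upload the files directly through Snowsight</h3> 

<p>Upload each file to the same DATA_STG path used by the PUT commands above (data/, data/sf_data/ or data/ga_data/) so that the COPY INTO statements below can find it.</p>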

![](./Images/upload_to_stage.png)

<h4><ins>Loading and creating the SIMPLEMAPS (latitude/longitude) table</ins></h4> 

In [126]:
sp_session.sql("""create or replace TABLE {0}.{1}.SIMPLEMAPS (
	CITY VARCHAR(16777216),
	CITY_ASCII VARCHAR(16777216),
	LAT FLOAT,
	LNG FLOAT,
	COUNTRY VARCHAR(16777216),
	ISO2 VARCHAR(16777216),
	ISO3 VARCHAR(16777216),
	ADMIN_NAME VARCHAR(16777216),
	CAPITAL VARCHAR(16777216),
	POPULATION VARCHAR(16777216),
	ID NUMBER(38,0)
);""".format(app_config['database'],app_config['schema'] )).collect()

sp_session.sql("""CREATE OR REPLACE FILE FORMAT {0}.{1}.CSV_FORMAT TYPE=CSV
    SKIP_HEADER=1
    FIELD_DELIMITER=','
    TRIM_SPACE=TRUE
    FIELD_OPTIONALLY_ENCLOSED_BY='"'
    REPLACE_INVALID_CHARACTERS=TRUE
    DATE_FORMAT=AUTO
    TIME_FORMAT=AUTO
    TIMESTAMP_FORMAT=AUTO; """.format(app_config['database'],app_config['schema'])).collect()

sp_session.sql("""
            COPY INTO "{0}"."{1}"."SIMPLEMAPS" 
FROM (SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
	FROM '@"{0}"."{1}"."DATA_STG"/data/worldcities.csv') 
FILE_FORMAT = '{0}.{1}.CSV_FORMAT' 
ON_ERROR=ABORT_STATEMENT 
                """.format(app_config['database'],app_config['schema'])).collect()

lat_long_df = sp_session.sql("SELECT * from {0}.{1}.SIMPLEMAPS LIMIT 10 ".format(app_config['database'],app_config['schema'])).to_pandas()
lat_long_df

|   | CITY | CITY_ASCII | LAT | LNG | COUNTRY | ISO2 | ISO3 | ADMIN_NAME | CAPITAL | POPULATION | ID |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Tokyo | Tokyo | 35.6897 | 139.6922 | Japan | JP | JPN | Tōkyō | primary | 37732000 | 1392685764 |
| 1 | Jakarta | Jakarta | -6.175 | 106.8275 | Indonesia | ID | IDN | Jakarta | primary | 33756000 | 1360771077 |
| 2 | Delhi | Delhi | 28.61 | 77.23 | India | IN | IND | Delhi | admin | 32226000 | 1356872604 |
| 3 | Guangzhou | Guangzhou | 23.13 | 113.26 | China | CN | CHN | Guangdong | admin | 26940000 | 1156237133 |
| 4 | Mumbai | Mumbai | 19.0761 | 72.8775 | India | IN | IND | Mahārāshtra | admin | 24973000 | 1356226629 |
| 5 | Manila | Manila | 14.5958 | 120.9772 | Philippines | PH | PHL | Manila | primary | 24922000 | 1608618140 |
| 6 | Shanghai | Shanghai | 31.1667 | 121.4667 | China | CN | CHN | Shanghai | admin | 24073000 | 1156073548 |
| 7 | São Paulo | Sao Paulo | -23.55 | -46.6333 | Brazil | BR | BRA | São Paulo | admin | 23086000 | 1076532519 |
| 8 | Seoul | Seoul | 37.56 | 126.99 | South Korea | KR | KOR | Seoul | primary | 23016000 | 1410836482 |
| 9 | Mexico City | Mexico City | 19.4333 | -99.1333 | Mexico | MX | MEX | Ciudad de México | primary | 21804000 | 1484247881 |
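The COPY statement above lists the staged column positions by hand ($1 to $11), and the GA_RAW_DATA load below lists $1 to $72, which is easy to miscount. A minimal hypothetical helper that generates the projection from a column count; the count must still match the target table, which this does not check:

```python
def positional_columns(n):
    """Return the '$1, $2, ..., $n' projection used in COPY INTO ... FROM (SELECT ...)."""
    if n < 1:
        raise ValueError("n must be at least 1")
    return ", ".join(f"${i}" for i in range(1, n + 1))


print(positional_columns(11))  # the SIMPLEMAPS projection: $1 through $11
```

The result could be spliced into the existing SQL strings, e.g. `"FROM (SELECT {2} FROM ...".format(db, schema, positional_columns(72))`.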


<h4><ins>Loading and creating GA_RAW_DATA table</ins></h4> 

In [127]:
sp_session.sql("""CREATE OR REPLACE TABLE  {0}.{1}.GA_RAW_DATA 
               ( EVENT_DATE DATE , EVENT_TIMESTAMP VARCHAR , EVENT_NAME VARCHAR , EVENT_PREVIOUS_TIMESTAMP VARCHAR , EVENT_VALUE_IN_USD VARCHAR , 
               EVENT_BUNDLE_SEQUENCE_ID NUMBER , EVENT_SERVER_TIMESTAMP_OFFSET VARCHAR , USER_ID VARCHAR , USER_PSEUDO_ID VARCHAR , 
               PRIVACY_INFO_ANALYTICS_STORAGE VARCHAR , PRIVACY_INFO_ADS_STORAGE VARCHAR , PRIVACY_INFO_USES_TRANSIENT_TOKEN BOOLEAN , 
               USER_PROPERTIES VARCHAR , USER_FIRST_TOUCH_TIMESTAMP VARCHAR , USER_LTV_CURRENCY VARCHAR , 
               DEVICE_CATEGORY VARCHAR , DEVICE_MOBILE_BRAND_NAME VARCHAR , DEVICE_MOBILE_MODEL_NAME VARCHAR , DEVICE_MOBILE_MARKETING_NAME VARCHAR , 
               DEVICE_MOBILE_OS_HARDWARE_MODEL VARCHAR , DEVICE_OPERATING_SYSTEM VARCHAR , DEVICE_OPERATING_SYSTEM_VERSION VARCHAR , DEVICE_VENDOR_ID VARCHAR , 
               DEVICE_ADVERTISING_ID VARCHAR , DEVICE_LANGUAGE VARCHAR , DEVICE_IS_LIMITED_AD_TRACKING BOOLEAN , DEVICE_TIME_ZONE_OFFSET_SECONDS VARCHAR , 
               DEVICE_BROWSER VARCHAR , DEVICE_BROWSER_VERSION VARCHAR , DEVICE_WEB_INFO_BROWSER VARCHAR , DEVICE_WEB_INFO_BROWSER_VERSION VARCHAR , 
               GEO_CONTINENT VARCHAR , GEO_COUNTRY VARCHAR , GEO_REGION VARCHAR , GEO_CITY VARCHAR , GEO_SUB_CONTINENT VARCHAR , GEO_METRO VARCHAR , 
               APP_INFO_ID VARCHAR , APP_INFO_VERSION VARCHAR , APP_INFO_INSTALL_STORE VARCHAR , APP_INFO_FIREBASE_APP_ID VARCHAR , APP_INFO_INSTALL_SOURCE VARCHAR , 
               TRAFFIC_SOURCE_MEDIUM VARCHAR , STREAM_ID NUMBER , PLATFORM VARCHAR , EVENT_DIMENSIONS_HOSTNAME VARCHAR , ECOMMERCE_TOTAL_ITEM_QUANTITY VARCHAR , 
               ECOMMERCE_PURCHASE_REVENUE_IN_USD VARCHAR , ECOMMERCE_PURCHASE_REVENUE VARCHAR , ECOMMERCE_REFUND_VALUE_IN_USD VARCHAR , ECOMMERCE_REFUND_VALUE VARCHAR , 
               ECOMMERCE_SHIPPING_VALUE_IN_USD VARCHAR , ECOMMERCE_SHIPPING_VALUE VARCHAR , ECOMMERCE_TAX_VALUE_IN_USD VARCHAR , ECOMMERCE_TAX_VALUE VARCHAR , 
               ECOMMERCE_UNIQUE_ITEMS VARCHAR , ECOMMERCE_TRANSACTION_ID VARCHAR , ITEMS VARCHAR , COLLECTED_TRAFIC_SOURCE_MANUAL_CAMPAIGN_ID VARCHAR , 
               COLLECTED_TRAFIC_SOURCE_MANUAL_MEDIUM VARCHAR , COLLECTED_TRAFIC_SOURCE_GCLID VARCHAR , COLLECTED_TRAFIC_SOURCE_DCLID VARCHAR , 
               COLLECTED_TRAFIC_SOURCE_SRSLTID VARCHAR , DEVICE_WEB_INFO_HOSTNAME VARCHAR , COLLECTED_TRAFIC_SOURCE_MANUAL_CAMPAIGN_NAME VARCHAR , 
               COLLECTED_TRAFIC_SOURCE_MANUAL_SOURCE VARCHAR , TRAFFIC_SOURCE_SOURCE VARCHAR , TRAFFIC_SOURCE_NAME VARCHAR , COLLECTED_TRAFIC_SOURCE_MANUAL_CONTENT VARCHAR , 
               COLLECTED_TRAFIC_SOURCE_MANUAL_TERM VARCHAR , USER_LTV_REVENUE NUMBER, EVENT_PARAMS_UTMS VARIANT );""".format(app_config['database'],app_config['schema'])).collect()

sp_session.sql("""
            COPY INTO "{0}"."{1}"."GA_RAW_DATA" 
FROM (SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, 
$35, $36, $37, $38, $39, $40, $41, $42, $43, $44, $45, $46, $47, $48, $49, $50, $51, $52, $53, $54, $55, $56, $57, $58, $59, $60, $61, $62, $63, $64, $65, $66, $67, $68, 
$69, $70, $71, $72 
	FROM '@"{0}"."{1}"."DATA_STG"/data/ga_data/') 
FILE_FORMAT = '{0}.{1}.CSV_FORMAT' 
ON_ERROR=ABORT_STATEMENT 
                """.format(app_config['database'],app_config['schema'])).collect()

ga_data_df = sp_session.sql('SELECT * from {0}.{1}.GA_RAW_DATA LIMIT 10 '.format(app_config['database'],app_config['schema'])).to_pandas()
ga_data_df

|   | EVENT_DATE | EVENT_TIMESTAMP | EVENT_NAME | EVENT_PREVIOUS_TIMESTAMP | EVENT_VALUE_IN_USD | EVENT_BUNDLE_SEQUENCE_ID | EVENT_SERVER_TIMESTAMP_OFFSET | USER_ID | USER_PSEUDO_ID | PRIVACY_INFO_ANALYTICS_STORAGE | ... | COLLECTED_TRAFIC_SOURCE_SRSLTID | DEVICE_WEB_INFO_HOSTNAME | COLLECTED_TRAFIC_SOURCE_MANUAL_CAMPAIGN_NAME | COLLECTED_TRAFIC_SOURCE_MANUAL_SOURCE | TRAFFIC_SOURCE_SOURCE | TRAFFIC_SOURCE_NAME | COLLECTED_TRAFIC_SOURCE_MANUAL_CONTENT | COLLECTED_TRAFIC_SOURCE_MANUAL_TERM | USER_LTV_REVENUE | EVENT_PARAMS_UTMS |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2022-12-12 | 2022-12-12 19:59:09.771 | scroll |  |  | -272359114 |  | 0053r000009kQYc | 596825730.1659627 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 156914 | [\n {\n "key": "ga_session_id",\n "valu... |
| 1 | 2022-12-12 | 2022-12-12 19:59:09.771 | page_view |  |  | -272359114 |  | 0053r000009kQYc | 596825730.1659627 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 417812 | [\n {\n "key": "page_referrer",\n "valu... |
| 2 | 2022-12-12 | 2022-12-12 19:59:22.317 | page_view |  |  | -259812989 |  | 0053r000009kQYc | 596825730.1659627 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 335491 | [\n {\n "key": "page_title",\n "value":... |
| 3 | 2022-12-12 | 2022-12-12 19:59:22.317 | page_view |  |  | -259812989 |  | 0053r000009kQYc | 596825730.1659627 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 46395 | [\n {\n "key": "engaged_session_event",\n ... |
| 4 | 2022-12-12 | 2022-12-12 19:59:30.704 | scroll |  |  | -251425342 |  | 0053r000009kQYc | 596825730.1659627 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 492088 | [\n {\n "key": "session_engaged",\n "va... |
| 5 | 2022-12-12 | 2022-12-12 19:59:30.704 | page_view |  |  | -251425342 |  | 0053r000009kQYc | 596825730.1659627 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 201828 | [\n {\n "key": "ga_session_id",\n "valu... |
| 6 | 2022-12-12 | 2022-12-12 19:59:44.159 | page_view |  |  | -237970648 |  | 0053r000009kQYc | 596825730.1659627 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 483962 | [\n {\n "key": "session_engaged",\n "va... |
| 7 | 2022-05-24 | 2022-05-24 10:46:04.336 | user_engagement |  |  | -1445931423 |  | 0053r00000A4w03 | 1058961907.1641312 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 627 | [\n {\n "key": "page_title",\n "value":... |
| 8 | 2022-05-24 | 2022-05-24 10:46:09.995 | scroll |  |  | -1440272230 |  | 0053r00000A4w03 | 1058961907.1641312 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 283187 | [\n {\n "key": "ignore_referrer",\n "va... |
| 9 | 2022-05-24 | 2022-05-24 10:46:09.995 | page_view |  |  | -1440272230 |  | 0053r00000A4w03 | 1058961907.1641312 |  | ... |  | demo.abc_corp.com |  |  | (direct) | (direct) |  |  | 430331 | [\n {\n "key": "ignore_referrer",\n "va... |
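EVENT_PARAMS_UTMS holds a JSON array of GA event parameters (the truncated output above shows keys such as ga_session_id and page_referrer). The CUSTOMER72_GA_LAT_LONG_UTMS_VW view below flattens it and reads `value:value.string_value`, which implies each element looks like `{"key": ..., "value": {"string_value": ...}}`, the shape used by GA4's BigQuery export. A Python model of that step, assuming that shape; `flatten_string_values` is a hypothetical name and the sample payload is made up:

```python
import json


def flatten_string_values(event_params_json):
    """Return (key, string_value) pairs for a GA event_params JSON array.

    Models LATERAL FLATTEN(INPUT => PARSE_JSON(EVENT_PARAMS_UTMS)) followed by
    value:value.string_value: one output row per array element, with None
    (NULL in SQL) when an element has no value.string_value.
    """
    rows = []
    for element in json.loads(event_params_json):
        value = element.get("value") or {}
        rows.append((element.get("key"), value.get("string_value")))
    return rows


# Hypothetical sample in the assumed GA4 export shape.
sample = json.dumps([
    {"key": "page_location",
     "value": {"string_value": "https://demo.abc_corp.com/?utm_source=linkedin&utm_medium=paid"}},
    {"key": "ga_session_id", "value": {"int_value": 1670875149}},
])
for key, string_value in flatten_string_values(sample):
    print(key, string_value)
```

Every array element becomes its own row, so one event fans out into several rows in the view; elements without a string value produce NULL UTM columns, which never satisfy the equality join in C360_CLICKS_CRM_JOINED_VW.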


<h4><ins>Loading and creating SALESFORCE_RAW_DATA table</ins></h4> 

In [128]:
sp_session.sql("""CREATE OR REPLACE TABLE  {0}.{1}.SALESFORCE_RAW_DATA 
                ( SALESFORCE_CAMPAIGN_MEMBER_ID VARCHAR , SALESFORCE_CAMPAIGN_ID VARCHAR , SALESFORCE_PERSON_ID VARCHAR , 
                SALESFORCE_ACCOUNT_ID VARCHAR , FIRST_ASSOCIATED_DATE VARCHAR , HAS_RESPONDED VARCHAR , FIRST_RESPONDED_DATE VARCHAR , 
                RESPONSE_GEO VARCHAR , RESPONSE_COUNTRY VARCHAR , EMPLOYEE VARCHAR , 
                CAMPAIGN_PARTNER VARCHAR , CAMPAIGN_PARTNER_OR_EMPLOYEE VARCHAR , SALESFORCE_CAMPAIGN_NAME VARCHAR , 
                SALESFORCE_ACCOUNT_NAME VARCHAR , SALESFORCE_PERSON_NAME VARCHAR, QUERY_PARAMETERS_UTMS VARCHAR, STATUS VARCHAR );""".format(app_config['database'],app_config['schema'])).collect()

sp_session.sql("""
            COPY INTO "{0}"."{1}"."SALESFORCE_RAW_DATA" 
FROM (SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17
	FROM '@"{0}"."{1}"."DATA_STG"/data/sf_data/') 
FILE_FORMAT = '{0}.{1}.CSV_FORMAT' 
ON_ERROR=ABORT_STATEMENT 
                """.format(app_config['database'],app_config['schema'])).collect()

salesforce_df = sp_session.sql('SELECT * from {0}.{1}.SALESFORCE_RAW_DATA LIMIT 10 '.format(app_config['database'],app_config['schema'])).to_pandas()
salesforce_df



|   | SALESFORCE_CAMPAIGN_MEMBER_ID | SALESFORCE_CAMPAIGN_ID | SALESFORCE_PERSON_ID | SALESFORCE_ACCOUNT_ID | FIRST_ASSOCIATED_DATE | HAS_RESPONDED | FIRST_RESPONDED_DATE | RESPONSE_GEO | RESPONSE_COUNTRY | EMPLOYEE | CAMPAIGN_PARTNER | CAMPAIGN_PARTNER_OR_EMPLOYEE | SALESFORCE_CAMPAIGN_NAME | SALESFORCE_ACCOUNT_NAME | SALESFORCE_PERSON_NAME | QUERY_PARAMETERS_UTMS | STATUS |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00vDo000008sn3QIAQ | 701Do00000052kOIAQ | 003Do000009mTvsIAE | 0010Z000026p8nSQAQ | 2023-05-23 15:11:25.000 | True |  | NoAM | Canada | False | False | False | Self-Service Registration | Istanbul Technical University | Brianna Williams | offline | No Show |
| 1 | 00v3r00001uoS5MAAU | 701Do00000011AbIAI | 00QDo00000FEuDiMAL | 0010Z000027215BQAQ | 2022-06-12 22:46:24.000 | False | 2023-06-12 | NoAM | Colombia | False | False | False | Self-Service Registration | data.ai Inc. (fka App Annie Inc.) | Tina Mccoy | offline | No Show |
| 2 | 00vDo000003QxSvIAK | 7013r000001rQgEAAU | 00QDo00000C9aDrMAJ | 0013r00002UsNAsAAN | 2020-11-25 23:23:16.000 | True |  | NoAM | United States | False | False | False | Self-Service Registration | LSCC | Sara Clarke | https://www.abc_corp.com/event/northwest-data-... | Attended |
| 3 | 00vDo0000062co2IAA | 7013r000001rRjmAAE | 00Q0Z00001MqzsYUAR |  | 2023-06-12 13:20:18.000 | True | 2022-03-03 | NoAM | Canada | False | False | False | 2020-05-19 - Virtual Hands On Lab Real Time Da... | Abc_corp Computing | Gary Torres | offline | No Show |
| 4 | 00v3r00002FFO8tAAH | 7013r000001rOXiAAM | 00QDo00000FFMC1MAP |  | 2023-06-12 13:20:18.000 | True | 2020-11-18 | EMEA | Singapore | False | False | False | 2021-11-17 - Abc_corp Winter Release - EMEA | Associated Builders and Contractors | Jennifer Francis | "https://demo.abc_corp.com/s/contentdocument/0... | Registered |
| 5 | 00v3r00001nkr48AAA | 7010Z000001a4abQAA | 00Q0Z00001JOjpZUAT | 0013r00002dOLXrAAO | 2020-11-13 22:29:41.000 | True |  | NoAM | Singapore | False | False | False | 2023-03-07 - D4B FY24 - AMS - Princeton | BRI | Erin Collins | https://www.abc_corp.com/event/data-for-breakf... | Attended |
| 6 | 00vDo000008uik9IAA | 7013r000001lTJkAAM | 0033r00003qppZJAAY |  | 2020-10-25 04:58:16.000 | True | 2023-08-31 | EMEA | France | False | False | False | Cloud Data Lakes for Dummies | SaleCycle | Patricia Smith | offline | Attended |
| 7 | 00v0Z00001i3GQ1QAM | 7013r000001ay1zAAA | 0033r00003pAyd6AAC | 0013r00002UsNAsAAN | 2021-12-19 20:05:51.000 | True |  | NoAM | United States | False | False | False | 2020-03-31 - Webinar - Data for Breakfast - On... | Nexa3D | Zachary Gonzales | https://www.abc_corp.com/events/data-cloud-wor... | No Show |
| 8 | 00v3r000026FuOEAA0 | 7013r000001lTweAAE | 003Do000009mTvsIAE | 0013100001rtArIAAU | 2022-06-08 18:45:37.000 | True | 2020-11-25 | NoAM | China | False | False | False | 2022-06-14 - Abc_corp Summit - MFG HH with LTI | NCL Corporation Ltd. | Kristi Henderson | offline | No Show |
| 9 | 00vDo000003OuCsIAK | 7013r000001azGbAAI | 0033r00003xwa8pAAA | 0013100001rtArIAAU | 2021-12-06 18:54:16.000 | True | 2022-03-03 | APJ | United States | False | False | False | 3 Tips for Building Modern Data Applications | Abc_corp Community Use | Todd Cooper | "https://demo.abc_corp.com/s/contentdocument/0... | No Show |


<h2> <ins> Build Customer 72 View</ins> </h2> 

In [129]:
sp_session.sql("USE DATABASE {0}".format(app_config['database'])).collect()
sp_session.sql("USE SCHEMA {0}.{1}".format(app_config['database'], app_config['schema'])).collect()

sp_session.sql("CREATE OR REPLACE VIEW {0}.{1}.CUSTOMER72_GA_LAT_LONG_VW as select * from {0}.{1}.GA_RAW_DATA a join {0}.{1}.SIMPLEMAPS b on b.city_ascii = a.geo_city".format(app_config['database'], app_config['schema'])).collect()

sp_session.sql("""
            CREATE OR REPLACE VIEW {0}.{1}.SALESFORCE_VW AS 
            select CAMPAIGN_PARTNER,CAMPAIGN_PARTNER_OR_EMPLOYEE,EMPLOYEE,FIRST_ASSOCIATED_DATE,FIRST_RESPONDED_DATE,HAS_RESPONDED,
            QUERY_PARAMETERS_UTMS as SALESFORCE_QUERY_PARAMETERS_UTMS,RESPONSE_COUNTRY,RESPONSE_GEO,SALESFORCE_ACCOUNT_ID,SALESFORCE_ACCOUNT_NAME,SALESFORCE_CAMPAIGN_ID,
            SALESFORCE_CAMPAIGN_MEMBER_ID,SALESFORCE_CAMPAIGN_NAME,SALESFORCE_PERSON_ID,
            SALESFORCE_PERSON_NAME,STATUS, SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, 'utm_source=', 2),'&',0) as SF_UTM_SOURCE,
               SPLIT_PART(SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, 'utm_source=', 2),'&utm_medium=',2), '&', 1) as SF_UTM_MEDIUM, 
               SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, '&utm_campaign=', 2),'&',0) as SF_utm_campaign,
               SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, '&utm_content=', 2),'&',0) as SF_utm_ad_name,
               SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, '&ad_id=', 2),'&',0) as SF_utm_ad_id
               from {0}.{1}.SALESFORCE_RAW_DATA;
               """.format(app_config['database'],app_config['schema'])).collect()

sp_session.sql("""
CREATE OR REPLACE VIEW {0}.{1}.CUSTOMER72_GA_LAT_LONG_UTMS_VW AS 
select  *, value:value.string_value as QUERY_PARAMETERS_UTMS, SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, 'utm_source=', 2),'&',0) as UTM_SOURCE,
               SPLIT_PART(SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, 'utm_source=', 2),'&utm_medium=',2), '&', 1) as UTM_MEDIUM, 
               SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, '&utm_campaign=', 2),'&',0) as utm_campaign,
               SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, '&utm_content=', 2),'&',0) as utm_ad_name,
               SPLIT_PART(SPLIT_PART(QUERY_PARAMETERS_UTMS, '&ad_id=', 2),'&',0) as utm_ad_id from CUSTOMER72_GA_LAT_LONG_VW,  LATERAL FLATTEN( INPUT => PARSE_JSON(EVENT_PARAMS_UTMS) );
""".format(app_config['database'],app_config['schema'])).collect()


sp_session.sql("""
CREATE OR REPLACE VIEW {0}.{1}.C360_CLICKS_CRM_JOINED_VW as
    SELECT *  FROM {0}.{1}.salesforce_vw salesforce  
    join {0}.{1}.CUSTOMER72_GA_LAT_LONG_UTMS_VW  ga 
    on salesforce.SF_UTM_SOURCE = ga.UTM_SOURCE and
    salesforce.SF_UTM_MEDIUM = ga.UTM_MEDIUM and 
    salesforce.SF_utm_campaign = ga.utm_campaign where length(trim(salesforce.SF_UTM_SOURCE)) > 0 ; 
""".format(app_config['database'],app_config['schema'])).collect()


[Row(status='View C360_CLICKS_CRM_JOINED_VW successfully created.')]
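SALESFORCE_VW and CUSTOMER72_GA_LAT_LONG_UTMS_VW extract the UTM parameters with nested SPLIT_PART calls, and C360_CLICKS_CRM_JOINED_VW joins on the results. The sketch below is a Python model that mirrors those expressions, not Snowflake's implementation; `split_part` and `utm_fields` are hypothetical names and the URL is made up:

```python
from urllib.parse import parse_qs, urlparse


def split_part(s, delimiter, part):
    """Python model of Snowflake SPLIT_PART: parts are 1-based, 0 is treated as 1,
    negative numbers count from the end, and out-of-range parts return ''."""
    pieces = s.split(delimiter)
    if part == 0:
        part = 1
    if part < 0:
        part += len(pieces) + 1
    return pieces[part - 1] if 1 <= part <= len(pieces) else ""


def utm_fields(q):
    """Mirror the SPLIT_PART expressions used in the views above."""
    after_source = split_part(q, "utm_source=", 2)
    return {
        "utm_source": split_part(after_source, "&", 0),
        "utm_medium": split_part(split_part(after_source, "&utm_medium=", 2), "&", 1),
        "utm_campaign": split_part(split_part(q, "&utm_campaign=", 2), "&", 0),
        "utm_ad_name": split_part(split_part(q, "&utm_content=", 2), "&", 0),
        "utm_ad_id": split_part(split_part(q, "&ad_id=", 2), "&", 0),
    }


url = ("https://www.abc_corp.com/event/example?utm_source=linkedin&utm_medium=paid"
       "&utm_campaign=NA_Campaign3&utm_content=NA_CAMPAIGN&ad_id=186219986")
print(utm_fields(url))
print(parse_qs(urlparse(url).query))  # standard-library parse, for comparison
```

Values such as "offline" in the Salesforce sample yield empty strings for every field, which the `length(trim(salesforce.SF_UTM_SOURCE)) > 0` filter then excludes. Note also that utm_campaign, utm_content and ad_id are only found when preceded by `&`, so a query string that starts with one of them yields an empty value, whereas `parse_qs` has no such positional dependency.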

## Loading Campaign Intelligence Sample Data

<h4><ins>Loading and creating CAMPAIGN_JOINED_SAMPLE_DATA table</ins></h4> 

In [130]:
sp_session.file.put('./data/sample_data.gz', '@{0}.{1}.data_stg/data/'.format(app_config['database'], app_config['schema']), auto_compress=False, overwrite=True)

[PutResult(source='sample_data.gz', target='sample_data.gz', source_size=22158239, target_size=22158240, source_compression='GZIP', target_compression='GZIP', status='UPLOADED', message='')]

In [131]:
sp_session.sql("USE DATABASE {0}".format(app_config['database'])).collect()
sp_session.sql("USE SCHEMA {0}.{1}".format(app_config['database'], app_config['schema'])).collect()

[Row(status='Statement executed successfully.')]

In [132]:
sp_session.sql("""CREATE OR REPLACE TABLE "{0}"."{1}"."CAMPAIGN_JOINED_SAMPLE_DATA" 
            ( DATE_DAY DATE , PLATFORM VARCHAR , ACCOUNT_ID NUMBER , ACCOUNT_NAME VARCHAR , 
            CAMPAIGN_ID NUMBER , CAMPAIGN_NAME VARCHAR , AD_GROUP_ID NUMBER , AD_GROUP_NAME VARCHAR , 
            AD_ID NUMBER , AD_NAME VARCHAR , CLICKS NUMBER , IMPRESSIONS NUMBER , SPEND NUMBER );""".format(app_config['database'],app_config['schema'] )).collect()

sp_session.sql("""CREATE OR REPLACE FILE FORMAT {0}.{1}.CSV_FORMAT TYPE=CSV
    SKIP_HEADER=1
    FIELD_DELIMITER=','
    TRIM_SPACE=TRUE
    FIELD_OPTIONALLY_ENCLOSED_BY='"'
    REPLACE_INVALID_CHARACTERS=TRUE
    DATE_FORMAT=AUTO
    TIME_FORMAT=AUTO
    TIMESTAMP_FORMAT=AUTO; """.format(app_config['database'],app_config['schema'] )).collect()

sp_session.sql("""
            COPY INTO "{0}"."{1}"."CAMPAIGN_JOINED_SAMPLE_DATA" 
FROM (SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
	FROM '@"{0}"."{1}"."DATA_STG"/data/sample_data.gz') 
FILE_FORMAT = '{0}.{1}.CSV_FORMAT' 
ON_ERROR=ABORT_STATEMENT 
                """.format(app_config['database'],app_config['schema'] )).collect()

df = sp_session.sql("SELECT * from {0}.{1}.CAMPAIGN_JOINED_SAMPLE_DATA LIMIT 10 ".format(app_config['database'],app_config['schema'] )).to_pandas()
df

|   | DATE_DAY | PLATFORM | ACCOUNT_ID | ACCOUNT_NAME | CAMPAIGN_ID | CAMPAIGN_NAME | AD_GROUP_ID | AD_GROUP_NAME | AD_ID | AD_NAME | CLICKS | IMPRESSIONS | SPEND |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2020-06-07 | linkedin_ads | 507488250 | ABC Corp | 605096246 | EMEA_Campaign4 | 151532106 | NA_Tier1 | 87354556 | NA_CAMPAIGN | 1.0 | 0.0 | 0.0 |
| 1 | 2021-11-08 | linkedin_ads | 506553487 | ABC Corp | 606058844 | EMEA_Campaign3 | 163402586 | APJ_Tier3 | 203130353 | APJ_CAMPAIGN |  | 4.0 | 0.0 |
| 2 | 2023-05-16 | linkedin_ads | 506553487 | ABC Corp | 620727996 | NA_Campaign6 | 182717396 | APJ_Tier3 | 210976886 | EMEA_CAMPAIGN |  | 1.0 | 1.0 |
| 3 | 2023-01-04 | linkedin_ads | 506553487 | ABC Corp NA | 606058744 | NA_Campaign3 | 156201344 | APJ_Tier2 | 186219986 | NA_CAMPAIGN |  | 7.0 | 1.0 |
| 4 | 2023-07-01 | linkedin_ads | 507488250 | ABC Corp NA | 626098976 | NA_Campaign5 | 204658316 | APJ_Tier5 | 264084536 | APJ_Q2_CAMPAIGN | 2.0 |  |  |
| 5 | 2020-09-03 | linkedin_ads | 507488250 | ABC Corp APJ | 605446924 | NA_Campaign4 | 143276386 | APJ_Tier2 | 73605586 | EMEA_Q1_CAMPAIGN |  | 20.0 | 0.0 |
| 6 | 2022-06-19 | linkedin_ads | 506553487 | ABC Corp | 606058744 | NA_Campaign2 | 152820714 | APJ_Tier2 | 86546384 | NA_Q1_CAMPAIGN | 1.0 | 112.0 | 14.0 |
| 7 | 2021-05-04 | linkedin_ads | 507488250 | ABC Corp | 614349276 | EMEA_Campaign6 | 163655576 | NA_Tier2 | 119016306 | NA_CAMPAIGN | 3.0 | 8.0 | 2.0 |
| 8 | 2020-08-13 | linkedin_ads | 507488250 | ABC Corp EMEA | 605096246 | APJ_Campaign5 | 139908496 | EMEA_Tier1 | 79581666 | EMEA_Q2_CAMPAIGN | 12.0 |  |  |
| 9 | 2020-09-20 | linkedin_ads | 507488250 | ABC Corp NA | 605096246 | NA_Campaign7 | 149411336 | APJ_Tier1 | 83695146 | EMEA_CAMPAIGN | 2.0 | 5.0 | 1.0 |


In [133]:
sp_session.sql("CREATE OR REPLACE SECURE VIEW {0}.{1}.CAMPAIGN72_VIEW as SELECT * from {0}.{1}.CAMPAIGN_JOINED_SAMPLE_DATA".format(app_config['database'], app_config['schema'])).collect()

[Row(status='View CAMPAIGN72_VIEW successfully created.')]

#### Building the NativeApp

In [157]:
def create_nativeapp_pkg(_sp_session, _config, app_name, view_list, role_name, database, schema):
    """ Create the NativeApp code database, stage, and application package, share the given views
        with the package, and append the matching CRM view statements to the app's setup.sql.
    """
    _sp_session.sql("use role {0};".format(role_name)).collect()

    code_db = app_name + '_CODE'
    pkg_name = app_name + '_PKG'

    # Database, schema, and stage that hold the NativeApp source files
    _sp_session.sql("CREATE OR REPLACE DATABASE {0};".format(code_db)).collect()
    _sp_session.sql("USE DATABASE {0};".format(code_db)).collect()
    _sp_session.sql("CREATE OR REPLACE SCHEMA {0};".format('CORE')).collect()
    _sp_session.sql("CREATE OR REPLACE STAGE {0}.{1}.{2} DIRECTORY = (ENABLE = TRUE) FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1);".format(code_db, 'CORE', code_db + '_STG')).collect()

    # Recreate the application package and its shared-content schema
    _sp_session.sql("DROP APPLICATION PACKAGE IF EXISTS {0}".format(pkg_name)).collect()
    _sp_session.sql("CREATE APPLICATION PACKAGE {0};".format(pkg_name)).collect()
    _sp_session.sql("USE {0};".format(pkg_name)).collect()
    _sp_session.sql("CREATE OR REPLACE SCHEMA {0};".format('SHARED_CONTENT')).collect()
    _sp_session.sql("GRANT USAGE ON SCHEMA {0} TO SHARE IN APPLICATION PACKAGE {1};".format('SHARED_CONTENT', pkg_name)).collect()
    _sp_session.sql("USE SCHEMA {0}.{1};".format(pkg_name, 'SHARED_CONTENT')).collect()

    # Allow the package to reference objects in the source database (needed once, not per view)
    _sp_session.sql("GRANT REFERENCE_USAGE ON DATABASE {0} TO SHARE IN APPLICATION PACKAGE {1};".format(database, pkg_name)).collect()

    # NOTE: setup.sql is opened in append mode, so re-running this cell appends the CRM statements again
    with open("./Snowflake_for_Marketing_App/scripts/setup.sql", "a") as sqlscript:
        for view in view_list:
            # Expose each source view to the package through a secure view in SHARED_CONTENT
            _sp_session.sql("CREATE OR REPLACE SECURE VIEW SHARED_CONTENT.{2} AS SELECT * FROM {0}.{1}.{2};".format(database, schema, view)).collect()
            # _sp_session.sql("ALTER VIEW SHARED_CONTENT.{0} SET CHANGE_TRACKING = True;".format(view)).collect()
            _sp_session.sql("GRANT SELECT ON VIEW SHARED_CONTENT.{0} TO SHARE IN APPLICATION PACKAGE {1};".format(view, pkg_name)).collect()
            # In the app's setup script, wrap the shared view in the CRM schema and grant it to APP_PUBLIC
            sqlscript.write("\n")
            sqlscript.write("CREATE OR REPLACE SECURE VIEW CRM.{0} AS SELECT * FROM SHARED_CONTENT.{0};\n".format(view))
            sqlscript.write("GRANT SELECT ON VIEW CRM.{0} TO APPLICATION ROLE APP_PUBLIC;\n".format(view))
    return 'Success'
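Because `create_nativeapp_pkg` opens setup.sql in append mode, running the cell twice writes the CRM view statements into the script twice. A sketch of an idempotent alternative, assuming each generated statement occupies exactly one line of setup.sql; `crm_view_statements` and `append_missing_statements` are hypothetical helpers:

```python
import os


def crm_view_statements(view_names):
    """The two setup.sql statements create_nativeapp_pkg writes for each view."""
    statements = []
    for view in view_names:
        statements.append("CREATE OR REPLACE SECURE VIEW CRM.{0} AS SELECT * FROM SHARED_CONTENT.{0};".format(view))
        statements.append("GRANT SELECT ON VIEW CRM.{0} TO APPLICATION ROLE APP_PUBLIC;".format(view))
    return statements


def append_missing_statements(script_path, statements):
    """Append only the statements not already present (one per line) and return them."""
    existing = set()
    if os.path.exists(script_path):
        with open(script_path) as script:
            existing = {line.strip() for line in script}
    missing = [s for s in statements if s not in existing]
    if missing:
        with open(script_path, "a") as script:
            script.write("\n" + "\n".join(missing) + "\n")
    return missing
```

Called as `append_missing_statements("./Snowflake_for_Marketing_App/scripts/setup.sql", crm_view_statements(view_list))`, a second run appends nothing.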

In [158]:
import os
def upload_nativeappfiles_to_stage(p_sflk_session ,p_local_dir ,p_db ,p_sch ,p_stage, role_name ):
    """ Upload the NativeApp files from a local directory to a stage, preserving the folder layout.
        :param p_sflk_session: The Snowflake session
        :param p_local_dir: The local directory where the NativeApp files are available
        :param p_db: The database for the stage
        :param p_sch: The schema for the stage
        :param p_stage: The stage to upload to
        :param role_name: The role used for the upload

        :returns: 'Success' once all files are uploaded and the stage directory is refreshed
    """
    print(f" Uploading files to stage: {p_db}.{p_sch}.{p_stage} ")
    p_sflk_session.sql("USE ROLE {0}".format(role_name)).collect()

    for path, _dirs, files in os.walk(p_local_dir):
        # Stage sub-folder mirroring the local one: '' at the top level, e.g. '/streamlit' below it
        rel_dir = os.path.relpath(path, p_local_dir)
        stage_dir = '' if rel_dir == '.' else '/' + rel_dir.replace(os.sep, '/')
        for file in files:
            # Skip hidden files such as .DS_Store
            if not file.startswith('.'):
                local_file = os.path.join(path, file)
                print(f'    {local_file} => @{p_stage}{stage_dir}')
                p_sflk_session.file.put(
                    local_file_name = local_file
                    ,stage_location = f'@{p_db}.{p_sch}.{p_stage}{stage_dir}'
                    ,auto_compress=False ,overwrite=True)

    # Refresh the directory table so the stage listing reflects the uploaded files
    p_sflk_session.sql(f'alter stage {p_db}.{p_sch}.{p_stage} refresh; ').collect()
    return 'Success'

<h5><ins>Initialization</ins></h5>


In [159]:
native_app_dir = './Snowflake_for_Marketing_App'
app_name = 'Marketing_Data_Foundation'
viewlist_to_app = ['C360_CLICKS_CRM_JOINED_VW', 'CAMPAIGN72_VIEW'] ## Views bundled with the app. Here there are two (the joined clicks/CRM view and the campaign view), but the list can hold any number of views
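The view names are interpolated into SQL with `str.format`, so a name containing a space or a semicolon would produce a broken or unintended statement. A minimal guard, assuming the views use plain unquoted Snowflake identifiers (a letter or underscore followed by letters, digits, underscores or dollar signs, up to 255 characters); `check_view_names` is a hypothetical helper that deliberately rejects quoted identifiers:

```python
import re

# Unquoted Snowflake identifier: letter or underscore, then letters, digits, '_' or '$'.
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]{0,254}")


def check_view_names(view_names):
    """Return the names unchanged, or raise ValueError listing any that are not plain identifiers."""
    bad = [name for name in view_names if not _UNQUOTED_IDENTIFIER.fullmatch(name)]
    if bad:
        raise ValueError(f"Not plain unquoted identifiers: {bad}")
    return list(view_names)


# In the notebook: check_view_names(viewlist_to_app) before calling create_nativeapp_pkg.
print(check_view_names(["C360_CLICKS_CRM_JOINED_VW", "CAMPAIGN72_VIEW"]))
```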


<h5><ins>Creating Native App setup</ins></h5>

In [160]:
return_str_pkg = create_nativeapp_pkg(sp_session, app_config, app_name, viewlist_to_app, app_config['role'], app_config['database'], app_config['schema'])

print(return_str_pkg)

Success


<h5><ins>Upload the local files to the stage for NativeApp creation</ins></h5>

In [161]:

return_text = upload_nativeappfiles_to_stage(sp_session ,native_app_dir 
                            ,app_name+'_CODE', 'CORE'
                            ,app_name+'_CODE_STG' , app_config['role']) 
print(return_text)

 Uploading files to stage: Marketing_Data_Foundation_CODE.CORE.Marketing_Data_Foundation_CODE_STG 
    ./Snowflake_for_Marketing_App/readme.md => @Marketing_Data_Foundation_CODE_STG
    ./Snowflake_for_Marketing_App/manifest.yml => @Marketing_Data_Foundation_CODE_STG
    ./Snowflake_for_Marketing_App/streamlit/Home.py => @Marketing_Data_Foundation_CODE_STG/streamlit
    ./Snowflake_for_Marketing_App/streamlit/environment.yml => @Marketing_Data_Foundation_CODE_STG/streamlit
    ./Snowflake_for_Marketing_App/streamlit/images/Customer_Starter.png => @Marketing_Data_Foundation_CODE_STG/streamlit/images
    ./Snowflake_for_Marketing_App/streamlit/images/Arch_Customer72.png => @Marketing_Data_Foundation_CODE_STG/streamlit/images
    ./Snowflake_for_Marketing_App/streamlit/images/Arch_Marketing72.png => @Marketing_Data_Foundation_CODE_STG/streamlit/images
    ./Snowflake_for_Marketing_App/streamlit/images/Data_Catalog.png => @Marketing_Data_Foundation_CODE_STG/streamlit/images
    ./Snowflake

<h2> <ins> Create NativeApp</ins> </h2> 

In [162]:
def create_nativeapp(_sp_session, app_name):
    """ Create the NativeApp once the stage upload is complete: add a version and a patch to the
        application package, set the default release directive, and create the application.
    """
    pkg_name = app_name + '_PKG'
    stage_path = app_name + '_CODE' + '.' + 'CORE' + '.' + app_name + '_CODE_STG'
    try:
        _sp_session.sql("DROP APPLICATION IF EXISTS {0};".format(app_name)).collect()
        _sp_session.sql("ALTER APPLICATION PACKAGE {0} ADD VERSION V1_0 USING '@{1}';".format(pkg_name, stage_path)).collect()
        _sp_session.sql("ALTER APPLICATION PACKAGE {0} ADD PATCH FOR VERSION V1_0 USING '@{1}';".format(pkg_name, stage_path)).collect()
        # Read the highest patch number from the SHOW VERSIONS output
        _sp_session.sql("SHOW VERSIONS IN APPLICATION PACKAGE {0};".format(pkg_name)).collect()
        df = _sp_session.sql('SELECT "patch" as patch FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) order by patch desc limit 1;').to_pandas()
        patch_num = df['PATCH'][0]
        _sp_session.sql("ALTER APPLICATION PACKAGE {0} SET DEFAULT RELEASE DIRECTIVE VERSION = V1_0 patch = {1};".format(pkg_name, patch_num)).collect()
        # Install the app from the same version and patch the release directive points to
        _sp_session.sql("CREATE APPLICATION {0} FROM APPLICATION PACKAGE {1} USING VERSION V1_0 PATCH {2};".format(app_name, pkg_name, patch_num)).collect()
        _sp_session.sql("ALTER APPLICATION {0} SET DEBUG_MODE=TRUE;".format(app_name)).collect()
        _sp_session.sql("ALTER APPLICATION PACKAGE {0} SET DISTRIBUTION = EXTERNAL".format(pkg_name)).collect()
        return 'Success'
    except Exception as e:
        print(e)
        return 'Failed'
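`ADD VERSION` is expected to create patch 0 and each `ADD PATCH` the next number, so after the two statements above the highest patch for V1_0 should be 1. The function reads it back through RESULT_SCAN; a pure-Python sketch of the same selection, assuming rows shaped as dicts with the lowercase `version` and `patch` columns of SHOW VERSIONS. `latest_patch` and `release_directive_sql` are hypothetical helpers:

```python
def latest_patch(version_rows, version="V1_0"):
    """Return the highest patch number recorded for `version`.

    `version_rows` are dicts with lowercase "version" and "patch" keys,
    mirroring the columns of SHOW VERSIONS IN APPLICATION PACKAGE.
    """
    patches = [int(row["patch"]) for row in version_rows
               if str(row["version"]).upper() == version.upper()]
    if not patches:
        raise ValueError("no patches found for version {0}".format(version))
    return max(patches)


def release_directive_sql(package_name, version, patch):
    """Build the statement that points the default release directive at version/patch."""
    return ("ALTER APPLICATION PACKAGE {0} SET DEFAULT RELEASE DIRECTIVE "
            "VERSION = {1} PATCH = {2};".format(package_name, version, patch))


# Hypothetical rows after ADD VERSION (patch 0) and ADD PATCH (patch 1).
rows = [{"version": "V1_0", "patch": 0}, {"version": "V1_0", "patch": 1}]
print(release_directive_sql("Marketing_Data_Foundation_PKG", "V1_0", latest_patch(rows)))
```

Unlike the RESULT_SCAN query, which takes the highest patch across all versions, this filters by version, which matters once the package holds more than one version.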


In [163]:
return_app_creation_str = create_nativeapp(sp_session, app_name)
print(return_app_creation_str)

Success


![](./Images/Marketing_data_foundation_success.png)