# Analyze your Marketing Data with Cortex Analyst

Let's start by generating some data, and saving it as csv's.

In [None]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# Set random seed for reproducibility
np.random.seed(26138)
random.seed(26138)

# Common Parameters
num_records = 1000
start_date = datetime.today() - timedelta(days=90)
campaign_names = ['Brand Awareness', 'Product Launch', 'Holiday Sale']
ad_group_names = ['Group A', 'Group B', 'Group C']
devices = ['Mobile', 'Desktop', 'Tablet']
geo_locations = ['New York', 'California', 'Texas', 'Florida', 'Illinois']
age_groups = ['18-24', '25-34', '35-44', '45-54', '55-64']
genders = ['Male', 'Female']

# Helper Functions
def beta_params(mean, std):
    """Calculate alpha and beta parameters for a beta distribution."""
    var = std ** 2
    alpha = ((1 - mean) / var - 1 / mean) * mean ** 2
    beta = alpha * (1 / mean - 1)
    return alpha, beta

def assign_user_ids(data, engaged_subscribers, engagement_bias=0.8):
    """Assign User IDs to data, biasing towards engaged subscribers."""
    user_ids_with_bias = np.where(np.random.rand(len(data)) < engagement_bias,
                                  np.random.choice(engaged_subscribers, len(data)),
                                  np.random.choice(user_ids, len(data)))
    return user_ids_with_bias

# Generate Subscribers for Mailchimp
num_subscribers = 500
subscriber_statuses = ['subscribed', 'unsubscribed', 'cleaned']

subscribers = pd.DataFrame({
    'email_address': [f'user{num}@example.com' for num in range(1, num_subscribers + 1)],
    'first_name': [f'Name{num}' for num in range(1, num_subscribers + 1)],
    'last_name': [f'Last{num}' for num in range(1, num_subscribers + 1)],
    'signup_date': [start_date - timedelta(days=random.randint(0, 365)) for _ in range(num_subscribers)],
    'status': np.random.choice(subscriber_statuses, p=[0.85, 0.1, 0.05], size=num_subscribers),
})

# Create synthetic User IDs
user_ids = [f'UID{num}' for num in range(1, num_subscribers + 1)]
subscribers['user_id'] = user_ids

# Active subscribers
active_subscribers = subscribers[subscribers['status'] == 'subscribed']
engaged_subscribers = []

# Generate Mailchimp Campaign Data
campaign_names_mailchimp = ['Newsletter', 'Promotion', 'Update']
mailchimp_data = []

for campaign in campaign_names_mailchimp:
    send_date = start_date + timedelta(days=random.randint(0, 90))
    recipients = active_subscribers.sample(frac=np.random.uniform(0.7, 1.0))
    open_rate = np.random.uniform(0.2, 0.6)
    click_rate = open_rate * np.random.uniform(0.1, 0.5)  # Click rate is typically lower than open rate

    for _, subscriber in recipients.iterrows():
        opened = np.random.rand() < open_rate
        clicked = opened and (np.random.rand() < (click_rate / open_rate))
        total_opens = np.random.randint(1, 5) if opened else 0
        total_clicks = np.random.randint(1, 3) if clicked else 0
        last_opened = send_date + timedelta(days=int(random.randint(0, 5))) if opened else None
        last_clicked = last_opened + timedelta(days=int(random.randint(0, 2))) if clicked else None

        mailchimp_data.append({
            'email_address': subscriber['email_address'],
            'user_id': subscriber['user_id'],
            'campaign_name': campaign,
            'send_time': send_date,
            'open_rate': open_rate,
            'click_rate': click_rate,
            'last_opened': last_opened,
            'last_clicked': last_clicked,
            'total_opens': total_opens,
            'total_clicks': total_clicks,
        })

        if total_opens > 0:
            engaged_subscribers.append(subscriber['user_id'])

mailchimp_data = pd.DataFrame(mailchimp_data)

# Remove duplicates in engaged_subscribers
engaged_subscribers = list(set(engaged_subscribers))

# Generate Google Ads Synthetic Data
google_ads_data = pd.DataFrame({
    'date': [start_date + timedelta(days=int(i)) for i in np.random.randint(0, 90, num_records)],
    'campaign_name': np.random.choice(campaign_names, num_records),
    'ad_group_name': np.random.choice(ad_group_names, num_records),
    'device_type': np.random.choice(devices, num_records),
    'geo_targeting': np.random.choice(geo_locations, num_records),
    'age': np.random.choice(age_groups, num_records),
    'gender': np.random.choice(genders, num_records),
})

# Impressions
google_ads_data['impressions'] = np.random.normal(loc=5000, scale=2000, size=num_records).astype(int).clip(min=0)

# Click-Through Rate (CTR)
ctr_mean = 0.05
ctr_std = 0.02
a_ctr, b_ctr = beta_params(ctr_mean, ctr_std)
ctr_values = np.random.beta(a_ctr, b_ctr, num_records)
google_ads_data['clicks'] = (google_ads_data['impressions'] * ctr_values).astype(int)

# Cost Per Click (CPC)
cpc_mean = np.log(1.5)
cpc_std = 0.5
cpc_values = np.random.lognormal(mean=cpc_mean, sigma=cpc_std, size=num_records)
google_ads_data['cost'] = (google_ads_data['clicks'] * cpc_values).round(2)

# Assign User IDs
google_ads_data['user_id'] = assign_user_ids(google_ads_data, engaged_subscribers)

# Adjust Conversions without engagement
base_cvr_google = 0.02
google_ads_data['conversions'] = (google_ads_data['clicks'] * base_cvr_google).astype(int)

# Conversion Value
aov_mean = 100
aov_std = 20
aov_values = np.random.normal(loc=aov_mean, scale=aov_std, size=num_records)
google_ads_data['conversion_value'] = (google_ads_data['conversions'] * aov_values).round(2).clip(lower=0)

# Derived Metrics
google_ads_data['conv_rate'] = (google_ads_data['conversions'] / google_ads_data['clicks']).replace([np.inf, np.nan], 0).round(4)
google_ads_data['cost_per_conv'] = (google_ads_data['cost'] / google_ads_data['conversions']).replace([np.inf, np.nan], 0).round(2)

# Clean up negative or NaN values
google_ads_data.fillna(0, inplace=True)
google_ads_data['conv_rate'] = google_ads_data['conv_rate'].clip(0, 1)
google_ads_data['date'] = pd.to_datetime(google_ads_data['date'])

# Generate Facebook Ads Synthetic Data
ad_set_names = ['Set 1', 'Set 2', 'Set 3']
placements = ['Feed', 'Stories', 'Marketplace', 'Video Feeds']

facebook_ads_data = pd.DataFrame({
    'date': [start_date + timedelta(days=int(i)) for i in np.random.randint(0, 90, num_records)],
    'campaign_name': np.random.choice(campaign_names, num_records),
    'ad_set_name': np.random.choice(ad_set_names, num_records),
    'placement': np.random.choice(placements, num_records),
    'age': np.random.choice(age_groups, num_records),
    'gender': np.random.choice(genders, num_records),
    'region': np.random.choice(geo_locations, num_records),
})

# Impressions
facebook_ads_data['impressions'] = np.random.normal(loc=4000, scale=1500, size=num_records).astype(int).clip(min=0)

# Reach
facebook_ads_data['reach'] = (facebook_ads_data['impressions'] * np.random.uniform(0.7, 1.0, size=num_records)).astype(int)

# CTR
ctr_mean_fb = 0.04
ctr_std_fb = 0.015
a_ctr_fb, b_ctr_fb = beta_params(ctr_mean_fb, ctr_std_fb)
ctr_values_fb = np.random.beta(a_ctr_fb, b_ctr_fb, num_records)
facebook_ads_data['clicks'] = (facebook_ads_data['impressions'] * ctr_values_fb).astype(int)

# Cost
cpc_mean_fb = np.log(1.0)
cpc_std_fb = 0.4
cpc_values_fb = np.random.lognormal(mean=cpc_mean_fb, sigma=cpc_std_fb, size=num_records)
facebook_ads_data['cost'] = (facebook_ads_data['clicks'] * cpc_values_fb).round(2)

# Assign User IDs
facebook_ads_data['user_id'] = assign_user_ids(facebook_ads_data, engaged_subscribers)

# Adjust Conversions without engagement
base_cvr_facebook = 0.03
facebook_ads_data['conversions'] = (facebook_ads_data['clicks'] * base_cvr_facebook).astype(int)

# Revenue
aov_mean_fb = 90
aov_std_fb = 15
aov_values_fb = np.random.normal(loc=aov_mean_fb, scale=aov_std_fb, size=num_records)
facebook_ads_data['revenue'] = (facebook_ads_data['conversions'] * aov_values_fb).round(2).clip(lower=0)

# Purchase ROAS
facebook_ads_data['purchase_roas'] = (facebook_ads_data['revenue'] / facebook_ads_data['cost']).replace([np.inf, np.nan], 0).round(2)

# CPC and CTR Calculations
facebook_ads_data['cpc'] = (facebook_ads_data['cost'] / facebook_ads_data['clicks']).replace([np.inf, np.nan], 0).round(2)
facebook_ads_data['ctr'] = (facebook_ads_data['clicks'] / facebook_ads_data['impressions']).replace([np.inf, np.nan], 0).round(4)

# Clean up negative or NaN values
facebook_ads_data.fillna(0, inplace=True)
facebook_ads_data['ctr'] = facebook_ads_data['ctr'].clip(0, 1)
facebook_ads_data['date'] = pd.to_datetime(facebook_ads_data['date'])

# Seasonality Adjustments
peak_dates = [start_date + timedelta(days=30), start_date + timedelta(days=60)]

for peak_date in peak_dates:
    # Google Ads
    mask_google = google_ads_data['date'] == peak_date
    google_ads_data.loc[mask_google, 'impressions'] = (google_ads_data.loc[mask_google, 'impressions'] * 1.5).astype(int)
    google_ads_data.loc[mask_google, 'clicks'] = (google_ads_data.loc[mask_google, 'clicks'] * 1.5).astype(int)
    google_ads_data.loc[mask_google, 'conversions'] = (google_ads_data.loc[mask_google, 'conversions'] * 1.5).astype(int)
    # Facebook Ads
    mask_facebook = facebook_ads_data['date'] == peak_date
    facebook_ads_data.loc[mask_facebook, 'impressions'] = (facebook_ads_data.loc[mask_facebook, 'impressions'] * 1.5).astype(int)
    facebook_ads_data.loc[mask_facebook, 'clicks'] = (facebook_ads_data.loc[mask_facebook, 'clicks'] * 1.5).astype(int)
    facebook_ads_data.loc[mask_facebook, 'conversions'] = (facebook_ads_data.loc[mask_facebook, 'conversions'] * 1.5).astype(int)

# Campaign Performance Variance
campaign_performance = {
    'Brand Awareness': 1.0,
    'Product Launch': 1.2,
    'Holiday Sale': 1.5,
}

# Adjust Google Ads
google_ads_data['performance_multiplier'] = google_ads_data['campaign_name'].map(campaign_performance)
google_ads_data['impressions'] = (google_ads_data['impressions'] * google_ads_data['performance_multiplier']).astype(int)
google_ads_data['clicks'] = (google_ads_data['clicks'] * google_ads_data['performance_multiplier']).astype(int)
google_ads_data['conversions'] = (google_ads_data['conversions'] * google_ads_data['performance_multiplier']).astype(int)
google_ads_data.drop('performance_multiplier', axis=1, inplace=True)

# Adjust Facebook Ads
facebook_ads_data['performance_multiplier'] = facebook_ads_data['campaign_name'].map(campaign_performance)
facebook_ads_data['impressions'] = (facebook_ads_data['impressions'] * facebook_ads_data['performance_multiplier']).astype(int)
facebook_ads_data['clicks'] = (facebook_ads_data['clicks'] * facebook_ads_data['performance_multiplier']).astype(int)
facebook_ads_data['conversions'] = (facebook_ads_data['conversions'] * facebook_ads_data['performance_multiplier']).astype(int)
facebook_ads_data.drop('performance_multiplier', axis=1, inplace=True)

# Demographic Variance
age_group_engagement = {
    '18-24': 1.2,
    '25-34': 1.0,
    '35-44': 0.9,
    '45-54': 0.8,
    '55-64': 0.7,
}

# Adjust Google Ads based on Age Group
google_ads_data['age_multiplier'] = google_ads_data['age'].map(age_group_engagement)
google_ads_data['clicks'] = (google_ads_data['clicks'] * google_ads_data['age_multiplier']).astype(int)
google_ads_data.drop('age_multiplier', axis=1, inplace=True)

# Adjust Facebook Ads based on Age Group
facebook_ads_data['age_multiplier'] = facebook_ads_data['age'].map(age_group_engagement)
facebook_ads_data['clicks'] = (facebook_ads_data['clicks'] * facebook_ads_data['age_multiplier']).astype(int)
facebook_ads_data.drop('age_multiplier', axis=1, inplace=True)

# Recalculate Derived Metrics after Adjustments
# Google Ads
google_ads_data['conv_rate'] = (google_ads_data['conversions'] / google_ads_data['clicks']).replace([np.inf, np.nan], 0).round(4)
google_ads_data['cost_per_conv'] = (google_ads_data['cost'] / google_ads_data['conversions']).replace([np.inf, np.nan], 0).round(2)
google_ads_data['conv_rate'] = google_ads_data['conv_rate'].clip(0, 1)

# Facebook Ads
facebook_ads_data['cpc'] = (facebook_ads_data['cost'] / facebook_ads_data['clicks']).replace([np.inf, np.nan], 0).round(2)
facebook_ads_data['ctr'] = (facebook_ads_data['clicks'] / facebook_ads_data['impressions']).replace([np.inf, np.nan], 0).round(4)
facebook_ads_data['purchase_roas'] = (facebook_ads_data['revenue'] / facebook_ads_data['cost']).replace([np.inf, np.nan], 0).round(2)
facebook_ads_data['ctr'] = facebook_ads_data['ctr'].clip(0, 1)

# Save Data to CSV Files
google_ads_data.to_csv('google_ads_data.csv', index=False)
facebook_ads_data.to_csv('facebook_ads_data.csv', index=False)
subscribers.to_csv('mailchimp_subscribers.csv', index=False)
mailchimp_data.to_csv('mailchimp_campaigns.csv', index=False)

print("Synthetic data generation complete. CSV files have been saved.")


# Connect to the snowlfake instaned, and create our database, schema, and table information

In [None]:
import snowflake.connector

# Set up Snowflake connection parameters
conn = snowflake.connector.connect(
    user='<your_username>',
    password='<your_password>',
    account='<your_account_identifier>',
    warehouse='<your_warehouse>',
    role='<your_role>',
    authenticator='snowflake'  # Use 'externalbrowser' if using SSO
)

# Create a cursor object
cur = conn.cursor()

# Execute the SQL statements
sql_commands = [
    '''
    CREATE DATABASE IF NOT EXISTS MARKETING_DEMO;
    USE DATABASE MARKETING_DEMO;
    CREATE SCHEMA IF NOT EXISTS MARKETING_DATA;
    USE SCHEMA MARKETING_DATA;
    ''',
    '''
    CREATE OR REPLACE TABLE GOOGLE_ADS_DATA (
      date DATE,
      campaign_name STRING,
      ad_group_name STRING,
      device_type STRING,
      geo_targeting STRING,
      age STRING,
      gender STRING,
      impressions INTEGER,
      clicks INTEGER,
      cost FLOAT,
      user_id STRING,
      conversions INTEGER,
      conversion_value FLOAT,
      conv_rate FLOAT,
      cost_per_conv FLOAT
    );
    ''',
    '''
    CREATE OR REPLACE TABLE FACEBOOK_ADS_DATA (
      date DATE,
      campaign_name STRING,
      ad_set_name STRING,
      placement STRING,
      age STRING,
      gender STRING,
      region STRING,
      impressions INTEGER,
      reach INTEGER,
      clicks INTEGER,
      cost FLOAT,
      user_id STRING,
      conversions INTEGER,
      revenue FLOAT,
      purchase_roas FLOAT,
      cpc FLOAT,
      ctr FLOAT
    );
    ''',
    '''
    CREATE OR REPLACE TABLE MAILCHIMP_SUBSCRIBERS (
      email_address STRING,
      first_name STRING,
      last_name STRING,
      signup_date DATE,
      status STRING,
      user_id STRING
    );
    ''',
    '''
    CREATE OR REPLACE TABLE MAILCHIMP_CAMPAIGNS (
      email_address STRING,
      user_id STRING,
      campaign_name STRING,
      send_time DATE,
      open_rate FLOAT,
      click_rate FLOAT,
      last_opened DATE,
      last_clicked DATE,
      total_opens INTEGER,
      total_clicks INTEGER
    );
    ''',
    '''
    CREATE STAGE raw_data DIRECTORY = (ENABLE = TRUE);
    '''
]

for command in sql_commands:
    cur.execute(command)
    print(f"Executed:\n{command}\n")
    
# Close the cursor and the connection
cur.close()
conn.close()


# Now we can load our CSV's through the Snowflake UI into our created tables and ensure the mappings are correct.

# Next, we'll run the semantic model generator locally, you can find instructions for that [here](https://github.com/Snowflake-Labs/semantic-model-generator)

# Once our Model has been uploaded, we can upload the following Python code as a Streamlit app on the Snowflake platform

In [None]:
import _snowflake
import json
import streamlit as st
import pandas as pd
from snowflake.snowpark.context import get_active_session
from typing import (List)

session = get_active_session()

#update following lines 
DATABASE = "MARKETING_DEMO"
SCHEMA = "MARKETING_DATA"
STAGE = "RAW_DATA"
FILE = "marketing_data.yaml"

if "DATABASE" not in st.session_state:
    st.session_state.DATABASE = "MARKETING_DEMO"

if "SCHEMA" not in st.session_state:
    st.session_state.SCHEMA = "MARKETING_DATA"

FULLPATH = f"{DATABASE}.{SCHEMA}.{STAGE}"
user_input=""

def send_message(prompt: str) -> dict:
    """Calls the REST API and returns the response."""
    request_body = {
        "messages": [
            {"role": "user",
             "content": [{
                        "type": "text",
                        "text": prompt
             }]}],
        "semantic_model_file": f"@{DATABASE}.{SCHEMA}.{STAGE}/{FILE}",
    }
    resp = _snowflake.send_snow_api_request("POST",f"/api/v2/cortex/analyst/message", {}, {}, request_body, {}, 30000,)
    if resp["status"] < 400:
        return json.loads(resp["content"])
    else:
        raise Exception(
            f"Failed request with status {resp['status']}: {resp}"
        )

def process_message(prompt: str) -> None:
    """Processes a message and adds the response to the chat."""
    st.session_state.messages.append(
        {"role": "user", "content": [{"type": "text", "text": prompt}]}
    )
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        with st.spinner("Generating response..."):
            response = send_message(prompt=prompt)
            content = response["message"]["content"]
            display_content(content=content)
    st.session_state.messages.append({"role": "assistant", "content": content})


def display_content(content: list, message_index: int = None) -> None:
    """Displays a content item for a message."""
    message_index = message_index or len(st.session_state.messages)
    for item in content:
        if item["type"] == "text":
            st.markdown(item["text"])
        elif item["type"] == "suggestions":
            with st.expander("Suggestions", expanded=True):
                for suggestion_index, suggestion in enumerate(item["suggestions"]):
                    if st.button(suggestion, key=f"{message_index}_{suggestion_index}"):
                        st.session_state.active_suggestion = suggestion
        elif item["type"] == "sql":
            with st.expander("SQL Query", expanded=False):
                st.code(item["statement"], language="sql")
            with st.expander("Results", expanded=True):
                with st.spinner("Running SQL..."):
                    df = session.sql(item["statement"]).to_pandas()
                    if len(df.index) > 1:
                        data_tab, line_tab, bar_tab = st.tabs(
                            ["Data", "Line Chart", "Bar Chart"]
                        )
                        data_tab.dataframe(df)
                        if len(df.columns) > 1:
                            df = df.set_index(df.columns[0])
                        with line_tab:
                            st.line_chart(df)
                        with bar_tab:
                            st.bar_chart(df)
                    else:
                        st.dataframe(df)
    

st.title("Cortex analyst")
st.markdown(f"Semantic Model: `{FILE}`")



if "messages" not in st.session_state:
    st.session_state.messages = []
    st.session_state.suggestions = []
    st.session_state.active_suggestion = None

for message_index, message in enumerate(st.session_state.messages):
    with st.chat_message(message["role"]):
        display_content(content=message["content"], message_index=message_index)

if user_input := st.chat_input("What is your question?"):
    process_message(prompt=user_input)

if st.session_state.active_suggestion:
    process_message(prompt=st.session_state.active_suggestion)
    st.session_state.active_suggestion = None

# Sticky Clear Button
# Add this button right after the chat input to make it "sticky"
clear_button_col, _ = st.columns([1, 5])
with clear_button_col:
    if st.button("Clear Chat"):
        st.session_state.messages = []  # Clear the chat history
        st.experimental_rerun()  # Force rerun to clear the output

# And we're finished!