# Klaviyo API

Our goal is to create a flattened table of email campaign data, with their audiences, tags, and applicable metrics.\
There are a few transformations below outside of the calls, mostly related to creating variables like end dates, pivoting output for merging into the main df, and string cleanup for misc characters.

### The API Sum Of Parts

* Campaigns
  * [Get Campaigns](https://developers.klaviyo.com/en/reference/get_campaigns)
* Audiences (Segments & Lists)
  * [Get Segments](https://developers.klaviyo.com/en/reference/get_segments)
  * [Get Lists](https://developers.klaviyo.com/en/reference/get_lists)
* Tags
  * [Get Tags](https://developers.klaviyo.com/en/reference/get_tags)
  * [Get Tag Groups](https://developers.klaviyo.com/en/reference/get_tag_groups)
  * [Get Campaign Relationship Tags](https://developers.klaviyo.com/en/reference/get_campaign_relationships_tags)
      * You can alternatively use [Get Campaign Tags](https://developers.klaviyo.com/en/reference/get_campaign_tags), but I wanted to avoid string splitting.
* Metrics
  * [Get Metrics](https://developers.klaviyo.com/en/reference/get_metrics)
  * [Query Metric Aggregates](https://developers.klaviyo.com/en/reference/query_metric_aggregates)
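
The list endpoints above are all read the same way in this notebook: send a GET with the `revision` and `Authorization` headers, collect `data`, and follow `links.next` until it is empty. Below is a minimal sketch of that loop as a reusable helper. The names `fetch_all`, `FakeResponse`, and `fake_get` are hypothetical and not part of Klaviyo's API or this notebook; `get` is assumed to be any callable shaped like `requests.get`, so the fake below stands in for the network.

```python
def fetch_all(url, headers, get, resource_type=None):
    """Follow `links.next` until it is missing, collecting every item in `data`."""
    items = []
    while url:
        response = get(url, headers=headers)
        if response.status_code != 200:
            raise RuntimeError(f"Request to {url} failed: {response.status_code}")
        payload = response.json()
        for item in payload["data"]:
            if resource_type is None or item["type"] == resource_type:
                items.append(item)
        url = payload.get("links", {}).get("next")  # None ends the loop
    return items


class FakeResponse:
    """Stand-in for a requests.Response, only for this illustration."""

    def __init__(self, payload, status_code=200):
        self._payload = payload
        self.status_code = status_code

    def json(self):
        return self._payload


# Two fake pages keyed by made-up URLs.
PAGES = {
    "page-1": {"data": [{"type": "campaign", "id": "A"}], "links": {"next": "page-2"}},
    "page-2": {"data": [{"type": "campaign", "id": "B"}], "links": {"next": None}},
}


def fake_get(url, headers=None):
    return FakeResponse(PAGES[url])


campaign_ids = [c["id"] for c in fetch_all("page-1", {}, fake_get, "campaign")]
```

Against the real API, `get` would be `requests.get` and `url` one of the endpoints listed above.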


In [None]:
import requests
import time
import pandas as pd
from datetime import datetime, timedelta
from config import *

In [None]:
# Set target database/brand.
database = "CT"

# Assigning private_key based on db value.
if database == "CT":
    private_key = ct_private_key
    connection_string = ct_connection_string
elif database == "KP":
    private_key = kp_private_key
    connection_string = kp_connection_string
elif database == "ST":
    private_key = st_private_key
    connection_string = st_connection_string
elif database == "WC":
    private_key = wc_private_key
    connection_string = wc_connection_string

### Campaigns 

Not all data comes back neatly nested as in the documentation; some fields are only links to follow-up API calls.\
We will only need to pull in: campaign_id, campaign_name, status, audiences, send_strategy, scheduled, sent.

We are also adding a column called end_date, 14 days from sent, to mark when the attribution window closes.

_Note: A/B tests will group all metrics for all sub-mailings. You will need to make a specialized call to break these apart into their own campaigns._
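
The end_date rule above can be sketched with the standard library alone. The function name `attribution_end` and the sample timestamp are made up for illustration, and the sketch assumes the send time is an ISO 8601 string with a numeric offset such as `+00:00`.

```python
from datetime import datetime, timedelta

ATTRIBUTION_DAYS = 14  # window length used in this notebook


def attribution_end(sent_iso):
    """Return the ISO timestamp 14 days after `sent_iso`, or None if never sent."""
    if not sent_iso:
        return None
    sent = datetime.fromisoformat(sent_iso)
    return (sent + timedelta(days=ATTRIBUTION_DAYS)).isoformat()


example_end = attribution_end("2023-10-01T09:00:00+00:00")  # made-up send time
```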


In [None]:
# Pull in all email campaign ids.

url = "https://a.klaviyo.com/api/campaigns/?filter=equals(messages.channel,'email')"
headers = {
    "accept": "application/json",
    "revision": "2023-10-15",
    "Authorization": "Klaviyo-API-Key " + private_key
}

campaigns = []

while url:
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()
        campaigns += [campaign for campaign in data['data'] if campaign['type'] == 'campaign']
        url = data['links'].get('next')  # Get the next page URL
    else:
        print('Failed to retrieve campaigns: ', response.status_code)
        break

# Output is heavily nested, and contains a lot of useless metadata.
# Just going to cherry pick the values we need into a flattened table.
df_campaigns = pd.DataFrame([{
    'campaign_id': campaign['id'],
    'campaign_name': campaign['attributes']['name'],
    'status': campaign['attributes']['status'],
    'audiences': campaign['attributes']['audiences']['included'],
    'send_strategy': campaign['attributes']['send_strategy']['method'],
    'scheduled': campaign['attributes']['scheduled_at'], # Used for metric capture window.
    'sent': campaign['attributes']['send_time'], # Effectively useless depending on smart send times.
} for campaign in campaigns])

# Adding an end date to mark when Klaviyo attribution window ends.
# Will be referenced later when calling for aggregate metrics.
df_campaigns['end_date'] = pd.to_datetime(df_campaigns['sent'])
df_campaigns['end_date'] = df_campaigns['end_date'].apply(lambda x: (x + pd.Timedelta(days=14)).isoformat() if pd.notna(x) else None)

### Limiting Campaigns

We are pulling in _a lot_ of campaigns, so we are going to limit to those scheduled in the last 20 days.\
The scheduled and sent times can differ when "smart send" is used, so we filter on scheduled.
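
As a rough illustration of the 20-day cutoff, here is a sketch that compares parsed datetimes rather than raw strings, which sidesteps surprises when timestamps carry timezone offsets. The function `recent_campaigns` and the sample rows are hypothetical. It assumes every `scheduled` value is either missing or an ISO 8601 string with a numeric offset, because `datetime.fromisoformat` rejects a trailing `Z` on Python versions before 3.11 and aware and naive datetimes cannot be compared.

```python
from datetime import datetime, timedelta, timezone


def recent_campaigns(campaigns, now, days=20):
    """Keep campaigns whose `scheduled` timestamp falls within the last `days` days."""
    cutoff = now - timedelta(days=days)
    kept = []
    for campaign in campaigns:
        scheduled = campaign.get("scheduled")
        if scheduled and datetime.fromisoformat(scheduled) >= cutoff:
            kept.append(campaign)
    return kept


# Made-up rows for illustration.
rows = [
    {"campaign_id": "old", "scheduled": "2023-09-01T12:00:00+00:00"},
    {"campaign_id": "new", "scheduled": "2023-10-10T12:00:00+00:00"},
    {"campaign_id": "unscheduled", "scheduled": None},
]
now = datetime(2023, 10, 15, tzinfo=timezone.utc)
recent_ids = [row["campaign_id"] for row in recent_campaigns(rows, now)]
```

Passing `now` in as an argument keeps the function deterministic, which makes the cutoff easy to check by hand.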

In [None]:
# Doing a bit of cleanup. There are a lot of subsequent API calls, so limiting to recent campaigns speeds this up.

# Limit to only sent emails. No need to pull in email drafts.
df_campaigns = df_campaigns[df_campaigns['status'] != 'Draft']

# Get the date string for x days ago.
date_str = (datetime.now() - timedelta(days=20)).strftime('%Y-%m-%dT%H:%M:%S')

# Filter rows where 'scheduled' >= date_str.
df_campaigns = df_campaigns[df_campaigns['scheduled'] >= date_str]
df_campaigns.reset_index(drop=True, inplace=True)

### Audiences

The column 'audiences' in campaigns is a combination of segments and lists.\
They are functionally the same thing, but you need to make two calls to get the name for each id, then split the ids stored on each campaign and map them to those names.
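
The id-to-name mapping boils down to one dictionary lookup per audience id. Below is a small sketch with plain dictionaries; the ids, names, and the function `audience_names` are invented for the example.

```python
def audience_names(audience_ids, id_to_name):
    """Translate audience ids to a comma-separated string of names, skipping unknown ids."""
    names = [id_to_name[audience_id] for audience_id in audience_ids
             if audience_id in id_to_name]
    return ", ".join(names) if names else None


# Made-up ids and names; segments and lists share a single lookup table.
segments = {"SEG1": "Engaged 90 Days"}
lists = {"LST1": "Newsletter"}
lookup = {**segments, **lists}

label = audience_names(["SEG1", "LST1", "GONE"], lookup)
```

Ids that no longer exist in either lookup (`GONE` here) are dropped rather than raising, which seems the safer default for deleted segments or lists.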


In [None]:
# Pull all segments.
url = "https://a.klaviyo.com/api/segments/"
headers = {
    "accept": "application/json",
    "revision": "2023-10-15",
    "Authorization": "Klaviyo-API-Key " + private_key
}

# API only allows 10 at a time, need to paginate.

segments = []

while url:
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()
        segments += [segment for segment in data['data'] if segment['type'] == 'segment']
        url = data['links'].get('next')  # Get the next page URL
    else:
        print('Failed to retrieve segments: ', response.status_code)
        break

# Converting to DataFrame
df_segments = pd.DataFrame([{
    'audience_id': segment['id'],
    'audience_name': segment['attributes']['name'],
} for segment in segments])

In [None]:
# Pull all lists.
url = "https://a.klaviyo.com/api/lists/"
headers = {
    "accept": "application/json",
    "revision": "2023-10-15",
    "Authorization": "Klaviyo-API-Key " + private_key
}

# API only allows 10 at a time, need to paginate.

lists = []

while url:
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()
        lists += [lst for lst in data['data'] if lst['type'] == 'list']  # `lst` avoids shadowing the built-in `list`
        url = data['links'].get('next')  # Get the next page URL
    else:
        print('Failed to retrieve lists: ', response.status_code)
        break

# Converting to DataFrame
df_lists = pd.DataFrame([{
    'audience_id': lst['id'],
    'audience_name': lst['attributes']['name'],
} for lst in lists])

In [None]:
# Combine lists and segments into one data frame.
df_audience = pd.concat([df_segments, df_lists], ignore_index=True)

# Cleaning the 'audiences' column in df_campaigns
df_campaigns['audiences'] = df_campaigns['audiences'].astype(str).str.replace(r'[\[\]\'"]', '', regex=True)

# Splitting the audience IDs into lists, handling None, empty or invalid formats in df_campaigns
df_campaigns['audiences'] = df_campaigns['audiences'].apply(lambda x: x.split(',') if x and x.strip() else [])

# Creating a dictionary from df_audience for mapping
audience_id_name_map = dict(zip(df_audience['audience_id'], df_audience['audience_name']))

# Function to replace IDs with names and concatenate in df_campaigns
def replace_ids_with_names(ids):
    if not ids:
        return None
    names = [audience_id_name_map.get(audience_id.strip()) for audience_id in ids if audience_id.strip() in audience_id_name_map]
    return ', '.join(filter(None, names))  # filter(None, names) removes any None values

# Applying the function to the 'audiences' column in df_campaigns
df_campaigns['audiences'] = df_campaigns['audiences'].apply(replace_ids_with_names)

### Tags

You need to make a call to get tag names, the tag group names, and then create a table merging them all together.\
Once this is done, we call Get Campaign Relationship Tags one-by-one for each campaign, which returns the ids of its associated tags.\
I think there might be an easier way to achieve this, but I opted to go this route.
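
To make the target shape concrete, here is a plain-Python sketch of the join and pivot described above: campaign and tag id pairs go in, and one comma-joined string per tag group comes out for each campaign. The function `tags_by_group` and all ids and names are hypothetical.

```python
from collections import defaultdict


def tags_by_group(campaign_tags, tag_table):
    """Build {campaign_id: {tag_group_name: "tag1,tag2"}} from (campaign_id, tag_id) pairs."""
    collected = defaultdict(lambda: defaultdict(list))
    for campaign_id, tag_id in campaign_tags:
        tag = tag_table.get(tag_id)
        if tag is None:  # tag is not in one of the tracked groups
            continue
        collected[campaign_id][tag["group"]].append(tag["name"])
    return {
        campaign_id: {group: ",".join(names) for group, names in groups.items()}
        for campaign_id, groups in collected.items()
    }


# Made-up tag table (tag id -> name and group) and campaign/tag pairs.
tag_table = {
    "t1": {"name": "Promo", "group": "Sales"},
    "t2": {"name": "Video", "group": "Format"},
    "t3": {"name": "Clearance", "group": "Sales"},
}
campaign_tags = [("C1", "t1"), ("C1", "t2"), ("C1", "t3"), ("C2", "t9")]

wide = tags_by_group(campaign_tags, tag_table)
```

Campaign `C2` only carries an untracked tag, so it does not appear in the result, which mirrors what a left merge onto the campaigns table would leave as empty tag columns.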

In [None]:
# Pull all tags
url = "https://a.klaviyo.com/api/tags/"
headers = {
    "accept": "application/json",
    "revision": "2023-10-15",
    "Authorization": "Klaviyo-API-Key " + private_key
}

response = requests.get(url, headers=headers)

# Request test
if response.status_code == 200:
    response = response.json()
else:
    print('Failed to retrieve tags: ', response.status_code) 
    
# Extracting tag data
tags = [tag for tag in response['data'] if tag['type'] == 'tag']

# Converting to DataFrame
df_tags = pd.DataFrame([{
    'tag_id': tag['id'],
    'tag_name': tag['attributes']['name'],
    'tag_group_id': tag['relationships']['tag-group']['data']['id']
} for tag in tags])

In [None]:
# Pull all tag groups

url = "https://a.klaviyo.com/api/tag-groups/"
headers = {
    "accept": "application/json",
    "revision": "2023-10-15",
    "Authorization": "Klaviyo-API-Key " + private_key
}

response = requests.get(url, headers=headers)

# Request test
if response.status_code == 200:
    response = response.json()
else:
    print('Failed to retrieve tag groups: ', response.status_code) 
    
# Extracting tag group data
tag_groups = [tag_group for tag_group in response['data'] if tag_group['type'] == 'tag-group']

# Converting to DataFrame
df_tag_groups = pd.DataFrame([{
    'tag_group_id': tag_group['id'],
    'tag_group_name': tag_group['attributes']['name']
} for tag_group in tag_groups])

# Pulling in only tag groups we want.
df_tag_groups = df_tag_groups[df_tag_groups['tag_group_name'].isin(['Content', 'Format', 'Program/Audience', 'Sales'])]

In [None]:
# Merge tags with tag groupings.
df_tag_table = df_tags.merge(df_tag_groups, on='tag_group_id', how='left')
# Drop null/old tags that don't have a group assigned.
df_tag_table.dropna(subset=['tag_group_name'], inplace=True)

In [None]:
# Function to call API with entire row and process response
def call_api_for_tags(campaign_row, private_key):
    # Extracting values from the row
    campaign_id = campaign_row['campaign_id']
    campaign_name = campaign_row['campaign_name']

    # API call setup
    url = "https://a.klaviyo.com/api/campaigns/" + campaign_id + "/relationships/tags/"
    headers = {
        "accept": "application/json",
        "revision": "2023-10-15",
        "Authorization": "Klaviyo-API-Key " + private_key
    }

    # Making the API call
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        response = response.json()
        campaign_tags = [tag for tag in response['data'] if tag['type'] == 'tag']
        return pd.DataFrame([{
            'campaign_id': campaign_id,
            'campaign_name': campaign_name,
            'tag_id': tag['id']
        } for tag in campaign_tags])
    else:
        print('Failed to retrieve tags: ', response.status_code)
        return pd.DataFrame()

# DataFrame to store results
df_campaign_tags = pd.DataFrame()

# Iterate over each row in df_campaigns
for index, row in df_campaigns.iterrows():
    df_temp = call_api_for_tags(row, private_key)
    df_campaign_tags = pd.concat([df_campaign_tags, df_temp], ignore_index=True)
    
    # Rate limiting: Burst 3/s, Steady 60/m; appears to be a lie.
    time.sleep(1)

# Merge campaign tags with the tag/tag group table.
df_tag_table = df_campaign_tags.merge(df_tag_table, on='tag_id', how='left')

# Grouping by 'campaign_id' and 'tag_group_name', then joining the 'tag_name's
grouped = df_tag_table.groupby(['campaign_id', 'tag_group_name'])['tag_name'].apply(','.join).reset_index()

# Pivoting the table to get the desired format
pivot = grouped.pivot(index='campaign_id', columns='tag_group_name', values='tag_name').reset_index()

# Merging with df_campaigns
df_campaigns = df_campaigns.merge(pivot, on='campaign_id', how='left')

### Metrics

Here we are pulling in all of the metric names and ids.\
Each brand/account has different ids, and the list is often polluted with old metrics, so we are specifying the standardized ones we use.\
Once the metrics are in a table, we loop over every campaign in the main table, call the API once per metric, pivot the output, and merge it into the main campaigns table.
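
Because each call covers one metric for one campaign, it can help to separate building the request body from sending it. The sketch below mirrors the payload used in this notebook; the helper names `build_aggregate_payload` and `request_plan` and the sample ids are made up, and nothing here talks to the API.

```python
def build_aggregate_payload(metric_id, campaign_id, start, end,
                            measurement="unique", interval="month",
                            timezone="US/Pacific"):
    """Build the request body for one metric and one campaign."""
    return {
        "data": {
            "type": "metric-aggregate",
            "attributes": {
                "metric_id": metric_id,
                "measurements": [measurement],
                "interval": interval,
                "by": ["$message"],
                "filter": [
                    f"greater-or-equal(datetime,{start})",
                    f"less-than(datetime,{end})",
                    f'equals($message,"{campaign_id}")',
                ],
                "timezone": timezone,
            },
        }
    }


def request_plan(campaign_ids, metric_ids):
    """List every (campaign, metric) pair that needs its own call."""
    return [(c, m) for c in campaign_ids for m in metric_ids]


# Made-up ids and timestamps for illustration.
payload = build_aggregate_payload("METRIC1", "CAMP1",
                                  "2023-10-01T09:00:00", "2023-10-15T09:00:00")
plan = request_plan(["CAMP1", "CAMP2"], ["M1", "M2", "M3", "M4"])
```

With four metrics and a one-second pause between calls, the plan grows to four calls per campaign, so limiting to recent campaigns directly shortens the run.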

In [None]:
# Pull in unique metric IDs.

url = "https://a.klaviyo.com/api/metrics/"
headers = {
    "accept": "application/json",
    "revision": "2023-10-15",
    "Authorization": "Klaviyo-API-Key " + private_key
}

response = requests.get(url, headers=headers)
    
# Request test
if response.status_code == 200:
    response = response.json()
else:
    print('Failed to retrieve metrics: ', response.status_code) 
    
# Output is heavily nested, and contains a lot of useless metadata.
# Just going to cherry pick the values we need into a flattened table.
metrics = [item for item in response['data'] if item['type'] == 'metric']

# Converting to DataFrame
df_metrics = pd.DataFrame([{
    'metric_id': metric['id'],
    'metric_name': metric['attributes']['name']
} for metric in metrics])

# Metrics kept below: received email, opened email, clicked email, unsubscribed.
# Each brand has different/unique metric_id's, need to use the default names to filter what we want.
df_metrics = df_metrics[df_metrics['metric_name'].isin(['Received Email', 'Opened Email', 'Clicked Email', 'Unsubscribed'])]
df_metrics.reset_index(drop=True, inplace=True)

In [None]:
# Klaviyo requires a separate call for every metric on every campaign.
# Going to have to call every row in df_campaigns, and cycle through every df_metrics id.

def call_api_for_metrics(campaign_row, metric_row, private_key):
    
    campaign_id = campaign_row['campaign_id']
    campaign_name = campaign_row['campaign_name']
    scheduled = campaign_row['scheduled']
    end_date = campaign_row['end_date']

    metric_id = metric_row['metric_id']
    metric_name = metric_row['metric_name']

    # API URL
    url = "https://a.klaviyo.com/api/metric-aggregates/"

    # Preparing the payload
    payload = { "data": {
            "type": "metric-aggregate",
            "attributes": {
                "metric_id": metric_id,
                "measurements": ["unique"], # Can use: sum_value, count, unique
                "interval": "month", # Can use: hour, day, week, month
                "by": ["$message"],
                "filter": [
                    "greater-or-equal(datetime," + scheduled + ")",  
                    "less-than(datetime," + end_date + ")",  
                    "equals($message,\"" + campaign_id+ "\")"],
                "timezone": "US/Pacific"
            }
        } }

    # Headers for the API request
    headers = {
        "accept": "application/json",
        "revision": "2023-10-15",
        "content-type": "application/json",
        "Authorization": "Klaviyo-API-Key " + private_key
    }

    # Making the API request
    response = requests.post(url, json=payload, headers=headers)

    # Check if the request was successful
    if response.status_code == 200:
        response_data = response.json()
    else:
        print('Failed to retrieve data: ', response.status_code)
        return pd.DataFrame()

    # Extracting metric data
    metric_aggregate = response_data['data']

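    # Preparing data for DataFrame
    data_for_df = []
    dates = metric_aggregate['attributes']['dates']
    results = metric_aggregate['attributes']['data']
    if not results:
        # Guard against an empty result set (no events for this metric and campaign).
        return pd.DataFrame()
    measurements = results[0]['measurements']['unique']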

    for date, measurement in zip(dates, measurements):
        data_for_df.append({
            'campaign_id': campaign_id,
            'campaign_name': campaign_name,
            'metric_id': metric_id,
            'metric_name': metric_name,
            'date': date,
            'aggregate': measurement
        })

    return pd.DataFrame(data_for_df)

def main(df_campaigns, df_metrics, private_key):
    final_output = pd.DataFrame()

    for campaign_index, campaign_row in df_campaigns.iterrows():
        for metric_index, metric_row in df_metrics.iterrows():
            result = call_api_for_metrics(campaign_row, metric_row, private_key)
            final_output = pd.concat([final_output, result])

            # Rate limiting: Burst 10/s, Steady 150/m; Actual seems to be 60-75/m.
            time.sleep(1)

    return final_output

# Call main()
metric_output = main(df_campaigns, df_metrics, private_key)

# Remove date columns, sum aggregate values to get a readable table.
metric_output = metric_output.groupby(['campaign_id', 'campaign_name', 'metric_id', 'metric_name'])['aggregate'].sum().reset_index()

# Pivot the output, merge into df_campaigns.
pivot = metric_output.pivot(index='campaign_id', columns='metric_name', values='aggregate')
df_campaigns = df_campaigns.merge(pivot, on='campaign_id', how='left')

### Storage

This obviously is not the best way to go about upserting fields into our campaigns table,\
...but is substantially easier.
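
The merge step amounts to "newest row wins per campaign_id". Below is a plain-Python sketch of that rule, using invented rows and a hypothetical helper `merge_keep_latest`; the notebook itself does the equivalent with a dataframe.

```python
def merge_keep_latest(historical, fresh, key="campaign_id"):
    """Combine two lists of row dicts; a fresh row replaces a historical row with the same key."""
    merged = {row[key]: row for row in historical}
    merged.update({row[key]: row for row in fresh})
    return list(merged.values())


# Made-up rows for illustration.
historical = [
    {"campaign_id": "C1", "Opened Email": 10},
    {"campaign_id": "C2", "Opened Email": 5},
]
fresh = [
    {"campaign_id": "C2", "Opened Email": 8},
    {"campaign_id": "C3", "Opened Email": 1},
]

rows = merge_keep_latest(historical, fresh)
```

Campaigns that fall outside the 20-day pull keep their last stored values, while campaigns still inside the attribution window are overwritten with fresher numbers.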

In [None]:
# We are just going to pull all existing records and merge them with the new records, all in a dataframe.
import pyodbc  # Driver used by the mssql+pyodbc dialect.
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string})
engine = create_engine(connection_url)

# Select all records from existing table.
query = ("SELECT * FROM " + database + ".klaviyo.Campaigns")

# Run Query.
df_historical = pd.read_sql(query, engine)

# Merge new with old
df_merged = pd.concat([df_historical, df_campaigns]).drop_duplicates(subset='campaign_id', keep='last').reset_index(drop=True)

In [None]:
# Replace and insert into the Campaigns table.
from sqlalchemy.types import NVARCHAR

default_length = 255  # You can adjust this as needed
df_merged.to_sql(name='Campaigns', con=engine, schema='klaviyo', if_exists='replace', index=False, 
                    dtype={column_name: NVARCHAR(default_length) for column_name in df_merged.columns})