
# Multi‑Touch Attribution (MTA) — Python Notebook

**When to Use**  
- You have **user‑level journey data** (touchpoints over time) and want to allocate credit for conversions across channels.  
- You need **tactical optimization** (bids, creatives, audiences) and near‑real‑time feedback loops.

**Best Application**  
- Digital media with identifiable touchpoints: **Search, Social, Display, Email**, SMS, Push.  
- Evaluating **paths** (e.g., Paid Social → Search → Direct) and **sequences** (order effects).  
- Complementary to MMM: MTA for short‑term granularity; MMM for long‑term budgeting.

**When Not to Use**  
- When **user‑level tracking is unavailable** (privacy, ATT, cookie deprecation) → prefer MMM or **experiments/geo‑tests**.  
- When **incrementality** is required and selection bias is strong → prefer **causal lift tests** (RCT/geo holdouts) or quasi‑experimental methods such as instrumental variables (IV) or propensity score matching (PSM).

**How to Interpret Results**  
- **Heuristic attributions** (last‑touch, time‑decay) are **rules**, not estimates — use for ops baselines only.  
- **Logistic regression** attributions treat coefficients or marginal effects as **associational** signals; validate them with experiments.  
- **Markov path removal effect** estimates each channel’s **path contribution**; still non‑causal absent randomization.  
- Always triangulate MTA with **lift experiments** and **MMM** before making large budget moves.


In [None]:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter, defaultdict

from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, classification_report, confusion_matrix

pd.set_option('display.max_columns', 100)
plt.rcParams['figure.figsize'] = (8,4)


### Data: Synthetic user‑journey paths with channels and timestamps

In [None]:

rng = np.random.default_rng(42)

channels = ["Paid_Social","Search","Display","Email","Affiliate","Direct"]
n_users = 600

# Generate journeys: each user gets 1-6 touches; conversion prob depends on touches & presence of Search/Email
rows = []
user_id = 0
for u in range(n_users):
    k = rng.integers(1, 7)
    # sample a path with some bias
    path = list(rng.choice(channels, size=k, replace=True, p=[0.2,0.25,0.2,0.15,0.1,0.1]))
    # conversion probability rises with path length, presence of Search/Email, and Search as the last touch
    base = 0.05 + 0.02*len(path)
    if "Search" in path: base += 0.10
    if "Email" in path: base += 0.06
    if path[-1] == "Search": base += 0.05  # last-touch boost
    p_conv = np.clip(base, 0, 0.9)
    converted = (rng.random() < p_conv)
    t0 = rng.integers(1, 30)
    for i, ch in enumerate(path):
        rows.append({
            "user_id": user_id,
            "touch_idx": i+1,
            "channel": ch,
            "ts": t0 + i,
            "converted": int(converted),
        })
    user_id += 1

journeys = pd.DataFrame(rows).sort_values(["user_id","touch_idx"]).reset_index(drop=True)
journeys.head(10)


### Baselines: Last‑Touch and Time‑Decay Attribution

In [None]:

# Last-touch: assign full credit to the last channel in the path for converters only
last_touches = journeys.loc[journeys.groupby('user_id')['touch_idx'].idxmax()][['user_id','channel','converted']]
lt_credit = last_touches[last_touches['converted']==1]['channel'].value_counts().rename('last_touch_credit')

# Time-decay: exponential weight by recency (measured in touch position) for converters
decay_lambda = 0.6
td_credit = Counter()
for uid, grp in journeys.groupby('user_id'):
    if grp['converted'].max()==1:
        weights = np.exp(decay_lambda * (grp['touch_idx'] - grp['touch_idx'].max()))
        weights = weights / weights.sum()
        for ch, w in zip(grp['channel'], weights):
            td_credit[ch] += w
td_credit = pd.Series(td_credit, name='time_decay_credit').sort_values(ascending=False)

baseline = pd.concat([lt_credit, td_credit], axis=1).fillna(0).sort_values('time_decay_credit', ascending=False)
baseline
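
To make the time-decay rule concrete, the sketch below applies the same exponential weighting to a single path. The helper name `time_decay_weights` and the three-touch example path are illustrative assumptions, not part of the notebook's pipeline; the decay rate matches the `decay_lambda = 0.6` used above.

```python
import numpy as np

def time_decay_weights(n_touches, decay_lambda=0.6):
    """Normalized exponential weights by touch position; the last touch gets the most credit."""
    idx = np.arange(1, n_touches + 1)
    raw = np.exp(decay_lambda * (idx - n_touches))
    return raw / raw.sum()

# Hypothetical converting path, oldest touch first
demo_path = ["Paid_Social", "Search", "Email"]
demo_weights = time_decay_weights(len(demo_path))

for ch, w in zip(demo_path, demo_weights):
    print(f"{ch:12s} {w:.3f}")
```

With a decay rate of 0.6, the three touches receive roughly 0.16, 0.30, and 0.54 of the conversion. Because each converter's weights sum to 1, the time-decay column totals the number of converters, just as the last-touch column does.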


### Logistic Regression MTA: Touch indicators + simple sequence features

In [None]:

# Build per-user features: counts per channel, last channel, first channel, path length
def build_user_features(df):
    feats = []
    for uid, grp in df.groupby('user_id'):
        d = {'user_id': uid, 'converted': int(grp['converted'].max())}
        counts = grp['channel'].value_counts()
        for ch in channels:
            d[f'count_{ch}'] = int(counts.get(ch, 0))
        d['first_channel'] = grp.iloc[0]['channel']
        d['last_channel'] = grp.iloc[-1]['channel']
        d['path_len'] = int(grp['touch_idx'].max())
        feats.append(d)
    return pd.DataFrame(feats)

users = build_user_features(journeys)
X = users.drop(columns=['user_id','converted'])
y = users['converted']

categoricals = ['first_channel','last_channel']
numerics = [c for c in X.columns if c not in categoricals]

preprocess = ColumnTransformer([
    ('cat', OneHotEncoder(handle_unknown='ignore'), categoricals),
    ('num', 'passthrough', numerics)
])

logit = LogisticRegression(max_iter=200, penalty='l2', solver='lbfgs')

pipe = Pipeline([('prep', preprocess), ('clf', logit)])
pipe.fit(X, y)

# Note: these metrics are computed on the training data, so they are optimistic
proba = pipe.predict_proba(X)[:,1]
auc = roc_auc_score(y, proba)
print(f"In-sample AUC: {auc:.3f}")

print(classification_report(y, pipe.predict(X)))
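
The metrics in the cell above are computed on the same users the model was fit on, so they likely overstate how well the model generalizes. A minimal sketch of a hold-out check follows. It uses a small stand-in feature matrix (`X_demo`, `y_demo`, and the coefficients that generate them are assumptions made up for illustration) rather than the notebook's `users` table, so it runs on its own.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

rng_demo = np.random.default_rng(0)

# Hypothetical stand-in for per-user features: touch counts for three channels
X_demo = rng_demo.poisson(1.0, size=(2000, 3))
# Made-up data-generating process: the third channel has no effect on conversion
logits_demo = -2.0 + 0.8 * X_demo[:, 0] + 0.3 * X_demo[:, 1]
y_demo = (rng_demo.random(2000) < 1 / (1 + np.exp(-logits_demo))).astype(int)

# Hold out 30% of users, stratified on the outcome
X_train, X_test, y_train, y_test = train_test_split(
    X_demo, y_demo, test_size=0.3, stratify=y_demo, random_state=0
)

model_demo = LogisticRegression(max_iter=200).fit(X_train, y_train)
auc_train = roc_auc_score(y_train, model_demo.predict_proba(X_train)[:, 1])
auc_test = roc_auc_score(y_test, model_demo.predict_proba(X_test)[:, 1])
print(f"train AUC: {auc_train:.3f}  test AUC: {auc_test:.3f}")
```

The same split could be applied to `X` and `y` before calling `pipe.fit`; a large gap between train and test AUC would suggest overfitting.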


### Channel Importance from Model Coefficients (Associational)

In [None]:

# Map model coefficients back to human-readable feature names
ohe = pipe.named_steps['prep'].named_transformers_['cat']
cat_names = list(ohe.get_feature_names_out(categoricals))
feat_names = cat_names + numerics

coef = pipe.named_steps['clf'].coef_[0]
importance = pd.Series(coef, index=feat_names).sort_values(ascending=False)

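# Aggregate importance per channel: count coefficient + first/last indicator coefficients.
# Note: this sums coefficients on different scales (counts vs. 0/1 flags), so treat it as a rough ranking.
channel_scores = {ch:0.0 for ch in channels}
for name, val in importance.items():
    for ch in channels:
        # matches count_<ch>, first_channel_<ch>, and last_channel_<ch>
        if name.endswith(f'_{ch}'):
            channel_scores[ch] += val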

channel_scores = pd.Series(channel_scores).sort_values(ascending=False).rename('coef_score')
channel_scores.to_frame()
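
Logistic coefficients are on the log-odds scale, which can be hard to read directly. As a rough illustration, the sketch below converts one coefficient into an odds ratio and into a change in conversion probability at a given baseline. The values `coef_demo` and `p0`, and the helper `prob_shift`, are hypothetical and are not taken from the fitted model.

```python
import numpy as np

def prob_shift(baseline_p, coef, delta=1):
    """Probability after adding `delta` units of a feature with log-odds coefficient `coef`."""
    logit = np.log(baseline_p / (1 - baseline_p)) + coef * delta
    return 1 / (1 + np.exp(-logit))

coef_demo = 0.40   # hypothetical coefficient on a count feature
p0 = 0.10          # hypothetical baseline conversion probability

odds_ratio = np.exp(coef_demo)
p1 = prob_shift(p0, coef_demo)
print(f"odds ratio per extra touch: {odds_ratio:.2f}")
print(f"conversion probability: {p0:.3f} -> {p1:.3f}")
```

Under these assumed numbers, one extra touch multiplies the odds by about 1.49 and moves the probability from 0.10 to about 0.142. The shift depends on the baseline, which is one reason summed coefficients are only a rough ranking.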


### Markov Path Attribution: Removal Effect per Channel

In [None]:

# Build transition probabilities between start->channel->...->convert/null states
START, CONV, NULL = "_start_", "_conv_", "_null_"
def build_paths(df):
    paths = []
    for uid, grp in df.groupby('user_id'):
        chain = [START] + list(grp['channel'].values)
        chain.append(CONV if grp['converted'].max()==1 else NULL)
        paths.append(chain)
    return paths

def transition_matrix(paths):
    counts = defaultdict(Counter)
    for path in paths:
        for a, b in zip(path[:-1], path[1:]):
            counts[a][b] += 1
    # normalize
    trans = {a: {b: c/sum(bs.values()) for b, c in bs.items()} for a, bs in counts.items()}
    return trans

def conv_prob(trans, start=START, conv=CONV, null=NULL, n_walks=5000, max_steps=50, seed=0):
    # simulate many random walks to estimate the conversion probability
    rng = np.random.default_rng(seed)
    convs = 0
    for _ in range(n_walks):
        s = start
        steps = 0
        # a walk that has not been absorbed after max_steps counts as a non-conversion
        while s not in (conv, null) and steps < max_steps:
            probs = trans.get(s, {})
            if not probs:
                # state with no outgoing transitions: treat as a dead end
                s = null
                break
            next_states = list(probs.keys())
            p = np.array([probs[t] for t in next_states])
            s = rng.choice(next_states, p=p)
            steps += 1
        if s == conv:
            convs += 1
    return convs / n_walks

paths = build_paths(journeys)
T_full = transition_matrix(paths)
p_full = conv_prob(T_full)
p_full
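
The simulated estimate carries Monte Carlo noise. As a cross-check, the conversion probability of a first-order chain can also be computed exactly by solving the linear system h = Qh + r, where Q holds transitions between non-absorbing states and r holds direct transitions into the conversion state. The function name `conv_prob_exact` and the toy matrix `toy_trans` below are assumptions for illustration; the sketch expects the same dict-of-dicts format that `transition_matrix` returns.

```python
import numpy as np

def conv_prob_exact(trans, start="_start_", conv="_conv_", null="_null_"):
    """Exact absorption probability into `conv` for a chain given as dict-of-dicts."""
    transient = [s for s in trans if s not in (conv, null)]
    idx = {s: i for i, s in enumerate(transient)}
    Q = np.zeros((len(transient), len(transient)))  # transient -> transient
    r = np.zeros(len(transient))                    # transient -> conv
    for a in transient:
        for b, p in trans[a].items():
            if b == conv:
                r[idx[a]] += p
            elif b in idx:
                Q[idx[a], idx[b]] += p
            # mass sent to `null`, or to a state with no outgoing row, never converts
    h = np.linalg.solve(np.eye(len(transient)) - Q, r)
    return h[idx[start]]

# Toy chain with two channels, A and B (made-up probabilities)
toy_trans = {
    "_start_": {"A": 0.5, "B": 0.5},
    "A": {"_conv_": 0.4, "B": 0.6},
    "B": {"_conv_": 0.1, "_null_": 0.9},
}
p_toy = conv_prob_exact(toy_trans)
print(round(p_toy, 4))
```

For the toy chain, B converts with probability 0.1, A with 0.4 + 0.6 × 0.1 = 0.46, and the start state with 0.5 × 0.46 + 0.5 × 0.1 = 0.28. Calling `conv_prob_exact(T_full)` should land close to the simulated `p_full`, provided every state can eventually reach an absorbing state.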


In [None]:

# Removal effect: redirect every transition into a channel to the null state
# (a path that would have touched the channel is lost), then recompute conversion probability
def remove_channel(trans, ch):
    new_trans = {}
    for a, probs in trans.items():
        if a == ch:
            continue  # the removed channel can no longer be visited
        row = Counter()
        for b, p in probs.items():
            row[NULL if b == ch else b] += p
        new_trans[a] = dict(row)
    return new_trans

removal_effect = {}
for ch in channels:
    T_drop = remove_channel(T_full, ch)
    p_drop = conv_prob(T_drop)
    # clip at 0 because the simulated probabilities carry Monte Carlo noise
    removal_effect[ch] = max(p_full - p_drop, 0.0)

pd.Series(removal_effect).sort_values(ascending=False).rename('markov_removal_effect').to_frame()


### Compare Views: Heuristics vs. Logistic vs. Markov

In [None]:

comparison = pd.DataFrame({
    'last_touch': baseline['last_touch_credit'],
    'time_decay': baseline['time_decay_credit']
}).reindex(channels).fillna(0)

comparison['coef_score'] = comparison.index.map(lambda ch: channel_scores.get(ch, 0.0))
comparison['markov_removal'] = comparison.index.map(lambda ch: removal_effect.get(ch, 0.0))

# Normalize each column to sum to 1 for comparability
def normalize(s):
    s = s.clip(lower=0)
    tot = s.sum()
    return s / tot if tot > 0 else s

for col in comparison.columns:
    comparison[col] = normalize(comparison[col])

comparison.round(3)



---

### Practical Guidance
- Use **heuristics** as operational baselines only.  
- Prefer a **regularized model** (logistic, gradient boosting) with **time windows** and **sequence features**.  
- Validate directional insights via **geo‑experiments** or **audience holdouts** before shifting budgets.
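
As a rough illustration of the validation step, the sketch below summarizes a holdout test as absolute lift, relative lift, and an approximate 95% confidence interval for the difference in conversion rates (normal approximation). The function `lift_summary` and the counts passed to it are made up for illustration and do not come from the notebook's data.

```python
import math

def lift_summary(conv_t, n_t, conv_c, n_c, z=1.96):
    """Absolute/relative lift of treatment over control with a normal-approximation CI."""
    p_t, p_c = conv_t / n_t, conv_c / n_c
    diff = p_t - p_c
    se = math.sqrt(p_t * (1 - p_t) / n_t + p_c * (1 - p_c) / n_c)
    return {
        "abs_lift": diff,
        "rel_lift": diff / p_c,
        "ci_low": diff - z * se,
        "ci_high": diff + z * se,
    }

# Hypothetical holdout: 5,000 exposed users vs. 5,000 held-out users
lift_demo = lift_summary(conv_t=260, n_t=5000, conv_c=200, n_c=5000)
for key, val in lift_demo.items():
    print(f"{key:9s} {val:.4f}")
```

With these made-up counts the absolute lift is 1.2 percentage points (30% relative), and the interval excludes zero. A channel that MTA ranks highly but shows no measurable lift in a test like this would be a candidate for re-examination before budgets move.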

