In [None]:
import pandas as pd
import ast
import numpy as np

## Import data

In [None]:
pathData = pd.read_csv('Datasets/Path_UTMsource_session_2.csv')

# Cleaned_Path is expected to hold the text form of a Python list of channel
# names; literal_eval parses it without executing arbitrary code.
pathData['Cleaned_Path'] = pathData['Cleaned_Path'].apply(ast.literal_eval)

# Build every journey directly as a list: Start -> touchpoints -> outcome.
# (Joining to a single string and splitting it again on ', ' would break any
# channel label that itself contains ', ' and would turn an empty journey into
# a spurious '' state.)  Rows whose Conversion flag equals 0 end in 'No_Conv',
# every other row ends in 'Conversion'.
pathData['path'] = [
    ['Start', *touchpoints, 'No_Conv' if converted == 0 else 'Conversion']
    for touchpoints, converted in zip(pathData['Cleaned_Path'], pathData['Conversion'])
]

## Baseline conversion rate

In [None]:
list_of_paths = pathData['path']

# Vectorised Series.sum() instead of looping over the column with the Python
# builtin sum().  NOTE: Series.sum() skips NaN flags rather than propagating them.
total_conversions = pathData['Conversion'].sum()

# Observed conversion rate: conversions per recorded path.
base_conversion_rate = total_conversions / len(list_of_paths)
print(total_conversions, base_conversion_rate)

## Transition matrix

In [None]:
def build_transition_matrix(paths, state_set, absorbing_states=('Conversion', 'No_Conv')):
    """Estimate a first-order transition matrix from observed paths.

    Parameters
    ----------
    paths : iterable of sequences
        Each path is an ordered sequence of state labels.
    state_set : sequence of hashable labels
        Labels used for the rows and columns of the result, in the given order
        (repeated labels are kept once).  A transition is counted only when
        both its origin and its destination appear here.
    absorbing_states : iterable of labels, optional
        States whose row is overwritten with a self-loop of probability 1.
        Labels that are not part of ``state_set`` are ignored, so the matrix
        is never enlarged with extra rows or NaN columns.

    Returns
    -------
    pandas.DataFrame
        Square frame indexed by state.  Every row with at least one observed
        transition is normalised to sum to 1; rows of states that were never
        left stay all-zero; absorbing rows hold a single 1.0 on the diagonal.
    """
    from collections import Counter

    states = list(dict.fromkeys(state_set))  # de-duplicate, keep caller's order
    known = set(states)                      # O(1) membership checks in the hot loop

    # Count consecutive (origin, destination) pairs between known states.
    transition_counts = Counter(
        (origin, dest)
        for path in paths
        for origin, dest in zip(path[:-1], path[1:])
        if origin in known and dest in known
    )

    matrix = pd.DataFrame(0.0, index=states, columns=states)
    for (origin, dest), count in transition_counts.items():
        matrix.at[origin, dest] = count

    # Row-normalise; a zero row sum is replaced by 1 to avoid dividing by zero.
    row_sums = matrix.sum(axis=1).replace(0, 1)
    matrix = matrix.div(row_sums, axis=0)

    # Absorbing states keep all of their probability mass on themselves.
    for state in absorbing_states:
        if state in known:
            matrix.loc[state] = 0.0
            matrix.loc[state, state] = 1.0
    return matrix

# Every state of the chain: the three structural states first, then the channels.
stateSet = [
    'Start',
    'Conversion',
    'No_Conv',
    'app',
    'line',
    'System_Inform',
    'google',
    'PersonalizedMarketing',
    'sms',
    'MemberPresentReward',
    'affiliate',
    'facebook',
    'instagram',
    '(direct)',
    'Others',
]

# Transition probabilities estimated from all observed paths.
transition_matrix_v2 = build_transition_matrix(paths=list_of_paths, state_set=stateSet)
transition_matrix_v2

In [None]:
def compute_absorption_prob(trans_matrix, start_state='Start'):
    """Return the probability of ending in 'Conversion' when starting in ``start_state``.

    Parameters
    ----------
    trans_matrix : pandas.DataFrame
        Square transition matrix indexed by state label on both axes.  Every
        state other than 'Conversion' and 'No_Conv' is treated as transient.
    start_state : label, default 'Start'
        Transient state the chain starts from.

    Returns
    -------
    numpy.float64
        Absorption probability into 'Conversion'.

    Raises
    ------
    ValueError
        If ``start_state`` is not one of the transient states.
    KeyError
        If the matrix has no 'Conversion' column.
    numpy.linalg.LinAlgError
        If ``I - Q`` is singular.
    """
    absorbing = ('Conversion', 'No_Conv')
    transient = [s for s in trans_matrix.index if s not in absorbing]
    start_idx = transient.index(start_state)

    # Q: transient -> transient block; r_conv: transient -> 'Conversion' column,
    # selected by label so the result does not depend on column order.
    Q = trans_matrix.loc[transient, transient].to_numpy(dtype=float)
    r_conv = trans_matrix.loc[transient, 'Conversion'].to_numpy(dtype=float)

    # Absorption probabilities b satisfy (I - Q) b = r_conv.  Solving the linear
    # system avoids forming the explicit inverse of (I - Q).
    b = np.linalg.solve(np.eye(len(transient)) - Q, r_conv)
    return b[start_idx]

def compute_removal_effect(trans_matrix, channel, base_cvr):
    """Relative drop in conversion probability when ``channel`` is removed.

    The channel's row and column are dropped from a copy of the matrix, and the
    probability mass each remaining state used to send to that channel is
    redirected to 'No_Conv'.  The input matrix is not modified.

    Parameters
    ----------
    trans_matrix : pandas.DataFrame
        Square transition matrix indexed by state label on both axes.
    channel : label
        State to remove.  'Start', 'Conversion' and 'No_Conv' are not
        removable and yield 0.0.
    base_cvr : float
        Conversion probability of the unmodified chain.

    Returns
    -------
    float
        ``1 - new_cvr / base_cvr``; 0.0 when ``base_cvr`` is 0, because there
        is no conversion probability to lose and the ratio is undefined.
    """
    if channel in ('Start', 'Conversion', 'No_Conv'):
        return 0.0
    if base_cvr == 0:
        # Guard the division below: with no baseline conversions, removing a
        # channel cannot reduce anything.
        return 0.0

    removal_matrix = trans_matrix.drop(index=channel, columns=channel).copy()

    # Whatever each row is now missing from a total of 1 went to the removed
    # channel; send it to 'No_Conv' instead.
    deficit = (1.0 - removal_matrix.sum(axis=1)).clip(lower=0)
    removal_matrix['No_Conv'] = removal_matrix['No_Conv'] + deficit

    # Re-assert the absorbing self-loops after the column update.
    removal_matrix.loc['No_Conv', 'No_Conv'] = 1.0
    removal_matrix.loc['Conversion', 'Conversion'] = 1.0

    new_cvr = compute_absorption_prob(removal_matrix)
    return 1 - new_cvr / base_cvr

# Conversion probability implied by the fitted chain, used as the baseline.
base_cvr_v2 = compute_absorption_prob(transition_matrix_v2)
print(f"Base conversion rate from matrix: {base_cvr_v2:.6f}")

# Only real channels can be removed; the structural states are excluded.
channels_v2 = [c for c in stateSet if c not in ['Start', 'Conversion', 'No_Conv']]

# Removal effect per channel, keyed by channel name in stateSet order.
removal_effects_v2 = {
    channel: compute_removal_effect(transition_matrix_v2, channel, base_cvr_v2)
    for channel in channels_v2
}

removal_effects_v2

In [None]:
# Normalising constant: sum of the absolute removal effects over all channels.
removal_sum_v2 = np.sum(np.abs(list(removal_effects_v2.values())))

# Each channel receives a share of the total conversions proportional to its
# removal effect.  When every effect is zero the shares are undefined, so each
# channel is credited 0.0 instead of dividing by zero and producing NaN.
attribution_v2 = {
    ch: (effect / removal_sum_v2) * total_conversions if removal_sum_v2 else 0.0
    for ch, effect in removal_effects_v2.items()
}

print("Attribution:")
attribution_v2