## Parameters
- **APP_ID**: id of the app in the database
- **START_DATE**: Start of the analysis period (inclusive); all metrics are computed from this date onward.
- **END_DATE**: End of the analysis period (inclusive — the queries filter with SQL `BETWEEN`, which includes both endpoints); no data beyond this date is considered.
- **FREQ**: Time aggregation frequency (e.g. daily, every 3 days, or weekly); affects smoothing and compounding of metrics.
    - passed as `freq=FREQ` to `pd.Grouper` when the data is bucketed by date
- **COUNTRY**: Limits the analysis to traffic and revenue originating from this country.
- **VIEWS_THRESHOLD**: Minimum daily view gain; only account-days with more views than this threshold are included in the model.

- **DIMINISHING_RETURNS**: Controls how strongly marginal returns decay as volume increases; higher values imply stronger saturation effects.
    - 1 => full effect
    - 0 => no effect

- **PROCEEDS_PERCENTAGE_FOR_ATTRIBUTION**: Fraction of total proceeds eligible for attribution to the modeled channel.
- **ASSUMED_ROI**: Expected return on investment used to define the payouts.
    - PAYOUT = ATTRIBUTED_PROCEEDS/ASSUMED_ROI
- **RPM_CAP**: Maximum allowed revenue per 1,000 impressions to prevent unrealistically high revenue estimates.


In [93]:
# Run parameters for the payout model; each one is echoed by the print below.
APP_ID = 1
START_DATE = '2025-08-14'
END_DATE = '2025-12-20'
FREQ = "D"  ## frequency alias used for date bucketing, e.g. "D" (daily), "3D" (every 3 days), "W-Mon" (weekly)
COUNTRY = "United_States"
VIEWS_THRESHOLD = 100

DIMINISHING_RETURNS = 0.9  ## 0-1: 0 = no diminishing returns, 1 = full diminishing returns

DEFAULT_RPM = 3 ## flat base RPM used for payout_raw; NOTE(review): the previous note said creators are currently paid 2$ - confirm which value is intended
PROCEEDS_PERCENTAGE_FOR_ATTRIBUTION = 0.8
ASSUMED_ROI = 3  ## ROI assumed for payout computation (payout = attributed proceeds / ASSUMED_ROI)
RPM_CAP = 4  ## max rpm allowed for the quality-based payout

# Echo the configuration so the notebook output records the values used for this run.
print(f"""
APP_ID: {APP_ID}
START_DATE: {START_DATE}
END_DATE: {END_DATE}
FREQ: {FREQ}
COUNTRY: {COUNTRY}
VIEWS_THRESHOLD: {VIEWS_THRESHOLD}

DIMINISHING_RETURNS: {DIMINISHING_RETURNS}

DEFAULT_RPM: {DEFAULT_RPM}
PROCEEDS_PERCENTAGE_FOR_ATTRIBUTION: {PROCEEDS_PERCENTAGE_FOR_ATTRIBUTION}
ASSUMED_ROI: {ASSUMED_ROI}
RPM_CAP: {RPM_CAP}
""")


APP_ID: 1
START_DATE: 2025-08-14
END_DATE: 2025-12-20
FREQ: D
COUNTRY: United_States
VIEWS_THRESHOLD: 100

DIMINISHING_RETURNS: 0.9

DEFAULT_RPM: 3
PROCEEDS_PERCENTAGE_FOR_ATTRIBUTION: 0.8
ASSUMED_ROI: 3
RPM_CAP: 4



In [94]:
# Do all imports
import pandas as pd
import scipy
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.pipeline import Pipeline

PWD = %pwd

import sys

# sys.path entries must be directories, not files. Put the notebook's working
# directory on the path so the `src` package imported below can be resolved
# (the previous PWD + './transformations.py' concatenation produced a
# non-existent path that the import system silently ignored).
sys.path.insert(1, PWD)

from src.transformations import Winsorizer, FeatureMultiplier, Average
from src.helpers import logistic_map

import psycopg2

import dotenv
import os

dotenv.load_dotenv(dotenv_path='../.env')


True

In [95]:
# Weights for the engagement-rate features that are combined into the
# engagement factor; the keys are the rate columns created by
# add_aggregate_metrics.
engagement_factor_weights = {
    "likes_rate": 1,
    "comments_rate": 1.2,
    "saves_rate": 0.5,
    "shares_rate": 0,
    "likes_to_comments": 1,
}

# Weights for the volume metrics that are combined into the engagement volume.
engagement_volume_weights = {
    "views": 0.8,
    "likes": 1,
    "comments": 1.2,
    "saves": 0.3,
    "shares": 0.3,
}

# Bin edges (in views) passed to pd.cut to assign each row a volume tier ("vol_cut").
vol_cut_ranges = [0, 1000, 10_000, 100_000, 1_000_000, np.inf]

# Allowed incentive boost / deboost, passed to logistic_map as `d` (0.2 => +-20%).
incentive_boost_effect = 0.2
# How curved or linear the incentive boost should be (passed to logistic_map as `k`); smaller is more linear.
incentive_boost_order = 2

## Load Data

In [96]:
# Revenue rows dated before this cutoff are moved forward by one day after loading.
views_shifted_before_date = pd.Timestamp('2025-12-01')

# Daily proceeds per country for the selected app. SQL BETWEEN includes both
# START_DATE and END_DATE. proceeds is divided by 100 in the query
# (presumably cents -> currency units; verify against the table definition).
# NOTE(review): 'United_Kingdom' is hard-coded in the query and relabelled as
# "United_States" below regardless of COUNTRY - confirm this is intended.
df_revenue = pd.read_sql_query(
    sql="""
            select
                s.date,
                s.country,
                sum(s.proceeds) / 100.0 as proceeds
            from superwall_metrics s
            where
                s.app_id = %(APP_ID)s
                and s.date between %(START_DATE)s and %(END_DATE)s
                and country in (%(COUNTRY)s, 'United_Kingdom')

            group by s.date, s.country
            having sum(s.proceeds) > 0
        """,
    con=os.getenv("DATABASE_URL"),
    params={
        "COUNTRY": COUNTRY,
        "END_DATE": END_DATE,
        "START_DATE": START_DATE,
        "APP_ID": APP_ID,
    },
    parse_dates=['date'],
)

df_revenue = df_revenue.assign(
    # the boolean mask scales the one-day offset: +1 day before the cutoff, +0 otherwise
    date=lambda rev: rev["date"] + pd.Timedelta(days=1) * (rev["date"] < views_shifted_before_date),
    # fold UK rows into the US label; every other country keeps its own name
    country=lambda rev: rev["country"].map({"United_Kingdom": "United_States"}).fillna(rev["country"]),
)

In [97]:
# Daily-diff columns that are treated as engagement volume metrics.
stat_cols = ["views_diff", "likes_diff", "comments_diff", "saves_diff", "shares_diff"]

# Per-account daily diffs for COUNTRY. Only rows with more than
# VIEWS_THRESHOLD views, and with more views than likes, are loaded.
# SQL BETWEEN includes both START_DATE and END_DATE.
# NOTE(review): unlike the revenue query, this one is not filtered by APP_ID -
# confirm that the table only holds accounts of the analysed app.
df_scrape_data = pd.read_sql_query(
    sql="""
            select
                sma.date,
                sma.id as account_id,
                sma.campaign_id as campaign_id,
                sma.country as account_country,
                sma.total_views_diff as views_diff,
                sma.total_likes_diff as likes_diff,
                sma.total_comments_diff as comments_diff,
                sma.total_saves_diff as saves_diff,
                sma.total_shares_diff as shares_diff,
                sma.total_posts_diff as posts_diff
            from social_media_accounts_daily_diff sma
            where
                sma.date between %(START_DATE)s and %(END_DATE)s
                and total_views_diff > %(VIEWS_THRESHOLD)s
                and total_views_diff > sma.total_likes_diff
                and sma.country = %(COUNTRY)s
            order by sma.date desc
        """,
    con=os.getenv("DATABASE_URL"),
    params={
        "COUNTRY": COUNTRY,
        "VIEWS_THRESHOLD": VIEWS_THRESHOLD,
        "END_DATE": END_DATE,
        "START_DATE": START_DATE,
    },
    parse_dates=['date'],
)

# Negative diffs are floored at zero.
df_scrape_data[stat_cols] = df_scrape_data[stat_cols].clip(lower=0, upper=np.inf)

## Data prep

In [98]:
def add_aggregate_metrics(df):
    """Return a copy of ``df`` with the engagement rate columns added.

    Reads ``views_diff``, ``likes_diff``, ``comments_diff``, ``saves_diff``
    and ``shares_diff`` and adds:

    - ``likes_rate``, ``comments_rate``, ``saves_rate``, ``shares_rate``:
      the respective diff divided by ``views_diff``
    - ``likes_to_comments``: ``comments_diff / likes_diff`` limited to [0, 1]
      (note the ratio is comments per like, despite the column name)

    Zero denominators are not special-cased: they yield inf or NaN, and an
    infinite ``likes_to_comments`` is clipped to 1. The input frame is not
    modified.
    """
    views = df["views_diff"]
    comments_per_like = df["comments_diff"] / df["likes_diff"]

    return df.assign(
        likes_rate=df["likes_diff"] / views,
        comments_rate=df["comments_diff"] / views,
        saves_rate=df["saves_diff"] / views,
        shares_rate=df["shares_diff"] / views,
        likes_to_comments=comments_per_like.clip(0, 1),
    )

In [99]:
# Account-level metrics per FREQ bucket.
# The rates are first computed on the raw rows so that rows containing NaN
# (e.g. a 0/0 ratio) are removed by dropna() before the volume columns are
# summed; the rates are then recomputed on the summed volumes.
df_posts_model = (
    add_aggregate_metrics(df_scrape_data)
    .sort_values("likes_rate")
    .dropna()
    .groupby(
        [
            pd.Grouper(key="date", freq=FREQ, label="left", closed="left"),
            "account_id",
            "campaign_id",
            "account_country",
        ],
        as_index=False,
    )
    .agg(dict.fromkeys(stat_cols, "sum"))
    .pipe(add_aggregate_metrics)
    .fillna(0)
)

# Proceeds summed per FREQ bucket and country.
df_revenue_model = (
    df_revenue
    .groupby([pd.Grouper(key="date", freq=FREQ, label="left", closed="left"), "country"])
    .agg(proceeds=("proceeds", "sum"))
    .reset_index()
)

### Computation

#### Engagement Factor / Diminishing Returns

  - Weighted average of engagement rates (e.g. likes/views, comments/views, comments/likes, etc.)
  - Because of its negative correlation with views, we can use this as a diminishing returns proxy

In [100]:
def compute_engagement_factor(rates, weights):
    """Combine engagement-rate features into a single score scaled to 0-1.

    The pipeline is fitted on ``rates`` and applied to it in one pass. Steps,
    in order: standard scaling, the project's FeatureMultiplier (given
    ``weights``), the project's Average, a Winsorizer configured with the
    0.03 / 0.99 quantiles to clip extremes, and min-max scaling to (0, 1).

    :param rates: table of rate features, one column per feature
    :param weights: per-feature weights handed to FeatureMultiplier
    :return: whatever the final MinMaxScaler step returns for ``rates``
    """
    steps = [
        ("scaler", StandardScaler()),
        ("weights", FeatureMultiplier(weights)),
        ("average", Average()),
        # clip extremes before rescaling so outliers do not dominate the 0-1 range
        ("winsor", Winsorizer(lower_quantile=0.03, upper_quantile=0.99)),
        ("minmax", MinMaxScaler(feature_range=(0, 1))),
    ]
    return Pipeline(steps).fit_transform(rates)

# Weights as a Series indexed by feature name.
engagement_factor_weights_series = pd.Series(engagement_factor_weights)

# Rates are limited to [0, 1] before they are scored.
df_posts_model["engagement_factor_raw"] = compute_engagement_factor(
    rates=df_posts_model[
        ["likes_rate", "comments_rate", "saves_rate", "shares_rate", "likes_to_comments"]
    ].clip(lower=0, upper=1),
    weights=engagement_factor_weights_series,
)

# DIMINISHING_RETURNS is the exponent: 0 maps every factor to 1 (no effect),
# 1 keeps the raw factor (full effect).
df_posts_model["engagement_factor"] = df_posts_model["engagement_factor_raw"].pow(DIMINISHING_RETURNS)

In [101]:
# scatter_plot(
#     df_posts_model.query("engagement_factor > 0").assign(views_log = lambda x: np.log(x.views_diff+1)),
#     "views_log",
#     "engagement_factor",
#     xlabel = "views_log",
#     ylabel = "engagement_factor",
#     title = "Log Views vs. Engagement Factor"
# )

#### Incentive boost

- Statistical transformation on the engagement_factor
    - boxcox to make the distribution near normal
    - abs max scaling
    - stratified normalization --> to make every number comparable for ranking
    - logistic mapping for incentive multiplier
- How it works:
    - mean performance maps to 1
    - the output lies between 0.8 and 1.2
      - above the mean ==> boost of at most 20% (up to 1.2)
      - below the mean ==> deboost of at most 20% (down to 0.8)

In [102]:
def compute_incentive_boost(df, engagement_col, group_col, boxcox_lambda=None):
    """Compute a raw incentive boost that averages 1 within each group.

    Steps:
      1. Box-Cox transform of ``df[engagement_col] + 1``.
      2. Division by the overall maximum of the transformed values.
      3. Division by the mean of the row's ``group_col`` group.
      4. Clipping to the range [0, 2].

    :param df: frame containing ``engagement_col`` and ``group_col``; it is
        not modified
    :param engagement_col: name of the column holding the engagement score
    :param group_col: name of the column used for the per-group normalisation
    :param boxcox_lambda: fixed Box-Cox lambda; when ``None`` SciPy fits the
        lambda from the data
    :return: Series named ``boost`` that shares the index of ``df``
    """
    shifted = df[engagement_col] + 1  # Box-Cox requires strictly positive input

    if boxcox_lambda is None:
        # Without a lambda SciPy returns a (transformed, fitted_lambda) tuple.
        boost, _ = scipy.stats.boxcox(shifted)
    else:
        # With a fixed lambda SciPy returns only the transformed array, so
        # tuple-unpacking the result (as done previously) fails.
        boost = scipy.stats.boxcox(shifted, lmbda=boxcox_lambda)

    df = df[[engagement_col, group_col]].copy()

    df['boost'] = boost
    df['boost'] /= df['boost'].max()
    df['boost'] = df['boost'] / df.groupby(group_col, observed=False)['boost'].transform('mean')

    return df['boost'].clip(0, 2)


# Bucket every row into a view-volume tier; the raw boost is normalised per tier.
df_posts_model["vol_cut"] = pd.cut(df_posts_model["views_diff"], bins=vol_cut_ranges)

df_posts_model["incentive_boost_raw"] = compute_incentive_boost(
    df_posts_model,
    engagement_col="engagement_factor",
    group_col="vol_cut",
)

# Map the raw boost to the final multiplier with the project's logistic_map helper.
df_posts_model["incentive_boost"] = logistic_map(
    df_posts_model["incentive_boost_raw"],
    d=incentive_boost_effect,
    k=incentive_boost_order,
)

In [103]:
# x = np.linspace(0,2,100)
# y = logistic_map(x, d=incentive_boost_effect, k=incentive_boost_order)
#
# fig, ax = plt.subplots(figsize = (12,5))
# ax.plot(x, y, ls = "", marker = 'o', markersize = 2)

In [104]:
# scatter_plot(
#     df_posts_model.query("engagement_factor > 0").assign(views_log = lambda x: np.log(x.views_diff+1)),
#     "views_log",
#     "incentive_boost_raw",
#     xlabel = "views_log",
#     ylabel = "incentive_boost",
#     title = "Log Views vs. Incentive Boost"
# )

#### Engagement Volume

  - Weighted average of volume metrics (e. g. views, shares, comments, likes)

In [105]:
def compute_engagement_volume(volumes, weights):
    """Combine volume metrics into a single engagement-volume value.

    The pipeline is fitted on ``volumes`` and applied to it in one pass. Steps,
    in order: the project's FeatureMultiplier (given ``weights``), the
    project's Average, and a Winsorizer configured with the 0 and 1 quantiles
    (presumably a no-op with these bounds; verify against Winsorizer).

    :param volumes: table of volume features, one column per metric
    :param weights: per-metric weights handed to FeatureMultiplier
    :return: whatever the final Winsorizer step returns for ``volumes``
    """
    steps = [
        ("weights", FeatureMultiplier(weights)),
        ("average", Average()),
        ("winsor", Winsorizer(lower_quantile=0, upper_quantile=1)),
    ]
    return Pipeline(steps).fit_transform(volumes)


engagement_volume_weights_series = pd.Series(engagement_volume_weights)

# Each metric is divided by its own 90th percentile so that the columns are on
# a comparable scale. The values are handed over as a plain array, i.e.
# without column names, in the order given by stat_cols.
df_posts_model["engagement_volume"] = compute_engagement_volume(
    volumes=(
        df_posts_model[stat_cols]
        .div(df_posts_model[stat_cols].quantile(0.9), axis="columns")
        .to_numpy()
    ),
    weights=engagement_volume_weights_series,
)

#### Quality Volume

  - Main metric for attribution, defined as:
    - engagement_volume * engagement_factor * incentive_boost

In [106]:
# Main attribution metric: engagement volume x engagement factor x incentive boost.
df_posts_model["quality_volume"] = (
    df_posts_model["engagement_volume"]
    .mul(df_posts_model["engagement_factor"])
    .mul(df_posts_model["incentive_boost"])
)

#### Computing the payout

- **attr_revenue_on_view**: Attributed proceeds, if the allocation is made purely on the view percentages.
- **attr_revenue_on_quality**: Attributed proceeds, if the allocation is made on the quality volume percentage.
- **payout_raw**: Payout if a flat RPM of DEFAULT_RPM is chosen.
- **payout_on_view**: Payout, computed from the attributed proceeds and the assumed ROI, if the allocation is made purely on the view percentage.
- **payout_on_quality**: Payout, computed from the attributed proceeds and the assumed ROI, if the allocation is made on the quality volume percentage.
    - the payout is capped at RPM_CAP to prevent unrealistic payouts for small creators

In [107]:
def cap_payout_to_rpm(max_rpm, views, payout):
    """Limit ``payout`` so that its implied RPM does not reach past ``max_rpm``.

    The implied RPM is ``payout / (views / 1000)``. Where it is strictly below
    ``max_rpm`` the payout is kept; everywhere else (including where the
    comparison is False because the RPM is NaN) the result is
    ``max_rpm * views / 1000``.

    :param max_rpm: highest allowed payout per 1,000 views
    :param views: view counts (scalar or array-like)
    :param payout: uncapped payouts, same shape as ``views``
    :return: NumPy array of capped payouts
    """
    views_in_thousands = views / 1000
    ceiling = max_rpm * views_in_thousands
    below_cap = payout / views_in_thousands < max_rpm
    return np.where(below_cap, payout, ceiling)

In [108]:
# Join every account/date row with the proceeds of the same date bucket
# (inner join, so dates without revenue are dropped), then derive the shares
# and payouts.
df_payout = (
    df_posts_model
    .merge(df_revenue_model, left_on="date", right_on="date")
    .assign(
        total_views_on_day=lambda d: d.groupby("date")["views_diff"].transform("sum"),
        # share of the bucket's views / quality volume held by each row
        view_percentage=lambda d: d["views_diff"] / d["total_views_on_day"],
        quality_percentage=lambda d: (
            d["quality_volume"] / d.groupby("date")["quality_volume"].transform("sum")
        ),
        attr_revenue_on_view=lambda d: (
            d["proceeds"] * PROCEEDS_PERCENTAGE_FOR_ATTRIBUTION * d["view_percentage"]
        ),
        attr_revenue_on_quality=lambda d: (
            d["proceeds"] * PROCEEDS_PERCENTAGE_FOR_ATTRIBUTION * d["quality_percentage"]
        ),
    )
    .assign(
        payout_raw=lambda d: d["views_diff"] * DEFAULT_RPM / 1000,
        payout_on_view=lambda d: d["attr_revenue_on_view"] / ASSUMED_ROI,
        # only the quality-based payout is capped at RPM_CAP
        payout_on_quality=lambda d: cap_payout_to_rpm(
            RPM_CAP, d["views_diff"], d["attr_revenue_on_quality"] / ASSUMED_ROI
        ),
        # effective RPM of the capped payout
        rpm=lambda d: (d["payout_on_quality"] / (d["views_diff"] / 1000)).round(2).fillna(0),
        payout_percentage=lambda d: (
            d["payout_on_quality"] / d.groupby("date")["payout_on_quality"].transform("sum")
        ),
    )
    .sort_values("views_diff", ascending=False)
)

## Summary

In [109]:
# Summary per view-volume tier: row count, views and attributed revenue,
# plus each tier's share of the totals and its average payout RPM.
(
    df_payout
    # NOTE(review): this date is excluded by a hard-coded literal (it equals
    # views_shifted_before_date) - confirm it should stay in sync with it.
    .query("date != '2025-12-01'")
    .query("views_diff > 0")
    .groupby("vol_cut", observed=False)
    .agg(
        account_id=("account_id", "count"),
        views_diff=("views_diff", "sum"),
        attr_revenue_on_view=("attr_revenue_on_view", "sum"),
        attr_revenue_on_quality=("attr_revenue_on_quality", "sum"),
    )
    .assign(
        view_perc=lambda t: t["views_diff"] / t["views_diff"].sum(),
        attr_perc_on_quality=lambda t: (
            t["attr_revenue_on_quality"] / t["attr_revenue_on_quality"].sum()
        ),
        attr_perc_on_view=lambda t: t["attr_revenue_on_view"] / t["attr_revenue_on_view"].sum(),
        # NOTE(review): flat RPM of 2 is hard-coded here, whereas payout_raw
        # uses DEFAULT_RPM - confirm which base RPM this column should use.
        raw_rpm_payout=lambda t: t["views_diff"] * 2 / 1000,
        avg_payout_rpm_on_view=lambda t: (
            t["attr_revenue_on_view"] / (t["views_diff"] / 1000) / ASSUMED_ROI
        ),
        avg_payout_rpm_on_quality=lambda t: (
            t["attr_revenue_on_quality"] / (t["views_diff"] / 1000) / ASSUMED_ROI
        ),
    )
)

Unnamed: 0_level_0,account_id,views_diff,attr_revenue_on_view,attr_revenue_on_quality,view_perc,attr_perc_on_quality,attr_perc_on_view,raw_rpm_payout,avg_payout_rpm_on_view,avg_payout_rpm_on_quality
vol_cut,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
"(0.0, 1000.0]",11193,4338541,25415.470062,39909.182801,0.075435,0.209547,0.133447,8677.082,1.952689,3.066252
"(1000.0, 10000.0]",2880,7025837,36571.892955,53213.165389,0.12216,0.279401,0.192025,14051.674,1.735114,2.524642
"(10000.0, 100000.0]",390,12831856,51692.317659,48007.863951,0.22311,0.25207,0.271416,25663.712,1.342812,1.247101
"(100000.0, 1000000.0]",92,24642926,69144.355995,43775.57406,0.428472,0.229848,0.36305,49285.852,0.935283,0.592132
"(1000000.0, inf]",4,8674346,7630.267329,5548.517798,0.150823,0.029133,0.040064,17348.692,0.293212,0.213216


## Upsert Data to Postgres

In [123]:
# Upsert the per-account payout rows into public.account_daily_rpm_calculations.
# `import psycopg2` alone does not load the `extras` submodule that provides
# execute_values, so it is imported explicitly here.
import psycopg2.extras

# Number of rows sent per INSERT statement by execute_values.
INSERT_BATCH_SIZE = 100

df_payout = df_payout.assign(
    date_str=lambda x: x.date.dt.strftime("%Y-%m-%d"),
    # Element-wise conversion of each interval to text; str(series) would
    # store the printed representation of the whole column in every row.
    vol_cut_str=lambda x: x.vol_cut.astype(str),
)

# DataFrame column -> table column, in insert order.
payout_insert_cols = {
    'date_str': 'date',
    'account_id': 'account_id',
    'campaign_id': 'campaign_id',
    'total_views_on_day': 'total_views_on_day',
    'views_diff': 'views_diff',
    'likes_diff': 'likes_diff',
    'comments_diff': 'comments_diff',
    'saves_diff': 'saves_diff',
    'shares_diff': 'shares_diff',
    'likes_rate': 'likes_rate',
    'comments_rate': 'comments_rate',
    'saves_rate': 'saves_rate',
    'shares_rate': 'shares_rate',
    'likes_to_comments': 'likes_to_comments',
    'engagement_factor_raw': 'engagement_factor_raw',
    'engagement_factor': 'engagement_factor',
    'vol_cut_str': 'vol_cut',
    'incentive_boost_raw': 'incentive_boost_raw',
    'incentive_boost': 'incentive_boost',
    'engagement_volume': 'engagement_volume',
    'quality_volume': 'quality_volume',
    'country': 'country',
    'proceeds': 'proceeds',
    'view_percentage': 'view_percentage',
    'quality_percentage': 'quality_percentage',
    'attr_revenue_on_view': 'attr_revenue_on_view',
    'attr_revenue_on_quality': 'attr_revenue_on_quality',
    'payout_raw': 'payout_raw',
    'payout_on_view': 'payout_on_view',
    'payout_on_quality': 'payout_on_quality',
    'rpm': 'rpm',
    'payout_percentage': 'payout_percentage',
}

# Columns left out of the "do update set" list (not overwritten on conflict).
upsert_ignore_list = ['date', 'account_id', 'campaign_id']
# A list rather than a one-shot filter iterator, so it can be reused safely.
upsert_keys = [col for col in payout_insert_cols.values() if col not in upsert_ignore_list]

df_payout_reduced = df_payout[list(payout_insert_cols)]

data = list(df_payout_reduced.itertuples(index=False, name=None))
UPSERT_COMMA_SEPERATOR = ",\n"

insert_query = f"""
    insert into public.account_daily_rpm_calculations ({", ".join(payout_insert_cols.values())}) values %s
    on conflict on constraint account_daily_rpm_calculations_pk do update
    set
        {UPSERT_COMMA_SEPERATOR.join([f"{col} = excluded.{col}" for col in upsert_keys])},
        updated_at = NOW()
"""

# NOTE(review): the reads above use DATABASE_URL while this write goes to
# DATABASE_URL_LOCAL - confirm the target database.
conn = psycopg2.connect(os.getenv("DATABASE_URL_LOCAL"))
try:
    # `with conn` wraps a transaction (commit on success, rollback on error)
    # but does not close the connection, hence the explicit close below.
    with conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur, insert_query, data, template=None, page_size=INSERT_BATCH_SIZE
            )
finally:
    conn.close()
