In [114]:
import collections
import pandas as pd
import numpy as np

"""
SELECT
    CONTENT_ID,
    sum(SAVE_COUNT) as save_cnt,
    sum(OPEN_COUNT) as open_cnt,
    sum(TOTAL_SHARE_COUNT) as share_cnt,
    sum(FAVORITE_COUNT) as favorite_cnt,
    sum(ARCHIVE_COUNT) as archive_cnt,
    sum(DELETE_COUNT) as delete_cnt
FROM ANALYTICS.DBT.CONTENT_ENGAGEMENT_BY_DAY
WHERE CONTENT_ID IN (
    SELECT content_id FROM ANALYTICS.DBT.CONTENT_ENGAGEMENT_BY_DAY WHERE HAPPENED_AT > DATEADD(day, -1, CURRENT_DATE())
)
GROUP BY 1;
"""



# Load the exported lifetime metrics. resolved_id is parsed as int64; any
# other column falls back to the defaultdict's np.int32 default.
df = pd.read_csv(
    '~/Downloads/content_metrics_lifetime.csv',
    dtype=collections.defaultdict(np.int32, resolved_id=np.int64),
)


In [115]:
# Relabel the seven columns by position (the labels are applied in order,
# regardless of what the CSV header called them), then preview the frame.
df.columns = [
    'resolved_id',
    'save_count',
    'open_count',
    'share_count',
    'favorite_count',
    'archive_count',
    'delete_count',
]
df.head()


Unnamed: 0,resolved_id,save_count,open_count,share_count,favorite_count,archive_count,delete_count
0,2739538749,4831,3659,44,37,974,1016
1,754081,1443,513,10,9,62,154
2,344212991,200,115,7,2,17,31
3,2041711173,33670,10999,11,76,285,9074
4,115881204,1885,522,12,11,381,294


In [120]:
# Rebind df to its first 10 rows; every later cell that reads df only sees
# this sample. Re-run the load cell to get the full frame back.
df = df.iloc[:10]
df.shape[0]

10

In [121]:
# Import the submodule explicitly: a bare `import scipy` does not guarantee
# that `scipy.special` is reachable as an attribute on older SciPy releases.
import scipy.special

# Quantile handed to betaincinv below: 1 / (1 + loss_multiple), i.e. 0.1
# with loss_multiple = 9.
loss_multiple = 9
inc_beta_var = float(1) / (1 + loss_multiple)
# Relative weights of the three component scores inside total_score
# (they are used as weights, despite the *_var names).
open_var = 1
fav_var = 5
share_var = 7


def _beta_params(hits, trials, base_up, base_down, trial_weight):
    """Return the (a, b) shape parameters passed to betaincinv for one metric.

    a = hits + base_up + trials * trial_weight
    b = (trials - hits) + base_down

    When hits exceed trials the raw difference is negative, so b is replaced
    by (b + a) * 4 instead.
    """
    up = float(hits) + base_up + trials * trial_weight
    down = (float(trials) - hits) + base_down
    if trials >= hits:
        return up, down
    return up, (down + up) * 4


def score(row):
    """Compute the open, favorite, share and combined scores for one row.

    Args:
        row: object whose ``.values`` unpacks to exactly seven numbers, in
            the order resolved_id, save count, open count, share count,
            favorite count, archive count, delete count. Only the save,
            open, share and favorite counts are used.

    Returns:
        ``[open_score, fav_score, share_score, total_score]``. Each of the
        first three is ``scipy.special.betaincinv(a, b, inc_beta_var)``,
        i.e. the ``inc_beta_var`` quantile of a Beta(a, b) distribution.
        ``total_score`` is their mean weighted by ``open_var``, ``fav_var``
        and ``share_var``.

    Raises:
        ValueError: if ``row.values`` does not hold exactly seven items.
    """
    (_resolved_id, save_cnt, open_cnt, share_cnt,
     favorite_cnt, _archive_cnt, _delete_cnt) = row.values

    # Opens are measured against saves; favorites and shares against opens.
    open_uv, open_dv = _beta_params(open_cnt, save_cnt, 2.2, 7.8, 0.05)
    fav_uv, fav_dv = _beta_params(favorite_cnt, open_cnt, 0.2, 9.8, 0.02)
    share_uv, share_dv = _beta_params(share_cnt, open_cnt, 0.2, 9.8, 0.02)

    open_score = scipy.special.betaincinv(open_uv, open_dv, inc_beta_var)
    fav_score = scipy.special.betaincinv(fav_uv, fav_dv, inc_beta_var)
    share_score = scipy.special.betaincinv(share_uv, share_dv, inc_beta_var)

    total_score = (open_score * open_var + fav_score * fav_var + share_score * share_var) / (
            open_var + fav_var + share_var)

    return [open_score, fav_score, share_score, total_score]

# Score every row; result_type='expand' spreads the four returned values
# into integer-labelled columns 0..3 of `scores`.
scores = df.apply(score, axis=1, result_type='expand')

# Attach the scores to a shallow copy so that df itself gains no columns.
result = df.copy(deep=False)
for position, score_column in enumerate(
        ['open_score', 'favorite_score', 'share_score', 'total_score']):
    result[score_column] = scores[position]
result


Index(['resolved_id', 'save_count', 'open_count', 'share_count',
       'favorite_count', 'archive_count', 'delete_count', 'open_score',
       'fav_score', 'share_score', 'total_score'],
      dtype='object')

Index(['resolved_id', 'save_count', 'open_count', 'share_count',
       'favorite_count', 'archive_count', 'delete_count', 'open_score',
       'fav_score', 'share_score', 'total_score'],
      dtype='object')

In [None]:
import boto3


dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('jesh-test-item-score')

# NOTE(review): 'A'/'B' and 'field_A'/'field_B' look like placeholders; none
# of the column names assigned to df in this notebook match them, so replace
# them with the real attribute/column names before running this cell.
# NOTE(review): the boto3 resource layer is documented to reject Python
# floats (it expects Decimal) -- convert values before writing; confirm
# against the boto3 docs.
# NOTE(review): put_item creates items that do not exist yet, while the note
# at the end of this notebook says only existing records should be updated.
with table.batch_writer() as batch:
    for _index, row in df.iterrows():
        # Item must be a dict of attribute name -> value. The previous
        # comma-separated form built a set literal instead.
        content = {
            'field_A': row['A'],
            'field_B': row['B'],
        }
        batch.put_item(Item=content)


In [118]:
# index=False: keep the DataFrame's row index out of the dump. With the
# default (index=True) an extra unnamed leading column is written, which
# shifts every value by one position for a consumer that skips the header
# row and expects only the data columns.
result.to_csv(
    's3://pocket-data-items/fact_content_score_all_time/dump.csv',
    header=True,
    index=False,
)


PermissionError: Access Denied

In [None]:
# MERGE statement text that upserts the dumped score columns into
# DEVELOPMENT.JESH.FACT_CONTENT_SCORE_ALL_TIME, matching rows on content_id.
# It is only assigned to `sql` here; nothing in this cell executes it.
# NOTE(review): the USING subquery selects named columns straight from an
# S3 URL with storage_integration / file_format options. Staged files are
# usually read through a stage reference with positional $N columns --
# confirm this statement runs as written before relying on it.
# NOTE(review): the scoring cells call the key column resolved_id, whereas
# this statement uses content_id -- presumably the same identifier; verify.
sql = """
MERGE INTO DEVELOPMENT.JESH.FACT_CONTENT_SCORE_ALL_TIME old USING
(SELECT
    content_id,
    save_count,
    open_count,
    share_count,
    favorite_count,
    archive_count,
    delete_count,
    open_score,
    favorite_score,
    share_score,
    total_score
from 's3://pocket-data-items/fact_content_score_all_time/dump.csv'
storage_integration = aws_integration_readonly_prod
file_format = (type = 'CSV', skip_header=1)
) new ON old.content_id = new.content_id
WHEN MATCHED THEN
 UPDATE SET
    save_count = new.save_count,
    open_count = new.open_count,
    share_count = new.share_count,
    favorite_count = new.favorite_count,
    archive_count = new.archive_count,
    delete_count = new.delete_count,
    open_score = new.open_score,
    favorite_score = new.favorite_score,
    share_score = new.share_score,
    total_score = new.total_score
WHEN NOT MATCHED THEN
 INSERT
 (
    content_id,
    save_count,
    open_count,
    share_count,
    favorite_count,
    archive_count,
    delete_count,
    open_score,
    favorite_score,
    share_score,
    total_score
)
 VALUES
 (
    new.content_id,
    new.save_count,
    new.open_count,
    new.share_count,
    new.favorite_count,
    new.archive_count,
    new.delete_count,
    new.open_score,
    new.favorite_score,
    new.share_score,
    new.total_score
 );
"""


Send data to DynamoDB
* Only update records that already exist in DynamoDB. Upstream rules decide when an item is added to DynamoDB, and we want to rely on those rules rather than add new items here.