In [1]:
import boto3
import pandas as pd
import props
import time
import json
import requests
from dtypes import unlocks_dtypes, view_dtypes 
import numpy as np
import math

In [2]:

athena = boto3.client("athena")
s3 = boto3.client("s3")
DATABASE_NAME = props.get(props.Key.DATABASE_NAME)
SECRET_NAME = props.get(props.Key.SECRET_NAME)

QUERY_RESULTS_S3_BUCKET = props.get(props.Key.QUERY_RESULTS_S3_BUCKET)
QUERY_RESULTS_S3_LOCATION = props.get(props.Key.QUERY_RESULTS_S3_LOCATION)
START_DATE='2023-07-12'

session = boto3.session.Session()
secretsmanager_client = session.client('secretsmanager')

# Retrieve the value of the secret of DB connection 
# secret_dict = secretsmanager_client.get_secret_value(SecretId=DB_SECRET_NAME)
secret_dict = secretsmanager_client.get_secret_value(SecretId=SECRET_NAME)
churnkey_secret = json.loads(secret_dict['SecretString'])



In [3]:
def execute_athena_query(query, table):
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": f"{DATABASE_NAME}"},
        ResultConfiguration={"OutputLocation": QUERY_RESULTS_S3_LOCATION + f'{table}'},
    )

    query_execution_id = response["QueryExecutionId"]
    # Wait for the query result
    done = False
    while not done:
        status_response = athena.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        status = status_response["QueryExecution"]["Status"]["State"]
        print(f"Query status: {status}")
        if status == "QUEUED" or status == "RUNNING":
            time.sleep(1)
        elif status == "FAILED":
            error_message = status_response["QueryExecution"]["Status"][
                "StateChangeReason"
            ]
            print("Query failed: {}".format(error_message))
            raise Exception("Query status is FAILED")
        else:
            done = True
    query_execution_id += '.csv'
    print(f"query_execution_id = {query_execution_id}")

    # print(query)
    res = s3.get_object(Bucket=QUERY_RESULTS_S3_BUCKET, Key=f'reporting/{table}/' + query_execution_id)
    df = pd.read_csv(res['Body'])

    return df


In [14]:
unlocks_query = f"""
WITH solution_manuals_unlocks AS (
	SELECT balance_id,
		user_id,
		MIN(bc.created_at) as created_at,
		'SOLUTION_MANUAL' AS content_type
	FROM "{DATABASE_NAME}"."balance_chapter" bc
	    INNER JOIN "{DATABASE_NAME}"."balance" b ON bc.balance_id = b.id 
	WHERE content_set_type = 'SOLUTION_MANUAL'
	GROUP BY balance_id, user_id, content_set_id
),
quiz_unlocks AS (
	SELECT balance_id,
		user_id,
		bc.created_at,
		'QUIZ' AS content_type
	FROM "{DATABASE_NAME}"."balance_chapter" bc
	    INNER JOIN "{DATABASE_NAME}"."balance" b ON bc.balance_id = b.id 
	WHERE content_set_type <> 'SOLUTION_MANUAL'
),
stripe_subscripers AS (
	SELECT DISTINCT user_id, customer_id
	FROM "{DATABASE_NAME}"."subscription" s
	WHERE s.gateway = 'STRIPE'
)

SELECT 
    'Content Unlocked' AS event,
	x.user_id AS uid,
	s.customer_id AS customerId,
	x.created_at AS eventDate
FROM (
		( SELECT * FROM solution_manuals_unlocks )
		UNION ALL
		( SELECT * FROM quiz_unlocks )
	) x
	INNER JOIN stripe_subscripers s ON x.user_id = s.user_id
WHERE x.created_at > DATE('{START_DATE}')
ORDER BY balance_id, content_type, x.created_at;
"""

views_query = f"""
SELECT 
	IF(weight = 10, 'Answer Viewed', 'Question Viewed') AS event,
	te.user_id AS uid,
	customer_id AS customerId,
	created_at AS eventDate
FROM "{DATABASE_NAME}"."tracking-events" te
	INNER JOIN ( SELECT DISTINCT user_id, customer_id FROM "{DATABASE_NAME}"."subscription" WHERE gateway = 'STRIPE' ) s 
	    ON te.user_id = s.user_id
WHERE te.action <> 'SEARCH' AND te.question_id IS NOT NULL
ORDER BY te.created_at;
"""

In [8]:
print(unlocks_query)


WITH solution_manuals_unlocks AS (
	SELECT balance_id,
		user_id,
		MIN(bc.created_at) as created_at,
		'SOLUTION_MANUAL' AS content_type
	FROM "dev-warehouse"."balance_chapter" bc
	    INNER JOIN "dev-warehouse"."balance" b ON bc.balance_id = b.id 
	WHERE content_set_type = 'SOLUTION_MANUAL'
	GROUP BY balance_id, user_id, content_set_id
),
quiz_unlocks AS (
	SELECT balance_id,
		user_id,
		chapter_id AS content_id,
		bc.created_at,
		'QUIZ' AS content_type,
	FROM "dev-warehouse"."balance_chapter" bc
	    INNER JOIN "dev-warehouse"."balance" b ON bc.balance_id = b.id 
	WHERE content_set_type <> 'SOLUTION_MANUAL'
),
stripe_subscripers AS (
	SELECT DISTINCT user_id, customer_id
	FROM "dev-warehouse"."subscription" s
	WHERE s.gateway = 'STRIPE'
)

SELECT 
    'Content Unlocked' AS event,
	x.user_id AS uid,
	s.customer_id AS customerId,
	x.created_at AS eventDate
FROM (
		( SELECT * FROM solution_manuals_unlocks )
		UNION ALL
		( SELECT * FROM quiz_unlocks )
	) x
	INNER JOIN stripe_subscr

In [15]:
unlocks_df = execute_athena_query(unlocks_query, 'unlocks_events')
unlocks_df = unlocks_df.astype({x: unlocks_dtypes[x] for x in unlocks_df.columns})
unlocks_df['eventDate'] = pd.to_datetime(unlocks_df['eventDate'])
views_df = execute_athena_query(views_query, 'view_events')
views_df = views_df.astype({x: view_dtypes[x] for x in views_df.columns})
views_df['eventDate'] = pd.to_datetime(views_df['eventDate'])


Query status: QUEUED
Query status: RUNNING
Query status: SUCCEEDED
query_execution_id = 64fc02d2-a949-4c2c-8f8d-31945700e78b.csv
Query status: QUEUED
Query status: RUNNING
Query status: RUNNING
Query status: SUCCEEDED
query_execution_id = 4a52cdf2-a4bd-478a-809e-808c92b3739a.csv


In [16]:
# views_agg = views_df[['event', 'uid', 'customerId', 'eventDate']].assign(Aggregated_Data=views_df.set_index(['event', 'uid', 'customerId', 'eventDate']).to_dict(orient='records'))
# unlocks_agg = unlocks_df[['event', 'uid', 'customerId', 'eventDate']].assign(Aggregated_Data=unlocks_df.set_index(['event', 'uid', 'customerId', 'eventDate']).to_dict(orient='records'))

views_agg = views_df
unlocks_agg = unlocks_df
df = pd.concat([views_agg, unlocks_agg]).sort_values(by=["eventDate"], ascending=True).reset_index(drop=True)
# df = df.rename(columns={'Aggregated_Data': "eventData" })
df = df.astype({"eventDate": str})

In [17]:
unlocks_df.dtypes

event                 object
uid                    int64
customerId            object
eventDate     datetime64[ns]
dtype: object

In [33]:
unlocks_df.sort_values(by=["uid", "contentType", "eventDate"], ascending=[False, False, True])[["uid", "contentId", "used", "total", "usagePercentage", "eventDate", "contentType"]]

Unnamed: 0,uid,contentId,used,total,usagePercentage,eventDate,contentType
11142,146881,1882,1,3,33.333333,2023-10-05 13:11:26,QUIZ
11143,146880,1880,1,28,3.571429,2023-10-05 13:19:37,QUIZ
11144,146880,2380,2,28,7.142857,2023-10-05 13:19:53,QUIZ
11137,146859,2380,1,53,1.886792,2023-10-05 05:38:35,QUIZ
11138,146859,2381,2,53,3.773585,2023-10-05 05:38:48,QUIZ
...,...,...,...,...,...,...,...
15,116701,222641,37,216,17.129630,2023-11-27 10:02:54,QUIZ
16,116701,222642,38,216,17.592593,2023-11-27 10:02:54,QUIZ
17,116701,222643,39,216,18.055556,2023-11-27 10:02:54,QUIZ
18,116701,222644,40,216,18.518519,2023-11-27 10:02:54,QUIZ


In [None]:
headers = {
    "x-ck-api-key": churnkey_secret["secret"],
    "x-ck-app": churnkey_secret["ck_app_id"],
}

chunks = np.array_split(df, math.ceil(len(df) / 100))
for ind, chunk in enumerate(chunks):
    print(ind)
    # event_body = df[["event", "uid", "customerId", 'eventDate']].loc[:50].to_dict('records')
    event_body = chunk.to_dict('records')
    # print(headers)
    # break
    response = requests.post("https://api.churnkey.co/v1/api/events/bulk", json=event_body, headers=headers)
    print(response.status_code)
    print(response.json())
    


  return bound(*args, **kwds)


0
200
{'status': 'success', 'event_count': 100, 'verified': True}
1
200
{'status': 'success', 'event_count': 100, 'verified': True}
2
200
{'status': 'success', 'event_count': 100, 'verified': True}
3
200
{'status': 'success', 'event_count': 100, 'verified': True}
4
200
{'status': 'success', 'event_count': 100, 'verified': True}
5
200
{'status': 'success', 'event_count': 100, 'verified': True}
6
200
{'status': 'success', 'event_count': 100, 'verified': True}
7
200
{'status': 'success', 'event_count': 100, 'verified': True}
8
200
{'status': 'success', 'event_count': 100, 'verified': True}
9
200
{'status': 'success', 'event_count': 100, 'verified': True}
10
200
{'status': 'success', 'event_count': 100, 'verified': True}
11
200
{'status': 'success', 'event_count': 100, 'verified': True}
12
200
{'status': 'success', 'event_count': 100, 'verified': True}
13
200
{'status': 'success', 'event_count': 100, 'verified': True}
14
200
{'status': 'success', 'event_count': 100, 'verified': True}
15
20