In [1]:
cd /Users/karolinegriesbach/Documents/Innkeepr/Git/evaluation-and-execution-scripts/

In [2]:
import ast
import pandas as pd
import numpy as np
import delta_sharing
import awswrangler as wr
from general_functions.conncet_s3 import S3Connection
from general_functions.return_account_ids import return_account_ids
import general_functions.databricks_client as db_client
from src.utils.directory_handling import create_directory_if_not_exists

In [3]:
# --- Run configuration ---
customer = "LILLYDOO"
# Keys that are pulled out of the BigQuery `traits` payload and used for profile matching.
ids_for_profile_matching = ["email_sha256"]
# NOTE(review): the next two settings are not referenced by the cells visible here.
column_profile_matching = "traits"
bigquery_conversion_config = {"Wipes"}

# Resolve the workspace id of the account whose name equals `customer`.
# Distinct names are used for each step so `workspace_id` is only ever the final id.
accounts = return_account_ids()
matching_workspace_ids = [acc["id"] for acc in accounts if acc["name"] == customer]
if not matching_workspace_ids:
    # Fail with a readable message instead of a bare IndexError on `[0]`.
    raise ValueError(f"No account named {customer!r} was returned by return_account_ids()")
workspace_id = matching_workspace_ids[0]

# Output locations for this sprint story; `path_data` holds the local CSV caches.
path = "SprintStories/EN-3032-BigQuery/"
path_data = f"{path}data/"
create_directory_if_not_exists(path_data)

In [4]:
# Load the `features_view_30_outlook` table, preferring the local CSV cache.
path_to_views = f"{path_data}{customer}_views_30_outlook.csv"
try:
    views = pd.read_csv(path_to_views)
except FileNotFoundError:
    # Cache miss: fetch through Delta Sharing (row count capped via `limit`) and cache it.
    view_path = db_client.return_databricks_client()
    table_path = f"{view_path}#delta_share_events.{workspace_id}.features_view_30_outlook"
    views = delta_sharing.load_as_pandas(table_path, limit=100000)
    # index=False keeps the cached columns identical to the freshly loaded frame;
    # writing the index would add a spurious "Unnamed: 0" column on the next read.
    # Cache files written before this change still contain that column.
    views.to_csv(path_to_views, index=False)

In [5]:
# Load the `profiles_view_30_outlook` table, preferring the local CSV cache.
path_to_profiles = f"{path_data}{customer}_profiles_view_30_outlook.csv"
try:
    profiles = pd.read_csv(path_to_profiles)
except FileNotFoundError:
    # Cache miss: fetch the full table through Delta Sharing (no row limit) and cache it.
    profile_path = db_client.return_databricks_client()
    table_path = f"{profile_path}#delta_share_events.{workspace_id}.profiles_view_30_outlook"
    profiles = delta_sharing.load_as_pandas(table_path)
    # index=False keeps the cached columns identical to the freshly loaded frame;
    # writing the index would add a spurious "Unnamed: 0" column on the next read.
    # Cache files written before this change still contain that column.
    profiles.to_csv(path_to_profiles, index=False)

In [6]:
# Load the BigQuery event export, preferring the local CSV cache.
path_s3_bigquery = f"{path_data}{customer}_bigQueryEvents.csv"
try:
    bigquery = pd.read_csv(path_s3_bigquery)
except FileNotFoundError:
    # Cache miss: read every JSON file listed for "bigQueryEvents/" in the workspace bucket.
    s3 = S3Connection()
    list_all_files = s3.list_files(f"{workspace_id}", "bigQueryEvents/")
    # Collect the per-file frames first and concatenate once (avoids quadratic re-copying).
    event_frames = [wr.s3.read_json(f"s3://{workspace_id}/{file}") for file in list_all_files]
    if not event_frames:
        # Without this guard an empty CSV would be cached and break every later run.
        raise ValueError(
            f"No files found under 'bigQueryEvents/' for workspace {workspace_id}"
        ) from None
    # ignore_index gives a clean RangeIndex, matching what the cached CSV yields on reload.
    bigquery = pd.concat(event_frames, ignore_index=True)
    bigquery.to_csv(path_s3_bigquery, index=False)
bigquery

In [7]:
# How often each conversion `_id` occurs; missing ids are counted as their own bucket.
id_counts = bigquery["_id"].value_counts(dropna=False)
id_counts

In [8]:
# Derive the calendar day of each event from its `created` timestamp.
created_ts = pd.to_datetime(bigquery["created"])
bigquery["date"] = created_ts.dt.date

# Report the covered date range.
first_day = bigquery["date"].min()
last_day = bigquery["date"].max()
print(f"Min data = {first_day} until max date = {last_day}")

# Per-day frequency of each conversion `_id` (missing ids included).
daily_id_counts = bigquery.groupby("date")["_id"].value_counts(dropna=False)
daily_id_counts.reset_index()

# Preprocess BigQuery data

In [9]:
# Keep only events that have an `_id`, restricted to the columns used downstream.
kept_columns = ["_id", "name", "created", "traits"]
bigquery = bigquery.dropna(subset=["_id"])[kept_columns]
print(f"after dropping nans: {bigquery.shape}")

# Expose the event name as `bigquery_name` and drop the original `name` column.
bigquery = bigquery.assign(bigquery_name=bigquery["name"]).drop(columns=["name"])

In [10]:
def return_id(x, item):
    """Return the value stored under ``item`` in a traits mapping, or ``None``.

    Args:
        x: A dict, or the string representation of one (parsed with
            ``ast.literal_eval``). Any other value (``None``, NaN, a list, ...)
            is treated as having no entry for ``item``.
        item: Key to look up.

    Returns:
        The value under ``item`` when present, otherwise ``None``.

    Raises:
        ValueError, SyntaxError: If ``x`` is a string that is not a valid
            Python literal.
    """
    if isinstance(x, str):
        x = ast.literal_eval(x)
    # Missing traits show up as NaN/None; indexing those would raise TypeError.
    if not isinstance(x, dict):
        return None
    return x.get(item)
    
# Pull every configured matching id out of the `traits` payload into its own column.
traits_as_text = bigquery["traits"].astype("string")
for item in ids_for_profile_matching:
    # Look for the id currently being processed (not a hardcoded key).
    # regex=False because `item` is a literal key name, not a pattern.
    if traits_as_text.str.contains(item, case=True, regex=False, na=False).any():
        print(f"return id for item = {item}")
        bigquery[item] = bigquery["traits"].apply(return_id, args=(item,))
    else:
        # Still create the column so later cells that read it do not hit a KeyError.
        print(f"no traits contain item = {item}")
        bigquery[item] = None
bigquery

In [11]:
#profiles[profiles["email_sha256_externalIds"].astype("string").str.contains("c00f48e66ad7801ac696681fa0543fcc4820bc84c35620d2f206d60979328442")]

# Merge profiles and bigquery data

In [12]:
def extract_email_sha256_ids(external_ids):
    """Collect the ``id`` of every external-id entry named ``email_sha256``.

    Args:
        external_ids: A sequence of dicts (list, tuple or numpy array), or the
            string representation of one (parsed with ``ast.literal_eval``).
            ``None``, NaN, blank strings and other non-iterable values are
            treated as "no external ids".

    Returns:
        A list with the ``id`` of each entry whose ``name`` is ``email_sha256``.
        Entries that are not dicts or have no ``id`` are skipped.

    Raises:
        ValueError, SyntaxError: If ``external_ids`` is a non-blank string that
            is not a valid Python literal.
    """
    if isinstance(external_ids, str):
        if not external_ids.strip():
            return []
        external_ids = ast.literal_eval(external_ids)
    # list() instead of a truthiness test: `not arr` raises for numpy arrays with
    # more than one element, and scalars such as None/NaN are simply not iterable.
    try:
        entries = list(external_ids)
    except TypeError:
        return []
    return [
        entry["id"]
        for entry in entries
        if isinstance(entry, dict) and entry.get("name") == "email_sha256" and "id" in entry
    ]

In [13]:
# Keep a parsed copy of the raw external-id column: strings are evaluated into
# Python objects, other falsy values become an empty list.
profiles['email_list'] = profiles['email_sha256_externalIds'].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else (x if x else [])
)

# Build reverse index: email_sha256 -> profile_id.
# If the same email appears on several profiles, the last profile wins.
email_to_profile = {}
for external_ids, profile_id in zip(profiles['email_sha256_externalIds'], profiles['profile_id']):
    # Missing cells arrive as float NaN; test by type/value rather than by
    # membership in [np.nan], which only works when the object is np.nan itself.
    if isinstance(external_ids, float) and pd.isna(external_ids):
        continue
    for email_id in extract_email_sha256_ids(external_ids):
        email_to_profile[email_id] = profile_id

# Attach the matched profile (NaN when the email hash is unknown) and its anonymousId(s).
bigquery['profile_id'] = bigquery['email_sha256'].map(email_to_profile)
# pandas matches null merge keys with each other, so null profile ids are removed
# from the lookup to keep unmatched conversions from picking up an anonymousId.
profile_lookup = profiles[["profile_id", "anonymousId"]].dropna(subset=["profile_id"])
bigquery = pd.merge(bigquery, profile_lookup, how="left", on="profile_id")
bigquery = bigquery.rename(columns={"_id": "bigquery_conversion_id", "created": "bigquery_created"})
# `conv_name` is not selected here: bigquery has no such column at this point (it
# lives on `views`), so including it raised a KeyError.
bigquery = bigquery[["profile_id", "anonymousId", "bigquery_name", "bigquery_created", "bigquery_conversion_id"]]
bigquery

In [None]:
# Split conversion ids into those that matched a profile and those that did not.
has_profile = bigquery["profile_id"].notnull()
matched_conversion_ids = bigquery.loc[has_profile, "bigquery_conversion_id"].tolist()
# An id counts as unmatched only if none of its rows matched a profile.
in_matched = bigquery["bigquery_conversion_id"].isin(matched_conversion_ids)
unmatched_conversion_ids = bigquery.loc[~in_matched, "bigquery_conversion_id"].tolist()
total_conversion_ids = bigquery["bigquery_conversion_id"].nunique()

n_matched = len(set(matched_conversion_ids))
n_unmatched = len(set(unmatched_conversion_ids))
# Guard the ratio so an empty frame reports 0.0 instead of raising ZeroDivisionError.
pct_matched = n_matched / total_conversion_ids * 100 if total_conversion_ids else 0.0

print(f"total_conversion_ids = {total_conversion_ids}")
print(f"Matched conversion_ids = {n_matched}")
print(f"Unmatched conversion_ids = {n_unmatched}")
print(f"Percentage Matched conversion_ids = {pct_matched}")
# Sanity check: the two groups must partition the set of conversion ids.
if n_matched + n_unmatched != total_conversion_ids:
    raise ValueError("Matched conversion_ids + Unmatched conversion_ids != total_conversion_ids")

In [None]:
# Keep only conversions that resolved to an anonymousId, one row per
# (anonymousId, conversion) pair; the first occurrence is retained.
dedup_keys = ["anonymousId", "bigquery_conversion_id"]
has_anonymous_id = bigquery["anonymousId"].notna()
bigquery = bigquery[has_anonymous_id].drop_duplicates(subset=dedup_keys, keep="first")
bigquery

In [None]:
# Number of distinct anonymousIds attached to each conversion id.
users_per_conversion = bigquery.groupby(by="bigquery_conversion_id")["anonymousId"].nunique()
users_per_conversion.reset_index()

# Merge BigQuery conversions with session views

In [None]:
# Restrict the views table to the columns needed for session matching.
view_columns = ["session", "created", "anonymousId", "conv_name"]
views = views[view_columns]
views

In [None]:
# Tolerance applied before the first and after the last event of a session when
# deciding whether a conversion belongs to it. Tune here, in one place.
SESSION_MATCH_BUFFER = pd.Timedelta(minutes=60)

# Ensure datetime types
# NOTE(review): the comparison below needs both columns to be consistently tz-aware
# or tz-naive; confirm both sources use the same convention.
views['created'] = pd.to_datetime(views['created'])
bigquery['bigquery_created'] = pd.to_datetime(bigquery['bigquery_created'])

# First and last event timestamp of every (anonymousId, session) pair.
session_ranges = views.groupby(['anonymousId', 'session']).agg(
    session_start=('created', 'min'),
    session_end=('created', 'max')
).reset_index()

# Merge on anonymousId first: pairs every conversion of a user with every session
# of that user (cartesian product per user).
merged = bigquery.merge(session_ranges, on='anonymousId', how='inner')
print(f"Unique anonymousIds after profile merge: {bigquery['anonymousId'].nunique()}")
print(f"Unique anonymousIds after session merge: {merged['anonymousId'].nunique()}")

# Filter: keep pairs where the conversion falls inside the buffered session window
# (both bounds inclusive).
window_start = merged['session_start'] - SESSION_MATCH_BUFFER
window_end = merged['session_end'] + SESSION_MATCH_BUFFER
merged = merged[merged['bigquery_created'].between(window_start, window_end)]
print(f"Unique anonymousIds after created merge: {merged['anonymousId'].nunique()}")
merged

In [None]:
# Attach the matched conversions to every view row of the same (anonymousId, session).
# NOTE(review): a session with several matched conversions repeats its view rows,
# once per conversion; confirm that is intended before counting rows.
session_conversions = merged[['anonymousId', 'session', 'bigquery_name', 'bigquery_conversion_id']]
final = views.merge(session_conversions, on=['anonymousId', 'session'], how='left')

print("count conv_name")
print(final["conv_name"].value_counts(dropna=False))

# Keep the untouched label for later comparison.
final["conv_name_copy"] = final["conv_name"]

# Label rows that have a BigQuery conversion but no conv_name yet.
needs_label = final["bigquery_name"].notnull() & final["conv_name"].isnull()
final["conv_name"] = np.where(needs_label, "checkout_completed", final["conv_name"]) #TODO: check name, how to do checkout completed dynamically?

print("count conv_name after adding bigquery")
print(final["conv_name"].value_counts(dropna=False))
final

In [None]:
# Show only the rows whose conv_name was actually changed by the BigQuery enrichment.
# NaN != NaN evaluates to True, so rows where both labels are missing must be
# excluded explicitly; otherwise every unlabelled row is reported as changed.
label_before = final["conv_name_copy"]
label_after = final["conv_name"]
both_missing = label_before.isna() & label_after.isna()
final[label_after.ne(label_before) & ~both_missing]