In [2]:
# Read the CSV file
root_path = '/home/iceberg/data'
app_id = '{id123456789,com.appsflyer.referrersender}'
event_date = '2024-05-20'

input_path = f'{root_path}/gem/appsflyer/installs_report/app_id={app_id}/event_date={event_date}'

df = spark.read.format('csv').options(header='true').load(input_path)

df = spark.sql("""
select event_name, event_time, substring(sha(appsflyer_id), 0, 12) as install_id, af_cost_value, install_time, af_prt, media_source, campaign, af_c_id, country_code, platform
from {installs_report}
where af_c_id is not null
""", installs_report = df)


# Show the data
df.show(5)

+----------+-------------------+------------+-------------+-------------------+------+----------------+--------------------+----------+------------+--------+
|event_name|         event_time|  install_id|af_cost_value|       install_time|af_prt|    media_source|            campaign|   af_c_id|country_code|platform|
+----------+-------------------+------------+-------------+-------------------+------+----------------+--------------------+----------+------------+--------+
|   install|2024-05-20 23:44:46|973c7ba959de|         NULL|2024-05-20 23:44:46|  NULL|Apple Search Ads|912_VHDL_FA Plan_...|1576814064|          VN|     ios|
|   install|2024-05-20 23:42:37|02e573c1ac60|         NULL|2024-05-20 23:42:37|  NULL|Apple Search Ads|912_VHDL_SearchRe...|1575425784|          VN|     ios|
|   install|2024-05-20 23:39:14|91ff3d74e0fb|         NULL|2024-05-20 23:39:14|  NULL|Apple Search Ads|912_VHDL_SearchRe...|1575425784|          VN|     ios|
|   install|2024-05-20 23:34:37|7f172fe05157|       

In [2]:
# Read the CSV file
root_path = '/home/iceberg/data'
app_id = '{id123456789,com.appsflyer.referrersender}'
event_date = '2024-05-23'

input_path = f'{root_path}/gem/appsflyer/in_app_events_report/app_id={app_id}/event_date={event_date}'

df = spark.read.format('csv').options(header='true').load(input_path)

df = spark.sql("""
select event_name, event_time, install_time, af_prt, media_source, campaign, af_c_id, country_code, appsflyer_id, platform
from {in_app_events_report}
where af_c_id is not null and event_name = 'af_first_role_create'
""", in_app_events_report = df)


# Show the data
df.show(5)

+--------------------+-------------------+-------------------+------+-----------------+--------------------+------------------+------------+--------------------+--------+
|          event_name|         event_time|       install_time|af_prt|     media_source|            campaign|           af_c_id|country_code|        appsflyer_id|platform|
+--------------------+-------------------+-------------------+------+-----------------+--------------------+------------------+------------+--------------------+--------+
|af_first_role_create|2024-05-23 23:37:35|2024-05-22 13:51:37|  NULL|     Facebook Ads|VHDL | 912 | FA P...|120209226290500601|          VN|1716360689419-831...| android|
|af_first_role_create|2024-05-23 14:51:06|2024-05-22 14:12:06|  NULL|     Facebook Ads|VHDL | 912 | FA P...|120209226290500601|          VN|1716361914625-791...| android|
|af_first_role_create|2024-05-23 13:46:43|2024-05-22 19:55:36|  NULL|     Facebook Ads|VHDL | 912 | FA P...|120209484852270270|          VN|17163

In [37]:
df = spark.read.parquet('s3a://lakehouse/gem/raw/appsflyer/installs_report/partition_date=2024-05-21')
df.show(1)

+---------------------+---------------------+-------------------+-------------------+----------+------+------------+----------+--------+---------+-------+--------+-----------+-----+--------+----------+----------+-------------+-------------+----------------+-------------------+-------------------------+---------------------+-----------------------+-----------------------+-------------------+-------------------------+---------------------+-----------------------+-----------------------+-------------------+-------------------------+---------------------+-----------------------+-----------------------+------+------------+-----+--------+-----------+--------------+-----+--------+-------+----------+--------------------+--------------+----+----------+----------------+----+----+--------+-----------+----------+-----------+-----------+--------------------+--------------------+--------------+---------------------------+--------------------+----------------------+----------------------+------------

In [49]:
in_app_events = spark.read.parquet('s3a://lakehouse/gem/raw/appsflyer/in_app_events_report/partition_date=2024-05-21')
in_app_events.show(1)

in_app_events.printSchema()

in_app_events.select('event_name').distinct().show(100, False)

+---------------------+---------------------+-------------------+-------------------+--------------------+--------------------+-------------+----------------------+-----------------+------------+--------------------+------+------------+----------+--------+--------+-------+--------+-----------+-----+--------+----------+----------+--------------+-------------+-------------+----------------+-------------------+-------------------------+---------------------+-----------------------+-----------------------+-------------------+-------------------------+---------------------+-----------------------+-----------------------+-------------------+-------------------------+---------------------+-----------------------+-----------------------+------+------------+-----+--------------+-----------+----+------------+----+--------------+--------------+----------+--------------------+--------------+----+----------+----------------+----+----+--------+-----------+----------+-----------+-----------+---------

In [52]:
in_app_events.where("event_name = 'af_purchase'").show(1, False)

+---------------------+---------------------+-------------------+-------------------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+----------------------+------------------+------------+--------------------+------+------------+----------+--------+--------+-----

In [8]:
in_app_events = spark.read.parquet('s3a://lakehouse/gem/raw/appsflyer/in_app_events_report/partition_date=2024-05-21')

df = spark.sql("""
with in_app_events as (
  select *, '{{product_id}}' as product_id, substring(sha(customer_user_id), 0, 12) as user_id, to_date(event_time) as partition_date
  from {in_app_events}
)
, complete_registration as (
  select  product_id, user_id, af_prt as agency, media_source, os_version, device_model, partition_date
        , cast(install_time as timestamp) as install_time, cast(event_time as timestamp) as registration_time
        , substring(sha(appsflyer_id), 0, 12) as install_id, substring(sha(af_c_id), 0, 12) as campaign_id
        , upper(country_code) as country_code, lower(platform) as platform
  from in_app_events
  where (event_name = 'af_role_create' or event_name = 'af_first_role_create') and user_id is not null
)

select  product_id, user_id, install_time, install_id
      , agency, media_source, campaign_id, country_code, platform, os_version, device_model
      , registration_time, partition_date
from (
  select  product_id, user_id, install_time, install_id
        , agency, media_source, campaign_id, country_code, platform, os_version, device_model
        , registration_time, partition_date
        , row_number() over (partition by product_id, user_id, partition_date order by registration_time) as rn
  from complete_registration
)
where rn = 1
""", in_app_events = in_app_events)

df.show(5, False)

df.count()

df.printSchema()

+------------+------------+-------------------+------------+------+----------------------+-----------+------------+--------+----------+-----------------+-------------------+--------------+
|product_id  |user_id     |install_time       |install_id  |agency|media_source          |campaign_id|country_code|platform|os_version|device_model     |registration_time  |partition_date|
+------------+------------+-------------------+------------+------+----------------------+-----------+------------+--------+----------+-----------------+-------------------+--------------+
|{product_id}|362fbe20cd1d|2024-05-20 21:06:54|b5e67b510941|NULL  |appsflyer_sdk_test_int|NULL       |VN          |ios     |16.3      |iPadPro3rdGen    |2024-05-21 11:13:28|2024-05-21    |
|{product_id}|656f804303b6|2024-05-21 10:31:17|d8a76347af11|NULL  |NULL                  |NULL       |VN          |android |12        |OnePlus::GM1910  |2024-05-21 10:38:36|2024-05-21    |
|{product_id}|8100f0ac1f60|2024-05-21 10:31:17|d8a76347

In [10]:
df = spark.read.parquet("s3a://lakehouse/gem/enriched/user/activity/partition_date=2024-05-23")
df.show(1)
df.printSchema()

+----------+------------+-------------------+-------------------+-------------+-----+
|product_id|     user_id|   first_login_time|    last_login_time|session_count|level|
+----------+------------+-------------------+-------------------+-------------+-----+
|       gem|000316b3c382|2024-05-23 10:01:59|2024-05-23 10:16:23|            2|   10|
+----------+------------+-------------------+-------------------+-------------+-----+
only showing top 1 row

root
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- first_login_time: timestamp (nullable = true)
 |-- last_login_time: timestamp (nullable = true)
 |-- session_count: long (nullable = true)
 |-- level: integer (nullable = true)



In [11]:
df = spark.read.parquet("s3a://lakehouse/gem/enriched/user/purchase/partition_date=2024-05-23")
df.show(1)
df.printSchema()

+----------+------------+--------+--------+-------------------+-------------------+-----------+-----------------+------------------+-----------------+-----------------+------------------+
|product_id|     user_id|currency| revenue|first_purchase_time| last_purchase_time|total_trans|     total_amount|        min_amount|       max_amount|     first_amount|       last_amount|
+----------+------------+--------+--------+-------------------+-------------------+-----------+-----------------+------------------+-----------------+-----------------+------------------+
|       gem|001660e4a9d5|     VND|110000.0|2024-05-23 11:02:46|2024-05-23 11:20:26|          3|4.318922054550167|0.7852683654638973|2.748385323622372|2.748385323622372|0.7852683654638973|
+----------+------------+--------+--------+-------------------+-------------------+-----------+-----------------+------------------+-----------------+-----------------+------------------+
only showing top 1 row

root
 |-- product_id: string (nullab

In [4]:
df = spark.read.parquet("s3a://lakehouse/gem/enriched/user_profile")
df.where('campaign_id is not null').show(1)
df.printSchema()

+----------+------------+-------------------+------------+------+------------+------------+------------+--------+----------+-----------------+-------------------+-------------------+-------------------+-------------+-----+-------------------+------------------+-----------+------------+----------+----------+------------+-----------+
|product_id|     user_id|       install_time|  install_id|agency|media_source| campaign_id|country_code|platform|os_version|     device_model|  registration_time|   first_login_time|    last_login_time|session_count|level|first_purchase_time|last_purchase_time|total_trans|total_amount|min_amount|max_amount|first_amount|last_amount|
+----------+------------+-------------------+------------+------+------------+------------+------------+--------+----------+-----------------+-------------------+-------------------+-------------------+-------------+-----+-------------------+------------------+-----------+------------+----------+----------+------------+-----------

In [6]:
from pyspark.sql.functions import *
df = spark.read.parquet("s3a://lakehouse/gem/enriched/user_profile")
df.groupBy('product_id').agg(
  count('user_id'),
  countDistinct('user_id'),
  sum('total_amount'),
  min('registration_time'),
  max('registration_time')
).show()

[Stage 22:>                                                         (0 + 1) / 1]

+----------+--------------+-----------------------+-----------------+----------------------+----------------------+
|product_id|count(user_id)|count(DISTINCT user_id)|sum(total_amount)|min(registration_time)|max(registration_time)|
+----------+--------------+-----------------------+-----------------+----------------------+----------------------+
|       gem|         30804|                  30804|25154.74273764151|   2024-05-23 00:05:21|   2024-05-23 23:59:45|
+----------+--------------+-----------------------+-----------------+----------------------+----------------------+



                                                                                

In [1]:
from pyspark.sql.functions import *
df = spark.read.parquet("s3a://lakehouse/gem/enriched/user_profile")
df.groupBy('product_id').agg(
  count('user_id'),
  countDistinct('user_id'),
  sum('total_amount'),
  min('registration_time'),
  max('registration_time')
).show()

26/01/23 04:03:28 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+----------+--------------+-----------------------+-----------------+----------------------+----------------------+
|product_id|count(user_id)|count(DISTINCT user_id)|sum(total_amount)|min(registration_time)|max(registration_time)|
+----------+--------------+-----------------------+-----------------+----------------------+----------------------+
|       gem|        193510|                 193510|463330.7682903693|   2024-05-23 00:09:47|   2024-06-29 23:59:44|
+----------+--------------+-----------------------+-----------------+----------------------+----------------------+



In [4]:
from pyspark.sql.functions import *
df = spark.read.parquet("s3a://lakehouse/gem/curated/user_profile")
df.groupBy('product_id').agg(
  count('user_id'),
  countDistinct('user_id'),
  sum('total_amount'),
  min('registration_time'),
  max('registration_time')
).show()



+----------+--------------+-----------------------+-----------------+----------------------+----------------------+
|product_id|count(user_id)|count(DISTINCT user_id)|sum(total_amount)|min(registration_time)|max(registration_time)|
+----------+--------------+-----------------------+-----------------+----------------------+----------------------+
|       gem|        194723|                 194723|468391.0635026869|   2024-05-23 00:09:47|   2024-06-30 23:58:43|
+----------+--------------+-----------------------+-----------------+----------------------+----------------------+



                                                                                

In [2]:
# 2. Check libraries
import pandas as pd
import xgboost as xgb
import sklearn

print("Pandas version:", pd.__version__)
print("XGBoost version:", xgb.__version__)
print("scikit-learn version:", sklearn.__version__)

Pandas version: 1.4.3
XGBoost version: 2.1.4
scikit-learn version: 1.3.2


In [31]:
user_activity = spark.read.parquet('s3a://lakehouse/gem/enriched/user/activity/partition_date={2024-05-2*,2024-05-3*,2024-06-*}')
user_purchase = spark.read.parquet('s3a://lakehouse/gem/enriched/user/purchase/partition_date={2024-05-2*,2024-05-3*,2024-06-*}')
user_profile = spark.read.parquet('s3a://lakehouse/gem/curated/user_profile')

df = spark.sql("""
with user_activity as (
  select  product_id, user_id, to_date(first_login_time) as event_date, session_count, level
  from {user_activity}
)
, user_purchase as (
  select  product_id, user_id, to_date(first_purchase_time) as event_date, total_amount
  from {user_purchase}
)
, daily_snapshot as (
  select  product_id, user_id, event_date, session_count, level, total_amount
  from user_activity a full join user_purchase p using (product_id, user_id, event_date)
)
, daily_snapshot_with_user_profile as (
  select  product_id, user_id, to_date(install_time) as install_date
        , agency, media_source, campaign_id, country_code, platform
        , datediff(event_date, install_time) as day_since_install
        , (unix_timestamp(first_purchase_time) - unix_timestamp(install_time)) / 3600 as hours_to_first_purchase
        , daily_snapshot.session_count, daily_snapshot.level, daily_snapshot.total_amount, event_date
  from {user_profile}
  left join daily_snapshot using (product_id, user_id)
  where install_time is not null and first_purchase_time is not null and datediff(event_date, install_time) <= 30
)
, user_base as (
  -- Get unique players who have at least some data in the last 30+ days
  select distinct user_id from daily_snapshot_with_user_profile where day_since_install >= 30
)
, user_features as (
  -- Aggregate features per player (only from D0 to D30)
  select  product_id, user_id, install_date, agency, media_source, campaign_id, country_code, platform
        , hours_to_first_purchase
        -- Retention flags (binary)
        , max(case when day_since_install = 1 then 1 else 0 end) as retention_d01
        , max(case when day_since_install = 3 then 1 else 0 end) as retention_d03
        , max(case when day_since_install = 7 then 1 else 0 end) as retention_d07
        , max(case when day_since_install = 14 then 1 else 0 end) as retention_d14
        -- Session & progression metrics (D1-D7 window)
        , sum(case when day_since_install <= 7 then session_count end) as d1_d7_sessions
        , avg(case when day_since_install <= 7 then session_count end) as avg_d1_d7_sessions_per_day
        , max(case when day_since_install <= 7 then level else 0 end) as max_level_d7
        -- Monetization features
        , sum(case when day_since_install <= 7 then total_amount end) as total_amount_7d
        , sum(case when day_since_install <= 30 then total_amount end) as total_amount_30d -- This is the TARGET for LTV modeling!
        , max(case when day_since_install <= 30 then total_amount end) as max_amount_30d
        , count(case when total_amount > 0 then 1 end) as num_of_purchase_days_30d
        , to_date('{{event_date}}') as partition_date
  from user_base left join daily_snapshot_with_user_profile using (user_id)
  where day_since_install >= 1
  group by product_id, user_id, install_date, agency, media_source, campaign_id, country_code, platform, hours_to_first_purchase
)

select * from user_features
where total_amount_30d is not null -- Ensure we have target
""", user_activity = user_activity, user_purchase = user_purchase, user_profile = user_profile)

df.printSchema()
df.orderBy(desc('total_amount_30d')).show()

root
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- install_date: date (nullable = true)
 |-- agency: string (nullable = true)
 |-- media_source: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- hours_to_first_purchase: double (nullable = true)
 |-- retention_d01: integer (nullable = true)
 |-- retention_d03: integer (nullable = true)
 |-- retention_d07: integer (nullable = true)
 |-- retention_d14: integer (nullable = true)
 |-- d1_d7_sessions: long (nullable = true)
 |-- avg_d1_d7_sessions_per_day: double (nullable = true)
 |-- max_level_d7: integer (nullable = true)
 |-- total_amount_7d: double (nullable = true)
 |-- total_amount_30d: double (nullable = true)
 |-- max_amount_30d: double (nullable = true)
 |-- num_of_purchase_days_30d: long (nullable = false)
 |-- partition_date: date (nullable = true)



                                                                                

+----------+------------+------------+---------+------------+------------+------------+--------+-----------------------+-------------+-------------+-------------+-------------+--------------+--------------------------+------------+------------------+------------------+------------------+------------------------+--------------+
|product_id|     user_id|install_date|   agency|media_source| campaign_id|country_code|platform|hours_to_first_purchase|retention_d01|retention_d03|retention_d07|retention_d14|d1_d7_sessions|avg_d1_d7_sessions_per_day|max_level_d7|   total_amount_7d|  total_amount_30d|    max_amount_30d|num_of_purchase_days_30d|partition_date|
+----------+------------+------------+---------+------------+------------+------------+--------+-----------------------+-------------+-------------+-------------+-------------+--------------+--------------------------+------------+------------------+------------------+------------------+------------------------+--------------+
|       gem|f

In [41]:
spark.read.parquet("s3a://lakehouse/gem/curated/user_profile").where("user_id = '6bc157004d3c'").show()

+----------+------------+-------------------+------------+------+------------+-----------+------------+--------+----------+-----------------+-------------------+-------------------+-------------------+-------------+-----+-------------------+-------------------+-----------+------------------+------------------+------------------+------------+------------------+
|product_id|     user_id|       install_time|  install_id|agency|media_source|campaign_id|country_code|platform|os_version|     device_model|  registration_time|   first_login_time|    last_login_time|session_count|level|first_purchase_time| last_purchase_time|total_trans|      total_amount|        min_amount|        max_amount|first_amount|       last_amount|
+----------+------------+-------------------+------------+------+------------+-----------+------------+--------+----------+-----------------+-------------------+-------------------+-------------------+-------------+-----+-------------------+-------------------+-----------+-

In [45]:
df = spark.read.parquet("s3a://lakehouse/gem/curated/feature_group_ltv")
df.show()
df.agg(count('user_id'), min('install_date'), max('install_date')).show()

+----------+------------+------------+------------+-----------------+------------+------------+--------+-----------------------+-------------+-------------+-------------+-------------+--------------+--------------------------+------------+------------------+------------------+------------------+------------------------+
|product_id|     user_id|install_date|      agency|     media_source| campaign_id|country_code|platform|hours_to_first_purchase|retention_d01|retention_d03|retention_d07|retention_d14|d1_d7_sessions|avg_d1_d7_sessions_per_day|max_level_d7|   total_amount_7d|  total_amount_30d|    max_amount_30d|num_of_purchase_days_30d|
+----------+------------+------------+------------+-----------------+------------+------------+--------+-----------------------+-------------+-------------+-------------+-------------+--------------+--------------------------+------------+------------------+------------------+------------------+------------------------+
|       gem|acd43588149b|  2024-05

26/01/23 11:18:00 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1827138 ms exceeds timeout 120000 ms
26/01/23 11:18:00 WARN SparkContext: Killing executors is not supported by current scheduler.
26/01/23 11:18:01 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [33]:
spark.read.parquet("s3a://lakehouse/gem/enriched/user/purchase/partition_date=2024-05-24").where("user_id = 'f273f7226b72'").show()

+----------+------------+--------+--------+-------------------+-------------------+-----------+------------------+-----------------+------------------+------------------+-----------------+
|product_id|     user_id|currency| revenue|first_purchase_time| last_purchase_time|total_trans|      total_amount|       min_amount|        max_amount|      first_amount|      last_amount|
+----------+------------+--------+--------+-------------------+-------------------+-----------+------------------+-----------------+------------------+------------------+-----------------+
|       gem|f273f7226b72|     VND|450000.0|2024-05-24 15:18:32|2024-05-24 15:27:18|          2|17.663683466792275|6.672947087454859|10.990736379337415|10.990736379337415|6.672947087454859|
+----------+------------+--------+--------+-------------------+-------------------+-----------+------------------+-----------------+------------------+------------------+-----------------+

