In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Export the Java 8 JVM directory and the Spark 3.2.1 directory as the
# JAVA_HOME / SPARK_HOME environment variables for this process.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.2.1-bin-hadoop3.2",
    }
)

In [None]:
import findspark

# findspark.init() is called before the pyspark import below; keep that order.
findspark.init()

from pyspark.sql import SparkSession

# Get (or reuse) a session on the "local[*]" master, named "firstSpark".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("firstSpark")
    .getOrCreate()
)

In [None]:
from functools import reduce
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
import pandas as pd
import numpy as np

# Load data

## Promotion

In [None]:
# Campaign configuration table.
# The file is tab-delimited, so the CSV reader is told to split on "\t" directly
# instead of loading each line as a single comma-delimited column and re-splitting
# it by hand (which also truncated any row that happened to contain a comma).
# No schema is inferred, so every column is still a string.
# NOTE: empty fields are now read as null rather than as empty strings.
campaign = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv("/content/drive/MyDrive/2022/project/data/source/configs/campaign.csv")
    .select("campaignID", "campaignType", "expireDate", "expireTime")
    .cache()
)
campaign.show(5)

+----------+------------+-------------------+----------+
|campaignID|campaignType|         expireDate|expireTime|
+----------+------------+-------------------+----------+
|      1000|           1|2022-01-01 00:00:00|         0|
|      1001|           1|2022-01-01 00:00:00|         0|
|      1002|           1|2022-01-01 00:00:00|         0|
|      1003|           1|2022-01-01 00:00:00|         0|
|      1004|           1|2022-01-01 00:00:00|         0|
+----------+------------+-------------------+----------+
only showing top 5 rows



In [None]:
# Voucher/promotion records for the 2021-11-01 partition.
# The file is tab-delimited, so the CSV reader is told to split on "\t" directly
# instead of loading each line as a single comma-delimited column and re-splitting
# it by hand (which also truncated any row that happened to contain a comma).
# No schema is inferred, so every column is still a string.
# NOTE: empty fields are now read as null rather than as empty strings.
promotion = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv("/content/drive/MyDrive/2022/project/data/source/promotions/2021-11-01/part-00000-802e6ae2-b27d-463b-a5c0-5b2d91ad831f-c000.csv")
    .select("userid", "voucherCode", "status", "campaignID", "time")
    .cache()
)
promotion.show(5)

+------+-----------+------+----------+--------------------+
|userid|voucherCode|status|campaignID|                time|
+------+-----------+------+----------+--------------------+
|  3070|     1000-5| GIVEN|      1000|2021-11-01 13:31:...|
|   355|     1008-8| GIVEN|      1008|2021-11-01 15:51:...|
|  9006|    1004-40| GIVEN|      1004|2021-11-01 12:19:...|
|  4891|    1008-86| GIVEN|      1008|2021-11-01 06:50:...|
| 12531|   1000-103| GIVEN|      1000|2021-11-01 00:44:...|
+------+-----------+------+----------+--------------------+
only showing top 5 rows



## Mapping

In [None]:
# appid -> appname lookup table.
# The file is loaded as one raw column (named after the header line) and each
# value is then cut into its two fields with split().
appid_raw = spark.read.option("header","true").csv("/content/drive/MyDrive/2022/project/data/source/mapping/appid.csv")
appid_fields = split(col("appid appname"),"  ")
mapping_appid = (
    appid_raw
    .select(
        appid_fields.getItem(0).alias("appid"),
        appid_fields.getItem(1).alias("appname"),
    )
    .drop("appid appname")
    .cache()
)
mapping_appid.show()

+-----+-----------+
|appid|    appname|
+-----+-----------+
|   10|      Telco|
|   11|     Lazada|
|   12|       Tiki|
|   13|   Internet|
|   14|Electricity|
|   15|      Water|
|   16|         TV|
|   17|    123Phim|
|   18|      123Go|
|   19|     Shopee|
|   20|    Dominos|
|   21|       Game|
|   22|      Bitis|
+-----+-----------+



In [None]:
# gender -> genderName lookup table.
# The file is tab-delimited, so the CSV reader is told to split on "\t" directly
# instead of loading each line as a single comma-delimited column and re-splitting
# it by hand. No schema is inferred, so both columns are still strings.
# NOTE: empty fields are now read as null rather than as empty strings.
mapping_gender = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv("/content/drive/MyDrive/2022/project/data/source/mapping/gender.csv")
    .select("gender", "genderName")
    .cache()
)
mapping_gender.show(5)

+------+----------+
|gender|genderName|
+------+----------+
|     1|      Male|
|     2|    Female|
+------+----------+



In [None]:
# profileLevel -> profileLevelName lookup table.
# The file is tab-delimited, so the CSV reader is told to split on "\t" directly
# instead of loading each line as a single comma-delimited column and re-splitting
# it by hand (a comma inside a level name would otherwise truncate the row).
# No schema is inferred, so both columns are still strings.
# NOTE: empty fields are now read as null rather than as empty strings.
map_profile = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv("/content/drive/MyDrive/2022/project/data/source/mapping/profileLevel.csv")
    .select("profileLevel", "profileLevelName")
    .cache()
)
map_profile.show(5)

+------------+--------------------+
|profileLevel|    profileLevelName|
+------------+--------------------+
|           1|Dont have phone n...|
|           2|   Have phone number|
|           3|                 KYC|
+------------+--------------------+



In [None]:
# transtype -> transtypename lookup table.
# The file is tab-delimited, so the CSV reader is told to split on "\t" directly
# instead of loading each line as a single comma-delimited column and re-splitting
# it by hand. No schema is inferred, so both columns are still strings.
# NOTE: empty fields are now read as null rather than as empty strings.
map_transtype = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv("/content/drive/MyDrive/2022/project/data/source/mapping/transtype.csv")
    .select("transtype", "transtypename")
    .cache()
)
map_transtype.show(5)

+---------+-------------+
|transtype|transtypename|
+---------+-------------+
|        1|      Fund In|
|        2|     Fund out|
|        3|      Payment|
|        4|         LiXi|
|        5|     Transfer|
+---------+-------------+



## Transactions

In [None]:
# Transaction records for the 2021-11-01 partition.
# The file is tab-delimited, so the CSV reader is told to split on "\t" directly
# instead of loading each line as a single comma-delimited column and re-splitting
# it eight times by hand (which also truncated any row containing a comma).
# No schema is inferred, so every column (including amount) is still a string.
# NOTE: empty fields are now read as null rather than as empty strings.
transaction = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv("/content/drive/MyDrive/2022/project/data/source/transactions/2021-11-01/part-00000-8f0661ad-96d1-41c9-9b42-0d887254a35f-c000.csv")
    .select(
        "transId",
        "transStatus",
        "userId",
        "transactionTime",
        "appId",
        "transType",
        "amount",
        "pmcId",
    )
    .cache()
)
transaction.show(5)

+--------------------+-----------+------+--------------------+-----+---------+-------+-----+
|             transId|transStatus|userId|     transactionTime|appId|transType| amount|pmcId|
+--------------------+-----------+------+--------------------+-----+---------+-------+-----+
|20211101--2428536...|          1| 31007|2021-11-01 17:09:...|   70|        3|1603991|    1|
|20211101--8364880...|          1| 26285|2021-11-01 09:27:...|   15|        3|9313080|    2|
|20211101-90996169...|          1| 19721|2021-11-01 17:55:...|   18|        3|3540484|    3|
|20211101--8426461...|         -1|  6826|2021-11-01 03:17:...|    1|        1|4372257|    4|
|20211101-79265576...|         -1| 11744|2021-11-01 00:34:...|    2|        2|6078535|    2|
+--------------------+-----------+------+--------------------+-----+---------+-------+-----+
only showing top 5 rows



## Users

In [None]:
# User profile snapshots for the 2021-11-01 partition.
# The file is tab-delimited, so the CSV reader is told to split on "\t" directly
# instead of loading each line as a single comma-delimited column and re-splitting
# it by hand. This also removes the trailing drop(), which named the transaction
# header (copy-paste leftover) and therefore never matched any column.
# No schema is inferred, so every column is still a string.
# NOTE: empty fields are now read as null rather than as empty strings.
users = (
    spark.read
    .option("header", "true")
    .option("sep", "\t")
    .csv("/content/drive/MyDrive/2022/project/data/source/users/2021-11-01/part-00000-90950e0e-e0ae-4b04-a8ba-e2e6d8420f9e-c000.csv")
    .select("userid", "birthdate", "profileLevel", "gender", "updatedTime")
    .cache()
)
users.show(5)

+------+----------+------------+------+--------------------+
|userid| birthdate|profileLevel|gender|         updatedTime|
+------+----------+------------+------+--------------------+
| 16739|2006-07-07|           3|     2|2021-11-01 23:33:...|
|  2212|1970-04-26|           3|     1|2021-11-01 13:01:...|
| 21246|1975-05-25|           3|     2|2021-11-01 05:21:...|
| 17283|1990-12-12|           1|     2|2021-11-01 01:02:...|
| 38267|1972-05-23|           1|     1|2021-11-01 14:42:...|
+------+----------+------------+------+--------------------+
only showing top 5 rows



# Update to latest data

### Latest users

In [None]:
# For each userid keep only the row(s) whose updatedTime equals that user's
# maximum updatedTime. `max` here is pyspark.sql.functions.max (star import),
# not the Python builtin. The helper column latest_update stays in the result.
winSpec_user = Window.partitionBy("userid")
latest_users = (
    users
    .withColumn("latest_update", max("updatedTime").over(winSpec_user))
    .filter(col("latest_update") == col("updatedTime"))
)
latest_users.show(5)
print(latest_users.count())

+------+----------+------------+------+--------------------+--------------------+
|userid| birthdate|profileLevel|gender|         updatedTime|       latest_update|
+------+----------+------------+------+--------------------+--------------------+
| 10010|1965-04-13|           2|     1|2021-11-01 19:23:...|2021-11-01 19:23:...|
| 10042|1967-06-13|           1|     2|2021-11-01 15:46:...|2021-11-01 15:46:...|
| 10070|1967-05-28|           3|     2|2021-11-01 21:26:...|2021-11-01 21:26:...|
| 10271|1975-08-31|           1|     2|2021-11-01 08:00:...|2021-11-01 08:00:...|
| 10286|1961-01-11|           2|     1|2021-11-01 20:50:...|2021-11-01 20:50:...|
+------+----------+------------+------+--------------------+--------------------+
only showing top 5 rows

424


### Earliest and latest transactions

In [None]:
# Per-user activity timestamps, attached to every transaction row.
#   latest_active / earliest_active           -> over all of the user's transactions
#   latest_transaction / earliest_transaction -> over the user's transactions with
#                                                transStatus == 1 only
#                                                (presumably "successful" - confirm)
# `max` / `min` / `when` are the pyspark.sql.functions versions (star import).
winSpec_trans = Window.partitionBy("userId")

# The status test goes INSIDE the aggregate. Previously it wrapped the windowed
# max/min, so status-1 rows merely repeated latest_active / earliest_active, and
# every other row received the literal string "null" (from .otherwise("null"))
# instead of a real NULL. when() without otherwise() yields NULL, which max/min
# ignore, so users with no status-1 transaction get a genuine NULL here.
status_one_time = when(col("transStatus") == 1, col("transactionTime"))

updated_transaction = (
    transaction
    .withColumn("latest_active", max("transactionTime").over(winSpec_trans))
    .withColumn("earliest_active", min("transactionTime").over(winSpec_trans))
    .withColumn("latest_transaction", max(status_one_time).over(winSpec_trans))
    .withColumn("earliest_transaction", min(status_one_time).over(winSpec_trans))
)
updated_transaction.show(5)
print(updated_transaction.count())

+--------------------+-----------+------+--------------------+-----+---------+-------+-----+--------------------+--------------------+--------------------+--------------------+
|             transId|transStatus|userId|     transactionTime|appId|transType| amount|pmcId|       latest_active|     earliest_active|  latest_transaction|earliest_transaction|
+--------------------+-----------+------+--------------------+-----+---------+-------+-----+--------------------+--------------------+--------------------+--------------------+
|20211101--7029178...|          1|   100|2021-11-01 09:25:...|    5|        5|9821462|    2|2021-11-01 09:25:...|2021-11-01 09:25:...|2021-11-01 09:25:...|2021-11-01 09:25:...|
|20211101-69441642...|         -1| 10002|2021-11-01 04:33:...|    1|        1|2919106|    2|2021-11-01 04:33:...|2021-11-01 04:33:...|                null|                null|
|20211101--6108721...|          0| 10011|2021-11-01 08:26:...|    4|        4|9270370|    3|2021-11-01 08:26:...|20

In [None]:
# Sanity check: list any transaction rows whose appId is NULL.
updated_transaction.where(col("appId").isNull()).show()

+-------+-----------+------+---------------+-----+---------+------+-----+-------------+---------------+------------------+--------------------+
|transId|transStatus|userId|transactionTime|appId|transType|amount|pmcId|latest_active|earliest_active|latest_transaction|earliest_transaction|
+-------+-----------+------+---------------+-----+---------+------+-----+-------------+---------------+------------------+--------------------+
+-------+-----------+------+---------------+-----+---------+------+-----+-------------+---------------+------------------+--------------------+



# Create tables

In [None]:
# Demographic table: latest user rows with the gender / profileLevel codes
# replaced by their names. Left outer joins keep users whose code has no
# entry in a lookup table; the raw code columns are dropped afterwards.
demographic = (
    latest_users
    .join(mapping_gender, "gender", "leftouter")
    .join(map_profile, "profileLevel", "leftouter")
    .drop("profileLevel", "gender")
)
demographic.show(5)
demographic.summary().show()

+------+----------+--------------------+--------------------+----------+--------------------+
|userid| birthdate|         updatedTime|       latest_update|genderName|    profileLevelName|
+------+----------+--------------------+--------------------+----------+--------------------+
| 10010|1965-04-13|2021-11-01 19:23:...|2021-11-01 19:23:...|      Male|   Have phone number|
| 10042|1967-06-13|2021-11-01 15:46:...|2021-11-01 15:46:...|    Female|Dont have phone n...|
| 10070|1967-05-28|2021-11-01 21:26:...|2021-11-01 21:26:...|    Female|                 KYC|
| 10271|1975-08-31|2021-11-01 08:00:...|2021-11-01 08:00:...|    Female|Dont have phone n...|
| 10286|1961-01-11|2021-11-01 20:50:...|2021-11-01 20:50:...|      Male|   Have phone number|
+------+----------+--------------------+--------------------+----------+--------------------+
only showing top 5 rows

+-------+------------------+----------+--------------------+--------------------+----------+--------------------+
|summary|      

In [None]:
# Distinct gender names present after the lookup join.
demographic.select("genderName").distinct().show()

+----------+
|genderName|
+----------+
|    Female|
|      Male|
+----------+



In [None]:
# Activity table: per-transaction user activity columns enriched with the app
# name and transaction-type name.
activity = updated_transaction.select(
    "userId",
    "appId",
    "transType",
    "pmcId",
    "latest_active",
    "earliest_active",
    "latest_transaction",
    "earliest_transaction",
)

# Left outer joins (as in the demographic table): the mapping tables are pure
# lookups, so they must not contribute rows of their own. The previous
# "fullouter" joins added a phantom row (all transaction columns NULL) for every
# app / transaction type that had no transaction, inflating the count below.
# Unmapped appId / transType values still come through with a NULL name.
# The lookup key columns are dropped by Column reference because they differ
# from the transaction columns only by letter case.
activity = (
    activity
    .join(mapping_appid, activity.appId == mapping_appid.appid, "leftouter")
    .join(map_transtype, activity.transType == map_transtype.transtype, "leftouter")
    .drop(mapping_appid.appid)
    .drop(map_transtype.transtype)
)

activity.show()
print(activity.count())

+------+-----+---------+-----+--------------------+--------------------+--------------------+--------------------+-------+-------------+
|userId|appId|transType|pmcId|       latest_active|     earliest_active|  latest_transaction|earliest_transaction|appname|transtypename|
+------+-----+---------+-----+--------------------+--------------------+--------------------+--------------------+-------+-------------+
| 10002|    1|        1|    2|2021-11-01 04:33:...|2021-11-01 04:33:...|                null|                null|   null|      Fund In|
| 10026|    1|        1|    4|2021-11-01 04:36:...|2021-11-01 04:36:...|                null|                null|   null|      Fund In|
| 10048|    1|        1|    3|2021-11-01 18:17:...|2021-11-01 18:17:...|                null|                null|   null|      Fund In|
| 10147|    1|        1|    1|2021-11-01 10:14:...|2021-11-01 10:14:...|                null|                null|   null|      Fund In|
| 10176|    1|        1|    2|2021-11-01 

In [None]:
# Distinct app names present in the activity table (NULL = no mapping matched).
activity.select("appname").distinct().show()

+-----------+
|    appname|
+-----------+
|      Water|
|     Lazada|
|    Dominos|
|Electricity|
|         TV|
|       null|
|   Internet|
|      123Go|
|    123Phim|
|     Shopee|
|       Game|
|      Telco|
|      Bitis|
|       Tiki|
+-----------+



In [None]:
# Promotions table: voucher records combined with their campaign attributes,
# matched on campaignID.
# NOTE(review): a full outer join also keeps campaigns that have no voucher rows
# (and vouchers whose campaign is missing) - confirm that this is intended.
promotions = promotion.join(campaign, on="campaignID", how="fullouter")
promotions.show(5)

+----------+------+-----------+------+--------------------+------------+-------------------+----------+
|campaignID|userid|voucherCode|status|                time|campaignType|         expireDate|expireTime|
+----------+------+-----------+------+--------------------+------------+-------------------+----------+
|      1000|  3070|     1000-5| GIVEN|2021-11-01 13:31:...|           1|2022-01-01 00:00:00|         0|
|      1000| 12531|   1000-103| GIVEN|2021-11-01 00:44:...|           1|2022-01-01 00:00:00|         0|
|      1000|  6860|   1000-146| GIVEN|2021-11-01 03:26:...|           1|2022-01-01 00:00:00|         0|
|      1000| 11744|   1000-375| GIVEN|2021-11-01 10:01:...|           1|2022-01-01 00:00:00|         0|
|      1000|  2580|   1000-670| GIVEN|2021-11-01 17:44:...|           1|2022-01-01 00:00:00|         0|
+----------+------+-----------+------+--------------------+------------+-------------------+----------+
only showing top 5 rows

