In [1]:
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
sc = SparkContext() # dòng này phải nằm trước dòng dưới
spark = SparkSession.builder.appName("Python Spark SQL basic example") \
    .config("spark.driver.memory", "10g") \
    .getOrCreate()

In [None]:
spark.stop()

# Test create fact table

In [3]:
# source table
df_dim_payment_transaction = spark.read.parquet("../temp/dim_payment_transaction/*.parquet")
    
df_dim_payment_type = spark.read.parquet("../temp/dim_payment_type.parquet")

df_dim_account = spark.read.parquet("../temp/dim_account.parquet")

df_dim_account_type = spark.read.parquet("../temp/dim_account_type.parquet")

In [5]:
from calendar import monthcalendar
from datetime import datetime, timedelta
from datetime import date
import holidays

def date_range(start_date, end_date):
    start_date = datetime.strptime(start_date, '%Y-%m-%d')
    end_date = datetime.strptime(end_date, '%Y-%m-%d')
    delta = timedelta(days=1)
    dates_list = []
    while start_date <= end_date:
        dates_list.append(start_date.strftime('%Y-%m-%d'))
        start_date += delta
    return dates_list

start_date = '2015-01-01'
end_date = '2024-12-31'

date_list = date_range(start_date, end_date)
vn_holidays = holidays.VN()  # this is a dict

def is_holiday(date):
    return date in vn_holidays

def get_holiday_name(date):
    return vn_holidays.get(date)

def get_week_of_month(year, month, day):
    return next(
        (
            week_number
            for week_number, days_of_week in enumerate(monthcalendar(year, month), start=1)
            if day in days_of_week
        ),
        None,
    )
    
udf_is_holiday = udf(is_holiday, BooleanType())
udf_get_week_of_month = udf(get_week_of_month)
udf_get_holiday_name = udf(get_holiday_name, StringType())

df_dim_date = spark.createDataFrame([(date,) for date in date_list], ['date']) \
    .withColumn('date', to_date(col('date'), 'yyyy-MM-dd')) \
    .withColumn("is_holiday", udf_is_holiday("date")) \
    .withColumn("quarter", quarter("date")) \
    .withColumn("year", date_format("date", "yyyy")) \
    .withColumn("date_key", date_format("date", "yyyyMMdd")) \
    .withColumn("week_of_month", udf_get_week_of_month(year(col('date')), month(col('date')), dayofmonth(col('date')))) \
    .withColumn('holiday_name', when(col('is_holiday') == True, udf_get_holiday_name(col('date'))).otherwise(lit('work day')))
    

df_dim_date.show(truncate=False)

+----------+----------+-------+----+--------+-------------+----------------------------+
|date      |is_holiday|quarter|year|date_key|week_of_month|holiday_name                |
+----------+----------+-------+----+--------+-------------+----------------------------+
|2015-01-01|true      |1      |2015|20150101|1            |International New Year's Day|
|2015-01-02|false     |1      |2015|20150102|1            |work day                    |
|2015-01-03|false     |1      |2015|20150103|1            |work day                    |
|2015-01-04|false     |1      |2015|20150104|1            |work day                    |
|2015-01-05|false     |1      |2015|20150105|2            |work day                    |
|2015-01-06|false     |1      |2015|20150106|2            |work day                    |
|2015-01-07|false     |1      |2015|20150107|2            |work day                    |
|2015-01-08|false     |1      |2015|20150108|2            |work day                    |
|2015-01-09|false    

In [6]:
df_payment_transaction_full = df_dim_payment_transaction.join(df_dim_payment_type, df_dim_payment_transaction['payment_code'] == df_dim_payment_type['type_code']) \
    .withColumn('transaction_date', to_date('transaction_time'))

df_payment_transaction_full.createOrReplaceTempView('dim_payment_transaction')
df_dim_date.createOrReplaceTempView('dim_date')
df_dim_account.createOrReplaceTempView('dim_account')
df_dim_account_type.createOrReplaceTempView('dim_account_type')

In [7]:
df_fact = spark.sql("""
    with cte_transaction_revenue as (
        select transaction_date, a.cust_id,
            count(trans_id) as cust_no_transaction_daily,
            sum(amount) as cust_daily_spending,
            collect_list(distinct type_nm) as cust_daily_payment_type
        from dim_payment_transaction pm join dim_account a on pm.acc_id = a.acc_id
        group by transaction_date, a.cust_id
    ),
    cte_cust_accum_revenue as (
        select cust_id,
            sum(cust_daily_spending) over (partition by cust_id order by transaction_date) as cust_accum_spending
        from cte_transaction_revenue
    ),
    cte_account_payment_summary as (
        select pm.transaction_date, cust_id, a.acc_id, at.type_nm as account_type_name,
            count(distinct pm.trans_id) as account_no_transactions_daily,
            sum(pm.amount) as account_daily_spending
        from dim_payment_transaction pm
            join dim_account a on pm.acc_id = a.acc_id
            join dim_account_type at on a.acc_type = at.type_id
        group by pm.transaction_date, a.cust_id, a.acc_id, at.type_nm
    ),
    cte_account_accum_revenue as (
        select transaction_date, cust_id, acc_id, account_daily_spending,
            sum(account_daily_spending) over (partition by cust_id, acc_id order by transaction_date) as account_accum_spending
        from cte_account_payment_summary
    ),
    cte_customer_avgerage_daily_spending as (
        select month(transaction_date) as month, a.cust_id, 
            avg(amount) as cust_avgerage_daily_spending
        from dim_payment_transaction pm join dim_account a on pm.acc_id = a.acc_id
        group by month(transaction_date), a.cust_id
    )
    
    select date_format(a1.transaction_date, 'yyyyMMdd') as date_key, a1.cust_id, a1.acc_id, a1.account_type_name,
            a1.account_no_transactions_daily,
            a1.account_daily_spending,
        a2.account_accum_spending,
        c1.cust_no_transaction_daily, c1.cust_daily_spending, c1.cust_daily_payment_type,
        c2.cust_accum_spending,
        c3.cust_avgerage_daily_spending
    from cte_account_payment_summary a1 
        join cte_account_accum_revenue a2 on a1.transaction_date = a2.transaction_date and a1.cust_id = a2.cust_id and a1.acc_id = a2.acc_id
        join cte_transaction_revenue c1 on a1.transaction_date = c1.transaction_date and a1.cust_id = c1.cust_id 
        join cte_cust_accum_revenue c2 on c1.cust_id = c2.cust_id 
        join cte_customer_avgerage_daily_spending c3 on c3.month = month(a1.transaction_date) and c3.cust_id = a1.cust_id
    order by a1.cust_id, a1.transaction_date
""")

df_fact.show(truncate=False)



+--------+---------------------------------------------+---------------------------------------------+-----------------+-----------------------------+----------------------+----------------------+-------------------------+-------------------+------------------------------+-------------------+----------------------------+
|date_key|cust_id                                      |acc_id                                       |account_type_name|account_no_transactions_daily|account_daily_spending|account_accum_spending|cust_no_transaction_daily|cust_daily_spending|cust_daily_payment_type       |cust_accum_spending|cust_avgerage_daily_spending|
+--------+---------------------------------------------+---------------------------------------------+-----------------+-----------------------------+----------------------+----------------------+-------------------------+-------------------+------------------------------+-------------------+----------------------------+
|20230801|011a433974c5e6461a5b3

In [98]:
df_payment_transaction_full.count()

100387

In [100]:
df_fact.count()

134629

In [None]:
df_fact.printSchema()

Trong 1 ngày, có acc_id nào thực hiện 2 transaction ko

In [None]:
spark.sql("""
    select pm.transaction_date, cust_id, a.acc_id, at.type_nm as account_type_name,
        count(distinct pm.trans_id) as account_no_transactions_daily,
        sum(pm.amount) as account_daily_spending
    from dim_payment_transaction pm
        join dim_account a on pm.acc_id = a.acc_id
        join dim_account_type at on a.acc_type = at.type_id
    group by pm.transaction_date, a.cust_id, a.acc_id, at.type_nm
""").show()

In [None]:
spark.catalog.dropTempView("dim_payment_transaction")
spark.catalog.dropTempView("dim_date")
spark.catalog.dropTempView("dim_account")
spark.catalog.dropTempView("dim_account_type")
spark.catalog.dropTempView("dim_payment_type")


# Analyze most search 2 months

In [3]:
df = spark.read.parquet('../temp/most_search_t67.parquet')
df.show()

+-------+--------------------+--------------------+
|user_id|      most_search_t6|      most_search_t7|
+-------+--------------------+--------------------+
|0003361|         tây hành kỷ|         lộc đỉnh ký|
|0005748|   lời nguyền ma lai|chuyen sinh thanh...|
|0008207|               CONAN|     TIENG ANH LOP 5|
|0017684|sứ mệnh cuói cung...|       bác sĩ yo han|
|0019650|                hori| classroomoftheelite|
|0027835|lời nói dối của h...|     minh lan truyện|
|0041173|detective k: secr...|                anna|
|0060714|   học viện ma vương|học viện anh hùng...|
|0064645|  unforgettable love|      cá mực hầm mật|
|0101498|    công chúa aurora|   nam cung phu nhân|
|0103456|phim thương ngày ...|   nữ luật sư kỳ lạ |
|0115100|  cặp đôi trái ngược|            the 1000|
|0115494|          nhiếp viễn|       đêm giao thừa|
|0124079|               JoJo |             phim lẻ|
|0139158|           lật ngược|   ngôi sao hàng đầu|
|0150958|  vương giả thiên hạ|           one piece|
|0151915|   

In [None]:
df_search_category = spark.read.csv('../temp/map_search_category.csv', header=True)
df_search_category.show(5)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, Tokenizer, NGram, HashingTF, MinHashLSH, RegexTokenizer, SQLTransformer
model = Pipeline(stages=[
    SQLTransformer(statement="SELECT *, lower(Title) lower FROM __THIS__"),
    Tokenizer(inputCol="lower", outputCol="token"),
    StopWordsRemover(inputCol="token", outputCol="stop"),
    SQLTransformer(statement="SELECT *, concat_ws(' ', stop) concat FROM __THIS__"),
    RegexTokenizer(pattern="", inputCol="concat", outputCol="char", minTokenLength=1),
    NGram(n=2, inputCol="char", outputCol="ngram"),
    HashingTF(inputCol="ngram", outputCol="vector"),
    MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3)
]).fit(lens_ddf)
result_lens = model.transform(lens_ddf)
result_lens = result_lens.filter(F.size(F.col("ngram")) > 0)

In [None]:
df_search_category = spark.read.csv('../temp/map_search_category.csv', header=True) \
        .withColumnRenamed('Column1', 'category')

df_search_category.createOrReplaceTempView('cte_category')

df_tmp = spark.sql("""
    with cte_mismatch_category as (
        SELECT t1.text as text1, t2.text as text2, 
            t1.category as cat1, t2.category as cat2, 
            levenshtein(t1.text, t2.text) as similarity
        FROM cte_category t1 JOIN cte_category t2 on t1.text != t2.text
        where levenshtein(t1.text, t2.text) < 3
            and t1.category != t2.category
        --order by levenshtein(t1.text, t2.text) desc
    )
    
    select *
    from cte_mismatch_category
""")

print(df_tmp.count())
df_tmp.show(truncate=False)

spark.catalog.dropTempView("cte_category")

In [None]:
df_search_category = spark.read.csv('../temp/map_search_category.csv', header=True) \
        .withColumnRenamed('Column1', 'category')

df_search_category.createOrReplaceTempView('cte_category')

df_tmp = spark.sql("""
    with cte_mismatch_category as (
        SELECT t1.text as text1, t2.text as text2, 
            t1.category as cat1, t2.category as cat2, 
            levenshtein(t1.text, t2.text) as similarity,
            sort_array(array(t1.text, t2.text)) AS sorted_array
        FROM cte_category t1 JOIN cte_category t2 on t1.text != t2.text
        where levenshtein(t1.text, t2.text) < 3
            and t1.category != t2.category
        order by levenshtein(t1.text, t2.text) desc
    ),
    cte_replace_mismatched_category as (
        select text1, text2, cat1, cat1 as cat2 
        from (
            select row_number() over (partition by sorted_array order by text1) as rn,
                text1, text2, cat1, cat2
            from cte_mismatch_category 
        ) 
        where rn = 1
    ),
    cte_reduced_mismatch_category as (
        select text, category
        from (
            select text, category,
                row_number() over (partition by text order by category) as rn --need distinct text
            from (
                (select text1 as text, cat1 as category from cte_replace_mismatched_category)
                union all 
                (select text2 as text, cat2 as category from cte_replace_mismatched_category)
            )
        ) t
        where rn = 1
    )
    
    select text, label, category as Column1 from (
        select *
        from cte_category
        where text not in (select text from cte_reduced_mismatch_category)
    ) t union all (
        select text, -1 as label, category 
        from cte_reduced_mismatch_category
    )

""")

# select * from cte_mismatch_category t1
# where t1.text1_text2 not in (
#     select t2.text1_text2
#     from cte_mismatch_category t2 
#     where t2.text2_text1 = t1.text1_text2
# )

print(df_tmp.count())
df_tmp.show(truncate=False)

spark.catalog.dropTempView("cte_category")

In [None]:
df_tmp.coalesce(1).write.mode('overwrite').csv('../temp/map_search_category2.csv', header=True)

In [None]:
df_tmp.where("text2 = 'mộng hoa lục'").show()

In [None]:
all_search_text = df.select('most_search_t6').union(df.select('most_search_t7')) \
    .distinct() \
    .collect() 
    
all_search_text = [row[0] for row in all_search_text]

In [None]:
all_search_text[:5]

In [None]:
len(all_search_text)

# Đọc log search

- Dữ liệu log_search có data 2 tháng, tháng 6 & 7 (14 ngày đầu tháng 6 & 14 ngày đầu tháng 7)

In [None]:
# import os

# parent_folder = 'E:/Dataset/log_search'
# os.listdir(parent_folder)[:5]

In [4]:
parent_folder = 'E:/Dataset/log_search'
df = spark.read.parquet(f"{parent_folder}/*/*.parquet") \
    .withColumn('date', to_date(col('datetime')))

df.show()

+--------------------+--------------------+--------+--------------------+--------+---------+-------------------+-----------+------+--------------------+----------+
|             eventID|            datetime| user_id|             keyword|category|proxy_isp|           platform|networkType|action|        userPlansMap|      date|
+--------------------+--------------------+--------+--------------------+--------+---------+-------------------+-----------+------+--------------------+----------+
|750fef93-60c8-402...|2022-07-13 20:00:...|06229349|ban ket 2 u19 tha...|   enter|     vnpt|            android|       wifi|search|                  []|2022-07-13|
|523ff747-8f6b-426...|2022-07-13 20:00:...|    NULL|phim khuc vu thie...|   enter|      fpt|            android|       wifi|search|                NULL|2022-07-13|
|f4fe6d50-4153-4ae...|2022-07-13 20:00:...|06189742|https://xembd.org...|   enter|      fpt|            android|       wifi|search|                  []|2022-07-13|
|d39414ca-f816-4

In [None]:
df.select('action').distinct().show()

In [None]:
parent_folder = 'E:/Dataset/log_search'
file_name = f"{parent_folder}/20220601/*.parquet"
df = spark.read.parquet(file_name)

df.show(5, truncate=False)

In [None]:
df.printSchema()

In [None]:
df.where('datetime is null').count()

In [None]:
df_pre = df.withColumn('datetime_new', to_timestamp(col('datetime')))
df_pre.where('datetime_new is null').select('datetime', 'datetime_new').show(truncate=False)

In [None]:
df_pre.select('datetime', 'datetime_new').show(5,truncate=False)

In [None]:
df.select('platform').distinct().show()

- Field giá trị là keyword và datetime, có thể là cả platform, đại diện cho "user search keyword gì vào thời gian nào trên platform nào"

In [None]:
# find most search trong ngày
df.count()

In [None]:
df_count_keyword = df.where("user_id is not null AND keyword is not null") \
    .groupBy('user_id', 'keyword') \
    .count() \
    .orderBy(col('count').desc()) 
    
df_count_keyword.show()

In [None]:
df_count_keyword.createOrReplaceTempView('keyword_count')

df_most_search = spark.sql(f"""
    select user_id, keyword as most_search, frequency
    from (
        select user_id, keyword, count as frequency,
            row_number() over (partition by user_id order by count desc) as rnk
        from keyword_count
    ) t
    where rnk = 1
    order by frequency desc
""")

df_most_search.show()

spark.catalog.dropTempView("keyword_count")

In [None]:
df_most_search.createOrReplaceTempView('most_search')

df_top_users = spark.sql(f"""
    select distinct(user_id), frequency
    from most_search
    order by frequency desc
    limit 1000
""")

df_top_users.show()

spark.catalog.dropTempView("most_search")

In [None]:
df_top_most_search = df_most_search.join(df_top_users, ['user_id'])
df_top_most_search.count()

- Classify search text into categories

In [None]:
from sentence_transformers import SentenceTransformer
from pyvi.ViTokenizer import tokenize
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import numpy as np
import pandas as pd
import plotly.express as px

In [None]:
model = SentenceTransformer('VoVanPhuc/sup-SimCSE-VietNamese-phobert-base')

In [None]:
list_keywords = [row[0] for row in df_count_keyword.select('keyword').collect()]
list_keywords

In [None]:
categories = {
    'Animation': ['DOREMON', 'MINECRAFT', 'BOBOIBOY', 'fairy tail', 'BORUTO', 'NARUTO', 'INUYASA', 'TSUBASA', 'FAIRY', 'LBX'],
    'Music': ['ZUMBA', 'HOOWOO', 'KADAOKE', 'EDM', 'KARAOKE'],
    'Entertainment': ['JOJO', 'mr. queen', 'GONJIAM', 'cuộc chiến thượng lưu', 'SIXT', 'CID', 'SVSV388', 'DAD NEEG'],
    'TV Shows/Movies': ['paw', 'MAIKA', 'ANNE', 'bác sĩ ma', 'BAKI', 'boku no hero academia (season 2)', 'tìm mẹ'],
    'Sport': ['COIDABANHTRUCTIEP', 'TRUCTIEPBONGDAHOMNAY'],
    'Others': ['FPT', 'LUFF', 'DMSS', 'HOPE', 'hậu duệ mặt trời', 'HAOLAM', 'SCTV9', 'kênh vtv6', 'YUON', 'KPM']
}

# Classify similar search text into the same category

-> Impossible, take too long to run (1hr for a day of search)

- Ref: https://huggingface.co/VoVanPhuc/sup-SimCSE-VietNamese-phobert-base
- Example notebook: https://colab.research.google.com/drive/12__EXJoQYHe9nhi4aXLTf9idtXT8yr7H?usp=sharing#scrollTo=5Dv9v66PwTLR

- User có thể search khác keyword nhưng chúng có thể đồng nghĩa nhau. VD: 'trữ tình' & 'tru tinh', 'tình yêu' & 'yêu đương'. Để giải quyết, 

In [None]:
from sentence_transformers import SentenceTransformer
from pyvi.ViTokenizer import tokenize
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import numpy as np
import pandas as pd
import plotly.express as px

def clustering(embeddings, num_clusters):
    model = KMeans(n_clusters=num_clusters)
    model.fit(embeddings)
    return model.labels_

model = SentenceTransformer('VoVanPhuc/sup-SimCSE-VietNamese-phobert-base')
sentences = all_search_text
n_categories = 10

# sentences = ['tây hành kỷ',
#             'công chúa aurora',
#           'nhiếp viễn',
#           'taxi, em tên gì?',
#           'nam cung phu nhân',
#           'unforgettable love',
#           ]

sentences_tokenizer = [tokenize(sentence) for sentence in sentences]

embeddings = model.encode(sentences_tokenizer)

cluster_labels = clustering(embeddings, n_categories).astype(str)

print(f"cluster_labels: {cluster_labels}")

In [None]:
# sentences = sentences[:6]
# cluster_labels = list(cluster_labels)

In [None]:
import csv

result_dict = {key: value for key, value in zip(sentences, cluster_labels)}

csv_file_path = "../temp/map_search_category.csv"

# Open the CSV file in write mode
with open(csv_file_path, "w+", newline="") as csv_file:
    # Create a CSV writer object
    csv_writer = csv.writer(csv_file)
    
    # Write the header row
    csv_writer.writerow(["text", "label"])
    
    # Write the key-value pairs from the dictionary
    for key, value in result_dict.items():
        csv_writer.writerow([key, value])

print(f"Result saved to {csv_file_path}")

In [None]:
from collections import Counter

Counter(cluster_labels)

# Create date dimension

In [None]:
from calendar import monthcalendar
from datetime import datetime, timedelta
from datetime import date
import holidays

def date_range(start_date, end_date):
    start_date = datetime.strptime(start_date, '%Y-%m-%d')
    end_date = datetime.strptime(end_date, '%Y-%m-%d')
    delta = timedelta(days=1)
    dates_list = []
    while start_date <= end_date:
        dates_list.append(start_date.strftime('%Y-%m-%d'))
        start_date += delta
    return dates_list

start_date = '2015-01-01'
end_date = '2024-12-31'

date_list = date_range(start_date, end_date)
vn_holidays = holidays.VN()  # this is a dict

def is_holiday(date):
    return date in vn_holidays

def get_holiday_name(date):
    return vn_holidays.get(date)

def get_week_of_month(year, month, day):
    return next(
        (
            week_number
            for week_number, days_of_week in enumerate(monthcalendar(year, month), start=1)
            if day in days_of_week
        ),
        None,
    )
    
udf_is_holiday = udf(is_holiday, BooleanType())
udf_get_week_of_month = udf(get_week_of_month)
udf_get_holiday_name = udf(get_holiday_name, StringType())

df = spark.createDataFrame([(date,) for date in date_list], ['date']) \
    .withColumn('date', to_date(col('date'), 'yyyy-MM-dd')) \
    .withColumn("is_holiday", udf_is_holiday("date")) \
    .withColumn("quarter", quarter("date")) \
    .withColumn("year", date_format("date", "yyyy")) \
    .withColumn("date_key", date_format("date", "yyyyMMdd")) \
    .withColumn("week_of_month", udf_get_week_of_month(year(col('date')), month(col('date')), dayofmonth(col('date')))) \
    .withColumn('holiday_name', when(col('is_holiday') == True, udf_get_holiday_name(col('date'))).otherwise(lit('work day')))
    

df.show()