In [None]:
%%spark
from ua_parser import user_agent_parser
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import expr, pandas_udf
from pyspark.sql.types import StringType
import pandas as pd
import time

## *Tools

In [None]:
def Estimate_df_size(df):
    """Print the size, in GB, that Spark's optimizer statistics report for ``df``.

    The DataFrame is cached and scanned once with a no-op ``foreach``; afterwards
    ``sizeInBytes`` is read from the statistics of the optimized plan. ``df`` is
    left cached when the function returns (nothing here unpersists it).

    Args:
        df: Spark DataFrame to measure.
    """
    # Mark df as cached and run a pass over every row.
    cached_df = df.cache()
    cached_df.foreach(lambda row: row)

    # Run df's logical plan through the session's planner and read the size statistic.
    logical_plan = df._jdf.queryExecution().logical()
    session_state = spark._jsparkSession.sessionState()
    plan_stats = session_state.executePlan(logical_plan).optimizedPlan().stats()
    n_bytes = plan_stats.sizeInBytes()

    print(n_bytes / 1024 ** 3,"GB")

# 1. Organized TSLOG from PV log

In [None]:
## URL lists used to classify page / referrer URLs.
## find_list, item_list, mail_list and social_list are joined with "|" and applied as
## regex patterns (rlike) by classify_url_type; home_list is matched exactly (isin).
find_list=["https://www.example.com/find/", "https?://find.example.com/"]
item_list=["https://www.example.com/item/show"]
home_list=["https://www.example.com/", "http://www.example.com/"]
mail_list=["https://deref-mail.com", "https://mail","http://mail", "http://webmail"]
social_list=["facebook.com", "youtube.com", "instagram.com"]

## Setting
# Regex matching the start of any http(s) URL, with an optional "www." prefix.
# Written as a raw string: the backslashes are meant for the regex engine, and in a
# normal string literal "\/" and "\." are invalid Python escape sequences.
url_regex=r'https?:\/\/(?:www\.)?'

In [None]:
# Helpers (URL-type classifier, user-agent parsing UDFs, window specs) and the daily processing loop
def classify_url_type(col_name, include_extended=False):
    """
    Build a Column expression labelling each URL in ``col_name`` with a type.

    Rules are checked in order and the first match wins:
      1. 'find'  - matches any pattern in find_list (regex)
      2. 'item'  - matches any pattern in item_list (regex)
      3. 'home'  - equals one of the URLs in home_list
    With include_extended=True these rules follow:
      4. 'social_paid' - matches social_list and column 'source' contains 'utm_source'
      5. 'social'      - matches social_list and column 'source' does not contain 'utm_source'
      6. 'mail'        - matches mail_list, or column 'source' contains 'utm_medium=email'
      7. 'referral'    - matches url_regex
    Anything left over is labelled 'unknown'.

    Note that the extended rules always read the column literally named 'source',
    whatever ``col_name`` is.

    Args:
        col_name: Name of column to classify
        include_extended: If True, include social/mail/referral classifications

    Returns:
        The chained ``F.when(...)`` Column expression ending in ``otherwise('unknown')``.
    """
    url = F.col(col_name)

    def matches_any(patterns):
        # True where the URL matches at least one of the regex patterns.
        return url.rlike("|".join(patterns))

    labelled = (
        F.when(matches_any(find_list), 'find')
        .when(matches_any(item_list), 'item')
        .when(url.isin(home_list), 'home')
    )

    if not include_extended:
        return labelled.otherwise('unknown')

    is_social = matches_any(social_list)
    has_utm_source = F.col('source').rlike('utm_source')
    is_email_campaign = F.col('source').rlike('utm_medium=email')

    labelled = (
        labelled
        .when(is_social & has_utm_source, 'social_paid')
        .when(is_social & ~has_utm_source, 'social')
        .when(matches_any(mail_list) | is_email_campaign, 'mail')
        .when(url.rlike(url_regex), 'referral')
    )
    return labelled.otherwise('unknown')

@pandas_udf(StringType())
def parse_os(ua: pd.Series) -> pd.Series:
    """Pandas UDF: map each user-agent string to its OS family.

    Calls ua_parser's ``ParseOS`` per value and keeps only the 'family' entry.
    Values that are not strings (e.g. null user agents) yield None rather than
    being handed to the parser, which expects a string.

    Args:
        ua: Series of raw user-agent strings.

    Returns:
        Series of OS family names (None where the input was not a string).
    """
    return ua.apply(
        lambda x: user_agent_parser.ParseOS(x).get('family') if isinstance(x, str) else None
    )
@pandas_udf(StringType())
def parse_browser(ua: pd.Series) -> pd.Series:
    """Pandas UDF: map each user-agent string to its browser family.

    Calls ua_parser's ``ParseUserAgent`` per value and keeps only the 'family' entry.
    Values that are not strings (e.g. null user agents) yield None rather than
    being handed to the parser, which expects a string.

    Args:
        ua: Series of raw user-agent strings.

    Returns:
        Series of browser family names (None where the input was not a string).
    """
    return ua.apply(
        lambda x: user_agent_parser.ParseUserAgent(x).get('family') if isinstance(x, str) else None
    )

# Window over rows sharing a TSID, ordered by the 'time' column.
window = Window.partitionBy('TSID').orderBy('time')
# Same window with a range frame from the start of the partition up to the current
# row's 'time' value; used for running totals.
window_cumulative = window.rangeBetween(Window.unboundedPreceding, Window.currentRow)

# Dates to process as YYYYMMDD strings (start and end are both included).
date_list = pd.date_range(start='2023-08-07', end='2023-08-07', freq='D').strftime('%Y%m%d').tolist()


# Start Loop
# For every date: read the raw log, extract request parameters, keep page views,
# derive device / URL-type / session columns and write one parquet dataset per date.
s0 = time.time()
for date_str in date_list:

    # (1) Read Data
    # Each log line is split on the '<t@s>' delimiter into eight fields.
    try:
        ts_log = spark.read.text(f'/path/to/log/{date_str}/*.log.gz')
        df_ts = ts_log.rdd.map(lambda line:line.value.split('<t@s>')).toDF(["time","IP","request","status","user_agent","source","RC","CID"])
        print(f'Read Date {date_str} Success')
    except Exception as e:
        # Best effort: report the failure and move on to the next date.
        print(f'Read Date {date_str} Fail: {e}')
        continue

    # (2) Process
    # Extract query-string parameters from the request. The patterns are raw strings
    # so that regex escapes such as \w are not treated as Python string escapes.
    df_ts = df_ts.select(
        '*',
        F.regexp_extract("request", r'(&type=)(\w+)', 2).alias("type"),
        F.regexp_extract("request", r'(&_ts_id=)([0-9A-F.]+)', 2).alias("TSID"),
        F.regexp_extract("request", r'(&ref=)([^&]+)', 2).alias("referrer"),
        F.regexp_extract("request", r'(&ts_set=)(\w+)', 2).alias("ts_set"),
    )

    # (3) Clean - keep page views only. Filtering right after 'type' is extracted
    # means the URL decoding and the Python user-agent UDFs below are applied only
    # to rows that are kept.
    df_ts = df_ts.filter('type="pv"')

    # URL-decode the referrer through java.net.URLDecoder.
    df_ts = df_ts.withColumn('referrer', expr("java_method('java.net.URLDecoder', 'decode', referrer, 'UTF-8')"))

    # device - OS, device
    # 'app_ios' / 'app_android' when the raw user agent contains 'IOS' / 'Android';
    # otherwise 'mobile' when the parsed OS is iOS or Android; otherwise 'pc'.
    # NOTE(review): the '%Android%' check runs before the OS check, so any user agent
    # containing 'Android' is labelled 'app_android' - confirm this is intended.
    df_ts = (
        df_ts
        .withColumn('OS', parse_os(F.col('user_agent')))
        .withColumn('ua_browser', parse_browser(F.col('user_agent')))
        .withColumn('device', F.when(F.col('user_agent').like('%IOS%'), 'app_ios')
                               .when(F.col('user_agent').like('%Android%'), 'app_android')
                               .when(F.col('OS').isin(['iOS', 'Android']), 'mobile')
                               .otherwise('pc'))
    )

    # URL types of the page itself (basic rules) and of the referrer (extended rules).
    df_ts = (
        df_ts
        .withColumn('s_type', classify_url_type('source', include_extended=False))
        .withColumn('r_type', classify_url_type('referrer', include_extended=True))
    )

    use_col = ['TSID', 'CID', 'time', 'IP', 'OS', 'ua_browser', 'device',
               'status', 'source', 's_type', 'ts_set', 'referrer', 'r_type']
    df_ts = df_ts.select(use_col)

    # (4) session variable
    #   time_spent : seconds until the next row of the same TSID
    #   exit_page  : 1 when that gap is >= 1800 s or there is no next row
    #   session_id : 1 + number of exit pages before this row within the TSID
    # time_spent is set to null on exit pages once session_id has been derived.
    df_ts = (
        df_ts
        .withColumn('time', F.to_timestamp('time'))
        .withColumn('lead_time', F.lead('time').over(window))
        .withColumn('time_spent', F.col('lead_time').cast('long') - F.col('time').cast('long'))
        .withColumn('exit_page', F.when((F.col('time_spent') >= 1800) | (F.col('time_spent').isNull()), F.lit(1)).otherwise(0))
        .withColumn('lag_exit_page', F.coalesce(F.lag('exit_page').over(window), F.lit(0)))
        .withColumn('session_id', F.sum('lag_exit_page').over(window_cumulative) + 1)
        .withColumn('time_spent', F.when(F.col('exit_page') == 1, None).otherwise(F.col('time_spent')))
        .drop('lead_time', 'lag_exit_page')
    )

    # (5) Save
    df_ts.repartition(50).write.parquet(f'/path/to/output/pv_{date_str}',mode='overwrite')
    # Elapsed time is measured from the start of the whole loop, not of this date.
    print(f'Done: {time.time()-s0:.2f}s')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Read Date 20230807 Success
Done 604.8529586791992

In [None]:
# NOTE(review): `df_final` is not defined in any cell visible above - presumably it came
# from a removed or out-of-order cell. Confirm which DataFrame is meant, otherwise this
# cell fails on Restart & Run All.
# In a standard IPython cell only the last expression is echoed, so the `.columns` line
# produces no visible output here.
df_final.columns
Estimate_df_size(df_final)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2.5624896558001637 GB

## Try 1 day

In [None]:
# Read the parquet data matching the pv_202304* glob.
pv_example = spark.read.parquet("/path/to/log/pv_202304*")

# Which r_type values occur, and how many rows fall under each.
pv_example.select("r_type").distinct().show()
pv_example.groupBy("r_type").count().show()

# Keep only rows that carry a non-empty referrer.
has_referrer = F.col("referrer").isNotNull() & (F.col("referrer") != "")
pv_example = pv_example.filter(has_referrer)
row_count = pv_example.count()

# Per referrer: row count, one r_type value seen for it, and its share of all
# remaining rows in percent; most frequent referrers first.
referrer_counts = pv_example.groupBy("referrer").agg(
    F.count("referrer").alias("count"),
    F.first("r_type").alias("type"),
)
share_pct = F.round(F.col("count")/row_count * 100, 2)
df = referrer_counts.withColumn("percent", share_pct).orderBy(F.col("count").desc())

# Enable eager evaluation for DataFrame display and allow up to 1000 rows
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 1000)

#df.show(30, truncate=False)

In [None]:
# SYNTAX: F.regexp_extract(column, pattern, group_index)
# - pattern: regex with capture groups ()
# - group_index: 0 = entire match, 1 = first (), 2 = second (), etc.

# EXAMPLE 1: Extract value after parameter name
# r"_content=([^&]*)", 1
# _content=     → match literal text
# ([^&]*)       → group 1: capture everything until "&" (zero or more chars)
# index 1       → return group 1
# "page?_content=electronics&other" → "electronics"

# EXAMPLE 2: Extract second capture group
# r'(q=)([^&]+)', 2
# (q=)          → group 1: match "q="
# ([^&]+)       → group 2: capture everything until "&" (one or more chars)
# index 2       → return group 2
# "search?q=laptop&sort=price" → "laptop"

# EXAMPLE 3: Extract entire pattern match
# r'(\d+)_(\d+)', 0
# (\d+)         → group 1: one or more digits
# _             → literal underscore
# (\d+)         → group 2: one or more digits
# index 0       → return entire match (not individual groups)
# "item_123_456_end" → "123_456"

# EXAMPLE 4: Extract digits after literal "?"
# r'show\?(\d+)', 1
# show          → match literal "show"
# \?            → match literal "?" (escaped)
# (\d+)         → group 1: one or more digits
# index 1       → return group 1
# "show?12345&other" → "12345"

# COMMON PATTERNS:
# \d     → digit          [^&]   → not "&"
# +      → one or more    *      → zero or more
# \?     → literal "?"    ()     → capture group


# Extract & decode query: "http://site.com?q=hello%20world" → "q=hello world"
# urlparse_udf = udf(lambda x: parse.unquote(parse.urlparse(x).query), StringType())

# Decode URL encoding: "hello%20world" → "hello world"
# unquote_udf = udf(lambda x: parse.unquote(x), StringType())

# Create empty JSON object: {}
# .withColumn('COL', F.to_json(F.struct()))

# Merge columns to JSON: cols(a="123", b=456) → {"a":"123","b":456}
# .withColumn('COL', F.to_json(F.struct("a", "b")))

# Broadcast join small table to avoid shuffle (use when right table <10MB)
# Data = df.join(F.broadcast(Data), on=['COL'], how='inner')