In [1]:
import argparse
import os
from datetime import datetime, timedelta
from pathlib import Path

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession, Window

In [2]:
import pandas as pd
from IPython.display import display

def show_pd(df, limit=10, truncate=False):
    """Render the first ``limit`` rows of a Spark DataFrame as a pandas table.

    Args:
        df: Spark DataFrame to preview (pulled to the driver via ``toPandas``).
        limit: maximum number of rows fetched.
        truncate: when False (the default), pandas' display limits are lifted for
            this render only, so all rows, all columns, full cell text and every item
            of list/array cells are shown. When True, the current pandas display
            options apply.
    """
    preview = df.limit(limit).toPandas()

    if truncate:
        display(preview)
        return

    # Temporarily remove every pandas display cap; option_context restores them on exit.
    unlimited = {
        'display.max_rows': None,
        'display.max_columns': None,
        'display.max_colwidth': None,
        'display.max_seq_items': None,
    }
    flat_options = [token for pair in unlimited.items() for token in pair]
    with pd.option_context(*flat_options):
        display(preview)

In [3]:
def make_spark(
        memory_storage_fraction: float = 0.2,
        driver_memory: str = "112g",
        broadcast_join_threshold: str = "512m",
        app_name: str = "spark",
) -> SparkSession:
    """Build (or reuse, via ``getOrCreate``) the local SparkSession used by the notebook.

    All defaults reproduce the original hard-coded configuration.

    Args:
        memory_storage_fraction: value for ``spark.memory.storageFraction``.
        driver_memory: value for ``spark.driver.memory``. It only has an effect when this
            call is the one that starts the Spark JVM; an already-running session is
            returned as-is by ``getOrCreate``.
        broadcast_join_threshold: value for ``spark.sql.autoBroadcastJoinThreshold``.
            The original author noted this should probably be much lower (around 10m).
        app_name: Spark application name.

    Returns:
        The active SparkSession.
    """
    return (
        SparkSession.builder
        .appName(app_name)
        .master("local[*]")
        .config("spark.driver.memory", driver_memory)
        .config("spark.sql.shuffle.partitions", 200)
        .config("spark.default.parallelism", 200)
        .config("spark.memory.storageFraction", memory_storage_fraction)
        .config("spark.sql.ansi.enabled", "false")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.adaptive.skewJoin.enabled", "true")
        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "256m")
        .config("spark.sql.files.maxPartitionBytes", "256m")
        .config("spark.sql.files.openCostInBytes", "8m")
        .config("spark.shuffle.manager", "sort")
        # Large default kept for compatibility; consider something close to 10m.
        .config("spark.sql.autoBroadcastJoinThreshold", broadcast_join_threshold)
        .config("spark.sql.parquet.filterPushdown", "true")
        .config("spark.sql.parquet.enableVectorizedReader", "true")
        # Experiment: Arrow-based transfer to speed up toPandas().
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .config("spark.driver.host", "127.0.0.1")
        .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ExitOnOutOfMemoryError")
    ).getOrCreate()

In [4]:
def log(message: str):
    """Print ``message`` to stdout prefixed by the current local time.

    Output format: ``[YYYY-MM-DD HH:MM:SS] message``.
    """
    now = datetime.now()
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {message}"
    print(line)

In [5]:
# Start (or attach to) the local Spark session shared by every cell below;
# 0.2 is make_spark's default storage fraction, spelled out for visibility.
spark = make_spark(memory_storage_fraction=0.2)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/06 17:16:45 WARN Utils: Your hostname, DSLSERVER00, resolves to a loopback address: 127.0.1.1; using 172.20.72.17 instead (on interface eno1)
26/01/06 17:16:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/06 17:16:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [21]:
# Root of the processed datasets. Defaults to the original author's local path;
# set the PROCESSED_DATA_DIR environment variable to run the notebook elsewhere.
PROCESSED_DATA_DIR = Path(
    os.environ.get("PROCESSED_DATA_DIR", "/home/hygo2025/Documents/data/processed_data")
)
EVENTS_PROCESSED_PATH = str(PROCESSED_DATA_DIR / "events")
LISTINGS_PROCESSED_PATH = str(PROCESSED_DATA_DIR / "listings")
SESSIONS_PROCESSED_PATH = str(PROCESSED_DATA_DIR / "sessions")

In [7]:
from pyspark.sql.types import IntegerType


def load_events_data(
    n_days: int,
    start_date: str,
    input_path: str,
    business_type: str = "SALE",
) -> DataFrame:
    """Load ``n_days`` of events starting at ``start_date``, restricted to one business type.

    Reads the parquet dataset at ``input_path`` with the module-level ``spark`` session.
    It keeps rows whose ``dt`` lies in the inclusive range
    [start_date, start_date + n_days - 1], compared against 'YYYY-MM-DD' strings.
    It then keeps rows whose ``business_type`` column equals ``business_type``.
    The row count is logged after each filter; each count is a full Spark job.

    Args:
        n_days: number of consecutive days to load; must be >= 1.
        start_date: first day, formatted 'YYYY-MM-DD'.
        input_path: parquet dataset root (presumably partitioned by ``dt`` -- confirm).
        business_type: value of the ``business_type`` column to keep.

    Returns:
        The filtered (lazy) DataFrame.

    Raises:
        ValueError: if ``n_days`` < 1 or ``start_date`` is not in 'YYYY-MM-DD' format.
    """
    # A non-positive n_days would produce an inverted date range and silently load nothing.
    if n_days < 1:
        raise ValueError(f"n_days must be >= 1, got {n_days}")

    log(f"Loading {n_days} days starting from {start_date}")

    start_dt = datetime.strptime(start_date, '%Y-%m-%d')
    end_dt = start_dt + timedelta(days=n_days - 1)

    df = spark.read.parquet(input_path)

    # Filter on `dt` so Spark can prune partitions when the data is partitioned by it.
    df = df.filter(
        (F.col('dt') >= start_dt.strftime('%Y-%m-%d')) &
        (F.col('dt') <= end_dt.strftime('%Y-%m-%d'))
    )
    total_events = df.count()
    log(f"Loaded {total_events:_} events")

    df = df.filter(F.col("business_type") == business_type)
    total_events = df.count()
    log(f"Loaded filter business_type {total_events:_} events")

    return df

def filter_interaction_events(
    df: DataFrame,
    interaction_types=('ListingRendered',),
) -> DataFrame:
    """Keep only events whose ``event_type`` is in ``interaction_types``.

    By default only 'ListingRendered' (listing detail views) is kept. Every other event
    type is dropped, e.g. RankingRendered, GalleryClicked, RankingClicked,
    LeadPanelClicked, LeadClicked, FavoriteClicked and ShareClicked. Pass a different
    collection of type names to widen what counts as an interaction.

    Logs the number of surviving events and their share of the input. This runs two
    Spark count jobs.

    Args:
        df: events DataFrame with an ``event_type`` column.
        interaction_types: event type name(s) to keep; a single string is accepted.

    Returns:
        The filtered (lazy) DataFrame.
    """
    # Accept a bare string; list("abc") would otherwise split it into characters.
    if isinstance(interaction_types, str):
        interaction_types = [interaction_types]
    interaction_types = list(interaction_types)

    log(f"Filtering interaction events (keeping only {', '.join(interaction_types)})")

    df_filtered = df.filter(F.col('event_type').isin(interaction_types))

    total_before = df.count()
    total_after = df_filtered.count()
    # Avoid ZeroDivisionError when the input is empty.
    kept_pct = total_after / total_before * 100 if total_before else 0.0
    log(f"Kept {total_after:_} interaction events ({kept_pct:.2f}%)")

    return df_filtered

def create_sessions(df: DataFrame) -> DataFrame:
    """Project events onto RecBole-style columns and number the events of each session.

    Output columns:
    - session_id
    - user_id (from unique_user_id)
    - item_id (listing_id cast to int)
    - timestamp (from event_ts)
    - event_type
    - position: 1-based row number per session, ordered by timestamp, with ties broken
      by item_id then event_type.

    Logs the number of distinct sessions, which runs one extra Spark job.
    """
    log("Creating session sequences")

    # NOTE(review): with spark.sql.ansi.enabled=false (as make_spark sets it), a listing_id
    # that is not a valid int casts to null instead of failing -- confirm it is numeric.
    sessions = df.select(
        F.col('session_id'),
        F.col('unique_user_id').alias('user_id'),
        F.col('listing_id').cast(IntegerType()).alias('item_id'),
        F.col('event_ts').alias('timestamp'),
        F.col('event_type'),
    )

    # row_number() is only deterministic when the ordering is unique. Without a tiebreaker,
    # events of one session that share a timestamp get arbitrary positions. Those positions
    # can change each time Spark recomputes this (uncached) plan, e.g. between a preview
    # and the final write.
    window_spec = Window.partitionBy('session_id').orderBy('timestamp', 'item_id', 'event_type')
    sessions = sessions.withColumn('position', F.row_number().over(window_spec))

    unique_sessions = sessions.select('session_id').distinct().count()
    log(f"Created {unique_sessions:,} unique sessions")

    return sessions

def filter_sessions(df: DataFrame, min_session_len: int, max_session_len: int) -> DataFrame:
    """Drop whole sessions whose event count is outside [min_session_len, max_session_len].

    Both bounds are inclusive. Surviving rows are selected with an inner join on
    ``session_id``, which makes ``session_id`` the first output column. Session and event
    counts before/after are logged; this runs four Spark count jobs.
    """
    log(f"Filtering sessions: {min_session_len}-{max_session_len} interactions")

    sizes = df.groupBy('session_id').agg(F.count('*').alias('session_size'))

    # between() is inclusive on both ends.
    in_range = sizes.where(F.col('session_size').between(min_session_len, max_session_len))

    kept = df.join(in_range.select('session_id'), on='session_id', how='inner')

    n_sessions_before, n_sessions_after = sizes.count(), in_range.count()
    n_events_before, n_events_after = df.count(), kept.count()

    log(f"Sessions: {n_sessions_before:,} → {n_sessions_after:,}")
    log(f"Events: {n_events_before:,} → {n_events_after:,}")

    return kept

def filter_rare_items(df: DataFrame, min_item_support: int) -> DataFrame:
    """Drop events whose item appears in fewer than ``min_item_support`` rows of ``df``.

    Rows are kept through an inner join on ``item_id``. That join moves ``item_id`` to
    the first column and also drops rows whose ``item_id`` is null. Item and event counts
    before/after are logged; this runs four Spark count jobs.
    """
    log(f"Filtering items with <{min_item_support} occurrences")

    support = df.groupBy('item_id').agg(F.count('*').alias('item_count'))
    frequent = support.where(F.col('item_count') >= min_item_support)
    kept = df.join(frequent.select('item_id'), on='item_id', how='inner')

    n_items_before = support.count()
    n_items_after = frequent.count()
    n_events_before = df.count()
    n_events_after = kept.count()

    log(f"Items: {n_items_before:,} → {n_items_after:,}")
    log(f"Events: {n_events_before:,} → {n_events_after:,}")

    return kept

In [8]:
# Pipeline parameters.
TARGET_CITIES = ["Vitória", "Serra", "Vila Velha", "Cariacica", "Viana", "Guarapari", "Fundão"]
N_DAYS = 30
START_DATE = "2024-05-01"
MIN_SESSION_LEN = 2
MAX_SESSION_LEN = 50
MIN_ITEM_SUPPORT = 5

listings = spark.read.option("mergeSchema", "true").parquet(LISTINGS_PROCESSED_PATH)
listings_before = listings.count()

# isin() matches the original chain of `==` comparisons joined with `|`
# (rows with a null city are dropped either way).
listings = listings.filter(F.col("city").isin(TARGET_CITIES))

listings_after = listings.count()

log(f"Loaded {listings_before:_} listings")
log(f"Filtered to {listings_after:_} listings in the target cities")

events = load_events_data(n_days=N_DAYS, start_date=START_DATE, input_path=EVENTS_PROCESSED_PATH)
events_before = events.count()

# Keep only events on target-city listings. left_semi returns just the events' columns
# and never duplicates an event, even if a listing id appears more than once.
events = events.join(
    listings,
    events.listing_id == listings.listing_id_numeric,
    "left_semi",
)

events_after = events.count()
log(f"Filtered events from {events_before:_}")
log(f"to {events_after:_} after joining with listings in target cities")

events = filter_interaction_events(events)
events = create_sessions(events)
events = filter_sessions(events, min_session_len=MIN_SESSION_LEN, max_session_len=MAX_SESSION_LEN)
events = filter_rare_items(events, min_item_support=MIN_ITEM_SUPPORT)
# Dropping rare items can shrink sessions below the minimum, so re-apply the length filter.
events = filter_sessions(events, min_session_len=MIN_SESSION_LEN, max_session_len=MAX_SESSION_LEN)
# NOTE(review): `position` was assigned in create_sessions before filter_rare_items removed
# events, so positions within a session can have gaps. Recompute it here if consumers
# expect a contiguous 1..n sequence.

# Every later cell runs an action on `events`. Caching avoids re-running the whole
# read/join/window pipeline for each one.
events = events.cache()

[2026-01-06 17:16:51] Loaded 506_073 listings
[2026-01-06 17:16:51] Filtered to 187_713 listings in the target cities
[2026-01-06 17:16:51] Loading 30 days starting from 2024-05-01
[2026-01-06 17:16:52] Loaded 168_625_527 events


                                                                                

[2026-01-06 17:16:53] Loaded filter business_type 103_853_497 events


                                                                                

[2026-01-06 17:16:55] Filtered events from 103_853_497
[2026-01-06 17:16:55] to 35_333_238 after joining with listings in target cities
[2026-01-06 17:16:55] Filtering interaction events (excluding RankingRendered,GalleryClicked)


                                                                                

[2026-01-06 17:16:57] Kept 790_499 interaction events (2.24%)
[2026-01-06 17:16:57] Creating session sequences


                                                                                

[2026-01-06 17:17:00] Created 228,366 unique sessions
[2026-01-06 17:17:00] Filtering sessions: 2-50 interactions


                                                                                

[2026-01-06 17:17:10] Sessions: 228,366 → 123,323
[2026-01-06 17:17:10] Events: 790,499 → 665,741
[2026-01-06 17:17:10] Filtering items with <5 occurrences


                                                                                

[2026-01-06 17:17:27] Items: 54,373 → 28,910
[2026-01-06 17:17:27] Events: 665,741 → 612,374
[2026-01-06 17:17:27] Filtering sessions: 2-50 interactions


                                                                                

[2026-01-06 17:17:43] Sessions: 122,214 → 118,035
[2026-01-06 17:17:43] Events: 612,374 → 608,195


In [9]:
# Inspect the column names and types of the final session dataset.
events.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- position: integer (nullable = false)



In [10]:
# Preview the first 10 rows with every column and full cell contents visible.
show_pd(events, limit=10, truncate=False)

                                                                                

Unnamed: 0,session_id,item_id,user_id,timestamp,event_type,position
0,S_6905781,141203,A_6386093,2024-05-05 15:19:11.721,ListingRendered,3
1,S_6905781,438998,A_6386093,2024-05-05 15:15:51.166,ListingRendered,2
2,S_6905781,263750,A_6386093,2024-05-05 15:15:04.775,ListingRendered,1
3,S_8128600,227256,A_2713322,2024-05-20 08:16:02.288,ListingRendered,6
4,S_8128600,239621,A_2713322,2024-05-20 08:29:01.945,ListingRendered,16
5,S_8128600,239621,A_2713322,2024-05-20 08:27:36.457,ListingRendered,15
6,S_8128600,222308,A_2713322,2024-05-20 08:35:18.380,ListingRendered,28
7,S_8128600,368231,A_2713322,2024-05-20 08:27:13.998,ListingRendered,13
8,S_8128600,170873,A_2713322,2024-05-20 08:31:44.424,ListingRendered,23
9,S_8128600,283815,A_2713322,2024-05-20 08:12:02.643,ListingRendered,1


In [16]:
# Sanity check: how many events survived for one sample session.
events.where(F.col("session_id") == "S_8128600").count()

                                                                                

36

In [17]:
# Show the sample session's events in time order to eyeball the `position` numbering.
show_pd(events.where(F.col("session_id") == "S_8128600").orderBy("timestamp"), limit=200)

                                                                                

Unnamed: 0,session_id,item_id,user_id,timestamp,event_type,position
0,S_8128600,283815,A_2713322,2024-05-20 08:12:02.643,ListingRendered,1
1,S_8128600,388377,A_2713322,2024-05-20 08:12:07.726,ListingRendered,2
2,S_8128600,92745,A_2713322,2024-05-20 08:12:31.283,ListingRendered,3
3,S_8128600,92745,A_2713322,2024-05-20 08:13:22.763,ListingRendered,4
4,S_8128600,92745,A_2713322,2024-05-20 08:13:54.324,ListingRendered,5
5,S_8128600,227256,A_2713322,2024-05-20 08:16:02.288,ListingRendered,6
6,S_8128600,353791,A_2713322,2024-05-20 08:18:03.051,ListingRendered,7
7,S_8128600,383741,A_2713322,2024-05-20 08:18:27.603,ListingRendered,8
8,S_8128600,314339,A_2713322,2024-05-20 08:18:59.056,ListingRendered,9
9,S_8128600,314339,A_2713322,2024-05-20 08:20:22.673,ListingRendered,10


In [19]:
# Total number of events remaining after all filters.
events.count()

                                                                                

608195

In [22]:
# Write the sessions as a single parquet file.
# - repartition(1), unlike coalesce(1), adds a shuffle boundary. Upstream stages keep their
#   parallelism and only the final write runs in one task.
# - Sorting by session and position makes the file's row order deterministic.
(
    events
    .repartition(1)
    .sortWithinPartitions("session_id", "position")
    .write.mode('overwrite')
    .parquet(SESSIONS_PROCESSED_PATH)
)

                                                                                