In [None]:
# Log in with gcloud to obtain Google Application Default Credentials (ADC).
# The Spark session built later in this notebook enables ADC-based auth for
# the GCS connector, so this cell has to run first.
!gcloud auth application-default login --no-browser

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import logging
import os
from datetime import datetime, timedelta

In [None]:
import gc
import os

# Set the PyTorch CUDA allocator strategy *before* torch is imported, so the
# setting is guaranteed to be in the environment when the CUDA caching
# allocator is configured. Processes started later from this kernel inherit it.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

import torch

# Release any cached GPU memory left over from earlier runs in this kernel,
# then drop unreachable Python objects.
torch.cuda.empty_cache()
gc.collect()

In [None]:
# Optional overrides -- uncomment and adjust if the credentials file or the
# Java installation are not discovered automatically:
# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/.config/application_default_credentials.json'
# os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-17-openjdk-amd64'  # Adjust path as needed

# Every scratch location referenced by the session config must exist before the
# JVM starts; in particular the JVM does not create java.io.tmpdir on its own.
for scratch_dir in (
    "/content/spark-temp",
    "/content/hadoop-tmp",
    "/content/spark-warehouse",
    "/content/tmp",
):
    os.makedirs(scratch_dir, exist_ok=True)

# Build (or reuse) the Spark session with the BigQuery and GCS connectors.
# NOTE(review): the BigQuery connector artifact is the Scala 2.13 build; it
# must match the Scala version of the installed Spark distribution -- confirm.
spark = (
    SparkSession.builder
    .appName("generate_ext_user_engagement_dataset")
    .config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.43.1,"
        "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.30"
    )
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.google.cloud.auth.application_default_credentials.enabled", "true")
    .config("spark.driver.memory", "100g")
    .config("spark.driver.memoryOverhead", "20g")
    .config("spark.executor.memory", "100g")
    .config("spark.executor.memoryOverhead", "20g")
    .config("spark.executor.cores", "12")
    .config("spark.executor.instances", "2")
    .config("spark.driver.maxResultSize", "20g")
    .config("spark.default.parallelism", "96")
    .config("spark.sql.shuffle.partitions", "96")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Automatic broadcast joins are disabled; explicit F.broadcast() hints still apply.
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.local.dir", "/content/spark-temp")
    # Hadoop properties only reach the Hadoop Configuration when they carry the
    # "spark.hadoop." prefix; a bare "hadoop.tmp.dir" key is not forwarded.
    .config("spark.hadoop.hadoop.tmp.dir", "/content/hadoop-tmp")
    .config("spark.sql.warehouse.dir", "/content/spark-warehouse")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .config("spark.driver.extraJavaOptions", "-Djava.io.tmpdir=/content/tmp")
    .config("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/content/tmp")
    .getOrCreate()
)

print("Spark Session initialized successfully!")
print(f"Spark Version: {spark.version}")

# ---- Run configuration -------------------------------------------------
# GCP project and BigQuery dataset that hold the extension event tables.
PROJECT_ID = "honey-production"  # Replace with your GCP project ID
DATASET = "sdata_events_partitioned"       # Replace with your dataset name

# Analysis window as YYYY-MM-DD strings; the queries below treat both bounds
# as inclusive (>= start, <= end).
ANALYSIS_START_DATE = "2025-12-08"
ANALYSIS_END_DATE = "2025-12-08"

print(f"Querying data from {ANALYSIS_START_DATE} to {ANALYSIS_END_DATE}")

In [None]:
# Create the scratch directories referenced by the Spark session config.
# "/content/tmp" is the java.io.tmpdir target passed to the driver and
# executor JVMs, so it is created here together with the others.
import os

for temp_dir in (
    "/content/spark-temp",
    "/content/spark-warehouse",
    "/content/hadoop-tmp",
    "/content/tmp",
):
    os.makedirs(temp_dir, exist_ok=True)

In [None]:
# Page-detected events for the analysis window, one distinct row per event.
# No ORDER BY is pushed down to BigQuery: Spark reads the materialised result
# in parallel and does not preserve row order, and every later step that needs
# an ordering applies its own orderBy / window ordering.
page_detected_sql_query = f"""
SELECT distinct
    id,
    ts,
    timestamp,
    store.id AS storeid,
    store.name AS storename,
    store.country AS storecountry,
    store.session_id AS storesessionid,
    extension.screenview_id AS screenviewid,
    page_title,
    page_type,
    user_id,
    session_id,
    referrer_url
FROM `{PROJECT_ID}.{DATASET}.ext_page_detected`
WHERE timestamp IS NOT NULL
AND DATE(timestamp) >= '{ANALYSIS_START_DATE}'
AND DATE(timestamp) <= '{ANALYSIS_END_DATE}'
"""
# Execute query and create DataFrame (lazy: nothing runs until an action).
print("Executing BigQuery SQL...")
page_detected_df = (
    spark.read
    .format("bigquery")
    .option("query", page_detected_sql_query)
    .option("project", PROJECT_ID)
    .option("viewsEnabled", "true")
    .load()
)
# NOTE(review): if the connector needs an explicit materialization target for
# query results, add before .load():
#     .option("materializationProject", PROJECT_ID)
#     .option("materializationDataset", "new_temp_materialization")
# Debug helper:
# page_detected_df.show(50, truncate=False)

In [None]:
import re
def extract_shein_sc_query_term(search_string):
    """
    Pull the value of the ``sc=<queryterm>`` field out of a backtick-delimited string.

    :param search_string: Text that may contain an ``sc=`` field; may be None or empty.
    :return: The characters following the first ``sc=`` up to the next backtick
             (or end of string), or None when the input is falsy or no
             non-empty ``sc=`` field is present.
    """
    # Falsy input (None or "") carries nothing to extract.
    found = re.search(r'sc=([^`]+)', search_string) if search_string else None
    return found.group(1) if found else None

In [None]:
from urllib.parse import urlparse, parse_qs
import pandas as pd

def extract_search_query(referrer_url):
    """
    Extract the search query term from a referrer URL based on known query-string
    keys used by different merchants.

    The first key from the priority list below that is present in the URL's query
    string decides the result; later keys are not consulted. The value is trimmed,
    stripped of one pair of surrounding double quotes, '+' is turned into a space,
    whitespace runs are collapsed and the text is lower-cased.

    For SHEIN's ``src_identifier`` the term is only accepted when ``src_module`` is
    ``search``; the term is then taken from the ``sc=`` field via
    ``extract_shein_sc_query_term``.

    This function runs inside a Spark UDF, so it never raises on bad data:
    non-string input (None, NaN, ...) and URLs that ``urlparse`` rejects yield None.

    :param referrer_url: Referrer URL string (or None / NaN).
    :return: Sanitized search query term, or None if no non-empty term is found.
    """
    # None, NaN (a float) and any other non-string value carry no URL.
    if not isinstance(referrer_url, str):
        return None

    # Key variants used by merchants to denote search query terms, in priority order
    search_query_keys = (
        'keywords',                # Amazon
        '_skw',                    # eBay
        'src_identifier',          # Shein
        'ga_search_query',         # Etsy
        'search_key',              # Temu
        'searchTerm',              # Lowe's
        'searchText',              # Old Navy
        'q',                       # Etsy, Walmart, REI
        'kw',                      # StubHub
        'keyword',                 # Costco, eBay
        'search',                  # BestBuy, Menards, ULTA Beauty
    )

    # urlparse raises ValueError for malformed netlocs (e.g. an unbalanced '[');
    # one bad referrer must not fail the whole Spark task.
    try:
        query_params = parse_qs(urlparse(referrer_url).query)
    except ValueError:
        return None

    for key in search_query_keys:
        if key not in query_params:
            continue

        # Use the first value supplied for the key.
        term = query_params[key][0].strip()
        if term.startswith('"') and term.endswith('"'):
            term = term[1:-1]
        term = term.replace('+', ' ')
        term = re.sub(r'\s+', ' ', term)
        term = term.lower().strip()

        # SHEIN-specific extraction: only search-originated identifiers count.
        if key == 'src_identifier':
            if query_params.get('src_module', [None])[0] == 'search':
                term = extract_shein_sc_query_term(term)
            else:
                term = None

        # A term that sanitizes to nothing is not a search term.
        return term if term else None

    # If no matching key is found, return None
    return None

In [None]:
# Step 1: Filter for specific page_types
allowed_page_types = ['UNSUPPORTED', 'PRODUCT', 'SEARCH', 'CART_PRODUCT', 'CHECKOUT_CONFIRM']
ATC_CHECKOUT_PAGE_TYPE = ['CART_PRODUCT', 'CHECKOUT_CONFIRM']
page_detected_df_filtered = page_detected_df.filter(F.col('page_type').isin(allowed_page_types))

# Wrap the referrer parser as a Spark UDF returning a nullable string
extract_search_query_udf = F.udf(extract_search_query, StringType())

# Derive search_query_term from the referrer URL of every remaining event
page_detected_df_final = page_detected_df_filtered.withColumn(
    'search_query_term',
    extract_search_query_udf(F.col('referrer_url'))
)

# "Click" rows: events whose referrer carried a search query term
filtered_click_df = page_detected_df_final.filter(F.col('search_query_term').isNotNull())

# Distinct (user_id, session_id) pairs that have at least one such row
filtered_click_df_merge_fields = filtered_click_df.select('user_id', 'session_id').distinct()

# Add-to-cart / checkout rows WITHOUT a search term. Rows of these page types
# that do have a term are already part of filtered_click_df; selecting them
# here as well would emit them twice in the final union (union keeps duplicates).
atc_checkout_df = page_detected_df_final.filter(
    F.col('search_query_term').isNull() &
    F.col('page_type').isin(ATC_CHECKOUT_PAGE_TYPE)
)

# PRODUCT rows without a search term
product_non_search_query_terms_df = page_detected_df_final.filter(
    F.col('search_query_term').isNull() &
    (F.col('page_type') == 'PRODUCT')
)

# Union the two term-less sets (identical schema, disjoint page types)
atc_checkout_df = atc_checkout_df.union(product_non_search_query_terms_df)

# Keep only term-less rows from sessions that contain a search click. The
# explicit select restores the column order of filtered_click_df, which the
# positional union below relies on.
filtered_atc_checkout_df = atc_checkout_df.join(
    filtered_click_df_merge_fields,
    on=['user_id', 'session_id'],
    how='inner'
).select(
    'id', 'ts', 'timestamp', 'storeid', 'storename', 'storecountry',
    'storesessionid', 'screenviewid', 'page_title', 'page_type', 'user_id',
    'session_id', 'referrer_url', 'search_query_term'
).distinct()

# Final union: search clicks plus the term-less follow-up events of the same
# sessions. The two sides are now disjoint (non-null vs. null search term).
filtered_click_atc_checkout_df = filtered_click_df.union(filtered_atc_checkout_df)
# is_engagement_verified starts as 1 for rows that carry a search term, else 0
filtered_click_atc_checkout_df = filtered_click_atc_checkout_df.withColumn(
    "is_engagement_verified",
    F.when(F.col("search_query_term").isNotNull(), 1).otherwise(0)
)

# Debug helpers:
# filtered_click_atc_checkout_df.show(100, truncate=False)
# print(f"Total records after filtering for user sessions with at least one search query term: {filtered_click_atc_checkout_df.count()}")

In [None]:
# Cart products (one row per product of each cart event) for the analysis window.
# No ORDER BY is pushed down: Spark reads the query result in parallel without
# preserving order, and the join below does not depend on any ordering.
atc_products_sql_query = f"""
SELECT distinct
extension.screenview_id AS screenview_id,
product.parent_id AS parentid, product.name AS productname, product.url AS producturl, product.total_price AS totalprice, product.sku AS productsku,
user_id AS userid, session_id AS sessionid
FROM `{PROJECT_ID}.{DATASET}.ext_cart_products`, UNNEST(cart.products) AS product
WHERE timestamp IS NOT NULL
AND DATE(timestamp) >= '{ANALYSIS_START_DATE}'
AND DATE(timestamp) <= '{ANALYSIS_END_DATE}'
"""
# Execute query and create DataFrame
print("Executing BigQuery SQL...")
atc_products_df = (
    spark.read
    .format("bigquery")
    .option("query", atc_products_sql_query)
    .option("project", PROJECT_ID)
    .option("viewsEnabled", "true")
    .load()
)

# Debug helper:
# atc_products_df.show(50, truncate=False)

# Match engagement rows to cart products on user, session and screenview
join_condition = [
    filtered_click_atc_checkout_df.user_id == atc_products_df.userid,
    filtered_click_atc_checkout_df.session_id == atc_products_df.sessionid,
    filtered_click_atc_checkout_df.screenviewid == atc_products_df.screenview_id
]

# Left join keeps every engagement row; a screenview with several cart
# products yields one output row per product.
# NOTE(review): the explicit broadcast hint ships the whole product table to
# every executor -- confirm a full analysis window of cart products fits in memory.
joined_df = filtered_click_atc_checkout_df.join(F.broadcast(atc_products_df), on=join_condition, how='left')

# Where a cart product matched, use its name as page_title and mark the row
# as verified; otherwise keep the existing values.
result_df = joined_df.withColumn(
    "page_title",
    F.when(F.col("productname").isNotNull(), F.col("productname")).otherwise(F.col("page_title"))
).withColumn(
    "is_engagement_verified",
    F.when(F.col("productname").isNotNull(), 1).otherwise(F.col("is_engagement_verified"))
)

# Project back to the columns of `filtered_click_atc_checkout_df`
engagement_click_atc_df_filtered = result_df.select(*[F.col(field) for field in filtered_click_atc_checkout_df.columns])

# Show the final dataframe
engagement_click_atc_df_filtered.show(100, truncate=False)

In [None]:
# Checkout products (one row per product of each checkout event) for the window.
# No ORDER BY is pushed down: Spark reads the query result in parallel without
# preserving order, and the result is explicitly ordered further below.
checkout_products_sql_query = f"""
SELECT distinct
extension.screenview_id AS screenview_id,
product.parent_id AS parentid, product.name AS productname, product.url AS producturl, product.total_price AS totalprice, product.sku AS productsku,
user_id AS userid, session_id AS sessionid
FROM `{PROJECT_ID}.{DATASET}.ext_checkout_products`, UNNEST(cart.products) AS product
WHERE timestamp IS NOT NULL
AND DATE(timestamp) >= '{ANALYSIS_START_DATE}'
AND DATE(timestamp) <= '{ANALYSIS_END_DATE}'
"""
# Execute query and create DataFrame
print("Executing BigQuery SQL...")
checkout_products_df = (
    spark.read
    .format("bigquery")
    .option("query", checkout_products_sql_query)
    .option("project", PROJECT_ID)
    .option("viewsEnabled", "true")
    .load()
)

# Debug helper:
# checkout_products_df.show(50, truncate=False)

# Match engagement rows to checkout products on user, session and screenview
join_condition = [
    engagement_click_atc_df_filtered.user_id == checkout_products_df.userid,
    engagement_click_atc_df_filtered.session_id == checkout_products_df.sessionid,
    engagement_click_atc_df_filtered.screenviewid == checkout_products_df.screenview_id
]

# Left join keeps every engagement row; a screenview with several checkout
# products yields one output row per product.
# NOTE(review): the explicit broadcast hint ships the whole product table to
# every executor -- confirm it fits in memory for the chosen window.
joined_df = engagement_click_atc_df_filtered.join(F.broadcast(checkout_products_df), on=join_condition, how='left')

# Where a checkout product matched, use its name as page_title and mark the
# row as verified; otherwise keep the existing values.
result_df = joined_df.withColumn(
    "page_title",
    F.when(F.col("productname").isNotNull(), F.col("productname")).otherwise(F.col("page_title"))
).withColumn(
    "is_engagement_verified",
    F.when(F.col("productname").isNotNull(), 1).otherwise(F.col("is_engagement_verified"))
)

# Project back to the columns of `engagement_click_atc_df_filtered`
all_engagement_filtered_df = result_df.select(*[F.col(field) for field in engagement_click_atc_df_filtered.columns])
# Newest first within session / user
all_engagement_filtered_ordered_df = all_engagement_filtered_df.orderBy(['session_id', 'user_id', 'ts'], ascending=False)
# Calendar date of the event timestamp, used later as the output partition key
all_engagement_filtered_ordered_df_with_date = all_engagement_filtered_ordered_df.withColumn("date", F.to_date(all_engagement_filtered_ordered_df["timestamp"]))
# Show the final dataframe (without the derived date column)
all_engagement_filtered_ordered_df.show(100, truncate=False)

In [None]:
# # GCS bucket path
# gcs_path = "gs://chanderiyer/projects/honeysearchbenchmarks/datasets/user_engagement/sdata_with_search_query_extract"

# # Loop through each partition and coalesce into a single file for each date subfolder
# for date in all_engagement_filtered_ordered_df_with_date.select("date").distinct().collect():
#     single_date = date["date"]
#     # Filter by the specific date
#     filtered_df = all_engagement_filtered_ordered_df_with_date.filter(F.col("date") == single_date)
#     # Coalesce the DataFrame to ensure a single file per date subfolder
#     filtered_df.coalesce(1).write \
#         .option("header", "true") \
#         .option("sep", "\t") \
#         .mode("overwrite") \
#         .csv(f"{gcs_path}/date={single_date}")

# print(f"Data successfully written to {gcs_path} with coalesced files in each date subfolder.")

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import col, when, last, sum as spark_sum, lag, coalesce

# Forward-propagate search_query_term inside each (user_id, session_id) session.
# Events are walked in ascending `ts`; a "stop" event closes the current segment
# and terms are only carried forward within a segment.

# Session window in event order, plus an unordered session-wide window
window_spec = Window.partitionBy("user_id", "session_id").orderBy("ts")
session_scope = Window.partitionBy("user_id", "session_id")
# Frame bounds for "everything up to and including the current row"
running_rows = (Window.unboundedPreceding, Window.currentRow)

# Rows a term may be propagated FROM: they carry a term and are browse/search pages
is_source_flag = when(
    (col("search_query_term").isNotNull()) &
    (col("page_type").isin(['PRODUCT', 'UNSUPPORTED', 'SEARCH'])),
    1
).otherwise(0)

# Number of CHECKOUT_CONFIRM rows in the session
checkout_confirm_count = spark_sum(
    when(col("page_type") == "CHECKOUT_CONFIRM", 1).otherwise(0)
).over(session_scope)

# Stop-event priority: sessions with a CHECKOUT_CONFIRM stop only on that page
# type; all other sessions stop on CART_PRODUCT.
confirm_is_stop = when(col("page_type") == "CHECKOUT_CONFIRM", 1).otherwise(0)
cart_is_stop = when(col("page_type") == "CART_PRODUCT", 1).otherwise(0)
stop_event_flag = when(col("has_checkout_confirm") > 0, confirm_is_stop).otherwise(cart_is_stop)

# The row AFTER a stop event opens a new segment (first row of a session gets 0)
previous_stop_flag = lag("is_stop_event", 1, 0).over(window_spec)
# Running count of segment boundaries = segment number within the session
segment_number = spark_sum("prev_is_stop_event").over(window_spec.rowsBetween(*running_rows))

# Latest source term seen so far inside the current segment
window_spec_segment = Window.partitionBy("user_id", "session_id", "segment_id").orderBy("ts")
latest_source_term = last(
    when(col("is_source") == 1, col("search_query_term")), ignorenulls=True
).over(window_spec_segment.rowsBetween(*running_rows))

# A term counts as derived when a target page had none of its own but one was propagated
derived_flag = when(
    (col("page_type").isin(['PRODUCT', 'UNSUPPORTED', 'CART_PRODUCT', 'CHECKOUT_CONFIRM'])) &
    (col("search_query_term").isNull()) &
    (col("propagated_search_query_term").isNotNull()),
    1
).otherwise(0)

# Derived rows take the propagated term; all others keep their own value
resolved_term = when(
    col("is_search_query_term_derived") == 1,
    col("propagated_search_query_term")
).otherwise(col("search_query_term"))

df = (
    all_engagement_filtered_ordered_df_with_date
    .withColumn("is_source", is_source_flag)
    .withColumn("has_checkout_confirm", checkout_confirm_count)
    .withColumn("is_stop_event", stop_event_flag)
    .withColumn("prev_is_stop_event", previous_stop_flag)
    .withColumn("segment_id", segment_number)
    .withColumn("propagated_search_query_term", latest_source_term)
    .withColumn("is_search_query_term_derived", derived_flag)
    .withColumn("search_query_term", resolved_term)
    # Helper columns are no longer needed once the term is resolved
    .drop("is_source", "has_checkout_confirm", "is_stop_event", "prev_is_stop_event", "segment_id", "propagated_search_query_term")
)

# Newest first within session / user
all_engagement_with_search_query_term_derived_ordered_df = df.orderBy(['session_id', 'user_id', 'ts'], ascending=False)

In [None]:
# all_engagement_with_search_query_term_derived_ordered_df.filter(col("user_id") == "300452713876600234").show()

In [None]:
# # GCS bucket path
# gcs_path = "gs://chanderiyer/projects/honeysearchbenchmarks/datasets/user_engagement/sdata_with_search_query_derived"

# # Loop through each partition and coalesce into a single file for each date subfolder
# for date in all_engagement_with_search_query_term_derived_ordered_df.select("date").distinct().collect():
#     single_date = date["date"]
#     # Filter by the specific date
#     filtered_df = all_engagement_with_search_query_term_derived_ordered_df.filter(F.col("date") == single_date)
#     # Coalesce the DataFrame to ensure a single file per date subfolder
#     filtered_df.coalesce(1).write \
#         .option("header", "true") \
#         .option("sep", "\t") \
#         .mode("overwrite") \
#         .csv(f"{gcs_path}/date={single_date}")

# print(f"Data successfully written to {gcs_path} with coalesced files in each date subfolder.")

In [None]:
# Keep only rows that have both a search query term and a page title ...
has_term_and_title = col("search_query_term").isNotNull() & (col("page_title").isNotNull())
all_engagement_with_search_query_term_derived_ordered_df = (
    all_engagement_with_search_query_term_derived_ordered_df.filter(has_term_and_title)
)
# ... and whose trimmed search term is at least two characters long.
all_engagement_refined_ordered_df = all_engagement_with_search_query_term_derived_ordered_df.filter(
    F.length(F.trim(F.col("search_query_term"))) >= 2
)

In [None]:
import torch

# Report which device torch will use on the driver: 'cuda' when a GPU is visible, else 'cpu'.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lag, sum as spark_sum, when, coalesce, lit, row_number, collect_list, broadcast
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType
import pandas as pd
from typing import Iterator, Tuple
import numpy as np

# ===== STEP 1: Extract and Deduplicate Search Queries & Page Titles =====

# Distinct, non-null search query terms (each text is embedded only once)
unique_search_queries = (
    all_engagement_refined_ordered_df
    .filter(col("search_query_term").isNotNull())
    .select("search_query_term")
    .distinct()
    .repartition(200)
)

# Distinct, non-null page titles
unique_page_titles = (
    all_engagement_refined_ordered_df
    .filter(col("page_title").isNotNull())
    .select("page_title")
    .distinct()
    .repartition(200)
)

# ===== STEP 2: Compute Embeddings with GPU =====

# Output schema of the embedding step: the input text and its float vector,
# both declared non-nullable.
embedding_schema = (
    StructType()
    .add("text", StringType(), False)
    .add("embedding", ArrayType(FloatType()), False)
)

def compute_embeddings_gpu(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """
    mapInPandas worker: embed the ``text`` column of every incoming batch.

    The 'all-MiniLM-L6-v2' SentenceTransformer model is loaded once per call
    (i.e. once per partition), on CUDA when available and on CPU otherwise.

    :param iterator: Iterator of pandas DataFrames, each with a ``text`` column.
    :return: Iterator of DataFrames with columns ``text`` and ``embedding``
             (the embedding as a plain Python list of floats); an empty input
             batch yields an empty DataFrame with the same two columns.
    """
    # Imports are local so they are resolved inside the Spark Python worker.
    from sentence_transformers import SentenceTransformer
    import torch
    import pandas as pd
    import gc

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = SentenceTransformer('all-MiniLM-L6-v2', device=device)

    # Start from a clean CUDA cache
    torch.cuda.empty_cache()

    for batch_pdf in iterator:
        if len(batch_pdf) == 0:
            # Nothing to encode: emit an empty frame with the expected columns
            yield pd.DataFrame(columns=['text', 'embedding'])
        else:
            batch_texts = batch_pdf['text'].tolist()

            # Small encode batches keep peak GPU memory low
            vectors = model.encode(
                batch_texts,
                batch_size=64,
                show_progress_bar=False,
                convert_to_numpy=True,
                device=device
            )

            embedded = pd.DataFrame({
                'text': batch_texts,
                'embedding': [vector.tolist() for vector in vectors]
            })

            # Release cached GPU blocks and unreachable objects before handing
            # the batch back to Spark
            torch.cuda.empty_cache()
            gc.collect()

            yield embedded

# Embed the distinct search queries: rename to the generic "text" column the
# worker expects, run it on 4 partitions, then restore descriptive names.
search_query_embeddings = (
    unique_search_queries
    .withColumnRenamed("search_query_term", "text")
    .repartition(4)
    .mapInPandas(compute_embeddings_gpu, schema=embedding_schema)
    .withColumnRenamed("text", "search_query_term")
    .withColumnRenamed("embedding", "search_query_embedding")
)

# Embed the distinct page titles the same way, on 128 partitions.
page_title_embeddings = (
    unique_page_titles
    .withColumnRenamed("page_title", "text")
    .repartition(128)
    .mapInPandas(compute_embeddings_gpu, schema=embedding_schema)
    .withColumnRenamed("text", "page_title")
    .withColumnRenamed("embedding", "page_title_embedding")
)

# ===== STEP 3: Join Embeddings to Main DataFrame =====

# Repartition the main dataframe on the two join keys
df_repartitioned = all_engagement_refined_ordered_df.repartition(400, "search_query_term", "page_title")

# Attach both embeddings with plain left joins (no broadcast hint is applied
# here), so rows without a matching embedding are kept with nulls.
df_with_embeddings = (
    df_repartitioned
    .join(search_query_embeddings, on="search_query_term", how="left")
    .join(page_title_embeddings, on="page_title", how="left")
)

# ===== STEP 4: Compute Cosine Similarity with GPU =====

# Iterator-based pandas UDF computing row-wise cosine similarity of two
# embedding columns.
@pandas_udf(FloatType(), functionType=PandasUDFType.SCALAR_ITER)
def cosine_similarity_gpu(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    """
    Cosine similarity between two embedding columns, batch by batch.

    Despite the name, the arithmetic is done with NumPy on the CPU.

    :param iterator: Iterator of (embedding series, embedding series) pairs.
    :return: Iterator of float series aligned with each input batch; rows where
             either embedding is missing are left as NaN.
    """
    import numpy as np
    import pandas as pd

    for query_vectors, title_vectors in iterator:
        # Rows where both embeddings are present
        both_present = query_vectors.notna() & title_vectors.notna()

        # Start with an all-missing float result of the batch length
        scores = pd.Series([None] * len(query_vectors), dtype=float)

        if both_present.sum() != 0:
            # Stack the per-row vectors into 2-D arrays
            query_matrix = np.array(query_vectors[both_present].tolist())
            title_matrix = np.array(title_vectors[both_present].tolist())

            # Row-wise dot product divided by the product of the row norms
            dot_products = (query_matrix * title_matrix).sum(axis=1)
            norm_products = (
                np.linalg.norm(query_matrix, axis=1) * np.linalg.norm(title_matrix, axis=1)
            )

            # Write the similarities back at the valid positions
            scores[both_present] = dot_products / norm_products

        yield scores

# Score every row: cosine similarity between its query and title embeddings
similarity_expr = cosine_similarity_gpu(col("search_query_embedding"), col("page_title_embedding"))
df_with_similarity = df_with_embeddings.withColumn("similarity_score", similarity_expr)

# The raw embedding vectors are not needed past this point
all_engagement_refined_ordered_final_df_with_similarity = df_with_similarity.drop(
    "search_query_embedding",
    "page_title_embedding"
)

# print(f"Final dataframe with similarity scores: {all_engagement_refined_ordered_final_df_with_similarity.count()}")


In [None]:
# Sort the scored rows descending by session_id, then user_id, then ts
all_engagement_refined_ordered_final_df_with_similarity = (
    all_engagement_refined_ordered_final_df_with_similarity
    .orderBy(['session_id', 'user_id', 'ts'], ascending=False)
)
# Debug helper:
# all_engagement_refined_ordered_final_df_with_similarity.show(200, truncate=False)

In [None]:
# Mark the final dataframe for caching; it is reused by the count and by the
# per-date writes that follow.
all_engagement_refined_ordered_final_df_with_similarity_cached = (
    all_engagement_refined_ordered_final_df_with_similarity.cache()
)

In [None]:
# count() is an action: it runs the pipeline and reports the final row count
refined_record_count = all_engagement_refined_ordered_final_df_with_similarity_cached.count()
print(f"Total records after refining with computed similarity: {refined_record_count}")

In [None]:
# Destination prefix in GCS; one "date=<value>" subfolder is written per date
gcs_path = "gs://chanderiyer/projects/honeysearchbenchmarks/datasets/user_engagement/sdata_with_search_query_refined_coalesced"

# Distinct dates present in the cached result, collected to the driver
distinct_dates = [
    row["date"]
    for row in all_engagement_refined_ordered_final_df_with_similarity_cached.select("date").distinct().collect()
]

for single_date in distinct_dates:
    # Rows belonging to this date only
    filtered_df = all_engagement_refined_ordered_final_df_with_similarity_cached.filter(F.col("date") == single_date)
    # coalesce(1) produces a single tab-separated file with a header row,
    # overwriting whatever is already in the subfolder
    (
        filtered_df.coalesce(1).write
        .option("header", "true")
        .option("sep", "\t")
        .mode("overwrite")
        .csv(f"{gcs_path}/date={single_date}")
    )

print(f"Data successfully written to {gcs_path} with coalesced files in each date subfolder.")