In [1]:
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col, concat_ws, lit, when
# Start Spark

"""
TODO: read from hadoop, draw some graph show the dataset

"""
"""
read in csv, use dictionary to explain values, left join 2 csvs.
"""
spark = SparkSession.builder \
    .appName("football-events-pipeline") \
    .getOrCreate()

events_path = str("events.csv")
ginf_path   = str( "ginf.csv")

events = spark.read.option("header", True).option("inferSchema", True).csv(events_path)
ginf   = spark.read.option("header", True).option("inferSchema", True).csv(ginf_path)

df = events.alias("e").join(ginf.alias("g"), on="id_odsp", how="left")
print("Joined df cols:", df.columns)

DICT_SCHEMA = "code INT, label STRING"

def read_dict(name: str):
    path = str(f"{name}.txt")
    return spark.read.option("sep", "\t").schema(DICT_SCHEMA).csv(path)

dict_event_type   = read_dict("event_type")
dict_event_type2  = read_dict("event_type2")
dict_shot_place   = read_dict("shot_place")
dict_shot_outcome = read_dict("shot_outcome")
dict_location     = read_dict("location")


def replace_with_dict(df_in, col_name, dict_df):
    if col_name not in df_in.columns:
        print(f"[skip] {col_name} not in dataframe")
        return df_in

    d = broadcast(
        dict_df.select(
            col("code").cast("string").alias(col_name),
            col("label").alias(f"{col_name}__label")
        )
    )

    out = (
        df_in.withColumn(col_name, col(col_name).cast("string"))
            .join(d, on=col_name, how="left")
            .withColumn(
                col_name,
                when(col(f"{col_name}__label").isNotNull(), col(f"{col_name}__label"))
                .otherwise(col(col_name))
            )
            .drop(f"{col_name}__label")
    )
    return out

cols_to_replace = {
    "event_type":   dict_event_type,
    "event_type2":  dict_event_type2,
    "shot_place":   dict_shot_place,
    "shot_outcome": dict_shot_outcome,
    "location":     dict_location
}

for c, ddf in cols_to_replace.items():
    df = replace_with_dict(df, c, ddf)

# df.select("id_odsp", "event_type", "event_type2", "shot_place", "shot_outcome", "location").show(5, truncate=False)     # For unknow reason, this line will cause the lost of event_type2

Joined df cols: ['id_odsp', 'id_event', 'sort_order', 'time', 'text', 'event_type', 'event_type2', 'side', 'event_team', 'opponent', 'player', 'player2', 'player_in', 'player_out', 'shot_place', 'shot_outcome', 'is_goal', 'location', 'bodypart', 'assist_method', 'situation', 'fast_break', 'link_odsp', 'adv_stats', 'date', 'league', 'season', 'country', 'ht', 'at', 'fthg', 'ftag', 'odd_h', 'odd_d', 'odd_a', 'odd_over', 'odd_under', 'odd_bts', 'odd_bts_n']


In [2]:
from pyspark.sql import functions as F

# 1) Keep only these columns
KEEP_COLS = [
    "id_odsp","sort_order","time","text","event_type","event_type2","event_team","player","player2",
    "shot_place","shot_outcome","is_goal","location","bodypart","date","league",
    "season","country","ht","at"
]
present = [c for c in KEEP_COLS if c in df.columns]
missing = [c for c in KEEP_COLS if c not in df.columns]
if missing:
    print("[warn] missing columns:", missing)

df = df.select(*present)

# Split into per-event fields and match-level metadata
EVENT_FIELDS = [
    "sort_order","time","text","event_type","event_type2","event_team","player","player2",
    "shot_place","shot_outcome","is_goal","location","bodypart"
]
MATCH_META = ["id_odsp","date","league","season","country","ht","at"]

present_event = [c for c in EVENT_FIELDS if c in df.columns]
present_meta  = [c for c in MATCH_META if c in df.columns]

# 2) One formatted line per event (easier to read in RAG)
event_line = F.concat_ws(
    " | ",
    *[F.concat_ws(": ", F.lit(c), F.col(c).cast("string")) for c in present_event if c not in ("sort_order","time")]
).alias("event_line_body")

# Include time/sort_order at the start for readability
prefix = F.concat_ws(
    " | ",
    *([F.concat_ws(": ", F.lit("time"), F.col("time").cast("string"))] if "time" in present_event else []),
    *([F.concat_ws(": ", F.lit("sort_order"), F.col("sort_order").cast("string"))] if "sort_order" in present_event else [])
).alias("event_prefix")

per_event = (
    df
    .select(*(present_meta + ["id_odsp"]), *present_event)
    .withColumn("event_prefix", prefix)
    .withColumn("event_line_body", event_line)
    .withColumn(
        "event_line_full",
        F.when(F.col("event_prefix").isNotNull() & (F.col("event_prefix") != ""),
               F.concat_ws(" | ", F.col("event_prefix"), F.col("event_line_body")))
         .otherwise(F.col("event_line_body"))
    )
)

# Choose ordering column: prefer sort_order, else time, else 0
order_col = (
    F.when(F.col("sort_order").isNotNull(), F.col("sort_order").cast("long"))
     .when(F.col("time").isNotNull(), F.col("time").cast("double"))
     .otherwise(F.lit(0).cast("double"))
).alias("ord")

per_event = per_event.withColumn("ord", order_col)

# 3) Aggregate rows -> one chunk per id_odsp, preserving order
agg = (
    per_event
    .groupBy("id_odsp", *[c for c in present_meta if c != "id_odsp"])
    .agg(F.sort_array(F.collect_list(F.struct(F.col("ord"), F.col("event_line_full")))).alias("events"))
    .withColumn("text", F.concat_ws("\n", F.transform("events", lambda x: x["event_line_full"])))
    .drop("events")
)

# Result: one row per match, with a single RAG-ready text chunk
rag_matches = agg.select("id_odsp", "text", *[c for c in present_meta if c != "id_odsp"])

# Quick sanity check
print("Match docs:", rag_matches.count())
rag_matches.select("id_odsp", "text").show(2, truncate=False)

pdf = rag_matches.toPandas()  # typically ~your ginf size
from langchain_core.documents import Document
import pandas as pd

meta_keys = [c for c in ["id_odsp","date","league","season","country","ht","at"] if c in pdf.columns]
docs = []
for _, row in pdf.iterrows():
    meta = {k: (None if (isinstance(row[k], float) and pd.isna(row[k])) else str(row[k])) for k in meta_keys}
    docs.append(Document(page_content=row["text"], metadata=meta))


Match docs: 9074
+---------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Spark -> JSON String -> Python write JSONL
from pyspark.sql.functions import col, concat_ws, lit

def _ensure_cols(df, cols):
    for c in cols:
        if c not in df.columns:
            df = df.withColumn(c, lit(""))
    return df

need_cols = ["id_odsp","event_team","season","shot_outcome","location","text","event_type","event_type2","player"]
df = _ensure_cols(df, need_cols)

TEXT_COL = concat_ws(" | ",
    lit("team="), col("event_team"),
    lit("season="), col("season"),
    lit("event_info="), col("text"),
    lit("event_type="), col("event_type"),
    lit("event_type2="), col("event_type2"),
    lit("shot="), col("shot_outcome"),
    lit("location="), col("location"),
    lit("player="), col("player")
).alias("text")




In [9]:
sel = df.select(
    col("id_odsp"),
    col("event_team"),
    col("season"),
    col("shot_outcome"),
    col("location"),
    col("event_type"),
    col("event_type2"),
    col("player"),
    TEXT_COL  
)


OUT_JSONL = "rag_events.jsonl"

count = 0
with open(OUT_JSONL, "w", encoding="utf-8") as f:
    for row_json in sel.toJSON().toLocalIterator():
        f.write(row_json + "\n")
        count += 1

print(f"Wrote JSONL: {OUT_JSONL}, rows: {count}")


Wrote JSONL: rag_events.jsonl, rows: 941009


In [5]:
sel = df.select(
    col("id_odsp"), col("event_team"), col("season"), col("shot_outcome"), col("location"), col("event_type"), col("event_type2"), col("player"), TEXT_COL
)

# MAX_ROWS = None  
json_rows = sel.toJSON().collect()   
print(json_rows)

# Write to JSONL（each line is a JSON）
OUT_JSONL = "rag_events.jsonl"
with open(OUT_JSONL, "w", encoding="utf-8") as f:
    for line in json_rows:
        print(line)
        # f.write(line + "\n")

print(f"Wrote JSONL: {OUT_JSONL}, rows: {len(json_rows)}")

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 44.0 failed 1 times, most recent failure: Lost task 8.0 in stage 44.0 (TID 137) (LAPTOP-6BJL7SFU executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:203)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)


In [4]:
df.count()

941009