In [None]:
!pip install requests
!pip install boto3




In [2]:
import getpass
import json
import os

import requests
import urllib3

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import Window
import pyspark.sql.functions as F

# TLS verification is disabled for every ES request in this notebook
# (verify=False), so silence the per-request warning.
# NOTE(review): presumably the cluster uses a self-signed cert; prefer passing
# its CA bundle path as `verify=` instead of disabling verification.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ----- Elasticsearch config -----
# Connection settings may be overridden via environment variables; the
# defaults are the in-cluster service used so far.
ES_URL   = os.environ.get("ES_URL", "https://elasticsearch-sample-es-http.elastic-system.svc:9200")
INDEX    = os.environ.get("ES_INDEX", "spark-logs-*")
# Never hardcode credentials in a notebook (they end up in git / shared copies).
# Read them from the environment (e.g. a K8s secret), else prompt interactively.
AUTH     = (
    os.environ.get("ES_USER", "elastic"),
    os.environ.get("ES_PASSWORD") or getpass.getpass("Elasticsearch password: "),
)

# Quick sanity check: print status and the start of the cluster info, then
# fail loudly (e.g. on 401) instead of letting later cells fail obscurely.
resp = requests.get(ES_URL, auth=AUTH, verify=False, timeout=30)
print("Cluster status:", resp.status_code)
print(resp.text[:200])
resp.raise_for_status()


Cluster status: 200
{
  "name" : "elasticsearch-sample-es-default-0",
  "cluster_name" : "elasticsearch-sample",
  "cluster_uuid" : "y5K3JeG9RKy04V1EjNZ2OQ",
  "version" : {
    "number" : "8.14.0",
    "build_flavor" : 


In [3]:
TARGET = 50_000        # how many docs you want at minimum
BATCH  = 5000          # docs per scroll batch

# 1) Initial scroll search
init_body = {
    "size": BATCH,
    "_source": ["message", "EventId", "EventTemplate", "Occurrences"],
    "query": {"match_all": {}},
}

init = requests.post(
    f"{ES_URL}/{INDEX}/_search?scroll=2m",
    auth=AUTH,
    json=init_body,
    verify=False,
)

if init.status_code != 200:
    print(init.text)
    raise SystemExit("Initial scroll failed")

init_json = init.json()
scroll_id = init_json.get("_scroll_id")
hits = init_json["hits"]["hits"]

docs = []

def add_hits_to_docs(hits, docs_list):
    """Append one flat record per Elasticsearch hit to ``docs_list`` in place.

    Each record is built from the hit's ``_source``: ``message`` becomes
    ``Content``, while ``EventId``, ``EventTemplate`` and ``Occurrences`` keep
    their names. Fields absent from ``_source`` (or a hit without
    ``_source``) yield ``None``. Returns ``None``.
    """
    # (output column, _source field) pairs, in output column order.
    column_sources = (
        ("Content", "message"),
        ("EventId", "EventId"),
        ("EventTemplate", "EventTemplate"),
        ("Occurrences", "Occurrences"),
    )
    docs_list.extend(
        {column: source.get(field) for column, field in column_sources}
        for source in (hit.get("_source", {}) for hit in hits)
    )

add_hits_to_docs(hits, docs)
print("Loaded first batch:", len(docs))

# 2) Continue scrolling until TARGET docs are collected, the index is
#    exhausted, or a scroll request fails. The scroll context is released in
#    `finally` so it does not hold cluster resources until keep-alive expiry.
try:
    while len(docs) < TARGET:
        scroll_body = {
            "scroll": "2m",          # renew the keep-alive for the next batch
            "scroll_id": scroll_id,
        }

        resp = requests.post(
            f"{ES_URL}/_search/scroll",
            auth=AUTH,
            json=scroll_body,
            verify=False,
            timeout=60,  # seconds
        )

        if resp.status_code != 200:
            print("Scroll error:", resp.text)
            break

        data = resp.json()
        hits = data["hits"]["hits"]

        if not hits:
            print("No more hits, reached end of index.")
            break

        add_hits_to_docs(hits, docs)
        # ES may hand back a new scroll id; keep the old one if it does not.
        scroll_id = data.get("_scroll_id", scroll_id)

        print(f"Total loaded so far: {len(docs)}", end="\r")
finally:
    # Best-effort cleanup: report a failure, but never mask an error raised
    # inside the loop above.
    if scroll_id:
        try:
            requests.delete(
                f"{ES_URL}/_search/scroll",
                auth=AUTH,
                json={"scroll_id": scroll_id},
                verify=False,
                timeout=60,
            )
        except requests.RequestException as exc:
            print("Warning: failed to clear scroll context:", exc)

print("\nDone scrolling. Total docs loaded:", len(docs))
print("Example doc:", docs[0] if docs else None)


Loaded first batch: 5000
Total loaded so far: 50000
Done scrolling. Total docs loaded: 50000
Example doc: {'Content': 'Finished task 11.0 in stage 23.0 (TID 471). 2121 bytes result sent to driver', 'EventId': 'E66', 'EventTemplate': 'Finished task <*> in stage <*> (TID <*>). <*> bytes result sent to driver', 'Occurrences': 2146621}


In [4]:
# Local Spark session using all cores; getOrCreate() reuses a running session
# if one already exists in this kernel.
spark = (
    SparkSession.builder
    .appName("SparkLogsFromESScroll")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# Explicit schema for the rows built below: three nullable string columns
# followed by a nullable 64-bit integer count.
schema = StructType(
    [StructField(name, StringType(), True) for name in ("Content", "EventId", "EventTemplate")]
    + [StructField("Occurrences", LongType(), True)]
)

def _to_long(value):
    """Coerce an Occurrences value to ``int``; ``None`` if missing or not convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (ValueError, TypeError):
        return None


# One tuple per scraped doc, in the same column order as `schema`.
rows = [
    (
        doc.get("Content"),
        doc.get("EventId"),
        doc.get("EventTemplate"),
        _to_long(doc.get("Occurrences")),
    )
    for doc in docs
]

logs_df = spark.createDataFrame(rows, schema=schema)

logs_df.printSchema()
logs_df.show(5, truncate=False)
print("Total rows in logs_df:", logs_df.count())


root
 |-- Content: string (nullable = true)
 |-- EventId: string (nullable = true)
 |-- EventTemplate: string (nullable = true)
 |-- Occurrences: long (nullable = true)

+----------------------------------------------------------------------------+-------+-------------------------------------------------------------------------+-----------+
|Content                                                                     |EventId|EventTemplate                                                            |Occurrences|
+----------------------------------------------------------------------------+-------+-------------------------------------------------------------------------+-----------+
|Finished task 11.0 in stage 23.0 (TID 471). 2121 bytes result sent to driver|E66    |Finished task <*> in stage <*> (TID <*>). <*> bytes result sent to driver|2146621    |
|Finished task 12.0 in stage 23.0 (TID 472). 2161 bytes result sent to driver|E66    |Finished task <*> in stage <*> (TID <*>). <*> bytes 

In [5]:
# Keep only rows usable as training examples: the input (Content) and both
# target fields must be present. Otherwise the JSONL output would contain the
# literal text "EventTemplate: None".
logs_small = logs_df.dropna(subset=["Content", "EventId", "EventTemplate"])

print("Rows after dropping nulls:", logs_small.count())
print("Distinct EventIds:", logs_small.select("EventId").distinct().count())

MAX_PER_EVENT = 100  # cap on examples per EventId, so frequent templates don't dominate
SAMPLE_SEED = 42     # fixed seed -> reproducible sample

# Random order within each EventId. Seeding F.rand makes the ordering
# deterministic for the same input partitioning; without a seed every Spark
# action would draw a different sample.
w = Window.partitionBy("EventId").orderBy(F.rand(seed=SAMPLE_SEED))

sampled_df = (
    logs_small
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") <= MAX_PER_EVENT)
    .drop("rn")
    # Materialize once so count/show here and toPandas later see the same rows.
    .cache()
)

print("Sampled rows:", sampled_df.count())
sampled_df.show(10, truncate=False)


Rows after dropping nulls: 50000
Distinct EventIds: 121
Sampled rows: 6259
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
OUT_PATH = "spark_llm_dataset_50k.jsonl"
INSTRUCTION = "Given this Spark log message, identify its template ID and template text."

sampled_pd = sampled_df.toPandas()

# One instruction-tuning record per sampled log line. json.dumps escapes
# embedded newlines, so each record occupies exactly one line.
with open(OUT_PATH, "w", encoding="utf-8") as f:
    for row in sampled_pd.itertuples(index=False):
        obj = {
            "instruction": INSTRUCTION,
            "input": row.Content,
            "output": f"EventId: {row.EventId}\nEventTemplate: {row.EventTemplate}",
        }
        f.write(json.dumps(obj) + "\n")

print("Wrote JSONL to:", OUT_PATH)

# sanity: count lines (closing the file handle) and check one line per row
with open(OUT_PATH, "r", encoding="utf-8") as f:
    num_lines = sum(1 for _ in f)
print("JSONL lines:", num_lines)
assert num_lines == len(sampled_pd), f"expected {len(sampled_pd)} lines, got {num_lines}"

# show first few lines
with open(OUT_PATH, "r", encoding="utf-8") as f:
    for _ in range(3):
        print(f.readline().strip())


Wrote JSONL to: spark_llm_dataset_50k.jsonl
JSONL lines: 6259
{"instruction": "Given this Spark log message, identify its template ID and template text.", "input": "Prepared Local resources Map(__spark__.jar -> resource { scheme: \"hdfs\" host: \"10.10.34.11\" port: 9000 file: \"/user/curi/.sparkStaging/application_1485248649253_0001/spark-assembly-1.6.0-hadoop2.2.0.jar\" } size: 109525492 timestamp: 1485248869215 type: FILE visibility: PRIVATE, pyspark.zip -> resource { scheme: \"hdfs\" host: \"10.10.34.11\" port: 9000 file: \"/user/curi/.sparkStaging/application_1485248649253_0001/pyspark.zip\" } size: 355358 timestamp: 1485248869287 type: FILE visibility: PRIVATE, py4j-0.9-src.zip -> resource { scheme: \"hdfs\" host: \"10.10.34.11\" port: 9000 file: \"/user/curi/.sparkStaging/application_1485248649253_0001/py4j-0.9-src.zip\" } size: 44846 timestamp: 1485248869304 type: FILE visibility: PRIVATE)", "output": "EventId: E1\nEventTemplate: Prepared Local resources Map(__spark__.jar -> re

In [None]:
import json
import os

import boto3

OUT_PATH = "spark_llm_dataset_50k.jsonl"
INSTRUCTION = "Given this Spark log message, identify its template ID and template text."

sampled_pd = sampled_df.toPandas()

# 1) Write JSONL locally (same format as before: one record per line)
with open(OUT_PATH, "w", encoding="utf-8") as f:
    for row in sampled_pd.itertuples(index=False):
        obj = {
            "instruction": INSTRUCTION,
            "input": row.Content,
            "output": f"EventId: {row.EventId}\nEventTemplate: {row.EventTemplate}",
        }
        f.write(json.dumps(obj) + "\n")

print("Wrote JSONL to:", OUT_PATH)

# 2) Sanity check: count lines (closing the handle) and refuse to upload a
#    file that does not contain exactly one line per sampled row.
with open(OUT_PATH, "r", encoding="utf-8") as f:
    num_lines = sum(1 for _ in f)
print("JSONL lines:", num_lines)
assert num_lines == len(sampled_pd), f"expected {len(sampled_pd)} lines, got {num_lines}"

# 3) Upload to S3
# Credentials come from boto3's default provider chain (presumably env vars
# injected from a K8s secret here — confirm for your environment).
s3 = boto3.client("s3")

# Destination defaults to the production bucket; override via env to avoid
# accidentally writing to prod from a dev run.
BUCKET = os.environ.get("S3_BUCKET", "mas-pipeline-prod")
KEY    = os.environ.get("S3_KEY", "spark/spark_llm_dataset_50k.jsonl")  # path inside the bucket

s3.upload_file(OUT_PATH, BUCKET, KEY)

print(f"Uploaded to s3://{BUCKET}/{KEY}")
