## Assignment 03
### Streaming Analytics on Text Data

Here we set up a PySpark session on the local machine, then send each received JSON record to the Gemini LLM API (`gemini-2.0-flash` in the code below) with a preset prompt and record the outputs. The free tier allows up to 1000 LLM API calls per day, so we limit each loop to at most 50 requests at a time.

### Setting Up Streaming with PySpark

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType
import threading

spark = SparkSession.builder \
    .appName("ArxivStreamingLLM") \
    .master("local[*]") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/26 14:06:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
schema = StructType() \
    .add("aid", StringType()) \
    .add("title", StringType()) \
    .add("summary", StringType()) \
    .add("main_category", StringType()) \
    .add("categories", StringType()) \
    .add("published", StringType())

In [3]:
# Raw stream
raw_stream_df = spark.readStream \
    .format("socket") \
    .option("host", "seppe.net") \
    .option("port", 7778) \
    .load()

25/05/26 14:06:28 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [4]:
# Convert each line of raw text into structured JSON
# Parse JSON
json_stream_df = raw_stream_df \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")
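To make the parsing step concrete, here is a plain-Python analogue of what `from_json` with the schema above does to a single socket line. It is only a sketch: `parse_line`, `FIELDS`, and the sample line are hypothetical names introduced for illustration, and the code mimics just the behavior that missing fields and malformed input end up as nulls instead of raising.

```python
import json

# Field names copied from the schema defined in In [2]
FIELDS = ["aid", "title", "summary", "main_category", "categories", "published"]

def parse_line(line):
    """Return one value per schema field; missing or malformed input becomes None."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        record = {}
    if not isinstance(record, dict):
        record = {}
    return {name: record.get(name) for name in FIELDS}

# Hypothetical, shortened input line
sample = '{"aid": "http://arxiv.org/abs/2505.16575v1", "main_category": "eess.SY"}'
row = parse_line(sample)
print(row["main_category"])            # field present in the input
print(row["title"])                    # field absent from the input
print(parse_line("not json")["aid"])   # malformed line
```

Rows whose fields are all null are worth filtering out before they reach the LLM call, since they would otherwise consume a request.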

In [5]:
received_rows = []

def handle_batch(df, batch_id):
    # Collect every row of the micro-batch, then stop once at least one article arrived
    pandas_df = df.toPandas()
    if pandas_df.empty:
        return
    for _, row in pandas_df.iterrows():
        received_rows.append(row.to_dict())
        print("\n New article received:\n", row.to_dict())
    # Stop the stream in another thread to avoid Spark deadlock
    threading.Thread(target=query.stop).start()

In [6]:
query = json_stream_df.writeStream \
    .foreachBatch(handle_batch) \
    .start()

query.awaitTermination()

25/05/26 14:06:28 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-31dd06c8-42dc-4683-a02e-d46c647f9d79. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/26 14:06:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.



 New article received:
 {'aid': 'http://arxiv.org/abs/2505.16575v1', 'title': 'Data Center Model for Transient Stability Analysis of Power Systems', 'summary': 'The rising demand of computing power leads to the installation of a large\nnumber of Data Centers (DCs). Their Fault-Ride-Through (FRT) behavior and their\nunique power characteristics, especially for DCs catered to Artificial\nIntelligence (AI) workloads, pose a threat to the stability of power systems.\nTo ensure its stability, it is required accurate models of the loads involved.\nHere we propose a dynamic load model that properly captures the behaviour of\nDCs. Its three most defining features are the use of an Uninterrupted Power\nSupply (UPS) which sits between the server load and the grid, the cooling load\nrepresented by an induction motor, and a pulsing load that represents the\ntransients caused by contemporary DCs with significant AI workloads. The\nfeatures of the proposed model and its impact on the dynamic perfor

In [7]:
print(received_rows[0])

{'aid': 'http://arxiv.org/abs/2505.16575v1', 'title': 'Data Center Model for Transient Stability Analysis of Power Systems', 'summary': 'The rising demand of computing power leads to the installation of a large\nnumber of Data Centers (DCs). Their Fault-Ride-Through (FRT) behavior and their\nunique power characteristics, especially for DCs catered to Artificial\nIntelligence (AI) workloads, pose a threat to the stability of power systems.\nTo ensure its stability, it is required accurate models of the loads involved.\nHere we propose a dynamic load model that properly captures the behaviour of\nDCs. Its three most defining features are the use of an Uninterrupted Power\nSupply (UPS) which sits between the server load and the grid, the cooling load\nrepresented by an induction motor, and a pulsing load that represents the\ntransients caused by contemporary DCs with significant AI workloads. The\nfeatures of the proposed model and its impact on the dynamic performance of\ntransmission sy

In [8]:
import google.generativeai as genai
import getpass

# Go here to get your free api key: https://aistudio.google.com/app/apikey

api_key = getpass.getpass("Enter your Gemini API key (Go here to generate your own: https://aistudio.google.com/app/apikey): ")
genai.configure(api_key=api_key)

In [9]:
import json

model = genai.GenerativeModel("gemini-2.0-flash")

def predict_categories(article_json: dict) -> dict:
    # Read base prompt from file
    with open("../assets/llm_prompt9", "r", encoding="utf-8") as f:
        base_prompt = f.read()
    
    # Create full prompt
    json_str = json.dumps(article_json, separators=(",", ":"))
    full_prompt = f"Sample to predict:\n\n{json_str}\n\n{base_prompt}"

    # Send to Gemini
    response = model.generate_content(full_prompt)
    
    # Try to parse result into dict if possible
    try:
        # Strip a surrounding code fence and, only if present, a leading "json" marker
        raw = response.text.strip()
        if raw.startswith("```"):
            raw = raw.strip("`").strip()  # Remove the fence backticks
            if raw.startswith("json"):
                raw = raw[len("json"):].strip()  # Remove language marker
        prediction = json.loads(raw)
    except json.JSONDecodeError:
        print("Could not parse JSON response. Raw output:")
        print(response.text)
        return {"error": "unparsable", "raw_output": response.text}

    return prediction
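
The fence-stripping step can be exercised without calling the API. The sketch below is a standalone variant of that step; `strip_code_fence` and the two sample responses are hypothetical. It removes the `json` tag only when it is the first thing inside the fence, so the word "json" appearing later in the payload is left alone.

```python
import json

FENCE = "`" * 3  # three backticks, built programmatically

def strip_code_fence(text):
    """Remove a surrounding code fence and an optional leading 'json' tag."""
    raw = text.strip()
    if raw.startswith(FENCE):
        raw = raw.strip("`").strip()
        if raw.startswith("json"):
            raw = raw[len("json"):].strip()
    return raw

# Hypothetical model responses: one fenced with a language tag, one fenced without
tagged = FENCE + 'json\n{"main_category": "eess.SY", "categories": "eess.SY,cs.SY"}\n' + FENCE
untagged = FENCE + '\n{"note": "json inside the payload"}\n' + FENCE

print(json.loads(strip_code_fence(tagged)))
print(json.loads(strip_code_fence(untagged)))
```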


In [10]:
first_article = received_rows[0]
result = predict_categories(first_article)

print("Predicted categories:", result)

Predicted categories: {'main_category': 'eess.SY', 'categories': 'eess.SY,cs.SY'}


In [11]:
import time
import pandas as pd
import threading
from tqdm import tqdm

# Global result collector
results = []

# Target number of requests
MAX_REQUESTS = 5

# Setup tqdm in a side thread
def track_progress():
    with tqdm(total=MAX_REQUESTS, desc="Processing articles", position=0) as pbar:
        last = 0
        while last < MAX_REQUESTS:
            current = min(len(results), MAX_REQUESTS)
            pbar.update(current - last)  # Also applies the final update before exiting
            last = current
            if last < MAX_REQUESTS:
                time.sleep(0.5)  # Poll every half second

# Start the progress bar tracking in a separate thread
progress_thread = threading.Thread(target=track_progress)
progress_thread.start()

# Spark streaming callback
def process_batch(df, batch_id):
    global results
    pandas_df = df.toPandas()
    for _, row in pandas_df.iterrows():
        if len(results) >= MAX_REQUESTS:
            threading.Thread(target=query.stop).start()
            return

        article = row.to_dict()
        prediction = predict_categories(article)

        results.append({
            "Aid": article.get("aid"),
            "Title": article.get("title"),
            "Main Category": prediction.get("main_category", "N/A"),
            "Categories": prediction.get("categories", "N/A"),
            "True Main Category": article.get("main_category", "N/A"),
            "True Categories": article.get("categories", "N/A")
        })

        time.sleep(4.1)  # Stay within 15 RPM

# Start the Spark stream
query = json_stream_df.writeStream \
    .foreachBatch(process_batch) \
    .start()

query.awaitTermination()
progress_thread.join()  # Wait for tqdm to finish


Processing articles:   0%|          | 0/5 [00:00<?, ?it/s]25/05/26 14:06:34 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d0e35c14-fa73-4451-9847-4b66ecec2f60. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/26 14:06:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Processing articles:  80%|████████  | 4/5 [00:21<00:05,  5.27s/it]
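
The 4.1-second pause follows from the 15 requests-per-minute limit noted in the code comment: 60 / 15 = 4 seconds between calls, plus a small margin. Below is a minimal sketch of the same idea as a reusable helper. `min_interval` and `Throttle` are hypothetical names, not part of any Gemini client, and the injectable `clock` and `sleep` arguments exist only to make the helper easy to test.

```python
import time

def min_interval(requests_per_minute, margin=0.1):
    """Seconds to leave between calls so the per-minute limit is not exceeded."""
    return 60.0 / requests_per_minute + margin

class Throttle:
    """Sleep only for the part of the interval that has not already elapsed."""

    def __init__(self, interval, clock=time.monotonic, sleep=time.sleep):
        self.interval = interval
        self._clock = clock
        self._sleep = sleep
        self._last = None  # time of the previous call, None before the first one

    def wait(self):
        now = self._clock()
        if self._last is not None:
            remaining = self.interval - (now - self._last)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last = now

print(round(min_interval(15), 1))  # 4.1
```

Calling `wait()` before each request, rather than sleeping a fixed 4.1 seconds afterwards, subtracts the time the request itself took, which shortens the loop slightly while staying within the same limit.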


In [12]:
df = pd.DataFrame(results)
print(df.shape)
df.head(10)

(5, 6)


| | Aid | Title | Main Category | Categories | True Main Category | True Categories |
|---|---|---|---|---|---|---|
| 0 | http://arxiv.org/abs/2505.16575v1 | Data Center Model for Transient Stability Anal... | eess.SY | eess.SY,cs.SY | eess.SY | eess.SY,cs.SY |
| 1 | http://arxiv.org/abs/2505.16576v1 | EMULATE: A Multi-Agent Framework for Determini... | cs.CL | cs.CL | cs.CL | cs.CL |
| 2 | http://arxiv.org/abs/2505.16577v1 | Large Language Model-Empowered Interactive Loa... | cs.LG | cs.LG,cs.AI | cs.LG | cs.LG |
| 3 | http://arxiv.org/abs/2505.16578v1 | Standard Running, "Physical Running" and Cosmo... | hep-th | hep-th,gr-qc | hep-th | hep-th,gr-qc |
| 4 | http://arxiv.org/abs/2505.16579v1 | Bridging the Dynamic Perception Gap: Training-... | cs.AI | cs.AI,cs.CV | cs.AI | cs.AI,cs.CV |
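
Before applying the weighted error defined further down, a quick exact-match tally over the five rows shown above gives a rough sanity check. The rows are copied from the table; the tally itself is an illustrative addition and not the metric used in this assignment.

```python
# (predicted main, predicted categories, true main, true categories), copied from the table
rows = [
    ("eess.SY", "eess.SY,cs.SY", "eess.SY", "eess.SY,cs.SY"),
    ("cs.CL", "cs.CL", "cs.CL", "cs.CL"),
    ("cs.LG", "cs.LG,cs.AI", "cs.LG", "cs.LG"),
    ("hep-th", "hep-th,gr-qc", "hep-th", "hep-th,gr-qc"),
    ("cs.AI", "cs.AI,cs.CV", "cs.AI", "cs.AI,cs.CV"),
]

# Rows whose predicted main category equals the true one
main_hits = sum(pred_main == true_main for pred_main, _, true_main, _ in rows)

# Rows whose predicted category set equals the true set (order ignored)
set_hits = sum(set(pred.split(",")) == set(true.split(",")) for _, pred, _, true in rows)

print(f"main category exact matches: {main_hits}/{len(rows)}")
print(f"category set exact matches:  {set_hits}/{len(rows)}")
```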


In [13]:
df.to_json("../outputs/predicted_categories_final_prompt.json", orient="records", indent=2)
df.to_csv("../outputs/predicted_categories_final_prompt.csv", index=False)

In [14]:
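import ast

# Load sub_dict (the file holds a Python dict literal; literal_eval avoids running arbitrary code)
with open("../assets/sub_dict", "r", encoding="utf-8") as f:
    sub_dict = ast.literal_eval(f.read())

# Load main_dict
with open("../assets/main_dict", "r", encoding="utf-8") as f:
    main_dict = ast.literal_eval(f.read())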

In [15]:
# List of all subcategories
sub_list = list(sub_dict.keys()) + list(main_dict.keys())

# Extract all predicted and true subcategories
all_used_tags = set()

# Use `column` as the loop variable so the imported pyspark `col` function is not shadowed
for column in ["Categories", "True Categories"]:
    df[column].dropna().apply(lambda x: all_used_tags.update(map(str.strip, str(x).split(","))))

for column in ["Main Category", "True Main Category"]:
    df[column].dropna().apply(lambda x: all_used_tags.add(str(x).strip()))

# Add missing tags to sub_list
missing = [tag for tag in all_used_tags if tag not in sub_list]
print("Missing tags added to sub_list:", missing)
sub_list += missing

# Map subcategory to main category
sub_to_main_map = {sub: sub.split(".")[0] if "." in sub else sub for sub in sub_list}

# Function to map sub to main
def sub_to_main(sub):
    return sub_to_main_map.get(sub, "unknown")

Missing tags added to sub_list: ['eess.SY']
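
The mapping above relies on the arXiv convention that a tag such as `eess.SY` carries its archive before the dot, while tags like `hep-th` contain no dot and map to themselves. A small standalone illustration follows; `main_of` and the tag list are chosen for the example only.

```python
def main_of(tag):
    """Return the part before the dot, or the tag itself when there is no dot."""
    return tag.split(".")[0] if "." in tag else tag

tags = ["eess.SY", "cs.LG", "hep-th", "gr-qc"]
mapping = {tag: main_of(tag) for tag in tags}

for tag, main in mapping.items():
    print(f"{tag} -> {main}")
```

Under this rule `cs.LG` and `cs.AI` share the main category `cs`, which is what lets the error function charge a smaller penalty for near misses.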


In [16]:
## Error function
# Error term for same main category:
A = 0.5

# Error term for different main category:
B = 1

# Multiplier for the highlighted prediction:
C = 3

# Multiplier for the other predictions:
D = 1

def sub_vs_sub_error(sub1, sub2, a = A, b = B, printing = False):
    sub1 = str(sub1)
    assert isinstance(sub1, str), f"Error: Subcategory {sub1} was given, which is not a string."
    assert sub1 in sub_list, f"Error: Subcategory {sub1} was given, which is not an acceptable subcategory."
    
    sub2 = str(sub2)
    assert isinstance(sub2, str), f"Error: Subcategory {sub2} was given, which is not a string."
    assert sub2 in sub_list, f"Error: Subcategory {sub2} was given, which is not an acceptable subcategory."

    if sub1 == sub2:
        return 0

    main1 = sub_to_main(sub1)
    main2 = sub_to_main(sub2)

    if main1 == main2:
        return a

    return b

def list_of_subs_vs_list_of_subs_error(list1, list2, a = A, b = B, printing = False):
    if not isinstance(list1, list):
        # Strip whitespace so that "cs.LG, cs.AI" is accepted as well as "cs.LG,cs.AI"
        list1 = [sub.strip() for sub in list1.split(",")]
    assert isinstance(list1, list), f"Error: List of categories {list1} was given, which is not a list."

    if not isinstance(list2, list):
        list2 = [sub.strip() for sub in list2.split(",")]
    assert isinstance(list2, list), f"Error: List of categories {list2} was given, which is not a list."
    
    for i in range(len(list1)):
        list1[i] = str(list1[i])
        assert isinstance(list1[i], str), f"Error: Subcategory {list1[i]} was given, which is not a string."
        assert list1[i] in sub_list, f"Error: Subcategory {list1[i]} was given, which is not an acceptable subcategory."

    for i in range(len(list2)):
        list2[i] = str(list2[i])
        assert isinstance(list2[i], str), f"Error: Subcategory {list2[i]} was given, which is not a string."
        assert list2[i] in sub_list, f"Error: Subcategory {list2[i]} was given, which is not an acceptable subcategory."

    # We delete the exact same subcategories present in both lists.
    intersection1 = set(list1) & set(list2)
    list1 = [x for x in list1 if x not in intersection1]
    list2 = [x for x in list2 if x not in intersection1]

    mains_present_in_both = [sub_to_main(sub) for sub in intersection1]
    list1 = [sub_to_main(sub) for sub in list1]
    list2 = [sub_to_main(sub) for sub in list2]
    mains_present_in_both += (set(list1) & set(list2))
    if printing:
        print(mains_present_in_both, list1, list2)

    half_bad1 = [sub1 for sub1 in list1 if sub1 in mains_present_in_both]
    half_bad2 = [sub2 for sub2 in list2 if sub2 in mains_present_in_both]
    if printing:
        print(half_bad1, half_bad2)

    list1 = [sub1 for sub1 in list1 if sub1 not in half_bad1]
    list2 = [sub2 for sub2 in list2 if sub2 not in half_bad2]

    intersection2 = set(list1) & set(list2)
    list1 = [x for x in list1 if x not in intersection2]
    list2 = [x for x in list2 if x not in intersection2]
    if printing:
        print(len(intersection1), len(intersection2), len(list1), len(list2), len(half_bad1), len(half_bad2))

    return a * (len(intersection2) + len(half_bad1) + len(half_bad2)) + b * (len(list1) + len(list2))

def pred_vs_pred_error(pred1, pred2, a = A, b = B, c = C, d = D, printing = False):
    assert isinstance(pred1, dict), f"Error: Prediction {pred1} was given, which is not a dictionary."
    assert isinstance(pred2, dict), f"Error: Prediction {pred2} was given, which is not a dictionary."
    highlighted_pred1 = pred1["main_category"]
    highlighted_pred2 = pred2["main_category"]
    others_pred1 = pred1["categories"]
    others_pred2 = pred2["categories"]

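    # Forward a and b so that custom error terms reach both helper functions
    highlighted_error = sub_vs_sub_error(highlighted_pred1, highlighted_pred2, a = a, b = b, printing = printing)
    others_error = list_of_subs_vs_list_of_subs_error(others_pred1, others_pred2, a = a, b = b, printing = printing)
    return c * highlighted_error + d * others_error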
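
To see how the coefficients combine, the following is a condensed, self-contained restatement of the error metric, run on small examples. It is a sketch under stated assumptions: the validation against `sub_list` is omitted, the main category is taken as the text before the dot, and the helper names (`sub_error`, `list_error`, `pred_error`) are hypothetical shorthand for the functions above.

```python
A, B, C, D = 0.5, 1, 3, 1  # same values as the coefficients defined above

def main_of(sub):
    return sub.split(".")[0]

def sub_error(sub1, sub2, a=A, b=B):
    """0 for identical tags, a for the same main category, b otherwise."""
    if sub1 == sub2:
        return 0
    return a if main_of(sub1) == main_of(sub2) else b

def list_error(cats1, cats2, a=A, b=B):
    """Charge a per tag whose main category appears on both sides, b per unmatched tag."""
    list1 = [s.strip() for s in cats1.split(",")]
    list2 = [s.strip() for s in cats2.split(",")]
    exact = set(list1) & set(list2)              # identical tags cost nothing
    mains1 = [main_of(s) for s in list1 if s not in exact]
    mains2 = [main_of(s) for s in list2 if s not in exact]
    shared = {main_of(s) for s in exact} | (set(mains1) & set(mains2))
    half = [m for m in mains1 + mains2 if m in shared]
    full = [m for m in mains1 + mains2 if m not in shared]
    return a * len(half) + b * len(full)

def pred_error(pred, true, a=A, b=B, c=C, d=D):
    return (c * sub_error(pred["main_category"], true["main_category"], a, b)
            + d * list_error(pred["categories"], true["categories"], a, b))

# Row 2 of the results table: one extra tag (cs.AI) within an already matched main category
near_miss = pred_error({"main_category": "cs.LG", "categories": "cs.LG,cs.AI"},
                       {"main_category": "cs.LG", "categories": "cs.LG"})

# Hypothetical prediction from a completely different field
far_miss = pred_error({"main_category": "cs.CL", "categories": "cs.CL"},
                      {"main_category": "hep-th", "categories": "hep-th"})

print(near_miss, far_miss)
```

The near miss costs only `D * A = 0.5`, while the far miss costs `C * B` for the main category plus `D * 2 * B` for the two unmatched tags, for a total of 5.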



In [17]:
def make_pred_dict(row):
    return {
        "main_category": row["Main Category"],
        "categories": row["Categories"]
    }

def make_true_dict(row):
    return {
        "main_category": row["True Main Category"],
        "categories": row["True Categories"]
    }

df["Error"] = df.apply(lambda row: pred_vs_pred_error(make_pred_dict(row), make_true_dict(row)), axis=1)

In [18]:
models = ["gemini-2.0-flash"]
prompts = [f"../assets/llm_prompt{i}" for i in [9]]
NUM_ITERATIONS = 4
MAX_REQUESTS = 20
error_summary = []

for prompt_file in prompts:
    for model_name in models:
        print(f"\nRunning {prompt_file} with {model_name}")
        model = genai.GenerativeModel(model_name)
        iteration_errors = []

        for iteration in range(1, NUM_ITERATIONS + 1):
            print(f"Iteration {iteration}/{NUM_ITERATIONS}")
            received_rows = []

            # === Fetch MAX_REQUESTS JSONs from stream ===
            def handle_batch(df, batch_id):
                global received_rows
                pdf = df.toPandas()
                for _, r in pdf.iterrows():
                    received_rows.append(r.to_dict())
                    if len(received_rows) >= MAX_REQUESTS:
                        threading.Thread(target=query.stop).start()

            query = json_stream_df.writeStream.foreachBatch(handle_batch).start()
            query.awaitTermination()

            if len(received_rows) < MAX_REQUESTS:
                print("Skipping this iteration (not enough samples)")
                continue

            # === Prediction and Scoring ===
            eval_results = []
            with open(prompt_file, "r", encoding="utf-8") as f:
                base_prompt = f.read()

            for article in received_rows[:MAX_REQUESTS]:
                json_str = json.dumps(article, separators=(",", ":"))
                full_prompt = f"Sample to predict:\n\n{json_str}\n\n{base_prompt}"

                try:
                    response = model.generate_content(full_prompt)
                    raw = response.text.strip()
                    if raw.startswith("```json") or raw.startswith("```"):
                        raw = raw.strip("`").replace("json", "", 1).strip()
                    pred = json.loads(raw)
                except Exception:
                    pred = {"main_category": "unknown", "categories": "unknown"}

                eval_results.append({
                    "Aid": article.get("aid"),
                    "Title": article.get("title"),
                    "Main Category": pred.get("main_category", "N/A"),
                    "Categories": pred.get("categories", "N/A"),
                    "True Main Category": article.get("main_category", "N/A"),
                    "True Categories": article.get("categories", "N/A")
                })
                time.sleep(4.1)

            df_temp = pd.DataFrame(eval_results)

            # Error handling
            errors = []
            for idx, row in df_temp.iterrows():
                try:
                    pred = {
                        "main_category": row["Main Category"],
                        "categories": row["Categories"]
                    }
                    true = {
                        "main_category": row["True Main Category"],
                        "categories": row["True Categories"]
                    }
                    err = pred_vs_pred_error(pred, true)
                except AssertionError as e:
                    print(f"Skipping row {idx} due to category error: {e}")
                    err = None
                errors.append(err)

            df_temp["Error"] = errors
            df_temp_clean = df_temp.dropna(subset=["Error"])

            # Save per-iteration detailed results
            fname = f"../outputs/{prompt_file}_{model_name.replace('.', '_')}_iter{iteration}.csv"
            df_temp.to_csv(fname, index=False)
            print(f"Saved {fname}")

            if not df_temp_clean.empty:
                iteration_errors.append(df_temp_clean["Error"].mean())

        # Store final mean error for this prompt-model combo
        mean_error = round(sum(iteration_errors) / len(iteration_errors), 3) if iteration_errors else None
        error_summary.append({
            "llm_prompt": prompt_file,
            f"mean_error_{model_name.replace('.', '_')}": mean_error
        })

# === Final summary ===
df_summary = pd.DataFrame(error_summary)
df_summary.to_csv("../outputs/prompt_model_mean_error_final_prompt.csv", index=False)
print("Summary saved as prompt_model_mean_errors.csv")
df_summary



Running ../assets/llm_prompt9 with gemini-2.0-flash
Iteration 1/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter1.csv
Iteration 2/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter2.csv
Iteration 3/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter3.csv
Iteration 4/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter4.csv
Summary saved as prompt_model_mean_errors.csv


| | llm_prompt | mean_error_gemini-2_0-flash |
|---|---|---|
| 0 | ../assets/llm_prompt9 | 0.206 |


### Testing the final prompt with updated error metric coefficients, for comparison with the Bart LLM

In [19]:
## Updated Error Coefficients
# Error term for same main category:
A = 0.5

# Error term for different main category:
B = 1

# Multiplier for the highlighted prediction:
C = 1

# Multiplier for the other predictions:
D = 0

models = ["gemini-2.0-flash"]
prompts = [f"../assets/llm_prompt{i}" for i in [9]]
NUM_ITERATIONS = 4
MAX_REQUESTS = 20
error_summary = []

for prompt_file in prompts:
    for model_name in models:
        print(f"\nRunning {prompt_file} with {model_name}")
        model = genai.GenerativeModel(model_name)
        iteration_errors = []

        for iteration in range(1, NUM_ITERATIONS + 1):
            print(f"Iteration {iteration}/{NUM_ITERATIONS}")
            received_rows = []

            # === Fetch MAX_REQUESTS JSONs from stream ===
            def handle_batch(df, batch_id):
                global received_rows
                pdf = df.toPandas()
                for _, r in pdf.iterrows():
                    received_rows.append(r.to_dict())
                    if len(received_rows) >= MAX_REQUESTS:
                        threading.Thread(target=query.stop).start()

            query = json_stream_df.writeStream.foreachBatch(handle_batch).start()
            query.awaitTermination()

            if len(received_rows) < MAX_REQUESTS:
                print("Skipping this iteration (not enough samples)")
                continue

            # === Prediction and Scoring ===
            eval_results = []
            with open(prompt_file, "r", encoding="utf-8") as f:
                base_prompt = f.read()

            for article in received_rows[:MAX_REQUESTS]:
                json_str = json.dumps(article, separators=(",", ":"))
                full_prompt = f"Sample to predict:\n\n{json_str}\n\n{base_prompt}"

                try:
                    response = model.generate_content(full_prompt)
                    raw = response.text.strip()
                    if raw.startswith("```json") or raw.startswith("```"):
                        raw = raw.strip("`").replace("json", "", 1).strip()
                    pred = json.loads(raw)
                except Exception:
                    pred = {"main_category": "unknown", "categories": "unknown"}

                eval_results.append({
                    "Aid": article.get("aid"),
                    "Title": article.get("title"),
                    "Main Category": pred.get("main_category", "N/A"),
                    "Categories": pred.get("categories", "N/A"),
                    "True Main Category": article.get("main_category", "N/A"),
                    "True Categories": article.get("categories", "N/A")
                })
                time.sleep(4.1)

            df_temp = pd.DataFrame(eval_results)

            # Error handling
            errors = []
            for idx, row in df_temp.iterrows():
                try:
                    pred = {
                        "main_category": row["Main Category"],
                        "categories": row["Categories"]
                    }
                    true = {
                        "main_category": row["True Main Category"],
                        "categories": row["True Categories"]
                    }
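                    # Pass the coefficients explicitly: the defaults of pred_vs_pred_error were
                    # bound when it was defined, so reassigning A, B, C, D alone does not change them
                    err = pred_vs_pred_error(pred, true, a=A, b=B, c=C, d=D)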
                except AssertionError as e:
                    print(f"Skipping row {idx} due to category error: {e}")
                    err = None
                errors.append(err)

            df_temp["Error"] = errors
            df_temp_clean = df_temp.dropna(subset=["Error"])

            # Save per-iteration detailed results
            fname = f"../outputs/{prompt_file}_{model_name.replace('.', '_')}_iter{iteration}.csv"
            df_temp.to_csv(fname, index=False)
            print(f"Saved {fname}")

            if not df_temp_clean.empty:
                iteration_errors.append(df_temp_clean["Error"].mean())

        # Store final mean error for this prompt-model combo
        mean_error = round(sum(iteration_errors) / len(iteration_errors), 3) if iteration_errors else None
        error_summary.append({
            "llm_prompt": prompt_file,
            f"mean_error_{model_name.replace('.', '_')}": mean_error
        })

# === Final summary ===
df_summary = pd.DataFrame(error_summary)
df_summary.to_csv("../outputs/prompt_model_mean_error_final_prompt.csv", index=False)
print("Summary saved as prompt_model_mean_errors.csv")
df_summary



Running ../assets/llm_prompt9 with gemini-2.0-flash
Iteration 1/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter1.csv
Iteration 2/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter2.csv
Iteration 3/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter3.csv
Iteration 4/4




Saved ../outputs/../assets/llm_prompt9_gemini-2_0-flash_iter4.csv
Summary saved as prompt_model_mean_errors.csv


| | llm_prompt | mean_error_gemini-2_0-flash |
|---|---|---|
| 0 | ../assets/llm_prompt9 | 0.169 |
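
One point to keep in mind when changing coefficients by reassigning module-level names: Python evaluates default argument values once, when the `def` statement runs. A function declared with `c = C` therefore keeps the value `C` had at definition time unless the new value is passed explicitly or the function is redefined. A minimal illustration, with `weighted` as a hypothetical stand-in for the error function:

```python
C = 3

def weighted(error, c=C):
    """The default for c is captured here, at definition time."""
    return c * error

C = 1  # rebinding the global does not touch the default captured above

print(weighted(1))        # still uses c = 3
print(weighted(1, c=C))   # uses the updated value, c = 1
```

Passing the coefficients as keyword arguments at the call site makes each run's weighting visible in the code that produced it.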
