In [16]:
import polars as pl
from dotenv import load_dotenv
import os
from pathlib import Path
from google.cloud import language_v2
from google import genai
from google.genai import types
import utils as u


In [17]:
# Version of the keyword list; it selects the keyword file below and is used
# later in the notebook to name the label column (labels_v<version>).
KEYWORDS_VERSION = 1

# GitLab project path (same value the download cell passes to the fetch helper)
# and the locations of the intermediate parquet files, one per pipeline stage.
PROJECT_PATH = "dstack/d-stack-home"
RAW_PATH = "data/raw_issues.parquet"
CLEANED_PATH = "data/cleaned_issues.parquet"
ENRICHED_PATH = "data/issues_enriched.parquet"
LABELED_PATH = "data/issues_labeled.parquet"
ORG_ATTRIBUTED_PATH = "data/issues_org_attributed.parquet"
POSTPROCESSED_PATH = "data/issues_postprocessed.parquet"
KEYWORDS_PATH = f"keyword_lists/keywords_config_v{KEYWORDS_VERSION}.txt"

# Must run before the os.environ lookups below.
load_dotenv()

# Credentials are read from the environment; nothing secret is hard-coded here.
api_key = os.environ.get("API_KEY_GCP")
token_vertex = os.environ.get("ACCES_TOKEN_VERTEX")
PROJECT_ID = "project-8415b93b-4a16-4c2b-901"
LOCATION = "europe-west3"
print("API key loaded:", bool(api_key))

# Natural Language client used by the sentiment step.
client = language_v2.LanguageServiceClient(client_options=dict(api_key=api_key))

keywords_path = Path(KEYWORDS_PATH)

# One label per line; blank lines and lines starting with "#" are ignored.
_stripped_keyword_lines = (
    raw_line.strip()
    for raw_line in keywords_path.read_text(encoding="utf-8").splitlines()
)
LABELS = [
    entry
    for entry in _stripped_keyword_lines
    if entry and not entry.startswith("#")
]

# Issue iids 1-8 are excluded because they predate the feedback process.
iids_to_exclude = list(range(1, 9))
columns_to_keep = [
    "iid",
    "title",
    "description",
    "state",
    "created_at",
    "updated_at",
    "closed_at",
    "author_id",
    "author_name",
    "author_state",
    "user_notes_count",
    "upvotes",
    "downvotes",
    "references",
]
# Descriptions with exactly these values are handed to the prepare step for exclusion.
desc_to_exclude = ["", "test", "Test"]

API key loaded: True


### Step 1: Download issues from Open Code

In [None]:
# Download every issue of the configured GitLab project and persist the raw
# result, so later cells can start from the parquet file instead of re-fetching.
# Uses PROJECT_PATH from the config cell rather than repeating the literal.
issues = u.fetch_all_gitlab_issues(PROJECT_PATH)
raw_df = pl.DataFrame(issues)
raw_df.write_parquet(RAW_PATH)

In [None]:
# Reload the raw issues from disk so the download cell does not have to be re-run.
raw_df = pl.read_parquet(RAW_PATH)

In [None]:
# Clean the raw frame via the project helper, passing the column whitelist and
# the iids to drop from the config cell (exact behaviour lives in utils).
df_clean = u.clean_issues_df(raw_df, columns_to_keep, iids_to_exclude)

In [None]:
# Prepare the cleaned frame via the project helper. Later cells read a
# `desc_clean` column from the result — presumably created here; confirm in utils.
df_prepared = u.prepare_issues_df(df_clean, desc_to_exclude)

## To Do:
Move each of the expensive additional columns into its own data frame, containing only the issue id and the computed values, so that each step can be rerun individually and the results can simply be joined together later.

## Add sentiment scores

In [None]:
def get_sentiment_score(text_content: str) -> float:
    """
    Return the document-level sentiment score for a single piece of text.

    Args:
        text_content: Plain text to analyse. May be None.

    Returns:
        The score reported by the Natural Language API, or 0.0 (neutral) when
        the input is None, empty or whitespace-only, or when the API call fails.

    Note:
        Relies on the module-level `client` being the
        `language_v2.LanguageServiceClient` created in the config cell. Later
        cells rebind `client` to a `genai.Client`; re-running this function
        after those cells will most likely end up in the error path and return
        0.0 for every row.
    """
    # Nothing to analyse: skip the API round-trip instead of failing into the
    # except branch below.
    if text_content is None or not text_content.strip():
        return 0.0

    try:
        # Wrap the text in a plain-text document for the API.
        document = language_v2.Document(
            content=text_content,
            type_=language_v2.Document.Type.PLAIN_TEXT,
        )

        response = client.analyze_sentiment(document=document)

        return response.document_sentiment.score

    except Exception as e:
        # Deliberate best-effort: one failing row must not abort the whole run.
        print(f"Error processing text: {text_content[:20]}... Error: {e}")
        return 0.0  # Return a neutral score on error



In [None]:
# Score every cleaned description and store the result in a "sentiment" column.
sentiment_expr = pl.col("desc_clean").map_elements(
    get_sentiment_score,
    return_dtype=pl.Float64,
)
df_with_sentiment = df_prepared.with_columns(sentiment_expr.alias("sentiment"))

# Persist the enriched frame so the API calls do not have to be repeated.
df_with_sentiment.write_parquet(ENRICHED_PATH)

In [None]:
# Reload the sentiment-enriched frame from disk (skips the sentiment API calls).
df_with_sentiment = pl.read_parquet(ENRICHED_PATH)

## Add Labels using pre-defined label list

In [None]:
# Prompt (in German) for the multi-label classification step: the model must
# pick one to five labels from the supplied list, answer with a comma-separated
# string, and fall back to "Unklar" when nothing fits.
# NOTE(review): the org-attribution section further down rebinds
# SYSTEM_INSTRUCTION; the classification function reads this global at call
# time, so re-running labeling after that section uses the wrong prompt.
SYSTEM_INSTRUCTION = (
    "Du bekommst GitLab-Issues aus dem Deutschland-Stack-Konsultationsverfahren. "
    "Du bist ein Klassifizierungs-Experte. Deine Aufgabe ist es, die Beschreibung zu analysieren "
    "und sie anhand der Labels in der Liste zu klassifizieren. "
    "Nutze NUR die zur Verfügung gestellten Labels. Erfinde keine neuen Labels! "
    "Stelle das Ergebnis als Komma-separierten String zur Verfügung. "
    "Der String enthält NUR die von dir vergebenen Labels (eins oder bis zu 5). "
    "Versuche nur so viele Labels wie nötig zu vergeben. "
    "Wenn kein Label passt, nutze das Label Unklar"
)


In [None]:
# Create the google-genai client in Vertex AI mode.
# NOTE(review): this rebinds `client`, which the config cell bound to the
# LanguageServiceClient used by get_sentiment_score; the sentiment step cannot
# be re-run after this cell without re-running the config cell first.
client = genai.Client(vertexai=True)

# Model name passed to generate_content by the classification function.
MODEL = "gemini-2.0-flash"

In [None]:
def validate_labels(labels_str: str, allowed_labels: list[str]) -> list[str]:
    """
    Split a comma-separated label string and keep only the allowed labels.

    Args:
        labels_str: Comma-separated labels as returned by the model. None or
            an empty string yields an empty result.
        allowed_labels: The labels that may be assigned.

    Returns:
        The allowed labels in the order they first appear in `labels_str`,
        stripped of surrounding whitespace and without duplicates.
    """
    if not labels_str:
        return []

    allowed = set(allowed_labels)  # constant-time membership checks
    validated: list[str] = []
    for raw_label in labels_str.split(","):
        label = raw_label.strip()
        # Drop unknown labels and repeats of a label the model already gave.
        if label in allowed and label not in validated:
            validated.append(label)
    return validated

def classify_issue_multilabel(issue_text: str, labels: list[str]) -> str:
    """
    Ask the model to classify one issue against the given labels.

    The system instruction and a prompt containing the label list and the
    issue text are sent as a single user message.

    Args:
        issue_text: The issue description to classify.
        labels: The labels offered to the model.

    Returns:
        The model's text answer with surrounding whitespace removed.
    """
    label_line = ", ".join(labels)
    user_prompt = f"\nLabels:\n{label_line}\n\nIssue:\n{issue_text}\n"

    full_prompt = SYSTEM_INSTRUCTION + "\n\n" + user_prompt
    user_message = types.Content(
        role="user",
        parts=[types.Part(text=full_prompt)],
    )

    response = client.models.generate_content(model=MODEL, contents=[user_message])
    return response.text.strip()


In [None]:
# Process in batches to handle rate limits and save progress
import time

batch_size = 10  # Adjust based on rate limits
label_column = f"labels_v{KEYWORDS_VERSION}"

# Start from a previous (possibly partial) result if one exists, otherwise
# from the sentiment-enriched frame.
if Path(LABELED_PATH).exists():
    df_labeled = pl.read_parquet(LABELED_PATH)
else:
    df_labeled = df_with_sentiment.clone()

# Recover the labels assigned so far; rows without labels are stored as [].
if label_column in df_labeled.columns:
    labels_list = [list(labels) if labels else [] for labels in df_labeled[label_column].to_list()]
else:
    labels_list = [[] for _ in range(len(df_labeled))]

# Every checkpoint contains ALL rows, so the row count of the saved file says
# nothing about progress. Rows that still have no labels are treated as pending
# instead; this also retries rows that failed or came back empty earlier.
pending_rows = [row_idx for row_idx, labels in enumerate(labels_list) if not labels]

for batch_number, offset in enumerate(range(0, len(pending_rows), batch_size), start=1):
    batch_rows = pending_rows[offset:offset + batch_size]
    print(f"Processing batch {batch_number}: rows {batch_rows[0]} to {batch_rows[-1]}")

    for row_idx in batch_rows:
        desc = df_labeled["desc_clean"][row_idx]
        try:
            labels_str = classify_issue_multilabel(desc, LABELS)
            labels_list[row_idx] = validate_labels(labels_str, LABELS)
            time.sleep(2)  # Increased sleep to avoid rate limits
        except Exception as e:
            print(f"Error processing row {row_idx}: {e}")
            labels_list[row_idx] = []  # Assign empty list on error
            time.sleep(5)  # Longer sleep on error

    # Update the dataframe with the current labels_list. The explicit dtype
    # keeps the column a list of strings even while every entry is still empty.
    df_labeled = df_labeled.with_columns(
        pl.Series(label_column, labels_list, dtype=pl.List(pl.Utf8))
    )

    # Save progress
    df_labeled.write_parquet(LABELED_PATH)
    print(f"Saved progress after batch {batch_number}")

print("Labeling completed!")

In [None]:
# Write the labeled frame once more; the labeling loop also writes it after each batch.
df_labeled.write_parquet(LABELED_PATH)


## Add org attribution
Usually the author submits the ticket in the name of an organization, e.g. Julia Schöpp for D64. We can probably get most of these organizations from the title / content

In [None]:
# Load the previously postprocessed issues.
# NOTE(review): within this notebook `df_postprocessed` is not read again before
# the postprocessing section reassigns it — confirm whether this cell is still needed.
df_postprocessed = pl.read_parquet(POSTPROCESSED_PATH)

In [None]:
# Prompt (in German) for the organisation-attribution step: assign each issue to
# exactly one organisation, answer with a plain string, and answer "Unklar" when
# no organisation is recognisable. Two worked examples are included.
# Fixed in the text: the missing space in "bekommst GitLab-Issues", the
# incomplete sentence "als zur Verfügung", and the missing separator after the
# first example answer (it used to read "publicplan GmbHBeispiel:").
# NOTE(review): this rebinds the SYSTEM_INSTRUCTION used by the labeling step;
# re-running labeling after this cell would send this prompt instead.
SYSTEM_INSTRUCTION = (
    "Du bekommst GitLab-Issues aus dem Deutschland-Stack-Konsultationsverfahren. "
    "Deine Aufgabe ist es, die Issues einer Organisation zuzuordnen, "
    "wenn möglich. "
    "Ordne sie NUR EINER ORGANISATION zu. Erfinde keine neuen Organisationen! "
    "Stelle das Ergebnis als einfachen String zur Verfügung. "
    "Wenn keine Organisation erkennbar ist, antworte mit 'Unklar'. "
    "Beispiel: "
    "Konsultationsbeitrag der publicplan GmbH zum Deutschland-Stack "
    "Deine Antwort: publicplan GmbH "
    "Beispiel: "
    "Die Erlang Ecosystem Foundation (EEF)(erlef.org) plädiert für die Inklusion von .... "
    "Deine Antwort: Erlang Ecosystem Foundation "
)


In [None]:
# Same client and model setup as in the labeling section, repeated so this
# section can be run on its own. Like there, it rebinds the global `client`.
client = genai.Client(vertexai=True)

# Model name passed to generate_content by the org-attribution function.
MODEL = "gemini-2.0-flash"

In [None]:
def add_issue_org_attribution(issue_title: str, issue_text: str) -> str:
    """
    Ask the model which organisation an issue was submitted for.

    The system instruction and a prompt containing the issue title and content
    are sent as a single user message.

    Args:
        issue_title: Title of the issue.
        issue_text: Body text of the issue.

    Returns:
        The model's text answer with surrounding whitespace removed.
    """
    user_prompt = f"\nTitel:\n{issue_title}\n\nInhalt:\n{issue_text}\n"

    full_prompt = SYSTEM_INSTRUCTION + "\n\n" + user_prompt
    user_message = types.Content(
        role="user",
        parts=[types.Part(text=full_prompt)],
    )

    response = client.models.generate_content(model=MODEL, contents=[user_message])
    return response.text.strip()


In [None]:
# same process as before to add organization attribution

# Process in batches to handle rate limits and save progress
import time

batch_size = 10  # Adjust based on rate limits

# Load the labeled dataframe
df_labeled = pl.read_parquet(LABELED_PATH)

# Start from a previous (possibly partial) result if one exists, otherwise
# from the labeled frame.
if Path(ORG_ATTRIBUTED_PATH).exists():
    df_org = pl.read_parquet(ORG_ATTRIBUTED_PATH)
else:
    df_org = df_labeled.clone()

# Recover the attributions made so far; "" marks a row that is still open.
if "org" in df_org.columns:
    org_list = [org or "" for org in df_org["org"].to_list()]
else:
    org_list = [""] * len(df_org)

# Every checkpoint contains ALL rows, so the row count of the saved file says
# nothing about progress. Rows whose org is still empty are treated as pending.
# Rows that failed earlier carry "Unklar" and are therefore not retried.
pending_rows = [row_idx for row_idx, org in enumerate(org_list) if not org]

for batch_number, offset in enumerate(range(0, len(pending_rows), batch_size), start=1):
    batch_rows = pending_rows[offset:offset + batch_size]
    print(f"Processing batch {batch_number}: rows {batch_rows[0]} to {batch_rows[-1]}")

    for row_idx in batch_rows:
        title = df_org["title"][row_idx]
        desc = df_org["desc_clean"][row_idx]
        try:
            org_list[row_idx] = add_issue_org_attribution(title, desc)
            time.sleep(2)  # Increased sleep to avoid rate limits
        except Exception as e:
            print(f"Error processing row {row_idx}: {e}")
            org_list[row_idx] = "Unklar"  # Default on error
            time.sleep(5)  # Longer sleep on error

    # Update the dataframe with the current org_list
    df_org = df_org.with_columns(pl.Series("org", org_list, dtype=pl.Utf8))

    # Save progress
    df_org.write_parquet(ORG_ATTRIBUTED_PATH)
    print(f"Saved progress after batch {batch_number}")

print("Org attribution completed!")

In [None]:
# Spot-check five randomly sampled rows; no seed is set, so the sample changes on every run.
df_org.select("title", "desc_clean", "org").sample(5).rows()

## Postprocess to clean the results a bit more

In [21]:
# Postprocessing starts from the org-attributed issues saved by the previous section.
df_postprocessed = pl.read_parquet(ORG_ATTRIBUTED_PATH)

In [24]:
# Run the project postprocessing helper. The label version comes from the config
# cell so it stays in sync with the labels_v<version> column used elsewhere.
# NOTE(review): this reassigns its own input, so re-running the cell applies the
# postprocessing twice — confirm the helper is safe to apply repeatedly.
df_postprocessed = u.postprocess_issues(df_postprocessed, label_version=KEYWORDS_VERSION)

In [27]:
# Write the dated snapshot of the postprocessed issues.
# BASE is defined here because, in notebook order, this cell runs before the
# "Schema differences" section that otherwise defines it (same value).
BASE = "data/issues_postprocessed"
snapshot_path = f"{BASE}/processing_date=2025-12-31/issues.parquet"

# Make sure the partition directory exists before writing into it.
Path(snapshot_path).parent.mkdir(parents=True, exist_ok=True)
df_postprocessed.write_parquet(snapshot_path)

In [25]:
# Derive the feedback round from the creation year: issues created in 2026
# belong to round 2, everything else to round 1. Uses a native polars expression
# instead of a per-row Python lambda; rows without a creation date stay null,
# as they did with map_elements.
created_year = pl.col("created_at").dt.year()
df_postprocessed = df_postprocessed.with_columns(
    pl.when(created_year.is_null())
    .then(pl.lit(None, dtype=pl.Int8))
    .when(created_year == 2026)
    .then(pl.lit(2, dtype=pl.Int8))
    .otherwise(pl.lit(1, dtype=pl.Int8))
    .cast(pl.Int8)
    .alias("feedback_round")
)

## Check for redundant and unassigned labels

In [None]:
from collections import Counter
from itertools import combinations

label_column = f"labels_v{KEYWORDS_VERSION}"

# Get all assigned labels
assigned_labels = set(
    df_postprocessed.select(pl.col(label_column).explode().unique()).to_series().to_list()
)

# Labels not assigned at all
unassigned_labels = [label for label in LABELS if label not in assigned_labels]
print("Labels not assigned at all:")
print(unassigned_labels)

# Compute per-label frequencies and pairwise co-occurrence counts.
co_occ = Counter()
label_counts = Counter()

for (row_labels,) in df_postprocessed.select(label_column).rows():
    # De-duplicate within the row and sort, so each pair is counted once as
    # (a, b) with a < b and in a reproducible order. A null list counts as empty.
    labels_in_row = sorted(set(row_labels or []))
    label_counts.update(labels_in_row)
    co_occ.update(combinations(labels_in_row, 2))

# Find labels that mostly occur together (co-occurrence > 80% in both directions)
mostly_together = []
for (a, b), co_count in co_occ.items():
    freq_a = label_counts[a]
    freq_b = label_counts[b]
    pct_a_with_b = co_count / freq_a if freq_a > 0 else 0
    pct_b_with_a = co_count / freq_b if freq_b > 0 else 0
    if pct_a_with_b > 0.8 and pct_b_with_a > 0.8:
        mostly_together.append((a, b, pct_a_with_b, pct_b_with_a))

# Sort so the report is identical from run to run.
mostly_together.sort()

print("\nLabels that mostly occur together (>80% co-occurrence):")
if mostly_together:
    for a, b, p1, p2 in mostly_together:
        print(f"{a} and {b}: {p1:.2f} / {p2:.2f}")
else:
    print("None found.")

### Schema differences

In [13]:
# Root directory of the dated snapshots of the postprocessed issues.
BASE = "data/issues_postprocessed"
# Load two snapshots (older and newer processing date) to compare their schemas.
old_df = pl.read_parquet(f"{BASE}/processing_date=2025-12-31/issues.parquet")
new_df = pl.read_parquet(f"{BASE}/processing_date=2026-01-17/issues.parquet")

In [14]:
# Column names of the older snapshot, shown for the schema comparison.
old_df.columns

['iid',
 'title',
 'description',
 'state',
 'created_at',
 'updated_at',
 'closed_at',
 'author_id',
 'author_name',
 'author_state',
 'user_notes_count',
 'upvotes',
 'downvotes',
 'references',
 'desc_clean',
 'is_from_form',
 'form_page',
 'sentiment',
 'labels_v1',
 'org']