# 📝 Apache JIRA Comment Sentiment Analysis

## Overview
This notebook runs sentiment analysis on JIRA issue comments, evaluates a classification model, extracts discussion topics, and analyzes how comment sentiment correlates with issue resolution time. It covers:
- Text cleaning and preprocessing
- Sentiment classification (rule-based and logistic regression)
- Model evaluation metrics
- Topic extraction with LDA
- Correlation analysis with issue metrics

Each section is documented with explanations.

## 🏗️ Setup & Imports

In [0]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import (
    col, when, count, length, lower, regexp_replace, udf, 
    collect_list, explode, array_contains, lit, avg, max, min,
    concat_ws, split, trim, size
)
from pyspark.sql.types import FloatType, ArrayType, StringType, IntegerType
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, CountVectorizer, IDF,
    StringIndexer, VectorAssembler 
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline

## 🚀 Initialize Spark Session

In [0]:
# Announce start-up, then build (or reuse) the Spark session for this notebook.
print("Initializing Spark session...")
spark = (
    SparkSession.builder
    .appName("Enhanced Comment Sentiment Analysis")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

# Set Spark's log level to WARN.
spark.sparkContext.setLogLevel("WARN")

## 📂 Loading Data

In [0]:
# Parquet locations of the comment and issue data sets.
comments_path = "/user/szreiqa/Apache_JIRA_Issues/cleaned_comments.parquet"
issues_path = "/user/szreiqa/Apache_JIRA_Issues/cleaned_issues.parquet"

# Comments are mandatory: a failure here is allowed to propagate.
comments_df = spark.read.parquet(comments_path)
print(f"Total comments: {comments_df.count()}")

# Issues are optional and only feed the correlation analysis. Catch
# `Exception` rather than using a bare `except:` so that KeyboardInterrupt /
# SystemExit still propagate, and report why the load failed.
try:
    issues_df = spark.read.parquet(issues_path)
    print(f"Total issues: {issues_df.count()}")
except Exception as exc:
    print("Issues data not available or could not be loaded")
    print(f"  Reason: {type(exc).__name__}: {exc}")
    issues_df = None
    has_issues = False
else:
    has_issues = True

## 🔍 Identify Comment Body Column

In [0]:
# Candidate names for the comment-text column, in order of preference.
candidate_columns = ("comment_body", "body", "comment_text", "text")

# Pick the first candidate that is present in the comments schema.
body_column = next(
    (name for name in candidate_columns if name in comments_df.columns),
    None,
)
if body_column is None:
    print("Could not find comment body column")
    exit(1)
print(f"Using column: {body_column}")

## 🧹 Text Cleaning & Preprocessing

In [0]:
# Normalise the comment text in three passes (innermost first):
#   1. lower-case it and strip anything that looks like an HTML tag,
#   2. strip substrings starting with "http" up to the next whitespace,
#   3. replace every character that is not a letter or whitespace by a space.
cleaned_comments = comments_df.withColumn(
    "cleaned_text",
    regexp_replace(
        regexp_replace(
            regexp_replace(lower(col(body_column)), r"<[^>]+>", ""),
            r"http\S+", ""
        ),
        r"[^a-zA-Z\s]", " "
    )
)

# Keep only comments whose cleaned text is longer than 10 characters.
filtered_comments = cleaned_comments.filter(length(col("cleaned_text")) > 10)

# Count once and reuse the value: each .count() is a separate Spark action,
# and the original code triggered it up to three times before sampling.
filtered_count = filtered_comments.count()
print(f"Comments after filtering: {filtered_count}")
if filtered_count > 50000:
    print("Sampling comments for faster processing...")
    # Sample without replacement, targeting roughly 50,000 rows.
    filtered_comments = filtered_comments.sample(False, 50000.0 / filtered_count, seed=42)
    print(f"Sampled to {filtered_comments.count()} comments")

## 🧮 Sentiment Scoring

In [0]:
#!/usr/bin/env python
# coding: utf-8

"""
Fixed Comment Sentiment Analysis with Topics

This script analyzes sentiment in JIRA comments with fixed topic modeling
implementation to avoid the error with explode function on DenseVector.

Usage: spark-submit fixed_sentiment_analysis.py
"""

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import (
    col, when, count, length, lower, regexp_replace, udf, 
    collect_list, lit, avg, max, min, array, explode, struct
)
from pyspark.sql.types import FloatType, ArrayType, StringType, IntegerType
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, CountVectorizer, IDF,
    StringIndexer, VectorAssembler 
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline

# Initialize Spark session (getOrCreate reuses a session if one already exists)
print("Initializing Spark session...")
spark = (
    SparkSession.builder
    .appName("Fixed Comment Sentiment Analysis")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

# Set Spark's log level to WARN.
spark.sparkContext.setLogLevel("WARN")

# Load and prepare data
print("Loading data...")
comments_path = "/user/szreiqa/Apache_JIRA_Issues/cleaned_comments.parquet"
issues_path = "/user/szreiqa/Apache_JIRA_Issues/cleaned_issues.parquet"

# Comments are mandatory: a failure here is allowed to propagate.
comments_df = spark.read.parquet(comments_path)
print(f"Total comments: {comments_df.count()}")

# Try to load issues for additional analysis. Catch `Exception` rather than
# using a bare `except:` so that KeyboardInterrupt / SystemExit still
# propagate, and report why the load failed instead of hiding it.
try:
    issues_df = spark.read.parquet(issues_path)
    print(f"Total issues: {issues_df.count()}")
except Exception as exc:
    print("Issues data not available or could not be loaded")
    print(f"  Reason: {type(exc).__name__}: {exc}")
    issues_df = None
    has_issues = False
else:
    has_issues = True

# Find comment body column: first candidate name present in the schema wins.
candidate_columns = ("comment_body", "body", "comment_text", "text")
body_column = next(
    (name for name in candidate_columns if name in comments_df.columns),
    None,
)

# Without a text column there is nothing to analyse, so bail out.
if body_column is None:
    print("Could not find comment body column")
    exit(1)

print(f"Using column: {body_column}")

# Clean and prepare text
print("Cleaning and preparing text...")
cleaned_comments = comments_df.withColumn(
    "cleaned_text",
    regexp_replace(
        regexp_replace(
            regexp_replace(lower(col(body_column)),
                r"<[^>]+>", ""  # Remove HTML
            ),
            r"http\S+", ""  # Remove URLs
        ),
        r"[^a-zA-Z\s]", " "  # Keep only letters and spaces
    )
)

# Filter out very short comments (cleaned text of 10 characters or fewer)
filtered_comments = cleaned_comments.filter(length(col("cleaned_text")) > 10)

# Count once and reuse the value: each .count() is a separate Spark action,
# and the original code triggered it up to three times before sampling.
filtered_count = filtered_comments.count()
print(f"Comments after filtering: {filtered_count}")

# Sample if too many comments to process
if filtered_count > 10000:
    print("Sampling comments for faster processing...")
    # Sample without replacement, targeting roughly 10,000 rows.
    filtered_comments = filtered_comments.sample(False, 10000.0 / filtered_count, seed=42)
    print(f"Sampled to {filtered_comments.count()} comments")

import re

# Define sentiment patterns
positive_patterns = [
    "thank", "good", "great", "excellent", "awesome",
    "works", "working", "fixed", "resolved", "solved",
    "correct", "perfect", "nice", "love", "happy",
    "helpful", "appreciated", "appreciate"
]

negative_patterns = [
    "bug", "issue", "problem", "error", "fail",
    "failed", "wrong", "bad", "broken", "not work",
    "doesn't work", "incorrect", "crash", "stuck",
    "terrible", "horrible", "awful", "disappointing"
]

# Same character class the cleaning step blanks out (after lower-casing):
# everything that is neither a letter nor whitespace.
_NON_LETTER_RE = re.compile(r"[^a-z\s]")


def _normalize_for_matching(text):
    """Lower-case *text* and replace non-letter, non-whitespace chars by spaces."""
    return _NON_LETTER_RE.sub(" ", text.lower())


def _compile_patterns(patterns):
    """Compile each pattern, normalized like the text, anchored at a word start."""
    return [
        re.compile(r"\b" + re.escape(_normalize_for_matching(pattern)))
        for pattern in patterns
    ]


_positive_regexes = _compile_patterns(positive_patterns)
_negative_regexes = _compile_patterns(negative_patterns)


# Rule-based sentiment scoring function
def calculate_sentiment(text):
    """Score *text* in [-1.0, 1.0] from the positive/negative pattern lists.

    The score is ``(pos - neg) / (pos + neg)`` where ``pos`` / ``neg`` are the
    numbers of distinct positive / negative patterns found in the text.
    Empty or ``None`` input, and text matching no pattern, score ``0.0``.

    Text and patterns are normalized the same way (lower-cased, non-letters
    replaced by spaces), so a pattern such as "doesn't work" still matches
    text whose apostrophe was already turned into a space. Patterns must
    start at a word boundary, so "incorrect" no longer counts as the positive
    "correct" and "debug" no longer counts as "bug"; suffixes are still
    allowed ("thank" matches "thanks").

    NOTE: overlapping patterns (e.g. "fail" and "failed") are still counted
    separately, as in the original scoring.
    """
    if not text:
        return 0.0

    normalized = _normalize_for_matching(text)
    pos_count = sum(1 for regex in _positive_regexes if regex.search(normalized))
    neg_count = sum(1 for regex in _negative_regexes if regex.search(normalized))

    total = pos_count + neg_count
    if total == 0:
        return 0.0

    # Calculate normalized score
    return (pos_count - neg_count) / total

# Register UDF for sentiment calculation.
# The score is stored as a 64-bit double rather than a 32-bit float: as a
# float, a score of exactly 0.2 or 0.6 is widened to a value slightly above
# the double literals used in the thresholds below, so it would fail the
# "<= 0.2" / "<= 0.6" tests and land one category too high (while -0.2 and
# -0.6 would still pass theirs).
from pyspark.sql.types import DoubleType

sentiment_udf = udf(calculate_sentiment, DoubleType())

# Calculate sentiment scores
print("Calculating sentiment scores...")
comments_with_sentiment = filtered_comments.withColumn(
    "sentiment_score", sentiment_udf(col("cleaned_text"))
)

# Categorize sentiment into five buckets; each upper bound is inclusive and
# the first matching branch wins.
comments_with_sentiment = comments_with_sentiment.withColumn(
    "sentiment_category",
    when(col("sentiment_score") <= -0.6, "Very Negative")
    .when(col("sentiment_score") <= -0.2, "Negative")
    .when(col("sentiment_score") <= 0.2, "Neutral")
    .when(col("sentiment_score") <= 0.6, "Positive")
    .otherwise("Very Positive")
)

# Display sentiment distribution, ordered from most negative to most positive
print("\nComment sentiment distribution:")

# Build a CASE expression mapping each category name to its rank (1..5).
_ordered_categories = ["Very Negative", "Negative", "Neutral", "Positive", "Very Positive"]
_rank_expr = None
for _rank, _category in enumerate(_ordered_categories, start=1):
    _matches = col("sentiment_category") == _category
    if _rank_expr is None:
        _rank_expr = when(_matches, _rank)
    else:
        _rank_expr = _rank_expr.when(_matches, _rank)

_category_counts = comments_with_sentiment.groupBy("sentiment_category").count()
sentiment_distribution = _category_counts.orderBy(_rank_expr)
sentiment_distribution.show()

# Prepare data for ML evaluation similar to lab example
print("\nPreparing data for ML evaluation...")

# Text stages: split into words, drop stop words, build term-count vectors.
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
cv = CountVectorizer(
    inputCol="filtered_words",
    outputCol="features",
    minDF=2.0,
    vocabSize=10000,
)

# Label stage: encode the sentiment category as a numeric "label" column.
indexer = StringIndexer(
    inputCol="sentiment_category",
    outputCol="label",
    handleInvalid="skip",
)

# Stage order matters: later code reads the fitted CountVectorizer from
# position 3 of the fitted pipeline.
feature_stages = [indexer, tokenizer, stop_words_remover, cv]
pipeline = Pipeline(stages=feature_stages)
model = pipeline.fit(comments_with_sentiment)
prepared_data = model.transform(comments_with_sentiment)

# 80/20 train/test split with a fixed seed.
train_data, test_data = prepared_data.randomSplit([0.8, 0.2], seed=42)
train_size = train_data.count()
test_size = test_data.count()
print(f"Training set size: {train_size}, Test set size: {test_size}")

# Train a simple logistic regression model on the training split
print("Training sentiment classification model...")
lr = LogisticRegression(maxIter=10, regParam=0.01, elasticNetParam=0.8)
lr_model = lr.fit(train_data)

# Score the held-out test split
predictions = lr_model.transform(test_data)

# One evaluator, re-run with a different metric name for each figure
print("\nEvaluating model with MulticlassClassificationEvaluator:")
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

metric_values = {
    metric: evaluator.evaluate(predictions, {evaluator.metricName: metric})
    for metric in ("weightedPrecision", "weightedRecall", "f1", "accuracy")
}
precision = metric_values["weightedPrecision"]
recall = metric_values["weightedRecall"]
f1 = metric_values["f1"]
accuracy = metric_values["accuracy"]

print(f"Weighted Precision: {precision:.4f}")
print(f"Weighted Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Accuracy: {accuracy:.4f}")

# Simple topic modeling similar to the LDA example in the lab
print("\nPerforming simple topic modeling on comments...")

# Fit LDA on the term-count vectors in the "features" column
num_topics = 5
lda = LDA(featuresCol="features", k=num_topics, maxIter=10)
lda_model = lda.fit(prepared_data)

# Top 3 terms per topic, as vocabulary indices
topics = lda_model.describeTopics(3)
topics.show(truncate=False)

# The fitted CountVectorizer is stage 3 of the fitted pipeline; its
# vocabulary translates term indices back into words.
cv_model = model.stages[3]
vocabulary = cv_model.vocabulary


def map_indices_to_words(indices):
    """Return the vocabulary words for the given term indices, in order."""
    words = []
    for term_index in indices:
        words.append(vocabulary[int(term_index)])
    return words


map_indices_udf = udf(map_indices_to_words, ArrayType(StringType()))
term_words = map_indices_udf(col("termIndices"))
topics_with_words = topics.withColumn("termWords", term_words)
print("\nTop terms for each topic:")
topics_with_words.select("topic", "termWords").show(truncate=False)

# Attach each comment's per-topic weights (the "topicDistribution" vector
# column, converted to an array further below).
print("\nCreating topic distribution data...")
transformed_data = lda_model.transform(prepared_data)

# Helper used as a UDF: turn an ML vector into a plain Python list
def vector_to_array(vec):
    """Return the values of *vec* (anything exposing ``toArray()``) as a list of floats."""
    return [float(weight) for weight in vec.toArray()]

# Wrap the converter as a UDF producing an array of floats
vector_to_array_udf = udf(vector_to_array, ArrayType(FloatType()))

# Convert the topicDistribution vector column into an indexable array column
topic_vector = col("topicDistribution")
transformed_data = transformed_data.withColumn(
    "topic_dist_array", vector_to_array_udf(topic_vector)
)

# Use array positions to analyze topics
print("\nAnalyzing topic weights by sentiment category:")

# Only the category and the per-topic weight array are needed here
summary_data = transformed_data.select("sentiment_category", "topic_dist_array")

# Imports kept from the original cell so these names stay defined for any
# later code; the aggregation below no longer needs them.
from functools import reduce
from pyspark.sql import DataFrame

# Compute every topic's average weight per sentiment category in a single
# aggregation. The previous version ran one groupBy per topic and joined the
# results, which produced the same columns (avg_topic_0 .. avg_topic_{k-1})
# at the cost of num_topics aggregations plus num_topics - 1 joins.
if num_topics > 0:
    topic_avg_exprs = [
        avg(col("topic_dist_array")[topic_idx]).alias(f"avg_topic_{topic_idx}")
        for topic_idx in range(num_topics)
    ]
    joined_avgs = summary_data.groupBy("sentiment_category").agg(*topic_avg_exprs)
    print("\nAverage topic weights by sentiment category:")
    joined_avgs.show()
else:
    print("No topic averages to display")

# Show examples for each sentiment category
print("\nExample comments for each sentiment category:")
for category in ["Very Negative", "Negative", "Neutral", "Positive", "Very Positive"]:
    # Collect once and reuse the rows. The previous count()-then-collect()
    # ran the query twice, and an unordered limit(2) is not guaranteed to
    # return the same rows on both runs.
    example_rows = (
        comments_with_sentiment
        .filter(col("sentiment_category") == category)
        .limit(2)
        .collect()
    )

    if not example_rows:
        print(f"\nNo {category} examples found")
        continue

    print(f"\n--- {category} examples ---")
    for row in example_rows:
        score = row["sentiment_score"]
        # Show the original (uncleaned) text, truncated to 100 characters
        text = row[body_column]
        if len(text) > 100:
            text = text[:97] + "..."
        print(f"Score: {score:.2f}")
        print(f"Text: {text}\n")

print("\nAnalysis complete!")
# NOTE: spark.stop() is deliberately not called here. The cells that follow
# (sentiment distribution, classification, evaluation, topic extraction,
# correlation analysis, examples) still run Spark actions on DataFrames
# created above, and would fail on a stopped session. The session is stopped
# in the notebook's final "Completion" cell instead.

## 📊 Sentiment Distribution

In [0]:
# Count comments per sentiment category, ordered from most negative to most
# positive via a CASE expression mapping each category name to its rank.
_ordered_categories = ["Very Negative", "Negative", "Neutral", "Positive", "Very Positive"]
_rank_expr = None
for _rank, _category in enumerate(_ordered_categories, start=1):
    _matches = col("sentiment_category") == _category
    if _rank_expr is None:
        _rank_expr = when(_matches, _rank)
    else:
        _rank_expr = _rank_expr.when(_matches, _rank)

_category_counts = comments_with_sentiment.groupBy("sentiment_category").count()
sentiment_distribution = _category_counts.orderBy(_rank_expr)
sentiment_distribution.show()

## 🤖 Sentiment Classification

In [0]:
# Text stages: split into words, drop stop words, build term-count vectors.
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
cv = CountVectorizer(
    inputCol="filtered_words",
    outputCol="features",
    minDF=2.0,
    vocabSize=10000,
)

# Label stage: encode the sentiment category as a numeric "label" column.
indexer = StringIndexer(
    inputCol="sentiment_category",
    outputCol="label",
    handleInvalid="skip",
)

# Stage order matters: the topic-extraction cell reads the fitted
# CountVectorizer from position 3 of the fitted pipeline.
feature_stages = [indexer, tokenizer, stop_words_remover, cv]
pipeline = Pipeline(stages=feature_stages)
model = pipeline.fit(comments_with_sentiment)
prepared_data = model.transform(comments_with_sentiment)

# 80/20 train/test split with a fixed seed, then fit and score.
train_data, test_data = prepared_data.randomSplit([0.8, 0.2], seed=42)
lr = LogisticRegression(maxIter=10, regParam=0.01, elasticNetParam=0.8)
lr_model = lr.fit(train_data)
predictions = lr_model.transform(test_data)

## 📝 Model Evaluation

In [0]:
# Per-class metrics from the confusion matrix.
# A single (label, prediction) aggregation is collected and the per-label and
# per-prediction totals are derived from it on the driver; the previous
# version ran three separate groupBy/collect jobs for the same information.
label_prediction_counts = predictions.groupBy("label", "prediction").count().collect()
label_prediction_dict = {
    (row["label"], row["prediction"]): row["count"] for row in label_prediction_counts
}
label_count_dict = {}
prediction_count_dict = {}
for (actual, predicted), pair_count in label_prediction_dict.items():
    label_count_dict[actual] = label_count_dict.get(actual, 0) + pair_count
    prediction_count_dict[predicted] = prediction_count_dict.get(predicted, 0) + pair_count

for label in sorted(label_count_dict):
    tp = label_prediction_dict.get((label, label), 0)
    # False positives: rows predicted as `label` whose true label differs.
    fp = prediction_count_dict.get(label, 0) - tp
    # False negatives: rows truly `label` that were predicted as something else.
    fn = label_count_dict[label] - tp
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
    print(f"Class {label}: Precision={precision:.4f}, Recall={recall:.4f}, F1={f1:.4f}")

# Overall (weighted) metrics from Spark's evaluator.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
print(f"Weighted Precision: {precision:.4f}\nWeighted Recall: {recall:.4f}\nF1 Score: {f1:.4f}\nAccuracy: {accuracy:.4f}")

## 🏷️ Topic Extraction

In [0]:
# Fit LDA on the term-count vectors in the "features" column
num_topics = 5
lda = LDA(featuresCol="features", k=num_topics, maxIter=10)
lda_model = lda.fit(prepared_data)

# Top 3 terms per topic, as vocabulary indices
topics = lda_model.describeTopics(3)
topics.show(truncate=False)

# The fitted CountVectorizer is stage 3 of the fitted pipeline; its
# vocabulary translates term indices back into words.
cv_model = model.stages[3]
vocabulary = cv_model.vocabulary


def map_indices_to_words(indices):
    """Return the vocabulary words for the given term indices, in order."""
    words = []
    for term_index in indices:
        words.append(vocabulary[int(term_index)])
    return words


map_indices_udf = udf(map_indices_to_words, ArrayType(StringType()))
term_words = map_indices_udf(col("termIndices"))
topics_with_words = topics.withColumn("termWords", term_words)
topics_with_words.select("topic", "termWords").show(truncate=False)

## 🔗 Correlation Analysis

In [0]:
# Relate average comment sentiment per issue to the issue's resolution time.
# Each precondition is checked up front and reported when it is not met,
# instead of failing on a missing join column or skipping silently.
if not has_issues:
    print("Skipping correlation analysis: issues data not loaded")
elif "key" not in comments_with_sentiment.columns or "key" not in issues_df.columns:
    print("Skipping correlation analysis: join column 'key' missing from comments or issues")
elif not all(c in issues_df.columns for c in ["created", "resolutiondate"]):
    print("Skipping correlation analysis: issues data lacks 'created' / 'resolutiondate'")
else:
    from pyspark.sql.functions import to_date, datediff

    # Average sentiment and number of comments per issue key.
    sentiment_by_issue = comments_with_sentiment.groupBy("key").agg(
        avg("sentiment_score").alias("avg_sentiment"),
        count("*").alias("comment_count")
    )
    issues_with_sentiment = sentiment_by_issue.join(issues_df, "key", "inner")

    # Resolution time in days; null when either date is missing.
    issues_with_time = issues_with_sentiment.withColumn(
        "created_date", to_date(col("created"))
    ).withColumn(
        "resolution_date", to_date(col("resolutiondate"))
    ).withColumn(
        "resolution_days", when(col("resolution_date").isNotNull() & col("created_date").isNotNull(),
                               datediff(col("resolution_date"), col("created_date"))).otherwise(None)
    )

    # Keep only issues with a known, non-negative resolution time.
    resolved_issues = issues_with_time.filter(col("resolution_days").isNotNull() & (col("resolution_days") >= 0))

    # Bucket issues by average sentiment (same thresholds as the per-comment
    # categories) and summarise resolution time per bucket, ordered from most
    # negative to most positive. `min` / `max` here are the Spark aggregate
    # functions imported at the top of the notebook.
    resolved_issues.withColumn(
        "sentiment_bucket",
        when(col("avg_sentiment") <= -0.6, "Very Negative")
        .when(col("avg_sentiment") <= -0.2, "Negative")
        .when(col("avg_sentiment") <= 0.2, "Neutral")
        .when(col("avg_sentiment") <= 0.6, "Positive")
        .otherwise("Very Positive")
    ).groupBy("sentiment_bucket").agg(
        count("*").alias("issue_count"),
        avg("resolution_days").alias("avg_resolution_days"),
        min("resolution_days").alias("min_resolution_days"),
        max("resolution_days").alias("max_resolution_days")
    ).orderBy(
        when(col("sentiment_bucket") == "Very Negative", 1)
        .when(col("sentiment_bucket") == "Negative", 2)
        .when(col("sentiment_bucket") == "Neutral", 3)
        .when(col("sentiment_bucket") == "Positive", 4)
        .when(col("sentiment_bucket") == "Very Positive", 5)
    ).show()

## 📝 Example Comments

In [0]:
# Print up to two example comments per sentiment category.
for category in ["Very Negative", "Negative", "Neutral", "Positive", "Very Positive"]:
    # Collect once and reuse the rows. The previous count()-then-collect()
    # ran the query twice, and an unordered limit(2) is not guaranteed to
    # return the same rows on both runs.
    example_rows = (
        comments_with_sentiment
        .filter(col("sentiment_category") == category)
        .limit(2)
        .collect()
    )
    if not example_rows:
        print(f"\nNo {category} examples found")
        continue

    print(f"\n--- {category} examples ---")
    for row in example_rows:
        # Show the original (uncleaned) text, truncated to 100 characters.
        text = row[body_column]
        text = (text[:97] + "...") if len(text) > 100 else text
        print(f"Score: {row['sentiment_score']:.2f}\nText: {text}\n")

## ✅ Completion

In [0]:
# Stop the Spark session, then confirm completion.
spark.stop()
completion_message = "Analysis complete!"
print(completion_message)