In [None]:
pip install pyspark==3.4.1 spark-nlp pandas matplotlib scikit_learn openpyxl

In [None]:
import json
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, udf, lower, trim, regexp_replace, count, split
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import collect_list
from sklearn.metrics.pairwise import cosine_distances

import sparknlp
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, XlmRoBertaEmbeddings

# Start the wall-clock timer for the whole pipeline.
total_start = time.time()

# Create the Spark session with the Spark NLP jar on the classpath.
# NOTE(review): the jar is pinned to 4.4.0 while the `pip install` cell leaves
# the `spark-nlp` Python package unpinned -- confirm the two versions match.
spark = (
    SparkSession.builder
    .appName("Spark NLP Clustering")
    .master("spark://172.18.0.2:7077")
    .config("spark.driver.memory", "8g")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.0")
    .getOrCreate()
)

# `sparknlp.start()` is intentionally not called: its first positional
# parameter is the `gpu` flag (not a session) and its return value was being
# discarded, so the session built above is the one the pipeline uses.


# Load the JSONL input (one JSON document per line).
input_file_path = "/opt/workspace/gen_1604_formated.jsonl"
df = spark.read.option("multiLine", False).json(input_file_path)

# Keep only the non-null content of messages whose role is "user".
user_questions = (
    df.select(explode("messages").alias("msg"))
    .where(col("msg.role") == "user")
    .select(col("msg.content").alias("text"))
    .where(col("text").isNotNull())
)

# Spark NLP pipeline: raw text -> document -> tokens -> XLM-RoBERTa embeddings.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)
embeddings = (
    XlmRoBertaEmbeddings.pretrained("xlm_roberta_base", "xx")
    .setInputCols(["document", "token"])
    .setOutputCol("embeddings")
    .setCaseSensitive(False)
)
pipeline = Pipeline(stages=[document_assembler, tokenizer, embeddings])

# Fit the pipeline and apply it to the user questions.
# NOTE(review): `transform` is presumably lazy, so this timer may not include
# the actual embedding inference -- confirm before relying on the number.
start = time.time()
model = pipeline.fit(user_questions)
embedded_data = model.transform(user_questions)
print(f"⏱ NLP Embedding Pipeline completed in {time.time() - start:.2f} seconds")

# UDF that averages a row's token embeddings into a single vector
def avg_embeddings(embeddings):
    """Average the token-level embedding vectors of one row.

    Args:
        embeddings: Sequence of annotation records, each exposing its vector
            under the ``'embeddings'`` key. May be ``None`` or empty.

    Returns:
        A dense vector with the element-wise mean of all non-empty token
        vectors, or ``None`` when there is nothing to average or the records
        are malformed (best effort, so one bad row does not fail the job).
    """
    try:
        if not embeddings:
            return None
        vecs = [e['embeddings'] for e in embeddings if e['embeddings']]
        if not vecs:
            return None
        avg = np.mean(vecs, axis=0)
        return Vectors.dense(avg.tolist())
    except (TypeError, ValueError, KeyError, IndexError, AttributeError):
        # Only data-shape problems (missing field, null record, ragged
        # vectors) are tolerated; interrupts and programming errors such as
        # NameError now propagate instead of being silently swallowed.
        return None

avg_embeddings_udf = udf(avg_embeddings, VectorUDT())

# Vectorise each question (mean of its token embeddings) and standardise.
start = time.time()
vectorized_data = (
    embedded_data
    .withColumn("features", avg_embeddings_udf(col("embeddings")))
    .filter(col("features").isNotNull())
    # Every later action (scaler fit, count, KMeans fit, show, toPandas,
    # collect) derives from this frame; caching it keeps the embedding model
    # and the Python UDF from being re-run for each of them.
    .cache()
)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(vectorized_data)
scaled_data = scaler_model.transform(vectorized_data)
print(f"⏱ Feature scaling completed in {time.time() - start:.2f} seconds")

# KMeans clustering on the standardised sentence vectors.
# The cluster count is named once so the size guard, the model and the error
# message cannot drift apart.
NUM_CLUSTERS = 5
start = time.time()
if scaled_data.count() < NUM_CLUSTERS:
    # ValueError is still an Exception, so existing broad handlers keep working.
    raise ValueError(
        f"❗ Không đủ dữ liệu hợp lệ để phân cụm KMeans (yêu cầu >= {NUM_CLUSTERS})."
    )

kmeans = KMeans(
    featuresCol="scaled_features",
    predictionCol="cluster",
    k=NUM_CLUSTERS,
    seed=42,  # fixed seed so cluster assignments are reproducible between runs
)
kmeans_model = kmeans.fit(scaled_data)
clustered_data = kmeans_model.transform(scaled_data)
print(f"⏱ KMeans clustering completed in {time.time() - start:.2f} seconds")

# Normalise the text: remove punctuation, lower-case, trim surrounding spaces.
normalized_data = clustered_data.withColumn(
    "normalized_text",
    trim(lower(regexp_replace(col("text"), "[\\p{Punct}]", ""))),
)

# Count how often each normalised question occurs within each cluster,
# most frequent first.
start = time.time()
grouped = (
    normalized_data.groupBy("normalized_text", "cluster")
    .agg(count("*").alias("frequency"))
    .orderBy(col("frequency").desc())
)
print("📋 Danh sách tất cả câu hỏi sau khi chuẩn hóa và phân cụm (theo tần suất):")
grouped.show(truncate=False, n=50)
print(f"⏱ Counting frequency completed in {time.time() - start:.2f} seconds")

# Bar chart of the total number of questions per cluster.
cluster_counts = (
    grouped.groupBy("cluster")
    .sum("frequency")
    .withColumnRenamed("sum(frequency)", "count")
    .orderBy("cluster")
    .toPandas()
)

plt.figure(figsize=(8, 5))
plt.bar(
    cluster_counts["cluster"],
    cluster_counts["count"],
    color="teal",
)
plt.title("Semantic Question Clustering Frequency")
plt.xlabel("Cluster ID")
plt.ylabel("Number of Questions")
plt.xticks(cluster_counts["cluster"])  # one tick per cluster id
plt.tight_layout()
plt.show()

# Medoid selection, step 1: gather each cluster's question texts and feature
# vectors on the driver (one row per cluster).
start = time.time()
clusters = (
    clustered_data.select("cluster", "text", "features")
    .groupBy("cluster")
    .agg(
        collect_list("text").alias("questions"),
        collect_list("features").alias("features_list"),
    )
    .collect()
)

def get_medoid_question(questions, features):
    """Return the question whose vector is the cosine medoid of its cluster.

    The medoid is the item with the smallest summed cosine distance to every
    other item. The sums are computed in closed form from the normalised
    vectors, so no n x n pairwise distance matrix is materialised.

    Args:
        questions: Sequence of question texts.
        features: Sequence of vectors aligned with ``questions``; each item is
            either an object exposing ``toArray()`` or a plain numeric
            sequence.

    Returns:
        The element of ``questions`` at the medoid index (the first one on
        ties).

    Raises:
        ValueError: If ``features`` is empty.
    """
    if len(features) == 0:
        raise ValueError("Cannot select a medoid from an empty cluster.")

    vecs = np.array(
        [v.toArray() if hasattr(v, "toArray") else v for v in features],
        dtype=float,
    )

    # Normalise rows to unit length; all-zero rows are left as zeros so their
    # cosine similarity to anything is 0.
    norms = np.linalg.norm(vecs, axis=1, keepdims=True)
    norms[norms == 0.0] = 1.0
    unit = vecs / norms

    # sum_{j != i} (1 - u_i . u_j) == (n - 1) - u_i . sum_j(u_j) + u_i . u_i
    similarity_to_all = unit @ unit.sum(axis=0)
    self_similarity = (unit * unit).sum(axis=1)
    total_dists = (len(unit) - 1) - similarity_to_all + self_similarity

    medoid_idx = int(np.argmin(total_dists))
    return questions[medoid_idx]

# One (cluster id, medoid question, cluster size) tuple per cluster.
topic_table = [
    (
        row["cluster"],
        get_medoid_question(row["questions"], row["features_list"]),
        len(row["questions"]),
    )
    for row in clusters
]

topic_df = pd.DataFrame(topic_table, columns=["Cluster", "Topic", "Frequency"])
topic_df = topic_df.sort_values("Cluster")
print("\n📌 Chủ đề đại diện cho từng cụm:")
print(topic_df.to_string(index=False))
print(f"⏱ Topic medoid selection completed in {time.time() - start:.2f} seconds")

# Manual, keyword-based topic labelling
def classify_topic(text):
    """Assign a topic label to a question by Vietnamese keyword matching.

    Topics are checked in order and the first one with a matching keyword
    wins, so a question mentioning both a breakdown and a device is labelled
    as a breakdown.

    Args:
        text: Question text; matching is case-insensitive. ``None`` or an
            empty string is treated as unclassifiable.

    Returns:
        An int label: 1 for the breakdown/incident keywords, 2 for the
        maintenance keywords, 3 for the transfer keywords, 4 for the
        area/management/personnel keywords, 5 for the device/asset keywords,
        or 0 when no keyword matches.
    """
    if not text:
        return 0

    # The keyword literals are stored as real Vietnamese text (they were
    # mis-encoded before and could never match the input).
    topic_keywords = (
        (1, ("báo hỏng", "sự cố", "bị hỏng", "trạng thái hỏng", "hỏng thiết bị")),
        (2, ("bảo dưỡng", "lịch bảo dưỡng", "trạng thái bảo dưỡng")),
        (3, ("điều chuyển", "chuyển thiết bị", "chuyển đến", "chuyển đi")),
        (4, ("khu vực", "quản lý", "nhân sự", "chức vụ", "người quản lý", "thông tin cá nhân")),
        (5, ("thiết bị", "tài sản", "loại thiết bị", "loại tài sản", "chứa tài sản")),
    )

    text = text.lower()
    for label, keywords in topic_keywords:
        if any(word in text for word in keywords):
            return label
    return 0  # unknown topic

# Wrap the keyword classifier as a Spark UDF and label every question.
# NOTE(review): no returnType is passed, so the integer labels presumably end
# up in a string column (PySpark's default UDF return type) -- confirm this
# is what the exported files should contain.
classify_topic_udf = udf(classify_topic)
final_df = normalized_data.withColumn(
    "classified_topic",
    classify_topic_udf(col("normalized_text")),
)

# Number of questions per classified topic label.
final_stats = final_df.groupBy("classified_topic").count().orderBy("classified_topic")
print("\n📊 Thống kê số lượng câu hỏi theo chủ đề phân loại:")
final_stats.show()

# Export the labelled questions to CSV and Excel.
export_df = final_df.select("text", "normalized_text", "cluster", "classified_topic").toPandas()
export_df.to_csv("/opt/workspace/clustered_questions.csv", index=False, encoding='utf-8-sig')
export_df.to_excel("/opt/workspace/clustered_questions.xlsx", index=False)

print("\n✅ Đã xuất dữ liệu câu hỏi ra file:")
print("   📄 /opt/workspace/clustered_questions.csv")
print("   📄 /opt/workspace/clustered_questions.xlsx")

# Total wall-clock time for the whole pipeline.
print(f"\n🚀 Tổng thời gian toàn bộ pipeline: {time.time() - total_start:.2f} seconds")
