K-Means Clustering 

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.preprocessing import LabelEncoder

# Load the cleaned articles and keep only the headline text.
# .copy() makes `df` an independent frame, so the column assignments below
# do not trigger pandas' SettingWithCopyWarning.
df = pd.read_csv('Cleaned_News_Articles_Final.csv')
df = df[['headline']].copy()

headlines = df['headline'].tolist()

# TF-IDF vectorizer, vocabulary capped at 1000 terms
vectorizer = TfidfVectorizer(max_features=1000)
X = vectorizer.fit_transform(headlines)

# K-Means with one cluster per intended sentiment class
num_clusters = 3  # negative / positive / neutral
kmeans = KMeans(n_clusters=num_clusters, random_state=42)
kmeans.fit(X)

# fitted centroids and the per-headline cluster assignment
cluster_centers = kmeans.cluster_centers_
cluster_labels = kmeans.labels_

# attach the cluster id of every headline
df['cluster_label'] = cluster_labels

# re-index the cluster ids as consecutive integers starting at 0
label_encoder = LabelEncoder()
df['sentiment'] = label_encoder.fit_transform(df['cluster_label'])

# Name the integer codes.
# NOTE(review): K-Means cluster ids are arbitrary, so which cluster ends up
# called 'negative' / 'positive' / 'neutral' is fixed by this dict only --
# confirm the assignment by inspecting sample headlines per cluster.
inverse_mapping = {0: 'negative', 1: 'positive', 2: 'neutral'}
df['sentiment'] = df['sentiment'].map(inverse_mapping)

# the helper column is no longer needed (reassign instead of inplace=True)
df = df.drop(columns=['cluster_label'])


In [None]:
# Print how many rows of `df` carry each sentiment label.
print(df['sentiment'].value_counts())

In [None]:
# Persist `df` as CSV without the row index; later cells read this file back.
df.to_csv('kmeans_without_augmented.csv', index=False)

Visualize PCA 

In [None]:
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns

# Project the TF-IDF matrix (converted to a dense array) onto two principal
# components so the clusters can be drawn on a plane.
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X.toarray())

# One point per headline, coloured by the sentiment name assigned to its cluster.
plt.figure(figsize=(10, 8))
scatter_ax = sns.scatterplot(
    x=X_pca[:, 0],
    y=X_pca[:, 1],
    hue=df['sentiment'],
    palette='Set1',
    legend='full',
)
scatter_ax.set_title('PCA Visualization of Clusters')
scatter_ax.set_xlabel('PCA Component 1')
scatter_ax.set_ylabel('PCA Component 2')
plt.show()


Silhouette Score Before Augmentation

In [None]:
from sklearn.metrics import silhouette_score, davies_bouldin_score
# Score the clustering that actually produced the sentiment labels: the
# assignments of the K-Means model fitted on the TF-IDF matrix `X`.
# Calling kmeans.fit_predict(X_pca) here would refit the shared `kmeans` object
# on the 2-D PCA projection, so the metrics would describe a different partition
# and `kmeans` would no longer match the labels stored in `df`.
clusters = kmeans.labels_

# silhouette_score is given the sparse TF-IDF matrix directly, as in the
# post-augmentation evaluation later in this notebook
silhouette_avg = silhouette_score(X, clusters)
# davies_bouldin_score is given a dense array (it does not take sparse input)
db_index = davies_bouldin_score(X.toarray(), clusters)

print(f'Silhouette Score: {silhouette_avg}')
print(f'Davies-Bouldin Index: {db_index}')


Augment Data with 4 Techniques

In [None]:
import pandas as pd

# Load the labelled headlines from the CSV file.
# NOTE(review): the following cell reads the same file into `data` again,
# so this cell appears to be redundant.
file_path = "kmeans_without_augmented.csv"
data = pd.read_csv(file_path)

In [None]:
import pandas as pd
import nlpaug.augmenter.word as naw
from tqdm import tqdm
import random
import os
import nltk

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# load dataset
file_path = 'kmeans_without_augmented.csv'
data = pd.read_csv(file_path)

# Sentiments to augment. Only the keys are used (membership test in the loop);
# the values are never read in this cell -- presumably the class sizes, TODO confirm.
minority_labels = {
    "neutral": 2546,
    "positive": 2057,
}

# The four augmentation techniques
aug = naw.ContextualWordEmbsAug(
    model_path='distilbert-base-uncased', action="substitute")

aug2 = naw.ContextualWordEmbsAug(
    model_path='roberta-base', action="substitute")

aug_insert = naw.ContextualWordEmbsAug(
    model_path='roberta-base', action="insert")

aug_synonym = naw.SynonymAug(aug_src='wordnet')

augmenters = [aug, aug2, aug_insert, aug_synonym]


def _as_text(result):
    """Return the augmented sentence as a plain string.

    Depending on the nlpaug version, ``augment()`` returns either a string or a
    list of strings. Storing the list itself would write values such as
    "['some text']" to the CSV, so the first element is unwrapped here.
    Returns None when the augmenter produced nothing.
    """
    if isinstance(result, (list, tuple)):
        return result[0] if result else None
    return result


# lists to store augmented data
augmented_summaries = []
multilabels = []

for _, row in tqdm(data.iterrows(), total=len(data)):
    sentiment = row['sentiment']
    # rows without a string label (e.g. NaN) are skipped
    if not isinstance(sentiment, str):
        continue

    # augment only rows that carry at least one minority label
    if not any(label.strip() in minority_labels for label in sentiment.split(',')):
        continue

    # one augmented variant of the headline per technique
    for augmenter in augmenters:
        augmented_text = _as_text(augmenter.augment(row['headline']))
        if augmented_text:
            augmented_summaries.append(augmented_text)
            multilabels.append(sentiment)

augmented_df = pd.DataFrame({'headline': augmented_summaries, 'sentiment': multilabels})

data_augmented_path = 'kmeans_with_augmented.csv'
augmented_df.to_csv(data_augmented_path, index=False)


Check the total number of augmented rows

In [None]:
# Read the augmented rows back from disk and show the row count per sentiment.
data_augmented_path = "kmeans_with_augmented.csv"
augmented_df = pd.read_csv(data_augmented_path)
augmented_df['sentiment'].value_counts()

Combine Augmented data & without Augmented Data

In [None]:
import pandas as pd

# Files to merge: the augmented rows first, then the original rows.
csv_files = ['kmeans_with_augmented.csv', 'kmeans_without_augmented.csv']

# Read every file into its own frame and stack them with a fresh 0..n-1 index.
dfs = [pd.read_csv(csv_path) for csv_path in csv_files]
combined_data = pd.concat(dfs, ignore_index=True)

# Order the rows by sentiment and write the result to disk.
sorted2_data = combined_data.sort_values(by='sentiment')

final_aug_path = "final_kmeans.csv"
sorted2_data.to_csv(final_aug_path, index=False)

# rows per sentiment in the merged file
sorted2_data['sentiment'].value_counts()

Remove square brackets, single quotes, and double quotes

In [None]:
import pandas as pd

# The file is cleaned in place: input and output are the same path.
input_file = 'final_kmeans.csv'
output_file = 'final_kmeans.csv'

# Character class matching '[', ']', double quotes and single quotes.
bracket_and_quote_chars = r"[\[\]\"']"

df = pd.read_csv(input_file)

# Delete every matching character from each headline.
df['headline'] = df['headline'].str.replace(bracket_and_quote_chars, '', regex=True)

df.to_csv(output_file, index=False)

Downsample to at most 10,000 rows per sentiment (balanced)

In [None]:
import pandas as pd

# load dataset
df = pd.read_csv('final_kmeans.csv')

# Per-sentiment sample size is clamped to [lower_limit, upper_limit].
lower_limit = 100
upper_limit = 10000


def _sample_group(group):
    """Sample one sentiment group to the clamped target size.

    A group smaller than ``lower_limit`` cannot supply the target number of
    distinct rows, so it is sampled with replacement in that case; sampling
    without replacement would raise ValueError. Larger groups are sampled
    without replacement.
    """
    target = min(upper_limit, max(lower_limit, len(group)))
    return group.sample(n=target, replace=target > len(group), random_state=42)


# Iterate over the groups instead of groupby.apply: the grouping column stays in
# every sub-frame, and the result does not depend on how `apply` treats grouping
# columns (deprecated behaviour in recent pandas).
sampled_groups = [_sample_group(group) for _, group in df.groupby('sentiment')]

if sampled_groups:
    downsampled_data = pd.concat(sampled_groups, ignore_index=True)
else:
    # no labelled rows at all: keep an empty frame with the same columns
    downsampled_data = df.iloc[0:0].copy()

downsampled_data.to_csv('final_kmeans_subset.csv', index=False)  # index=False: do not write row indices

downsampled_data['sentiment'].value_counts()

Silhouette Score After Augmentation

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import LabelEncoder

# Balanced subset produced by the downsampling step.
df = pd.read_csv('final_kmeans_subset.csv')

# Integer-encode the sentiment names into a separate column.
label_encoder = LabelEncoder()
encoded_sentiment = label_encoder.fit_transform(df['sentiment'])
df['sentiment_label'] = encoded_sentiment

# TF-IDF features with English stop words removed.
vectorizer = TfidfVectorizer(stop_words='english')
X = vectorizer.fit_transform(df['headline'])

# Fit a 3-cluster K-Means and take its assignments.
kmeans = KMeans(n_clusters=3, random_state=42)
kmeans.fit(X)
clusters = kmeans.labels_

# Mean silhouette coefficient of that clustering.
silhouette_avg = silhouette_score(X, clusters)
print(f'Silhouette Score: {silhouette_avg}')


ML Models

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split

# Dataset used for the supervised models.
dataset_path = 'final_kmeans_subset.csv'    #can CHANGE
df = pd.read_csv(dataset_path)

# Features are the headline text, targets the sentiment names.
headline_text = df['headline']
sentiment_names = df['sentiment']

# 70 % train / 30 % test, with a fixed seed for a repeatable split.
X_train, X_test, y_train, y_test = train_test_split(
    headline_text,
    sentiment_names,
    test_size=0.3,
    random_state=42,
)

Encode and Vectorizer

In [None]:
# Integer encoding of the sentiment names (negative=0, positive=1, neutral=2).
label_to_id = {'negative': 0, 'positive': 1, 'neutral': 2}

# Build the targets from the string labels in `df`, selected by the row index of
# each split. Mapping `y_train` / `y_test` onto themselves would turn every label
# into NaN the second time this cell is run (the values are already integers by
# then); starting from `df` keeps the cell safe to re-run.
y_train = df.loc[X_train.index, 'sentiment'].map(label_to_id)
y_test = df.loc[X_test.index, 'sentiment'].map(label_to_id)

# TF-IDF: the vocabulary is learned on the training text only and then applied
# unchanged to the test text.
tfidf_vectorizer = TfidfVectorizer()
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_test_tfidf = tfidf_vectorizer.transform(X_test)

Train  ML Models

In [None]:
# Apply the sklearnex (Intel Extension for Scikit-learn) patch.
# NOTE(review): the patch is applied before the estimator import below;
# presumably that order is required for the patch to take effect -- keep it.
from sklearnex import patch_sklearn
patch_sklearn()

# Imports for the alternative baseline models (uncomment together with the
# matching `ml_models = ...` line below).
# from sklearn.linear_model import LogisticRegression
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.naive_bayes import MultinomialNB
# from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

# ML baseline model (select 1 to run)    #can CHANGE
# ml_models = LogisticRegression()
# ml_models = RandomForestClassifier()
# ml_models = MultinomialNB()
# ml_models = SVC()
ml_models = KNeighborsClassifier()


# Fit the selected model on the TF-IDF training features with default hyperparameters.
ml_models.fit(X_train_tfidf, y_train)

MLflow

In [None]:
import mlflow
from mlflow.tracking import MlflowClient

# class name of the selected model, used as the run-name prefix
model_name = ml_models.__class__.__name__

# point MLflow at the local tracking server and select the experiment
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("sentiment-analysis-kmeans")

# initialize MLflow client
client = MlflowClient()

# experiment ID
experiment_id = client.get_experiment_by_name("sentiment-analysis-kmeans").experiment_id

runs = client.search_runs(experiment_ids=[experiment_id])

# Find the highest "<model_name>_v<N>" version among the existing runs (0 if none).
max_version = 0
for run in runs:
    run_name = run.data.tags.get('mlflow.runName')
    if not run_name or not run_name.startswith(model_name):
        continue

    # extract the version number that follows the last '_v'
    try:
        version = int(run_name.split('_v')[-1])
    except ValueError:
        continue  # skip run names whose suffix is not an integer

    max_version = max(max_version, version)

# next version for this model
new_version = max_version + 1

# A run left open by an earlier execution of this cell would make start_run()
# fail ("run ... is already active"); close it first so the cell can be re-run.
if mlflow.active_run() is not None:
    mlflow.end_run()

# new run name
new_run_name = f"{model_name}_v{new_version}"
mlflow.start_run(run_name=new_run_name)

mlflow.log_param("model name", model_name)
mlflow.log_param("dataset_name", dataset_path)
mlflow.log_param("data size", df.shape)

Prediction Accuracy (Base Model)

In [None]:
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score

# predictions of the base model on the test features
y_pred = ml_models.predict(X_test_tfidf)

print("Based Model:")

# training accuracy
trainAccuracy = ml_models.score(X_train_tfidf, y_train)
print("Training Accuracy:", trainAccuracy)

# Training misclassification rate = 1 - training accuracy. The accuracy computed
# above is reused instead of scoring the whole training set a second time.
# `training_loss` keeps holding the accuracy value, as before.
training_loss = trainAccuracy
print("Training Loss (MISC):", 1 - training_loss)

# test accuracy
testAccuracy = accuracy_score(y_test, y_pred)
print("Testing Accuracy:", testAccuracy)

# precision, recall and F1 score per class
print(classification_report(y_test, y_pred))

Confusion Matrix (Base Model)

In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# decode the integer labels back to sentiment names
reverse_mapping = {0: 'negative', 1: 'positive', 2: 'neutral'}
decoded_y_pred = [reverse_mapping[label] for label in y_pred]
decoded_y_test = [reverse_mapping[label] for label in y_test]

# Class order used for BOTH the matrix rows/columns and the tick labels.
# Without an explicit `labels=` argument confusion_matrix sorts string labels
# alphabetically (negative, neutral, positive), which does not match the
# negative / positive / neutral tick labels and mislabels two rows and columns.
class_names = [reverse_mapping[code] for code in sorted(reverse_mapping)]

# create confusion matrix in the fixed class order
conf_matrix = confusion_matrix(decoded_y_test, decoded_y_pred, labels=class_names)

# plot confusion matrix
tick_positions = [0.5, 1.5, 2.5]  # cell centres of the 3x3 heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, cmap='Blues', fmt='d', cbar=False)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.xticks(ticks=tick_positions, labels=class_names)
plt.yticks(ticks=tick_positions, labels=class_names)
plt.show()

ROC Curve (Base Model)

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize
from sklearn.model_selection import train_test_split

# One-vs-rest indicator matrix of the true labels, one column per class id.
y_test_binarized = label_binarize(y_test, classes=[0, 1, 2])

# proba for proba models , decision for svm
y_pred_scores = ml_models.predict_proba(X_test_tfidf)       #can CHANGE
# y_pred_scores = ml_models.decision_function(X_test_tfidf)   

# Per-class ROC curve and AUC, keyed by class index.
fpr, tpr, roc_auc = {}, {}, {}
n_classes = y_test_binarized.shape[1]

for i in range(n_classes):
    true_column = y_test_binarized[:, i]
    score_column = y_pred_scores[:, i]
    fpr[i], tpr[i], _ = roc_curve(true_column, score_column)
    roc_auc[i] = roc_auc_score(true_column, score_column)

plt.figure()

# One curve per class; the class names follow the integer encoding 0/1/2.
colors = ['blue', 'red', 'green']
classes = ['negative', 'positive', 'neutral']
for i, color in enumerate(colors):
    curve_label = f'ROC curve for {classes[i]} (area = {roc_auc[i]:0.2f})'
    plt.plot(fpr[i], tpr[i], color=color, lw=2, label=curve_label)

# diagonal reference line from (0, 0) to (1, 1)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve for Multi-class')
plt.legend(loc="lower right")
plt.show()


Hyperparameter Tuning

In [None]:
from sklearn.model_selection import GridSearchCV

# ML models classifier                 #can CHANGE
# ml_models = LogisticRegression()  
# ml_models = RandomForestClassifier()
# ml_models = MultinomialNB()
# ml_models = SVC()
ml_models = KNeighborsClassifier()


# hyperparameters to tune    #can CHANGE
# (keep exactly one param_grid active, matching the classifier selected above)

#LR
# param_grid = {
#     'C': [0.1, 1.0, 10.0],
#     'penalty': ['l1', 'l2'],
#     'solver': ['liblinear', 'saga']
# }

# #RF
# NOTE(review): max_features='auto' is not accepted by newer scikit-learn
# releases -- confirm against the installed version before enabling this grid.
# param_grid = {
#     'n_estimators': [100, 200],
#     'max_features': ['auto', 'sqrt'],
#     'max_depth': [10, 20],
#     'min_samples_split': [2, 5],
#     'min_samples_leaf': [1, 2],
#     'bootstrap': [True]
# }

# #MNB
# param_grid = {
#     'alpha': [0.1, 0.5, 1.0],
#     'fit_prior': [True, False]
# }

# #SVM
# param_grid = {
#     'C': [1, 5],
#     'kernel': ['linear', 'poly'],
#     'gamma': ['scale', 'auto']
# }

# KNN
# NOTE(review): the TF-IDF features are sparse; scikit-learn documents that
# fitting KNN on sparse input overrides `algorithm` with brute force, so the
# four algorithm options are expected to score the same -- confirm.
param_grid = {
    'n_neighbors': [3, 5, 7],
    'weights': ['uniform', 'distance'],
    'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute']
}


# perform grid search with 5-fold cross-validation, selecting by accuracy
grid_search = GridSearchCV(estimator=ml_models, param_grid=param_grid, cv=5, scoring='accuracy')

# fit the grid search to the training data
grid_search.fit(X_train_tfidf, y_train)

# best hyperparameters and corresponding cross-validation score
best_params = grid_search.best_params_
best_score = grid_search.best_score_

print("Best Hyperparameters:", best_params)
print("Best Cross-Validation Accuracy:", best_score)

# predictions of the best estimator on the test features
best_model = grid_search.best_estimator_
predictions_test = best_model.predict(X_test_tfidf)

# accuracy of the best estimator on the train and test sets
best_model_train_score = best_model.score(X_train_tfidf, y_train)
best_model_test_score = best_model.score(X_test_tfidf, y_test)

print()
print("Best Model:")
print("Training Score: {}\nTest Score: {}".format(best_model_train_score, best_model_test_score))

# classification report as a dict, for logging the weighted averages
report = classification_report(y_test, predictions_test, output_dict=True)

print("\nClassification Report:")
print(classification_report(y_test, predictions_test))

# log the tuning result to the active MLflow run
mlflow.log_param("best params", best_params)

mlflow.log_metric("train accuracy", best_model_train_score)
mlflow.log_metric("test accuracy", best_model_test_score)

mlflow.log_metric("Precision", report['weighted avg']['precision'])
mlflow.log_metric("Recall", report['weighted avg']['recall'])
mlflow.log_metric("F1 Score", report['weighted avg']['f1-score'])

# These are the last values logged in this notebook, so close the run here;
# otherwise it stays active and blocks the next mlflow.start_run() call.
mlflow.end_run()

Confusion Matrix (Best Model)

In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# decode the integer labels back to sentiment names
reverse_mapping = {0: 'negative', 1: 'positive', 2: 'neutral'}
decoded_y_pred = [reverse_mapping[label] for label in predictions_test]
decoded_y_test = [reverse_mapping[label] for label in y_test]

# Class order used for BOTH the matrix rows/columns and the tick labels.
# Without an explicit `labels=` argument confusion_matrix sorts string labels
# alphabetically (negative, neutral, positive), which does not match the
# negative / positive / neutral tick labels and mislabels two rows and columns.
class_names = [reverse_mapping[code] for code in sorted(reverse_mapping)]

# create confusion matrix in the fixed class order
conf_matrix = confusion_matrix(decoded_y_test, decoded_y_pred, labels=class_names)

# plot confusion matrix
tick_positions = [0.5, 1.5, 2.5]  # cell centres of the 3x3 heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, cmap='Blues', fmt='d', cbar=False)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.xticks(ticks=tick_positions, labels=class_names)
plt.yticks(ticks=tick_positions, labels=class_names)
plt.show()

ROC Curve (Best Model)

In [None]:
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize
from sklearn.model_selection import train_test_split

# One-vs-rest indicator matrix of the true labels, one column per class id.
y_test_binarized = label_binarize(y_test, classes=[0, 1, 2])


# proba for proba models , decision for svm
y_pred_scores = best_model.predict_proba(X_test_tfidf)        #can CHANGE
# y_pred_scores = best_model.decision_function(X_test_tfidf)

# Per-class ROC curve and AUC, keyed by class index.
fpr, tpr, roc_auc = {}, {}, {}
n_classes = y_test_binarized.shape[1]

for i in range(n_classes):
    true_column = y_test_binarized[:, i]
    score_column = y_pred_scores[:, i]
    fpr[i], tpr[i], _ = roc_curve(true_column, score_column)
    roc_auc[i] = roc_auc_score(true_column, score_column)

plt.figure()

# One curve per class; the class names follow the integer encoding 0/1/2.
colors = ['blue', 'red', 'green']
classes = ['negative', 'positive', 'neutral']
for i, color in enumerate(colors):
    curve_label = f'ROC curve for {classes[i]} (area = {roc_auc[i]:0.2f})'
    plt.plot(fpr[i], tpr[i], color=color, lw=2, label=curve_label)

# diagonal reference line from (0, 0) to (1, 1)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve for Multi-class')
plt.legend(loc="lower right")
plt.show()

Save Best Model

In [None]:
import os
import joblib

# Output layout: <base_folder>/models and <base_folder>/feature_extract
base_folder = '5 Unsupervised Sentiment Analysis/kmeans_models_fextract'
models_folder = os.path.join(base_folder, 'models')
feature_extract_folder = os.path.join(base_folder, 'feature_extract')

# Create both folders if they do not exist yet.
for target_folder in (models_folder, feature_extract_folder):
    os.makedirs(target_folder, exist_ok=True)

# Serialize the tuned model.    #CHANGE model name and dataset name
model_output_path = os.path.join(models_folder, 'best_knn_model_kmeans.pkl')
joblib.dump(best_model, model_output_path)
print('Best model saved')

# Serialize the fitted TF-IDF vectorizer.    #CHANGE also
vectorizer_output_path = os.path.join(feature_extract_folder, 'knn_tfidf_kmeans.pkl')
joblib.dump(tfidf_vectorizer, vectorizer_output_path)
print('TFIDF saved')

Test unseen data

In [None]:
import joblib
from sklearn.feature_extraction.text import TfidfVectorizer

# Paths of the persisted model and TF-IDF vectorizer.
model_filename = '5 Unsupervised Sentiment Analysis/kmeans_models_fextract/models/best_knn_model_kmeans.pkl'
vectorizer_filename = '5 Unsupervised Sentiment Analysis/kmeans_models_fextract/feature_extract/knn_tfidf_kmeans.pkl'

# Restore both objects from disk.
model = joblib.load(model_filename)
vectorizer = joblib.load(vectorizer_filename)

# Integer class id -> sentiment name.
label_encoding = {0: 'negative', 1: 'positive', 2: 'neutral'}


# Unseen sample text (keep exactly one input_text line active).
# input_text = "McDonald’s shortens breakfast time in Australia as bird flu causes egg shortage"
# input_text = "Tesla sales fall again as more automakers crowd electric vehicle market"
input_text = "today is a bad day"
# input_text = "Global Summit Yields New Climate Accord"
# input_text = 'Local Communities Rally for Better Infrastructure'

# Vectorize the single input with the restored vectorizer.
transformed_input = vectorizer.transform([input_text])

# Predict its class id with the restored model.
prediction = model.predict(transformed_input)

# Translate the class id of the one input back to its sentiment name.
predicted_class_id = prediction[0]
decoded_prediction = label_encoding[predicted_class_id]

# print result
print(f"Prediction: {decoded_prediction}")