In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import regexp_replace, col
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import explode

In [None]:
#import texts for analysis
for name in file_names:
    globals()[name] = sc.textFile(f"/FileStore/tables/{name}.txt").collect()

In [None]:
#moving texts into dictionary
texts_dict = {name: globals()[name] for name in file_names}
print(texts_dict)

In [None]:
# converting the dictionary into a DataFrame
texts_dict_df = {}

for name, text_list in texts_dict.items():
    text_rdd = sc.parallelize(text_list)
    text_df = text_rdd.map(lambda x: (x, )).toDF(["text"])
    texts_dict_df[f"{name}_df"] = text_df

In [None]:
# cleaning, filtering, and tokenizing texts
texts_cleaned = {}

for key, df in texts_dict_df.items():
    cleaned_df = df.withColumn("cleaned_text", 
                               regexp_replace(col("text"), r"[^\w\sąćęłńóśźżĄĆĘŁŃÓŚŹŻ]", ""))
    tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
    tokenized_cleaned_df = tokenizer.transform(cleaned_df)
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    filtered_cleaned_df = remover.transform(tokenized_cleaned_df)
    texts_cleaned[key + "_cleaned"] = filtered_cleaned_df

In [None]:
# splitting text into individual words
texts_exploded = {}

for key, df in texts_cleaned.items():
    df.printSchema()
    exploded_df = df.select(explode(df["filtered_words"]).alias("word"))
    texts_exploded[key + "_exploded"] = exploded_df

In [None]:
# loading the PoliMorf dictionary
file_path = "dbfs:/FileStore/tables/PoliMorf_0_6_7.tab"
morph_df_spark = spark.read.text(file_path)

from pyspark.sql.functions import split

morph_df_spark = morph_df_spark.withColumn("split_data", split(morph_df_spark["value"], "\t"))
morph_df_spark = morph_df_spark.select(
    morph_df_spark["split_data"][0].alias("form"),
    morph_df_spark["split_data"][1].alias("lemma"),
    morph_df_spark["split_data"][2].alias("part_of_speech"),
    morph_df_spark["split_data"][3].alias("type")
)
morph_df_spark.show(100, truncate=False)

In [None]:
# loading the affective dictionary
csv_file_path = "dbfs:/FileStore/tables/slownik.csv"

nawl_spark_df = spark.read.option("header", "true").option("delimiter", ";").option("encoding", "windows-1250").csv(csv_file_path)

nawl_spark_df.show(5, truncate=False)

In [None]:
# joining texts with the morphological dictionary
texts_joined = {}

for key, exploded_df in texts_exploded.items():
    joined_df = exploded_df.join(
        morph_df_spark,
        exploded_df.word == morph_df_spark.form,  
        "left"  
    )
    texts_joined[key + "_joined"] = joined_df

In [None]:
# filtering out redundant matches from the morphological dictionary
from pyspark.sql import functions as F
from pyspark.sql.window import Window

texts_first_lemma = {}

for key, joined_df in texts_joined.items():
    window_spec = Window.partitionBy("word").orderBy("form")
    first_lemma_df = joined_df.withColumn("row_num", F.row_number().over(window_spec)) \
                              .filter("row_num = 1") \
                              .drop("row_num")
    texts_first_lemma[key + "_first_lemma"] = first_lemma_df

In [None]:
# renaming column to avoid name conflicts later
for key, unique_lemmas_df in texts_first_lemma.items():
    unique_lemmas_df = unique_lemmas_df.withColumnRenamed("word", "lemma_word")
    texts_first_lemma[key] = unique_lemmas_df

In [None]:
# joining the affective dictionary to the data frame
texts_final = {}
for key, first_lemma_df in texts_first_lemma.items():
    final_df = first_lemma_df.join(nawl_spark_df, first_lemma_df.lemma == nawl_spark_df.word, "left")
    texts_final[key + "_final"] = final_df

In [None]:
# summarizing emotions for the set of texts
from pyspark.sql import functions as F

def calculate_emotions_summary(final_df):
    aggregated_emotions = final_df.select(
        F.avg("mean Happiness").alias("avg_Happiness"),
        F.avg("mean Anger").alias("avg_Anger"),
        F.avg("mean Sadness").alias("avg_Sadness"),
        F.avg("mean Fear").alias("avg_Fear"),
        F.avg("mean Disgust").alias("avg_Disgust")
    ).first()  

    emotions = {
        "Happiness": aggregated_emotions["avg_Happiness"],
        "Anger": aggregated_emotions["avg_Anger"],
        "Sadness": aggregated_emotions["avg_Sadness"],
        "Fear": aggregated_emotions["avg_Fear"],
        "Disgust": aggregated_emotions["avg_Disgust"]
    }

    filtered_emotions = {emotion: value for emotion, value in emotions.items() if value is not None}

    dominant_emotion = max(filtered_emotions, key=filtered_emotions.get) if filtered_emotions else None

    return {
        "dominant_emotion": dominant_emotion,
        "emotion_values": filtered_emotions
    }

emotions_summary = {}

for key, final_df in texts_final.items():
    final_df.cache()
    emotion_summary = calculate_emotions_summary(final_df)
    emotions_summary[key] = emotion_summary
    print(f"Emotion summary for document {key}:")
    print(f"Dominant emotion: {emotion_summary['dominant_emotion']}")
    print(f"Average emotion values: {emotion_summary['emotion_values']}")
    print("-" * 50)

In [None]:
# renaming variables (for convenience)
emotions_summary_renamed = {}

old_fragment = "df_cleaned_exploded_joined_first_lemma_final"
new_fragment = "emotions"

for key, value in emotions_summary.items():
    new_key = key.replace(old_fragment, new_fragment)
    emotions_summary_renamed[new_key] = value

emotions_summary_renamed

In [None]:
# transferring emotions from dictionary to data frame
import pandas as pd
emotion_data = []

for doc, summary in emotions_summary_renamed.items():
    emotion_values = summary['emotion_values']
    emotion_data.append({
        'Document': doc,
        'Happiness': emotion_values['Happiness'],
        'Anger': emotion_values['Anger'],
        'Sadness': emotion_values['Sadness'],
        'Fear': emotion_values['Fear'],
        'Disgust': emotion_values['Disgust']
    })

emotion_df = pd.DataFrame(emotion_data)

In [None]:
# standardizing emotion values
from sklearn.preprocessing import StandardScaler
X = emotion_df.drop(columns=['Document'])
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

In [None]:
# K-Means cluster analysis using the Elbow method
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

inertias = []
for k in range(1, 11):
    kmeans = KMeans(n_clusters=k, random_state=42)
    kmeans.fit(X_scaled)
    inertias.append(kmeans.inertia_)

plt.plot(range(1, 11), inertias)
plt.xlabel('Liczba klastrów (k)')
plt.ylabel('Inertia')
plt.title('Metoda Elbow')
plt.show()

In [None]:
# K-Means clustering for the selected number of clusters
k = 3  
kmeans = KMeans(n_clusters=k, random_state=42)
emotion_df['Cluster'] = kmeans.fit_predict(X_scaled)

In [None]:
# preview of clustering results
print(emotion_df[['Document', 'Cluster']])

In [None]:
# characteristics of individual clusters
cluster_means = emotion_df.groupby('Cluster').mean()
print(cluster_means)

In [None]:
# calculating Silhouette Score
from sklearn.metrics import silhouette_score
score = silhouette_score(X_scaled, kmeans.labels_)
print(f"Silhouette Score: {score}")

In [None]:
# Principal Component Analysis (PCA) and visualization of K-Means clustering results
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

plt.scatter(X_pca[:, 0], X_pca[:, 1], c=kmeans.labels_, cmap='viridis')
plt.title('PCA - KMeans Clusters')
plt.show()

In [None]:
# adding articles titles to data frame
# dodanie tytułów artykułów do data frame

article_titles = [
    "To najgorszy ketchup, a Polacy go uwielbiają. Dietetyk ostrzega przed zakupem", 
    "Antycyklon Beata coraz bliżej Polski. Kolejne regiony skują się lodem [POGODA]", 
    "Cios dla Rosji. Armenia i USA podpisały porozumienie", 
    "Premier Australii żąda wyjaśnień od Rosji. 'Podejmiemy zdecydowane kroki'", 
    "Premier Szwecji: wyślemy trzy okręty i samolot rozpoznawczy do ochrony Bałtyku",
    "Tłum ludzi szuka skarbów nad Bałtykiem. Do sieci trafiły nagrania",
    "Czechy są gotowe zrezygnować z rosyjskiej ropy. 'Rosja nie może już nas szantażować'",
    "Wybuch i pożar w restauracji w Czechach. Są ofiary",
    "USA ograniczają eksport czipów AI. Polska krajem drugiej kategorii obok Mongolii i Nigerii",
    "Kolejna wygrana i wielki rekord Djokovicia! Serb pisze historię w Melbourne",
    "Arktyczna bomba uderzy w USA. Miliony ludzi zagrożone ekstremalnym mrozem",
    "Hamas zaakceptował projekt zawieszenia broni",
    "Skandal w Iławie. Jest reakcja komendanta wojewódzkiego policji. 'Haniebne zachowanie'",
    "Sankcje USA zadziałały. Indie zamykają porty dla rosyjskich tankowców",
    "Przez miesiąc jadła codziennie dwa jajka na śniadanie. Efekty były szybkie",
    "Śniadanie na długowieczność. Japońska dietetyczka robi je codziennie",
    "Eksperci mylili się co do jajek. Po latach prawda wyszła na jaw",
    "Policja aresztowała zawieszonego prezydenta Korei Płd. Służby wdarły się do rezydencji",
    "Policja w Seulu próbuje aresztować zawieszonego prezydenta. Pokonali pierwszy kordon",
    "Pożary w Los Angeles. Śledczy podali prawdopodobną przyczynę",
    "Coraz więcej obcokrajowców chce mieszkać w Polsce. Przodują obywatele Izraela",
    "Oto najlepsze licea i technika według rankingu Perspektyw 2025",
    "Rotterdam idealnym miejscem na tegoroczny city break",
    "Ewenement na skalę całej Europy. Ponad 90 proc. samochodów sprzedanych w tym kraju w 2024 r. stanowiły elektryki",
    "Dwaj najlepsi siatkarze świata w jednym klubie! Tomasz Fornal w duecie marzeń",
    "Żołnierze z Korei Północnej przebijają w tym Rosjan. Ekspert: to nowy poziom zagrożenia",
    "Węgry pozbawione prawa weta? Polskę czeka wyzwanie",
    "Włochy liczą na porozumienie Izraela z Hamasem. Są gotowe wysłać żołnierzy do Strefy Gazy",
    "Donald Tusk po rozmowach z Wołodymyrem Zełenskim. Mówi o 'bezwarunkowej przyjaźni'"
]
emotion_df['article_title'] = article_titles
print(emotion_df)

In [None]:
# assigning names to individual clusters, joining them to titles
emotion_map = {0: 'pozytywne', 1: 'niepokojące', 2: 'stresujące'}
emotion_df['Emotions'] = emotion_df['Cluster'].map(emotion_map)
print(emotion_df)

In [None]:
# presentation of final results (histogram below)
display(emotion_df[['article_title','Emotions']])

In [None]:
#visualisation with histogram
plt.hist(emotion_df['Emotions'], bins=3, color='yellow', edgecolor='black')

plt.xticks(rotation=45)

plt.title('Rozkład emocji w artykułach')
plt.ylabel('Liczba artykułów z dominującą daną emocją')
plt.xticks(rotation=45)
plt.tight_layout()

plt.show()