In [0]:
!wget https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv
from pyspark.sql import SparkSession

--2024-06-15 15:01:36--  https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv
Resolving proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)... 3.5.225.18, 3.5.224.12
Connecting to proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)|3.5.225.18|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1003477941 (957M) [text/csv]
Saving to: ‘wikipedia.csv.1’


2024-06-15 15:02:38 (15.7 MB/s) - ‘wikipedia.csv.1’ saved [1003477941/1003477941]



In [0]:
# Configurazione della sessione Spark con memoria aumentata e G1GC
spark = SparkSession.builder \
    .appName("TextClassification") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC") \
    .getOrCreate()

In [0]:
# librerie necessarie per l'elaborazione del linguaggio naturale (NLP)
!pip install wordcloud nltk spacy

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting wordcloud
  Using cached wordcloud-1.9.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (511 kB)
Collecting nltk
  Using cached nltk-3.8.1-py3-none-any.whl (1.5 MB)
Collecting spacy
  Using cached spacy-3.7.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.6 MB)
Collecting regex>=2021.8.3
  Using cached regex-2024.5.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (775 kB)
Collecting tqdm
  Using cached tqdm-4.66.4-py3-none-any.whl (78 kB)
Collecting catalogue<2.1.0,>=2.0.6
  Using cached catalogue-2.0.10-py3-none-any.whl (17 kB)
Collecting typer<1.0.0,>=0.3.0
  Using cached typer-0.12.3-py3-none-any.whl (47 kB)
Collecting preshed<3.1.0,>=3.0.2
  Using cached preshed-3.0.9-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (156 kB)
Collecting srsly<3.0.0,>=2.4.3
  Using cached sr

In [0]:
# modello linguistico "en_core_web_sm" per la lingua inglese utilizzando spaCy
!python3 -m spacy download en_core_web_sm
import re
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import nltk
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, regexp_replace, col, lower
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Tokenizer, HashingTF, IDF, StopWordsRemover
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Collecting en-core-web-sm==3.7.1
  Using cached https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')


In [0]:
# Scarica risorse per nltk
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [0]:
# Carica il dataset
dataset = pd.read_csv('/databricks/driver/wikipedia.csv')
spark_df = spark.createDataFrame(dataset)
spark_df = spark_df.drop("Unnamed: 0").withColumnRenamed("categoria", "category")# Rimozione dei valori nulli e dei duplicati


In [0]:
spark_df = spark_df.dropna(subset=["summary", "documents", "category"])
spark_df = spark_df.dropDuplicates()

In [0]:
# Preprocessing del testo usando funzioni PySpark
def preprocess_dataframe(df, text_column):
    df = df.withColumn(text_column, regexp_replace(col(text_column), r'\W', ' '))
    df = df.withColumn(text_column, regexp_replace(col(text_column), r'\d', ' '))
    df = df.withColumn(text_column, lower(col(text_column)))
    return df

In [0]:
spark_df = preprocess_dataframe(spark_df, "summary")
spark_df = preprocess_dataframe(spark_df, "documents")

In [0]:
# Tokenizer
tokenizer_summary = Tokenizer(inputCol="summary", outputCol="summary_tokens")
tokenizer_documents = Tokenizer(inputCol="documents", outputCol="documents_tokens")

In [0]:
# Rimuovi parole di stop
stopwords = StopWordsRemover.loadDefaultStopWords("english")
remover_summary = StopWordsRemover(inputCol="summary_tokens", outputCol="filtered_summary_tokens", stopWords=stopwords)
remover_documents = StopWordsRemover(inputCol="documents_tokens", outputCol="filtered_documents_tokens", stopWords=stopwords)


In [0]:
# TF-IDF per summary
hashingTF_summary = HashingTF(inputCol="filtered_summary_tokens", outputCol="raw_summary_features")
idf_summary = IDF(inputCol="raw_summary_features", outputCol="summary_features")



In [0]:
# TF-IDF per documents
hashingTF_documents = HashingTF(inputCol="filtered_documents_tokens", outputCol="raw_documents_features")
idf_documents = IDF(inputCol="raw_documents_features", outputCol="documents_features")


In [0]:
# Naive Bayes
nb_summary = NaiveBayes(featuresCol="summary_features", labelCol="label")
nb_documents = NaiveBayes(featuresCol="documents_features", labelCol="label")

In [0]:
# Pipeline per summary
pipeline_summary = Pipeline(stages=[
    StringIndexer(inputCol="category", outputCol="label"),
    tokenizer_summary,
    remover_summary,
    hashingTF_summary,
    idf_summary,
    nb_summary
])



In [0]:
# Pipeline per documents
pipeline_documents = Pipeline(stages=[
    StringIndexer(inputCol="category", outputCol="label"),
    tokenizer_documents,
    remover_documents,
    hashingTF_documents,
    idf_documents,
    nb_documents
])

In [0]:
# Suddivisione del dataset in training e test set (stessa suddivisione per entrambi i modelli)
train, test = spark_df.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Addestramento del modello per summary
model_summary = pipeline_summary.fit(train)
# Addestramento del modello per documents
model_documents = pipeline_documents.fit(train)

In [0]:
# Funzione per valutare il modello con diverse metriche
def evaluate_model(predictions, label_col="label", prediction_col="prediction"):
    metrics = ["accuracy", "f1", "weightedPrecision", "weightedRecall"]
    evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col)

    results = {}
    for metric in metrics:
        evaluator.setMetricName(metric)
        results[metric] = evaluator.evaluate(predictions)
    
    return results

In [0]:
# Predizione e valutazione per summary
predictions_summary = model_summary.transform(test)
results_summary = evaluate_model(predictions_summary)
print("Risultati per summary:")
for metric, value in results_summary.items():
    print(f"{metric}: {value}")

Risultati per summary:
accuracy: 0.8224261233301848
f1: 0.8212939356492959
weightedPrecision: 0.8255882425644988
weightedRecall: 0.8224261233301848


In [0]:
# Predizione e valutazione per documents
predictions_documents = model_documents.transform(test)
results_documents = evaluate_model(predictions_documents)
print("Risultati per documents:")
for metric, value in results_documents.items():
    print(f"{metric}: {value}")

Risultati per documents:
accuracy: 0.8222237214950749
f1: 0.8215391824531326
weightedPrecision: 0.8272954219300815
weightedRecall: 0.8222237214950749
