### PROGETTO BIG DATA

Importazione librerie e download

In [0]:
%pip install wordcloud nltk pandas matplotlib seaborn

In [0]:
%sh
apt-get update && apt-get install -y python3-pil.imagetk

In [0]:
%pip install --upgrade pip

In [0]:
%pip install --upgrade pillow

In [0]:
%pip install wordcloud==1.9.2 nltk==3.8.1

In [0]:
%%sh
apt-get update
apt-get install -y fonts-dejavu
fc-cache -f -v

In [0]:
import os
# Verifica la directory corrente
print("Directory corrente:", os.getcwd())
# Verifica spazio disponibile
import shutil
total, used, free = shutil.disk_usage("/")
print(f"Spazio libero: {free // (2**30)} GB")

In [0]:
import requests
import pandas as pd
import numpy as np
from io import StringIO
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
from wordcloud import WordCloud
import nltk
from nltk.corpus import stopwords
from collections import Counter
from typing import Tuple, Optional, List, Dict
import string  # Per string.punctuation
import warnings
warnings.filterwarnings('ignore')

In [0]:
# Funzione per il download del dataset
def download_in_chunks(url, chunk_size=1024):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        chunks = []
        for chunk in response.iter_content(chunk_size=chunk_size, decode_unicode=True):
            if chunk:
                chunks.append(chunk)
        return ''.join(chunks)
    return None

# URL del dataset
url = "https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv"

# Download e caricamento dati
try:
    data = download_in_chunks(url)
    if data:
        df = pd.read_csv(StringIO(data))
        print("Dataset caricato con successo!")
        print("Shape del dataset:", df.shape)
    else:
        print("Errore nel download del file")
except Exception as e:
    print(f"Errore: {e}")

Dataset caricato con successo!
Shape del dataset: (153232, 5)


In [0]:
def initialize_stopwords(stop_words: set = None) -> set:
    """Initialize analyzer with optional custom stopwords"""
    try:
        nltk.download('stopwords', quiet=True)
        return stop_words or set(stopwords.words('english')).union({
            'because', 'where', 'which', 'when',
            'being', 'having', 'making', 'saying', 'every',
            'everyone', 'everything', 'several'
        })
    except Exception as e:
        print(f"Warning: Could not download NLTK stopwords. Using basic stopwords. Error: {e}")
        return stop_words or {
            'because', 'where', 'which', 'when',
            'being', 'having', 'making', 'saying', 'every',
            'everyone', 'everything', 'several'
        }

def clean_text(text: str) -> str:
    """Clean and normalize text"""
    if pd.isna(text):
        return ""
    return str(text).lower().translate(str.maketrans('', '', string.punctuation))

def create_category_analysis(texts: pd.Series, title: str, stop_words: set) -> Tuple[Optional[plt.Figure], Optional[plt.Figure]]:
    """Create wordcloud and frequency histogram for a category"""
    cleaned_text = ' '.join(clean_text(text) for text in texts if pd.notna(text))
    
    if not cleaned_text.strip():
        print(f"No valid text found for category {title}")
        return None, None
    
    # Word frequency analysis
    words = [w for w in cleaned_text.split() if w not in stop_words and len(w) >= 5]
    word_freq = Counter(words)
    
    try:
        # Create wordcloud
        wc = WordCloud(
            width=1200, height=600,
            background_color='white',
            stopwords=stop_words,
            max_words=100,
            prefer_horizontal=0.7,
            collocations=False,
            random_state=42
        ).generate(' '.join(words))
        
        # Create two figures
        fig_cloud = plt.figure(figsize=(15, 8))
        ax_cloud = fig_cloud.add_subplot(111)
        ax_cloud.imshow(wc, interpolation='bilinear')
        ax_cloud.axis('off')
        ax_cloud.set_title(f'Word Cloud - {title}', size=16, pad=20)
        
        # Create histogram
        fig_hist = plt.figure(figsize=(12, 6))
        ax_hist = fig_hist.add_subplot(111)
        words_freq = pd.DataFrame(word_freq.most_common(20), columns=['word', 'frequency'])
        sns.barplot(data=words_freq, x='frequency', y='word', ax=ax_hist)
        ax_hist.set_title(f'Top 20 Most Frequent Words - {title}')
        
        plt.tight_layout()
        return fig_cloud, fig_hist
        
    except Exception as e:
        print(f"Error creating visualizations for {title}: {str(e)}")
        return None, None

def compute_basic_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Compute basic metrics for analysis"""
    df = df.copy()
    df['word_count'] = df['documents'].str.split().str.len()
    return df

def get_basic_statistics(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate basic statistics by category"""
    stats = df.groupby('categoria').agg({
        'categoria': 'count',  # numero di articoli
        'word_count': ['mean', 'min', 'max']  # statistiche sulle parole
    })
    
    # Rinomina le colonne per maggiore chiarezza
    stats.columns = ['Numero articoli', 'Media parole', 'Minimo parole', 'Massimo parole']
    return stats.round(2)

def print_statistics(stats: pd.DataFrame):
    """Print statistics in a readable format"""
    print("\nANALISI STATISTICA PER CATEGORIA:")
    print("-" * 50)
    
    for categoria in stats.index:
        print(f"\nCategoria: {categoria}")
        print(f"  • Numero di articoli: {stats.loc[categoria, 'Numero articoli']}")
        print(f"  • Media parole per articolo: {stats.loc[categoria, 'Media parole']:.1f}")
        print(f"  • Articolo più corto: {stats.loc[categoria, 'Minimo parole']:.0f} parole")
        print(f"  • Articolo più lungo: {stats.loc[categoria, 'Massimo parole']:.0f} parole")

def create_category_distribution(df: pd.DataFrame) -> plt.Figure:
    """Create distribution plot of articles by category"""
    fig = plt.figure(figsize=(12, 6))
    ax = fig.add_subplot(111)
    
    cat_counts = df['categoria'].value_counts()
    sns.barplot(x=cat_counts.index, y=cat_counts.values, ax=ax)
    ax.set_title('Distribuzione Articoli per Categoria')
    ax.set_xlabel('Categoria')
    ax.set_ylabel('Numero di Articoli')
    ax.tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    return fig

def analisi_esplorativa(df: pd.DataFrame) -> Dict:
    """Main function to perform basic exploratory analysis"""
    # Initialize stopwords
    stop_words = initialize_stopwords()
    
    # Compute metrics
    df = compute_basic_metrics(df)
    
    # Get and print basic statistics
    statistics = get_basic_statistics(df)
    print_statistics(statistics)
    
    # Create distribution plot
    dist_plot = create_category_distribution(df)
    
    # Create wordclouds and histograms for each category
    visualizations = {}
    for categoria in df['categoria'].unique():
        texts = df[df['categoria'] == categoria]['documents']
        wordcloud, histogram = create_category_analysis(texts, categoria, stop_words)
        visualizations[categoria] = {
            'wordcloud': wordcloud,
            'histogram': histogram
        }
    
    return {
        'visualizations': visualizations,
        'statistics': statistics,
        'distribution': dist_plot
    }

# Usage
risultati = analisi_esplorativa(df)
plt.show()  # To display all plots

Fine analisi esplorativa

In [0]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

# DataFrame da Pandas a Spark 
spark_df = spark.createDataFrame(df)

def prepara_dati(spark_df):
    """
    Preparazione dei dati per il modello di classificazione usando PySpark
    """
    print("\n=== Preparazione dei Dati ===")
    
    # Verifichiamo i valori nulli
    print("\nValori nulli nel dataset:")
    null_counts = spark_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in spark_df.columns])
    null_counts.show()
    
    # Pulizia dei dati
    df_clean = spark_df.dropna(subset=['documents', 'categoria'])
    
    rows_before = spark_df.count()
    rows_after = df_clean.count()
    print(f"\nRighe rimosse per valori nulli: {rows_before - rows_after}")
    
    # Preparazione delle features
    # Pipeline di preprocessing del testo
    tokenizer = Tokenizer(inputCol="documents", outputCol="words")
    hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=5000)
    idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=2)
    
    # Indicizzazione delle categorie
    indexer = StringIndexer(inputCol="categoria", outputCol="label")
    
    # Pipeline
    pipeline = Pipeline(stages=[
        tokenizer,
        hashingTF,
        idf,
        indexer
    ])
    
    # Split dei dati
    train_df, test_df = df_clean.randomSplit([0.8, 0.2], seed=42)
    
    print("\nDimensioni dei set di dati prima della trasformazione:")
    print(f"Training set: {train_df.count()} righe")
    print(f"Test set: {test_df.count()} righe")
    
    # Fit della pipeline
    print("\nApplico la pipeline di trasformazione...")
    pipeline_model = pipeline.fit(train_df)
    
    # Transform dei dati
    train_transformed = pipeline_model.transform(train_df)
    test_transformed = pipeline_model.transform(test_df)
    
    # Selezione delle sole colonne necessarie per il training
    final_columns = ["label", "features"]
    train_final = train_transformed.select(final_columns)
    test_final = test_transformed.select(final_columns)
    
    print("\nDimensioni finali dei set di dati:")
    print(f"Training set: {train_final.count()} righe")
    print(f"Test set: {test_final.count()} righe")
    
    return train_final, test_final, pipeline_model

# prima in Spark DataFrame e poi preparazione dei dati
spark_df = spark.createDataFrame(df)
train_df, test_df, pipeline_model = prepara_dati(spark_df)



In [0]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

def crea_modello_classificazione(spark_df):
    """
    Creazione e valutazione di un modello di classificazione usando sia sommario che testo completo
    """
    print("\n=== Creazione Modello di Classificazione ===")
    
    # Verifica dati
    print("\nValori nulli nel dataset:")
    null_counts = spark_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) 
                                 for c in ['summary', 'documents', 'categoria']])
    null_counts.show()
    
    # Pulizia dati
    df_clean = spark_df.dropna(subset=['summary', 'documents', 'categoria'])
    
    rows_before = spark_df.count()
    rows_after = df_clean.count()
    print(f"\nRighe rimosse per valori nulli: {rows_before - rows_after}")
    
    # Pipeline di preprocessing
    # sommario
    summary_tokenizer = Tokenizer(inputCol="summary", outputCol="summary_words")
    summary_hashingTF = HashingTF(inputCol="summary_words", outputCol="summary_raw_features", numFeatures=2000)
    summary_idf = IDF(inputCol="summary_raw_features", outputCol="summary_features", minDocFreq=2)
    
    # testo completo
    doc_tokenizer = Tokenizer(inputCol="documents", outputCol="doc_words")
    doc_hashingTF = HashingTF(inputCol="doc_words", outputCol="doc_raw_features", numFeatures=5000)
    doc_idf = IDF(inputCol="doc_raw_features", outputCol="doc_features", minDocFreq=2)
    
    # Combina le features
    assembler = VectorAssembler(inputCols=["summary_features", "doc_features"], outputCol="features")
    
    # Indicizzazione delle categorie
    indexer = StringIndexer(inputCol="categoria", outputCol="label")
    
    # Modello di classificazione
    lr = LogisticRegression(maxIter=20, elasticNetParam=0.5)
    
    # Pipeline completa
    pipeline = Pipeline(stages=[
        summary_tokenizer,
        summary_hashingTF,
        summary_idf,
        doc_tokenizer,
        doc_hashingTF,
        doc_idf,
        assembler,
        indexer,
        lr
    ])
    
    # Split dei dati
    train_df, test_df = df_clean.randomSplit([0.8, 0.2], seed=42)
    
    print("\nDimensioni dei set di dati:")
    print(f"Training set: {train_df.count()} righe")
    print(f"Test set: {test_df.count()} righe")
    
    # Training del modello
    print("\nAddestramento del modello...")
    model = pipeline.fit(train_df)
    
    # Valutazione sul test set
    print("\nValutazione del modello...")
    predictions = model.transform(test_df)
    
    # Calcolo metriche
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
    
    # Calcolo accuracy
    accuracy = evaluator.evaluate(predictions)
    print(f"\nAccuracy sul test set: {accuracy:.4f}")
    
    # Calcolo precision e recall per ogni categoria
    print("\nMetriche per categoria:")
    label_dict = {float(idx): cat for idx, cat in enumerate(model.stages[-2].labels)}
    
    # Calcolo della confusion matrix
    conf_matrix = (predictions.groupBy("label", "prediction").count().toPandas())
    
    # Calcolo metriche per classe
    for label in label_dict:
        true_pos = conf_matrix[
            (conf_matrix.label == label) & 
            (conf_matrix.prediction == label)]['count'].sum()
        
        false_pos = conf_matrix[
            (conf_matrix.label != label) & 
            (conf_matrix.prediction == label)]['count'].sum()
        
        false_neg = conf_matrix[
            (conf_matrix.label == label) & 
            (conf_matrix.prediction != label)]['count'].sum()
        
        precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) > 0 else 0
        recall = true_pos / (true_pos + false_neg) if (true_pos + false_neg) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        print(f"\nCategoria: {label_dict[label]}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"F1-Score: {f1:.4f}")
    
    return model, accuracy, predictions

# Esecuzione
spark_df = spark.createDataFrame(df)
model, accuracy, predictions = crea_modello_classificazione(spark_df)

# Per predizioni su nuovi dati
def predici_categoria(model, testo, sommario):
    """
    Predice la categoria di un nuovo articolo
    """
    # Creare un DataFrame con il nuovo testo
    nuovo_df = spark.createDataFrame(
        [(sommario, testo)],
        ["summary", "documents"]
    )
    
    # Applicare il modello
    prediction = model.transform(nuovo_df)
    
    # Ottenere la categoria predetta
    categoria_idx = prediction.select("prediction").first()[0]
    categoria = model.stages[-2].labels[int(categoria_idx)]
    
    return categoria

# categoria_predetta = predici_categoria(model, "testo", "sommario")

Il modello di classificazione automatica degli articoli Wikimedia ha raggiunto risultati eccellenti, con un'accuratezza superiore al 91%. La quasi totalità delle categorie tematiche viene classificata correttamente con F1-Score superiori al 90%, ad eccezione di "medicine" e "research" che si attestano intorno al 75%. Questo suggerisce che il modello è affidabile e pronto per essere utilizzato

In [0]:
import pandas as pd
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
import nltk
from nltk.tokenize import word_tokenize
import re

def analisi_contenuti_wikimedia(df):
    """
    Analisi dei contenuti Wikimedia:
    - Densità articoli per categoria
    - Parole più frequenti per categoria
    """
    print("\n=== ANALISI CONTENUTI WIKIMEDIA ===")
    
    # 1. Densità articoli per categoria
    print("\n1. Densità degli articoli")
    
    densita = df['categoria'].value_counts()
    percentuali = df['categoria'].value_counts(normalize=True) * 100
    
    print("\nDistribuzione degli articoli per categoria:")
    for cat in densita.index:
        print(f"  • {cat}: {densita[cat]} articoli ({percentuali[cat]:.1f}%)")
    
    # Visualizzazione della densità
    plt.figure(figsize=(12, 6))
    sns.barplot(x=densita.values, y=densita.index)
    plt.title('Densità degli articoli per categoria')
    plt.xlabel('Numero di articoli')
    plt.tight_layout()
    
    # 2. Analisi parole frequenti
    # Parole da escludere
    stop_words = {
        'the', 'and', 'for', 'that', 'was', 'with', 'this', 'from', 'his', 'her', 
        'they', 'are', 'were', 'their', 'one', 'all', 'had', 'but', 'not', 'what',
        'when', 'who', 'which', 'she', 'would', 'been', 'will', 'there', 'more',
        'has', 'link', 'links', 'external', 'reference', 'references', 'http', 'https',
        'www', 'com', 'org', 'ref', 'cite', 'cited', 'page', 'pages', 'retrieved',
        'volume', 'edition', 'press', 'published'
    }
    
    def pulisci_e_conta_parole(testi):
        """Pulisce e conta le parole più frequenti (max una per articolo)"""
        try:
            # Conteggio in quanti articoli appare ogni parola
            word_in_articles = Counter()
            
            for testo in testi.dropna():
                # Convertire in minuscolo
                testo = testo.lower()
                
                # Rimuovere caratteri speciali e numeri
                testo_pulito = re.sub(r'[^\w\s]', ' ', testo)
                testo_pulito = re.sub(r'\d+', ' ', testo_pulito)
                
                # Tokenizzazione
                words = word_tokenize(testo_pulito)
                
                # Creare set di parole uniche per questo articolo
                article_words = {
                    w for w in words 
                    if len(w) >= 5  # esclude parole troppo corte
                    and w not in stop_words  # esclude stop words
                    and not any(c.isdigit() for c in w)  # esclude parole con numeri
                }
                
                # Aggiornare il conteggio
                word_in_articles.update(article_words)
            
            return word_in_articles.most_common(15)
            
        except Exception as e:
            print(f"Errore nel conteggio parole: {str(e)}")
            return []
    
    print("\n2. Tendenze linguistiche per categoria")
    
    # Analisi per ogni categoria
    for categoria in df['categoria'].unique():
        testi = df[df['categoria'] == categoria]['documents']
        n_articoli = len(testi)
        parole_freq = pulisci_e_conta_parole(testi)
        
        if parole_freq:
            print(f"\nParole più frequenti in {categoria} ({n_articoli} articoli):")
            for parola, freq in parole_freq:
                perc = (freq / n_articoli) * 100
                print(f"  • {parola:<15} {freq:>6} occorrenze ({perc:>5.1f}% degli articoli)")
    
    return {
        'densita': densita,
        'percentuali': percentuali
    }

# Esecuzione dell'analisi
risultati = analisi_contenuti_wikimedia(df)
plt.show()

L'analisi mostra una distribuzione quasi uniforme degli articoli tra le categorie, con una leggera predominanza della categoria "politics". Per quanto riguarda le tendenze linguistiche, ogni categoria mostra parole chiave fortemente caratterizzanti: ad esempio "power" e "plant" sono presenti in oltre il 60% degli articoli di energia, "tennis" domina la categoria sport, e "university" e "research" sono molto frequenti nella categoria ricerca (oltre l'80% degli articoli)