In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, max, min, coalesce
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import json
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark import SparkFiles


# Create (or reuse, via getOrCreate) the notebook's Spark session on the
# master at spark://spark-master:7077, with adaptive query execution and
# adaptive partition coalescing switched on.
spark = (
    SparkSession.builder
    .appName("Valmet-Notebook")
    .master("spark://spark-master:7077")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Echo the session details so the reader can confirm which cluster is in use.
print(
    "‚úÖ Spark Session gi√† attiva",
    f"Spark Version: {spark.version}",
    f"App Name: {spark.sparkContext.appName}",
    f"Master: {spark.sparkContext.master}",
    sep="\n",
)

‚úÖ Spark Session gi√† attiva
Spark Version: 3.5.7
App Name: Valmet-Notebook
Master: spark://spark-master:7077


In [4]:
# Load the bronze CSV into a DataFrame; the first row is the header and the
# column types are inferred from the data.
#
# The path is passed straight to the reader. SparkFiles.get() is meant for
# files distributed with SparkContext.addFile(); called with an absolute path
# it simply returned that same path, so wrapping it added nothing.
# NOTE(review): the file must be readable at this path by whichever process
# performs the read (driver and executors) — confirm the volume is mounted
# on all nodes.
df = spark.read.csv(
    "/opt/spark/data/20251204_bronze.csv",
    header=True,
    inferSchema=True
)

                                                                                

In [5]:
# Quick sanity check of the load: preview the first five rows (long cell
# values are truncated for display), then print the inferred schema.
df.show(n=5, truncate=True)
df.printSchema()


+------------------+----------+-----------+---------------------+-----------+--------------------+--------------------+------------+-------------+-------------------+-------------------+----------+
|ArrayIndex_metrics|custumerID|messageToID|EventProcessedUtcTime|PartitionId|EventEnqueuedUtcTime|                name|       value|    timestamp|     timestamp_true|formatted_timestamp| date_only|
+------------------+----------+-----------+---------------------+-----------+--------------------+--------------------+------------+-------------+-------------------+-------------------+----------+
|                 0|  10048224|    2717272| 2025-12-04 07:50:...|          1|2025-12-04 07:50:...|CONV.LS1.Status.P...|        91.8|1764834618282|2025-12-04 07:50:18|2025-12-04 07:50:18|2025-12-04|
|                 2|  10048224|    2717272| 2025-12-04 07:50:...|          1|2025-12-04 07:50:...|CONV.LS1.Status.P...|       117.0|1764834618282|2025-12-04 07:50:18|2025-12-04 07:50:18|2025-12-04|
|         

In [6]:
# Load the metric-name mapping used to normalise the `name` column.
# The encoding is stated explicitly so the result does not depend on the
# locale configured in the container running the notebook.
json_path = "/opt/spark/data/metrics_name_mapping_dict.json"
with open(json_path, 'r', encoding="utf-8") as f:
    mapping_dict = json.load(f)

In [10]:
# The mapping JSON has the shape: { "Parent": ["Child1", "Child2"] }
# It is flattened to rows of:     [ ("Child1", "Parent"), ("Child2", "Parent") ]
#
# The rows are collected in a dict keyed by the raw variant so that every
# variant appears at most once. A variant listed twice (in the same list or
# under two different parents) would otherwise match several mapping rows in
# the left join below and silently duplicate dataset rows.
variant_to_normalized = {}

for normalized_name, variations_list in mapping_dict.items():
    for raw_variant in variations_list:
        # First occurrence wins; later conflicting occurrences are reported.
        first_seen = variant_to_normalized.setdefault(raw_variant, normalized_name)
        if first_seen != normalized_name:
            print(
                f"Attenzione: variante '{raw_variant}' associata sia a "
                f"'{first_seen}' che a '{normalized_name}'; mantengo '{first_seen}'"
            )

# One (variant, normalized name) tuple per distinct variant, in insertion order.
flat_mapping_data = list(variant_to_normalized.items())

# Schema of the lookup table
schema_mapping = StructType([
    StructField("raw_name_map", StringType(), True),       # Must match the dataset values (the variants)
    StructField("normalized_name_map", StringType(), True) # The final clean name (the JSON key)
])

# Build the mapping DataFrame
df_mapping = spark.createDataFrame(flat_mapping_data, schema=schema_mapping)

print("Tabella di Mapping Corretta (Esempio):")
df_mapping.show(5, truncate=False)

# When a dataset name (df.name) equals a known variant
# (df_mapping.raw_name_map), pick up its normalized_name_map.
# The lookup table is built from a small in-memory dict, so it is hinted for
# broadcast to avoid shuffling the large side of the join.
df_normalized = df.join(
    F.broadcast(df_mapping),
    df.name == df_mapping.raw_name_map,
    "left"
)

# Overwrite `name` in place: use the normalized name when the join matched,
# otherwise keep the original value. Then drop the helper mapping columns.
df_final = df_normalized.withColumn(
    "name",
    coalesce(col("normalized_name_map"), col("name"))
).drop("raw_name_map", "normalized_name_map")

print("Dataset Normalizzato Correttamente:")
df_final.show(10, truncate=False)

Tabella di Mapping Corretta (Esempio):
+----------------------------------------------------+----------------------------------------------------+
|raw_name_map                                        |normalized_name_map                                 |
+----------------------------------------------------+----------------------------------------------------+
|CONV.LINE.Status.Monitoring.Process.Speed           |CONV.LINE.Status.Monitoring.Process.Speed           |
|CONV.LINE.Status.Speed                              |CONV.LINE.Status.Monitoring.Process.Speed           |
|CONV.LINE.Status.State.Speed                        |CONV.LINE.Status.Monitoring.Process.Speed           |
|CONV.LINE.Statistics.Production.Log.ProducedNotReset|CONV.LINE.Statistics.Production.Log.ProducedNotReset|
|CONV.REW.Statistics.Production.LogProduced.NotReset |CONV.LINE.Statistics.Production.Log.ProducedNotReset|
+----------------------------------------------------+-------------------------------------------

In [8]:

# Correlation magnitude above which a metric is labelled as rising/falling.
TREND_CORR_THRESHOLD = 0.5

# Group by 'name' (already normalised) and compute per-metric statistics.
df_stats = df_final.groupBy("name").agg(
    F.count("value").alias("count"),            # Number of non-null values

    # Basic statistics (rounded for display)
    F.round(F.min("value"), 2).alias("min_val"),
    F.round(F.max("value"), 2).alias("max_val"),
    F.round(F.avg("value"), 2).alias("avg_val"),
    F.round(F.stddev("value"), 2).alias("std_dev"),

    # Time span of the observations (first and last reading)
    F.min("timestamp_true").alias("start_time"),
    F.max("timestamp_true").alias("end_time"),

    # Trend: correlation between 'timestamp' and 'value', a number between
    # -1 (falling) and +1 (rising).
    F.round(F.corr("timestamp", "value"), 3).alias("trend_corr"),

    # Exact "never changes" flag. Testing the rounded std_dev against 0 would
    # also label metrics whose spread is below 0.005 as constant.
    (F.min("value") == F.max("value")).alias("_is_constant")
)

# Public statistics table: same columns as before, without the helper flag.
df_analysis = df_stats.drop("_is_constant")

df_report = df_stats.withColumn("trend_desc",
    F.when(F.col("count") < 2, "Dati Insufficienti")      # A single point has no trend
    .when(F.col("_is_constant"), "Costante (=)")          # Value never changes
    .when(F.col("trend_corr") > TREND_CORR_THRESHOLD, "Crescente ‚ÜóÔ∏è")      # Strong rise
    .when(F.col("trend_corr") < -TREND_CORR_THRESHOLD, "Decrescente ‚ÜòÔ∏è")   # Strong fall
    .otherwise("Fluttuante/Stabile ‚û°Ô∏è")                   # Oscillates or changes little
).drop("_is_constant").orderBy(F.desc("count"))

# Show the report, most populated metrics first.
print("Report Analisi Metriche:")
df_report.select(
    "name", "count", "min_val", "max_val", "avg_val", "trend_desc"
).show(1000, truncate=False)

Report Analisi Metriche:




+----------------------------------------------------------+-----+-----------+------------+--------------+---------------------+
|name                                                      |count|min_val    |max_val     |avg_val       |trend_desc           |
+----------------------------------------------------------+-----+-----------+------------+--------------+---------------------+
|CONV.LS1.Status.Product.Log.CuttingLenght                 |25631|91.8       |217.3       |135.7         |Fluttuante/Stabile ‚û°Ô∏è|
|CONV.LINE.Statistics.Production.Log.ProducedNotReset      |25631|8460934.0  |1.4894469E7 |1.161117172E7 |Fluttuante/Stabile ‚û°Ô∏è|
|CONV.LS1.Status.Product.Log.Diameter                      |25631|115.0      |200.0       |144.29        |Fluttuante/Stabile ‚û°Ô∏è|
|CONV.LS1.Statistics.Monitoring.Blade.RemainingPercentage  |25631|37.0       |83.0        |58.42         |Fluttuante/Stabile ‚û°Ô∏è|
|CONV.REW.Status.Product.Core.Diameter                     |25631|46.5       |50.

                                                                                

In [9]:
print("=== ANALISI DEGLI STATI OPERATIVI (CLUSTERING PER SINGOLA METRICA) ===\n")

# Tunables for the per-metric clustering.
N_CLUSTERS = 3        # number of K-Means clusters ("operating states") per metric
KMEANS_SEED = 1       # fixed seed so repeated runs give the same clusters
MIN_CLEAN_ROWS = 3    # metrics with fewer usable rows are skipped

# A row is usable when its value is neither NULL nor NaN. The value column is
# cast to Double first so that isnan() is applied to a floating-point column.
value_is_valid = F.col("value").isNotNull() & (~F.isnan(F.col("value")))

# The cast is identical for every metric, so it is done once and the typed
# frame is cached once; each iteration then only filters the cached frame
# instead of recomputing df_final from scratch.
df_typed = df_final.withColumn("value", F.col("value").cast(DoubleType()))
df_typed.cache()

try:
    # 1. One aggregation yields every metric name together with its number of
    #    usable rows (replaces a distinct() plus one count() job per metric).
    metric_rows = (
        df_typed.groupBy("name")
        .agg(F.count(F.when(value_is_valid, 1)).alias("clean_count"))
        .collect()
    )
    unique_metrics = [row['name'] for row in metric_rows]
    clean_counts = {row['name']: row['clean_count'] for row in metric_rows}

    for metric_name in unique_metrics:

        # 2. Skip metrics without enough clean data. A NULL name can never be
        #    matched by the equality filter below, so it is skipped as well.
        data_count = clean_counts[metric_name]
        if metric_name is None or data_count < MIN_CLEAN_ROWS:
            continue

        # Clean rows of this metric only.
        df_clean = df_typed.filter((F.col("name") == metric_name) & value_is_valid)

        print(f"üîπ Analisi Metrica: {metric_name} ({data_count} righe)")

        try:
            # 3. Wrap 'value' into the single-feature vector column K-Means expects
            assembler = VectorAssembler(inputCols=["value"], outputCol="features")
            df_vec = assembler.transform(df_clean)

            # 4. Fit K-Means
            kmeans = KMeans(k=N_CLUSTERS, seed=KMEANS_SEED)
            model = kmeans.fit(df_vec)

            # 5. Extract the cluster centres (typical values); each centre is
            #    a one-element vector, so take its scalar component.
            centers = model.clusterCenters()
            centers_values = sorted([float(c[0]) for c in centers])
            centers_str = [f"{v:.2f}" for v in centers_values]

            print(f"   üìä Stati rilevati: {centers_str}")

            # 6. Distribution of the rows across clusters
            predictions = model.transform(df_vec)
            summary = predictions.groupBy("prediction").agg(
                F.count("value").alias("count"),
                F.round(F.avg("value"), 2).alias("avg_val")
            ).orderBy("avg_val")

            summary.show(truncate=False)
            print("-" * 60)

        except Exception as e:
            # Best effort: report the failing metric and move on to the next.
            print(f"‚ùå Errore processando {metric_name}: {e}")
            print("-" * 60)
finally:
    # Always release the cached frame, even when the run is interrupted
    # (KeyboardInterrupt is not caught by `except Exception`).
    df_typed.unpersist()

=== ANALISI DEGLI STATI OPERATIVI (CLUSTERING PER SINGOLA METRICA) ===



                                                                                

üîπ Analisi Metrica: CONV.LS1.Status.Product.Log.CuttingLenght (25631 righe)


25/12/12 08:54:56 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


   üìä Stati rilevati: ['91.80', '97.52', '216.51']
+----------+-----+-------+
|prediction|count|avg_val|
+----------+-----+-------+
|0         |8470 |91.8   |
|2         |8529 |97.52  |
|1         |8632 |216.51 |
+----------+-----+-------+

------------------------------------------------------------
üîπ Analisi Metrica: CONV.LINE.Statistics.Production.Log.ProducedNotReset (25631 righe)


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 