In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    count, when, col, to_timestamp, hour, minute, date_format,
    window, sum, avg, min, max, round as pyspark_round, lag, stddev
)
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.window import Window

# LETTURA E PRE-PROCESSING 

df = spark.read.table("default.dati_borsa_mese")

# Rimuovo i valori nulli e converto i tipi di dati
df_cleaned = (
    df.dropna(subset=["price", "volume"])
    .withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
    .withColumn("price", col("price").cast(DoubleType()))
    .withColumn("volume", col("volume").cast(IntegerType()))
    .withColumn("data", date_format(col("timestamp"), "yyyy-MM-dd"))
    .withColumn("ora", hour(col("timestamp")))
    .withColumn("minuto", minute(col("timestamp")))
)
print(f" Pre-processing completato. Righe totali: {df_cleaned.count()}")

# Visualizzazione prime 10 righe per capire la tabella
display(df_cleaned.limit(10))

 Pre-processing completato. Righe totali: 64386


timestamp,symbol,price,volume,trade_id,side,data,ora,minuto
2025-06-02T09:00:00.000Z,TSLA,344.22,1197,1,buy,2025-06-02,9,0
2025-06-02T09:00:00.000Z,AAPL,203.39,1147,2,buy,2025-06-02,9,0
2025-06-02T09:00:00.000Z,MSFT,463.08,3013,3,buy,2025-06-02,9,0
2025-06-02T09:00:00.000Z,SGLD,323.67,881,4,sell,2025-06-02,9,0
2025-06-02T09:00:00.000Z,SPY,596.03,3017,5,sell,2025-06-02,9,0
2025-06-02T09:00:00.000Z,META,666.71,1258,6,buy,2025-06-02,9,0
2025-06-02T09:01:00.000Z,TSLA,344.25,1175,7,sell,2025-06-02,9,1
2025-06-02T09:01:00.000Z,AAPL,203.42,1151,8,buy,2025-06-02,9,1
2025-06-02T09:01:00.000Z,MSFT,463.04,3049,9,buy,2025-06-02,9,1
2025-06-02T09:01:00.000Z,SGLD,323.57,925,10,sell,2025-06-02,9,1


In [0]:
# AGGREGAZIONI TEMPORALI (5min, 30min, 1h)

def create_time_window_aggregation(df, window_duration, table_name):
    print(f"Creating {table_name}...")
    df_agg = (
        df.groupBy("symbol", window(col("timestamp"), window_duration))
        .agg(
            pyspark_round(avg("price"), 2).alias("avg_price"),
            pyspark_round(min("price"), 2).alias("min_price"),
            pyspark_round(max("price"), 2).alias("max_price"),
            sum("volume").alias("total_volume"),
            count("price").alias("num_trades")
        )
        .withColumn("window_start", col("window.start"))
        .drop("window")
        .withColumn("data", date_format(col("window_start"), "yyyy-MM-dd"))
        .withColumn("ora", hour(col("window_start")))
        .withColumn("minuto", minute(col("window_start")))
    )
    # Salvataggio
    df_agg.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"default.{table_name}")
    return spark.read.table(f"default.{table_name}")
print("\nCreazione Aggregazioni")
df_5min = create_time_window_aggregation(df_cleaned, "5 minutes", "dati_borsa_5min")
df_30min = create_time_window_aggregation(df_cleaned, "30 minutes", "dati_borsa_30min")
df_1hour = create_time_window_aggregation(df_cleaned, "1 hour", "dati_borsa_1hour")


Creazione Aggregazioni
Creating dati_borsa_5min...
Creating dati_borsa_30min...
Creating dati_borsa_1hour...


In [0]:
# ANALISI ANOMALIE (Z-SCORE)

print("\n Analisi Anomalie")

# Statistiche per orario (ARROTONDATE)
pattern_stats = df_5min.groupBy("symbol", "ora", "minuto").agg(
    F.round(F.avg("total_volume"), 2).alias("avg_vol"), 
    F.round(F.stddev("total_volume"), 2).alias("std_vol")
)

# Join Broadcast e Calcolo Z-Score 
df_z = df_5min.join(F.broadcast(pattern_stats), on=["symbol", "ora", "minuto"], how="left") \
    .withColumn("z_score", 
        when(col("std_vol") == 0, 0)
        .otherwise(
            F.round((col("total_volume") - col("avg_vol")) / col("std_vol"), 2) # <--- Arrotondamento Z-Score
        )
    )
print("Lo Z-Score indica quanto un dato evento è isolito rispetto alla sua norma storica.")
print("Z-Score maggiori della soglia di 2: z_score > 2")
display(df_z.filter(F.abs(col("z_score")) > 2).orderBy(F.abs(col("z_score")).desc()).limit(5))


 Analisi Anomalie
Lo Z-Score indica quanto un dato evento è isolito rispetto alla sua norma storica.
Z-Score maggiori della soglia di 2: z_score > 2


symbol,ora,minuto,avg_price,min_price,max_price,total_volume,num_trades,window_start,data,avg_vol,std_vol,z_score
AAPL,10,35,204.08,204.02,204.14,2121,5,2025-06-09T10:35:00.000Z,2025-06-09,1245.67,423.12,2.07
SGLD,17,30,329.65,329.65,329.65,310,1,2025-06-16T17:30:00.000Z,2025-06-16,184.0,61.31,2.06
AAPL,17,10,205.64,205.58,205.74,2176,5,2025-06-23T17:10:00.000Z,2025-06-23,1238.38,458.41,2.05
TSLA,11,0,346.19,346.17,346.22,2092,5,2025-06-23T11:00:00.000Z,2025-06-23,1239.29,424.3,2.01


In [0]:
# ANALISI PATTERN RIPETUTI E SEQUENZE RICORRENTI

# Raggruppo per Simbolo + Ora + Minuto indipendentemente dalla data
# Calcolo la media e la frequenza 
recurring_patterns = (
    df_5min  
    .groupBy("symbol", "ora", "minuto")
    .agg(
        pyspark_round(F.avg("total_volume"), 0).alias("avg_volume_at_time"),
        pyspark_round(F.stddev("total_volume"), 0).alias("std_volume_at_time"),
        F.count("total_volume").alias("occurrence_count"), # Quante volte appare questo slot nel mese
        pyspark_round(F.avg("num_trades"), 0).alias("avg_trades_at_time")
    )
)
# Calcolo il volume medio del mese per simbolo per avere un valore da usare come riferimento
window_symbol = Window.partitionBy("symbol")
recurring_analysis = recurring_patterns.withColumn(
    "symbol_avg_volume", 
    F.avg("avg_volume_at_time").over(window_symbol)
).withColumn(
    "volume_ratio", 
    pyspark_round(col("avg_volume_at_time") / col("symbol_avg_volume"), 2)
)
# Cerco sequenze insolite
suspicious_sequences = recurring_analysis.filter(
    (col("occurrence_count") >= 5) & 
    (col("volume_ratio") > 2.0) 
).orderBy(col("volume_ratio").desc())
print("Sequenze insolite ricorrenti identificate:")
print("Orari in cui il volume è sistematicamente > 2x la media del titolo")
display(suspicious_sequences.select(
    "symbol", "ora", "minuto", 
    "avg_volume_at_time", "volume_ratio", "occurrence_count"
))

Sequenze insolite ricorrenti identificate:
Orari in cui il volume è sistematicamente > 2x la media del titolo


symbol,ora,minuto,avg_volume_at_time,volume_ratio,occurrence_count
SGLD,9,55,2811.0,2.47,21
META,9,0,3740.0,2.47,21
SPY,9,30,9337.0,2.46,21
TSLA,9,15,3732.0,2.46,21
AAPL,9,0,3729.0,2.46,21
AAPL,9,10,3733.0,2.46,21
TSLA,9,30,3730.0,2.46,21
META,9,45,3737.0,2.46,21
META,9,30,3729.0,2.46,21
SPY,9,55,9323.0,2.46,21


In [0]:
#  CALCOLO VOLATILITÀ INTRADAY 
 
# Calcolo variazione prezzo riga per riga
window_vol = Window.partitionBy("symbol", "data").orderBy("timestamp")

df_price_change = df_cleaned.orderBy("symbol", "timestamp") \
    .withColumn("prev_price", lag("price").over(window_vol)) \
    .withColumn("price_change_pct", 
        when(col("prev_price").isNull() | (col("prev_price")==0), 0)
        .otherwise((col("price") - col("prev_price")) / col("prev_price") * 100)
    )

# Raggruppo per giorno e azione e calcolo la deviazione standard
volatility_daily = (
    df_price_change
    .filter(col("price_change_pct") != 0)
    .groupBy("symbol", "data")
    .agg(
        pyspark_round(stddev("price_change_pct"), 4).alias("volatility_stddev"), 
        pyspark_round(avg("price_change_pct"), 4).alias("avg_change"),
        count("price").alias("num_trades")
    )
)
print("La metrica volatility_stddev rappresenta la deviazione standard delle variazioni percentuali di prezzo nell'arco di una giornata.")
print("Classifica giorni con maggiore volatilità intra giornaliera")
display(
    volatility_daily
    .orderBy(col("volatility_stddev").desc()) 
    .limit(10)
    .select(
        "symbol", 
        "data", 
        # Converto il numero in stringa con % solo per averla nella tabella da visualizzare
        F.concat(F.format_number(col("volatility_stddev"), 4), F.lit("%")).alias("volatility_stddev_%"),
        F.concat(F.format_number(col("avg_change"), 4), F.lit("%")).alias("avg_change_%"),
        "num_trades"
    )
)

# Calcolo riassuntivo per Azione
volatility_summary = (
    volatility_daily
    .groupBy("symbol")
    .agg(
        pyspark_round(avg("volatility_stddev"), 4).alias("avg_daily_volatility_numeric")
    )
    .orderBy(col("avg_daily_volatility_numeric").desc())
)
print("Classifica titoli per volatilità media")
display(
    volatility_summary
    .select(
        "symbol",
        # Converto il numero in stringa con % solo per averla nella tabella da visualizzare
        F.concat(F.format_number(col("avg_daily_volatility_numeric"), 4), F.lit("%")).alias("avg_daily_volatility_%")
    )
)

La metrica volatility_stddev rappresenta la deviazione standard delle variazioni percentuali di prezzo nell'arco di una giornata.
Classifica giorni con maggiore volatilità intra giornaliera


symbol,data,volatility_stddev_%,avg_change_%,num_trades
AAPL,2025-06-23,0.0523%,0.0020%,476
AAPL,2025-06-06,0.0516%,0.0041%,488
AAPL,2025-06-04,0.0511%,0.0049%,488
AAPL,2025-06-13,0.0511%,0.0064%,491
AAPL,2025-06-19,0.0508%,0.0002%,483
AAPL,2025-06-30,0.0508%,0.0054%,491
AAPL,2025-06-02,0.0508%,0.0029%,497
AAPL,2025-06-24,0.0505%,0.0021%,496
AAPL,2025-06-09,0.0504%,0.0041%,490
AAPL,2025-06-11,0.0503%,0.0073%,495


Classifica titoli per volatilità media


symbol,avg_daily_volatility_%
AAPL,0.0502%
SGLD,0.0313%
TSLA,0.0295%
MSFT,0.0219%
SPY,0.0170%
META,0.0154%
