In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np
import s3fs
import io

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import seaborn as sns
import subprocess
import boto3
from botocore.config import Config 
from typing import Tuple,List


In [2]:
MINIO_BUCKET_NAME="satellite-data"
MINIO_ADDRESS="http://192.168.128.236:9000"
MINIO_ID="minioadmin"
MINIO_KEY="minioadmin"

## Setup Spark context 

In [3]:
try:
    spark.stop()
except:
    pass

from pyspark import SparkContext
SparkContext._active_spark_context = None
print(" SparkContext pulito!")

 SparkContext pulito!


In [4]:
spark = SparkSession.builder \
    .appName("Crop-Classifier") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.memory", "12g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ADDRESS) \
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ID) \
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_KEY) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print(f"Spark LOCAL pronto! CPUs: {spark.sparkContext.defaultParallelism}")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/26 11:24:50 WARN Utils: Your hostname, PC7, resolves to a loopback address: 127.0.1.1; using 192.168.128.236 instead (on interface eno1)
25/12/26 11:24:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/26 11:24:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark LOCAL pronto! CPUs: 8


## Verifica dataset MinIO (boto3 + mc)

In [5]:
# üîß BYPASS S3 Spark ‚Üí Usa mc + Python nativo (100% funziona)
print(" Lista patch via mc (conferma esistenza)")
result = subprocess.run([
    "docker", "exec", "minio", "mc", "ls", 
    "myminio/satellite-data/patches/", "|", "wc", "-l"
], shell=True, capture_output=True, text=True)

print(result.stdout)
print(" mc conferma patch esistono!")

# üîß S3 client Python nativo (bypassa Spark S3A)
s3 = boto3.client(
    's3',
    endpoint_url=MINIO_ADDRESS,
    aws_access_key_id=MINIO_ID,
    aws_secret_access_key=MINIO_KEY
)

response = s3.list_objects_v2(Bucket=MINIO_BUCKET_NAME, Prefix='patches/')
files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('.npz')]
print(f" {len(files):,} patch .npz confermate via boto3!")

# Prime 10
print(" Prime 10:")
for f in files[:10]:
    size = next(obj['Size'] for obj in response['Contents'] if obj['Key'] == f)
    print(f"  {size/1024/1024:.1f}MB {f}")




 Lista patch via mc (conferma esistenza)

 mc conferma patch esistono!
 862 patch .npz confermate via boto3!
 Prime 10:
  2.3MB patches/task_0_worker_2.npz
  2.4MB patches/task_1000_worker_2.npz
  2.4MB patches/task_1001_worker_3.npz
  2.6MB patches/task_1004_worker_9.npz
  2.4MB patches/task_1011_worker_3.npz
  2.4MB patches/task_1012_worker_9.npz
  2.5MB patches/task_1016_worker_3.npz
  2.4MB patches/task_1017_worker_2.npz
  0.9MB patches/task_1018_worker_3.npz
  1.7MB patches/task_101_worker_9.npz


## Estrai pixel + Feature Engineering Sentinel-2

In [6]:
# ORDINE FINALE dopo stackstac.stack() (ALFABETICO!)
BANDS_ORDER = {
    0: 'blue',      # B02 (490nm)
    1: 'green',     # B03 (560nm)  
    2: 'nir',       # B08 (842nm) ‚Üê NIR principale
    3: 'nir08',     # B8A (865nm) ‚Üê NIR stretto
    4: 'red',       # B04 (665nm)
    5: 'rededge1',  # B05 (705nm)
    6: 'rededge2',  # B06 (740nm)
    7: 'rededge3',  # B07 (783nm)
    8: 'swir16',    # B11 (1610nm) ‚Üê SWIR1
    9: 'swir22'     # B12 (2190nm) ‚Üê SWIR2
}


In [7]:

def extract_pixels(s3_path: str) -> List[Tuple[List[float], float]]:
    """
    Estrae pixel features Sentinel-2 + labels da file .npz su MinIO S3.
    
    Args:
        s3_path: Path S3 patch .npz

    Output features (6 totali per pixel):
        | Index | Feature     | Banda   | Descrizione                  |
        |-------|-------------|---------|------------------------------|
        | 0     | Red (B04)   | bands[3]| Riflettanza Rossa (665nm)   |
        | 1     | NIR (B08)   | bands[7]| Infrarosso Vicino (842nm)   |
        | 2     | SWIR1(B11)  | bands[10]| Infrarosso SWIR (1610nm)   |
        | 3     | NDVI        | -       | (NIR-Red)/(NIR+Red)         |
        | 4     | NDWI        | -       | (Green-SWIR)/(Green+SWIR)   |
        | 5     | NDMI        | -       | (NIR-SWIR)/(NIR+SWIR)       |
    
    Returns:
        [(features_6d, label), ...] dove features = [Red, NIR, SWIR1, NDVI, NDWI, NDMI]
        - features_6d: [Red, NIR, SWIR1, NDVI, NDWI, NDMI]
        - label: 0.0 (non-crop) o 1.0 (crop)
    """
    try:
        fs = s3fs.S3FileSystem(
            key=MINIO_ID, secret=MINIO_KEY,
            client_kwargs={'endpoint_url': MINIO_ADDRESS}
        )
        with fs.open(s3_path.replace('s3a://', ''), 'rb') as f:
            data = np.load(io.BytesIO(f.read()))
        
        bands = data['bands']  # (10, H, W)
        mask = data['mask'].astype(float)
        
        # üå± INDICI CORRETTI stackstac alfabetico
        b3_green = bands[1].flatten().astype(np.float32)   # 'green' ‚Üí index 1
        b4_red = bands[4].flatten().astype(np.float32)     # 'red'   ‚Üí index 4
        b8_nir = bands[2].flatten().astype(np.float32)     # 'nir'   ‚Üí index 2
        b11_swir = bands[8].flatten().astype(np.float32)   # 'swir16'‚Üí index 8
        
        # üåø FIX: SENZA out= parameter (crea nuovo array float)
        ndvi = np.where(
            (b8_nir + b4_red) != 0,
            (b8_nir - b4_red) / (b8_nir + b4_red + 1e-8),
            0.0
        )
        
        ndwi = np.where(
            (b3_green + b11_swir) != 0,
            (b3_green - b11_swir) / (b3_green + b11_swir + 1e-8),
            0.0
        )
        
        ndmi = np.where(
            (b8_nir + b11_swir) != 0,
            (b8_nir - b11_swir) / (b8_nir + b11_swir + 1e-8),
            0.0
        )
        
        # 6 features finali
        features = np.column_stack([
            b4_red, b8_nir, b11_swir,  # Raw bands
            ndvi, ndwi, ndmi           # Spectral indices
        ])
        
        labels = mask.flatten()
        
        # Filtra pixel validi (0 o 1)
        valid = (labels == 0) | (labels == 1)
        return [(features[valid][i].tolist(), float(labels[valid][i]))
                for i in range(len(labels[valid]))]
        
    except Exception as e:
        print(f"‚ùå Error {s3_path}: {e}")
        import traceback
        traceback.print_exc()
        return []

# UDF Spark
schema = ArrayType(StructType([
    StructField("features", ArrayType(FloatType()), False),
    StructField("label", FloatType(), False)
]))
extract_udf = udf(extract_pixels, schema)

print("‚úÖ UDF fixed (np.where instead of np.divide out=)")


‚úÖ UDF fixed (np.where instead of np.divide out=)




In [8]:
# %% TEST: Conferma ordine bande
first_file = files[0]
obj = s3.get_object(Bucket='satellite-data', Key=first_file)
data = np.load(io.BytesIO(obj['Body'].read()))

print(f"üìä bands.shape: {data['bands'].shape}")  # (10, 256, 256)

# Stampa statistiche per identificare bande
for i in range(10):
    band = data['bands'][i]
    print(f"  Band {i}: mean={band.mean():.1f} std={band.std():.1f} "
          f"range=[{band.min()}, {band.max()}]")

# Verifica NDVI
red = data['bands'][4]
nir = data['bands'][2]
ndvi = (nir - red) / (nir + red + 1e-8)
print(f"\nüåø NDVI range: [{ndvi.min():.3f}, {ndvi.max():.3f}]")
print(f"   (atteso: -0.2 ‚Üí +0.8 per vegetazione)")


üìä bands.shape: (10, 581, 443)
  Band 0: mean=213.9 std=123.3 range=[1, 3284]
  Band 1: mean=396.3 std=165.3 range=[33, 3480]
  Band 2: mean=364.9 std=199.1 range=[21, 3836]
  Band 3: mean=2379.8 std=571.7 range=[470, 5688]
  Band 4: mean=786.8 std=238.9 range=[149, 4039]
  Band 5: mean=1885.6 std=426.3 range=[553, 4255]
  Band 6: mean=2266.7 std=503.8 range=[753, 5003]
  Band 7: mean=2571.5 std=546.3 range=[991, 5495]
  Band 8: mean=1765.4 std=548.4 range=[523, 5081]
  Band 9: mean=914.0 std=395.2 range=[220, 3784]

üåø NDVI range: [0.000, 342.560]
   (atteso: -0.2 ‚Üí +0.8 per vegetazione)


## üìä CELL 5: Estrai pixel ‚Üí DataFrame 6 colonne


### STEP 1: Lista patch S3 (100 patch per test veloce)

In [None]:
# %% [markdown]
# # ‚ö° VELOCIZZATO: Cache + 1 solo count finale

# %%
import time
start = time.time()

# STEP 1-2: Paths (invariato)
s3 = boto3.client('s3', endpoint_url=MINIO_ADDRESS, 
                  aws_access_key_id=MINIO_ID, aws_secret_access_key=MINIO_KEY)

response = s3.list_objects_v2(Bucket='satellite-data', Prefix='patches/')
files = [obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.npz')][:100]
paths = ['s3a://satellite-data/' + f for f in files]

print(f"üéØ {len(paths)} patch selezionate")

patches_df = spark.createDataFrame(
    [(path,) for path in paths],
    StructType([StructField("path", StringType())])
).repartition(20)  # ‚Üê Parallelizza su 20 partizioni

# STEP 3: UDF + CACHE (esegui subito)
print("üîÑ Esecuzione UDF (pu√≤ richiedere 5-10 min)...")
pixels_raw_df = patches_df.withColumn("pixels", extract_udf(col("path"))) \
    .cache()  # ‚Üê CACHE risultato UDF!

# Force esecuzione UDF ADESSO (1 volta sola)
n_patches = pixels_raw_df.count()
print(f"‚úÖ {n_patches} patch processate in {time.time()-start:.1f}s")

# STEP 4: Esplodi (veloce, usa cache)
pixels_df = pixels_raw_df.select(explode("pixels").alias("pixel"))

# STEP 5: 6 colonne + CACHE
features_df = pixels_df.select(
    col("pixel.features")[0].alias("red_b4"),
    col("pixel.features")[1].alias("nir_b8"),
    col("pixel.features")[2].alias("swir_b11"),
    col("pixel.features")[3].alias("ndvi"),
    col("pixel.features")[4].alias("ndwi"),
    col("pixel.features")[5].alias("ndmi"),
    col("pixel.label").alias("label")
).cache()  # ‚Üê CACHE features finali

# 1 SOLO COUNT (esegue explode + select)
total_pixels = features_df.count()
print(f"‚úÖ {total_pixels:,} pixel in {time.time()-start:.1f}s totali")

# Statistiche (usano cache)
features_df.groupBy("label").count().orderBy("label").show()
features_df.describe("ndvi", "label").show()
features_df.show(5)

print(f"\n‚è±Ô∏è  Tempo totale: {time.time()-start:.1f}s ({(time.time()-start)/60:.1f} min)")


üéØ 100 patch selezionate
üîÑ Esecuzione UDF (pu√≤ richiedere 5-10 min)...


ERROR:root:KeyboardInterrupt while sending command.                (0 + 8) / 20]
Traceback (most recent call last):
  File "/home/amministratore/Scrivania/Big Data Acquisition/progetto mimmo/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amministratore/Scrivania/Big Data Acquisition/progetto mimmo/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
25/12/26 11:39:48 WARN BlockManager: Putting block rdd_13_1 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_1388271/4216968101.py", line

KeyboardInterrupt: 

Traceback (most recent call last):
  File "/home/amministratore/Scrivania/Big Data Acquisition/progetto mimmo/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 233, in manager
    code = worker(sock, authenticated)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/amministratore/Scrivania/Big Data Acquisition/progetto mimmo/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 82, in worker
    worker_main(infile, outfile)
  File "/home/amministratore/Scrivania/Big Data Acquisition/progetto mimmo/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 974, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/amministratore/Scrivania/Big Data Acquisition/progetto mimmo/.venv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 3405, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
       ^^

25/12/26 11:39:52 WARN PythonUDFWithNamedArgumentsRunner: Incomplete task 8.0 in stage 2 (TID 16) interrupted: Attempting to kill Python Worker
25/12/26 11:39:52 WARN BlockManager: Putting block rdd_13_8 failed due to exception org.apache.spark.TaskKilledException.
25/12/26 11:39:52 WARN BlockManager: Block rdd_13_8 could not be removed as it was not found on disk or in memory
25/12/26 11:39:52 WARN TaskSetManager: Lost task 8.0 in stage 2.0 (TID 16) (192.168.128.236 executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 6 in stage 2.0 failed 1 times, most recent failure: Lost task 6.0 in stage 2.0 (TID 14) (192.168.128.236 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_1388271/4216968101.py", line 72, in extract_pixels
KeyboardInterrupt

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:645)
	at org.apache.spark.sql.execution.p

## ML Pipeline + Training

In [None]:
# Vector Assembler (8 features ‚Üí vector)
assembler = VectorAssembler(
    inputCols=["features"], 
    outputCol="features_vec",
    handleInvalid="skip" # <-- salta le righe che contengono null
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features", # La nuova colonna vettoriale
    withMean=True,
    withStd=True
)


# Random Forest (bilanciato per CROP raro)
rf = RandomForestClassifier(
    featuresCol="features_vec",
    labelCol="label",
    numTrees=100,
    maxDepth=8,
    impurity="gini",
    featureSubsetStrategy="sqrt"
)

# SPLIT 80/20
train_df, test_df = pixels_df.randomSplit([0.8, 0.2], seed=42)

print(f"üìä Train: {train_df.count():,} | Test: {test_df.count():,} pixel")

# TRAIN
train_assembled = assembler.transform(train_df)
model = rf.fit(train_assembled)

print("‚úÖ Modello addestrato!")


üìä Train: 0 | Test: 0 pixel


IllegalArgumentException: Data type array<float> of column features is not supported.

In [None]:
# PREDIZIONI
test_assembled = assembler.transform(test_df)
predictions = model.transform(test_assembled)

# Metriche
roc_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = roc_evaluator.evaluate(predictions)
print(f"üéØ AUC-ROC: {auc:.4f}")

f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
f1 = f1_evaluator.evaluate(predictions)
print(f"üìä F1-Score: {f1:.4f}")

# Confusion Matrix
cm_df = predictions.groupBy("label", "prediction").count().toPandas()
print("üî¢ Confusion Matrix:")
print(cm_df)

# Feature Importance
importances = model.featureImportances
print("\nüåø Feature Importance:")
features = ["Red", "NIR", "SWIR1", "NDVI", "NDWI", "NDMI"]
for i, (feat, imp) in enumerate(zip(features, importances)):
    print(f"  {feat}: {imp:.3f}")


In [None]:
# Salva modello su MinIO
model.write().overwrite().save("s3a://satellite-data/models/crop_classifier_rf_v1")
print("‚úÖ Modello salvato su MinIO!")

# Test su 1 patch nuova
test_patch = patches_df.filter(col("path").contains("task_0")).first()
test_pixels = spark.createDataFrame(
    extract_pixels(test_patch.path), 
    schema
).select("features", "label")

test_assembled = assembler.transform(test_pixels)
test_pred = model.transform(test_assembled)

print("üß™ Test nuova patch:")
test_pred.groupBy("label", "prediction").count().show()


In [None]:
spark.stop()
print("‚úÖ Spark terminato!")