In [1]:
#!pip install tensorflow==2.13.0  tensorflow_datasets --default-timeout=10000
#!pip install "numpy<1.25,>=1.24"
#!pip install kagglehub

In [2]:
import kagglehub
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import os
from pyspark.sql.functions import when, col
from pyspark.ml.feature import StandardScaler
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType


# Download latest version
path = kagglehub.dataset_download("hassan06/nslkdd")

print("Path to dataset files:", path)

Path to dataset files: /home/jovyan/.cache/kagglehub/datasets/hassan06/nslkdd/versions/1


In [3]:

spark = SparkSession.builder.appName("ids").getOrCreate()
# Step 2: Load the data
#data = spark.read.csv(path, header=True, inferSchema=True)


In [4]:

train_file = os.path.join(path, "KDDTrain+.txt")
test_file = os.path.join(path, "KDDTest+.txt")




# 3️⃣ Leggi file — ATTENZIONE: non c’è header, separatore=',' (oppure '\t' se errore)
cols = ['duration','protocol_type','service','flag','src_bytes','dst_bytes','land','wrong_fragment','urgent','hot'
,'num_failed_logins','logged_in','num_compromised','root_shell','su_attempted','num_root','num_file_creations'
,'num_shells','num_access_files','num_outbound_cmds','is_host_login','is_guest_login','count','srv_count','serror_rate'
,'srv_serror_rate','rerror_rate','srv_rerror_rate','same_srv_rate','diff_srv_rate','srv_diff_host_rate','dst_host_count','dst_host_srv_count'
,'dst_host_same_srv_rate','dst_host_diff_srv_rate','dst_host_same_src_port_rate','dst_host_srv_diff_host_rate','dst_host_serror_rate'
,'dst_host_srv_serror_rate','dst_host_rerror_rate','dst_host_srv_rerror_rate','outcome','level']

train_df = spark.read.csv(train_file, header=False, sep=',')
train_df.printSchema()

test_df  = spark.read.csv(test_file, header=False, sep=',')

#print(len(cols))

for old, new in zip(train_df.columns, cols):
    print(old,new)
    train_df=train_df.withColumnRenamed(old, new)

for old, new in zip(test_df.columns, cols):
    test_df = test_df.withColumnRenamed(old, new)



#print(f"Train: {train_df.count()}  Test: {test_df.count()}")
#print(train_df.columns)
#print(test_df.columns)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (nullable = tru

In [5]:
def cast_numeric_columns(df, exclude_cols=None):
    """
    Converte automaticamente le colonne numeriche da string a DoubleType.
    Esclude le colonne categoriche indicate in exclude_cols.
    """
    if exclude_cols is None:
        exclude_cols = []

    # Crea una copia per sicurezza
    df_casted = df

    for col_name in df.columns:
        if col_name not in exclude_cols:
            df_casted = df_casted.withColumn(col_name, F.col(col_name).cast(DoubleType()))

    return df_casted



categorical_cols = ["protocol_type", "service", "flag", "outcome"]
train_df=cast_numeric_columns(train_df,categorical_cols)
test_df=cast_numeric_columns(test_df,categorical_cols)

In [6]:
train_df.dtypes

[('duration', 'double'),
 ('protocol_type', 'string'),
 ('service', 'string'),
 ('flag', 'string'),
 ('src_bytes', 'double'),
 ('dst_bytes', 'double'),
 ('land', 'double'),
 ('wrong_fragment', 'double'),
 ('urgent', 'double'),
 ('hot', 'double'),
 ('num_failed_logins', 'double'),
 ('logged_in', 'double'),
 ('num_compromised', 'double'),
 ('root_shell', 'double'),
 ('su_attempted', 'double'),
 ('num_root', 'double'),
 ('num_file_creations', 'double'),
 ('num_shells', 'double'),
 ('num_access_files', 'double'),
 ('num_outbound_cmds', 'double'),
 ('is_host_login', 'double'),
 ('is_guest_login', 'double'),
 ('count', 'double'),
 ('srv_count', 'double'),
 ('serror_rate', 'double'),
 ('srv_serror_rate', 'double'),
 ('rerror_rate', 'double'),
 ('srv_rerror_rate', 'double'),
 ('same_srv_rate', 'double'),
 ('diff_srv_rate', 'double'),
 ('srv_diff_host_rate', 'double'),
 ('dst_host_count', 'double'),
 ('dst_host_srv_count', 'double'),
 ('dst_host_same_srv_rate', 'double'),
 ('dst_host_diff_srv_r

In [7]:

train_df = train_df.withColumn(
    "outcome",
    when(col("outcome") == "normal", 0).otherwise(1)
)

test_df = test_df.withColumn(
    "outcome",
    when(col("outcome") == "normal", 0).otherwise(1)
)


In [8]:
train_df.select("outcome").distinct().show()


+-------+
|outcome|
+-------+
|      1|
|      0|
+-------+



In [None]:
y_train=train_df["outcome"]


In [9]:
train_df=train_df.drop("protocol_type", "service", "flag", "outcome")
test_df=test_df.drop("protocol_type", "service", "flag", "outcome")

In [10]:
train_df.columns

['duration',
 'src_bytes',
 'dst_bytes',
 'land',
 'wrong_fragment',
 'urgent',
 'hot',
 'num_failed_logins',
 'logged_in',
 'num_compromised',
 'root_shell',
 'su_attempted',
 'num_root',
 'num_file_creations',
 'num_shells',
 'num_access_files',
 'num_outbound_cmds',
 'is_host_login',
 'is_guest_login',
 'count',
 'srv_count',
 'serror_rate',
 'srv_serror_rate',
 'rerror_rate',
 'srv_rerror_rate',
 'same_srv_rate',
 'diff_srv_rate',
 'srv_diff_host_rate',
 'dst_host_count',
 'dst_host_srv_count',
 'dst_host_same_srv_rate',
 'dst_host_diff_srv_rate',
 'dst_host_same_src_port_rate',
 'dst_host_srv_diff_host_rate',
 'dst_host_serror_rate',
 'dst_host_srv_serror_rate',
 'dst_host_rerror_rate',
 'dst_host_srv_rerror_rate',
 'level']

In [11]:

assembler = VectorAssembler(
    inputCols = train_df.columns,     # lista delle colonne numeriche
    outputCol = "features_vec"
)
train_df_vect = assembler.transform(train_df)
test_df_vect= assembler.transform(test_df)
train_df_vect.dtypes

[('duration', 'double'),
 ('src_bytes', 'double'),
 ('dst_bytes', 'double'),
 ('land', 'double'),
 ('wrong_fragment', 'double'),
 ('urgent', 'double'),
 ('hot', 'double'),
 ('num_failed_logins', 'double'),
 ('logged_in', 'double'),
 ('num_compromised', 'double'),
 ('root_shell', 'double'),
 ('su_attempted', 'double'),
 ('num_root', 'double'),
 ('num_file_creations', 'double'),
 ('num_shells', 'double'),
 ('num_access_files', 'double'),
 ('num_outbound_cmds', 'double'),
 ('is_host_login', 'double'),
 ('is_guest_login', 'double'),
 ('count', 'double'),
 ('srv_count', 'double'),
 ('serror_rate', 'double'),
 ('srv_serror_rate', 'double'),
 ('rerror_rate', 'double'),
 ('srv_rerror_rate', 'double'),
 ('same_srv_rate', 'double'),
 ('diff_srv_rate', 'double'),
 ('srv_diff_host_rate', 'double'),
 ('dst_host_count', 'double'),
 ('dst_host_srv_count', 'double'),
 ('dst_host_same_srv_rate', 'double'),
 ('dst_host_diff_srv_rate', 'double'),
 ('dst_host_same_src_port_rate', 'double'),
 ('dst_host_sr

In [12]:

scaler = StandardScaler(
    inputCol  = "features_vec",
    outputCol = "features_scaled",
    withMean  = True,   # opzionale: sottrae la media
    withStd   = True    # opzionale: divide per deviazione standard
)
scaler_model = scaler.fit(train_df_vect)
train_df_scaled = scaler_model.transform(train_df_vect)

scaler_model = scaler.fit(test_df_vect)
test_df_scaled = scaler_model.transform(test_df_vect)


In [13]:
test_df_scaled.head(1)

[Row(duration=0.0, src_bytes=0.0, dst_bytes=0.0, land=0.0, wrong_fragment=0.0, urgent=0.0, hot=0.0, num_failed_logins=0.0, logged_in=0.0, num_compromised=0.0, root_shell=0.0, su_attempted=0.0, num_root=0.0, num_file_creations=0.0, num_shells=0.0, num_access_files=0.0, num_outbound_cmds=0.0, is_host_login=0.0, is_guest_login=0.0, count=229.0, srv_count=10.0, serror_rate=0.0, srv_serror_rate=0.0, rerror_rate=1.0, srv_rerror_rate=1.0, same_srv_rate=0.04, diff_srv_rate=0.06, srv_diff_host_rate=0.0, dst_host_count=255.0, dst_host_srv_count=10.0, dst_host_same_srv_rate=0.04, dst_host_diff_srv_rate=0.06, dst_host_same_src_port_rate=0.0, dst_host_srv_diff_host_rate=0.0, dst_host_serror_rate=0.0, dst_host_srv_serror_rate=0.0, dst_host_rerror_rate=1.0, dst_host_srv_rerror_rate=1.0, level=21.0, features_vec=SparseVector(39, {19: 229.0, 20: 10.0, 23: 1.0, 24: 1.0, 25: 0.04, 26: 0.06, 28: 255.0, 29: 10.0, 30: 0.04, 31: 0.06, 36: 1.0, 37: 1.0, 38: 21.0}), features_scaled=DenseVector([-0.1555, -0.022

NameError: name 'y_train' is not defined