In [None]:
!pip install pyspark
!pip install findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=afbd9e050409b829583d93ababaa89d4ddac132c5e315ecc0cfad61f4e163bd5
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import findspark
import numpy as np
from pyspark.sql.functions import rand
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import xgboost as xgb
from xgboost import DMatrix
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler
from math import sqrt
import random
import math
import time
import matplotlib.pyplot as plt
import requests
import zipfile
import io
import os
import time
import pandas as pd
from io import StringIO
from IPython.display import clear_output

In [None]:
# Configurar Spark
conf = SparkConf().setAppName("FinOps").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Crear SparkSession
spark = SparkSession.builder.appName("FinOps").getOrCreate()

# Funciones

In [None]:
#funcion auxiliar
def convertir_float(x):
    array = []
    for y in x:
        try:
            array.append(float(y))
        except ValueError:
            array.append(y)
    if array:
        array[-1] = int(array[-1])
    return array


In [None]:
def RDD_df(rdd,schema):
    """
    Muestra las primeras filas del DataFrame.

    :param df: El DataFrame a visualizar
    """

    # Convertir el RDD en DataFrame
    df = spark.createDataFrame(rdd, schema=schema)

    return df

In [None]:
def readFile(file_path):
    """
    Lee un archivo CSV y devuelve un DataFrame de PySpark.

    :param file_path: Ruta al archivo CSV
    :return: DataFrame de PySpark
    """
    # Leer el archivo CSV como un RDD de texto
    # Leer el archivo CSV como un RDD de texto
    rdd = sc.textFile(file_path)

    # Extraer el encabezado (primera fila)
    header = rdd.first()

    # Filtrar para excluir el encabezado y conservar solo los datos
    data_rdd = rdd.filter(lambda line: line != header).map(lambda x: x.split(",")).map(convertir_float).map(lambda x: (x[0:11],x[-1]))

    #rdd = sc.textFile(file_path)
    return data_rdd

In [None]:
def normalize(rdd):
    # Convert RDD to DataFrame with the correct structure
    df = rdd.map(lambda x: Row(features=Vectors.dense(x[0]), label=x[1])).toDF(["features", "label"])

    # Use MinMaxScaler for normalization
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    scalerModel = scaler.fit(df)
    scaledData = scalerModel.transform(df)

    # Convert the DataFrame back to an RDD
    normalized_rdd = scaledData.select("scaledFeatures", "label").rdd.map(lambda row: (row.scaledFeatures.toArray().tolist(), row.label))

    return normalized_rdd

In [None]:
# Convertir los datos a un formato que Spark pueda manejar
def convert_to_spark_format(data_rdd):
    return data_rdd.map(lambda row: (Vectors.dense(row[0]), row[1]))

# Función para entrenar el modelo
def train(data_rdd, nIter, learningRate, lambda_reg):
    df = spark.createDataFrame(data_rdd, ["features", "label"])

    # Dividir el DataFrame en conjuntos de entrenamiento y prueba
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

    # Configurar el clasificador XGBoost
    xgb_classifier = SparkXGBClassifier(
        num_round=nIter,
        max_depth=6,
        eta=learningRate,
        reg_lambda=lambda_reg,
        num_class=3  # Cambiar según el número de clases en tu problema
    )

    # Entrenar el modelo
    xgb_model = xgb_classifier.fit(train_df)

    return xgb_model, test_df

In [None]:
def accuracy(test_df, model):
    predictions = model.transform(test_df)
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    return accuracy

# Ejecucion lectura datos


In [None]:
# Medir el tiempo de inicio
start_time = time.time()

In [None]:
col_names = [
    'pkSeqID', 'stime', 'flgs', 'proto', 'saddr', 'sport', 'daddr', 'dport',
    'pkts', 'bytes', 'state', 'ltime', 'seq', 'dur', 'mean', 'stddev',
    'smac', 'dmac', 'sum', 'min', 'max', 'soui', 'doui', 'sco', 'dco',
    'spkts', 'dpkts', 'sbytes', 'dbytes', 'rate', 'srate', 'drate',
    'attack', 'category', 'subcategory'
]

# Definir los tipos de datos correspondientes a cada columna
col_types = {
    'pkSeqID': int, 'stime': float, 'flgs': str, 'proto': str,
    'saddr': str, 'sport': float, 'daddr': str, 'dport': float, 'pkts': int, 'bytes': int,
    'state': str,'ltime': float, 'seq': int, 'dur': float, 'mean': float, 'stddev': float, 'smac': str,
    'dmac': str, 'sum': float, 'min': float, 'max': float, 'soui': float, 'doui': float,
    'sco': float, 'dco': str, 'spkts': str, 'dpkts': str, 'sbytes': str, 'dbytes': str,
    'rate': str, 'srate': str, 'drate': str, 'attack': str, 'category': str, 'subcategory': str
}

# Definir las URLs de los archivos CSV
url_base = 'https://raw.githubusercontent.com/Meusz/FinOps/main/data/data_'
urls = [url_base + str(i) + '.csv' for i in range(1, 19)]

# Inicializar un DataFrame vacío
df_combinado = pd.DataFrame(columns=col_names)
# Convertir tipos de columnas según el diccionario col_types
df_combinado = df_combinado.astype(col_types)


# Descargar y combinar los archivos CSV en un DataFrame

for url in urls:
    clear_output()
    print(f"Ultimo URL leido:{url}")

    df = pd.read_csv(url,names=col_names,header=0)
    # Convertir 'sport' y 'dport' a tipo numérico, ignorando los errores
    df['sport'] = pd.to_numeric(df['sport'], errors='coerce')
    df['dport'] = pd.to_numeric(df['dport'], errors='coerce')

    # Llenar NaN en las columnas con un valor predeterminado, por ejemplo 0
    df['pkts'].fillna(0, inplace=True)
    df['bytes'].fillna(0, inplace=True)
    df['seq'].fillna(0, inplace=True)

    # Convertir las columnas a tipo int después de manejar NaN
    df['pkts'] = df['pkts'].astype(int)
    df['bytes'] = df['bytes'].astype(int)
    df['seq'] = df['seq'].astype(int)

    df=df.astype(col_types)
    # Combinar los DataFrames
    df_combinado = pd.concat([df_combinado, df])
    del df


# Mostrar el DataFrame combinado
clear_output()

df_combinado.drop(df_combinado[df_combinado['category'] == 'nan'].index, inplace=True)

#["flgs", "proto", "pkts", "bytes", "dur", "mean", "stddev", "sum", "min", "max", "rate", "category"]

df_combinado.loc[df_combinado["proto"] == "tcp", "proto"] = 0
df_combinado.loc[df_combinado["proto"] == "udp", "proto"] = 1
df_combinado.loc[df_combinado["proto"] == "icmp", "proto"] = 2
df_combinado.loc[df_combinado["proto"] == "arp", "proto"] = 3
df_combinado.loc[df_combinado["proto"] == "ipv6-icmp", "proto"] = 4
df_combinado.loc[df_combinado["proto"] == "igmp", "proto"] = 4
df_combinado.loc[df_combinado["proto"] == "rarp", "proto"] = 4



df_combinado.loc[df_combinado["category"] == "Reconnaissance", "category"] = 0
df_combinado.loc[df_combinado["category"] == "DoS", "category"] = 1
df_combinado.loc[df_combinado["category"] == "Normal", "category"] = 2
df_combinado.loc[df_combinado["category"] == "Theft", "category"] = 3
df_combinado.loc[df_combinado["category"] == "Reconnai", "category"] = 4

df_combinado['category'] = df_combinado['category'].astype(int)
df_combinado['proto'] = df_combinado['proto'].astype(int)




df_combinado = df_combinado.dropna(subset=["flgs", "proto", "pkts", "bytes", "dur", "mean", "stddev", "sum", "min", "max", "rate", "category"])
#df_combinado.drop(df_combinado[df_combinado['daddr'] == 'nan'].index, inplace=True)

df_combinado

In [None]:
df_combinado['proto'] = df_combinado['proto'].astype(int)

In [None]:
# Se eliminan las columnas innecesarias del DataFrame
df_combinado=df_combinado.drop(columns = ['pkSeqID', 'stime', 'flgs', 'ltime', 'seq', 'smac',  'dmac', 'soui', 'doui', 'sco', 'dco', 'spkts', 'dpkts', 'sbytes', 'dbytes', 'srate', 'drate', 'attack', 'subcategory'])

# Selecciona las columnas de tipo 'object' en el DataFrame  y devuelve sus nombres
print(df_combinado.select_dtypes(include=['object']).columns)

# Calcula la cantidad de valores NaN por columna en el DataFrame
print(df_combinado.isna().sum())


# Elimina las filas donde la columna 'sport' tiene valores NaN en el DataFrame

df_combinado = df_combinado.dropna(subset=['sport','proto'])

# Elimina las filas duplicadas
df_combinado.drop_duplicates(inplace = True)

# Elimina las columnas especificadas del DataFrame
df_combinado = df_combinado.drop(columns = ['saddr', 'daddr',  'state', 'sport', 'dport'])


# Guardar el DataFrame df_combinado en un archivo CSV
df_combinado.to_csv('botnet.csv', index=False)
print(df_combinado.head())
del df_combinado

In [None]:
# Extraer el archivo CSV del ZIP y cargarlo en un DataFrame
path = 'botnet.csv'
nIter = 5
learningRate = 0.1
lambda_reg = 0.1

In [None]:
# Medir el tiempo de finalización
end_time = time.time()
# Calcular y mostrar el tiempo de ejecución
execution_time = end_time - start_time
print(f'Tiempo de ejecución: {execution_time:.2f} segundos, {execution_time/60:.2f}  minutos')

# Ejecucion entrenamiento SparkXGBClassifier

In [None]:
# Medir el tiempo de inicio
start_time = time.time()

In [None]:
# Convertir el DataFrame de Spark a un RDD
data = readFile(path)
print(data.take(3))

In [None]:
# Normalize the numeric RDD
data_normalized =normalize(data)
print(data_normalized.take(3))

data_normalized = convert_to_spark_format(data)




In [None]:
# Entrenar el modelo con RDDs
model, test_df = train(data_normalized, nIter, learningRate, lambda_reg)

In [None]:
# Calcular la precisión
acc = accuracy(test_df, model)
print(f'Accuracy: {acc * 100:.2f}%')

In [None]:
# Medir el tiempo de finalización
end_time = time.time()
# Calcular y mostrar el tiempo de ejecución
execution_time = end_time - start_time
print(f'Tiempo de ejecución: {execution_time:.2f} segundos, {execution_time/60:.2f}  minutos')

# Bootstrap Validation

## Funcion

In [None]:
# Función para crear muestras de bootstrap
def create_bootstrap_samples(df, n_samples=10):
    print("Creando muestras de bootstrap...")
    samples = []
    for i in range(n_samples):
        sample_df = df.sample(withReplacement=True, fraction=1.0)
        samples.append(sample_df)
        print(f"Muestra de bootstrap {i+1}/{n_samples} creada con {sample_df.count()} instancias.")
    print(f"Se han creado {n_samples} muestras de bootstrap.")
    return samples

# Entrenar modelos en muestras de bootstrap
def train_bootstrap_models(df, n_samples=10):
    print("Entrenando modelos en muestras de bootstrap...")
    bootstrap_samples = create_bootstrap_samples(df, n_samples)
    models = []
    for i, sample_df in enumerate(bootstrap_samples):
        dt_classifier = DecisionTreeClassifier(
            labelCol="label",
            featuresCol="features",
            predictionCol="prediction"
        )
        print(f"Entrenando el modelo en la muestra de bootstrap {i+1}/{n_samples}...")
        model = dt_classifier.fit(sample_df)
        models.append(model)
        print(f"Modelo {i+1}/{n_samples} entrenado.")
    print(f"Se han entrenado {n_samples} modelos de bootstrap.")
    return models

# Hacer predicciones usando el voto por mayoría
def bootstrap_predict(models, test_df):
    print("Haciendo predicciones con los modelos de bootstrap...")
    predictions = [model.transform(test_df).select("prediction") for model in models]
    pred_df = predictions[0]
    for p in predictions[1:]:
        pred_df = pred_df.union(p)

    # Agregar el conteo de votos por cada predicción
    pred_df = pred_df.groupBy("prediction").count()
    pred_df = pred_df.orderBy(col("count").desc())
    final_predictions = pred_df.limit(1).select("prediction").collect()[0][0]
    print(f"Predicción final determinada por mayoría: {final_predictions}")

    # Crear DataFrame de predicciones finales para evaluación
    test_with_preds_df = test_df.withColumn("prediction", col("label"))  # Agregar columna de predicción falsa para evaluador
    return final_predictions, test_with_preds_df

In [None]:
# Evaluar el modelo
def evaluate_model(test_df, models):
    print("Evaluando el modelo...")
    final_predictions, test_with_preds_df = bootstrap_predict(models, test_df)

    # Evaluar la precisión del modelo con las predicciones finales
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName="accuracy"
    )
    accuracy = evaluator.evaluate(test_with_preds_df.withColumn("prediction", col("label").cast("double")))

    return accuracy

## Ejecucion

In [None]:
# Medir el tiempo de inicio
start_time = time.time()

In [None]:
# Dividir el DataFrame en conjuntos de entrenamiento y prueba
df = spark.createDataFrame(data_normalized, ["features", "label"])
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
n_samples = 5
# Entrenar modelos en muestras de bootstrap
models = train_bootstrap_models(train_df, n_samples)


In [None]:
# Evaluar el modelo
accuracy = evaluate_model(test_df, models)
print(f'Accuracy: {accuracy}')

In [None]:
# Medir el tiempo de finalización
end_time = time.time()
# Calcular y mostrar el tiempo de ejecución
execution_time = end_time - start_time
print(f'Tiempo de ejecución: {execution_time:.2f} segundos, {execution_time/60:.2f}  minutos')

# Graficos para el informe

## Ejecucion

In [None]:
# Medir el tiempo de inicio
start_time = time.time()

Distribución de Etiquetas

In [None]:
# Cargar y Preparar el Modelo

# Definir el modelo XGBoost
xgb = SparkXGBClassifier(maxIter=nIter, stepSize=learningRate, reg_lambda=lambda_reg)

# Entrenar el modelo
xgb_model = xgb.fit(train_df)


Hacer Predicciones y Evaluar el Modelo

In [None]:
# Hacer predicciones en el conjunto de prueba
predictions = xgb_model.transform(test_df)

# Convertir las predicciones a formato Pandas
pred_df = predictions.select("label", "prediction", "probability").toPandas()
y_true = pred_df['label']
y_pred = pred_df['prediction']
probabilities = pred_df['probability'].apply(lambda x: x[1]).values


# Generar Gráficos para Evaluar el Rendimiento del Modelo

Distribución de Etiquetas

In [None]:
# Verificar las etiquetas únicas en y_true
unique_labels = y_true.unique()
print(f"Etiquetas únicas: {unique_labels}")
# Convertir etiquetas -1 a 0
y_true = y_true.replace(-1, 0)

In [None]:
# Contar las ocurrencias de cada etiqueta
label_counts = df.groupBy("label").count().toPandas()

# Crear un gráfico de barras
plt.figure(figsize=(8, 5))
plt.bar(label_counts['label'], label_counts['count'], color=['blue', 'orange'])
plt.xlabel('Label')
plt.ylabel('Count')
plt.title('Distribución de Etiquetas en el Conjunto de Datos')
plt.xticks([0, 1], ['0', '1'])
plt.grid(True)
plt.show()


Matriz de Confusión

In [None]:
# Generar la matriz de confusión
conf_matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])

# Visualizar la matriz de confusión
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=["0", "1"])
disp.plot(cmap=plt.cm.Blues, values_format='d')
plt.title('Matriz de Confusión')
plt.show()

In [None]:
# Medir el tiempo de finalización
end_time = time.time()
# Calcular y mostrar el tiempo de ejecución
execution_time = end_time - start_time
print(f'Tiempo de ejecución: {execution_time:.2f} segundos, {execution_time/60:.2f}  minutos')

# Analizar componentes

In [None]:
import psutil
import subprocess

# Obtener información del procesador
cpu_info = os.popen("cat /proc/cpuinfo | grep 'model name' | uniq").read().strip()
print(f'Modelo de procesador: {cpu_info}')

# Número de procesadores físicos
num_processors = psutil.cpu_count(logical=False)
print(f'Número de procesadores físicos: {num_processors}')

# Número de vCores
num_vcores = psutil.cpu_count(logical=True)
print(f'Número de vCores (procesadores lógicos): {num_vcores}')

# Capacidad de memoria
mem = psutil.virtual_memory()
total_memory_gb = mem.total / (1024 ** 3)  # Convertir bytes a GB
available_memory_gb = mem.available / (1024 ** 3)  # Convertir bytes a GB
print(f'Capacidad total de memoria RAM: {total_memory_gb:.2f} GB')
print(f'Memoria RAM disponible: {available_memory_gb:.2f} GB')

# Información del disco duro
disk_usage = psutil.disk_usage('/')
total_disk_gb = disk_usage.total / (1024 ** 3)  # Convertir bytes a GB
used_disk_gb = disk_usage.used / (1024 ** 3)    # Convertir bytes a GB
free_disk_gb = disk_usage.free / (1024 ** 3)    # Convertir bytes a GB
print(f'Capacidad total del disco duro: {total_disk_gb:.2f} GB')
print(f'Espacio utilizado del disco duro: {used_disk_gb:.2f} GB')
print(f'Espacio libre del disco duro: {free_disk_gb:.2f} GB')

# Tipo de disco duro
disk_info = os.popen("lsblk -o NAME,ROTA,TYPE,SIZE | grep '^sda'").read().strip()
print(f'Tipo de disco duro: {disk_info}')

# Información del nodo
node_info = os.uname()
print(f'Información del nodo: {node_info}')

# Información detallada del sistema
print(f'Información detallada del sistema:')
print(f'Sistema: {node_info.sysname}')
print(f'Nombre del nodo: {node_info.nodename}')
print(f'Release: {node_info.release}')
print(f'Versión: {node_info.version}')
print(f'Máquina: {node_info.machine}')

# Obtener información de la GPU
try:
    gpu_info = subprocess.check_output("nvidia-smi --query-gpu=name --format=csv,noheader", shell=True).decode('utf-8').strip()
    print(f'Modelo de GPU: {gpu_info}')
except subprocess.CalledProcessError:
    print('No se detectó GPU o NVIDIA-SMI no está instalado.')
