
In [None]:
import json
import numpy as np
from sklearn.tree import DecisionTreeClassifier

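# Function to load sFlow data from a file
def load_sflow_data(file_path):
    """Load an sFlow capture that was exported as JSON.

    Args:
        file_path: Path to the JSON file.

    Returns:
        The decoded JSON document. The feature extractor in this cell reads
        ``['samples']`` from it, so a dict with that key is expected.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the content is not valid JSON or cannot be decoded
            (``json.JSONDecodeError`` and ``UnicodeDecodeError`` are both
            subclasses).
    """
    # Read raw bytes and let ``json`` detect the encoding. Text mode would
    # decode with the platform's locale encoding and reject a UTF-8 byte-order
    # mark, so the same capture could parse on one host and fail on another.
    with open(file_path, 'rb') as file:
        return json.load(file)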

# Function to extract features from sFlow data
def extract_features(sflow_data):
    """Reduce an sFlow capture to a single three-column feature row.

    Args:
        sflow_data: Decoded sFlow JSON; ``sflow_data['samples']`` is a list of
            samples, each carrying an ``elements`` list whose entries have
            ``sampledPacketSize`` and ``dstIP``.

    Returns:
        A ``(1, 3)`` NumPy array: number of samples, summed
        ``sampledPacketSize`` divided by the number of samples (0 for an
        empty capture), and the number of distinct ``dstIP`` values.
    """
    samples = sflow_data['samples']
    sample_total = len(samples)

    # NOTE(review): sizes are summed per element but divided by the number of
    # samples; the two agree only when each sample holds exactly one element.
    # Confirm against the export schema.
    size_sum = 0
    for sample in samples:
        for element in sample['elements']:
            size_sum += int(element['sampledPacketSize'])
    if sample_total:
        mean_size = size_sum / sample_total
    else:
        mean_size = 0

    destinations = set()
    for sample in samples:
        for element in sample['elements']:
            destinations.add(element['dstIP'])

    # NOTE(review): these are raw counts for whatever window the file covers;
    # nothing here scales them by sampling rate or capture duration.
    return np.array([[sample_total, mean_size, len(destinations)]])

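# Dummy function to train a model
def train_dummy_model():
    """Fit a placeholder decision tree on two hand-written rows.

    Columns follow ``extract_features``: sample count, average packet size,
    distinct destination IPs. Label 1 is the row that
    ``predict_flood_attack`` reports as "Attack", 0 the "Normal" row.

    Returns:
        A fitted ``DecisionTreeClassifier``.
    """
    X_train = np.array([[100, 150, 1], [10, 64, 10]])
    y_train = np.array([1, 0])
    # Every column separates these two rows equally well, and scikit-learn
    # permutes features at each split, so an unseeded tree can choose a
    # different column per run and return different verdicts for the same
    # capture. Pin the seed so a verdict can be reproduced during triage.
    model = DecisionTreeClassifier(random_state=0)
    model.fit(X_train, y_train)
    return model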

# Function to predict flood attack
def predict_flood_attack(sflow_data):
    """Classify one decoded sFlow capture as "Attack" or "Normal".

    A fresh placeholder model is fitted on every call, then applied to the
    feature row derived from ``sflow_data``.

    Args:
        sflow_data: Decoded sFlow JSON as accepted by ``extract_features``.

    Returns:
        ``"Attack"`` when the model predicts label 1, otherwise ``"Normal"``.
    """
    classifier = train_dummy_model()
    label = classifier.predict(extract_features(sflow_data))[0]
    if label == 1:
        return "Attack"
    return "Normal"

# Location of the exported sFlow capture (JSON); presumably a Colab upload path.
file_path = '/content/sflow-data.json'  # Change this to the actual file path

# Parse the whole capture into memory; it is scored as a single observation.
sflow_data = load_sflow_data(file_path)

# Classify the capture and print the verdict ("Attack" or "Normal").
# NOTE(review): the model behind this verdict is fitted on two hand-written
# rows, so the printed label demonstrates the pipeline rather than detecting
# anything.
result = predict_flood_attack(sflow_data)
print(result)


Normal


In [2]:
import json
import numpy as np
from sklearn.tree import DecisionTreeClassifier

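def load_sflow_data_from_file(file_path):
    """Load an sFlow capture that was exported as JSON.

    Args:
        file_path: Path to the JSON file.

    Returns:
        The decoded JSON document. ``extract_udp_features`` reads
        ``['samples']`` from it, so a dict with that key is expected.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the content is not valid JSON or cannot be decoded
            (``json.JSONDecodeError`` and ``UnicodeDecodeError`` are both
            subclasses).
    """
    # Read raw bytes and let ``json`` detect the encoding. Text mode would
    # decode with the platform's locale encoding and reject a UTF-8 byte-order
    # mark, so the same capture could parse on one host and fail on another.
    with open(file_path, 'rb') as file:
        return json.load(file)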

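def extract_udp_features(sflow_data):
    """Summarise the UDP traffic in an sFlow capture as one feature row.

    Args:
        sflow_data: Decoded sFlow JSON; ``sflow_data['samples']`` is a list of
            samples, each with an ``elements`` list.

    Returns:
        A ``(1, 3)`` NumPy array: number of UDP elements, their mean
        ``sampledPacketSize``, and how many of them target the most frequent
        ``dstIP``. All zeros when the capture holds no UDP element.

    Raises:
        KeyError: If ``samples`` or ``elements`` is missing, or a UDP element
            lacks ``sampledPacketSize`` or ``dstIP``.
    """
    from collections import Counter

    # IP protocol 17 is UDP. Compare through str() so an exporter that writes
    # the protocol as a JSON number still matches; with a plain == '17' such
    # a capture would yield the all-zero row and be scored as if it carried no
    # UDP at all. Elements without an IPProtocol field are skipped.
    udp_packets = [
        element
        for sample in sflow_data['samples']
        for element in sample['elements']
        if str(element.get('IPProtocol')) == '17'
    ]
    packet_counts = len(udp_packets)
    if packet_counts == 0:
        return np.array([0, 0, 0]).reshape(1, -1)  # Return zeros if no UDP packets

    total_packet_size = sum(int(packet['sampledPacketSize']) for packet in udp_packets)
    average_packet_size = total_packet_size / packet_counts

    # Count destinations in one pass. Calling list.count for every distinct IP
    # is quadratic, which is costly on exactly the large captures a flood
    # produces. Only the highest count is used, so ties need no resolution.
    dst_ip_counts = Counter(packet['dstIP'] for packet in udp_packets)
    packets_to_common_dst_ip = max(dst_ip_counts.values())

    # NOTE(review): these are raw counts for whatever window the file covers;
    # nothing here scales them by sampling rate or capture duration.
    # Features: Number of UDP packets, average UDP packet size, packets to the most common destination IP
    features = np.array([packet_counts, average_packet_size, packets_to_common_dst_ip]).reshape(1, -1)
    return features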

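def train_dummy_model():
    """Fit a placeholder decision tree on three hand-written rows.

    Columns follow ``extract_udp_features``: UDP packet count, average UDP
    packet size, packets to the most common destination IP.

    NOTE: this reuses the name of the first cell's ``train_dummy_model``. In
    a shared notebook kernel the later definition wins, so
    ``predict_flood_attack`` run afterwards would score its features (whose
    third column is a distinct-destination count) with this model.

    Returns:
        A fitted ``DecisionTreeClassifier``.
    """
    # Dummy training data for demonstration
    X_train = np.array([[500, 1400, 500], [10, 64, 2], [300, 1200, 300]])  # Add more realistic training data
    y_train = np.array([1, 0, 1])  # Labels: 1 for attack, 0 for normal
    # Every column separates these rows equally well, and scikit-learn
    # permutes features at each split, so an unseeded tree can choose a
    # different column per run and return different verdicts for the same
    # capture. Pin the seed so a verdict can be reproduced during triage.
    model = DecisionTreeClassifier(random_state=0)
    model.fit(X_train, y_train)
    return model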

def predict_udp_flood_attack(file_path):
    """Load an sFlow JSON capture and label it as a UDP flood or normal.

    The capture is read from ``file_path``, a placeholder model is fitted,
    and the model is applied to the UDP feature row of the capture.

    Args:
        file_path: Path to the exported sFlow JSON file.

    Returns:
        ``"UDP Flood Attack"`` when the model predicts label 1, otherwise
        ``"Normal"``.
    """
    capture = load_sflow_data_from_file(file_path)
    classifier = train_dummy_model()  # In real use, load a pre-trained model
    verdict = classifier.predict(extract_udp_features(capture))[0]
    if verdict == 1:
        return "UDP Flood Attack"
    return "Normal"

# Location of the exported sFlow capture (JSON); presumably a Colab upload path.
file_path = '/content/sflow-data.json'  # Ensure this path is correct
# Score the capture and print the verdict ("UDP Flood Attack" or "Normal").
# NOTE(review): the model is fitted on three hand-written rows, so the printed
# label demonstrates the pipeline rather than detecting anything.
result = predict_udp_flood_attack(file_path)
print(result)


Normal
