In [1]:
import pandas as pd
import numpy as np
import json
import os
from datetime import timedelta, datetime
import logging

In [None]:
# Read Malicious_ips.csv from the working directory and preview its first rows.
malicious_ips = pd.read_csv(filepath_or_buffer="Malicious_ips.csv")
malicious_ips.head(5)

In [None]:
# Read attack_data.csv from the working directory and preview its first rows.
attack_data = pd.read_csv(filepath_or_buffer="attack_data.csv")
attack_data.head(5)

In [4]:
# Inner-join the two frames on their shared 'mitigation' column.
# NOTE(review): the following cell performs this same merge again before cleaning.
merged_df = pd.merge(
    left=malicious_ips,
    right=attack_data,
    how='inner',
    on='mitigation',
)


In [None]:
# Rebuild the joined frame from the two raw inputs so this cell can be re-run on its own.
merged_df = pd.merge(left=malicious_ips, right=attack_data, how='inner', on='mitigation')

# Parse 'date' strictly as "YYYY-MM-DD HH:MM:SS"; values that do not match become NaT.
merged_df['date'] = pd.to_datetime(merged_df['date'], format="%Y-%m-%d %H:%M:%S", errors='coerce')

# Drop the duplicated/suffixed columns produced by the merge and keep a single 'customer' column.
merged_df = (
    merged_df
    .drop(columns=['customers_y', 'Unnamed: 0_x', 'Unnamed: 0_y'])
    .rename(columns={'customers_x': 'customer'})
)
merged_df.head()

In [None]:
# Count rows per calendar day (NaT dates are not counted) and list the days chronologically.
event_days = merged_df['date'].dt.date
events_per_day = event_days.value_counts().sort_index()
events_per_day


In [None]:
# Inclusive bounds of the analysis window.
start_time, end_time = "2025-09-29 00:00:00", "2025-09-29 23:59:59"

# Keep only the rows whose 'date' lies inside the window; NaT rows fail both comparisons and are dropped.
on_or_after_start = merged_df['date'] >= start_time
on_or_before_end = merged_df['date'] <= end_time
merged_df = merged_df.loc[on_or_after_start & on_or_before_end]
merged_df.head()

In [None]:
# [Previous Cluster class]
class Cluster:
    """A group of attack rows together with the latest day each IP was seen.

    Rows are any mapping-like objects (dicts or pandas Series) that can be
    indexed by column name. The class reads 'Malicious_IP', 'date',
    'duration', 'bps_average', 'bps_pct95', 'bps_max', 'pps_average',
    'pps_pct95', 'pps_max' and 'vectors'.

    Attributes:
        cluster_id: Identifier supplied by the caller.
        rows: The list of rows passed in (shared with the caller, and grown
            in place by ``extend``).
        logger: Logger used for per-row errors and debug summaries.
        ip_last_observed: Maps IP (as str) to the latest 'YYYY-MM-DD' day on
            which that IP appears in a row with a usable date.
    """

    def __init__(self, cluster_id, rows, logger):
        """Store the rows and index the latest observation day of each IP.

        Rows missing a required field, or whose 'date' cannot be formatted
        (for example NaT or a plain string), are logged and left out of
        ``ip_last_observed``; they still remain in ``rows``.
        """
        self.cluster_id = cluster_id
        self.rows = rows
        self.logger = logger
        self.ip_last_observed = {}
        self._record_observations(rows)
        self.logger.debug(f"Cluster {self.cluster_id} initialized with {len(self.ip_last_observed)} IPs: {list(self.ip_last_observed.keys())}")

    def _record_observations(self, rows):
        """Fold rows into ``ip_last_observed``, keeping the latest day per IP."""
        for row in rows:
            try:
                ip = str(row['Malicious_IP'])
                date = row['date'].strftime('%Y-%m-%d')
            except KeyError as e:
                self.logger.error(f"Missing required field in row: {e}, row: {row}")
                continue
            except (AttributeError, ValueError) as e:
                # AttributeError: the value has no strftime (e.g. a str).
                # ValueError: pandas NaT refuses strftime.
                self.logger.error(f"Invalid date format in row: {e}, row: {row}")
                continue
            previous = self.ip_last_observed.get(ip)
            # 'YYYY-MM-DD' strings order chronologically, so max() keeps the latest day
            # regardless of the order in which rows arrive.
            self.ip_last_observed[ip] = date if previous is None else max(previous, date)

    def get_malicious_ips(self):
        """Return the set of IPs (as str) found in all rows, dated or not."""
        return {str(row['Malicious_IP']) for row in self.rows}

    def extend(self, new_rows):
        """Append rows to the cluster and update the per-IP latest observation day."""
        self.rows.extend(new_rows)
        self._record_observations(new_rows)
        self.logger.debug(f"Cluster {self.cluster_id} extended, now has {len(self.ip_last_observed)} IPs: {list(self.ip_last_observed.keys())}")

    def _column_values(self, column, divisor=None):
        """Collect the non-null values of one column, optionally divided by ``divisor``."""
        values = [row[column] for row in self.rows if pd.notna(row[column])]
        if divisor is None:
            return values
        return [value / divisor for value in values]

    def compute_metrics(self):
        """Summarise the cluster's rows.

        Returns:
            dict with keys 'duration_max', 'duration_avg', 'gbps_max',
            'gbps_avg', 'gbps_pct95', 'pps_max', 'pps_avg', 'pps_pct95'
            (each None when no row has a value for it) and 'vectors', a
            sorted list of the distinct comma-separated vector names.
            The bps_* columns are divided by 1e9 before aggregation, and the
            pct95 entries are the maximum of the per-row pct95 values.
        """
        bits_per_gigabit = 1_000_000_000
        durations = self._column_values('duration')
        bps_avg = self._column_values('bps_average', bits_per_gigabit)
        bps_pct95 = self._column_values('bps_pct95', bits_per_gigabit)
        bps_max = self._column_values('bps_max', bits_per_gigabit)
        pps_avg = self._column_values('pps_average')
        pps_pct95 = self._column_values('pps_pct95')
        pps_max = self._column_values('pps_max')

        vectors_set = set()
        for row in self.rows:
            if pd.notna(row['vectors']):
                # Undo JSON-style escaped slashes ("\/") before splitting the list.
                vectors_set.update(row['vectors'].replace('\\/', '/').split(','))

        return {
            "duration_max": max(durations) if durations else None,
            "duration_avg": sum(durations) / len(durations) if durations else None,
            "gbps_max": max(bps_max) if bps_max else None,
            "gbps_avg": sum(bps_avg) / len(bps_avg) if bps_avg else None,
            "gbps_pct95": max(bps_pct95) if bps_pct95 else None,
            "pps_max": max(pps_max) if pps_max else None,
            "pps_avg": sum(pps_avg) / len(pps_avg) if pps_avg else None,
            "pps_pct95": max(pps_pct95) if pps_pct95 else None,
            # Sorted so repeated runs produce the same output order.
            "vectors": sorted(vectors_set)
        }

class BotnetClusterProcessor:
    """Group attack rows into clusters of malicious IPs and persist the result.

    Workflow: construct with a DataFrame, call ``process()`` to build
    clusters, then ``save_results()`` to write three JSON files (cluster
    summaries, per-IP data, per-IP cluster history) and get a DataFrame back.

    Rows are grouped per customer into bursts (consecutive rows at most
    ``max_gap_seconds`` apart). A burst is merged into a previously saved
    cluster when their IP sets are similar (see ``is_cluster_similar``);
    otherwise it becomes a new cluster. New clusters that are similar to
    each other are merged at the end of ``process()``.
    """

    # A cluster counts as active when it was last observed within this many days of "now".
    ACTIVE_WINDOW_DAYS = 14

    def __init__(self, df, analysis_file: str = 'cluster_analysis.json', 
                 ip_file: str = 'cluster_malicious_ips.json', 
                 ip_history_file: str = 'cluster_ip_history.json'):
        """Validate the input frame and load any previously saved state.

        Args:
            df: DataFrame with the columns listed in ``required_columns``.
                It is copied; 'date' is converted with ``pd.to_datetime``
                and unparsable values become NaT.
            analysis_file: JSON file holding cluster summaries.
            ip_file: JSON file holding per-IP records.
            ip_history_file: JSON file holding per-IP cluster history.

        Raises:
            ValueError: if ``df`` lacks any required column.
        """
        required_columns = ['Malicious_IP', 'date', 'customer', 'industry', 'country', 'region', 
                           'duration', 'bps_average', 'bps_pct95', 'bps_max', 
                           'pps_average', 'pps_pct95', 'pps_max', 'vectors']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Input DataFrame missing required columns: {missing_columns}")

        # The logger must exist before anything that may log (date warning, JSON loaders).
        self.logger = self._setup_logger()

        self.df = df.copy()
        self.df['date'] = pd.to_datetime(self.df['date'], errors='coerce')
        if self.df['date'].isna().any():
            self.logger.warning("Some 'date' values could not be converted to datetime. NaT values introduced.")

        self.clusters = []
        self.analysis_file = analysis_file
        self.ip_file = ip_file
        self.ip_history_file = ip_history_file
        # Created before the history loader runs, because the loader fills it
        # with the most recent (date, cluster id) per IP.
        self.ip_cluster_tracker = {}
        self.prev_clusters = self._load_previous_clusters_from_json()
        self.ip_cluster_history = self._load_ip_history_from_json()
        self.first_observed_dates = {}
        self.last_observed_dates = {}
        self.active_status = {}
        self.observation_counts = {}
        self.ip_observation_counts = {}
        self._initialize_ip_observation_counts()
        self.next_cluster_id = max([c['ClusterID'] for c in self.prev_clusters], default=0) + 1

    def _setup_logger(self):
        """Return the module logger, attaching a stream handler at INFO level on first use."""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def _load_previous_clusters_from_json(self):
        """Load saved cluster summaries and attach each cluster's IP entries.

        Missing files are treated as empty. Any other failure is logged and
        an empty list is returned.
        """
        try:
            try:
                with open(self.analysis_file, 'r') as f:
                    clusters = json.load(f)
            except FileNotFoundError:
                clusters = []
                self.logger.info(f"{self.analysis_file} not found, starting with empty clusters")

            try:
                with open(self.ip_file, 'r') as f:
                    ip_data = json.load(f)
            except FileNotFoundError:
                ip_data = []
                self.logger.info(f"{self.ip_file} not found, starting with empty IP data")

            for cluster in clusters:
                cluster_id = cluster['ClusterID']
                cluster['Malicious_IPs'] = [
                    {
                        'IP': item['Malicious_IP'],
                        'Last_Observed': item['Last_Observed'],
                        # Without a stored first date, the last date is the best known bound.
                        'First_Observed': item.get('First_Observed') or item['Last_Observed'],
                        'Observation_Count': item.get('Observation_Count', 0)
                    }
                    for item in ip_data if cluster_id in item['Clusters']
                ]
            return clusters
        except Exception as e:
            self.logger.error(f"Error loading previous clusters from JSON: {e}")
            return []

    def _load_ip_history_from_json(self):
        """Load the per-IP cluster history and seed ``ip_cluster_tracker``.

        Returns:
            dict mapping IP to a list of {'ClusterID', 'Date'} entries. A
            missing file yields an empty dict; any other failure is logged
            and also yields an empty dict.
        """
        try:
            try:
                with open(self.ip_history_file, 'r') as f:
                    ip_history = json.load(f)
            except FileNotFoundError:
                ip_history = []
                self.logger.info(f"{self.ip_history_file} not found, starting with empty IP history")

            ip_history_dict = {}
            for item in ip_history:
                ip = item['IP']
                entry = {'ClusterID': int(item['ClusterID']), 'Date': item['Date']}
                ip_history_dict.setdefault(ip, []).append(entry)
                tracked = self.ip_cluster_tracker.get(ip)
                # Keep only the most recent (date, cluster) pair per IP.
                if tracked is None or entry['Date'] > tracked[0]:
                    self.ip_cluster_tracker[ip] = (entry['Date'], entry['ClusterID'])
            return ip_history_dict
        except Exception as e:
            self.logger.error(f"Error loading IP history from JSON: {e}")
            return {}

    def _initialize_ip_observation_counts(self):
        """Count how many rows of the input frame mention each IP."""
        ip_counts = self.df['Malicious_IP'].value_counts().to_dict()
        for ip, count in ip_counts.items():
            self.ip_observation_counts[str(ip)] = count
        self.logger.debug(f"Initialized observation counts for {len(self.ip_observation_counts)} IPs")

    def is_cluster_similar(self, cluster_ips, new_ips, threshold=0.5):
        """Return True when the overlap of two IP sets, relative to the smaller set, reaches ``threshold``.

        Returns False when either set is empty.
        """
        if not cluster_ips or not new_ips:
            return False
        intersection_size = len(cluster_ips.intersection(new_ips))
        similarity_ratio = intersection_size / min(len(cluster_ips), len(new_ips))
        return similarity_ratio >= threshold

    def _active_flag(self, last_observed):
        """Return 'Yes' when ``last_observed`` is within ACTIVE_WINDOW_DAYS of now, else 'No'."""
        age_days = (pd.Timestamp.now() - last_observed).days
        return 'Yes' if age_days <= self.ACTIVE_WINDOW_DAYS else 'No'

    def process(self, max_gap_seconds=300):
        """Build clusters from the input frame.

        Rows are sorted by customer and date. Consecutive rows of the same
        customer that are at most ``max_gap_seconds`` apart form one burst;
        each burst is merged into a previous cluster or added as a new one,
        and similar new clusters are merged afterwards.

        Args:
            max_gap_seconds: Largest gap, in seconds, between consecutive
                rows of one burst (default 300).
        """
        self.df = self.df.sort_values(by=['customer', 'date'])
        current_cluster_rows = []
        last_row = None

        for _, row in self.df.iterrows():
            continues_burst = (
                last_row is not None
                and row['customer'] == last_row['customer']
                and pd.notna(row['date']) and pd.notna(last_row['date'])
                and (row['date'] - last_row['date']).total_seconds() <= max_gap_seconds
            )
            if current_cluster_rows and not continues_burst:
                self._merge_or_add_cluster(current_cluster_rows)
                current_cluster_rows = []
            current_cluster_rows.append(row)
            last_row = row

        if current_cluster_rows:
            self._merge_or_add_cluster(current_cluster_rows)

        self._merge_similar_clusters()

    def _merge_or_add_cluster(self, rows):
        """Merge a burst of rows into a similar previous cluster, or register a new cluster.

        Rows without a valid date are skipped (with a warning) because every
        downstream field is date-based.
        """
        if not rows:
            self.logger.warning("No rows provided to merge or add cluster")
            return

        dated_rows = [row for row in rows if pd.notna(row['date'])]
        if len(dated_rows) < len(rows):
            self.logger.warning(f"Skipping {len(rows) - len(dated_rows)} row(s) without a valid date")
        rows = dated_rows
        if not rows:
            return

        new_ips = {str(row['Malicious_IP']) for row in rows if 'Malicious_IP' in row}
        if not new_ips:
            self.logger.warning("No valid IPs found in rows to merge or add cluster")
            return

        first_observed = min(row['date'] for row in rows)
        last_observed = max(row['date'] for row in rows)

        # The first similar previous cluster wins.
        for prev_cluster in self.prev_clusters:
            prev_ips = {ip_entry['IP'] for ip_entry in prev_cluster.get('Malicious_IPs', [])}
            if self.is_cluster_similar(prev_ips, new_ips):
                self._merge_into_previous(prev_cluster, rows, first_observed, last_observed)
                return

        new_cluster = Cluster(self.next_cluster_id, rows, self.logger)
        self.clusters.append(new_cluster)
        self.first_observed_dates[self.next_cluster_id] = first_observed
        self.last_observed_dates[self.next_cluster_id] = last_observed
        self.active_status[self.next_cluster_id] = self._active_flag(last_observed)
        self.observation_counts[self.next_cluster_id] = len(rows)
        self.next_cluster_id += 1

    def _merge_into_previous(self, prev_cluster, rows, first_observed, last_observed):
        """Update a previously saved cluster dict in place with a new burst of dated rows."""
        # Captured before 'Observations' is increased so the weighted averages
        # below weight the old average by the old row count only.
        prev_observations = prev_cluster.get('Observations') or 0

        prev_cluster['First_Observed'] = min(
            pd.to_datetime(prev_cluster.get('First_Observed') or first_observed),
            first_observed
        ).strftime('%Y-%m-%d')
        prev_cluster['Last_Observed'] = max(
            pd.to_datetime(prev_cluster.get('Last_Observed') or last_observed),
            last_observed
        ).strftime('%Y-%m-%d')
        prev_cluster['Active'] = self._active_flag(pd.to_datetime(prev_cluster['Last_Observed']))
        prev_cluster['Observations'] = prev_observations + len(rows)

        ip_entries = prev_cluster.get('Malicious_IPs', [])
        ip_last = {entry['IP']: entry['Last_Observed'] for entry in ip_entries}
        ip_first = {entry['IP']: entry.get('First_Observed') or entry['Last_Observed'] for entry in ip_entries}
        ip_counts = {entry['IP']: entry.get('Observation_Count') or 0 for entry in ip_entries}
        for row in rows:
            ip = str(row['Malicious_IP'])
            date = row['date'].strftime('%Y-%m-%d')
            if ip in ip_last:
                ip_last[ip] = max(ip_last[ip], date)
                ip_first[ip] = min(ip_first[ip], date)
                ip_counts[ip] += 1
            else:
                # First sighting of this IP in the cluster: one observation so far;
                # later rows of the same burst go through the branch above.
                ip_last[ip] = date
                ip_first[ip] = date
                ip_counts[ip] = 1
        prev_cluster['Malicious_IPs'] = [
            {"IP": ip, "Last_Observed": ip_last[ip], "First_Observed": ip_first[ip], "Observation_Count": ip_counts[ip]}
            for ip in ip_last
        ]

        prev_cluster['customer'] = list(set(prev_cluster.get('customer', [])).union({row['customer'] for row in rows}))
        prev_cluster['customer_count'] = len(prev_cluster['customer'])
        prev_cluster['Industries'] = list(set(prev_cluster.get('Industries', [])).union({row['industry'] for row in rows}))
        prev_cluster['Industry_Count'] = len(prev_cluster['Industries'])
        prev_cluster['Countries'] = list(set(prev_cluster.get('Countries', [])).union({row['country'] for row in rows}))
        prev_cluster['Country_Count'] = len(prev_cluster['Countries'])
        prev_cluster['Regions'] = list(set(prev_cluster.get('Regions', [])).union({row['region'] for row in rows}))
        prev_cluster['Region_Count'] = len(prev_cluster['Regions'])

        computed = Cluster(0, rows, self.logger).compute_metrics()
        for key in ['duration_max', 'gbps_max', 'gbps_pct95', 'pps_max', 'pps_pct95']:
            if computed[key] is None:
                continue
            if prev_cluster.get(key) is not None:
                prev_cluster[key] = max(prev_cluster[key], computed[key])
            else:
                prev_cluster[key] = computed[key]
        for key in ['duration_avg', 'gbps_avg', 'pps_avg']:
            if computed[key] is None:
                continue
            prev_value = prev_cluster.get(key)
            if prev_value is None or prev_observations <= 0:
                # No earlier average to blend with.
                prev_cluster[key] = computed[key]
            else:
                prev_cluster[key] = (
                    (prev_value * prev_observations + computed[key] * len(rows)) /
                    (prev_observations + len(rows))
                )
        prev_cluster['Vectors'] = list(set(prev_cluster.get('Vectors', [])).union(computed['vectors']))
        prev_cluster['Cluster_Size'] = len(ip_last)

    def _absorb_cluster_metadata(self, keep_id, absorbed_id):
        """Fold the bookkeeping of an absorbed new cluster into the surviving one."""
        first_dates = [d for d in (self.first_observed_dates.get(keep_id),
                                   self.first_observed_dates.pop(absorbed_id, None)) if d is not None]
        if first_dates:
            self.first_observed_dates[keep_id] = min(first_dates)

        last_dates = [d for d in (self.last_observed_dates.get(keep_id),
                                  self.last_observed_dates.pop(absorbed_id, None)) if d is not None]
        if last_dates:
            self.last_observed_dates[keep_id] = max(last_dates)
            self.active_status[keep_id] = self._active_flag(self.last_observed_dates[keep_id])
        self.active_status.pop(absorbed_id, None)

        absorbed_count = self.observation_counts.pop(absorbed_id, None)
        # When either count is unknown, leave the entry absent so save_results
        # falls back to the merged row count.
        if keep_id in self.observation_counts and absorbed_count is not None:
            self.observation_counts[keep_id] += absorbed_count
        else:
            self.observation_counts.pop(keep_id, None)

    def _merge_similar_clusters(self):
        """Merge new clusters whose IP sets are similar, keeping the earlier cluster's id."""
        merged_clusters = []
        used_cluster_ids = set()

        for i, cluster_i in enumerate(self.clusters):
            if cluster_i.cluster_id in used_cluster_ids:
                continue
            cluster_i_ips = cluster_i.get_malicious_ips()
            merged_rows = cluster_i.rows.copy()
            used_cluster_ids.add(cluster_i.cluster_id)

            for cluster_j in self.clusters[i + 1:]:
                if cluster_j.cluster_id in used_cluster_ids:
                    continue
                cluster_j_ips = cluster_j.get_malicious_ips()
                if not self.is_cluster_similar(cluster_i_ips, cluster_j_ips):
                    continue
                merged_rows.extend(cluster_j.rows)
                for ip, date in cluster_j.ip_last_observed.items():
                    if ip in cluster_i.ip_last_observed:
                        cluster_i.ip_last_observed[ip] = max(cluster_i.ip_last_observed[ip], date)
                    else:
                        cluster_i.ip_last_observed[ip] = date
                # Keep first/last-observed dates, active flag and observation
                # count of the surviving cluster in step with the merged rows.
                self._absorb_cluster_metadata(cluster_i.cluster_id, cluster_j.cluster_id)
                used_cluster_ids.add(cluster_j.cluster_id)
                # The comparison set grows, so later clusters are compared against the union.
                cluster_i_ips = cluster_i_ips.union(cluster_j_ips)

            merged_cluster = Cluster(cluster_i.cluster_id, merged_rows, self.logger)
            merged_cluster.ip_last_observed = cluster_i.ip_last_observed
            merged_clusters.append(merged_cluster)

        self.clusters = merged_clusters

    def _clean_nan(self, obj):
        """Recursively replace NaN values with None for JSON compatibility.

        NumPy scalars are converted to plain Python values first, because
        the json module cannot serialise them.
        """
        if isinstance(obj, dict):
            return {k: self._clean_nan(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._clean_nan(v) for v in obj]
        if isinstance(obj, np.generic):
            obj = obj.item()
        if isinstance(obj, float) and np.isnan(obj):
            return None
        return obj

    def _write_json(self, path, payload, description):
        """Write ``payload`` to ``path`` as indented JSON, logging success or failure.

        The payload is serialised before the file is opened, so a
        serialisation error does not truncate an existing file.
        """
        try:
            serialized = json.dumps(payload, indent=4)
            with open(path, 'w') as f:
                f.write(serialized)
            self.logger.info(f"Saved {len(payload)} {description} to {path}")
        except Exception as e:
            self.logger.error(f"Failed to save to {path}: {e}")

    def save_results(self):
        """Save the clustering results to JSON files, handling NaN values.

        Returns:
            DataFrame with one row per cluster (previous and new), with
            'First_Observed' and 'Last_Observed' parsed as datetimes when
            present. The frame is empty when there are no clusters.
        """
        all_clusters = []
        seen_cluster_ids = set()

        for cluster in self.prev_clusters:
            if cluster['ClusterID'] not in seen_cluster_ids:
                all_clusters.append(cluster)
                seen_cluster_ids.add(cluster['ClusterID'])

        for cluster in self.clusters:
            if cluster.cluster_id in seen_cluster_ids:
                continue
            metrics = cluster.compute_metrics()
            customers = {row['customer'] for row in cluster.rows}
            industries = {row['industry'] for row in cluster.rows}
            countries = {row['country'] for row in cluster.rows}
            regions = {row['region'] for row in cluster.rows}

            # Earliest day each IP appears in this cluster's dated rows.
            ip_first_seen = {}
            for row in cluster.rows:
                if pd.notna(row['date']):
                    ip = str(row['Malicious_IP'])
                    day = row['date'].strftime('%Y-%m-%d')
                    ip_first_seen[ip] = min(ip_first_seen.get(ip, day), day)

            all_clusters.append({
                'ClusterID': cluster.cluster_id,
                'duration_max': metrics['duration_max'],
                'duration_avg': metrics['duration_avg'],
                'gbps_max': metrics['gbps_max'],
                'gbps_avg': metrics['gbps_avg'],
                'gbps_pct95': metrics['gbps_pct95'],
                'pps_max': metrics['pps_max'],
                'pps_avg': metrics['pps_avg'],
                'pps_pct95': metrics['pps_pct95'],
                'customer': list(customers),
                'customer_count': len(customers),
                'Industries': list(industries),
                'Industry_Count': len(industries),
                'Countries': list(countries),
                'Country_Count': len(countries),
                'Regions': list(regions),
                'Region_Count': len(regions),
                'Observations': self.observation_counts.get(cluster.cluster_id, len(cluster.rows)),
                'Vectors': metrics['vectors'],
                'First_Observed': self.first_observed_dates.get(cluster.cluster_id).strftime('%Y-%m-%d'),
                'Last_Observed': self.last_observed_dates.get(cluster.cluster_id).strftime('%Y-%m-%d'),
                'Active': self.active_status.get(cluster.cluster_id),
                'Malicious_IPs': [
                    {"IP": ip, "Last_Observed": date, "First_Observed": ip_first_seen.get(ip, date),
                     "Observation_Count": self.ip_observation_counts.get(ip, 1)}
                    for ip, date in cluster.ip_last_observed.items()
                ],
                'Cluster_Size': len(cluster.ip_last_observed)
            })
            seen_cluster_ids.add(cluster.cluster_id)

        # History entries are stamped with the most recent cluster date in this run.
        latest_date = max(cluster['Last_Observed'] for cluster in all_clusters) if all_clusters else datetime.now().strftime('%Y-%m-%d')
        for cluster in all_clusters:
            cluster_id = cluster['ClusterID']
            for ip_entry in cluster['Malicious_IPs']:
                ip = ip_entry['IP']
                if ip not in self.ip_cluster_history:
                    self.ip_cluster_history[ip] = []
                if ip in self.ip_cluster_tracker:
                    _, last_cluster_id = self.ip_cluster_tracker[ip]
                    # Only a change of cluster adds a history entry.
                    if last_cluster_id != cluster_id:
                        self.ip_cluster_history[ip].append({"ClusterID": cluster_id, "Date": latest_date})
                        self.ip_cluster_tracker[ip] = (latest_date, cluster_id)
                else:
                    self.ip_cluster_history[ip].append({"ClusterID": cluster_id, "Date": latest_date})
                    self.ip_cluster_tracker[ip] = (latest_date, cluster_id)

        analysis_data = [{k: v for k, v in cluster.items() if k != 'Malicious_IPs'} for cluster in all_clusters]

        ip_last_cluster = {}
        ip_first_observed = {}
        ip_last_observed = {}
        ip_stored_counts = {}
        for cluster in all_clusters:
            cluster_id = cluster['ClusterID']
            for ip_entry in cluster['Malicious_IPs']:
                ip = ip_entry['IP']
                last_seen = ip_entry.get('Last_Observed')
                if not last_seen:
                    continue
                first_seen = ip_entry.get('First_Observed') or last_seen
                if ip not in ip_first_observed or first_seen < ip_first_observed[ip]:
                    ip_first_observed[ip] = first_seen
                if ip not in ip_last_observed or last_seen > ip_last_observed[ip]:
                    ip_last_observed[ip] = last_seen
                # On ties the later cluster in the list wins (>=).
                if ip not in ip_last_cluster or last_seen >= ip_last_cluster[ip][1]:
                    ip_last_cluster[ip] = (cluster_id, last_seen)
                ip_stored_counts[ip] = max(ip_stored_counts.get(ip, 0), ip_entry.get('Observation_Count') or 0)

        ip_data = [
            {
                'Malicious_IP': ip,
                'Clusters': [cluster_id],
                'First_Observed': ip_first_observed[ip],
                'Last_Observed': ip_last_observed[ip],
                # IPs absent from the current frame keep their stored count.
                'Observation_Count': self.ip_observation_counts.get(ip, ip_stored_counts.get(ip) or 1)
            }
            for ip, (cluster_id, _) in ip_last_cluster.items()
        ]

        ip_history_items = [
            {'IP': ip, 'Date': entry['Date'], 'ClusterID': entry['ClusterID']}
            for ip, history in self.ip_cluster_history.items()
            for entry in history
        ]

        # Clean NaN values
        analysis_data = self._clean_nan(analysis_data)
        ip_data = self._clean_nan(ip_data)
        ip_history_items = self._clean_nan(ip_history_items)

        # Save to JSON files
        self._write_json(self.analysis_file, analysis_data, "clusters")
        self._write_json(self.ip_file, ip_data, "IPs")
        self._write_json(self.ip_history_file, ip_history_items, "IP history entries")

        df = pd.DataFrame(all_clusters)
        # With no clusters the frame has no columns, so convert only what exists.
        for column in ('Last_Observed', 'First_Observed'):
            if column in df.columns:
                df[column] = pd.to_datetime(df[column], errors='coerce')
        return df

if __name__ == "__main__":
    # Output locations for the cluster summaries, per-IP data and per-IP history.
    output_files = {
        'analysis_file': 'cluster_analysis.json',
        'ip_file': 'cluster_malicious_ips.json',
        'ip_history_file': 'cluster_ip_history.json',
    }
    processor = BotnetClusterProcessor(merged_df, **output_files)
    # Build the clusters, then write the JSON files and keep the summary frame.
    processor.process()
    cluster_df = processor.save_results()