In [3]:
import requests
import json
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import logging

class APISecurityModelTrainer:
    def __init__(self, config_path='config.json', log_path='model_training.log'):
        """
        Initialize the model trainer with configuration and logging
        """
        # Setup logging
        logging.basicConfig(
            filename=log_path,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s: %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        # Load configuration
        try:
            with open(config_path) as f:
                self.config = json.load(f)
        except FileNotFoundError:
            self.config = {
                "sensitive_keywords": ["key", "token", "password", "ssn", "credit", "card"],
                "training_urls": [
                    "https://jsonplaceholder.typicode.com/posts",
                    "https://api.github.com/users",
                    "https://api.publicapis.org/entries"
                ]
            }
            self.logger.warning("Config file not found. Using default configuration.")

    def extract_features(self, response):
        """
        Extract comprehensive security-relevant features from API response
        """
        features = {
            'status_code': response.status_code,
            'content_length': len(response.content),
            'num_sensitive_headers': 0,
            'num_sensitive_keywords': 0,
            'redirects': len(response.history),
            'header_count': len(response.headers),
            'is_json': False,
            'is_secure': False
        }

        # Check for HTTPS
        features['is_secure'] = response.url.startswith('https://')

        # Check headers for sensitive information
        sensitive_keywords = self.config.get("sensitive_keywords", [])
        for key, value in response.headers.items():
            if any(sensitive in key.lower() for sensitive in sensitive_keywords):
                features['num_sensitive_headers'] += 1

        # Check if response is JSON
        try:
            json_data = response.json()
            features['is_json'] = True
            features['num_sensitive_keywords'] = sum(
                sum(1 for sensitive in sensitive_keywords if sensitive in str(value).lower())
                for value in json_data.values()
            )
        except (ValueError, TypeError):
            pass

        return features

    def collect_training_data(self, urls=None):
        """
        Collect training data from multiple API endpoints
        """
        if urls is None:
            urls = self.config.get("training_urls", [])

        training_data = []
        for url in urls:
            try:
                # Simulate different scenarios
                variants = [
                    {'url': url, 'headers': {}},
                    {'url': url, 'headers': {'Authorization': 'Bearer test_token'}},
                    # Add more header variations if needed
                ]

                for variant in variants:
                    response = requests.get(
                        variant['url'],
                        headers=variant['headers'],
                        timeout=5
                    )
                    features = self.extract_features(response)

                    # Label based on features (simplified example)
                    features['is_secure_endpoint'] = (
                        features['is_secure'] and
                        features['status_code'] < 400 and
                        features['num_sensitive_keywords'] == 0
                    )

                    training_data.append(features)

            except requests.exceptions.RequestException as e:
                self.logger.error(f"Error collecting data from {url}: {e}")

        return pd.DataFrame(training_data)

    def prepare_data(self, df):
        """
        Prepare data for machine learning models
        """
        # Separate features and labels
        X = df.drop('is_secure_endpoint', axis=1)
        y = df['is_secure_endpoint']

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Scale features
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        return X_train_scaled, X_test_scaled, y_train, y_test, scaler

    def train_isolation_forest(self, X_train, X_test):
        """
        Train Isolation Forest for anomaly detection
        """
        iso_forest = IsolationForest(
            contamination=0.1,  # Expect 10% of samples to be anomalous
            random_state=42
        )
        iso_forest.fit(X_train)

        # Predict and evaluate
        train_predictions = iso_forest.predict(X_train)
        test_predictions = iso_forest.predict(X_test)

        # Save model
        joblib.dump(iso_forest, 'isolation_forest_model.joblib')
        self.logger.info("Isolation Forest model trained and saved.")

        return iso_forest

    def train_classification_model(self, X_train_scaled, X_test_scaled, y_train, y_test):
        """
        Train a Random Forest Classifier for security endpoint classification
        """
        rf_classifier = RandomForestClassifier(
            n_estimators=100,
            random_state=42,
            class_weight='balanced'
        )
        rf_classifier.fit(X_train_scaled, y_train)

        # Evaluate the model
        y_pred = rf_classifier.predict(X_test_scaled)

        # Logging evaluation metrics
        self.logger.info("Classification Report:")
        self.logger.info(classification_report(y_test, y_pred))

        # Confusion Matrix
        conf_matrix = confusion_matrix(y_test, y_pred)
        self.logger.info("Confusion Matrix:")
        self.logger.info(conf_matrix)

        # Save model
        joblib.dump(rf_classifier, 'security_classifier_model.joblib')
        self.logger.info("Random Forest Classifier trained and saved.")

        return rf_classifier

    def generate_synthetic_data(self, base_df, num_samples=1000):
        """
        Generate synthetic training data to augment real-world data
        """
        synthetic_data = []
        for _ in range(num_samples):
            # Randomly perturb existing data points
            sample = base_df.sample(1).iloc[0].copy()

            # Introduce controlled variations
            sample['content_length'] += np.random.randint(-100, 100)
            sample['num_sensitive_headers'] = max(0, sample['num_sensitive_headers'] + np.random.randint(-1, 2))
            sample['num_sensitive_keywords'] = max(0, sample['num_sensitive_keywords'] + np.random.randint(-1, 2))

            # Randomize some flags
            sample['is_secure'] = np.random.choice([True, False], p=[0.8, 0.2])
            sample['is_secure_endpoint'] = sample['is_secure'] and sample['num_sensitive_keywords'] == 0

            synthetic_data.append(sample)

        return pd.DataFrame(synthetic_data)

    def train_comprehensive_model(self, additional_urls=None):
        """
        Comprehensive model training pipeline
        """
        # Collect training data
        base_df = self.collect_training_data()

        # Add additional URLs if provided
        if additional_urls:
            additional_df = self.collect_training_data(additional_urls)
            base_df = pd.concat([base_df, additional_df], ignore_index=True)

        # Generate synthetic data
        synthetic_df = self.generate_synthetic_data(base_df)
        combined_df = pd.concat([base_df, synthetic_df], ignore_index=True)

        # Prepare data
        X_train_scaled, X_test_scaled, y_train, y_test, scaler = self.prepare_data(combined_df)

        # Save scaler
        joblib.dump(scaler, 'feature_scaler.joblib')

        # Train models
        self.train_isolation_forest(X_train_scaled, X_test_scaled)
        self.train_classification_model(X_train_scaled, X_test_scaled, y_train, y_test)

def main():
    # Initialize and train the model
    model_trainer = APISecurityModelTrainer()

    # Optional: Provide additional training URLs
    additional_urls = [
        "https://reqres.in/api/users",
        "https://httpbin.org/get"
    ]

    model_trainer.train_comprehensive_model(additional_urls)

if __name__ == "__main__":
    main()

AttributeError: 'list' object has no attribute 'values'