<a href="https://colab.research.google.com/github/Keerthana0004/CloudScan/blob/main/training_advanced.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!unzip /content/drive/MyDrive/aws_5000.zip -d /content/dataset/

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/dataset/aws_5000/5/309541697/outputs_1.tf  
  inflating: /content/dataset/aws_5000/5/309541697/variables_1.tf  
  inflating: /content/dataset/aws_5000/5/309541697/main.tf  
  inflating: /content/dataset/aws_5000/5/309541697/variables.tf  
  inflating: /content/dataset/aws_5000/5/309541697/outputs.tf  
   creating: /content/dataset/aws_5000/5/327879466/
  inflating: /content/dataset/aws_5000/5/327879466/inputs.tf  
  inflating: /content/dataset/aws_5000/5/327879466/main.tf  
  inflating: /content/dataset/aws_5000/5/327879466/outputs.tf  
   creating: /content/dataset/aws_5000/5/321051059/
  inflating: /content/dataset/aws_5000/5/321051059/infra.tf  
  inflating: /content/dataset/aws_5000/5/321051059/provider.tf  
  inflating: /content/dataset/aws_5000/5/321051059/variables.tf  
   creating: /content/dataset/aws_5000/5/288208169/
  inflating: /content/dataset/aws_5000/5/288208169/output.tf  
  inflatin

In [4]:
!cp /content/drive/MyDrive/checkov_results_5000.json .

In [5]:
!pip install pandas python-hcl2 tqdm

Collecting python-hcl2
  Downloading python_hcl2-7.3.1-py3-none-any.whl.metadata (5.2 kB)
Downloading python_hcl2-7.3.1-py3-none-any.whl (22 kB)
Installing collected packages: python-hcl2
Successfully installed python-hcl2-7.3.1


In [6]:
%%writefile feature_extractor.py

import hcl2
import json
import pandas as pd
from pathlib import Path
from tqdm import tqdm

print("Starting feature extraction (Advanced Features Version)...")

# --- Configuration ---
DATASET_DIR = '/content/dataset/aws_5000/'
JSON_INPUT_FILE = 'checkov_results_5000.json'
CSV_OUTPUT_FILE = '/content/drive/MyDrive/dataset_advanced.csv' # Saving to a new file

def extract_specific_features(resource_type, config):
    """Extracts detailed attributes for specific resource types."""
    features = {}

    if resource_type == 'aws_s3_bucket':
        # Get the value of the 'acl' attribute if it exists
        features['s3_acl'] = config.get('acl', [None])[0]
        # Check if encryption is enabled
        features['s3_encryption_enabled'] = 1 if 'server_side_encryption_configuration' in config else 0
        # Check if versioning is enabled
        versioning = config.get('versioning', [{}])[0]
        features['s3_versioning_enabled'] = 1 if versioning.get('enabled') == [True] else 0

    elif resource_type == 'aws_security_group_rule':
        features['sg_is_ingress'] = 1 if config.get('type', ['ingress'])[0] == 'ingress' else 0
        # Check if it's open to the world
        cidr_blocks = config.get('cidr_blocks', [[]])[0]
        features['sg_is_open_to_world'] = 1 if '0.0.0.0/0' in cidr_blocks else 0

    elif resource_type == 'aws_db_instance':
        features['db_storage_encrypted'] = 1 if config.get('storage_encrypted') == [True] else 0
        features['db_publicly_accessible'] = 1 if config.get('publicly_accessible') == [True] else 0

    return features

# --- Load and index the Checkov results ---
with open(JSON_INPUT_FILE, 'r') as f:
    results_data = json.load(f)

misconfigured_resources = {}
check_results = results_data.get('results', {}).get('failed_checks', [])

for check in check_results:
    file_path = check['file_path'].lstrip('./')
    unique_key = (file_path, check['resource'])
    misconfigured_resources[unique_key] = check['check_id']

print(f"Loaded {len(misconfigured_resources)} misconfigured resources.")

# --- Find all Terraform files and extract features ---
terraform_files = list(Path(DATASET_DIR).rglob('*.tf'))
all_resources_data = []
print(f"Found {len(terraform_files)} files to parse...")

for tf_file_path in tqdm(terraform_files, desc="Parsing files"):
    try:
        with open(tf_file_path, 'r', encoding='utf-8') as f:
            relative_path_full = tf_file_path.relative_to(DATASET_DIR)
            current_file_path = str(Path(*relative_path_full.parts[1:]))

            tf_data = hcl2.load(f)
            if tf_data.get('resource'):
                for resource_block in tf_data['resource']:
                    for resource_type, details in resource_block.items():
                        for resource_name, config in details.items():
                            full_resource_id = f"{resource_type}.{resource_name}"

                            # Base features
                            base_features = {
                                'file_path': current_file_path,
                                'resource_id': full_resource_id,
                                'resource_type': resource_type
                            }

                            # Get specific features for this resource type
                            specific_features = extract_specific_features(resource_type, config)

                            # Get the label
                            lookup_key = (current_file_path, full_resource_id)
                            label = {'is_misconfigured': 1 if lookup_key in misconfigured_resources else 0}

                            # Combine all features and the label
                            all_features = {**base_features, **specific_features, **label}
                            all_resources_data.append(all_features)
    except Exception as e:
        pass

# --- Create and clean the final DataFrame ---
df = pd.DataFrame(all_resources_data)
# One-hot encode the categorical 'resource_type' and 's3_acl'
df = pd.get_dummies(df, columns=['resource_type', 's3_acl'], dummy_na=True)
# Fill any remaining missing values with 0
df = df.fillna(0)

# Drop path and id as they are not features for the model
df_for_training = df.drop(columns=['file_path', 'resource_id'])

df_for_training.to_csv(CSV_OUTPUT_FILE, index=False)
print(f"✅ Done! Your advanced dataset is ready at {CSV_OUTPUT_FILE}.")

Writing feature_extractor.py


In [7]:
!python3 feature_extractor.py

Starting feature extraction (Advanced Features Version)...
Loaded 59255 misconfigured resources.
Found 95720 files to parse...
Parsing files: 100% 95720/95720 [24:37<00:00, 64.79it/s]
✅ Done! Your advanced dataset is ready at /content/drive/MyDrive/dataset_advanced.csv.


In [None]:
%%writefile train_model.py

import pandas as pd
import joblib
import json
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix

print("Starting model training on advanced features...")

# --- 1. Load the Pre-processed Dataset ---
# We now load the new, advanced dataset
df = pd.read_csv('/content/drive/MyDrive/dataset_advanced.csv')
print(f"Advanced dataset loaded with {len(df)} resources and {len(df.columns)} columns.")

# --- 2. Separate Features and Labels ---

# The 'is_misconfigured' column is our target label
labels = df['is_misconfigured']

# All other columns in the CSV are our features
features = df.drop(columns=['is_misconfigured'])

# Save the final list of feature columns for the application
feature_columns_path = '/content/drive/MyDrive/model_columns_advanced.json'
feature_columns = list(features.columns)
with open(feature_columns_path, 'w') as f:
    json.dump(feature_columns, f)
print(f"Saved {len(feature_columns)} model feature columns to {feature_columns_path}")

# --- 3. Split Data ---
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.2, random_state=42, stratify=labels
)
print(f"Training on {len(X_train)} samples, testing on {len(X_test)} samples.")

# --- 4. Train Model ---
model = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
print("Training RandomForestClassifier...")
model.fit(X_train, y_train)

model_path = '/content/drive/MyDrive/cloudscan_model_advanced.pkl'
joblib.dump(model, model_path)
print(f"✅ Trained advanced model saved to {model_path}")

# --- 5. Evaluate Model ---
print("\n--- Evaluating Model Performance ---")
predictions = model.predict(X_test)
print("\n--- Classification Report ---")
print(classification_report(y_test, predictions, target_names=['Secure (0)', 'Misconfigured (1)']))

print("\n--- Confusion Matrix ---")
cm = confusion_matrix(y_test, predictions)
print(f"True Negatives: {cm[0][0]}")
print(f"False Positives: {cm[0][1]}")
print(f"False Negatives: {cm[1][0]}")
print(f"True Positives: {cm[1][1]}")

In [None]:
!python train_model.py