In [None]:
# Automated Cybersecurity Log Analyzer Agent
# Features:
# - fetch logs from an API or load them from a CSV file
# - preprocess and feature-engineer logs
# - unsupervised anomaly detection (IsolationForest)
# - explainable anomaly scoring and suggested actions
# - HTML security report generation with charts
# - placeholders for alerting (email/Slack)
#
# NOTE: replace placeholders (API keys, endpoints, alerting hooks) before production use.

In [None]:
# Static project information; the report template reads `meta.name` from it.
PROJECT_METADATA = dict(
    name='Automated Cybersecurity Log Analyzer Agent',
    author='Your Name',
    version='1.0',
    description='Fetches logs, detects anomalies with ML, generates reports and suggests actions.',
)

In [None]:
!pip install -q scikit-learn pandas matplotlib jinja2 requests

In [None]:
#standard libs import
import os #operating system utiliies
import io #in-memory streams
import json #json encoding / decoding
import datetime #date time handling
import tempfile #temorary file utilites

In [None]:
#third party libs
import requests #http requests  for fetching logs
import pandas as pd #data manipulation
import numpy as np #numerical operations
import matplotlib.pyplot as plt #ploting
import seaborn as sns #visualization
from datetime import datetime, timedelta, timezone
from sklearn.ensemble import IsolationForest #anomaly detection model
from sklearn.preprocessing import OneHotEncoder , StandardScaler #preprocess utils
from jinja2 import Template #html reports

In [None]:
# Runtime configuration; the two API settings can be overridden through environment variables
API_ENDPOINT = os.getenv('LOG_API_ENDPOINT','https://example.com/api/logs') # log API endpoint (placeholder default)
API_TOKEN = os.getenv('LOG_API_TOKEN','') # bearer token for the log API; empty means no Authorization header is sent
LOCAL_SAMPLE_CSV = 'sample_logs.csv' # fallback CSV used when the API is skipped or the fetch fails
REPORT_OUTPUT = 'security_report.html' # default path the HTML report is written to
RANDOM_SEED = 100 # default random_state for the anomaly detector

In [None]:
#fetch logs from  API
def fetch_logs_from_api(endpoint = API_ENDPOINT,token = API_TOKEN , params = None, timeout = 30):
  """Fetch logs from a REST API and return them as a pandas DataFrame.

  Args:
    endpoint: URL to GET.
    token: bearer token; when empty, no Authorization header is sent.
    params: optional dict of query-string parameters.
    timeout: request timeout in seconds, passed to requests.get.

  Returns:
    DataFrame built from the JSON payload (empty when the payload holds no records).

  Raises:
    requests.HTTPError: when the response status indicates an error.
  """
  # only attach credentials when a token was actually configured
  headers = {'Authorization' : f'Bearer {token}'} if token else {}
  # `params or {}` (previously misspelled, which raised NameError on every call)
  resp = requests.get(endpoint, headers=headers, params=params or {}, timeout=timeout)
  # surface 4xx/5xx responses as exceptions instead of parsing an error body
  resp.raise_for_status()
  data = resp.json()
  # a dict payload may wrap the records under 'logs'; anything else is used as-is
  records = data.get('logs',data) if isinstance(data,dict) else data
  return pd.DataFrame.from_records(records)

In [None]:
#load logs from local csv as fallback
def load_logs_from_csv(path = LOCAL_SAMPLE_CSV):
  """Load log records from a CSV file at `path` into a pandas DataFrame."""
  return pd.read_csv(path)

In [None]:
#parsing and preprocessing
def preprocess_logs(df):
  """Return a cleaned copy of `df` with lower-cased columns and derived features.

  Adds 'hour' and 'weekday' from the (parsed or synthetic) 'timestamp' column,
  mirrors 'source_ip'/'destination_ip' into 'src_ip'/'dst_ip' when those are
  missing, fills gaps in object columns with 'unknown', and coerces columns whose
  name contains 'bytes', 'size', 'status' or 'count' to numbers (invalid -> 0).
  """
  # work on a copy so the caller's frame is left untouched
  df = df.copy()
  df.columns = [name.lower() for name in df.columns]

  # unparseable timestamps become NaT; a missing column gets the current time
  has_timestamp = 'timestamp' in df.columns
  df['timestamp'] = pd.to_datetime(df['timestamp'], errors = 'coerce') if has_timestamp else pd.to_datetime('now')

  # temporal features
  when = df['timestamp'].dt
  df['hour'] = when.hour
  df['weekday'] = when.weekday

  # canonical src/dst IP columns, created only when absent
  for alias, canonical in (('source_ip', 'src_ip'), ('destination_ip', 'dst_ip')):
    if alias in df.columns and canonical not in df.columns:
      df[canonical] = df[alias]

  # missing text values -> 'unknown'
  for name in df.columns:
    if df[name].dtype == object:
      df[name] = df[name].fillna('unknown')

  # name-based numeric coercion
  numeric_hints = ('bytes', 'size', 'status', 'count')
  for name in df.columns:
    if any(hint in name for hint in numeric_hints):
      df[name] = pd.to_numeric(df[name],errors='coerce').fillna(0)

  return df

In [None]:
#feature engineering
def engineer_feature(df,max_unique_cat = 50):
  """Build the numeric feature matrix used for anomaly detection.

  Numeric columns are NaN-filled with 0 and standardised; object columns with at
  most `max_unique_cat` distinct values are one-hot encoded.

  Args:
    df: preprocessed log DataFrame.
    max_unique_cat: cardinality cap for a column to be one-hot encoded.

  Returns:
    (X, feature_names, transformers) where X is a 2-D array with one row per
    record, feature_names lists its columns in order, and transformers holds the
    scaler, encoder and the column lists they were applied to. The scaler/encoder
    are only fitted when matching columns exist.
  """
  numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
  # high-cardinality text columns are skipped to keep the one-hot width bounded
  cat_candidates = [c for c in df.select_dtypes(include=['object']).columns if df[c].nunique() <= max_unique_cat]

  scaler = StandardScaler()
  ohe = OneHotEncoder(handle_unknown='ignore',sparse_output=False)

  # numeric part (zero-width when there are no numeric columns)
  x_num = df[numeric_cols].fillna(0).values if numeric_cols else np.zeros((len(df),0))
  if x_num.shape[1] > 0:
    x_num = scaler.fit_transform(x_num)

  # categorical part (zero-width when nothing qualifies)
  if cat_candidates:
    x_cat = ohe.fit_transform(df[cat_candidates].astype(str))
  else:
    x_cat = np.zeros((len(df),0))

  # hstack copes with zero-width inputs, so no special case is needed; the old
  # fallback branch called the non-existent np.zeroes and raised AttributeError
  X = np.hstack([x_num,x_cat])

  features_names = list(numeric_cols)
  if cat_candidates:
    features_names += list(ohe.get_feature_names_out(cat_candidates))

  return X, features_names, {'scaler':scaler,'ohe':ohe,'numeric_cols':numeric_cols,'cat_cols':cat_candidates}

In [None]:
#anomaly detection model training
def train_anomaly_detector(X,contamination=0.01,random_state=RANDOM_SEED):
  """Fit an IsolationForest on `X` and return the fitted estimator."""
  detector = IsolationForest(contamination = contamination, random_state = random_state)
  # fit() hands back the estimator itself
  return detector.fit(X)

In [None]:
#scoring nd attaching anomaly labels
def score_anomalies(model,X):
  """Score `X` with a fitted detector.

  Returns (anomaly_score, is_anomaly): the negated `score_samples` output, so
  larger means more anomalous, and a boolean mask that is True where `predict`
  returned -1.
  """
  # flip the sign so that higher = more abnormal
  flipped = -model.score_samples(X)
  # predict() marks anomalies with -1
  flagged = model.predict(X) == -1
  return flipped, flagged

In [None]:
# explain anomalies (simple feature influence approximation)
def explain_top_anomalies(df,X,features_names,anomaly_score,top_k=10):
  """Describe the `top_k` highest-scoring rows by their most extreme features.

  Each feature is z-scored column-wise over `X`; for every selected row the five
  features with the largest absolute z-score are reported. Returns a DataFrame
  with columns 'index' (row position in X), 'anomaly_score' and 'top_features'
  (list of (name, z) pairs), or an empty DataFrame when `X` has no elements.
  """
  if X.size==0:
    return pd.DataFrame()

  # column-wise z-scores; the epsilon avoids dividing by zero on constant columns
  col_mean = np.nanmean(X, axis=0)
  col_spread = np.nanstd(X, axis=0) + 1e-9
  z = (X - col_mean) / col_spread

  def _describe(row):
    # five features deviating most from their column mean for this row
    strongest = np.argsort(-np.abs(z[row]))[:5]
    pairs = [(features_names[col], float(z[row, col])) for col in strongest]
    return {'index': row, 'anomaly_score': float(anomaly_score[row]), 'top_features': pairs}

  # rows ordered from most to least anomalous
  ranked_rows = np.argsort(-anomaly_score)[:top_k]
  return pd.DataFrame([_describe(row) for row in ranked_rows])

In [None]:
# Map an anomaly score to a suggested action; rules are tried in order and the
# first match wins.
# The pipeline's score is the negated IsolationForest.score_samples() value (see
# score_anomalies), i.e. the isolation-forest paper's score: about 0.5 for
# ordinary points and approaching 1 for clear outliers. The previous cut-offs
# (10 / 5 / 2) could never be reached on that scale, so every anomaly was
# labelled "low". The cut-offs below sit on the real scale; treat them as
# tunable starting points rather than calibrated values.
ACTION_TEMPLATES =[
    (lambda r: r > 0.70, "Investigate immediately and isolate host and rotating credentials"),
    (lambda r: r > 0.60, "High anomaly: review recent changes"),
    (lambda r: r > 0.50,"Medium anomaly: investigate further"),
    (lambda r: True,"low anomaly:monitor system")
]

def suggest_action(score):
  """Return the action text of the first ACTION_TEMPLATES rule matching `score`.

  Args:
    score: anomaly score where higher means more anomalous.

  Returns:
    The suggested action string. NaN fails every comparison and therefore falls
    through to the catch-all rule.
  """
  # fall back to the last rule's text should the table ever lose its catch-all
  return next((action for cond, action in ACTION_TEMPLATES if cond(score)), ACTION_TEMPLATES[-1][1])

In [None]:
#generate charts for reports
def generate_plots(df,anomalies_mask,output_dir=None):
  """Render the report charts as PNG files and return their paths.

  Writes 'events_by_hour.png' (bar chart of record counts per df['hour']) and
  'anomaly_scatter.png' (0/1 anomaly flag against the frame index) into
  `output_dir`, or into a fresh temporary directory when none is given.
  """
  if not output_dir:
    output_dir = tempfile.mkdtemp()

  def _finish(path, title, xlabel, ylabel):
    # label, save and release the current pyplot figure
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.tight_layout()
    plt.savefig(path)
    plt.close()
    return path

  # events per hour of day
  plt.figure()
  df.groupby('hour').size().plot(kind='bar')
  hourly_png = _finish(os.path.join(output_dir, 'events_by_hour.png'),
                       'Events by Hour', 'Hour of Day', 'Event Count')

  # anomaly flag per record
  plt.figure()
  plt.scatter(df.index, anomalies_mask.astype(int), alpha=0.6)
  scatter_png = _finish(os.path.join(output_dir, 'anomaly_scatter.png'),
                        'Anomalies Over Time (index)', 'Record Index', 'Anomaly (1=yes)')

  return [hourly_png, scatter_png]

In [None]:
from datetime import datetime, timezone, timedelta

In [None]:
# Jinja2 source for the HTML security report.
# Variables the template reads: meta.name, generated_at, total_records,
# anomaly_count, top_action, images (each used as an <img src>), and anomalies
# (each with index, anomaly_score, top_features as (name, value) pairs, and
# suggested_action).
REPORT_TEMPLATE = """
<html>
<head>
  <title>Security Report - {{meta.name}}</title>
  <style>
    body{font-family: Arial, sans-serif; padding:20px}
    .card{border:1px solid #ddd; padding:12px; margin:8px 0; border-radius:8px}
    table{border-collapse:collapse; width:100%}
    th,td{border:1px solid #ccc; padding:6px}
  </style>
</head>
<body>
  <h1>Security Report - {{ meta.name }}</h1>
  <p>Generated: {{ generated_at }}</p>
  <div class="card">
    <h2>Summary</h2>
    <p>Total records: {{ total_records }}</p>
    <p>Detected anomalies: {{ anomaly_count }}</p>
    <p>Top suggested action: {{ top_action }}</p>
  </div>
  <div class="card">
    <h2>Visualizations</h2>
    {% for img in images %}
      <img src="{{ img }}" style="max-width:700px; display:block; margin-bottom:10px"/>
    {% endfor %}
  </div>
  <div class="card">
    <h2>Top Anomalies</h2>
    <table>
      <tr><th>Index</th><th>Anomaly Score</th><th>Top Feature Contributions</th><th>Suggested Action</th></tr>
      {% for a in anomalies %}
      <tr>
        <td>{{ a.index }}</td>
        <td>{{ '%.3f'|format(a.anomaly_score) }}</td>
        <td>
          <ul>
          {% for f in a.top_features %}
            <li>{{ f[0] }} : {{ '%.3f'|format(f[1]) }}</li>
          {% endfor %}
          </ul>
        </td>
        <td>{{ a.suggested_action }}</td>
      </tr>
      {% endfor %}
    </table>
  </div>
</body>
</html>
"""

In [None]:
#assemble report
import datetime
def build_report(df,anomaly_score,is_anomaly,explaination_df,images,output_path=REPORT_OUTPUT):
  """Render the HTML security report and write it to `output_path`.

  Args:
    df: processed log DataFrame (only its length is used here).
    anomaly_score: array of per-record scores, higher = more anomalous.
    is_anomaly: boolean array flagging anomalous records.
    explaination_df: frame with 'index', 'anomaly_score' and 'top_features'
      columns, one row per reported anomaly.
    images: list of image paths embedded as <img src> in the report.
    output_path: destination file for the HTML.

  Returns:
    `output_path`.
  """
  # Function-scope, aliased import: this notebook binds the global name
  # `datetime` both to the module (`import datetime`) and to the class
  # (`from datetime import datetime`) in different cells, so whichever cell ran
  # last decided whether `datetime.datetime.utcnow()` worked or raised
  # AttributeError. Local aliases make the function independent of that.
  from datetime import datetime as _datetime, timezone as _timezone

  total_records = len(df)
  anomaly_count = int(is_anomaly.sum())
  top_action = suggest_action(float(np.max(anomaly_score))) if total_records > 0 else 'No data'

  anomalies = [
    {
      'index': int(row['index']),
      'anomaly_score': float(row['anomaly_score']),
      'top_features': row['top_features'],
      'suggested_action': suggest_action(row['anomaly_score']),
    }
    for _, row in explaination_df.iterrows()
  ]

  # same "naive ISO timestamp + Z" text as before, without the deprecated utcnow()
  generated_at = _datetime.now(_timezone.utc).replace(tzinfo=None).isoformat() + "Z"

  # autoescape: feature names are derived from log content, which is untrusted,
  # and must not be able to inject markup into the report
  tp1 = Template(REPORT_TEMPLATE, autoescape=True)
  html = tp1.render(meta = PROJECT_METADATA, generated_at = generated_at,
                    total_records=total_records, anomaly_count=anomaly_count,
                    top_action=top_action, images=images, anomalies=anomalies)

  with open(output_path,'w',encoding='utf-8') as f:
    f.write(html)
  return output_path

In [None]:
#placehlder alerting function
def send_alert(subject,body,recipients=None):
  """Placeholder alert hook: echo the alert to stdout and report success.

  `recipients` is accepted but not used.
  """
  # first line carries the subject, second the body
  for parts in (('Alert:', subject), (body,)):
    print(*parts)
  return True

In [None]:
from datetime import datetime , timedelta , timezone

In [None]:
#end-to-end pipeline
def run_pipeline(fetch_from_api=True,sample_csv=LOCAL_SAMPLE_CSV, contamination=0.01, top_k=10, alert_threshold=10):
  """Run the whole analysis: load, preprocess, detect, explain, plot, report, alert.

  Args:
    fetch_from_api: try the log API first; on any failure fall back to the CSV.
    sample_csv: CSV path used as the fallback or only source.
    contamination: expected anomaly fraction passed to the detector.
    top_k: number of top anomalies to explain in the report.
    alert_threshold: Slack/email alerts are sent when the highest anomaly score
      exceeds this value. NOTE(review): the score is the negated
      IsolationForest.score_samples() output, which appears to be bounded by 1,
      so the default of 10 would never fire; pass a value on that scale
      (e.g. 0.7) to enable alerting -- confirm before relying on alerts.

  Returns:
    dict with keys 'dataframe', 'model', 'feature_names', 'report' and
    'explainations'.
  """
  # load
  if fetch_from_api:
    try:
      df = fetch_logs_from_api()
    except Exception as e:
      # deliberate best-effort: any API problem degrades to the local CSV
      print('API fetch failed, falling back to CSV:', str(e))
      df = load_logs_from_csv(sample_csv)
  else:
    df = load_logs_from_csv(sample_csv)

  # preprocess and build features (fitted transformers are not needed further on)
  df = preprocess_logs(df)
  X, feature_names, _transformers = engineer_feature(df)

  # detect
  model = train_anomaly_detector(X, contamination=contamination)
  anomaly_score , is_anomaly = score_anomalies(model,X)
  # added after feature engineering, so these columns never feed the model
  df['anomaly_score'] = anomaly_score
  df['is_anomaly'] = is_anomaly

  # explain, plot, report
  explaination_df = explain_top_anomalies(df,X,feature_names,anomaly_score,top_k=top_k)
  images = generate_plots(df,is_anomaly)
  report_path = build_report(df,anomaly_score,is_anomaly,explaination_df,images)

  # alert on the single worst record
  if anomaly_score.size:
    top_score = float(np.max(anomaly_score))
    if top_score > alert_threshold:
      msg = f"CRITICAL Anomaly detected - score {top_score:.3f}"
      send_slack_alert(msg)
      send_email_alert("critical security alert", msg)

  return {'dataframe':df , 'model':model, 'feature_names': feature_names , 'report':report_path , 'explainations':explaination_df}

In [None]:
# Example entry point (e.g. for Colab).
# NOTE: with fetch_from_api=False the pipeline reads LOCAL_SAMPLE_CSV
# ('sample_logs.csv'), so that file must already exist; the synthetic-data cell
# in this notebook writes 'synthetic_logs.csv' instead.
if __name__ == '__main__':
    # skip the API and run the pipeline on the local CSV
    artifacts = run_pipeline(fetch_from_api=False)
    # print out the report location for convenience
    print('Report generated at:', artifacts['report'])

Report generated at: security_report.html


  html = tp1.render(meta = PROJECT_METADATA, generated_at = datetime.datetime.utcnow().isoformat() + "Z"


In [None]:
# convenience function for interactive notebooks: main()
def main(fetch_from_api=False, sample_csv=LOCAL_SAMPLE_CSV, contamination=0.01, top_k=10):
    """Notebook-friendly wrapper around run_pipeline.

    Forwards all arguments unchanged, prints where the report was written and
    returns the artifacts dict produced by the pipeline.
    """
    options = dict(
        fetch_from_api=fetch_from_api,
        sample_csv=sample_csv,
        contamination=contamination,
        top_k=top_k,
    )
    result = run_pipeline(**options)
    print('Report generated at:', result['report'])
    return result

In [None]:
import os
# SECURITY: never hard-code the Slack webhook in the notebook. Anyone holding the
# URL can post to the channel, and a URL that was ever committed or shared must
# be treated as leaked and rotated in Slack.
# Provide it from outside instead (shell environment, Colab secrets, or
# `%env SLACK_WEBHOOK_URL=...` in a cell that is not saved or shared).
if not os.getenv("SLACK_WEBHOOK_URL"):
  print("SLACK_WEBHOOK_URL is not set; Slack alerts are disabled")

In [None]:
#SEND SLACK ALERTS
def send_slack_alert(message, timeout=10):
  """Post `message` to the Slack incoming webhook in SLACK_WEBHOOK_URL.

  Args:
    message: text to post.
    timeout: seconds to wait for Slack before giving up, so a stalled
      connection cannot hang the pipeline.

  Returns:
    True when Slack accepted the message; False when the webhook is not
    configured, the request failed, or Slack answered with an error status.
  """
  webhook = os.getenv("SLACK_WEBHOOK_URL")
  if not webhook:
    print("slack webhook not found")
    return False
  try:
    resp = requests.post(webhook, json={"text": message}, timeout=timeout)
    # without this check a 4xx/5xx answer was reported as a sent alert
    resp.raise_for_status()
  except Exception as e:
    # alerting is best-effort: report the failure instead of raising
    print("failed",e)
    return False
  print("slack alert send!")
  return True

In [None]:
#email alert function
import smtplib
from email.mime.text import MIMEText

In [None]:
# SECURITY: never hard-code mail credentials in the notebook. Provide EMAIL_USER
# and EMAIL_PASS from outside (shell environment, Colab secrets, or a
# getpass prompt in a cell that is not saved with its output).
_missing_email_vars = [name for name in ("EMAIL_USER", "EMAIL_PASS") if not os.getenv(name)]
if _missing_email_vars:
  print("email alerts are disabled; not set:", ", ".join(_missing_email_vars))

In [None]:
def send_email_alert(subject,body,recipient=None, timeout=10):
  """Send a plain-text alert mail through Gmail's SMTP-over-SSL endpoint.

  Credentials are read from the EMAIL_USER and EMAIL_PASS environment variables.

  Args:
    subject: mail subject.
    body: plain-text mail body.
    recipient: destination address; defaults to EMAIL_USER.
    timeout: seconds to wait on the SMTP connection before giving up.

  Returns:
    True when the message was handed to the server; False when credentials are
    missing or sending failed.
  """
  email_user = os.getenv("EMAIL_USER")
  email_pass = os.getenv("EMAIL_PASS")
  # without credentials there is nothing to log in with; skip the network round trip
  if not email_user or not email_pass:
    print("email credentials not found")
    return False
  recipient = recipient or email_user

  msg = MIMEText(body)
  msg['Subject'] = subject
  msg['From'] = email_user
  msg['To'] = recipient

  try:
    with smtplib.SMTP_SSL("smtp.gmail.com",465, timeout=timeout) as server:
      server.login(email_user,email_pass)
      server.send_message(msg)
  except Exception as e:
    # alerting is best-effort: report the failure instead of raising
    print("failed to send", e)
    return False
  print('email alert send')
  return True

In [None]:
#generate synhetic logs
import random
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone

def generate_synthetic_logs(num_rows=1000, anomaly_ratio=0.05, seed=None):
  """Generate a synthetic log DataFrame with a share of injected attack records.

  Args:
    num_rows: number of records to generate.
    anomaly_ratio: probability that a record is an attack.
    seed: optional seed for a private random generator; with the same seed the
      generated values (everything except the wall-clock based timestamps) are
      reproducible. None keeps using the global `random` state.

  Returns:
    DataFrame with columns timestamp (ISO string), src_ip, dst_ip, event, bytes,
    status and is_attack. Timestamps start one day before now and increase
    strictly from row to row.
  """
  # Function-scope, aliased imports: this notebook binds the global name
  # `datetime` to both the module and the class in different cells.
  from datetime import datetime as _datetime, timedelta as _timedelta

  # one source of randomness so a single seed controls every draw
  rng = random.Random(seed) if seed is not None else random

  columns = ["timestamp", "src_ip", "dst_ip", "event", "bytes", "status", "is_attack"]
  src_ips = [f"10.0.0.{i}" for i in range(1, 50)]
  dst_ips = [f"10.0.1.{i}" for i in range(1, 50)]

  events_normal = ["login","logout","file_upload","file_download","heartbeat","scan"]
  events_attack = ["brute_force","port_scan","data_exfiltration","suspicious_login"]

  logs = []
  timestamp = _datetime.now() - _timedelta(days=1)
  for _ in range(num_rows):
    is_anomaly = rng.random() < anomaly_ratio
    if is_anomaly:
      event = rng.choice(events_attack)
      bytes_sent = rng.randint(2000000, 10000000)  # huge transfer
      status = rng.choice([403, 500, 503, 401])
    else:
      event = rng.choice(events_normal)
      bytes_sent = max(0, int(rng.gauss(3000, 1200)))
      status = rng.choice([200, 200, 200, 404, 500, 401])

    logs.append([
        timestamp.isoformat(),
        rng.choice(src_ips),
        rng.choice(dst_ips),
        event,
        bytes_sent,
        status,
        is_anomaly
    ])
    # advance the clock by a random 1-5 s gap; the old `i * randint(1, 5)` offset
    # from the start time could jump backwards between consecutive rows
    timestamp += _timedelta(seconds=rng.randint(1, 5))

  return pd.DataFrame(logs, columns=columns)

In [None]:
# Generate a synthetic dataset and save it as the CSV consumed by the final main(...) call
df = generate_synthetic_logs(num_rows=2000)  # you can choose 1000, 2000, 5000, etc.
df.to_csv("synthetic_logs.csv", index=False)

# preview the first rows
df.head()

Unnamed: 0,timestamp,src_ip,dst_ip,event,bytes,status,is_attack
0,2025-11-23T15:27:56.454155,10.0.0.45,10.0.1.34,file_upload,1318,200,False
1,2025-11-23T15:28:00.454155,10.0.0.31,10.0.1.18,scan,1731,401,False
2,2025-11-23T15:28:04.454155,10.0.0.17,10.0.1.23,scan,3009,404,False
3,2025-11-23T15:28:08.454155,10.0.0.28,10.0.1.46,scan,2377,200,False
4,2025-11-23T15:28:08.454155,10.0.0.34,10.0.1.39,port_scan,4825902,500,True


In [None]:
# Run the full pipeline on the synthetic CSV written by the previous cell (no API call)
main(fetch_from_api=False, sample_csv="synthetic_logs.csv")

AttributeError: type object 'datetime.datetime' has no attribute 'datetime'