<a href="https://colab.research.google.com/github/KOWSALYAAARU/test_app/blob/main/Anomaly_Detection_in_Serial_Communication_ipynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import seaborn as sns
from sklearn.ensemble import IsolationForest
from sklearn.metrics import classification_report, confusion_matrix


In [2]:
# Step 1: Simulate serial communication data
# Fix NumPy's global random state so every np.random draw below repeats on each run.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

In [3]:
# Rows generated per protocol, and the protocol labels to simulate.
num_records = 1_000
protocols = ["I2C", "SPI", "UART"]

In [11]:
def simulate_data(protocol, size):
    """Build a synthetic traffic log for a single serial protocol.

    Parameters
    ----------
    protocol : str
        Label written to every row of the ``protocol`` column.
    size : int
        Number of rows to generate; timestamps are spaced one second apart
        starting at 2025-01-01 00:00:00.

    Returns
    -------
    pandas.DataFrame
        Columns ``timestamp``, ``protocol``, ``baud_rate``, ``packet_size``,
        ``latency_ms`` and ``error_rate``.

    Notes
    -----
    All random values come from NumPy's global ``np.random`` state, so the
    output is only repeatable if that state was seeded beforehand. The draws
    are made in column order (baud rate, packet size, latency, error rate).
    """
    return pd.DataFrame({
        # Lowercase 's' is the seconds alias; the uppercase 'S' spelling is
        # deprecated in recent pandas releases and triggers a FutureWarning.
        'timestamp': pd.date_range(start='2025-01-01', periods=size, freq='s'),
        'protocol': protocol,
        'baud_rate': np.random.choice([9600, 4800, 57600, 115200], size),
        # Normal draw truncated to whole numbers.
        'packet_size': np.random.normal(loc=50, scale=10, size=size).astype(int),
        # abs() keeps latency non-negative even for far-left-tail draws.
        'latency_ms': np.abs(np.random.normal(loc=10, scale=2, size=size)),
        'error_rate': np.random.beta(2, 10, size)
    })

In [12]:
# Create the full dataset
# One simulated frame per protocol, stacked under a fresh 0..N-1 index.
protocol_frames = [simulate_data(name, num_records) for name in protocols]
df = pd.concat(protocol_frames, ignore_index=True)



  'timestamp': pd.date_range(start='2025-01-01', periods=size, freq='S'),


In [13]:
# Step 2: Inject anomalies
# NOTE: this cell mutates `df` in place, so re-running it without rebuilding
# `df` first will scale a new random set of rows on top of the previous ones.
num_anomalies = 30
anomaly_indices = np.random.choice(df.index, size=num_anomalies, replace=False)

# Ground-truth flag: 1 for the rows picked above, 0 everywhere else.
df['is_anomaly'] = 0
df.loc[anomaly_indices, 'is_anomaly'] = 1

# Inflate packet size and latency on the picked rows ...
for column, factor in {'packet_size': 5, 'latency_ms': 10}.items():
    df.loc[anomaly_indices, column] *= factor

# ... and set their error rate to 1.0.
df.loc[anomaly_indices, 'error_rate'] = 1.0

In [14]:
# Preview the first five rows, including the new is_anomaly flag.
df.head(n=5)

Unnamed: 0,timestamp,protocol,baud_rate,packet_size,latency_ms,error_rate,is_anomaly
0,2025-01-01 00:00:00,I2C,57600,53,12.603483,0.163993,0
1,2025-01-01 00:00:01,I2C,115200,68,13.123022,0.032747,0
2,2025-01-01 00:00:02,I2C,9600,59,10.064008,0.211409,0
3,2025-01-01 00:00:03,I2C,57600,44,8.493164,0.164698,0
4,2025-01-01 00:00:04,I2C,57600,41,10.919944,0.106626,0


In [16]:
# Step 3: Preprocessing
# One-hot encode the protocol label (first level dropped) and place the dummy
# columns to the right of the numeric features to form the model input X.
numeric_columns = ['baud_rate', 'packet_size', 'latency_ms', 'error_rate']
df_encoded = pd.get_dummies(df[['protocol']], drop_first=True)
X = pd.concat([df[numeric_columns], df_encoded], axis=1)


In [17]:
# Step 4: Train Isolation Forest
model = IsolationForest(
    random_state=42,
    contamination=0.03,
    n_estimators=100,
)

# Despite its name, 'anomaly_score' stores the labels returned by fit_predict
# rather than a continuous score; rows labelled -1 are the ones treated as
# anomalies below.
predicted_labels = model.fit_predict(X)
df['anomaly_score'] = predicted_labels
df['detected_anomaly'] = df['anomaly_score'].eq(-1).astype(int)

In [18]:
# Step 5: Save data to CSV
# Written relative to the current working directory, without the index column.
csv_path = "serial_comm_data.csv"
df.to_csv(csv_path, index=False)
print(f"CSV saved as '{csv_path}'")


CSV saved as 'serial_comm_data.csv'


In [19]:
# Step 6: Plot detected anomalies
# Plotly Express colours a numeric column on a continuous scale and then
# ignores color_discrete_map. Casting the 0/1 flag to strings (on a plotting
# copy, so `df` itself is untouched) makes it a discrete category, so the
# blue/red mapping actually applies.
plot_df = df.assign(detected_anomaly=df['detected_anomaly'].astype(str))

fig = px.scatter(
    plot_df, x='timestamp', y='latency_ms',
    color='detected_anomaly',
    symbol='protocol',
    title='Detected Anomalies in Serial Communication',
    labels={'latency_ms': 'Latency (ms)'},
    color_discrete_map={'0': 'blue', '1': 'red'},
    # Keep the legend order stable regardless of which flag appears first.
    category_orders={'detected_anomaly': ['0', '1']}
)
fig.show()

In [20]:
# Optional: Confusion Matrix for validation
# Compares the injected ground truth (is_anomaly) with the model's flags
# (detected_anomaly).
y_true = df['is_anomaly']
y_pred = df['detected_anomaly']

print("Confusion Matrix:")
# Pinning labels keeps the matrix 2x2 (rows = actual 0/1, columns = predicted
# 0/1) even if one class happens to be absent from the data.
print(confusion_matrix(y_true, y_pred, labels=[0, 1]))
print("\nClassification Report:")
# zero_division=0 reports 0 instead of warning when a class is never predicted.
print(classification_report(y_true, y_pred, zero_division=0))

Confusion Matrix:
[[2910   60]
 [   0   30]]

Classification Report:
              precision    recall  f1-score   support

           0       1.00      0.98      0.99      2970
           1       0.33      1.00      0.50        30

    accuracy                           0.98      3000
   macro avg       0.67      0.99      0.74      3000
weighted avg       0.99      0.98      0.98      3000

