# Python Anomalous System Code

In [64]:
## Install & Import required packages
!pip install boto3

import boto3
import pandas as pd
import numpy as np
import sys
import datetime
import random
import smtplib
from email.message import EmailMessage
from io import StringIO
# from datetime import datetime





In [65]:
# Intializaing the S3 Bucket Service
if sys.version_info[0] < 3: 
    from StringIO import StringIO 
else:
    from io import StringIO

s3 = boto3.client('s3')

# Generating the Clean Stock data using random walk with drift for simulating realistic stock behavior

In [75]:
# Here we can generate similar dataset for 40000 dates for 1000 stocks but due to scalability I have taken 1000 days and 100 stocks
# The original dataset for 40000 dates with 1000 stocks can be generated 

def generate_clean_stock_data(num_days=1000, num_stocks=100, start_date=None, seed=42): 
    np.random.seed(seed)
    random.seed(seed)
    
    if start_date is None:
        start_date = datetime.date(2020, 1, 1)
    dates = [start_date + datetime.timedelta(days=i) for i in range(num_days)]
    dates = [date for date in dates if date.weekday() < 5]      # Filter out weekends
    tickers = [f"ST{i:04d}" for i in range(1, num_stocks + 1)]  # Create stock tickers in format ST0001, ST0002, etc.
    initial_prices = np.random.uniform(10, 500, num_stocks)     # Generate initial prices (between $10 and $500)
    df = pd.DataFrame(index=dates, columns=tickers)
    df.iloc[0] = initial_prices
    
    # Generate daily returns using a random walk with drift
    annual_drift = np.random.uniform(0.1, 0.25, num_stocks)  # 10% to 25% annual return # 10% to +25% returns 
    annual_volatility = np.random.uniform(0.1, 0.4, num_stocks)  # 10% to 40% annual volatility # 40 % to +40% (Analysis and need to add expereinces)
    
    # Convert annual parameters to daily
    trading_days_per_year = 252
    daily_drift = annual_drift / trading_days_per_year
    daily_volatility = annual_volatility / np.sqrt(trading_days_per_year)
    
    # Generate price series & daily returns as random normal with mean=drift and std=volatility
    for i in range(1, len(dates)):
        daily_returns = np.random.normal(daily_drift,daily_volatility,num_stocks)
        df.iloc[i] = df.iloc[i-1] * (1 + daily_returns) # Apply returns to previous day's prices
    for col in df.columns:
        df[col] = df[col].apply(lambda x: float(f"{x:.2f}") if not pd.isna(x) else x) # keep the decimal to 2 points
    return df

# store the data in to clean_dataframe
clean_data = generate_clean_stock_data(num_days=1000, num_stocks=100)

# Display the first 5 and last 5 rows to verify
print("\nFirst 5 rows with first 5 stocks:")
print(clean_data.iloc[:5, :5])
print("\nLast 5 rows with last 5 stocks:")
print(clean_data.iloc[-5:, -5:])



First 5 rows with first 5 stocks:
            ST0001  ST0002  ST0003  ST0004  ST0005
2020-01-01  193.52  475.85  368.68  303.34   86.45
2020-01-02  193.77  473.77  376.29  308.03   83.42
2020-01-03  194.12  471.61  379.94  307.19   82.28
2020-01-06  193.24  474.64  385.50  307.94   84.73
2020-01-07  193.08  470.48  383.37  308.49   83.52

Last 5 rows with last 5 stocks:
            ST0096  ST0097  ST0098  ST0099  ST0100
2022-09-20  215.76  569.02  311.78   36.20   75.77
2022-09-21  215.96  566.47  310.75   36.11   74.43
2022-09-22  211.76  577.29  306.07   35.62   74.83
2022-09-23  213.64  582.33  310.10   35.40   74.60
2022-09-26  214.46  576.71  307.61   34.70   74.91


# Generating the Anomalous Stock data and Introducing anomalies to clean data 

In [67]:
import numpy as np
import pandas as pd
import random

def introduce_anomalies(clean_df, anomaly_percentage=0.05, seed=42): # 10% for anomalies 
    np.random.seed(seed)
    random.seed(seed)
    
    # Calculate number of anomalies to introduce
    anomalous_df = clean_df.copy()
    num_days = len(clean_df)
    num_stocks = len(clean_df.columns)
    num_anomalies = int(num_days * num_stocks * anomaly_percentage)
    print(f"Introducing {num_anomalies} anomalies into the dataset...")
    
    # Create anomaly types and their relative frequencies
    anomaly_types = [
        "price_spike",         # Sudden large increase in price
        "price_drop",          # Sudden large decrease in price
        "missing_value",       # Null value
        "duplicate_value",     # Same price repeated
        "zero_price",          # Price set to zero
        "negative_price",      # Negative price (impossible in reality)
        "missing_ticker"       # Missing ticker for a day
    ]
    
    anomaly_weights = [0.25, 0.25, 0.15, 0.15, 0.1, 0.05, 0.05]
    
    # Randomly select days and stocks for anomalies
    anomaly_days = np.random.choice(range(num_days), num_anomalies, replace=True)
    anomaly_stocks = np.random.choice(clean_df.columns, num_anomalies, replace=True)
    anomaly_types_selected = np.random.choice(anomaly_types, num_anomalies, replace=True, p=anomaly_weights)
    
    # Introduce anomalies
    anomaly_log = []
    
    for i in range(num_anomalies):
        day_idx = anomaly_days[i]
        stock = anomaly_stocks[i]
        anomaly_type = anomaly_types_selected[i]
        original_value = anomalous_df.iloc[day_idx][stock]
        
        # Apply the anomaly based on its type
        if anomaly_type == "price_spike":
            # Multiply price by 1.5 to 6x
            factor = np.random.uniform(1.5, 6.0)
            anomalous_df.loc[anomalous_df.index[day_idx], stock] = original_value * factor
            new_value = anomalous_df.iloc[day_idx][stock]
            
        elif anomaly_type == "price_drop":
            # Multiply price by 0.1 to 0.7
            factor = np.random.uniform(0.1, 0.7)
            anomalous_df.loc[anomalous_df.index[day_idx], stock] = original_value * factor
            new_value = anomalous_df.iloc[day_idx][stock]
            
        elif anomaly_type == "missing_value":
            anomalous_df.loc[anomalous_df.index[day_idx], stock] = np.nan
            new_value = np.nan
            
        elif anomaly_type == "duplicate_value":
            if day_idx > 0:
                anomalous_df.loc[anomalous_df.index[day_idx], stock] = anomalous_df.iloc[day_idx-1][stock]
                new_value = anomalous_df.iloc[day_idx][stock]
            else:
                new_value = original_value
                
        elif anomaly_type == "zero_price":
            anomalous_df.loc[anomalous_df.index[day_idx], stock] = 0.0
            new_value = 0.0
            
        elif anomaly_type == "negative_price":
            anomalous_df.loc[anomalous_df.index[day_idx], stock] = -original_value
            new_value = anomalous_df.iloc[day_idx][stock]
            
        elif anomaly_type == "missing_ticker":
            anomalous_df.loc[anomalous_df.index[day_idx], stock] = np.nan
            new_value = np.nan
        
        # Log the anomaly
        anomaly_log.append({ 'date': anomalous_df.index[day_idx], 'ticker': stock,'anomaly_type': anomaly_type,
            'original_value': original_value, 'new_value': new_value })
    
    # Create a dataframe with the anomaly log
    anomaly_report = pd.DataFrame(anomaly_log)
    for col in anomalous_df.columns:
        anomalous_df[col] = anomalous_df[col].apply(lambda x: float(f"{x:.2f}") if not pd.isna(x) else x)
    
    
    return anomalous_df, anomaly_report

# Introduce anomalies (5% of all data points will contain anomalies)
anomalous_data, anomaly_report = introduce_anomalies(clean_data, anomaly_percentage=0.05)

# Display summary of the anomalies introduced
print("\nAnomaly distribution by type:")
print(anomaly_report['anomaly_type'].value_counts())

# Compare sample of clean vs anomalous data
sample_date = clean_data.index[100]  
sample_stocks = clean_data.columns[:5] 

print(f"\nSample comparison for date {sample_date}:")
print("Clean data:")
print(clean_data.loc[sample_date, sample_stocks])
print("\nAnomalous data:")
print(anomalous_data.loc[sample_date, sample_stocks])

# Calculate how many values were changed
total_values = anomalous_data.size
changed_values = np.sum(clean_data.values != anomalous_data.values)
null_values = anomalous_data.isna().sum().sum()

print(f"\nTotal values in dataset: {total_values}")
print(f"Changed values: {changed_values} ({changed_values/total_values:.2%} of total)")
print(f"Null values introduced: {null_values}")

# Display examples of different types of anomalies
print("\nExamples of each anomaly type:")
for anomaly_type in anomaly_report['anomaly_type'].unique():
    example = anomaly_report[anomaly_report['anomaly_type'] == anomaly_type].iloc[0]
    print(f"\n{anomaly_type.upper()}:")
    print(f"Date: {example['date']}, Ticker: {example['ticker']}")
    print(f"Original value: {example['original_value']}, New value: {example['new_value']}")

Introducing 3570 anomalies into the dataset...

Anomaly distribution by type:
anomaly_type
price_drop         904
price_spike        876
duplicate_value    559
missing_value      514
zero_price         391
missing_ticker     166
negative_price     160
Name: count, dtype: int64

Sample comparison for date 2020-05-20:
Clean data:
ST0001    237.51
ST0002    503.06
ST0003    435.62
ST0004    371.12
ST0005    103.53
Name: 2020-05-20, dtype: float64

Anomalous data:
ST0001    241.21
ST0002    503.06
ST0003    435.62
ST0004    371.12
ST0005    103.53
Name: 2020-05-20, dtype: float64

Total values in dataset: 71400
Changed values: 3492 (4.89% of total)
Null values introduced: 673

Examples of each anomaly type:

DUPLICATE_VALUE:
Date: 2020-05-22, Ticker: ST0024
Original value: 207.76, New value: 200.82

MISSING_VALUE:
Date: 2021-09-01, Ticker: ST0079
Original value: 189.11, New value: nan

NEGATIVE_PRICE:
Date: 2021-01-13, Ticker: ST0022
Original value: 81.09, New value: -81.09

PRICE_SPIKE:
D

# Anomaly Detection System

In [68]:
# Keeping threshold up and down and moving average of 7 days for measurement metrics
def detect_anomalies_against_history(historical_df, new_df, threshold_up=0.2, threshold_down=-0.2, window=7):
    
    historical_df.index = pd.to_datetime(historical_df.index)
    new_df.index = pd.to_datetime(new_df.index)

    if historical_df.shape[0] < window:
        raise ValueError("Not enough historical data to compute moving average.")
    
    current_date = new_df.index[0]
    today_prices = new_df.iloc[0]
    moving_avg = historical_df.iloc[-window:].mean()
    expected_tickers = list(historical_df.columns)
    incoming_tickers = list(new_df.columns)

    anomalies = {}

    # Check for the anomalies in the data
    for i, col in enumerate(incoming_tickers):
        if pd.isna(col) or str(col).strip() == "":
            anomalies[f"column_{i}"] = "Unnamed/Blank Ticker Column"

    for i, expected_ticker in enumerate(expected_tickers):
        if expected_ticker not in incoming_tickers:
            anomalies[f"column_{i}"] = f"Missing Ticker at Position {i}"

    duplicate_tickers = new_df.columns[new_df.columns.duplicated()].tolist()
    for dup in duplicate_tickers:
        anomalies[dup] = "Duplicate Ticker Name"

    if current_date in historical_df.index:
        anomalies['__DUPLICATE_DATE__'] = f"Duplicate Date Entry ({current_date.date()})"

    for ticker in incoming_tickers:
        if ticker not in expected_tickers or pd.isna(ticker) or str(ticker).strip() == "":
            continue  # skip unexpected or unnamed tickers

        today_price = today_prices[ticker]
        avg_price = moving_avg.get(ticker, np.nan)

        if pd.isna(today_price):
            anomalies[ticker] = "Missing/Null Price (value=None)"
            continue

        if today_price == 0:
            anomalies[ticker] = "Zero Price (value=0.00)"
            continue

        if today_price < 0:
            anomalies[ticker] = f"Negative Price (value={today_price:.2f})"
            continue

        if pd.isna(avg_price) or avg_price == 0:
            anomalies[ticker] = "Invalid Moving Average"
            continue

        pct_change = (today_price - avg_price) / avg_price
        if pct_change > threshold_up:
            anomalies[ticker] = f"Price Increase > {threshold_up*100:.0f}% ({pct_change*100:.2f}%)"
        elif pct_change < threshold_down:
            anomalies[ticker] = f"Price Decrease < {abs(threshold_down*100):.0f}% ({pct_change*100:.2f}%)"

    return anomalies


In [69]:
# Keep the last record as new record for new_df
historical_df = anomalous_data.iloc[:-1]
new_df = anomalous_data.iloc[[-1]]
new_df = new_df.drop(columns=["ST0006"])  # simulate missing ticker ST0006
new_df.columns = list(new_df.columns)
new_df[""] = 123.45                       # simulate a blank column for new_df

# Run anomaly detection
anomalies = detect_anomalies_against_history(historical_df, new_df)
print(f"\nDetected {len(anomalies)} anomalies on {new_df.index[0].date()}:\n")
for ticker, issue in anomalies.items():
    print(f"{ticker}: {issue}")



Detected 14 anomalies on 2022-09-26:

column_99: Unnamed/Blank Ticker Column
column_5: Missing Ticker at Position 5
ST0002: Price Decrease < 20% (-23.23%)
ST0005: Price Decrease < 20% (-28.53%)
ST0009: Price Increase > 20% (120.54%)
ST0018: Zero Price (value=0.00)
ST0021: Price Decrease < 20% (-39.75%)
ST0055: Missing/Null Price (value=None)
ST0059: Price Increase > 20% (44.75%)
ST0070: Price Increase > 20% (40.64%)
ST0076: Price Increase > 20% (20.95%)
ST0080: Price Decrease < 20% (-36.44%)
ST0081: Price Increase > 20% (207.37%)
ST0096: Price Increase > 20% (41.78%)


In [71]:
# Storing the results in the S3 Bucket
df = pd.DataFrame([{'Ticker/Key': k, 'Issue': v} for k, v in anomalies.items()])
csv_buffer = StringIO()
df.to_csv(csv_buffer, index=False)
s3_key = f"anomalies/anomaly_report_1.csv"

# 5. Upload to S3
s3 = boto3.client('s3')
s3.put_object(Bucket='atchi-reddy-s3-bucket', Key=s3_key, Body=csv_buffer.getvalue() )

{'ResponseMetadata': {'RequestId': 'DSTXVRAH3552W9N3',
  'HostId': 'Y2eBTp9ny5cAPotadvFLBm4iD35542FJ1mjAKj2N42jXi5gpV0Ok1l2Yo2wQvStv68QVSG8zIfA=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Y2eBTp9ny5cAPotadvFLBm4iD35542FJ1mjAKj2N42jXi5gpV0Ok1l2Yo2wQvStv68QVSG8zIfA=',
   'x-amz-request-id': 'DSTXVRAH3552W9N3',
   'date': 'Tue, 15 Apr 2025 08:20:52 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"e7f9046769e552f0f670315d801f0194"',
   'x-amz-checksum-crc32': '/2aA1A==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"e7f9046769e552f0f670315d801f0194"',
 'ChecksumCRC32': '/2aA1A==',
 'ChecksumType': 'FULL_OBJECT',
 'ServerSideEncryption': 'AES256'}

In [72]:
response = s3.list_objects_v2(Bucket='atchi-reddy-s3-bucket', Prefix='anomalies/')

In [74]:
## Checking the file which stored in S3 bucket 
csv_obj = s3.get_object(Bucket="atchi-reddy-s3-bucket", Key="anomalies/anomaly_report_1.csv")
body = csv_obj['Body']
csv_string = body.read().decode('utf-8')
df = pd.read_csv(StringIO(csv_string))
df.head()

Unnamed: 0,Ticker/Key,Issue
0,column_99,Unnamed/Blank Ticker Column
1,column_5,Missing Ticker at Position 5
2,ST0002,Price Decrease < 20% (-23.23%)
3,ST0005,Price Decrease < 20% (-28.53%)
4,ST0009,Price Increase > 20% (120.54%)


# Mail Delivery System for Alerts

In [73]:
# Haven't attached the sensitive details 

def send_email_alert(sender_email, app_password, recipient_email, subject, body):
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = sender_email
    msg['To'] = recipient_email
    msg.set_content(body)
    
    try:
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
            smtp.login(sender_email, app_password)
            smtp.send_message(msg)
            print("Email alert sent successfully.")
    except Exception as e:
        print(f"Failed to send email: {e}")

def format_anomalies_for_email(anomalies, date):
    body_lines = [f"📅 Detected {len(anomalies)} anomalies on {date.date()}:\n"]
    for key, issue in anomalies.items():
        body_lines.append(f"{key}: {issue}")
    return "\n".join(body_lines)

email_body = format_anomalies_for_email(anomalies, new_df.index[0])

send_email_alert(
    sender_email="your_email@gmail.com",
    app_password="your_app_password_here",
    recipient_email="recipient_email@example.com",
    subject=" Stock Anomaly Alert - " + str(new_df.index[0].date()),
    body=email_body)


Failed to send email: (535, b'5.7.8 Username and Password not accepted. For more information, go to\n5.7.8  https://support.google.com/mail/?p=BadCredentials 6a1803df08f44-6f0dea10825sm98041446d6.116 - gsmtp')
