# Data Trust Gatekeeper: Real-Time Anomaly Detection Solution — Part 1
## Part 1: Data Simulation and Ingestion
**Objective:**
1. Define the project configuration and clean up artifacts from previous runs.
2. Generate a simulated, end-to-end customer-acquisition dataset (Visitor -> Application -> Account -> Transaction).
3. Inject synthetic TDQ, BDQ, and ML anomalies to test the detection pipeline.
4. Write 21 days of data to the Unity Catalog Volume created below:
   * **14 days of normal historical data** — stable patterns with no anomalies, used to train the ML time-series models and to baseline the TDQ/BDQ thresholds.
   * **7 days of live data with controlled anomalies** — each day contains 8 micro-batches at 3-hour intervals, simulating:
     - real-time ingestion
     - event-stream-style arrival
     - mixed normal and anomalous segments

In [0]:
!pip install faker

Collecting faker
  Downloading faker-38.2.0-py3-none-any.whl.metadata (16 kB)
Downloading faker-38.2.0-py3-none-any.whl (2.0 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.0/2.0 MB 22.8 MB/s eta 0:00:00
Installing collected packages: faker
Successfully installed faker-38.2.0
Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
%restart_python 

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS workspace.default.datagov_project_data;

In [0]:
import pandas as pd
import numpy as np
import time
from faker import Faker
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, date_format

# --- 1. CONFIGURATION AND INITIALIZATION ---

# One seed shared by NumPy and Faker so that re-running the notebook
# regenerates the same dataset.
RANDOM_SEED = 42

# Initialize Spark Session and Faker for data generation
spark = SparkSession.builder.appName("DataTrustGatekeeper_Setup").getOrCreate()
# Faker draws from its own random generator, independent of NumPy's.
# Without this call the UUIDs, campaign words and locations differ on every
# run even though np.random is seeded.
Faker.seed(RANDOM_SEED)
fake = Faker()
np.random.seed(RANDOM_SEED)

# PRODUCTION CONFIG: Unity Catalog location of the project's data.
# Must match the volume created by the CREATE VOLUME cell above.
CATALOG_NAME = "workspace"
SCHEMA_NAME = "default"
VOLUME_NAME = "datagov_project_data"

# Full path to the project's data storage location
BASE_PATH_UC = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{VOLUME_NAME}/data_trust_gatekeeper_v0/"

# One landing directory per funnel entity (Visitor -> Application -> Account -> Transaction).
LANDING_PATHS = {
    "visitor": f"{BASE_PATH_UC}landing/visitor_events/",
    "app": f"{BASE_PATH_UC}landing/applications/",
    "account": f"{BASE_PATH_UC}landing/accounts/",
    "txn": f"{BASE_PATH_UC}landing/transactions/"
}

# Domain values sampled by generate_batch.
CONTROLLED_GEOS = ["USA", "UK", "IND", "CA"]
VALID_EVENT_TYPES = ["landing", "scroll", "apply_click", "view_pricing"]
VALID_APP_STATUSES = ["submitted", "approved", "rejected"]


# --- 2. ENVIRONMENT SETUP & CLEANUP ---

print(f"1. Cleaning up previous environment at UC Volume path: {BASE_PATH_UC}")

# 2.1 Remove everything left under the project root by earlier runs
#     (all landing zones live below BASE_PATH_UC).
dbutils.fs.rm(BASE_PATH_UC, recurse=True)
# Brief pause after the recursive delete before recreating directories.
# NOTE(review): presumably lets the storage layer settle; confirm it is still needed.
time.sleep(1)

# 2.2 Recreate an empty landing directory for every funnel entity.
for _entity, landing_dir in LANDING_PATHS.items():
    dbutils.fs.mkdirs(landing_dir)
    print(f"   -> Created Volume path: {landing_dir}")


# --- 3. HELPER FUNCTIONS FOR SIMULATION ---

# Anomaly flags understood by generate_batch (None means a clean batch).
_SUPPORTED_ANOMALY_TYPES = frozenset({
    "conversion_drop",
    "traffic_spike",
    "stale_data",
    "null_burst",
    "age_error",
    "limit_spike",
    "fraud_spike",
})


def generate_batch(start_time: datetime, num_records: int, anomaly_type: str = None):
    """
    Generate one synchronized batch of the customer-acquisition funnel.

    Each visitor event may convert into an application. Approved applications
    open an account, and some accounts produce 1-4 transactions. Child records
    carry their parent's ID (visitor_id -> app_id -> account_id), so the four
    frames can be joined.

    Args:
        start_time: Base time of the batch. Visitor events fall 1-99 seconds after it.
        num_records: Number of visitor events to generate. This is the count
            before any 'traffic_spike' multiplier is applied.
        anomaly_type: None for a clean batch, or one of:
            - 'conversion_drop': application probability drops from 20% to 5%.
            - 'traffic_spike': 2.5x visitors, ~80% with 0.1-1.0 s sessions.
            - 'stale_data': every timestamp in the batch is shifted 12 hours earlier.
            - 'null_burst': ~25% of visitor campaign_id values are None.
            - 'age_error': ~15% of applicants are aged 10-17.
            - 'limit_spike': ~10% of requested limits are 60-90% of income.
            - 'fraud_spike': ~5% of transactions are 70-90% of the approved limit.

    Returns:
        A tuple of pandas DataFrames (visitors, applications, accounts,
        transactions). A frame is empty when no rows of that kind were produced.

    Raises:
        ValueError: if anomaly_type is neither None nor a supported flag.
    """
    # Fail fast on typos: an unknown flag would otherwise silently yield a clean batch.
    if anomaly_type is not None and anomaly_type not in _SUPPORTED_ANOMALY_TYPES:
        raise ValueError(
            f"Unknown anomaly_type {anomaly_type!r}; expected None or one of "
            f"{sorted(_SUPPORTED_ANOMALY_TYPES)}"
        )

    visitors, applications, accounts, transactions = [], [], [], []

    # Base funnel conversion rates for a normal batch
    application_prob = 0.20  # visitor -> application
    approval_prob = 0.60     # application -> approved (applicant must also be 18+)
    txn_prob = 0.10          # approved account -> has transactions

    # --- ANOMALY INJECTION LOGIC (testing TDQ, BDQ, and ML detection) ---
    if anomaly_type == 'conversion_drop':
        application_prob = 0.05
    if anomaly_type == 'traffic_spike':
        num_records = int(num_records * 2.5)

    # FRESHNESS ANOMALY INJECTION
    time_offset_seconds = 0
    if anomaly_type == 'stale_data':
        # Shift timestamps 12 hours into the past relative to the batch time.
        # Downstream timestamps derive from event_time, so the whole funnel is shifted.
        time_offset_seconds = -12 * 3600
        print(f"      -> Injecting STALE DATA (Offset: {time_offset_seconds}s)")

    # --- Generate Visitor Events ---
    for _ in range(num_records):
        vis_id = fake.uuid4()
        event_time = (start_time + timedelta(seconds=np.random.randint(1, 100)) +
                      timedelta(seconds=time_offset_seconds))

        campaign = fake.word().upper() + str(np.random.randint(100, 999))

        # Inject null campaign IDs (completeness / null-rate failure)
        if anomaly_type == 'null_burst' and np.random.rand() < 0.25:
            campaign = None

        duration = np.random.normal(loc=30, scale=10)

        # Very short sessions, indicating bot / poor-quality traffic
        if anomaly_type == 'traffic_spike' and np.random.rand() < 0.8:
            duration = np.random.uniform(low=0.1, high=1.0)

        visitors.append({
            "visitor_id": vis_id,
            "event_type": np.random.choice(VALID_EVENT_TYPES),
            "campaign_id": campaign,
            "device_type": np.random.choice(["mobile", "desktop", "tablet"]),
            "geo": np.random.choice(CONTROLLED_GEOS),
            "timestamp": event_time,
            # Clamp because the normal distribution can go below zero
            "session_duration": max(0.1, duration)
        })

        # --- Generate Applications ---
        if np.random.rand() < application_prob:
            app_id = fake.uuid4()
            age = np.random.randint(22, 65)
            income = np.random.randint(40000, 150000)
            app_time = event_time + timedelta(minutes=np.random.randint(1, 15))

            # Inject under-age applicants (business rule violation: age < 18).
            # randint's upper bound is exclusive: 18 yields ages 10-17,
            # which includes the 17 boundary case.
            if anomaly_type == 'age_error' and np.random.rand() < 0.15:
                age = np.random.randint(10, 18)

            is_approved = np.random.rand() < approval_prob and age >= 18

            if is_approved:
                app_status = "approved"
            elif age < 18:
                # Note: this status is not one of VALID_APP_STATUSES
                app_status = "rejected - under age"
            else:
                app_status = np.random.choice(["rejected", "submitted"], p=[0.9, 0.1])

            req_limit = int(np.floor(income / 10))

            # Requested limit suspiciously high relative to income
            if anomaly_type == 'limit_spike' and np.random.rand() < 0.1:
                req_limit = int(income * np.random.uniform(0.6, 0.9))

            applications.append({
                "app_id": app_id,
                "visitor_id": vis_id,
                "applicant_age": age,
                "annual_income": float(income),
                "employment_status": np.random.choice(["salaried", "self-employed", "unemployed"]),
                "req_credit_limit": req_limit,
                "timestamp": app_time,
                "application_status": app_status
            })

            # --- Generate Accounts (ONLY IF approved) ---
            if app_status == 'approved':
                account_id = fake.uuid4()
                open_time = app_time + timedelta(hours=np.random.randint(2, 24))
                limit = int(np.floor(income / 10) * np.random.uniform(0.8, 1.2))

                accounts.append({
                    "account_id": account_id,
                    "app_id": app_id,
                    "approved_limit": limit,
                    "interest_rate": np.random.uniform(14.99, 29.99),
                    "account_open_timestamp": open_time,
                    "account_status": "active"
                })

                # --- Generate Transactions (Usage) ---
                if np.random.rand() < txn_prob:
                    num_txns = np.random.randint(1, 5)
                    for _ in range(num_txns):
                        txn_time = open_time + timedelta(days=np.random.randint(1, 5), minutes=np.random.randint(0, 1440))
                        amount = np.random.uniform(5.0, 500.0)

                        # Inject large fraud-like transactions (outlier detection)
                        if anomaly_type == 'fraud_spike' and np.random.rand() < 0.05:
                            amount = np.random.uniform(limit * 0.7, limit * 0.9)

                        transactions.append({
                            "txn_id": fake.uuid4(),
                            "account_id": account_id,
                            "amount": round(amount, 2),
                            "merchant_cat_code": np.random.choice(["5411", "5812", "5999", "7999", "5541"]),
                            "location": fake.country(),
                            "state": fake.state_abbr(),
                            "merchant_city": fake.city(),
                            "timestamp": txn_time
                        })

    return (pd.DataFrame(visitors),
            pd.DataFrame(applications),
            pd.DataFrame(accounts),
            pd.DataFrame(transactions))

def write_data_to_landing_zone(pdf: pd.DataFrame, path: str, suffix: str, timestamp_col: str):
    """
    Append a pandas DataFrame to a landing zone as date-partitioned Parquet.

    Rows are written under ``f"{path}{suffix}"`` with a ``date`` partition
    column (``YYYY-MM-DD``) derived from ``timestamp_col``. The caller's
    DataFrame is left unmodified. Empty frames are skipped.

    Args:
        pdf: Records to write.
        path: Landing-zone directory. It is concatenated directly with
            ``suffix``; callers in this notebook pass paths ending with "/".
        suffix: Sub-directory name for this write (e.g. "hist" or "<timestamp>_live").
        timestamp_col: Column from which the ``date`` partition value is derived.
    """
    if pdf.empty:
        return

    # Add the partition column on a copy so the caller's frame keeps its original schema.
    pdf = pdf.assign(date=pd.to_datetime(pdf[timestamp_col]).dt.strftime('%Y-%m-%d'))

    sdf = spark.createDataFrame(pdf)

    full_path = f"{path}{suffix}"
    (sdf.write
        .mode("append")
        .partitionBy("date")
        .parquet(full_path))
    print(f"   -> Wrote {len(pdf):,} records to {full_path}")


# --- 4. EXECUTION: HISTORICAL DATA GENERATION (14 DAYS) ---

# Configuration for the 21-day run: a clean baseline window followed by an anomaly window.
HISTORICAL_DAYS = 14
ANOMALY_INJECTION_DAYS = 7
TOTAL_DAYS = HISTORICAL_DAYS + ANOMALY_INJECTION_DAYS

print(f"\n--- 4. Generating Historical Data ({HISTORICAL_DAYS} days of normal behavior) ---")

# Anchor the timeline TOTAL_DAYS before "now", truncated to the top of the hour.
current_time = datetime.now().replace(minute=0, second=0, microsecond=0)
start_date = current_time - timedelta(days=TOTAL_DAYS)

for day_offset in range(HISTORICAL_DAYS):
    # One clean batch per baseline day, anchored at that day's start time.
    batch_time = start_date + timedelta(days=day_offset)
    anomaly_type = None
    num_records = 5000

    print(f"   -> Generating historical batch for: {batch_time.strftime('%Y-%m-%d')} (Normal)")

    v, app, acc, txn = generate_batch(batch_time, num_records=num_records, anomaly_type=anomaly_type)

    # Each frame goes to its own landing zone under the "hist" sub-directory,
    # partitioned by the date of the listed timestamp column.
    for frame, zone, ts_col in (
        (v, 'visitor', "timestamp"),
        (app, 'app', "timestamp"),
        (acc, 'account', "account_open_timestamp"),
        (txn, 'txn', "timestamp"),
    ):
        write_data_to_landing_zone(frame, LANDING_PATHS[zone], "hist", ts_col)


# --- 5. EXECUTION: STREAMING DATA GENERATION with INJECTED ANOMALIES (7 DAYS) ---

print(f"\n--- 5. Generating Live Data with Injected Anomalies ({ANOMALY_INJECTION_DAYS} days) ---")

# Daily schedule replayed on each of the last 7 days: one micro-batch per entry.
# 8 entries -> one batch every 3 hours (8 x 3 h = 24 h).
DAILY_ANOMALY_SCHEDULE = [
    # (Anomaly Type, Base Records)
    (None, 4500),             # Normal control volume
    ('null_burst', 5000),     # TDQ: ~25% null campaign_id
    ('age_error', 5500),      # BDQ rule violation (applicant age < 18)
    (None, 4800),             # Normal control volume
    ('traffic_spike', 2000),  # ML: bot traffic - 2.5x volume (~5,000 visitors), mostly sub-second sessions
    ('fraud_spike', 5200),    # ML outlier: transactions at 70-90% of the approved limit
    ('limit_spike', 5300),    # ML outlier: requested limit at 60-90% of income
    ('stale_data', 4800),     # FRESHNESS anomaly: timestamps shifted 12 h into the past
]

# Spread the batches evenly over 24 hours. No batch then spills onto the next
# day's first batch, which would share its timestamp prefix (and therefore its
# appended landing directory).
BATCH_INTERVAL = timedelta(days=1) / len(DAILY_ANOMALY_SCHEDULE)

# Loop through the last 7 days
for i in range(ANOMALY_INJECTION_DAYS):
    current_date = start_date + timedelta(days=HISTORICAL_DAYS + i)
    print(f"\n   -> Injecting anomalies for date: {current_date.strftime('%Y-%m-%d')}")

    # Run one micro-batch per schedule entry, spread throughout the day
    for j, (anomaly_type, base_records) in enumerate(DAILY_ANOMALY_SCHEDULE):
        batch_time = current_date + j * BATCH_INTERVAL
        desc = anomaly_type if anomaly_type else "Normal"

        print(f"      -> Batch {j+1}: {desc} at {batch_time.strftime('%H:%M:%S')}")

        # Generate batch with anomaly flag
        v, app, acc, txn = generate_batch(batch_time, num_records=base_records, anomaly_type=anomaly_type)

        # A unique timestamp prefix gives every batch its own landing sub-directory
        prefix = batch_time.strftime("%Y%m%d%H%M%S")
        write_data_to_landing_zone(v, LANDING_PATHS['visitor'], f"{prefix}_live", "timestamp")
        write_data_to_landing_zone(app, LANDING_PATHS['app'], f"{prefix}_live", "timestamp")
        write_data_to_landing_zone(acc, LANDING_PATHS['account'], f"{prefix}_live", "account_open_timestamp")
        write_data_to_landing_zone(txn, LANDING_PATHS['txn'], f"{prefix}_live", "timestamp")


print("\nData Generation Complete.")
print(f"Source files are staged in: {BASE_PATH_UC}landing/")

# --- 6. VERIFICATION ---
# Verify that the files are ready for streaming ingestion.
# Each landing zone holds several independent Parquet datasets ("hist" and
# "<timestamp>_live"), each partitioned by date=. Reading the zone root with
# Spark's partition discovery fails with "Conflicting directory structures
# detected". Instead, list the leaf files recursively. This disables partition
# inference, which a row count does not need.
print("\n--- 6. VERIFICATION (Checking raw data counts) ---")
try:
    total_visitors = (spark.read
                      .option("recursiveFileLookup", "true")
                      .parquet(LANDING_PATHS['visitor'])
                      .count())
    total_applications = (spark.read
                          .option("recursiveFileLookup", "true")
                          .parquet(LANDING_PATHS['app'])
                          .count())
    print(f"Total Visitor Records Generated: {total_visitors:,}")
    print(f"Total Application Records Generated: {total_applications:,}")
except Exception as e:
    # Best-effort check: report the problem but still hand the path to the caller.
    print(f"Verification Error: Could not read or list files. Error: {e}")


# Return the base path as this notebook's exit value for use in the next notebook
dbutils.notebook.exit(BASE_PATH_UC)