In [None]:
#pip install google-cloud-logging

In [None]:
# pip install datetime  -- not needed: datetime is part of the Python standard library

In [None]:
import os
#os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/service_key.json"
import numpy as np
import pandas as pd
import datetime
import json

In [None]:
# Read the training CSV into a DataFrame; the cells below all work off `data`
file_path = "Data_pipeline_airflow_dags_data_scaled_data_train.csv"
data = pd.read_csv(filepath_or_buffer=file_path)

def log_feature_distribution(feature_data, feature_name, log_file):
    """
    Compute summary statistics for one feature and append them to a log file.

    Args:
        feature_data: One-dimensional numeric values (for example a pandas
            Series or a NumPy array).
        feature_name: Label stored under the "feature" key of the log entry.
        log_file: Path of the text file to append to.

    The appended entry is an indented JSON object holding a timestamp, the
    feature name, the mean, the variance and the 25th/50th/75th percentiles.
    The variance is requested through ``np.var`` with its default ``ddof=0``
    (population variance); note that ``run_drift_detection`` uses pandas'
    ``Series.var()``, so the two numbers are not expected to match exactly.
    """
    quantiles = np.percentile(feature_data, [25, 50, 75])

    # float() converts NumPy scalars to built-in floats. Without it, any
    # non-float64 result (e.g. from float32 data) makes json.dumps raise
    # "Object of type float32 is not JSON serializable".
    log_entry = {
        "timestamp": str(datetime.datetime.now()),
        "feature": feature_name,
        "mean": float(np.mean(feature_data)),
        "variance": float(np.var(feature_data)),
        "25th_percentile": float(quantiles[0]),
        "50th_percentile": float(quantiles[1]),
        "75th_percentile": float(quantiles[2]),
    }

    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(log_entry, indent=4) + "\n")


def _relative_change(current, baseline):
    """Return |current - baseline| / |baseline| as a built-in float.

    A zero baseline cannot serve as a divisor; in that case the absolute
    difference is returned instead so the caller still gets a finite number.
    """
    delta = abs(float(current) - float(baseline))
    scale = abs(float(baseline))
    return delta / scale if scale else delta


def detect_drift(current_stats, baseline_stats, feature_name, log_file, threshold=0.1):
    """
    Compare current statistics with baseline statistics to detect drift, saving results to a log file.

    Args:
        current_stats: Mapping with the keys 'mean', 'variance',
            '25th_percentile', '50th_percentile' and '75th_percentile'.
        baseline_stats: Mapping with the same keys holding reference values.
        feature_name: Label stored under the "feature" key of the result.
        log_file: Path of the text file the result is appended to.
        threshold: A statistic counts as drifted when its relative change
            versus the baseline is strictly greater than this value.

    Returns:
        dict: ``{"feature", "drift_detected", "drift_details"}`` where
        ``drift_details`` maps ``"<statistic>_diff"`` to the measured change
        for every statistic that exceeded the threshold.

    Raises:
        KeyError: If either mapping lacks one of the expected keys.
    """
    drift_details = {}

    # Same order as the keys are reported in the log: mean, variance, quartiles.
    for stat in ('mean', 'variance', '25th_percentile', '50th_percentile', '75th_percentile'):
        change = _relative_change(current_stats[stat], baseline_stats[stat])
        if change > threshold:
            drift_details[f'{stat}_diff'] = change

    result = {
        "feature": feature_name,
        # Drift is flagged as soon as at least one statistic exceeded the threshold.
        "drift_detected": bool(drift_details),
        "drift_details": drift_details
    }

    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(result, indent=4) + "\n")

    return result


def check_missing_update_with_grace(data, log_file, grace_days=2):
    """
    Report whether the newest 'date' in ``data`` lags today by more than ``grace_days``.

    The 'date' column is parsed with ``pd.to_datetime`` and its maximum is
    compared with the current local date. When the gap exceeds the grace
    period, a JSON entry describing the gap is appended to ``log_file`` and a
    notice is printed; otherwise only an informational message is printed.
    Nothing is returned.
    """
    newest = pd.to_datetime(data['date']).max()
    current_day = datetime.datetime.now().date()
    gap = (current_day - newest.date()).days

    # Guard clause: still inside the grace period, so nothing is logged.
    if not (gap > grace_days):
        print(f"No missing updates detected within the grace period ({grace_days} days).")
        return

    gap_details = {
        "latest_date_in_data": str(newest.date()),
        "expected_date": str(current_day),
        "missing_days": gap,
        "grace_period": grace_days
    }
    record = {
        "timestamp": str(datetime.datetime.now()),
        "drift_type": "Missing Data (Grace Period Exceeded)",
        "details": gap_details
    }

    with open(log_file, "a") as out:
        out.write(json.dumps(record, indent=4) + "\n")
    print("Missing data detected and logged (grace period exceeded).")


def run_drift_detection(data, key_features, baseline_statistics, log_file):
    """
    Run the drift check for every feature in ``key_features``.

    For each feature the column is taken from ``data``, its distribution is
    logged via ``log_feature_distribution``, and its mean, variance and
    quartiles are compared against ``baseline_statistics`` (indexed by the
    row labels 'mean', 'std', '25%', '50%' and '75%') via ``detect_drift``.

    Returns a dict mapping each feature name to the ``detect_drift`` result.
    """
    outcome = {}

    for name in key_features:
        series = data[name]
        log_feature_distribution(series, name, log_file)

        # The baseline table stores the standard deviation; square it to get a variance.
        reference = {
            "mean": baseline_statistics.loc['mean', name],
            "variance": baseline_statistics.loc['std', name] ** 2,
            **{
                f"{pct}th_percentile": baseline_statistics.loc[f"{pct}%", name]
                for pct in (25, 50, 75)
            },
        }

        observed = {
            "mean": series.mean(),
            "variance": series.var(),
            "25th_percentile": series.quantile(0.25),
            "50th_percentile": series.median(),
            "75th_percentile": series.quantile(0.75),
        }

        outcome[name] = detect_drift(observed, reference, name, log_file)

    return outcome


# Define key features and log file
key_features = ['volume', 'RSI', 'MACD', 'MA20', 'SP500_VIXCLS_ratio']
# NOTE(review): the baseline is computed over the full dataset, which includes
# the 10 most recent rows that are later checked against it -- confirm this is intended.
baseline_statistics = data[key_features].describe(percentiles=[0.25, 0.5, 0.75])
log_file = "drift_detection_log.txt"
# One grace period shared by the missing-update check and the decision to run detection
grace_days = 2

# Initialize log file (mode "w" discards the log of any previous run)
with open(log_file, "w") as f:
    f.write("Drift Detection Insights:\n")

# Check for missing updates
check_missing_update_with_grace(data, log_file, grace_days=grace_days)

# Bind drift_results up front so later cells can use it even when detection is skipped
drift_results = {}

# Run drift detection if updates are recent, i.e. the newest row is within the
# same grace period that the missing-update check above tolerates.
days_since_update = (datetime.datetime.now().date() - pd.to_datetime(data['date']).max().date()).days
if days_since_update <= grace_days:
    drift_results = run_drift_detection(data.iloc[-10:], key_features, baseline_statistics, log_file)
else:
    print("Skipping drift detection due to missing updates.")


In [None]:
from google.cloud import logging as cloud_logging

# Create the Cloud Logging client and the named log that receives the drift messages
cloud_logging_client = cloud_logging.Client()
logger = cloud_logging_client.logger(name="drift-detection-logs")

def log_to_cloud_logging(message):
    """
    Send ``message`` as a text entry through the module-level ``logger``.
    """
    logger.log_text(text=message)

In [None]:
# Smoke test: send one fixed message to check that the logging setup works
log_to_cloud_logging(message="Drift detection setup test successful.")


In [None]:
# Ship the drift results to Cloud Logging. `drift_results` is only assigned when
# the detection cell actually ran, so fall back to an empty mapping instead of
# raising NameError when detection was skipped.
log_to_cloud_logging(json.dumps(globals().get("drift_results", {}), indent=4))
