## Using AI for Anomalies Detection in Data Quality
**Description**: Implement an AI-based approach to detect anomalies in data quality.

**Steps**:
1. Use an Anomaly Detection Algorithm:
    - Use sklearn's Isolation Forest for anomaly detection.

**Example data:**

data = np.array([[25, 50000], [30, 60000], [35, 75000], [40, None], [45, 100000]])

2. Integrate with Great Expectations:
    - Generate alerts if anomalies are detected:

In [2]:
import pandas as pd
import great_expectations as ge
from great_expectations.core import ExpectationSuite
from great_expectations.execution_engine import PandasExecutionEngine
import time
import smtplib
from email.mime.text import MIMEText
import datetime
import threading  # Import the threading module
from sklearn.ensemble import IsolationForest
import numpy as np

# 1. Load the Sample Data (as a function)
def generate_sample_data():
    """Generates a sample DataFrame with 'age' and 'income'."""
    data = np.array([[25, 50000], [30, 60000], [35, 75000], [40, None], [45, 100000]])
    df = pd.DataFrame(data, columns=['age', 'income'])
    return df

# 2. Create a Great Expectations Context
context = ge.get_context()

# 3. Define a Custom Expectation (if needed)
class ExpectIncomeGreaterThan(ge.expectation.ColumnMapExpectation):
    """
    Expect income values to be greater than a specified threshold.
    """
    map_metric_provider_class = PandasExecutionEngine.map_series
    success_keys = ("threshold",)
    default_kwarg_values = {"threshold": 0}
    args_keys = ("column", "threshold")

    @PandasExecutionEngine.map_series(schema_type=None)
    def _map_series(cls, series: pd.Series, threshold: int) -> pd.Series:
        """
        Determines if income is greater than the threshold.

        Args:
            series: The Pandas Series representing the income column.
            threshold: The threshold value.

        Returns:
            A Pandas Series of boolean values (True if income > threshold, False otherwise).
        """
        return series > threshold

    def get_validation_dependencies(
        cls,
        column: str,
        threshold: int = None,
        **kwargs,
    ) -> list[ge.core.ExpectationConfiguration]:
        """
        Returns a list of dependencies for this Expectation
        """
        return [ge.core.ExpectationConfiguration("expect_column_values_to_exist", {"column": column})]

    def describe(
        cls,
        column: str,
        threshold: int = None,
        **kwargs,
    ) -> str:
        """
        Returns a human-readable string representation of this expectation.
        """
        return f"Values in column '{column}' should be greater than {threshold:.2f}."

# 4. Create an Expectation Suite
expectation_suite_name = "income_validation_suite"
suite = context.get_expectation_suite(expectation_suite_name)
if suite is None:
    suite = context.create_expectation_suite(expectation_suite_name)

# 5. Add Expectations to the Suite
suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "income"},
    )
)
suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "age",
            "min_value": 18,
            "max_value": 65,
        },
    )
)
suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_income_greater_than",
        kwargs={
            "column": "income",
            "threshold": 40000,
        },
    )
)
context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=expectation_suite_name)

def send_data_quality_alert(validation_results, anomaly_data=None):
    """
    Sends an email alert with details about the data quality issues,
    including any anomalies detected by the Isolation Forest model.

    Args:
        validation_results (dict): The validation results from Great Expectations.
        anomaly_data (pd.DataFrame, optional): DataFrame containing the detected anomalies. Defaults to None.
    """
    # Email configuration
    sender_email = "your_email@example.com"  # Replace with your email address
    receiver_email = "recipient_email@example.com"  # Replace with the recipient's email address
    smtp_server = "smtp.example.com"  # Replace with your SMTP server address
    smtp_port = 587  # Replace with your SMTP server port (e.g., 587 for TLS)
    smtp_username = "your_email@example.com"  # Replace with your email username
    smtp_password = "your_email_password"  # Replace with your email password or an app password

    # Create the email message
    subject = "Data Quality and Anomaly Alert"
    now = datetime.datetime.now()
    date_time_str = now.strftime("%d/%m/%Y, %H:%M:%S")
    body = f"Data quality check failed on {date_time_str} UTC.\n\n"
    body += "Details:\n"
    for result in validation_results["results"]:
        if not result["success"]:
            body += f"- Expectation: {result['expectation_config']['expectation_type']}\n"
            body += f"  Column: {result['expectation_config']['kwargs'].get('column', 'N/A')}\n"
            body += f"  Status: Failed\n"
            if result.get("result", {}).get("unexpected_count") is not None:
                body += f"  Unexpected Count: {result['result']['unexpected_count']}\n"
            body += f"  Details: {result['result']}\n"
            body += "\n"
    if anomaly_data is not None and not anomaly_data.empty:
        body += "\nAnomalies Detected by Isolation Forest:\n"
        body += anomaly_data.to_string()  # Convert DataFrame to string for email
    body += "\n"
    body += "Please investigate the data quality and anomalies immediately."

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = sender_email
    msg["To"] = receiver_email

    # Send the email
    try:
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()  # Use TLS encryption
        server.login(smtp_username, smtp_password)
        server.sendmail(sender_email, [receiver_email], msg.as_string())
        server.quit()
        print("Data quality alert email sent successfully.")
    except Exception as e:
        print(f"Error sending email: {e}")
        print("Please check your email configuration and network connection.")

def detect_anomalies(df):
    """
    Detects anomalies in the given DataFrame using Isolation Forest.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: A DataFrame containing the detected anomalies, or an empty DataFrame if no anomalies are found.
    """
    # Handle missing values using imputation (replace NaN with the mean of the column)
    df_imputed = df.copy()  # Create a copy to avoid modifying the original DataFrame
    for col in df_imputed.columns:
        if df_imputed[col].isnull().any():
            mean_val = df_imputed[col].mean()
            df_imputed[col] = df_imputed[col].fillna(mean_val)

    # Initialize and fit the Isolation Forest model
    model = IsolationForest(contamination='auto', random_state=42)  # Set random_state for reproducibility
    model.fit(df_imputed)

    # Predict anomalies (returns 1 for inliers, -1 for outliers)
    anomaly_labels = model.predict(df_imputed)

    # Get the anomaly data points
    anomaly_data = df[anomaly_labels == -1]
    return anomaly_data

def monitor_data_quality():
    """
    Monitors data quality in real-time (simulated) by repeatedly generating sample data,
    validating it with Great Expectations, detecting anomalies, and sending alerts.
    """
    while True:
        # 1. Generate new data
        new_data_df = generate_sample_data()

        # 2. Detect anomalies using Isolation Forest
        anomaly_data = detect_anomalies(new_data_df)

        # 3. Create a BatchRequest for the new data
        batch_request = ge.datasource.Datasource.create_batch_request(
            datasource_name="pandas",
            data_asset_name="realtime_data",
            pandas_df=new_data_df,
        )

        # 4. Create a Validator
        validator = context.get_validator(
            batch_request=batch_request,
            expectation_suite_name=expectation_suite_name,
        )

        # 5. Validate the data
        results = validator.validate()

        # 6. Print and check the validation results
        print(f"Data quality check at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}:")
        print(results)

        if not results["success"] or not anomaly_data.empty: #send if GE validation fails or anomalies detected
            print("Data validation or anomaly detection failed!")
            send_data_quality_alert(results, anomaly_data)
        else:
            print("Data validation successful. No anomalies detected.")

        time.sleep(5)

if __name__ == "__main__":
    # Start the monitoring in a separate thread
    monitoring_thread = threading.Thread(target=monitor_data_quality)
    monitoring_thread.daemon = True
    monitoring_thread.start()

    # Keep the main thread alive (optional, for demonstration)
    while True:
        time.sleep(60)
        print("Main thread is still running...")


AttributeError: module 'great_expectations' has no attribute 'expectation'