In [None]:
import warnings
# Suppress every Python warning for the rest of this kernel session so the
# example output stays readable.
warnings.filterwarnings('ignore')

In [None]:
import sys
import os

# Resolve the SignalWatch root directory (two levels above the current working
# directory). The working directory itself is not changed.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))

# Make the project packages (config, backend, services) importable; skip the
# append when the entry already exists so re-running the cell is harmless.
already_importable = project_root in sys.path
if not already_importable:
    sys.path.append(project_root)

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [None]:
from config.spark_setup import get_spark_session
from config.backend_config import table_paths, example_paths
from config.notification_config import notification_paths, example_notification_paths

from backend.tables.departments import merge_department_data, get_department_by_id
from backend.tables.features import merge_feature_data, get_feature_by_id, get_production_feature
from backend.tables.control_runs import merge_control_run_data
from backend.tables.notification_details import merge_notification_details, get_production_notification_by_control_id

from services.controls.control_chart import ControlChart
from services.notification.email.config import EmailConfig
from services.notification.email.service import EmailService

In [None]:
# Obtain the notebook's SparkSession from the project helper instead of
# building it by hand with SparkSession.builder.
app_name = 'ControlAnalysisJobExample'
spark = get_spark_session(app_name)

In [None]:
###############################################################################
## EXAMPLE SET UP
# An in-place conversion can be accomplished instead with:
# %sql CONVERT TO DELTA parquet.'/scripts/data/parquet/temperature_data_high.parquet'
###############################################################################
# Mock data: read the sample Parquet file and keep the 180 rows with the
# highest Index (raise or lower the limit for more or less data).
mock_df = (
    spark.read.parquet('./scripts/data/parquet/temperature_data_high.parquet')
    .orderBy('Index', ascending=False)
    .limit(180)
)

# Persist the sample as a Delta table, replacing any previous copy.
example_delta_path = example_paths["high_precission"]
delta_writer = mock_df.write.format("delta").mode("overwrite")
delta_writer.save(example_delta_path)

In [None]:
###############################################################################
## BUSINESS ENTITY
#
# Columns of the department record:
#     department_id STRING,
#     department_name STRING,
#     lead_name STRING,
#     lead_email STRING,
#     point_of_contact_name STRING,
#     point_of_contact_email STRING,
#     created_timestamp TIMESTAMP,
#     modified_timestamp TIMESTAMP
###############################################################################
# Register (or update) the example department.
department_id = "OpsSigDevTeam"
department_details_insert = [
    department_id,
    "SigOps Development Team",
    "Eric Barber",
    "eric.barber@opssig.com",
    "Eric Barber",
    "eric.barber@opssig.com"
]
merge_department_data(department_details_insert)

# Read the record back and fail fast when it is missing.
department_details = get_department_by_id(department_id).collect()
if not department_details:
    raise ValueError(f"Department ID {department_id} not found.")

# Only the first matching row is used.
_department_row = department_details[0]
department_id = _department_row['department_id']
department_name = _department_row['department_name']
department_lead = _department_row['lead_name']
department_lead_email = _department_row['lead_email']
department_point_of_contact_name = _department_row['point_of_contact_name']
department_point_of_contact_email = _department_row['point_of_contact_email']

In [None]:
###############################################################################
## REGISTER FEATURE QUERY
#
# Columns of the feature record:
#     department_id STRING,
#     feature_id STRING,
#     feature_name STRING,
#     feature_version STRING,
#     feature_query_id STRING,
#     triage_team ARRAY<STRING>,
#     created_timestamp TIMESTAMP,
#     modified_timestamp TIMESTAMP
#
# NOTE(review): the list passed to merge_feature_data below also carries the
# feature query text and a boolean flag, and 'feature_logic' is read back from
# the stored row, so the column list above looks incomplete -- confirm against
# the table definition.
###############################################################################
# Create a Feature:
feature_id = 'HighPrecissionTempSystem'

# Query that produces the feature: the 25 highest-Index temperature readings
# from the example Delta table written earlier.
feature_logic = f"""
    SELECT 
        Index,
        Temp_F as Feature
    FROM delta.`{example_delta_path}` 
    ORDER BY Index DESC 
    LIMIT 25
"""

example_feature_details = [
    department_id,
    feature_id,
    "DailyHighPrecissionTempSystem",
    "1",
    "feature_query_id-12345",
    feature_logic,
    ['eric.barber@opssig.com'],
    True,
    F.current_timestamp(),
    F.current_timestamp()
]
merge_feature_data(example_feature_details)

# Read the feature back and fail fast when it is missing.
feature_details = get_feature_by_id(feature_id).collect()
if not feature_details:
    # Report the missing entity as a feature, not a department.
    raise ValueError(f"Feature ID {feature_id} not found.")

# Only the first matching row is used.
_feature_row = feature_details[0]
feature_id = _feature_row['feature_id']
feature_name = _feature_row['feature_name']
feature_version = _feature_row['feature_version']
# Collapse the stored query onto a single line before it is executed later.
feature_logic = _feature_row['feature_logic'].strip().replace('\n','')
# Read the triage team from its own column (previously this copied
# 'feature_version' by mistake).
triage_team = _feature_row['triage_team']

feature_details

In [None]:
###############################################################################
## CONTROL DEFINITION
#
# Columns of the control record:
#     department_id STRING,
#     feature_id STRING,
#     control_id STRING,
#     control_group STRING,
#     control_version STRING,
#     control_notebook_id STRING,
#     control_notebook_url STRING,
#     created_timestamp TIMESTAMP,
#     modified_timestamp TIMESTAMP
###############################################################################
control_id = 'HighPrecissionTempSystem-xmr'
# NOTE(review): the next three names are misspelled ("contorl", "notebood").
# They are kept unchanged because other cells may reference them.
contorl_group = 'HighPrecissionTemp'
control_version = '1'
control_notebood_id = 'Null'
control_notebood_url = 'Null'

# Run the registered feature query and apply the XmR control chart to it.
feature_data_result = spark.sql(feature_logic)
chart = ControlChart(feature_data_result, 'FEATURE', 'Index')
control_result_df, removed_point_df = chart.run_control_chart(chart_type='xmr')

###############################################################################
## CONTROL RUN DEFINITION
#
# Columns of the control run record:
#     department_id STRING,
#     feature_id STRING,
#     feature_version STRING,
#     feature_data ARRAY,
#     control_id STRING,
#     control_version STRING,
#     control_signal_count STRING,
#     control_signal_data ARRAY,
#     signal_detected BOOLEAN,
###############################################################################
# Log the control run. department_id, feature_id, feature_version, control_id
# and control_version are reused as set above and by earlier cells.
feature_data = feature_data_result.collect()

# Materialise the signal rows once and derive the count and the flag from that
# single result. This replaces two extra count() actions and guarantees that
# the three logged values describe the same evaluation.
control_signal_data = removed_point_df.collect()
control_signal_count = len(control_signal_data)
signal_detected = control_signal_count > 0

run_data = [
    department_id,
    feature_id,
    feature_version,
    feature_data, 
    control_id,
    control_version,
    control_signal_count, 
    control_signal_data,
    signal_detected
]

merge_control_run_data(run_data)

# run_data_labels = [
#     'department_id',
#     'feature_id',
#     'feature_version',
#     'feature_data',
#     'control_id',
#     'control_version',
#     'control_signal_count',
#     'control_signal_data',
#     'signal_detected'
# ]

# for ky in range(len(run_data)):
#     print(f'{run_data_labels[ky]}: {run_data[ky]}')

In [None]:
# Show what is currently stored in the control_runs Delta table.
control_runs_query = f"select * from delta.`{table_paths['control_runs']}`"
control_runs = spark.sql(control_runs_query)
control_runs.show()
# To clear the example rows again:
# spark.sql(f"DELETE FROM delta.`{table_paths['control_runs']}` WHERE department_id = 'OpsSigDevTeam'")

In [None]:
###############################################################################
## NOTIFICATION DEFINITION
#
# Columns of the notification record:
#     notification_id STRING,
#     notification_version STRING,
#     notification_type STRING,
#     notification_priority STRING,
#     in_production BOOLEAN,
#
#     notification_template_path STRING,
#     notification_template_dir STRING,
#     notification_template_file STRING,
#
#     control_id STRING,
#     control_version STRING,
#
#     created_timestamp TIMESTAMP,
#     modified_timestamp TIMESTAMP
###############################################################################
notification_id = "HighPrecissionTempSystem"
notification_version = "V0.0.1"
notification_type = 'email'
notification_priority = '5'
in_production = True

# The template is stored per department and per control.
control_id = 'HighPrecissionTempSystem-xmr'
# TODO: Update to use qualified PATHs objects
email_storage = example_notification_paths["template_store"]

# Directory (with trailing slash) that holds this control's templates.
notification_template_dir = "/".join(
    str(part) for part in (email_storage, department_id, control_id)
) + "/"
os.makedirs(notification_template_dir, exist_ok=True)

# File name is "<type><version>.html"; the full path is directory + file name.
notification_template_file = notification_type + notification_version + '.html'
notification_template_path = notification_template_dir + notification_template_file
# Jinja-style HTML template for the control signal report e-mail.
# Placeholders used by the template: department_id, feature_id,
# feature_version, control_id, control_version, signal_detected, plus two row
# collections:
#   - control_signal_data: rows with Index, FEATURE, moving_range, UCL, LCL, CL
#     and out_of_control (rows with a truthy out_of_control are highlighted)
#   - feature_data: rows with Index and FEATURE
# (The template could equally be loaded from a file that already exists on disk.)
template_html = '''
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{{ department_id }} Control Signal Report</title>
    <style>
        body { font-family: Arial, sans-serif; }
        h1, h2 { text-align: center; }
        .container { width: 80%; margin: auto; padding: 20px; }
        table { width: 100%; border-collapse: collapse; margin-bottom: 20px; }
        table, th, td { border: 1px solid black; }
        th, td { padding: 10px; text-align: left; }
        th { background-color: #f2f2f2; }
        .highlight { background-color: #f9e79f; }
    </style>
</head>
<body>
    <div class="container">
        <h1>Control Signal Report</h1>
        <h2>Department: {{ department_id }}</h2>
        <h2>Feature: {{ feature_id }} (v{{ feature_version }})</h2>


        <h3>Control Signal Data</h3>
        <table>
            <thead>
                <tr>
                    <th>Index</th>
                    <th>Feature</th>
                    <th>Moving Range</th>
                    <th>Upper Control Limit (UCL)</th>
                    <th>Lower Control Limit (LCL)</th>
                    <th>Center Line (CL)</th>
                    <th>Out of Control?</th>
                </tr>
            </thead>
            <tbody>
                {% for row in control_signal_data %}
                <tr {% if row.out_of_control %}class="highlight"{% endif %}>
                    <td>{{ row.Index }}</td>
                    <td>{{ row.FEATURE }}</td>
                    <td>{{ row.moving_range }}</td>
                    <td>{{ row.UCL }}</td>
                    <td>{{ row.LCL }}</td>
                    <td>{{ row.CL }}</td>
                    <td>{{ "Yes" if row.out_of_control else "No" }}</td>
                </tr>
                {% endfor %}
            </tbody>
        </table>

        <h3>Control Signal Summary</h3>
        <p><strong>Control ID:</strong> {{ control_id }}</p>
        <p><strong>Version:</strong> {{ control_version }}</p>
        <p><strong>Signal Detected:</strong> {{ "True" if signal_detected else "False" }}</p>

        <h3>Feature Data</h3>
        <table>
            <thead>
                <tr>
                    <th>Index</th>
                    <th>Feature Value</th>
                </tr>
            </thead>
            <tbody>
                {% for row in feature_data %}
                <tr>
                    <td>{{ row.Index }}</td>
                    <td>{{ row.FEATURE }}</td>
                </tr>
                {% endfor %}
            </tbody>
        </table>

    </div>
</body>
</html>
'''
# Write the template to disk. The encoding is set explicitly to UTF-8: the
# template declares charset UTF-8 and Jinja's file loader reads templates as
# UTF-8 by default, so relying on the platform default encoding could corrupt
# non-ASCII characters added to the template later.
with open(notification_template_path, 'w', encoding='utf-8') as template_file:
    template_file.write(template_html)

print(f"Template written to '{notification_template_path}'")

# Register (or update) the notification that points at the template.
notification_details = [
    notification_id,
    notification_version,
    notification_type,
    notification_priority,
    control_id,
    control_version,
    notification_template_path,
    notification_template_dir,
    notification_template_file,
    in_production,
]
merge_notification_details(notification_details)

In [None]:
# Inspect the notification_details Delta table: its schema and its first row,
# one per line.
notification_details = spark.sql(f"select * from delta.`{table_paths['notification_details']}`")
for _item in (notification_details.schema, notification_details.first()):
    print(_item)

In [None]:
# dev: render (and optionally send) the e-mail report for the latest run of
# one control.
control_id = 'HighPrecissionTempSystem-xmr'

# Template location registered for this control's production notification.
# TODO: also read the e-mail subject from the notification details.
notification_details_sdf = get_production_notification_by_control_id(control_id).collect()
if notification_details_sdf:
    notification_template_dir = notification_details_sdf[0]['notification_template_dir']
    notification_template_file = notification_details_sdf[0]['notification_template_file']
else:
    # Nothing registered: clear the values consumed below. Previously only the
    # unused notification_template_path was cleared, so a stale
    # notification_template_dir from an earlier cell could leak through.
    notification_template_dir = None
    notification_template_path = None
    notification_template_file = None

# Recipients: the triage team registered on the production feature.
feature_id = "HighPrecissionTempSystem"
feature_details = get_production_feature(feature_id).collect()
triage_team = feature_details[0]['triage_team'] if feature_details else None

# Most recent run of this control from the control_runs Delta table. The
# control_id filter keeps runs of other controls out of this report.
control_runs_df = spark.read.format("delta").load(f"{table_paths['control_runs']}")
control_run_row = (
    control_runs_df
    .where(control_runs_df.control_id == control_id)
    .orderBy(control_runs_df.control_run_timestamp.desc())
    .first()
)
if control_run_row is None:
    # first() returns None on an empty result; fail with a clear message
    # instead of an AttributeError further down.
    raise ValueError(f"No control runs found for control ID {control_id}.")

# Template context built from the stored run; every stored row is included.
context = {
    "department_id": control_run_row.department_id,
    "feature_id": control_run_row.feature_id,
    "feature_version": int(control_run_row.feature_version),  # Convert to integer
    "feature_data": [
        {"Index": int(feature.Index), "FEATURE": feature.Feature}
        for feature in control_run_row.feature_data
    ],
    "control_id": control_run_row.control_id,
    "control_version": int(control_run_row.control_version),  # Convert to integer
    "control_signal_data": [
        {
            "Index": int(control.Index),
            "FEATURE": control.Feature,
            "moving_range": control.moving_range,
            "UCL": control.UCL,
            "LCL": control.LCL,
            "CL": control.CL,
            "out_of_control": control.out_of_control
        }
        for control in control_run_row.control_signal_data
    ],
    "signal_detected": control_run_row.signal_detected
}

print(context)

# Configuration for email service (non-SSL)
recipient = triage_team

notification_details_subject = f"Notification: {control_id} on {feature_id}"
# The test subject is what is actually used while this cell is in dev mode;
# switch to notification_details_subject for real notifications.
subject = "Greeting from test"
template_dir = notification_template_dir
template_name = notification_template_file

demo = True
if demo:
    if template_dir is None or template_name is None:
        raise ValueError(f"No production notification found for control ID {control_id}.")

    # Placeholder SMTP settings for the example. Real credentials must not be
    # hard-coded in the notebook; load them from the environment or a secret
    # store instead.
    config = EmailConfig(
        smtp_server="smtp.example.com",
        smtp_port=587,  # Correct port for non-SSL with STARTTLS
        username="sender@example.com",
        password="password",
        use_ssl=False  # Ensure this is False for STARTTLS
    )

    email_service = EmailService(config, template_dir=template_dir)

    # Render the template with data
    output = email_service.renderer.render(template_name, context)

    # Write the rendered output to an HTML file inside the 'reports' directory.
    report_dir = './reports'
    os.makedirs(report_dir, exist_ok=True)

    report_path = os.path.join(report_dir, 'control_signal_report.html')
    # The template declares charset UTF-8, so write the report as UTF-8.
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(output)

    print(f"HTML report generated and saved as '{report_path}'")

    # Sending stays disabled by default so running the notebook has no
    # external side effects.
    send_email = False
    if send_email:
        # Send the email
        email_service.send(
            recipient=recipient,
            subject=subject,
            template_name=template_name,
            context=context
        )

In [None]:
from jinja2 import Environment, FileSystemLoader

In [None]:
# # Configuration for STARTTLS (non-SSL)
# config = EmailConfig(
#     smtp_server="smtp.example.com",
#     smtp_port=587,  # Correct port for non-SSL with STARTTLS
#     username="sender@example.com",
#     password="password",
#     use_ssl=False  # Ensure this is False for STARTTLS
# )

# email_service = EmailService(config, template_dir="services/notification/email/templates")

# recipient = "recipient@example.com"
# subject = "Test Email"
# context = {"title": "Test Title", "message": "Test message content."}

# # Send the email
# email_service.send(
#     recipient=recipient,
#     subject=subject,
#     template_name="notification_template.html",
#     context=context
# )

In [None]:
# Show what is currently stored in the notification_runs Delta table.
noti_runs_query = f"select * from delta.`{table_paths['notification_runs']}`"
noti_runs = spark.sql(noti_runs_query)
noti_runs.show()

In [None]:
### EMAIL SERVICE EXAMPLE: COMPLETE
# Sample of the context a control run with a signal produces. It is defined
# here for reference only; the send call below uses the small test context.
_sample_feature_points = [
    (180, 72.145),
    (179, 72.023),
    (178, 72.121),
    (177, 71.977),
    (176, 71.000),
]
_sample_signal_row = {
    "Index": 157,
    "FEATURE": 71.166,
    "moving_range": 0.847,
    "UCL": 72.550,
    "LCL": 71.350,
    "CL": 71.950,
    "out_of_control": True
}
data = {
    "department_id": "OpsSigDevTeam",
    "feature_id": "HighPrecissionTempSystem",
    "feature_version": 1,
    "feature_data": [
        {"Index": point_index, "FEATURE": point_value}
        for point_index, point_value in _sample_feature_points
    ],
    "control_id": "HighPrecissionTempSystem-xmr",
    "control_version": 1,
    "control_signal_data": [_sample_signal_row],
    "signal_detected": True
}

# SMTP settings for STARTTLS (non-SSL): port 587 with use_ssl switched off.
config = EmailConfig(
    smtp_server="smtp.example.com",
    smtp_port=587,
    username="sender@example.com",
    password="password",
    use_ssl=False
)

# The service looks templates up inside the notification's template directory.
email_service = EmailService(config, template_dir=notification_template_dir)

recipient = "recipient@example.com"
subject = "Test Email"
template_name = notification_template_file
context = {"title": "Test Title", "message": "Test message content."}

# Send the test e-mail using the registered template file.
email_service.send(
    recipient=recipient,
    subject=subject,
    template_name=template_name,
    context=context
)

In [None]:
### RENDER EMAIL EXAMPLE: COMPLETE
# Step 3: Load the template from the file and render the data.
# The loader is rooted at the notification's own template directory and the
# template is requested by file name. notification_template_dir already starts
# with the template-store path, so rooting the loader at the store and passing
# "<dir><file>" as the template name would look the file up under a doubled
# path and raise TemplateNotFound.
file_loader = FileSystemLoader(notification_template_dir)
env = Environment(loader=file_loader)

# Load the template file
template = env.get_template(notification_template_file)

# Sample data standing in for a control run that raised a signal.
data = {
    "department_id": "OpsSigDevTeam",
    "feature_id": "HighPrecissionTempSystem",
    "feature_version": 1,
    "feature_data": [
        {"Index": 180, "FEATURE": 72.145},
        {"Index": 179, "FEATURE": 72.023},
        {"Index": 178, "FEATURE": 72.121},
        {"Index": 177, "FEATURE": 71.977},
        {"Index": 176, "FEATURE": 71.000},
    ],
    "control_id": "HighPrecissionTempSystem-xmr",
    "control_version": 1,
    "control_signal_data": [
        {
            "Index": 157,
            "FEATURE": 71.166,
            "moving_range": 0.847,
            "UCL": 72.550,
            "LCL": 71.350,
            "CL": 71.950,
            "out_of_control": True
        }
    ],
    "signal_detected": True
}

# Render the template with data
output = template.render(data)

# Step 4: Write the rendered output to an HTML file inside the 'reports' directory
report_dir = './reports'
os.makedirs(report_dir, exist_ok=True)

report_path = os.path.join(report_dir, 'control_signal_report.html')
# The template declares charset UTF-8, so write the report as UTF-8.
with open(report_path, 'w', encoding='utf-8') as f:
    f.write(output)

print(f"HTML report generated and saved as '{report_path}'")

In [None]:
###############################################################################
## NOTIFICATION RUN DEFINITION
#
# Outline of the notification run for one control: look up the notification
# details, find the last notification run, collect the signals raised since
# then and build a notification from them.
#
# Columns listed for the notification run:
    # department_id STRING,
    # feature_id STRING,
    # feature_version STRING,
    # feature_data ARRAY,
    # control_id STRING,
    # control_version STRING,
#
# NOTE(review): get_notification_by_contorl_id,
# get_last_notification_run_timestamp, get_signal_since_run and
# create_notification are neither imported nor defined in the visible part of
# this notebook, so this cell looks like a sketch and is expected to raise
# NameError as written -- confirm where these helpers live before running it.
###############################################################################
# Control triage team, as registered on the feature (no-op re-assignment).
triage_team = triage_team # Feature register
control_id = 'HighPrecissionTempSystem-xmr'

# Get notification details
# NOTE(review): "contorl" in the helper name looks like a typo for "control".
notification_details = get_notification_by_contorl_id(control_id)

# Get all signal events since the last notification run
# (high-water mark).
last_notification_run_timestamp = get_last_notification_run_timestamp(control_id)

# Signal data raised after the high-water mark.
signal_data = get_signal_since_run(control_id, last_notification_run_timestamp)

# Create notifications for each control run with an unmitigated signal event.
notification_file_path = create_notification(signal_data)

# send notification

In [None]:
from datetime import datetime

# Function to check for new notifications and send them
def check_and_send_notifications():
    """Send and log notifications for signalling controls not yet notified.

    Steps:
      1. Read the control runs with ``signal_detected = True`` from the
         ``control_runs`` Delta table.
      2. Read the ``(control_id, notification_id)`` pairs already stored in the
         ``notification_runs`` Delta table.
      3. Left-anti join the two on ``control_id`` to keep only runs whose
         control has no notification run yet.
      4. For each remaining run, look up the matching rows in
         ``notification_details``, call ``send_notification`` and record the
         outcome with ``insert_notification_run``.

    Uses the notebook globals ``spark`` and ``table_paths``.

    NOTE(review): ``send_notification`` and ``insert_notification_run`` are not
    defined or imported in the visible part of this notebook -- confirm they
    exist before calling this function.
    NOTE(review): the anti join is keyed on ``control_id`` only, so a control
    with any logged notification run is skipped entirely, including signals
    raised after that run -- confirm this is intended.

    Returns:
        None. Prints a completion message when every row has been processed.
    """
    # Step 1: Control runs that detected a signal.
    control_runs_df = spark.sql(f"""
        SELECT control_id, control_signal_data
        FROM delta.`{table_paths['control_runs']}`
        WHERE signal_detected = True
    """)

    # Step 2: Notification runs that were already logged.
    notification_runs_df = spark.sql(f"""
        SELECT control_id, notification_id
        FROM delta.`{table_paths['notification_runs']}`
    """)

    # Step 3: Keep only control runs whose control has not been notified yet.
    new_notifications_df = control_runs_df.join(
        notification_runs_df,
        control_runs_df.control_id == notification_runs_df.control_id,
        how="leftanti"  # Left Anti Join to exclude already notified control runs
    )

    # Notification definitions. The DataFrame is lazy, so each filtered
    # collect() below still reads the table at that moment.
    notification_details_df = spark.read.format("delta").load(
        f"{table_paths['notification_details']}"
    )

    # Step 4: Send the notifications for new signals.
    for row in new_notifications_df.collect():
        control_id = row['control_id']
        control_signal_data = row['control_signal_data']

        # Filter with a column expression instead of interpolating control_id
        # into a SQL string, so quotes in the id cannot break or alter the query.
        matching_details = (
            notification_details_df
            .where(notification_details_df.control_id == control_id)
            .select(
                "notification_id",
                "notification_template_path",
                "notification_type",
                "notification_priority",
            )
            .collect()
        )

        # For each notification detail, send the notification.
        for notification in matching_details:
            notification_id = notification['notification_id']
            notification_template_path = notification['notification_template_path']
            notification_type = notification['notification_type']
            notification_priority = notification['notification_priority']

            # Deliver the notification (e.g. via email, SMS, or another service).
            message_status, response_code = send_notification(notification_template_path, notification_type, control_signal_data)

            # Log the notification run into the notification_runs table.
            insert_notification_run(
                notification_id,
                control_id,
                notification_type,
                notification_priority,
                message_status,
                response_code
            )

    print("Notification process completed successfully.")

In [None]:
# Run the notification check once against the current Delta tables.
check_and_send_notifications()