In [None]:
import snowflake.connector
import configparser
# Load credentials from config.ini so they are not written into the notebook.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Check that list so a missing file fails with a
# clear message instead of a confusing KeyError on the first section lookup.
if not config.read("config.ini"):
    raise FileNotFoundError(
        "config.ini was not found (or could not be read) in the current working directory"
    )
# Keyword arguments passed to snowflake.connector.connect() by the functions below.
SNOWFLAKE_CONFIG = {
    "user": config["snowflake"]["user"],
    "password": config["snowflake"]["password"],
    "account": config["snowflake"]["account"],
    "warehouse": config["snowflake"]["warehouse"],
    "database": config["snowflake"]["database"],
    "schema": config["snowflake"]["schema"],
}
def fetch_latest_email_alert():
    """Return the most recent row of ``cleaned_alerts`` as a dict, or ``None``.

    Opens a new Snowflake connection from ``SNOWFLAKE_CONFIG``, selects the
    single row with the greatest ``timestamp`` and maps every column name,
    exactly as reported by ``cursor.description``, to its value.

    The cursor and the connection are closed even when the query raises.

    NOTE(review): a later cell in this notebook defines a function with the
    same name that lower-cases the keys; that later definition is the one the
    downstream cells end up calling. Consider keeping only one of the two.

    Returns:
        dict | None: ``{column_name: value}`` for the newest alert, or
        ``None`` when the query returns no row.
    """
    query = """
       SELECT subject, INCIDENT_ID, impact_level, affected_service, timestamp, description
       FROM cleaned_alerts
       ORDER BY timestamp DESC
       LIMIT 1;
   """
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            latest_alert = cursor.fetchone()
            # description is read while the cursor is still open.
            column_names = [desc[0] for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Runs on success and on failure, so the connection is never leaked.
        conn.close()
    if latest_alert:
        return dict(zip(column_names, latest_alert))
    return None
# Query Snowflake once and show the newest alert (None when no row came back).
latest_alert = fetch_latest_email_alert()
print(f"Latest Email Alert: {latest_alert}")



In [21]:
import requests
import json
# Hugging Face inference endpoint for the Mistral-7B-Instruct-v0.2 model.
API_URL = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.2"
# The API key is read from the [huggingface] section of the already-loaded config.
API_KEY = config["huggingface"]["api_key"]
# Headers sent with every request: bearer authentication and a JSON body.
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}
def create_ai_solutions_table():
    """Create the ``AI_SOLUTIONS`` table if it does not exist yet.

    Opens a new Snowflake connection from ``SNOWFLAKE_CONFIG``, runs a
    ``CREATE TABLE IF NOT EXISTS`` statement, commits, and prints a
    confirmation message. The cursor and the connection are closed even when
    the statement raises; in that case the exception propagates and the
    confirmation is not printed.

    Returns:
        None
    """
    query = """
   CREATE TABLE IF NOT EXISTS AI_SOLUTIONS (
       INCIDENT_ID STRING PRIMARY KEY,
       SUBJECT STRING,
       IMPACT_LEVEL STRING,
       AFFECTED_SERVICE STRING,
       TIMESTAMP TIMESTAMP,
       DESCRIPTION STRING,
       AI_GENERATED_SOLUTION STRING
   );
   """
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Runs on success and on failure, so the connection is never leaked.
        conn.close()
    print("AI_SOLUTIONS table checked/created successfully.")


In [None]:
def fetch_latest_email_alert():
    """Fetch the most recent email alert from Snowflake.

    Opens a new connection from ``SNOWFLAKE_CONFIG``, selects the single row
    of ``cleaned_alerts`` with the greatest ``timestamp`` and maps every
    column name, lower-cased, to its value. The lower-cased keys are what
    ``generate_solution`` and ``store_ai_solution`` index by.

    The cursor and the connection are closed even when the query raises.

    Returns:
        dict | None: ``{lowercase_column_name: value}`` for the newest alert,
        or ``None`` when the query returns no row.
    """
    query = """
       SELECT subject, INCIDENT_ID, impact_level, affected_service, timestamp, description
       FROM cleaned_alerts
       ORDER BY timestamp DESC
       LIMIT 1;
   """
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            latest_alert = cursor.fetchone()
            # Normalise to lowercase so callers can use keys like 'incident_id'.
            column_names = [desc[0].lower() for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Runs on success and on failure, so the connection is never leaked.
        conn.close()
    return dict(zip(column_names, latest_alert)) if latest_alert else None
def generate_solution(alert_data, timeout=60):
    """Send the alert data to the Hugging Face API and return the generated solution.

    Args:
        alert_data: Mapping with the keys ``incident_id``, ``subject``,
            ``impact_level``, ``affected_service``, ``timestamp`` and
            ``description``. A falsy value returns immediately without
            making a request.
        timeout: Seconds to wait for the HTTP request before it is abandoned.
            Without a timeout ``requests.post`` can block indefinitely.

    Returns:
        str: The ``generated_text`` of the first element of the JSON response
        when the response is a non-empty list. Otherwise one of the
        placeholder messages: ``"No new alerts found."``,
        ``"Solution not found."``, ``"API request failed."``,
        ``"Invalid API response."`` or ``"Unexpected error."``.

    Raises:
        KeyError: If ``alert_data`` is truthy but lacks one of the keys above
            (the prompt is built before the guarded request).
    """
    if not alert_data:
        return "No new alerts found."
    formatted_alert = f"""
   Incident ID: {alert_data['incident_id']}
   Subject: {alert_data['subject']}
   Impact Level: {alert_data['impact_level']}
   Affected Service: {alert_data['affected_service']}
   Timestamp: {alert_data['timestamp']}
   Description: {alert_data['description']}
   """
    data = {"inputs": f"Analyze this email alert and provide a solution along with steps to resolve:\n\n{formatted_alert}"}
    try:
        response = requests.post(API_URL, headers=HEADERS, json=data, timeout=timeout)
        response.raise_for_status()  # Raise if the server answered with an HTTP error status
        result = response.json()
        # Only a non-empty list is treated as a usable answer.
        if isinstance(result, list) and result:
            generated_solution = result[0].get("generated_text", "Solution not found.")
        else:
            generated_solution = "Solution not found."
        print("\nPrompt:")
        print(data["inputs"])
        print("\nSolution:")
        print(generated_solution)
        return generated_solution
    except json.JSONDecodeError:
        # Must come before the RequestException handler: in recent versions of
        # requests the error raised by response.json() is a subclass of both,
        # so the broader handler would otherwise make this branch unreachable.
        print("Failed to decode JSON response from API.")
        return "Invalid API response."
    except requests.exceptions.RequestException as e:
        # Connection errors, timeouts and HTTP error statuses end up here.
        print(f"Request failed: {e}")
        return "API request failed."
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return "Unexpected error."
def store_ai_solution(alert_data, ai_solution):
    """Store the AI-generated solution in the ``AI_SOLUTIONS`` table.

    Opens a new Snowflake connection from ``SNOWFLAKE_CONFIG``, inserts one
    row built from the alert fields plus the solution text, commits, and
    prints a confirmation. The cursor and the connection are closed even when
    the insert raises; in that case the exception propagates and nothing is
    committed by this function.

    NOTE(review): this is a plain INSERT, so running it again for the same
    alert adds another row unless the table enforces uniqueness on
    INCIDENT_ID. Confirm whether duplicates are acceptable.

    Args:
        alert_data: Mapping with the keys ``incident_id``, ``subject``,
            ``impact_level``, ``affected_service``, ``timestamp`` and
            ``description``.
        ai_solution: Text stored in the ``AI_GENERATED_SOLUTION`` column.

    Raises:
        KeyError: If ``alert_data`` lacks one of the keys above.
    """
    query = """
   INSERT INTO AI_SOLUTIONS (INCIDENT_ID, SUBJECT, IMPACT_LEVEL, AFFECTED_SERVICE, TIMESTAMP, DESCRIPTION, AI_GENERATED_SOLUTION)
   VALUES (%s, %s, %s, %s, %s, %s, %s)
   """
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            # Values are bound as parameters, in the same order as the column list.
            values = (
                alert_data['incident_id'], alert_data['subject'], alert_data['impact_level'],
                alert_data['affected_service'], alert_data['timestamp'], alert_data['description'], ai_solution
            )
            cursor.execute(query, values)
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Runs on success and on failure, so the connection is never leaked.
        conn.close()
    print("Solution stored successfully.")
# Execution
# Placeholder strings that generate_solution() returns when it has no real
# answer. They are non-empty (truthy), so a bare `if solution:` would store
# them in AI_SOLUTIONS as if they were generated solutions.
FAILURE_MESSAGES = {
    "No new alerts found.",
    "Solution not found.",
    "API request failed.",
    "Invalid API response.",
    "Unexpected error.",
}
create_ai_solutions_table()  # Ensuring table exists before inserting data
latest_alert = fetch_latest_email_alert()
if latest_alert:
    solution = generate_solution(latest_alert)
    if solution and solution not in FAILURE_MESSAGES:
        store_ai_solution(latest_alert, solution)
    else:
        # Keep failures visible in the cell output instead of persisting them.
        print(f"Solution not stored: {solution}")

AI_SOLUTIONS table checked/created successfully.

Prompt:
Analyze this email alert and provide a solution along with steps to resolve:


   Incident ID: INC109
   Subject: GCP IAM Policy Change Detected
   Impact Level: Medium
   Affected Service: Google Cloud IAM
   Timestamp: 2025-04-03 16:20:00
   

Solution:
Analyze this email alert and provide a solution along with steps to resolve:


   Incident ID: INC109
   Subject: GCP IAM Policy Change Detected
   Impact Level: Medium
   Affected Service: Google Cloud IAM
   Timestamp: 2025-04-03 16:20:00
   
Solution:

1. Identify the unknown user:
   a. In the Google Cloud Console, navigate to the IAM & Admin > IAM page for the project 'project-xyz'.
   b. Look for the newly added user under the 'ALL MEMBERS' tab. Document the email address of the unknown user.
2. Check the user's role and access:
   a. Click on the unknown user's name in the IAM & Admin > IAM page.
   b. Review the 'Access Transitivity' tab for any possible escalation path