In [None]:
# Mount Google Drive into the notebook's filesystem under /content/drive so
# later cells can read and write files stored there.
from google.colab import drive

drive.mount(mountpoint="/content/drive")

In [None]:
import os
import re
import logging
import smtplib
from email.mime.text import MIMEText
from datetime import datetime

def setup_logging(log_file, level=logging.INFO):
    """Configure the root logger to write formatted records to *log_file*.

    Args:
        log_file: Path of the log file. ``basicConfig`` opens it in append
            mode, so earlier runs are kept.
        level: Minimum level recorded on the root logger (default INFO).

    ``force=True`` removes and closes any handlers already attached to the
    root logger before installing the file handler. Without it,
    ``basicConfig`` silently does nothing whenever the root logger already
    has a handler. That is commonly the case inside notebook kernels and
    test runners, and always the case on a second call, and it left
    *log_file* empty.
    """
    logging.basicConfig(
        filename=log_file,
        level=level,
        format="%(asctime)s - %(levelname)s - %(message)s",
        force=True,
    )

def send_email(subject, body, to_email, from_email, smtp_server, smtp_port,
               smtp_user, smtp_password, *, send=False, timeout=30):
    """Send, or by default only simulate, a plain-text email notification.

    The message is always built. With ``send=False`` (the default, matching
    the original sample) nothing is transmitted and a "simulated" line is
    logged. With ``send=True`` the message is delivered over SMTP: connect,
    upgrade with STARTTLS, log in, then send.

    Args:
        subject: Subject header.
        body: Plain-text message body.
        to_email: Recipient address.
        from_email: Sender address.
        smtp_server: SMTP host name.
        smtp_port: SMTP port.
        smtp_user: Login user passed to ``SMTP.login``.
        smtp_password: Login password passed to ``SMTP.login``.
        send: Keyword-only. Actually deliver the message.
        timeout: Keyword-only. Socket timeout in seconds for the SMTP
            connection, so an unreachable server cannot hang the caller.

    Notification is best-effort: any failure is logged at ERROR level and
    not re-raised, so a broken mail setup never masks the caller's result.
    """
    try:
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = from_email
        msg["To"] = to_email

        if not send:
            logging.info("Email simulated successfully (SMTP disabled in sample).")
            return

        with smtplib.SMTP(smtp_server, smtp_port, timeout=timeout) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.sendmail(from_email, to_email, msg.as_string())
        logging.info("Email sent successfully to %s.", to_email)
    except Exception as e:  # best-effort boundary: log, never crash the caller
        logging.error(f"Failed to send email: {e}")

# One SQL identifier, optionally quoted with backticks, double quotes or
# square brackets, and optionally qualified with dots, e.g. orders,
# sales.orders, `db`.`orders`, "public"."users", [dbo].[orders].
_SQL_IDENTIFIER = r'[`"\[]?\w+[`"\]]?(?:\.[`"\[]?\w+[`"\]]?)*'

# Quoting characters removed from a matched identifier before it is used
# as a folder name.
_IDENTIFIER_QUOTES = str.maketrans("", "", '`"[]')

_OPERATION_PATTERNS = {
    "rename_column": re.compile(
        rf"\bALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?({_SQL_IDENTIFIER}).*?\bRENAME\s+COLUMN\b",
        re.IGNORECASE | re.DOTALL,
    ),
    "update": re.compile(rf"\bUPDATE\s+({_SQL_IDENTIFIER})", re.IGNORECASE),
    # Spark SQL also accepts INSERT INTO TABLE t and INSERT OVERWRITE [TABLE] t;
    # without the optional TABLE the keyword itself was captured as the name.
    "insert": re.compile(
        rf"\bINSERT\s+(?:INTO|OVERWRITE)\s+(?:TABLE\s+)?({_SQL_IDENTIFIER})",
        re.IGNORECASE,
    ),
    "delete": re.compile(rf"\bDELETE\s+FROM\s+({_SQL_IDENTIFIER})", re.IGNORECASE),
}

_SELECT_WITH_JOIN = re.compile(r"\bSELECT\b.*?\bJOIN\b", re.IGNORECASE | re.DOTALL)


def extract_table_and_operation(query):
    """Classify one SQL statement and guess the table it targets (regex-based).

    Recognised operations are ``rename_column`` (ALTER TABLE ... RENAME
    COLUMN), ``update``, ``insert`` and ``delete``. When several of these
    keywords appear in one statement (e.g. ``INSERT ... ON DUPLICATE KEY
    UPDATE col = ...``), the keyword that occurs first wins. This stops a
    trailing clause from overriding the statement's real verb. Qualified names
    keep every part joined with ``_`` (``sales.orders`` -> ``sales_orders``),
    and quote characters are removed.

    Args:
        query: A single SQL statement, without its terminating ``;``.

    Returns:
        ``(operation, table_name)``. A SELECT containing a JOIN with no
        recognised operation yields ``("join_queries", "multi_table")``.
        Anything else yields ``("other_queries", "unspecified")``.
    """
    candidates = []
    for operation, pattern in _OPERATION_PATTERNS.items():
        found = pattern.search(query)
        if found:
            candidates.append((found.start(), operation, found.group(1)))

    if candidates:
        _, operation, raw_name = min(candidates)  # earliest keyword in the text
        table_name = raw_name.translate(_IDENTIFIER_QUOTES).replace(".", "_")
        return operation, table_name

    if _SELECT_WITH_JOIN.search(query):
        return "join_queries", "multi_table"

    return "other_queries", "unspecified"

def _split_sql_statements(sql_text):
    """Split a SQL script into statements on semicolons outside quotes/comments.

    Semicolons inside single-quoted, double-quoted or backtick-quoted text,
    ``--`` line comments and ``/* */`` block comments do not end a statement.
    Inside '...' and "..." a backslash escapes the next character (Spark SQL /
    MySQL style). A doubled quote ('') also works, because it closes and
    immediately reopens the literal. Returns stripped, non-empty statements
    without their terminating semicolon.
    """
    statements = []
    current = []
    quote = None  # the quote character we are currently inside, if any
    i, length = 0, len(sql_text)
    while i < length:
        ch = sql_text[i]
        if quote:
            current.append(ch)
            if ch == "\\" and quote != "`" and i + 1 < length:
                current.append(sql_text[i + 1])  # keep the escaped char verbatim
                i += 2
                continue
            if ch == quote:
                quote = None
        elif ch in "'\"`":
            quote = ch
            current.append(ch)
        elif sql_text.startswith("--", i):
            end = sql_text.find("\n", i)
            end = length if end == -1 else end
            current.append(sql_text[i:end])
            i = end
            continue
        elif sql_text.startswith("/*", i):
            end = sql_text.find("*/", i + 2)
            end = length if end == -1 else end + 2
            current.append(sql_text[i:end])
            i = end
            continue
        elif ch == ";":
            statements.append("".join(current))
            current = []
        else:
            current.append(ch)
        i += 1
    statements.append("".join(current))
    return [stmt.strip() for stmt in statements if stmt.strip()]


def split_sql_file_by_table(input_file, base_output_dir, audit_file):
    """Split a SQL script into per-table, per-operation files.

    Each statement in *input_file* is classified with
    ``extract_table_and_operation`` and written to
    ``<base_output_dir>/<table_name>/<operation>.sql``, followed by ``;`` and
    a blank line. Every output file touched by this run is truncated on its
    first write, so re-running does not duplicate queries. Files from earlier
    runs that this run does not touch are left as they are.

    One audit line per statement is written to *audit_file*, which is
    overwritten.

    Args:
        input_file: Path of the SQL script to read (UTF-8).
        base_output_dir: Root folder for the generated files; created if
            missing.
        audit_file: Path of the audit log to write.

    Raises:
        Any exception from reading, classifying or writing, after it has been
        logged with its traceback.
    """
    try:
        os.makedirs(base_output_dir, exist_ok=True)
        with open(input_file, 'r', encoding='utf-8') as file:
            sql_queries = _split_sql_statements(file.read())

        audit_entries = []
        written_paths = set()  # output files already truncated in this run
        for query in sql_queries:
            operation, table_name = extract_table_and_operation(query)

            if not (table_name and operation):
                log_message = f"Could not parse operation/table for query: {query[:50]}..."
                logging.warning(log_message)
                audit_entries.append(f"{datetime.now()} - WARNING - {log_message}")
                continue

            table_dir = os.path.join(base_output_dir, table_name)
            os.makedirs(table_dir, exist_ok=True)
            output_file_path = os.path.join(table_dir, f"{operation}.sql")

            # 'w' on first use this run discards output from previous runs;
            # later statements for the same file are appended.
            mode = 'a' if output_file_path in written_paths else 'w'
            with open(output_file_path, mode, encoding='utf-8') as op_file:
                op_file.write(query + ";\n\n")
            written_paths.add(output_file_path)

            log_message = f"Appended query for table '{table_name}' ({operation}) to {output_file_path}"
            logging.info(log_message)
            audit_entries.append(f"{datetime.now()} - SUCCESS - {log_message}")

        with open(audit_file, 'w', encoding='utf-8') as audit:
            audit.write("\n".join(audit_entries))
        logging.info("Audit log written successfully.")

    except Exception as e:
        logging.exception(f"Error occurred: {e}")
        raise

# Example usage
if __name__ == "__main__":
    # Configuration (paths are relative to the current working directory)
    input_sql_file = "queries.sql"
    output_directory = "split_queries_by_table"
    log_file = "process.log"
    audit_file = "audit.log"

    # Placeholder email config. Replace these values before enabling real
    # delivery in send_email, and keep real credentials out of the notebook
    # (e.g. read them from environment variables).
    email_config = {
        "to_email": "recipient@example.com",
        "from_email": "sender@example.com",
        "smtp_server": "smtp.example.com",
        "smtp_port": 587,
        "smtp_user": "smtp_user",
        "smtp_password": "smtp_password"
    }

    setup_logging(log_file)

    try:
        # Split SQL file
        split_sql_file_by_table(input_sql_file, output_directory, audit_file)
    except Exception as e:
        # Send the failure notification, then re-raise. Otherwise the cell
        # finishes "successfully", and a later cell could go on to execute
        # stale or partial files in the output directory.
        subject = "SQL File Split - Failure"
        body = f"An error occurred during the SQL file split process: {e}"
        send_email(subject, body, **email_config)
        raise
    else:
        # Only reached when the split completed without raising.
        subject = "SQL File Split - Success"
        body = f"The SQL file has been successfully split. Check the audit log at {audit_file}."
        send_email(subject, body, **email_config)


In [None]:
import os
import re
# Assumes 'spark' is an existing active SparkSession in this notebook's namespace.

# --- CONFIGURE THE PATH ---
# Relative to the notebook's working directory, where the previous cell
# created the folder. The 'file://' form is only displayed; os.walk() and
# open() need a plain local path, so the scheme is stripped below.
base_sql_path = "file://./split_queries_by_table/"

print(f"Searching for SQL files in: {base_sql_path}")

local_walk_path = base_sql_path.replace("file://", "")
sql_files_to_process = []
for dirpath, _, filenames in os.walk(local_walk_path):
    for name in filenames:
        if name.endswith(".sql"):
            sql_files_to_process.append(os.path.join(dirpath, name))
# os.walk order is filesystem-dependent; sort for a reproducible run order.
# NOTE: the original statement order across tables/operations is not
# preserved by the split, so dependent statements may need manual ordering.
sql_files_to_process.sort()

print(f"Found {len(sql_files_to_process)} SQL files to execute.")

if sql_files_to_process and "spark" not in globals():
    raise NameError("No SparkSession named 'spark' is defined; create one before running this cell.")

# Each generated file can hold several statements, each written as
# "<query>;\n\n". spark.sql() runs a single statement, so split on a ';'
# that ends a line.
_STATEMENT_END = re.compile(r";[ \t]*(?:\r?\n|$)")

for local_file_path in sql_files_to_process:
    with open(local_file_path, 'r', encoding='utf-8') as sql_file:
        file_content = sql_file.read()
    statements = [s.strip() for s in _STATEMENT_END.split(file_content) if s.strip()]

    print(f"\nExecuting SQL from: {local_file_path} ({len(statements)} statement(s))")

    # Best-effort: a failing statement is reported and the rest still run.
    for sql_query_content in statements:
        try:
            result_df = spark.sql(sql_query_content)

            # Classify by the leading keyword: INSERT ... SELECT must not be
            # treated as a query (and must not trigger a count() job).
            first_keyword = sql_query_content.split(None, 1)[0].upper()
            if first_keyword in ("SELECT", "WITH"):
                print(f"-> Successfully loaded DataFrame. Result count: {result_df.count()}")
            else:
                print("-> Successfully executed DML operation (INSERT/UPDATE/DELETE/ALTER).")

        except Exception as e:
            print(f"-> FAILED to execute SQL from {local_file_path}: {e}")
