In [0]:
%python
import re
import requests
import base64

# Databricks API details.
# SECURITY: the personal access token used to be hard-coded on this line. A
# token that has been committed to source must be treated as leaked and should
# be rotated. It is now read from the environment (for example a cluster
# environment variable that references a secret) and never stored in the
# notebook itself.
import os

# The workspace URL is not a secret; keep the previous value as the default.
DATABRICKS_HOST = os.environ.get(
    "DATABRICKS_HOST", "https://adb-845055060386182.2.azuredatabricks.net"
)
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN")
if not DATABRICKS_TOKEN:
    # Fail fast with a clear message instead of sending unauthenticated requests.
    raise RuntimeError(
        "DATABRICKS_TOKEN is not set; provide the workspace token through the "
        "environment instead of hard-coding it in the notebook."
    )
headers = {"Authorization": f"Bearer {DATABRICKS_TOKEN}"}

# Function to fetch and decode notebook content
def fetch_notebook_content(notebook_path):
    """Export a workspace notebook and return its source as text.

    Sends a GET request to the workspace export endpoint with
    ``format=SOURCE`` and Base64-decodes the ``content`` field of the JSON
    response.

    Args:
        notebook_path: Workspace path of the notebook to export.

    Returns:
        The decoded UTF-8 source, or ``None`` when the request fails
        (transport error, timeout, or a non-200 status code).
    """
    try:
        response = requests.get(
            f"{DATABRICKS_HOST}/api/2.0/workspace/export",
            headers=headers,
            params={"path": notebook_path, "format": "SOURCE"},  # Fetch raw source
            # requests has no default timeout; without one a stalled
            # connection would block the cell indefinitely.
            timeout=30,
        )
    except requests.RequestException as exc:
        # Transport failures follow the same "return None" contract as HTTP errors.
        print(f"❌ Error fetching notebook content ({notebook_path}): {exc}")
        return None

    if response.status_code != 200:
        print(f"❌ Error fetching notebook content ({notebook_path}): {response.text}")
        return None  # Return None on failure

    encoded_content = response.json().get("content", "")
    return base64.b64decode(encoded_content).decode("utf-8")  # Decode Base64

# Function to validate SQL script for column comments
# SQL passed to spark.sql(""" ... """) in the notebook source.
_SPARK_SQL_BLOCK_RE = re.compile(r'spark\.sql\("""(.*?)"""\)', re.DOTALL)
# CREATE [OR REPLACE] TABLE [IF NOT EXISTS] <name> (   -- ends on the opening paren
_CREATE_TABLE_RE = re.compile(
    r"CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?[\w.]+\s*\(",
    re.IGNORECASE,
)
# Column name, either `back-quoted` or a bare identifier.
_COLUMN_NAME_RE = re.compile(r"`([^`]+)`|(\w+)")
# COMMENT 'text' or COMMENT "text"
_COLUMN_COMMENT_RE = re.compile(
    r"\bCOMMENT\s*(?:'([^']*)'|" r'"([^"]*)")',
    re.IGNORECASE,
)
# First words of entries in the column list that are constraints, not columns.
_TABLE_CONSTRAINT_KEYWORDS = frozenset({"CONSTRAINT", "PRIMARY", "FOREIGN"})


def _split_column_definitions(sql, open_idx):
    """Split the parenthesised list that opens at ``sql[open_idx]``.

    The list is split on commas that are directly inside the outer
    parentheses, i.e. not inside nested parentheses (``DECIMAL(10, 2)``),
    angle brackets (``MAP<STRING, INT>``) or quoted strings.

    Returns:
        A list of definition strings, or ``None`` when the opening
        parenthesis is never closed.
    """
    definitions = []
    depth = 0      # parenthesis nesting; 1 means directly inside the column list
    angle = 0      # nesting of <...> type arguments seen at depth 1
    quote = None   # the quote character while inside a string literal
    start = open_idx + 1
    i = open_idx
    while i < len(sql):
        ch = sql[i]
        if quote:
            if ch == "\\":
                i += 1  # skip the escaped character
            elif ch == quote:
                quote = None
        elif ch in "'\"":
            quote = ch
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                definitions.append(sql[start:i])
                return definitions
        elif depth == 1:
            if ch == "<":
                angle += 1
            elif ch == ">" and angle:
                angle -= 1
            elif ch == "," and not angle:
                definitions.append(sql[start:i])
                start = i + 1
        i += 1
    return None


def _column_comments(definitions):
    """Return ``(column_name, comment)`` pairs; comment is ``""`` when absent."""
    columns = []
    for definition in definitions:
        text = definition.strip()
        name_match = _COLUMN_NAME_RE.match(text)
        if not name_match:
            continue  # empty segment, nothing to validate
        name = name_match.group(1) or name_match.group(2)
        if name_match.group(2) and name.upper() in _TABLE_CONSTRAINT_KEYWORDS:
            continue  # table constraint, not a column
        # Search after the name so a column literally called "comment" works.
        comment_match = _COLUMN_COMMENT_RE.search(text, name_match.end())
        comment = ""
        if comment_match:
            comment = comment_match.group(1) or comment_match.group(2) or ""
        columns.append((name, comment))
    return columns


def validate_sql_script(sql_content):
    """Check that every column of every CREATE TABLE statement has a comment.

    Every ``spark.sql(\"\"\" ... \"\"\")`` block in ``sql_content`` is scanned
    for ``CREATE TABLE`` statements (``IF NOT EXISTS`` is optional). The
    column list is taken up to its matching closing parenthesis, so types
    with arguments such as ``DECIMAL(10, 2)`` neither truncate the list nor
    get reported as uncommented.

    Args:
        sql_content: Notebook source text.

    Returns:
        ``True`` when all columns carry a non-blank comment, or when there is
        no ``spark.sql`` block / ``CREATE TABLE`` statement to validate;
        ``False`` when at least one column has no comment.
    """
    sql_blocks = _SPARK_SQL_BLOCK_RE.findall(sql_content)
    if not sql_blocks:
        print("⚠️ No CREATE TABLE statement found in this file.")
        return True  # Ignore files that don't create tables

    tables_checked = 0
    is_valid = True
    for sql in sql_blocks:
        # Debug: print the content inside spark.sql(""" ... """)
        print("🔹 Extracted SQL content inside spark.sql: ")
        print(sql)

        for create_match in _CREATE_TABLE_RE.finditer(sql):
            # The pattern ends on the opening parenthesis of the column list.
            definitions = _split_column_definitions(sql, create_match.end() - 1)
            if definitions is None:
                print("⚠️ Unbalanced parentheses in CREATE TABLE statement; skipping it.")
                continue
            tables_checked += 1

            columns = _column_comments(definitions)
            # Debugging: Print columns and their comments for review
            print("🔹 Extracted Columns and Their Comments:")
            for name, comment in columns:
                print(f"Column: {name} | Comment: {comment}")

            missing_comments = [name for name, comment in columns if not comment.strip()]
            if missing_comments:
                print(f"❌ Missing comments for columns: {missing_comments}")
                is_valid = False  # Fail validation, but keep reporting other tables

    if not tables_checked:
        print("⚠️ No CREATE TABLE statement found in the SQL.")
    return is_valid

# Workspace path of the notebook to check
notebook_path = "/Workspace/Users/saipoojitha90@gmail.com/msspoc/metadataPR"

# Export the notebook source, then report the outcome of the comment check.
sql_content = fetch_notebook_content(notebook_path)

if not sql_content:
    # Nothing came back from the export (None or empty source).
    print("❌ Failed to fetch notebook content.")
elif validate_sql_script(sql_content):
    print("✅ The notebook has comments for all columns in CREATE TABLE statements.")
else:
    print("❌ The notebook has missing comments for some columns in CREATE TABLE statements.")

In [0]:
import json
import re
import os
import sys
import base64
import requests
from pyspark.sql.utils import AnalysisException

# Step 1: Read the JSON list of modified files passed in through a widget
dbutils.widgets.text("modified_files", "[]")  # Widget defaults to an empty JSON list
modified_files = json.loads(dbutils.widgets.get("modified_files"))

if not modified_files:
    # Nothing changed, so there is nothing to validate.
    print("✅ No modified files to validate.")
    sys.exit(0)

# Step 2: Ask Spark for the current user name
try:
    username = spark.sql("SELECT current_user()").collect()[0][0]
except AnalysisException:
    print("❌ Error: Unable to fetch username.")
    sys.exit(1)  # Exit with a failure code

# Step 3: Take the repo name from the fourth "/"-separated piece of this
# notebook's own path
repo_path = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
)
repo_name = repo_path.split("/")[3]

# Step 4: Build one workspace path per modified file, dropping the extension
full_paths = [
    f"/Workspace/Users/{username}/{repo_name}/{os.path.splitext(changed_file)[0]}"
    for changed_file in modified_files
]
print("🔹 Full Paths to Validate:", full_paths)

# Step 5: Databricks API details
# SECURITY: the personal access token used to be hard-coded on this line. A
# token that has been committed to source must be treated as leaked and should
# be rotated. It is now read from the environment (for example a cluster
# environment variable that references a secret) and never stored in the
# notebook itself.
# The workspace URL is not a secret; keep the previous value as the default.
DATABRICKS_HOST = os.environ.get(
    "DATABRICKS_HOST", "https://adb-845055060386182.2.azuredatabricks.net"
)
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN")
if not DATABRICKS_TOKEN:
    # Fail fast with a clear message instead of sending unauthenticated requests.
    raise RuntimeError(
        "DATABRICKS_TOKEN is not set; provide the workspace token through the "
        "environment instead of hard-coding it in the notebook."
    )

headers = {"Authorization": f"Bearer {DATABRICKS_TOKEN}"}

# Step 6: Function to fetch and decode notebook content
def fetch_notebook_content(notebook_path):
    """Export a workspace notebook and return its source as text.

    Sends a GET request to the workspace export endpoint with
    ``format=SOURCE`` and Base64-decodes the ``content`` field of the JSON
    response.

    Args:
        notebook_path: Workspace path of the notebook to export.

    Returns:
        The decoded UTF-8 source, or ``None`` when the request fails
        (transport error, timeout, or a non-200 status code). The validation
        loop treats ``None`` as a failed file.
    """
    try:
        response = requests.get(
            f"{DATABRICKS_HOST}/api/2.0/workspace/export",
            headers=headers,
            params={"path": notebook_path, "format": "SOURCE"},  # Fetch raw source
            # requests has no default timeout; without one a stalled
            # connection would hang the job instead of failing it.
            timeout=30,
        )
    except requests.RequestException as exc:
        # Transport failures follow the same "return None" contract as HTTP errors.
        print(f"❌ Error fetching notebook content ({notebook_path}): {exc}")
        return None

    if response.status_code != 200:
        print(f"❌ Error fetching notebook content ({notebook_path}): {response.text}")
        return None  # Return None on failure

    encoded_content = response.json().get("content", "")
    return base64.b64decode(encoded_content).decode("utf-8")  # Decode Base64

# Step 7: Function to validate SQL scripts
# SQL passed to spark.sql(""" ... """) in the notebook source. The pattern is
# single-quoted because it contains a literal triple double quote.
_SPARK_SQL_BLOCK_RE = re.compile(r'spark\.sql\("""(.*?)"""\)', re.DOTALL)
# CREATE [OR REPLACE] TABLE [IF NOT EXISTS] <name> (   -- ends on the opening paren
_CREATE_TABLE_RE = re.compile(
    r"CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?[\w.]+\s*\(",
    re.IGNORECASE,
)
# Column name, either `back-quoted` or a bare identifier.
_COLUMN_NAME_RE = re.compile(r"`([^`]+)`|(\w+)")
# COMMENT 'text' or COMMENT "text"
_COLUMN_COMMENT_RE = re.compile(
    r"\bCOMMENT\s*(?:'([^']*)'|" r'"([^"]*)")',
    re.IGNORECASE,
)
# First words of entries in the column list that are constraints, not columns.
_TABLE_CONSTRAINT_KEYWORDS = frozenset({"CONSTRAINT", "PRIMARY", "FOREIGN"})


def _split_column_definitions(sql, open_idx):
    """Split the parenthesised list that opens at ``sql[open_idx]``.

    The list is split on commas that are directly inside the outer
    parentheses, i.e. not inside nested parentheses (``DECIMAL(10, 2)``),
    angle brackets (``MAP<STRING, INT>``) or quoted strings.

    Returns:
        A list of definition strings, or ``None`` when the opening
        parenthesis is never closed.
    """
    definitions = []
    depth = 0      # parenthesis nesting; 1 means directly inside the column list
    angle = 0      # nesting of <...> type arguments seen at depth 1
    quote = None   # the quote character while inside a string literal
    start = open_idx + 1
    i = open_idx
    while i < len(sql):
        ch = sql[i]
        if quote:
            if ch == "\\":
                i += 1  # skip the escaped character
            elif ch == quote:
                quote = None
        elif ch in "'\"":
            quote = ch
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                definitions.append(sql[start:i])
                return definitions
        elif depth == 1:
            if ch == "<":
                angle += 1
            elif ch == ">" and angle:
                angle -= 1
            elif ch == "," and not angle:
                definitions.append(sql[start:i])
                start = i + 1
        i += 1
    return None


def _column_comments(definitions):
    """Return ``(column_name, comment)`` pairs; comment is ``""`` when absent."""
    columns = []
    for definition in definitions:
        text = definition.strip()
        name_match = _COLUMN_NAME_RE.match(text)
        if not name_match:
            continue  # empty segment, nothing to validate
        name = name_match.group(1) or name_match.group(2)
        if name_match.group(2) and name.upper() in _TABLE_CONSTRAINT_KEYWORDS:
            continue  # table constraint, not a column
        # Search after the name so a column literally called "comment" works.
        comment_match = _COLUMN_COMMENT_RE.search(text, name_match.end())
        comment = ""
        if comment_match:
            comment = comment_match.group(1) or comment_match.group(2) or ""
        columns.append((name, comment))
    return columns


def validate_sql_script(sql_content):
    """Check that every column of every CREATE TABLE statement has a comment.

    Every ``spark.sql(\"\"\" ... \"\"\")`` block in ``sql_content`` is scanned
    for ``CREATE TABLE`` statements (``IF NOT EXISTS`` is optional and no
    table-level ``COMMENT`` clause is required). The column list is taken up
    to its matching closing parenthesis, so types with arguments such as
    ``DECIMAL(10, 2)`` are handled.

    Args:
        sql_content: Notebook source text.

    Returns:
        ``True`` when all columns carry a non-blank comment, or when there is
        no ``spark.sql`` block / ``CREATE TABLE`` statement to validate;
        ``False`` when at least one column has no comment.
    """
    sql_blocks = _SPARK_SQL_BLOCK_RE.findall(sql_content)
    if not sql_blocks:
        print("⚠️ No CREATE TABLE statement found in this file.")
        return True  # Ignore files that don't create tables

    tables_checked = 0
    is_valid = True
    for sql in sql_blocks:
        for create_match in _CREATE_TABLE_RE.finditer(sql):
            # The pattern ends on the opening parenthesis of the column list.
            definitions = _split_column_definitions(sql, create_match.end() - 1)
            if definitions is None:
                print("⚠️ Unbalanced parentheses in CREATE TABLE statement; skipping it.")
                continue
            tables_checked += 1

            # Check for columns missing comments
            missing_comments = [
                name
                for name, comment in _column_comments(definitions)
                if not comment.strip()
            ]
            if missing_comments:
                print(f"❌ Missing comments for columns: {missing_comments}")
                is_valid = False  # Fail validation, but keep reporting other tables

    if not tables_checked:
        print("⚠️ No CREATE TABLE statement found in the SQL.")
    return is_valid

# Step 8: Run the comment check on every modified notebook
validation_failed = False

for file_path in full_paths:
    sql_content = fetch_notebook_content(file_path)

    # A notebook that could not be fetched counts as a failure; otherwise the
    # result of the comment check decides (it is only run on fetched content).
    if sql_content is None or not validate_sql_script(sql_content):
        validation_failed = True

# Step 9: Turn the overall result into the process exit code
if not validation_failed:
    print("✅ Validation passed! All tables have proper column comments.")
    sys.exit(0)  # Pass the job

print("❌ Validation failed! Some tables have missing column comments.")
sys.exit(1)  # Fail the Databricks job (which fails GitHub Actions)


In [0]:
# Import necessary libraries
import os
from pyspark.sql import SparkSession

# Look up the current user name from the notebook context
username = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .userName()
    .get()
)

# List the catalogs reported by SHOW CATALOGS (first column of each row)
catalogs = [catalog_row[0] for catalog_row in spark.sql("SHOW CATALOGS").collect()]

# Input widgets, declared as (widget kind, positional arguments) and created
# in this order.
_widget_specs = (
    ("dropdown", ("catalog", catalogs[0], catalogs, "Select Catalog")),
    ("dropdown", ("script_type", "Result Table Block",
                  ["Result Table Block", "Result View Block"], "Select Script Type")),
    ("dropdown", ("table_type", "fact", ["fact", "dim", "hist"], "Select Table Type")),
    ("dropdown", ("domain", "usg", ["usg", "evt", "ntl"], "Select Domain")),
    ("dropdown", ("data_level", "CustID", ["CustID", "AgentID", "BillID"], "Select Data Level")),
    ("dropdown", ("aggregation", "Day",
                  ["Day", "Week", "Month", "Quarter", "Year"], "Select Aggregation")),
    ("multiselect", ("schema_selection", "Wholesale",
                     ["Wholesale", "Retail", "1 to 1 View"], "Select Schema Type")),
    ("text", ("table_name", "MyTableName", "Enter Table Name")),
    ("dropdown", ("schema_name", "EDWS", ["EDWS"], "Select Base Schema")),
)
for _widget_kind, _widget_args in _widget_specs:
    getattr(dbutils.widgets, _widget_kind)(*_widget_args)

# Read the current value of every widget
_get_widget = dbutils.widgets.get
catalog = _get_widget("catalog")
script_type_full = _get_widget("script_type")
table_type = _get_widget("table_type")
domain = _get_widget("domain")
data_level = _get_widget("data_level")
aggregation_full = _get_widget("aggregation")
schema_selection = _get_widget("schema_selection")
table_name = _get_widget("table_name")
base_schema = _get_widget("schema_name")

# One-letter codes for the selected script type and aggregation
script_type_map = {"Result Table Block": "R", "Result View Block": "V"}
# Each aggregation is abbreviated to its first letter (Day -> D, Week -> W, ...)
aggregation_map = {
    period: period[0] for period in ("Day", "Week", "Month", "Quarter", "Year")
}
script_type = script_type_map[script_type_full]
aggregation = aggregation_map[aggregation_full]

# Schema names are the base schema plus a fixed suffix
both_schema = "{}WR".format(base_schema)
wholesale_view_schema = "{}WV".format(base_schema)
retail_view_schema = "{}RV".format(base_schema)
both_view_schema = "{}V".format(both_schema)

# Table name: the selected parts joined with underscores
full_table_name = "_".join(
    (table_type, domain, script_type, data_level, aggregation, table_name)
)

# Column definitions (name, type, comment) inserted into the CREATE TABLE script
columns = """
    cust_id INT COMMENT 'Customer ID',
    agent_id INT COMMENT 'Agent ID',
    bill_id INT COMMENT 'Bill ID',
    customer_name STRING COMMENT 'Name of the customer',
    agent_name STRING COMMENT 'Name of the agent',
    bill_amount DECIMAL(10, 2) COMMENT 'Amount of the bill',
    transaction_date TIMESTAMP COMMENT 'Date of the transaction',
    created_at TIMESTAMP COMMENT 'Record creation timestamp',
    updated_at TIMESTAMP COMMENT 'Record update timestamp'
"""

# Helpers that render the SQL text for one view schema. They read `catalog`,
# `both_schema` and `full_table_name` from the notebook scope at call time.
def _tagged_view_script(view_schema, tag):
    """Return a CREATE VIEW over the base table followed by a tag property.

    The property is set with ``ALTER VIEW ... SET TBLPROPERTIES``; a bare
    ``SET TBLPROPERTIES (...)`` names no object and is not a valid statement.
    """
    view_name = f"{catalog}.{view_schema}.{full_table_name}"
    return f"""
        CREATE VIEW {view_name} AS
        SELECT * FROM {catalog}.{both_schema}.{full_table_name};
        ALTER VIEW {view_name} SET TBLPROPERTIES ('tag' = '{tag}');
        """


def _view_comment_script(view_schema):
    """Return the COMMENT ON statements for a view and each of its columns."""
    view_name = f"{catalog}.{view_schema}.{full_table_name}"
    # NOTE(review): confirm the target runtime accepts COMMENT ON VIEW / COLUMN.
    return f"""
        COMMENT ON VIEW {view_name} IS 'This is a description of the view';
        COMMENT ON COLUMN {view_name}.cust_id IS 'Customer ID';
        COMMENT ON COLUMN {view_name}.agent_id IS 'Agent ID';
        COMMENT ON COLUMN {view_name}.bill_id IS 'Bill ID';
        COMMENT ON COLUMN {view_name}.customer_name IS 'Name of the customer';
        COMMENT ON COLUMN {view_name}.agent_name IS 'Name of the agent';
        COMMENT ON COLUMN {view_name}.bill_amount IS 'Amount of the bill';
        COMMENT ON COLUMN {view_name}.transaction_date IS 'Date of the transaction';
        COMMENT ON COLUMN {view_name}.created_at IS 'Record creation timestamp';
        COMMENT ON COLUMN {view_name}.updated_at IS 'Record update timestamp';
        """


def _projection_view_script(view_schema):
    """Return a CREATE VIEW that selects the explicit column list from the base table."""
    return f"""
        CREATE VIEW {catalog}.{view_schema}.{full_table_name} AS
        SELECT 
            cust_id, agent_id, bill_id, customer_name, agent_name, 
            bill_amount, transaction_date, created_at, updated_at
        FROM {catalog}.{both_schema}.{full_table_name};
        """


# Initialize SQL script variables
table_script = None
view_scripts = []

# Logic for Result Table Block: one base table plus a view per selected schema
if script_type_full == "Result Table Block":
    # Create table in base schema(s)
    table_script = f"""
        CREATE TABLE {catalog}.{both_schema}.{full_table_name} (
            {columns}
        ) USING DELTA 
        TBLPROPERTIES ('tag' = 'mssdemo');
        """

    # `schema_selection` is the raw multiselect value; membership is tested
    # with a substring check, as before.
    if "Wholesale" in schema_selection:
        view_scripts.append(_tagged_view_script(wholesale_view_schema, "wholesale_view"))
        # Add descriptions to the columns
        view_scripts.append(_view_comment_script(wholesale_view_schema))

    if "Retail" in schema_selection:
        view_scripts.append(_tagged_view_script(retail_view_schema, "Retail_view"))
        view_scripts.append(_view_comment_script(retail_view_schema))

    if "1 to 1 View" in schema_selection:
        # The 1 to 1 view gets the tag only; no comment script is generated for it.
        view_scripts.append(_tagged_view_script(both_view_schema, "1 to 1_view"))

# Logic for Result View Block: views only, no table script
elif script_type_full == "Result View Block":
    if "Wholesale" in schema_selection:
        view_scripts.append(_projection_view_script(wholesale_view_schema))
    if "Retail" in schema_selection:
        view_scripts.append(_projection_view_script(retail_view_schema))
    if "1 to 1 View" in schema_selection:
        # NOTE(review): both branches of the UNION ALL read the same table, so
        # every row appears twice — confirm whether two different sources
        # were intended.
        view_scripts.append(f"""
        CREATE VIEW {catalog}.{both_view_schema}.{full_table_name} AS
        SELECT 
            cust_id, agent_id, bill_id, customer_name, agent_name, 
            bill_amount, transaction_date, created_at, updated_at
        FROM {catalog}.{both_schema}.{full_table_name}
        UNION ALL
        SELECT 
            cust_id, agent_id, bill_id, customer_name, agent_name, 
            bill_amount, transaction_date, created_at, updated_at
        FROM {catalog}.{both_schema}.{full_table_name};
        """)
# Print the generated scripts and collect the same text for the output file.
# NOTE(review): the path starts with "/dbfs/" but is passed to dbutils.fs.put —
# confirm this is the intended location.
template_path = "/dbfs/tmp/script_templates/Poojitha/scripts.txt"

_file_sections = []

if table_script:
    print("Table Creation Script:")
    print(table_script)
    _file_sections.append(f"Table Creation Script:\n{table_script}\n\n")

if view_scripts:
    print("View Creation Scripts:")
    _file_sections.append("View Creation Scripts:\n")
    for script in view_scripts:
        print(script)
        _file_sections.append(f"{script}\n")

# File content: the table section (if any) followed by the view section (if any)
script_content = "".join(_file_sections)

if not script_content:
    print("No scripts to save.")
else:
    dbutils.fs.put(template_path, script_content, overwrite=True)
    print(f"Scripts saved to: {template_path}")