In [None]:
# Install semantic-link-labs package
!pip install semantic-link-labs
!pip install tabulate

In [None]:
# Import Libraries
import sempy_labs as labs
import requests
import pandas as pd
import sempy.fabric as fabric
from tabulate import tabulate
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from sempy_labs.tom import connect_semantic_model
import pandas as pd



## Setup Parameters

In [None]:
# Parameters
# The PQLint subscription key is read from a secret store instead of being
# hard-coded; replace 'your_key_vault_address' with your own Key Vault address.
subscription_key = mssparkutils.credentials.getSecret(
    'your_key_vault_address',
    'lint-subscription-key',
)

## Define Functions

In [None]:
import requests
import json
from urllib.parse import urlencode

# API Configuration
# Base URL that the PQLint endpoint paths used in this notebook are appended to.
API_BASE_URL = "https://api.pqlint.com/v1"

class PQLintApiError(Exception):
    """Raised when a request to the PQLint API fails or cannot be completed."""

def invoke_pqlint_api_request(uri, method='GET', headers=None, body=None, content_type='application/json', timeout=60):
    """
    Send an HTTP request to the PQLint API and return the decoded JSON response.

    Args:
        uri (str): Full request URL.
        method (str): HTTP verb; 'GET' and 'POST' are supported (case-insensitive).
        headers (dict, optional): Extra request headers. The mapping is copied,
            so the caller's dict is never modified.
        body: Payload for POST requests. A dict or list is serialized to JSON;
            any other value is passed to requests unchanged.
        content_type (str, optional): Value for the Content-Type header. Pass a
            falsy value to leave the header unset.
        timeout (float or tuple, optional): Timeout in seconds handed to
            requests, so an unresponsive server cannot block the notebook
            forever. Pass None to wait indefinitely.

    Returns:
        The parsed JSON body of the response.

    Raises:
        PQLintApiError: For an HTTP error status, an unsupported method, or any
            other failure while sending the request or decoding the response.
    """
    # Work on a copy: writing Content-Type into the caller's dict would leak
    # state between calls that reuse the same headers object.
    request_headers = dict(headers or {})
    if content_type:
        request_headers['Content-Type'] = content_type

    try:
        verb = method.upper()
        if verb == 'GET':
            response = requests.get(uri, headers=request_headers, timeout=timeout)
        elif verb == 'POST':
            payload = json.dumps(body) if isinstance(body, (dict, list)) else body
            response = requests.post(uri, headers=request_headers, data=payload, timeout=timeout)
        else:
            raise ValueError(f"Unsupported HTTP method: {method}")

        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as http_err:
        # Prefer the error details from the JSON body; fall back to the
        # exception text when the body is not JSON or not a mapping.
        try:
            error_body = response.json()
            error_message = f"HTTP {response.status_code} {response.reason} - {error_body.get('body', str(error_body))}"
        except Exception:
            error_message = f"HTTP {response.status_code} {response.reason} - {http_err}"
        raise PQLintApiError(error_message) from http_err
    except Exception as err:
        # Everything else (bad method, timeout, connection or decode failure)
        # is reported through the same exception type.
        raise PQLintApiError(f"PQLint API Error: {str(err)}") from err

def get_linting_rules():
    """
    Fetch every linting rule exposed by the PQLint API.

    Returns:
        The decoded JSON payload returned by the rules endpoint.
    """
    return invoke_pqlint_api_request(f"{API_BASE_URL}/lint/rules", method='GET')

def invoke_code_linting(code, subscription_key, rules=None, severity=None, format='pq'):
    """
    Lints Power Query M code or TMDL code using the PQLint API.

    Args:
        code (str): Source code to lint.
        subscription_key (str): PQLint subscription key; sent URL-encoded as
            the `subscription-key` query parameter.
        rules (list, optional): Rule identifiers to send as `rules`. Omitted
            from the request when empty or None.
        severity (str, optional): Sent as `options.severity` when provided.
        format (str, optional): Sent as `options.format`; defaults to 'pq'.

    Returns:
        The decoded JSON response of the lint endpoint.

    Raises:
        ValueError: If `code` or `subscription_key` is None, empty or
            whitespace-only.
        PQLintApiError: If the API request fails.
    """
    # None has no .strip(); test for it explicitly so a missing value raises
    # the ValueError promised by the message instead of an AttributeError.
    if code is None or not code.strip():
        raise ValueError("Code parameter cannot be null or empty")
    if subscription_key is None or not subscription_key.strip():
        raise ValueError("SubscriptionKey parameter cannot be null or empty")

    request_body = {"code": code}
    if rules:
        request_body["rules"] = rules

    # Only send an options object when at least one option is set
    options = {}
    if severity:
        options["severity"] = severity
    if format:
        options["format"] = format
    if options:
        request_body["options"] = options

    encoded_key = urlencode({"subscription-key": subscription_key})
    uri = f"{API_BASE_URL}/pq/lint?{encoded_key}"

    return invoke_pqlint_api_request(uri, method='POST', body=request_body)


def lint_result_to_df(lint_result, dataset_name, table_name, partition_name):
    """
    Flatten a lint result into a DataFrame tagged with its dataset, table and partition.

    Args:
        lint_result (dict or list): Lint result JSON, either a single object or a list of them.
        dataset_name (str): Value written to the DatasetName column.
        table_name (str): Value written to the TableName column.
        partition_name (str): Value written to the PartitionName column.

    Returns:
        pd.DataFrame: The flattened result (nested keys joined with '_') plus the
        DatasetName, TableName and PartitionName columns.

    Raises:
        ValueError: If lint_result is neither a dict nor a list.
    """
    if not isinstance(lint_result, (list, dict)):
        raise ValueError("lint_result must be a dict or list")

    if isinstance(lint_result, list):
        frame = pd.json_normalize(
            lint_result,
            sep='_',
            meta=['ID', 'Name', 'Category', 'Description', 'Severity'],
            errors='ignore'
        )
    else:
        frame = pd.json_normalize([lint_result], sep='_')

    def _join_references(refs):
        # A list of reference dicts collapses to "Description (Link); ..."; anything else becomes None
        if not isinstance(refs, list):
            return None
        return '; '.join(
            f"{ref.get('Description', '')} ({ref.get('Link', '')})" for ref in refs
        )

    if 'References' in frame.columns:
        frame['References'] = frame['References'].apply(_join_references)

    # Guarantee the line-number column exists even when no result carried one
    line_number_column = 'ErrorInformation_errorLocation_positionStart_lineNumber'
    if line_number_column not in frame.columns:
        frame[line_number_column] = None

    # Tag every row with the dataset / table / partition it came from
    metadata = (
        ('DatasetName', dataset_name),
        ('TableName', table_name),
        ('PartitionName', partition_name),
    )
    for column, value in metadata:
        frame[column] = value

    return frame


## Retrieve all semantic models in the workspace

In [None]:
# Get all workspaces
# (each row's 'Id' and 'Name' values are used below to look up that workspace's datasets)
df_workspaces = fabric.list_workspaces()

def get_workspace_datasets(workspace_row):
    """
    List the datasets of one workspace, tagged with the workspace they belong to.

    Args:
        workspace_row: Mapping-like row providing the workspace 'Id' and 'Name'.

    Returns:
        The dataset listing with added 'workspace_id' and 'workspace_name'
        columns, or None when the lookup fails (the error is printed).
    """
    workspace_id, workspace_name = workspace_row['Id'], workspace_row['Name']

    try:
        datasets = fabric.list_datasets(workspace_id)
        # Record which workspace each dataset row came from
        for column, value in (('workspace_id', workspace_id), ('workspace_name', workspace_name)):
            datasets[column] = value
    except Exception as exc:
        # Best effort: report the failure and let the caller skip this workspace
        print(f"Error getting datasets for workspace {workspace_name} ({workspace_id}): {exc}")
        return None

    return datasets

# Get datasets for all workspaces, skipping any workspace whose lookup failed
datasets_list = []
for _, ws_row in df_workspaces.iterrows():
    datasets_df = get_workspace_datasets(ws_row)
    if datasets_df is not None:
        datasets_list.append(datasets_df)

# Combine results
if datasets_list:
    all_datasets_df = pd.concat(datasets_list, ignore_index=True)
    display(all_datasets_df)
    print(f"Total datasets found: {len(all_datasets_df)}")
else:
    # Always define all_datasets_df: otherwise a later reference raises
    # NameError on a fresh kernel, or silently reuses a stale value on a re-run.
    all_datasets_df = pd.DataFrame()
    print("No datasets found across all workspaces")


## Retrieve all semantic models/datasets and run linter on Partitions

In [None]:

# Collects one findings DataFrame per successfully linted partition
results = []
for dataset in all_datasets_df.to_dict(orient="records"):
    dataset_name = dataset['Dataset Name']
    workspace_name = dataset['workspace_name']
    print(f"[INFO] Linting {dataset_name} in {workspace_name}")

    try:
        # Open the semantic model read-only; a failure here skips the whole dataset
        with connect_semantic_model(dataset=dataset['Dataset ID'], workspace=dataset['workspace_id'], readonly=True) as tom:
            for table in tom.model.Tables:
                for partition in table.Partitions:
                    # Skip partitions like 'LocalDateTable_...'
                    if partition.Name.startswith("LocalDateTable_"):
                        continue

                    print(f"Linting '{table.Name}'[{partition.Name}] in dataset '{dataset_name}'")

                    try:
                        lint_result = invoke_code_linting(
                            code=partition.Source.Expression,
                            subscription_key=subscription_key,
                            rules=['no-ai-functions'],
                        )
                        partition_df = lint_result_to_df(
                            lint_result,
                            dataset_name=dataset_name,
                            table_name=table.Name,
                            partition_name=partition.Name,
                        )

                        display(partition_df)  # Output results
                        results.append(partition_df)
                    except Exception as lint_err:
                        # A failing partition is reported and the loop moves on to the next one
                        print(f"[ERROR] Linting failed for {dataset_name} -> {table.Name} -> {partition.Name}: {lint_err}")

    except Exception as conn_err:
        print(f"[ERROR] Failed to connect to {dataset_name} in {workspace_name}: {conn_err}")




# Combine all results into a single DataFrame.
# pd.concat raises ValueError on an empty list, which happens when no partition
# was linted successfully, so fall back to an empty frame in that case.
df_all = pd.concat(results, ignore_index=True) if results else pd.DataFrame()

# Aggregation: number of findings per dataset, table, partition and category.
# Requires the columns 'DatasetName', 'TableName', 'PartitionName' and 'Category'.
group_cols = ['DatasetName', 'TableName', 'PartitionName', 'Category']
missing_cols = [col for col in group_cols if col not in df_all.columns]

if missing_cols:
    # Grouping by a column that is not present raises KeyError. 'Category' is
    # only present when at least one lint result contained it.
    print(f"[INFO] No lint findings to aggregate (missing column(s): {', '.join(missing_cols)})")
    agg_df = pd.DataFrame(columns=group_cols + ['Count'])
else:
    agg_df = (
        df_all
        .groupby(group_cols)
        .size()
        .reset_index(name='Count')
    )
    print(tabulate(agg_df, headers='keys', tablefmt='github', showindex=False))