# Data Quality Rules Execution

## Overview
This notebook executes IBM Cloud Pak for Data (CPD) data quality rules in parallel for improved performance. It provides:

- **Parallel execution** of multiple DQ rules using ThreadPoolExecutor
- **Configurable batch sizes** to control cluster load
- **Results saved to the project** as a timestamped CSV data asset

## Settings

**CPD Host**: The IBM Cloud Pak for Data cluster endpoint where your data quality rules are deployed.

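**Batch Size**: Maximum number of rules to execute concurrently. Adjust based on your cluster capacity.

**Delay in Seconds**: The number of seconds to wait between batches. With a value of 0, the rules are not split into batches; they all go through a single pool that still runs at most Batch Size rules at a time.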

> ⚠️ **Note**: Higher batch sizes may overwhelm the cluster and cause timeouts. Start with 5 and adjust based on performance.

In [None]:
CPD_HOST = "cpd-host.company.com"
BATCH_SIZE = 5
DELAY_IN_SECONDS = 1

## Required Imports

In [2]:
from ibm_watson_studio_lib import access_project_or_space
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import time
import requests
import pandas as pd

import urllib3
urllib3.disable_warnings()

### Initialize Watson Studio library for project access

In [3]:
wslib = access_project_or_space()

### Listing assets by a specific asset type
Use `wslib.assets.list_assets` with an asset type and no filter to retrieve all assets of that type. `wslib.assets.list_asset_types` returns all available asset types, and the generic asset type `asset` retrieves assets of every type. Here we need data quality rules, whose asset type is `data_rule`.

In [4]:
dq_rules = wslib.assets.list_assets("data_rule")
# wslib.show(dq_rules)
# for dq_rule in dq_rules:
#    print(dq_rule['asset_id'] + ' - ' + dq_rule['name'])
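
If only a subset of the rules should run, the list can be narrowed before execution. The sketch below is illustrative: `sample_rules` and `filter_rules_by_prefix` are hypothetical names, and it assumes each entry is a dict with at least the `asset_id` and `name` keys that the commented-out loop above reads.

```python
# Hypothetical sample shaped like the entries read above ('asset_id' and 'name');
# real entries returned by wslib may carry additional keys.
sample_rules = [
    {"asset_id": "id-1", "name": "customer_email_format"},
    {"asset_id": "id-2", "name": "customer_age_range"},
    {"asset_id": "id-3", "name": "orders_total_not_null"},
]


def filter_rules_by_prefix(rules, prefix):
    """Return only the rules whose name starts with the given prefix."""
    return [rule for rule in rules if rule.get("name", "").startswith(prefix)]


customer_rules = filter_rules_by_prefix(sample_rules, "customer_")
print([rule["asset_id"] for rule in customer_rules])
```

In the notebook, the same helper could be applied to `dq_rules` before passing the list to the execution functions.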


## Core Execution Function

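Executes a single data quality rule via the CPD API and returns a result dictionary. The `success` key tells callers whether the request went through; failures carry an `error` message instead of job details.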

In [5]:
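def execute_rule(token, project_id, rule_id):
    """Trigger one data quality rule and return a result dict with a 'success' flag."""
    try:
        url = f"https://{CPD_HOST}/data_quality/v3/projects/{project_id}/rules/{rule_id}/execute"
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}'
        }

        # verify=False skips TLS certificate validation, which is why urllib3 warnings are disabled above
        response = requests.post(url, headers=headers, verify=False)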
        
        if response.status_code == 200:
            execution_data = response.json()
            return {
                'success': True,
                'rule_id': rule_id,
                'rule_name': execution_data.get('name', 'Unknown Rule'),
                'status': execution_data.get('status', {}).get('state', 'unknown'),
                'job_id': execution_data.get('job', {}).get('id'),
                'job_run_id': execution_data.get('job_run', {}).get('id')
            }
        else:
            return {
                'success': False,
                'rule_id': rule_id,
                'error': f"HTTP {response.status_code}",
                'response': response.text
            }
            
    except Exception as e:
        return {
            'success': False,
            'rule_id': rule_id,
            'error': str(e)
        }
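
The chained `.get(key, {})` calls above fall back to defaults only when a key is absent; a key that is present but null would still raise an `AttributeError`. The following is a minimal sketch of a slightly more defensive variant, not the notebook's own code. `extract_execution_fields` is a hypothetical helper, and both payloads are made up for illustration; the actual response body may differ.

```python
def extract_execution_fields(execution_data):
    """Pull the fields execute_rule reports, tolerating missing or null sections."""
    # `or {}` also covers a key that is present but null,
    # which .get(key, {}) alone would not.
    status = execution_data.get("status") or {}
    job = execution_data.get("job") or {}
    job_run = execution_data.get("job_run") or {}
    return {
        "rule_name": execution_data.get("name", "Unknown Rule"),
        "status": status.get("state", "unknown"),
        "job_id": job.get("id"),
        "job_run_id": job_run.get("id"),
    }


# Hypothetical payloads for illustration only.
full = {
    "name": "rule_a",
    "status": {"state": "analyzed"},
    "job": {"id": "j1"},
    "job_run": {"id": "r1"},
}
sparse = {"status": None}

print(extract_execution_fields(full))
print(extract_execution_fields(sparse))
```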

## Parallel Execution Function

Executes multiple data quality rules concurrently using ThreadPoolExecutor.

In [6]:
def execute_rules_parallel(token, project_id, dq_rules, max_workers=5):
    """Execute rules in parallel and return structured results"""
    results = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all jobs
        future_to_rule = {
            executor.submit(execute_rule, token, project_id, rule['asset_id']): rule
            for rule in dq_rules
        }
        
        # Process as they complete
        for future in as_completed(future_to_rule):
            rule = future_to_rule[future]
            
            try:
                result = future.result()
                results.append((rule, result))
            except Exception as exc:
                # Keep the same keys as execute_rule's own failure result
                results.append((rule, {'success': False, 'rule_id': rule['asset_id'], 'error': str(exc)}))
    
    return results
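
To see the submit-then-collect pattern in isolation, here is a small sketch that runs without a CPD cluster. `fake_execute` is a hypothetical stand-in for `execute_rule`: it sleeps briefly and raises for one ID so that both the success and the exception path are exercised.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def fake_execute(rule_id):
    """Hypothetical stand-in for execute_rule: sleeps briefly, fails for one ID."""
    time.sleep(0.05)
    if rule_id == "bad":
        raise RuntimeError("simulated failure")
    return {"success": True, "rule_id": rule_id}


rule_ids = ["a", "b", "bad", "c"]
collected = {}

with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to the ID it was submitted for
    future_to_id = {executor.submit(fake_execute, rid): rid for rid in rule_ids}

    # as_completed yields futures in completion order, not submission order
    for future in as_completed(future_to_id):
        rid = future_to_id[future]
        try:
            collected[rid] = future.result()
        except Exception as exc:
            collected[rid] = {"success": False, "rule_id": rid, "error": str(exc)}

print(sorted(collected))
```

Because results arrive in completion order, the mapping from future to input is what ties each result back to its rule.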

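## Parallel Execution Function with Delay

Splits the rules into batches of `batch_size`, runs each batch in parallel, and pauses between batches to limit cluster load.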

In [7]:
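def execute_rules_parallel_with_delay(token, project_id, dq_rules, batch_size=5, delay_between_batches=1):
    """Execute rules in batches of batch_size, pausing delay_between_batches seconds between batches"""
    all_results = []

    # No delay requested: submit everything to one pool, which still caps concurrency at batch_size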
    if delay_between_batches == 0:
        return execute_rules_parallel(token, project_id, dq_rules, max_workers=batch_size)
    
    # Otherwise, process in batches with delays
    for i in range(0, len(dq_rules), batch_size):
        batch = dq_rules[i:i + batch_size]
        batch_results = execute_rules_parallel(token, project_id, batch, max_workers=batch_size)
        all_results.extend(batch_results)
        
        if i + batch_size < len(dq_rules):
            time.sleep(delay_between_batches)
    
    return all_results
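
The batching arithmetic can be checked without running any rules. The sketch below uses a hypothetical helper, `plan_batches`, that mirrors the slicing and the sleep condition in the loop above and reports how much time goes into pauses alone.

```python
def plan_batches(n_rules, batch_size, delay_between_batches):
    """Return (slice bounds per batch, total seconds spent sleeping)."""
    bounds = [
        (start, min(start + batch_size, n_rules))
        for start in range(0, n_rules, batch_size)
    ]
    # A sleep follows every batch except the last one
    total_delay = max(len(bounds) - 1, 0) * delay_between_batches
    return bounds, total_delay


bounds, total_delay = plan_batches(12, 5, 1)
print(bounds)       # [(0, 5), (5, 10), (10, 12)]
print(total_delay)  # 2
```

With 12 rules and a batch size of 5, the last batch holds only 2 rules, and the run sleeps twice regardless of how long the rules themselves take.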

## Results Processing

Converts execution results into a pandas DataFrame for analysis and display.

In [8]:
def results_to_dataframe(results):
    """Flatten (rule, result) pairs into one DataFrame row per rule."""
    data = []
    for rule, result in results:
        if result.get('success', False):
            data.append({
                'Rule Name': rule['name'],
                'Rule ID': rule['asset_id'],
                'Status': result.get('status', 'unknown'),
                'Job ID': result.get('job_id', 'N/A'),
                'Job Run ID': result.get('job_run_id', 'N/A'),
                'Success': True,
                'Error': None
            })
        else:
            data.append({
                'Rule Name': rule['name'],
                'Rule ID': rule['asset_id'], 
                'Status': 'Failed',
                'Job ID': 'N/A',
                'Job Run ID': 'N/A',
                'Success': False,
                'Error': result.get('error', 'Unknown error')
            })
    
    df = pd.DataFrame(data)
    return df
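
Before or alongside the DataFrame, a quick tally of outcomes can help spot systematic failures. This is a minimal sketch using only the standard library. `summarize` and `sample_results` are hypothetical, the pairs follow the `(rule, result)` shape returned by `execute_rules_parallel`, and the status value is made up.

```python
from collections import Counter

# Hypothetical (rule, result) pairs for illustration
sample_results = [
    ({"name": "r1", "asset_id": "id-1"}, {"success": True, "status": "analyzed"}),
    ({"name": "r2", "asset_id": "id-2"}, {"success": False, "error": "HTTP 500"}),
    ({"name": "r3", "asset_id": "id-3"}, {"success": False, "error": "HTTP 500"}),
]


def summarize(results):
    """Count successes and failures, and group failures by error message."""
    outcome = Counter(
        "ok" if res.get("success") else "failed" for _, res in results
    )
    errors = Counter(
        res.get("error", "Unknown error")
        for _, res in results
        if not res.get("success")
    )
    return outcome, errors


outcome, errors = summarize(sample_results)
print(dict(outcome))
print(dict(errors))
```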

## Save Results

Saves the API call results as a CSV data asset within the project.

In [9]:
def save_results(pandas_df, prefix="dq_run"):
    """Save DataFrame with timestamp in filename"""

    timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    filename = f"{prefix}_{timestamp}.csv"
    
    # Save the file
    wslib.save_data(filename, pandas_df.to_csv(index=False).encode())
    print(f"Results saved to: {filename}")
    
    return filename
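
The filename pattern can be reproduced on its own. In this sketch, `build_filename` is a hypothetical helper, not part of the notebook; it accepts an optional `now` argument so the output is deterministic when checking the format.

```python
from datetime import datetime


def build_filename(prefix="dq_run", now=None):
    """Build '<prefix>_<timestamp>.csv'; `now` is injectable for repeatable checks."""
    now = now or datetime.now()
    return f"{prefix}_{now.strftime('%Y-%m-%d-%H-%M-%S')}.csv"


print(build_filename(now=datetime(2025, 6, 26, 4, 2, 20)))  # dq_run_2025-06-26-04-02-20.csv
```

The zero-padded, most-significant-first timestamp means filenames sort chronologically when listed alphabetically.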

## Execute Rules and Save Results

Main execution block: get the current project ID and auth token, run the rules in batches, convert the results to a DataFrame, and save it to the project.

In [10]:
project_id = wslib.here.get_ID()
token = wslib.auth.get_current_token()

results = execute_rules_parallel_with_delay(token, project_id, dq_rules, batch_size=BATCH_SIZE, delay_between_batches=DELAY_IN_SECONDS)
df = results_to_dataframe(results)
save_results(df)

Results saved to: dq_run_2025-06-26-04-02-20.csv


'dq_run_2025-06-26-04-02-20.csv'

## Display Results (optional)

In [None]:
display(df)