#### Digital Pathology `NotebookSolutionCompanion`

**Instantiate `NotebookSolutionCompanion` from `./solacc_companion_init`**
  <!-- - The `__init__.py` code used here comes from an earlier revision (predating the PR update) that does not raise the observed `request` error; see [solacc/companion/\__init\__.py](https://github.com/databricks-industry-solutions/notebook-solution-companion/blob/f7e381d77675b29c2d3f9d377a528ceaf2255f23/solacc/companion/__init__.py). -->


In [0]:
%run ./solacc_companion_init

In [0]:
nsc = NotebookSolutionCompanion()


**Add Additional Methods to `NotebookSolutionCompanion`**

In [0]:
## Candidates for inclusion in NotebookSolutionCompanion (nsc) in the future -- for now this is a workaround

# Methods to add to the current instance of NotebookSolutionCompanion (nsc)
def _nsc_deploy_job_clusters(self, job_clusters_json, reuse=True, wait=0):
    """
    Deploy only the job clusters defined in the job configuration.
    Returns a dictionary mapping job_cluster_keys to their corresponding cluster IDs.
    """
    job_cluster_map = {}
    
    if not job_clusters_json:
        print("No job clusters to deploy")
        return job_cluster_map
    
    print(f"🚀 Deploying {len(job_clusters_json)} job clusters...")
    
    for job_cluster_params in job_clusters_json:
        jck = job_cluster_params["job_cluster_key"]
        if "new_cluster" in job_cluster_params:
            # Convert job cluster config to interactive cluster config
            cluster_params = self.convert_job_cluster_to_cluster(job_cluster_params)
            
            # Set auto-termination to 10 minutes
            cluster_params["autotermination_minutes"] = 10
            
            # Apply cloud-specific customization to the cluster parameters
            cluster_params = self.customize_cluster_json(cluster_params)
            
            # Check if cluster with this name already exists
            cluster_name = cluster_params["cluster_name"]
            clusters = self.client.execute_get_json(f"{self.client.endpoint}/api/2.0/clusters/list")["clusters"]
            clusters_matched = list(filter(lambda cluster: cluster_name == cluster["cluster_name"], clusters))
            cluster_exists = len(clusters_matched) > 0
            
            if cluster_exists and reuse:
                # Reuse the existing cluster
                cluster_id = clusters_matched[0]["cluster_id"]
                print(f"✅ Reusing existing cluster '{cluster_name}' with ID: {cluster_id}")
            else:
                if cluster_exists and not reuse:
                    # Delete the existing cluster first
                    cluster_id = clusters_matched[0]["cluster_id"]
                    print(f"🗑️ Deleting existing cluster '{cluster_name}' with ID: {cluster_id}")
                    self.client.execute_post_json(f"{self.client.endpoint}/api/2.0/clusters/permanent-delete", {"cluster_id": cluster_id})
                    time.sleep(5)  # Wait a bit for the deletion to take effect
                
                # Create or update the cluster
                jcid = self.create_or_update_cluster_by_name(cluster_params)
                
                # Set ACL for the cluster
                self.set_acl_for_cluster(jcid)
                
                cluster_id = jcid
            
            # Store the mapping
            job_cluster_map[jck] = cluster_id
            
            # Get libraries for this job cluster
            if hasattr(self, 'job_input_json'):
                jcl = self.get_library_list_for_cluster(self.job_input_json, jck)
                if jcl:
                    self.start_cluster(cluster_id)
                    self.install_libraries(cluster_id, jcl)
    
    time.sleep(wait)
    print(f"✅ Successfully deployed {len(job_cluster_map)} job clusters")
    return job_cluster_map


def _nsc_deploy_job_with_existing_clusters(self, job_json, cluster_map, reuse=True, run_job=False):
    """
    Create or update a job whose tasks run on already-deployed clusters.

    A deep copy of `job_json` is rewritten so that notebook paths are rooted
    at `self.solacc_path`, the "job_clusters" section is dropped, and every
    task whose "job_cluster_key" appears in `cluster_map` points at the mapped
    cluster through "existing_cluster_id" instead.

    Parameters:
    -----------
    job_json : dict
        Job configuration; it is not modified.
    cluster_map : dict
        Mapping of job_cluster_key to cluster ID.
    reuse : bool, optional
        When True (default), a job with the same name is reset with the new
        settings. When False, it is deleted and a new job is created.
    run_job : bool, optional
        When True, trigger a run of the job after deployment (default: False).

    Returns:
    --------
    tuple
        (job_id, run_id); run_id is None when no run was started.
    """
    # Work on a private copy so the caller's configuration stays untouched
    settings = copy.deepcopy(job_json)

    # Pass 1: root every notebook path at the solution accelerator path
    for task in settings.get("tasks", []):
        if "notebook_task" not in task:
            continue
        nb_task = task["notebook_task"]
        nb_path = nb_task["notebook_path"]
        if not nb_path.startswith(self.solacc_path):
            nb_task["notebook_path"] = f"{self.solacc_path}/{nb_path}"

    # Fall back to the companion's job name when the config has none
    if "name" not in settings:
        settings["name"] = self.job_name

    # Grant all users permission to manage runs
    settings["access_control_list"] = [
        {"group_name": "users", "permission_level": "CAN_MANAGE_RUN"}
    ]

    # Job clusters are replaced by existing clusters, so drop their definitions
    settings.pop("job_clusters", None)

    # Pass 2: point tasks at the deployed clusters
    for task in settings.get("tasks", []):
        if "job_cluster_key" not in task:
            continue
        cluster_key = task["job_cluster_key"]
        if cluster_key not in cluster_map:
            continue
        task["existing_cluster_id"] = cluster_map[cluster_key]
        del task["job_cluster_key"]

    # Look up jobs that already carry this name
    job_name = settings["name"]
    matches = list(self.w.jobs.list(name=job_name))

    if matches and reuse:
        # Overwrite the settings of the first matching job
        job_id = matches[0].job_id
        print(f"✅ Updating existing job '{job_name}' with ID: {job_id}")
        new_settings = JobSettings().from_dict(settings)
        self.w.jobs.reset(job_id, new_settings)
    else:
        if matches:
            # Reuse was declined: remove the first matching job before recreating
            job_id = matches[0].job_id
            print(f"🗑️ Deleting existing job '{job_name}' with ID: {job_id}")
            self.w.jobs.delete(job_id=job_id)
            time.sleep(5)  # Wait a bit for the deletion to take effect

        creation_request = CreateJob().from_dict(settings)
        job_id = self.w.jobs.create(request=creation_request).job_id
        print(f"✅ Created new job '{job_name}' with ID: {job_id}")

    # Remember the job ID on the instance for future reference
    self.job_id = job_id

    if not run_job:
        return job_id, None

    print(f"🚀 Running job '{job_name}' with ID: {job_id}")
    # Trigger the run through the Databricks SDK directly instead of self.run_job()
    triggered = self.w.jobs.run_now(job_id=job_id)
    return job_id, triggered.run_id


def _nsc_deploy_digital_pathology_job(self, job_json, suffix="", reuse=True, run_job=False):
    """
    Deploy a digital pathology job using the two-step approach:
    1. Deploy the job clusters separately
    2. Deploy the job with references to the deployed clusters
    """
    # Store the job JSON for use in other methods
    self.job_input_json = copy.deepcopy(job_json)
    
    # Step 1: Deploy the job clusters separately
    job_clusters = job_json.get("job_clusters", [])
    cluster_map = self._deploy_job_clusters(job_clusters, reuse=reuse)
    
    # Step 2: Deploy the job with references to the deployed clusters
    job_id, run_id = self._deploy_job_with_existing_clusters(job_json, cluster_map, reuse=reuse, run_job=run_job)
    
    # Get the job name for reference
    job_name = job_json.get("name", "digital-pathology-job")
    
    # Collect cluster names and IDs
    cluster_details = {}
    for key, cluster_id in cluster_map.items():
        # Get the cluster name from the job_clusters configuration
        cluster_name = key
        for jc in job_clusters:
            if jc["job_cluster_key"] == key:
                # Use the cluster name from the job_cluster_key if available
                cluster_name = jc["job_cluster_key"]
                break
        
        cluster_details[key] = {
            "cluster_id": cluster_id,
            "cluster_name": cluster_name
        }
    
    # Return comprehensive result
    result = {
        "job_id": job_id,
        "job_name": job_name,
        "clusters": cluster_map,
        "cluster_details": cluster_details
    }
    
    if run_id:
        result["run_id"] = run_id
    
    return result


# Bind the helper functions as methods on the existing NotebookSolutionCompanion instance (nsc)
import types

for _method_name, _method_impl in (
    ("_deploy_job_clusters", _nsc_deploy_job_clusters),
    ("_deploy_job_with_existing_clusters", _nsc_deploy_job_with_existing_clusters),
    ("_deploy_digital_pathology_job", _nsc_deploy_digital_pathology_job),
):
    # MethodType ties the function to nsc so that `self` is supplied automatically
    setattr(nsc, _method_name, types.MethodType(_method_impl, nsc))

print("✅ Added new methods to existing instance of NotebookSolutionCompanion (nsc)")

In [0]:
# Define the cleanup method
def _nsc_cleanup_digital_pathology_resources(self, results, confirm=True):
    """
    Clean up resources (job and clusters) created during digital pathology job deployment
    
    Parameters:
    -----------
    results : dict
        The results dictionary returned by deploy_digital_pathology_job
    confirm : bool, optional
        Whether to ask for confirmation before deleting resources (default: True)
    
    Returns:
    --------
    dict
        Dictionary with deletion status for each resource
    """
    if not isinstance(results, dict):
        print("❌ Invalid results object. Please provide the dictionary returned by deploy_digital_pathology_job.")
        return {"status": "failed", "reason": "Invalid results object"}
    
    # Extract resource IDs
    job_id = results.get('job_id')
    clusters = results.get('clusters', {})
    
    if not job_id and not clusters:
        print("❌ No resources to clean up. The results dictionary doesn't contain job_id or clusters.")
        return {"status": "failed", "reason": "No resources found"}
    
    # Ask for confirmation if required
    if confirm:
        print(f"⚠️ You are about to delete the following resources:")
        if job_id:
            print(f"   - Job: {results.get('job_name', 'Unknown')} (ID: {job_id})")
        
        if clusters:
            print(f"   - Clusters ({len(clusters)}):")
            for key, cluster_id in clusters.items():
                print(f"     - {key} (ID: {cluster_id})")
        
        confirmation = input("\nAre you sure you want to delete these resources? (y/n): ").strip().lower()
        if confirmation != 'y':
            print("❌ Cleanup cancelled.")
            return {"status": "cancelled"}
    
    deletion_status = {"job": None, "clusters": {}}
    
    # Delete the job
    if job_id:
        try:
            print(f"🗑️ Deleting job {results.get('job_name', 'Unknown')} (ID: {job_id})...")
            self.client.execute_post_json(f"{self.client.endpoint}/api/2.0/jobs/delete", {"job_id": job_id})
            print(f"✅ Successfully deleted job with ID: {job_id}")
            deletion_status["job"] = "success"
        except Exception as e:
            print(f"❌ Failed to delete job {job_id}: {e}")
            deletion_status["job"] = f"failed: {str(e)}"
    
    # Delete the clusters
    for key, cluster_id in clusters.items():
        try:
            print(f"🗑️ Deleting cluster {key} (ID: {cluster_id})...")
            self.client.execute_post_json(f"{self.client.endpoint}/api/2.0/clusters/permanent-delete", {"cluster_id": cluster_id})
            print(f"✅ Successfully deleted cluster with ID: {cluster_id}")
            deletion_status["clusters"][cluster_id] = "success"
        except Exception as e:
            print(f"❌ Failed to delete cluster {cluster_id}: {e}")
            deletion_status["clusters"][cluster_id] = f"failed: {str(e)}"
    
    # Overall status
    if deletion_status["job"] == "success" and all(status == "success" for status in deletion_status["clusters"].values()):
        deletion_status["status"] = "success"
        print("\n✅ All resources have been successfully deleted.")
    else:
        deletion_status["status"] = "partial"
        print("\n⚠️ Some resources could not be deleted. See details above.")
    
    return deletion_status

# Attach the cleanup function to the live NotebookSolutionCompanion instance (nsc) as a bound method
setattr(
    nsc,
    "cleanup_digital_pathology_resources",
    types.MethodType(_nsc_cleanup_digital_pathology_resources, nsc),
)

print("✅ Added cleanup method to NotebookSolutionCompanion (nsc) instance")

**Include function to deploy workflow job**

In [0]:
## Function to deploy resources and job
def deploy_digital_pathology_job(suffix="", reuse=True, run_job=False, workspace_url=None):
    """
    Creates and deploys the digital pathology job using the enhanced NSC methods

    Parameters:
    -----------
    suffix : str, optional
        Passed to create_digital_pathology_job_config and to the deployment method.
    reuse : bool, optional
        Forwarded to nsc._deploy_digital_pathology_job (default: True).
    run_job : bool, optional
        Forwarded to nsc._deploy_digital_pathology_job (default: False).
    workspace_url : str, optional
        Base URL of the workspace. When given, "job_url", "cluster_urls" and
        (if a run was started) "job_run_url" are added to the result.

    Returns:
    --------
    dict
        The result of nsc._deploy_digital_pathology_job, extended with the
        URL entries described above.
    """
    # Create job configuration
    job_json = create_digital_pathology_job_config(suffix)

    # Deploy the job using the new method
    print("🚀 Creating digital pathology job with job clusters...")
    result = nsc._deploy_digital_pathology_job(job_json, suffix, reuse, run_job)

    print("\n✅ Job deployment complete!")

    # Add URLs if workspace_url is provided
    if workspace_url:
        # Drop trailing slashes so the joined URLs do not contain "//"
        base_url = workspace_url.rstrip("/")
        job_id = result['job_id']

        result['job_url'] = f"{base_url}/#job/{job_id}"
        result["cluster_urls"] = {
            key: f"{base_url}/#setting/clusters/{cluster_id}/configuration"
            for key, cluster_id in result['clusters'].items()
        }

        # Add run URL if a run was started
        if "run_id" in result:
            result['job_run_url'] = f"{base_url}/#job/{job_id}/run/{result['run_id']}"

    # Render an HTML summary when the companion is configured for HTML output.
    # This stays best-effort: a display problem must not fail the deployment,
    # but it is reported instead of being swallowed silently.
    if nsc.print_html:
        try:
            from IPython.display import display, HTML

            display(HTML(_render_deployment_summary_html(result)))
        except Exception as exc:
            print(f"⚠️ Could not render the HTML deployment summary: {exc}")

    return result


def _render_deployment_summary_html(result):
    """Build the HTML summary card for a deployment result; items without a URL are shown as plain text."""

    def _link(label, url):
        # Fall back to the bare label when no URL is available (no workspace_url given)
        if url:
            return f'<a href="{url}" target="_blank">{label}</a>'
        return str(label)

    cluster_urls = result.get("cluster_urls", {})
    job_link = _link(result['job_name'], result.get('job_url'))

    parts = [
        '<div style="margin-top: 20px; padding: 10px; border: 1px solid #ccc; '
        'border-radius: 5px; background-color: #f8f8f8;">',
        '<h3 style="margin-top: 0;">Digital Pathology Job Deployment</h3>',
        f"<p><strong>Job:</strong> {job_link} (ID: {result['job_id']})</p>",
    ]

    # Add run information if available
    if "run_id" in result:
        run_label = f"Run #{result['run_id']}"
        parts.append(f"<p><strong>Run:</strong> {_link(run_label, result.get('job_run_url'))}</p>")

    # Add cluster information
    parts.append("<p><strong>Clusters:</strong></p><ul>")
    for key, cluster_id in result['clusters'].items():
        parts.append(f"<li>{_link(key, cluster_urls.get(key))} (ID: {cluster_id})</li>")
    parts.append("</ul></div>")

    return "\n".join(parts)