In [1]:
import time as t
from ctm_python_client.core.workflow import Workflow, WorkflowDefaults
from ctm_python_client.core.comm import Environment, EnvironmentMode
from aapi import *

# Sanity check for a single folder that mixes several job types: define the
# jobs, build/deploy/run the folder on the 'workbench' environment, then read
# the folder's jobs back.
workflow1 = Workflow(
    Environment.create_workbench('workbench'),
    WorkflowDefaults(run_as='workbench'),
)
workflow1.clear_all()

# Passed as ``run_as_dummy`` to every job definition below.
run_as_dummy = True

databricksLoad = JobDatabricks(
    'LoadForecasts',
    connection_profile='DATABRICKS',
    databricks_job_id='991955986358417',
    parameters='"params":{}',
    idempotency_token='tokeni_%%ORDERID',
    run_as_dummy=run_as_dummy,
)

terraform_scale_databricks = JobTerraform(
    'TerraformScale',
    connection_profile='TERRAFORM',
    action='Run Workspace',
    workspace_name='dealership-demo',
    run_name="demorun%%ORDERID",
    variables=[{"min_workers": "4"}, {"max_workers": "10"}],
    run_as_dummy=run_as_dummy,
)

ExtractInventoryHistory = JobDatabricks(
    'ExtractInventoryHistory',
    connection_profile='DATABRICKS',
    databricks_job_id='658383437534753',
    parameters='"params":{}',
    idempotency_token='token_%%ORDERID',
    run_as_dummy=run_as_dummy,
)

web_service_job = JobWebServices(
    object_name='WeatherService',
    run_as_dummy=run_as_dummy,
    connection_profile='WeatherAPI',
    service='ForecastService',
    operation='GetWeather',
    request_type=JobWebServices.RequestType.FreeText,
    soap_request=['<SOAP-ENV>Request</SOAP-ENV>'],
)

query_job = JobDatabaseSQLScript(
    object_name='QueryWeather',
    run_as_dummy=run_as_dummy,
    sql_script='UPDATE weather SET forecast="sunny" WHERE city="Seattle"',
    connection_profile='db_profile',
)

sagemaker_job = JobAwsSageMaker(
    'InventoryForecastModel',
    connection_profile='SAGEMAKER',
    pipeline_name='InferencePipeline',
    idempotency_token='Token_ControlM_for_SageMaker%%ORDERID',
    add_parameters='checked',
    parameters='{"Name":"input_file", "Value": "file"}',
    run_as_dummy=run_as_dummy,
    retry_pipeline_execution='unchecked',
    rerun_limit=Job.RerunLimit(times='5'),
)

tableau_refresh = JobTableau(
    'TableauRefresh',
    connection_profile='TABLEAU',
    action="Refresh Datasource",
    datasource_name="inventory_forecast",
    run_as_dummy=run_as_dummy,
)

# NOTE(review): databricksLoad appears twice in this job list — confirm that
# the duplicate entry is intentional.
folder = Folder(
    'Folder_Sanity_Integration_Factory',
    controlm_server='workbench',
    job_list=[
        databricksLoad,
        terraform_scale_databricks,
        ExtractInventoryHistory,
        databricksLoad,
        web_service_job,
        query_job,
        sagemaker_job,
        tableau_refresh,
    ],
)

# Show the folder definition that is about to be submitted.
print(folder.dumps_aapi(indent=2))

workflow1.add(folder)
workflow1.build()
workflow1.deploy()
run = workflow1.run()

# Read the folder's jobs back twice: once through the class-level helper with
# a fresh environment, once through the workflow instance. Only the first
# result is printed.
jobs = Workflow.get_jobs_list_of_folders(
    Environment.create_workbench('workbench'),
    server="workbench",
    folder="Folder_Sanity_Integration_Factory",
)
jobs1 = workflow1.get_jobs_draft_with_workflow_instance(
    server="workbench",
    folder="Folder_Sanity_Integration_Factory",
)
print(jobs)


[DEBUG] Initializing shared Converter instance.
[DEBUG] Discovering all @attrs classes under aapi...
[DEBUG] Registering structure hooks for discovered @attrs classes...
[DEBUG] Registering structure hooks for the following classes: ['JobVMwarePowerOff', 'ConnectionProfileAwsBatch', 'ActionSetToOK', 'JobFileWatcher', 'JobAWS', 'RunningJobs', 'ConnectionProfileIBMDataStageLinux', 'JobVMwareConfigurationDeployTemplate', 'JobCommunicationSuite', 'JobAzure', 'EndpointDestSftp', 'FolderClientData', 'JobOS400FullVirtualTerminal', 'JobSAPR3COPY', 'JobHadoopHDFSCommands', 'JobAWSBatch', 'ConnectionProfileMicrosoftPowerBISP', 'ConnectionProfileSnowflakeIdP', 'JobOS400Full', 'ConnectionProfileFileTransferLocal', 'ConnectionProfileAzureSynapse', 'ConnectionProfileAirbyte', 'ConnectionProfileFileTransferAzure', 'SiteStandard', 'ConnectionProfileInformaticaCS', 'JobAwsGlue', 'JobVMwarePower', 'JobAirbyte', 'JobAzureFunction', 'Tag', 'JobGCPDataflow', 'TagFolder', 'ConnectionProfileAwsCloudFormation

In [2]:
import time as t
from ctm_python_client.core.workflow import Workflow, WorkflowDefaults
from ctm_python_client.core.comm import Environment, EnvironmentMode
from aapi import *

# Sanity check with the jobs split over two folders: deploy and run both, then
# read the jobs back using the wildcard folder name "Folder_Sanity_*".
workflow1 = Workflow(
    Environment.create_workbench('workbench'),
    WorkflowDefaults(run_as='workbench'),
)
workflow1.clear_all()

# Passed as ``run_as_dummy`` to every job definition below.
run_as_dummy = True

databricksLoad = JobDatabricks(
    'LoadForecasts',
    connection_profile='DATABRICKS',
    databricks_job_id='991955986358417',
    parameters='"params":{}',
    idempotency_token='tokeni_%%ORDERID',
    run_as_dummy=run_as_dummy,
)

terraform_scale_databricks = JobTerraform(
    'TerraformScale',
    connection_profile='TERRAFORM',
    action='Run Workspace',
    workspace_name='dealership-demo',
    run_name="demorun%%ORDERID",
    variables=[{"min_workers": "4"}, {"max_workers": "10"}],
    run_as_dummy=run_as_dummy,
)

ExtractInventoryHistory = JobDatabricks(
    'ExtractInventoryHistory',
    connection_profile='DATABRICKS',
    databricks_job_id='658383437534753',
    parameters='"params":{}',
    idempotency_token='token_%%ORDERID',
    run_as_dummy=run_as_dummy,
)

web_service_job = JobWebServices(
    object_name='WeatherService',
    run_as_dummy=run_as_dummy,
    connection_profile='WeatherAPI',
    service='ForecastService',
    operation='GetWeather',
    request_type=JobWebServices.RequestType.FreeText,
    soap_request=['<SOAP-ENV>Request</SOAP-ENV>'],
)

folder1 = Folder(
    'Folder_Sanity_1',
    controlm_server='workbench',
    job_list=[databricksLoad, terraform_scale_databricks],
)
folder2 = Folder(
    'Folder_Sanity_2',
    controlm_server='workbench',
    job_list=[ExtractInventoryHistory, web_service_job],
)

workflow1.add(folder1)
workflow1.add(folder2)

workflow1.build()
workflow1.deploy()
run = workflow1.run()

# Read the jobs of both folders back twice: once through the class-level
# helper with a fresh environment, once through the workflow instance. Only
# the first result is printed.
jobs = Workflow.get_jobs_list_of_folders(
    Environment.create_workbench('workbench'),
    server="workbench",
    folder="Folder_Sanity_*",
)
jobs1 = workflow1.get_jobs_draft_with_workflow_instance(
    server="workbench",
    folder="Folder_Sanity_*",
)
print(jobs)



[DEBUG] Deserializing class: Folder
[DEBUG] Raw data: {'Type': 'Folder', 'ControlmServer': 'workbench', 'RunAs': 'workbench', 'Jobs': [{'Type': 'Job:Databricks', 'ConnectionProfile': 'DATABRICKS', 'Databricks Job ID': '991955986358417', 'Parameters': '"params":{}', 'Idempotency Token': 'tokeni_%%ORDERID', 'RunAsDummy': True, 'RunAs': 'workbench', 'Name': 'Jobs1', 'object_name': 'Jobs1'}, {'Type': 'Job:Terraform', 'ConnectionProfile': 'TERRAFORM', 'Action': 'Run Workspace', 'Workspace Name': 'dealership-demo', 'Run Name': 'demorun%%ORDERID', 'RunAsDummy': True, 'RunAs': 'workbench', 'Variables': [{'min_workers': '4'}, {'max_workers': '10'}, {'UCM-WORKSPACEBODY3': ''}, {'UCM-WORKSPACEBODY1': ''}], 'Name': 'Jobs2', 'object_name': 'Jobs2'}], 'object_name': 'Folder_Sanity_1'}
[DEBUG] Extracted Type hint: Job:Databricks
[DEBUG] Resolved Type 'Job:Databricks' to class JobDatabricks

[DEBUG] Deserializing class: JobDatabricks
[DEBUG] Raw data: {'Type': 'Job:Databricks', 'ConnectionProfile': '

In [2]:
import time as t
from ctm_python_client.core.workflow import Workflow, WorkflowDefaults
from ctm_python_client.core.comm import Environment, EnvironmentMode
from aapi import *

# Deploy two sanity folders, then load them back from the workbench into a
# second object through Workflow.get_jobs using the wildcard folder name.
workflow1 = Workflow(
    Environment.create_workbench('workbench'),
    WorkflowDefaults(run_as='workbench'),
)
workflow1.clear_all()

# Passed as ``run_as_dummy`` to every job definition below.
run_as_dummy = True

databricksLoad = JobDatabricks(
    'LoadForecasts',
    connection_profile='DATABRICKS',
    databricks_job_id='991955986358417',
    parameters='"params":{}',
    idempotency_token='tokeni_%%ORDERID',
    run_as_dummy=run_as_dummy,
)

terraform_scale_databricks = JobTerraform(
    'TerraformScale',
    connection_profile='TERRAFORM',
    action='Run Workspace',
    workspace_name='dealership-demo',
    run_name="demorun%%ORDERID",
    variables=[{"min_workers": "4"}, {"max_workers": "10"}],
    run_as_dummy=run_as_dummy,
)

ExtractInventoryHistory = JobDatabricks(
    'ExtractInventoryHistory',
    connection_profile='DATABRICKS',
    databricks_job_id='658383437534753',
    parameters='"params":{}',
    idempotency_token='token_%%ORDERID',
    run_as_dummy=run_as_dummy,
)

web_service_job = JobWebServices(
    object_name='WeatherService',
    run_as_dummy=run_as_dummy,
    connection_profile='WeatherAPI',
    service='ForecastService',
    operation='GetWeather',
    request_type=JobWebServices.RequestType.FreeText,
    soap_request=['<SOAP-ENV>Request</SOAP-ENV>'],
)

folder1 = Folder(
    'Folder_Sanity_1',
    controlm_server='workbench',
    job_list=[databricksLoad, terraform_scale_databricks],
)
folder2 = Folder(
    'Folder_Sanity_2',
    controlm_server='workbench',
    job_list=[ExtractInventoryHistory, web_service_job],
)

workflow1.add(folder1)
workflow1.add(folder2)

workflow1.build()
workflow1.deploy()

# Fetch the deployed folders back with a fresh environment; only the object
# returned by get_jobs is printed here.
workflow2 = Workflow.get_jobs(
    Environment.create_workbench('workbench'),
    server="workbench",
    folder="Folder_Sanity_*",
)
print(workflow2)


ApiException: (500)
Reason: 
HTTP response headers: HTTPHeaderDict({'X-Content-Type-Options': 'nosniff', 'Pragma': 'no-cache', 'X-Frame-Options': 'SAMEORIGIN', 'Referrer-Policy': 'no-referrer', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains; preload', 'Cache-Control': 'no-cache, no-store, private, must-revalidate, max-age=0, no-transform', 'Content-Security-Policy': "object-src 'none'; base-uri 'self'; sandbox allow-scripts allow-popups allow-forms allow-top-navigation allow-presentation allow-same-origin allow-downloads allow-popups-to-escape-sandbox; report-uri /csp-violation-report-endpoint/; form-action 'self'; frame-ancestors 'self';", 'hstsMaxAgeSeconds': '31536000', 'X-XSS-Protection': '1; mode=block', 'Expect-CT': 'max-age=120, enforce', 'vary': 'origin,accept-encoding', 'Set-Cookie': 'JSESSIONID=F941664439616F7D8B1939EA25B5F82A; Path=/automation-api; Secure; HttpOnly', 'Server': 'Web Server', 'Content-Type': 'application/json', 'Transfer-Encoding': 'chunked', 'Date': 'Tue, 13 May 2025 06:47:19 GMT', 'Connection': 'close'})
HTTP response body: {
  "errors" : [ {
    "message" : "Failed to login: Incorrect username or password"
  } ]
}


In [None]:
import time as t
from ctm_python_client.core.workflow import Workflow, WorkflowDefaults
from ctm_python_client.core.comm import Environment
from aapi import *

# Round-trip inspection: deploy two sanity folders, fetch them back with
# Workflow.get_jobs, and print each job's local definition followed by the
# fetched definition so the two dumps can be compared by eye.
workflow = Workflow(
    Environment.create_workbench('workbench'),
    WorkflowDefaults(run_as='workbench'),
)
workflow.clear_all()

# Passed as ``run_as_dummy`` to every job definition below.
run_as_dummy = True

databricksLoad = JobDatabricks(
    'LoadForecasts',
    connection_profile='DATABRICKS',
    databricks_job_id='991955986358417',
    parameters='"params":{}',
    idempotency_token='tokeni_%%ORDERID',
    run_as_dummy=run_as_dummy
)

terraform_scale_databricks = JobTerraform(
    'TerraformScale',
    connection_profile='TERRAFORM',
    action='Run Workspace',
    workspace_name='dealership-demo',
    run_name="demorun%%ORDERID",
    variables=[{"min_workers": "4"}, {"max_workers": "10"}],
    run_as_dummy=run_as_dummy
)

# NOTE(review): ExtractInventoryHistory is defined but not placed in either
# folder below — confirm whether it should be.
ExtractInventoryHistory = JobDatabricks(
    'ExtractInventoryHistory',
    connection_profile='DATABRICKS',
    databricks_job_id='658383437534753',
    parameters='"params":{}',
    idempotency_token='token_%%ORDERID',
    run_as_dummy=run_as_dummy
)

web_service_job = JobWebServices(
    object_name='WeatherService',
    run_as_dummy=run_as_dummy,
    connection_profile='WeatherAPI',
    service='ForecastService',
    operation='GetWeather',
    request_type=JobWebServices.RequestType.FreeText,
    soap_request=['<SOAP-ENV>Request</SOAP-ENV>']
)

query_job = JobDatabaseSQLScript(
    object_name='QueryWeather',
    run_as_dummy=run_as_dummy,
    sql_script='UPDATE weather SET forecast="sunny" WHERE city="Seattle"',
    connection_profile='db_profile'
)

sagemaker_job = JobAwsSageMaker(
    'InventoryForecastModel',
    connection_profile='SAGEMAKER',
    pipeline_name='InferencePipeline',
    idempotency_token='Token_ControlM_for_SageMaker%%ORDERID',
    add_parameters='checked',
    parameters='{"Name":"input_file", "Value": "file"}',
    run_as_dummy=run_as_dummy,
    retry_pipeline_execution='unchecked',
    rerun_limit=Job.RerunLimit(times='5')
)

tableau_refresh = JobTableau(
    'TableauRefresh',
    connection_profile='TABLEAU',
    action="Refresh Datasource",
    datasource_name="inventory_forecast",
    run_as_dummy=run_as_dummy
)

# databricksLoad is deliberately listed twice; the comparison table below
# inspects index 2 as "databricksLoad again".
folder1 = Folder('Folder_Sanity_1',
                 controlm_server='workbench',
                 job_list=[databricksLoad, terraform_scale_databricks, databricksLoad])

folder2 = Folder('Folder_Sanity_2',
                 controlm_server='workbench',
                 job_list=[web_service_job, query_job, sagemaker_job, tableau_refresh])

workflow.add(folder1)
workflow.add(folder2)

workflow.build()
workflow.deploy()

workflow2 = Workflow.get_jobs(Environment.create_workbench('workbench'), server="workbench", folder="Folder_Sanity_*")

print(workflow2)

# One row per job to inspect: (folder name, index in job_list, label used in
# the printed header). Kept as data so adding a job means adding one row.
comparisons = [
    ('Folder_Sanity_1', 0, 'databricksLoad'),
    ('Folder_Sanity_1', 1, 'terraform_scale_databricks'),
    ('Folder_Sanity_1', 2, 'databricksLoad again'),
    ('Folder_Sanity_2', 0, 'web_service_job'),
    ('Folder_Sanity_2', 1, 'query_job'),
    ('Folder_Sanity_2', 2, 'sagemaker_job'),
    ('Folder_Sanity_2', 3, 'tableau_refresh'),
]

for folder_name, job_index, label in comparisons:
    print(f"=== {folder_name} - Job {job_index} ({label}) ===")
    # Local definition first, then the one fetched back from the workbench.
    print(workflow._definitions[folder_name].job_list[job_index].dumps_aapi(indent=4))
    print(workflow2._definitions[folder_name].job_list[job_index].dumps_aapi(indent=4))


<ctm_python_client.core.workflow.Workflow object at 0x0000028D4E4A3D10>
=== Folder_Sanity_1 - Job 0 (databricksLoad) ===
{
    "Type": "Job:Databricks",
    "RunAs": "workbench",
    "RunAsDummy": true,
    "ConnectionProfile": "DATABRICKS",
    "Databricks Job ID": "991955986358417",
    "Parameters": "\"params\":{}",
    "Idempotency Token": "tokeni_%%ORDERID"
}
{
    "Type": "Job:Databricks",
    "RunAs": "workbench",
    "RunAsDummy": true,
    "ConnectionProfile": "DATABRICKS",
    "Databricks Job ID": "991955986358417",
    "Parameters": "\"params\":{}",
    "Idempotency Token": "tokeni_%%ORDERID"
}
=== Folder_Sanity_1 - Job 1 (terraform_scale_databricks) ===
{
    "Type": "Job:Terraform",
    "RunAs": "workbench",
    "Variables": [
        {
            "min_workers": "4"
        },
        {
            "max_workers": "10"
        }
    ],
    "RunAsDummy": true,
    "ConnectionProfile": "TERRAFORM",
    "Action": "Run Workspace",
    "Workspace Name": "dealership-demo",
    "

In [None]:
import json
import sys
from ctm_python_client.core.workflow import Workflow, WorkflowDefaults
from ctm_python_client.core.comm import Environment
from aapi import *

def dump_aapi_clean(job):
    """Serialize a job's AAPI dump in a canonical form for string comparison.

    The text returned by ``job.dumps_aapi()`` is parsed as JSON and re-emitted
    with sorted keys and a 4-space indent, so two dumps that differ only in
    key order or whitespace produce the same string.
    """
    parsed = json.loads(job.dumps_aapi())
    canonical = json.dumps(parsed, indent=4, sort_keys=True)
    return canonical

def compare_jobs(job1, job2, folder_name, job_index):
    """Check that two jobs produce the same normalized AAPI dump.

    Args:
        job1: Job whose dump is printed as "Expected" on a mismatch.
        job2: Job whose dump is printed as "Actual" on a mismatch.
        folder_name: Folder name, used only in the printed report.
        job_index: Position of the job in its folder, used only in the report.

    Returns:
        True when both normalized dumps are equal, False otherwise. A one-line
        match message, or a mismatch header followed by both dumps, is printed.
    """
    expected_dump = dump_aapi_clean(job1)
    actual_dump = dump_aapi_clean(job2)
    # The report is labelled with the class name of the first job.
    label = f"{folder_name} - Job {job_index} ({type(job1).__name__})"

    if expected_dump == actual_dump:
        print(f"✅ Match: {label}")
        return True

    print(f"❌ MISMATCH in {label}")
    print("---- Expected ----")
    print(expected_dump)
    print("---- Actual ----")
    print(actual_dump)
    return False

def compare_folders(workflow1, workflow2, folder_name):
    """Compare, position by position, the jobs of one folder in two workflows.

    Args:
        workflow1: Workflow holding the expected definitions.
        workflow2: Workflow holding the actual definitions.
        folder_name: Key of the folder to look up in each workflow's
            ``_definitions`` mapping.

    Returns:
        True when the folder exists in both workflows, both job lists have the
        same length and every pair of jobs matches; False otherwise. Each
        failure is reported on stdout.
    """
    job_lists = []
    for side, wf in (("expected", workflow1), ("actual", workflow2)):
        try:
            job_lists.append(wf._definitions[folder_name].job_list)
        except KeyError:
            # Report a missing folder like any other mismatch instead of
            # letting a bare KeyError abort the whole comparison run.
            print(f"❌ Folder {folder_name} is missing from the {side} workflow")
            return False
    jobs1, jobs2 = job_lists

    if len(jobs1) != len(jobs2):
        print(f"❌ Job count mismatch in {folder_name}: {len(jobs1)} vs {len(jobs2)}")
        return False

    # Build the full list first so every pair is compared and reported;
    # feeding a generator to all() would stop at the first mismatch.
    results = [
        compare_jobs(job1, job2, folder_name, i)
        for i, (job1, job2) in enumerate(zip(jobs1, jobs2))
    ]
    return all(results)

def run_tests():
    """Deploy two sanity folders and verify they round-trip through the server.

    Builds ``Folder_Sanity_1`` and ``Folder_Sanity_2`` locally, deploys them to
    the 'workbench' environment, fetches them back with ``Workflow.get_jobs``
    and compares every job's AAPI dump against the local definition.

    Raises:
        SystemExit: with status 1 (via ``sys.exit``) when any comparison fails.
    """
    env = Environment.create_workbench('workbench')

    workflow = Workflow(env, WorkflowDefaults(run_as='workbench'))
    workflow.clear_all()
    run_as_dummy = True

    # --- Job definitions -------------------------------------------------
    databricksLoad = JobDatabricks(
        'LoadForecasts',
        connection_profile='DATABRICKS',
        databricks_job_id='991955986358417',
        parameters='"params":{}',
        idempotency_token='tokeni_%%ORDERID',
        run_as_dummy=run_as_dummy
    )

    # Command job carrying events to add, events to delete and events to
    # wait for, so event settings are part of the round trip as well.
    jobEvent = JobCommand('JobEvent', command='ls -lt')
    jobEvent.events_to_add = AddEvents(events=[
        EventOutAdd(event="e2", date=Event.Date.NoDate),
        EventOutAdd(event="e3", date=Event.Date.NoDate)
    ])
    jobEvent.events_to_delete = DeleteEvents(events=[
        Event(event="e5", date=Event.Date.NextOrderDate),
        Event(event="e4", date=Event.Date.NextOrderDate)
    ])
    jobEvent.event_list = WaitForEvents(events=[
        Event(event="e1"),
        Event(event="e2"),
        Event(event="e3", date=Event.Date.AnyDate)
    ])

    sagemaker_job = JobAwsSageMaker(
        'InventoryForecastModel',
        connection_profile='SAGEMAKER',
        pipeline_name='InferencePipeline',
        idempotency_token='Token_ControlM_for_SageMaker%%ORDERID',
        add_parameters='checked',
        parameters='{"Name":"input_file", "Value": "file"}',
        run_as_dummy=run_as_dummy,
        retry_pipeline_execution='unchecked',
        rerun_limit=Job.RerunLimit(times='5')
    )

    tableau_refresh = JobTableau(
        'TableauRefresh',
        connection_profile='TABLEAU',
        action="Refresh Datasource",
        datasource_name="inventory_forecast",
        run_as_dummy=run_as_dummy
    )

    # --- Folders -----------------------------------------------------------
    folder1 = Folder('Folder_Sanity_1',
                     controlm_server='workbench',
                     job_list=[databricksLoad, jobEvent])

    folder2 = Folder('Folder_Sanity_2',
                     controlm_server='workbench',
                     job_list=[sagemaker_job, tableau_refresh])

    workflow.add(folder1)
    workflow.add(folder2)

    # --- Deploy, then fetch what the server now holds ----------------------
    workflow.build()
    workflow.deploy()

    # Fetched only after deploy: an earlier pre-deploy fetch was discarded
    # unread and cost one extra service call.
    workflow_actual = Workflow.get_jobs(env, server="workbench", folder="Folder_Sanity_*")

    # --- Compare -------------------------------------------------------------
    folders_to_test = ['Folder_Sanity_1', 'Folder_Sanity_2']
    # Materialize the list so every folder is compared and reported even
    # after an earlier one has failed.
    results = [
        compare_folders(workflow, workflow_actual, folder_name)
        for folder_name in folders_to_test
    ]

    if not all(results):
        print("❌ One or more job comparisons failed.")
        sys.exit(1)

    print("✅ All job comparisons passed.")

# Run the round-trip sanity check when this code is executed as the main module.
if __name__ == "__main__":
    run_tests()


✅ Match: Folder_Sanity_1 - Job 0 (JobDatabricks)
✅ Match: Folder_Sanity_1 - Job 1 (JobCommand)
✅ Match: Folder_Sanity_2 - Job 0 (JobAwsSageMaker)
✅ Match: Folder_Sanity_2 - Job 1 (JobTableau)
✅ All job comparisons passed.
