In [None]:
# Install the azureml-pipeline package with the %pip magic.
# NOTE(review): no version is pinned, so the installed release may differ between runs.
%pip install azureml-pipeline

In [5]:
# Parameters of the cpu
vm_size = "Standard_DS3_v2"  # VM size requested for every compute cluster
vm_priority = "Dedicated"  # Cluster tier: "Dedicated" or "LowPriority"
number_of_max_instances = 4  # Maximum number of nodes in each cluster

# Parameters of the broker
# The value deliberately keeps its embedded single quotes: the string handed
# to the components is '20.8.72.226', quote characters included.
brokerAddress = "\'20.8.72.226\'"
# Label appended to pipeline and experiment names and passed to the producer
machineTypeKafka = "DS2_v2"

# Parameter of machine
# Has to be equal to the number of max instances, so it is derived from that
# setting instead of being typed a second time (the two could silently drift).
nb_instances_per_cpu = number_of_max_instances
nb_compute = 8 # Number of virtual machines for production and consumption

# Calculated parameters for number of machines needed
# One cluster per consumer machine, one per producer machine, 1 for the manager
cpu_all = 2 * nb_compute + 1

In [1]:
from azure.ai.ml.entities import AmlCompute, PipelineJobSettings
import datetime
from azure.identity import DefaultAzureCredential
import os
from azure.ai.ml import MLClient
from azureml.core import Workspace
import time
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl

# Load the workspace from the local configuration.
# NOTE(review): `ws` is not referenced again in the visible cells.
ws = Workspace.from_config()
credential = DefaultAzureCredential()
# Working counter: the submission loop below decrements it and the pipeline
# builders read it. Requires the parameters cell to have been run first,
# otherwise `nb_compute` is undefined.
nb_compute_prod_cons = nb_compute

# Execute the script
# NOTE(review): presumably setenv.py sets the subscription_id, resource_group
# and workspace_name environment variables read just below - confirm.
%run setenv.py

# Get a handle to the workspace
# Each os.environ[...] lookup raises KeyError when the variable is missing.
ml_client = MLClient(
    credential=credential,
    subscription_id= os.environ['subscription_id'],
    resource_group_name= os.environ['resource_group'],
    workspace_name= os.environ['workspace_name']
)

# Function to create a cluster-------------------------------------------------

def create_cluster(ml_client, vm_size, vm_priority, number_of_max_instances):
    """Create an AmlCompute cluster and return it once the operation has finished.

    Args:
        ml_client: client whose ``compute`` operations are used to create the cluster.
        vm_size: VM family/size of the cluster nodes.
        vm_priority: cluster tier, "Dedicated" or "LowPriority".
        number_of_max_instances: maximum number of nodes in the cluster.

    Returns:
        The compute object produced by the finished create-or-update operation.

    Note:
        The name only has one-second resolution, so two calls within the same
        second target the same cluster name.
    """
    # Create a unique name for the cluster
    cpu_compute_target = "cpu-cluster-kafka-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")

    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure ML Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size=vm_size,
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Nodes in cluster
        max_instances=number_of_max_instances,
        # How many seconds will the node running after the job termination
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier=vm_priority,
    )

    # begin_create_or_update only starts a long-running operation. Block on the
    # poller so the caller never receives (or submits jobs to) a cluster whose
    # creation is still in progress or has failed silently.
    creation_poller = ml_client.compute.begin_create_or_update(cpu_cluster)
    return creation_poller.result()

# End of the function to create a cluster--------------------------------------

# Create the clusters---------------------------------------------------------

# Pool of clusters; the pipelines below take their compute targets from it.
cpu_list_available = []

for _ in range(cpu_all):
    cpu_list_available.append(
        create_cluster(ml_client, vm_size, vm_priority, number_of_max_instances)
    )
    # Cluster names are timestamped to the second; pause so the next one differs.
    time.sleep(1)

# Short grace period before the clusters are used.
time.sleep(5)

# End of the creation of the clusters-----------------------------------------

# Define the pipelines--------------------------------------------------------

# Manager pipeline: a single "manager" step.
# Evaluating the decorator arguments pops one cluster off cpu_list_available,
# which reserves it as this pipeline's default compute target.
@dsl.pipeline(
    name="Manager" + machineTypeKafka,
    description="Manager pipeline",
    default_compute_target=cpu_list_available.pop().name,
)
def pipeline_manager():
    manager_component = ml_client.components.get("manager")

    # Total number of consumer/producer instances across all machines.
    total_instances = nb_compute_prod_cons * nb_instances_per_cpu

    # NOTE(review): the node stays bound to the local name `manager` on purpose;
    # the DSL may derive node names from local variable names - confirm before renaming.
    manager = manager_component(
        brokerAddress=brokerAddress,
        nb_consumers_producers=total_instances,
    )
    return {}

# Consumer pipeline: one "consumer" step per instance slot of a machine.
# nb_compute_prod_cons is read when the pipeline is built, so each build tags
# its steps with the machine number current at that moment.
# The default compute target set here is only a placeholder: the submission
# loop replaces the job settings with a cluster popped from the pool.
@dsl.pipeline(
    name="Consumer"+machineTypeKafka,
    description="Consumer pipeline",
    default_compute_target=cpu_list_available[0].name,
)
def pipeline_consumer():
    # Keep the registered component in its own variable. Rebinding it to the
    # node it returns would make every later iteration call a node rather
    # than the component itself.
    consumer_component = ml_client.components.get("consumer")

    for i in range(nb_instances_per_cpu):
        # NOTE(review): the node stays bound to `consumer`; the DSL may derive
        # node names from local variable names - confirm before renaming.
        consumer = consumer_component(
            num_consumer = str(i),
            num_machine = str(nb_compute_prod_cons),
            brokerAddress=brokerAddress,
        )

    return {}

# Producer pipeline: one "producer" step per instance slot of a machine.
# nb_compute_prod_cons is read when the pipeline is built, so each build tags
# its steps with the machine number current at that moment.
# The default compute target set here is only a placeholder: the submission
# loop replaces the job settings with a cluster popped from the pool.
@dsl.pipeline(
    name="Producer"+machineTypeKafka,
    description="Producer pipeline",
    default_compute_target=cpu_list_available[0].name,
)
def pipeline_producer():
    # Keep the registered component in its own variable. Rebinding it to the
    # node it returns would make every later iteration call a node rather
    # than the component itself.
    producer_component = ml_client.components.get("producer")

    for i in range(nb_instances_per_cpu):
        # NOTE(review): the node stays bound to `producer`; the DSL may derive
        # node names from local variable names - confirm before renaming.
        producer = producer_component(
            num_producer = str(i),
            num_machine = str(nb_compute_prod_cons),
            brokerAddress=brokerAddress,
            machineKafka=machineTypeKafka,
        )

    return {}

# End of the definition of the pipelines--------------------------------------

# Create the pipelines--------------------------------------------------------

# Build the manager job first, while nb_compute_prod_cons still holds the full
# machine count. Note that the name now refers to the job, not the function.
pipeline_manager = pipeline_manager()

# Count the machine number down to 0. The loop variable is the module-level
# counter itself because the pipeline builders read it when they are called.
for nb_compute_prod_cons in range(nb_compute_prod_cons - 1, -1, -1):
    experiment_suffix = machineTypeKafka + str(nb_compute_prod_cons)

    # Consumer: build, bind to a cluster taken from the pool, submit
    pipeline_cons = pipeline_consumer()
    consumer_cluster = cpu_list_available.pop()
    pipeline_cons.settings = PipelineJobSettings(default_compute=consumer_cluster.name)
    ml_client.jobs.create_or_update(
        pipeline_cons,
        experiment_name="consumer" + experiment_suffix,
    )

    # Producer: build, bind to a cluster taken from the pool, submit
    pipeline_prod = pipeline_producer()
    producer_cluster = cpu_list_available.pop()
    pipeline_prod.settings = PipelineJobSettings(default_compute=producer_cluster.name)
    ml_client.jobs.create_or_update(
        pipeline_prod,
        experiment_name="producer" + experiment_suffix,
    )
    

# End of the creation of the pipelines----------------------------------------

# Submit the manager pipeline (built above, before the consumer/producer loop)--


# Submit the manager job under its own experiment (project) name
manager_experiment = "manager" + machineTypeKafka
ml_client.jobs.create_or_update(pipeline_manager, experiment_name=manager_experiment)

The history saving thread hit an unexpected error (OperationalError('no such table: history')).History will not be written to the database.


NameError: name 'nb_compute' is not defined