In [152]:
%pip install docker
import yaml
import docker
import re
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor
from time import sleep

Note: you may need to restart the kernel to use updated packages.


In [153]:
def parse_yaml_script_file(file_path):
    """Read a YAML file and return its parsed content.

    Args:
        file_path: Path of the YAML file to load.

    Returns:
        The object produced by ``yaml.safe_load``, or ``None`` when the file
        does not exist or its content is not valid YAML (the problem is
        printed in both cases).
    """
    # open() must be inside the try block: it is the call that raises
    # FileNotFoundError, so the handler below is unreachable otherwise.
    try:
        with open(file_path) as f:
            text = f.read()
            return yaml.safe_load(text)
    except FileNotFoundError:
        print('File not found')
        return None
    except yaml.YAMLError as exc:
        print(exc)
        return None

In [154]:
def get_variables_and_parameters(parsed_data):
    """Collect the ``variables`` and ``parameters`` sections of a mapping.

    Each section is read as a list of entries exposing ``name`` and ``value``
    through ``.get``; a missing section counts as empty.

    Args:
        parsed_data: Mapping that may contain ``variables`` and ``parameters``.

    Returns:
        A tuple ``(initialVariables, variables, parameters)`` of dicts keyed by
        entry name. ``variables`` is a separate shallow copy of
        ``initialVariables``.
    """
    def _name_value_map(section):
        # Later entries with the same name overwrite earlier ones.
        return {
            entry.get('name'): entry.get('value')
            for entry in parsed_data.get(section, [])
        }

    initialVariables = _name_value_map('variables')
    parameters = _name_value_map('parameters')
    return initialVariables, dict(initialVariables), parameters

In [155]:

def process_steps(steps, initialVariables, variables, parameters, job):
    """Run every step of a job in order.

    The job's own ``variables``/``parameters`` sections are merged into the
    dictionaries passed in (they are updated in place), then each step is
    dispatched on its type:

    * ``script`` - expanded and executed through ``execute_script``.
    * ``bash``   - expanded, printed and handed to ``run_bash``.
    * ``task``   - the step's ``inputs.command`` is expanded and executed
      through ``execute_script`` with the optional ``inputs.filepath``.

    Args:
        steps: Iterable of step mappings.
        initialVariables: Variable values used for ``${{ variables.x }}``.
        variables: Current variable values used for ``$(x)``.
        parameters: Parameter values used for ``${{ parameters.x }}``.
        job: The job mapping the steps belong to.

    Returns:
        The ``job`` argument, unchanged.
    """
    initialVariables_job, variables_job, parameters_job = get_variables_and_parameters(job)

    initialVariables.update(initialVariables_job)
    variables.update(variables_job)
    parameters.update(parameters_job)

    for step in steps:
        if 'script' in step:
            script_content = step['script']
            expanded_script = expand_variables(script_content, variables, initialVariables)
            expanded_script = expand_parameters(expanded_script, parameters)
            execute_script(expanded_script, None, "script")
        elif 'bash' in step:
            bash_command = step['bash']
            expanded_bash_command = expand_variables(bash_command, variables, initialVariables)
            expanded_bash_command = expand_parameters(expanded_bash_command, parameters)
            print(f"Executing bash command: {expanded_bash_command}\n")
            run_bash(expanded_bash_command, variables)
        elif 'task' in step:
            # Only the current step is handled here. Re-iterating over all of
            # the job's steps would run every task once per task step and
            # would fail on steps that have no 'inputs'.
            inputs = step.get('inputs') or {}
            command = inputs.get('command')
            if command is None:
                print("Task step has no 'command' input; skipping")
                continue
            filepath = inputs.get('filepath')
            if filepath:
                filepath = expand_parameters(filepath, parameters=parameters)
            command = expand_parameters(command, parameters=parameters)
            command = expand_variables(command, variables=variables, initialVariables=initialVariables)
            execute_script(command, filepath, "task")
        else:
            print("Unsupported step type")
    return job

def expand_variables(content, variables, initialVariables):
    """Substitute variable references in ``content``.

    Two reference forms are replaced for every key of ``variables``:

    * ``${{ variables.<key> }}`` -> the value from ``initialVariables``
      (falling back to the current value when the key has no initial value).
    * ``$(<key>)``               -> the current value from ``variables``.

    Whitespace around the key and between the braces is tolerated.

    Args:
        content: Text containing the references.
        variables: Mapping of variable name to current value.
        initialVariables: Mapping of variable name to initial value.

    Returns:
        ``content`` with all recognised references replaced.
    """
    for key in variables:
        # Escape the key so regex metacharacters in a name are matched literally.
        name = re.escape(str(key))
        initial_value = str(initialVariables.get(key, variables[key]))
        current_value = str(variables[key])
        # A callable replacement inserts the value verbatim; a plain string
        # would be parsed as a template and choke on backslashes such as
        # those found in Windows paths.
        content = re.sub(
            r"\$\{\s*\{\s*variables\." + name + r"\s*\}\s*\}",
            lambda _match, value=initial_value: value,
            content,
        )
        content = re.sub(
            r"\$\(\s*" + name + r"\s*\)",
            lambda _match, value=current_value: value,
            content,
        )
    return content

def expand_parameters(content, parameters):
    """Substitute ``${{ parameters.<key> }}`` references in ``content``.

    Whitespace around the key and between the braces is tolerated.

    Args:
        content: Text containing the references.
        parameters: Mapping of parameter name to value.

    Returns:
        ``content`` with every recognised reference replaced by ``str(value)``.
    """
    for key in parameters:
        # Escape the key so regex metacharacters in a name are matched literally.
        name = re.escape(str(key))
        replacement = str(parameters[key])
        # A callable replacement inserts the value verbatim; a plain string
        # would be parsed as a template and mis-handle backslashes.
        content = re.sub(
            r"\$\{\s*\{\s*parameters\." + name + r"\s*\}\s*\}",
            lambda _match, value=replacement: value,
            content,
        )
    return content

def run_bash(bash_command, variables):
    """Apply a ``task.setvariable`` directive found in a bash command.

    The command itself is not executed. If it contains
    ``[task.setvariable variable=<name>]<value>`` (both parts made of word
    characters), the update is announced and, when ``<name>`` is already a
    key of ``variables``, its value is replaced in place. Only the first
    directive in the command is considered.

    Args:
        bash_command: The command text to inspect.
        variables: Mapping of variable name to current value; mutated in place.
    """
    found = re.search(r"\[task\.setvariable variable=(\w+)\](\w+)", bash_command)
    if not found:
        return

    variable_name, new_value = found.groups()
    print(f"---------------------Updating variable {variable_name} to {new_value}----------------------")
    # Unknown variables are never created, only existing ones are overwritten.
    if variable_name in variables:
        variables[variable_name] = new_value

# Tag given to the image built for every script execution.
image_name = "my_image"
# Build context directory handed to the Docker image build.
dockerfile_path = "./C"

def execute_script(script, filePath, step_type):
    """Build the image, run ``script`` with bash in a container and print its logs.

    Args:
        script: Shell script text; may span several lines.
        filePath: For ``"task"`` steps, a directory relative to the current
            working directory that is mounted read-write at ``/usr/src/app``.
            Ignored when empty/``None`` or for other step types.
        step_type: ``"task"`` enables the volume mount; any other value runs
            the container without one.

    Docker build and API errors are reported with ``print`` and not re-raised.
    The container is always removed, even when waiting or reading logs fails.
    """
    client = docker.from_env()
    container = None

    try:
        print(f"Building Docker image '{image_name}'...")
        client.images.build(path=dockerfile_path, tag=image_name, rm=True)

        # The script is passed to `bash -c` unchanged: bash handles newlines
        # itself, whereas joining lines with ';' breaks blank lines (';;'),
        # trailing comments and multi-line constructs such as if/then.
        run_kwargs = {
            "image": image_name,
            "command": ['/bin/bash', '-c', script],
            "detach": True,
        }
        # Compare by value; `is` on a string literal tests object identity.
        if step_type == "task" and filePath:
            host_dir = os.path.join(os.path.abspath(os.getcwd()), filePath)
            run_kwargs["volumes"] = {host_dir: {"bind": "/usr/src/app", "mode": "rw"}}

        # No fixed container name: Docker rejects a second container with a
        # name that is already in use, which happens when jobs run in parallel
        # or a previous container was left behind.
        container = client.containers.run(**run_kwargs)
        container.wait()

        output = container.logs().decode('utf-8')
        print(f"Output of the command:\n{output}")

    except docker.errors.BuildError as e:
        print(f"Error building Docker image: {e}")
    except docker.errors.APIError as e:
        print(f"Error interacting with Docker: {e}")
    finally:
        if container is not None:
            try:
                # force=True also covers a container that is still running.
                container.remove(force=True)
            except docker.errors.APIError as e:
                print(f"Error removing container: {e}")
        client.close()

def execute_script_without_docker(script):
    """Run each line of ``script`` as its own shell command on the host.

    Every line is executed through the shell one after another. Output is
    captured and discarded, and exit codes are not checked.

    Args:
        script: Newline-separated shell commands.
    """
    commands = script.split('\n')
    for command in commands:
        subprocess.run(command, shell=True, capture_output=True, text=True)
        

def run_docker_command(command):
    """Run ``command`` through the shell and collect its result.

    Args:
        command: Command line passed to the shell.

    Returns:
        A tuple ``(stdout, stderr, returncode)`` where both streams are
        decoded text.
    """
    with subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    ) as process:
        captured_out, captured_err = process.communicate()
        exit_code = process.returncode
    return captured_out, captured_err, exit_code


  if (step_type is "task"):


In [156]:
import sys


# Pipeline definition to run; a relative path, so it is resolved against the
# current working directory.
yaml_file_path = 'test1.yml'
# Parsed pipeline content; the driver code below only runs when this is truthy.
parsed_data = parse_yaml_script_file(yaml_file_path)

def is_job_complete(completed_jobs, job_name):
    """Tell whether the named job(s) are among the completed jobs.

    Args:
        completed_jobs: Iterable of job mappings that have finished; each is
            identified by its ``displayName``.
        job_name: A single job name, or a list/tuple/set of names (a
            ``dependsOn`` entry may list several jobs).

    Returns:
        ``True`` when the name - or every name of the collection - matches the
        ``displayName`` of a completed job, ``False`` otherwise. An empty
        collection yields ``True`` because there is nothing to wait for.
    """
    finished_names = [job.get('displayName') for job in completed_jobs]
    if isinstance(job_name, (list, tuple, set, frozenset)):
        return all(name in finished_names for name in job_name)
    return job_name in finished_names

# Driver: run every stage in order. Within a stage, jobs whose dependsOn is
# empty or already satisfied are submitted to a thread pool together; the next
# wave is scheduled once that wave has finished.
if parsed_data:
    initialVariables, variables, parameters = get_variables_and_parameters(parsed_data)
    print(parameters)
    stages = parsed_data.get('stages', [])
    for stage in stages:
        jobs = stage.get('jobs', [])

        completed_jobs = []
        # Jobs that have not been submitted yet.
        started_jobs = jobs.copy()
        with ThreadPoolExecutor() as executor:
            while started_jobs:
                # Split the remaining jobs into those runnable now and those
                # still waiting. Building new lists avoids popping indices
                # from a list that shrinks while it is processed.
                ready_jobs = []
                waiting_jobs = []
                for job in started_jobs:
                    depends_on = job.get('dependsOn')
                    if not depends_on or is_job_complete(completed_jobs, depends_on):
                        ready_jobs.append(job)
                    else:
                        waiting_jobs.append(job)

                if not ready_jobs:
                    # Nothing can make progress (unknown or circular
                    # dependency); stop instead of looping forever.
                    blocked = [job.get('displayName') for job in waiting_jobs]
                    print(f"Cannot run jobs with unsatisfied dependsOn: {blocked}")
                    break

                futures = []
                for job in ready_jobs:
                    print(f"Executing job: {job.get('displayName')}")
                    steps = job.get('steps', [])
                    futures.append(executor.submit(process_steps, steps, initialVariables, variables, parameters, job))
                started_jobs = waiting_jobs

                # result() blocks until the job is done and re-raises any
                # exception raised inside process_steps.
                for future in futures:
                    job = future.result()
                    print(f"Completed job {job.get('displayName')}")
                    completed_jobs.append(job)

{'image': 'windows-latest'}
Executing job: build
Building Docker image 'my_image'...
Output of the command:
initialValue
initialValue

Executing bash command: echo "##vso[task.setvariable variable=one]secondValue"

---------------------Updating variable one to secondValue----------------------
Building Docker image 'my_image'...
Output of the command:
initialValue
secondValue

Completed job build
Executing job: Test Variables
Building Docker image 'my_image'...
Output of the command:
Testing Parameters:
Image parameter value: windows-latest

Building Docker image 'my_image'...
Output of the command:
Testing Variables:
Variable one value: initialValue
Variable two value: initialValue
Variable one using secondValue
Variable two using initialValue

Completed job Test Variables
