In [10]:
from datetime import datetime
import yaml
import os
import argparse
import os
import time
import googleapiclient.discovery
from six.moves import input
from datetime import datetime


## Set up configs

In [None]:
# Load deployment settings. safe_load only constructs plain YAML types
# (dict/list/str/int/...), which is all this config needs.
with open('./.config.yaml') as config_file:
    config = yaml.safe_load(config_file) or {}

# Every key below is used to build API requests further down; fail fast with a
# clear message rather than letting None leak into them (e.g. BUCKET_URI would
# silently become "gs://None/temp", and `VM_QUOTAS - 4` would raise TypeError
# only after VMs had already been launched).
_REQUIRED_CONFIG_KEYS = ("PROJECT_ID", "ZONE", "MACHINE_TYPE", "VM_QUOTAS", "NETWORK", "SERVICE_ACCOUNT")
_missing_keys = [key for key in _REQUIRED_CONFIG_KEYS if config.get(key) is None]
if _missing_keys:
    raise ValueError(f"Missing required keys in ./.config.yaml: {', '.join(_missing_keys)}")

PROJECT_ID = config.get("PROJECT_ID")
ZONE = config.get("ZONE")
MACHINE_TYPE = config.get("MACHINE_TYPE")
VM_QUOTAS = config.get("VM_QUOTAS")  # numeric: compared against the live instance count
NETWORK = config.get("NETWORK")
SERVICE_ACCOUNT = config.get("SERVICE_ACCOUNT")
BUCKET_URI = f"gs://{PROJECT_ID}/temp"


print(f"Current project is {PROJECT_ID}")

In [None]:

# [START list_instances]
def list_instances(compute, project, zone):
    """Return every instance in ``zone`` of ``project``, following pagination.

    ``instances().list`` returns one page per call; when more results exist the
    response carries a ``nextPageToken``, and ``list_next`` builds the request
    for the following page (it returns None after the last page).

    Args:
        compute: Compute Engine API client built with googleapiclient.discovery.
        project: Project ID to list instances in.
        zone: Zone to list instances in.

    Returns:
        A list of instance resources (dicts), or None when the zone has no
        instances -- kept for compatibility with existing callers.
    """
    instances_api = compute.instances()
    request = instances_api.list(project=project, zone=zone)
    items = []
    while request is not None:
        response = request.execute()
        # The API omits 'items' entirely for an empty page.
        items.extend(response.get('items', []))
        request = instances_api.list_next(request, response)
    return items or None
# [END list_instances]


# [START create_instance]
def create_instance(compute, project, zone, name, bucket, startup_script, machine_type, network, service_account,
                    image_project='debian-cloud', image_family='debian-11'):
    """Request creation of a VM that runs ``startup_script`` at boot.

    Args:
        compute: Compute Engine API client built with googleapiclient.discovery.
        project: Project ID to create the instance in.
        zone: Zone to create the instance in.
        name: Name of the new instance.
        bucket: Not used by this function; kept so existing callers keep working.
        startup_script: Script text stored under the ``startup-script`` metadata key.
        machine_type: Machine type name; expanded to ``zones/<zone>/machineTypes/<machine_type>``.
        network: VPC network name; attached as ``global/networks/<network>``.
        service_account: Email of the service account the VM runs as.
        image_project: Project hosting the boot image family (default ``debian-cloud``).
        image_family: Image family whose latest image boots the VM (default ``debian-11``).

    Returns:
        The zone operation resource returned by ``instances().insert``; pass its
        ``name`` to ``wait_for_operation`` to block until creation finishes.
    """
    # Resolve the newest image in the family (Debian 11 with the defaults).
    image_response = compute.images().getFromFamily(
        project=image_project, family=image_family).execute()
    source_disk_image = image_response['selfLink']

    # The API expects the zonal machine-type path, not the bare type name.
    machine_type_url = f"zones/{zone}/machineTypes/{machine_type}"

    config = {
        'name': name,
        'machineType': machine_type_url,

        # Boot disk created from the resolved image; deleted with the VM.
        'disks': [
            {
                'boot': True,
                'autoDelete': True,
                'initializeParams': {
                    'sourceImage': source_disk_image,
                }
            }
        ],

        # Network interface with an ephemeral external IP (one-to-one NAT)
        # so the VM can reach the public internet.
        'networkInterfaces': [{
            'network': f'global/networks/{network}',
            'accessConfigs': [
                {'type': 'ONE_TO_ONE_NAT', 'name': 'External NAT'}
            ]
        }],

        # Service account and OAuth scopes: Cloud Storage read/write, Logging
        # write, BigQuery (incl. streaming inserts) and Compute Engine.
        'serviceAccounts': [{
            'email': f'{service_account}',
            'scopes': [
                'https://www.googleapis.com/auth/devstorage.read_write',
                'https://www.googleapis.com/auth/logging.write',
                'https://www.googleapis.com/auth/bigquery',
                'https://www.googleapis.com/auth/bigquery.insertdata',
                'https://www.googleapis.com/auth/compute'
            ]
        }],

        # Instance metadata is readable from inside the VM; the
        # 'startup-script' key is executed by the guest at boot.
        'metadata': {
            'items': [{
                'key': 'startup-script',
                'value': startup_script
            }]
        }
    }

    return compute.instances().insert(
        project=project,
        zone=zone,
        body=config).execute()
# [END create_instance]


# [START delete_instance]
def delete_instance(compute, project, zone, name):
    """Ask Compute Engine to delete instance ``name`` in ``zone`` of ``project``.

    Returns:
        The zone operation resource produced by ``instances().delete``; its
        ``name`` can be handed to ``wait_for_operation``.
    """
    delete_request = compute.instances().delete(project=project, zone=zone, instance=name)
    operation = delete_request.execute()
    return operation
# [END delete_instance]


# [START wait_for_operation]
def wait_for_operation(compute, project, zone, operation, poll_interval=1, timeout=None):
    """Poll a zonal operation until its status is ``DONE``.

    Args:
        compute: Compute Engine API client built with googleapiclient.discovery.
        project: Project ID that owns the operation.
        zone: Zone the operation runs in.
        operation: Operation name (the ``name`` field of an insert/delete response).
        poll_interval: Seconds to sleep between polls (default 1).
        timeout: Maximum seconds to wait before giving up, or None (default)
            to wait indefinitely.

    Returns:
        The final operation resource.

    Raises:
        Exception: If the finished operation carries an ``error`` field.
        TimeoutError: If ``timeout`` elapses before the operation is DONE.
    """
    print('Waiting for operation to finish...')
    # monotonic() is immune to wall-clock adjustments during long waits.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        result = compute.zoneOperations().get(
            project=project,
            zone=zone,
            operation=operation).execute()

        if result['status'] == 'DONE':
            print("done.")
            # A DONE operation may still have failed; surface that to the caller.
            if 'error' in result:
                raise Exception(result['error'])
            return result

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Operation {operation} not DONE after {timeout} seconds "
                f"(last status: {result['status']})")

        time.sleep(poll_interval)
# [END wait_for_operation]


# [START run]
def main(project, bucket, zone, instance_name, startup_script, machine_type, network, service_account, wait=True):
    """Create one worker VM and print the instances now present in ``zone``.

    Args:
        project: Project ID to create the instance in.
        bucket: Forwarded to ``create_instance``; not otherwise used here.
        zone: Zone for the new instance.
        instance_name: Name of the new instance.
        startup_script: Script text the VM runs at boot.
        machine_type: Machine type name (e.g. the MACHINE_TYPE config value).
        network: VPC network name.
        service_account: Email of the service account the VM runs as.
        wait: When True (default), block until the insert operation is DONE
            before listing instances; when False, return right after the
            insert request is accepted.
    """
    compute = googleapiclient.discovery.build('compute', 'v1')

    print('Creating instance.')

    operation = create_instance(compute, project, zone, instance_name, bucket, startup_script, machine_type, network, service_account)
    # `wait` used to be accepted but ignored (the call always blocked).
    if wait:
        wait_for_operation(compute, project, zone, operation['name'])

    # list_instances returns None for a zone with no instances.
    instances = list_instances(compute, project, zone) or []

    print('Instances in project %s and zone %s:' % (project, zone))
    for instance in instances:
        print(' - ' + instance['name'])

    print("""
Instance created.
It will take a minute or two for the instance to complete work.
""")

def _get_number_instances(compute, project, zone) -> int:
    """Return how many instances exist in ``zone`` of ``project``.

    Uses the caller's ``compute`` client (previously ignored in favour of a
    freshly built one). Every listed instance counts, whatever its status,
    since ``list_instances`` applies no filter. An empty zone, for which
    ``list_instances`` returns None, counts as 0 instead of raising TypeError.
    """
    instances = list_instances(compute, project, zone)
    return len(instances) if instances else 0


In [None]:
# Template for every worker's startup script. Its {placeholders} are filled per
# worker with str.format in the launch loop, so literal braces in the shell
# script must be written doubled ({{ }}). `with` closes the file handle.
with open('startup-script.sh', 'r') as script_file:
    startup_script = script_file.read()

## Run parallel workers

'20221007-134944'

In [None]:
# Number of worker VMs to launch; each gets its 0-based index as `var`.
N_WORKERS = 19
# Launch pauses while the zone holds more than VM_QUOTAS - QUOTA_HEADROOM instances.
QUOTA_HEADROOM = 4
# Seconds between instance-count checks while at the ceiling.
POLL_SECONDS = 60

# One client reused for all quota checks instead of a rebuild per check.
compute = googleapiclient.discovery.build('compute', 'v1')

worker_num = 1
for var in range(N_WORKERS):
    # Single quotes inside the f-string: reusing the outer double quotes is a
    # SyntaxError before Python 3.12.
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    instance_name = f"worker-{worker_num}-{timestamp}"
    print(f"> starting {instance_name}")

    # Fill this worker's parameters into the startup-script template.
    worker_startup_script = startup_script.format(
        var=var,
        project=PROJECT_ID,
        dataset="test",
        instance_name=instance_name,
        zone=ZONE)

    # Start the Compute Engine VM (blocks until the insert operation is DONE).
    main(
        project=PROJECT_ID,
        bucket=BUCKET_URI,
        zone=ZONE,
        instance_name=instance_name,
        startup_script=worker_startup_script,
        machine_type=MACHINE_TYPE,
        network=NETWORK,
        service_account=SERVICE_ACCOUNT)

    # Throttle: wait until the zone's instance count is back under the quota
    # headroom before launching the next worker. Presumably workers free
    # capacity by deleting themselves from the startup script (it receives
    # instance_name and zone) -- TODO confirm, otherwise this loops forever.
    num_instances = _get_number_instances(compute, PROJECT_ID, ZONE)
    while num_instances > VM_QUOTAS - QUOTA_HEADROOM:
        time.sleep(POLL_SECONDS)
        num_instances = _get_number_instances(compute, PROJECT_ID, ZONE)

    worker_num += 1
