# Configuration
First, we specify the credentials and configuration to use with GCE.

In [1]:
import turbine
import google.oauth2.service_account

config = turbine.GCEConfig(
    project_id='turbine-demo',
    zone='us-central1-a',
    credentials=google.oauth2.service_account.Credentials.from_service_account_file('../../auth/turbine-demo-34dbd729c354.json'),
    service_account="turbine-demo-worker@turbine-demo.iam.gserviceaccount.com",
)

* `project_id` is the (already existing) GCE project in which to provision all resources.
* `zone` is the GCE zone in which to provision all resources.
* `credentials` are the credentials the controller uses to provision resources. It is sufficient for these credentials to have the "Compute Admin", "Pub/Sub Admin", and (to initialize GCS for this example) "Storage Object Admin" and "Storage Admin" roles.
* `service_account` is the email of the service account that all worker VMs run as. It is sufficient for this service account to have the "Compute Admin", "Logs Writer", "Pub/Sub Subscriber", "Storage Object Creator", and "Storage Object Viewer" roles.
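If you grant these roles from the command line, it helps to know the role IDs behind the console display names. The sketch below maps the roles listed above to their IDs and builds one `gcloud projects add-iam-policy-binding` command per role. The helper `grant_commands` is hypothetical and only prints commands; it does not call IAM. The worker email is the one used in this notebook; you would substitute the controller's service account email (not shown here) for `CONTROLLER_ROLES`.

```python
import shlex

# Role IDs for the display names listed above (controller side).
CONTROLLER_ROLES = [
    "roles/compute.admin",        # Compute Admin
    "roles/pubsub.admin",         # Pub/Sub Admin
    "roles/storage.objectAdmin",  # Storage Object Admin (GCS setup for this example)
    "roles/storage.admin",        # Storage Admin (GCS setup for this example)
]

# Role IDs for the worker VM service account.
WORKER_ROLES = [
    "roles/compute.admin",          # Compute Admin
    "roles/logging.logWriter",      # Logs Writer
    "roles/pubsub.subscriber",      # Pub/Sub Subscriber
    "roles/storage.objectCreator",  # Storage Object Creator
    "roles/storage.objectViewer",   # Storage Object Viewer
]


def grant_commands(project_id, sa_email, roles):
    """Hypothetical helper: one `gcloud projects add-iam-policy-binding` command per role."""
    return [
        shlex.join([
            "gcloud", "projects", "add-iam-policy-binding", project_id,
            "--member=serviceAccount:" + sa_email,
            "--role=" + role,
        ])
        for role in roles
    ]


for cmd in grant_commands("turbine-demo",
                          "turbine-demo-worker@turbine-demo.iam.gserviceaccount.com",
                          WORKER_ROLES):
    print(cmd)
```

Printing the commands rather than running them keeps the review step manual, which seems appropriate for permission changes.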

For finer-grained permissions, one could instead create a custom role for the controller and another for the shim.

# Initializing an engine
The central construct in Turbine is a `GCEEngine`, which provides an API both to create tasks and to create VMs on the cloud to execute tasks. Let's create a `GCEEngine` now:

In [2]:
example_engine = turbine.GCEEngine('example-engine', 'gcr.io/turbine-demo/github.com/kasekopf/turbine:7907e19', config)

We provide the engine with a unique identifier (`example-engine`), a Docker image to run on all VMs (`gcr.io/turbine-demo/github.com/kasekopf/turbine:7907e19`, built from [this Dockerfile](Dockerfile)), and the GCE configuration (`config`). The Docker image must have `turbine` installed, and any `ENTRYPOINT` set on the image is ignored.

After creating the engine, we prepare it to receive tasks by provisioning a task queue. This sets up a Pub/Sub topic and an associated subscription to use.

In [3]:
example_engine.prepare_queue()

Created topic projects/turbine-demo/topics/example-engine
Created subscription projects/turbine-demo/subscriptions/example-engine
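The output above, together with the output of `start_workers` and `cleanup` later in this notebook, suggests that the engine identifier is reused verbatim as the name of every resource the engine provisions. That is why it must be unique within the project. The sketch below reconstructs those names from what the notebook prints; `resource_names` and `is_valid_gce_name` are hypothetical helpers, not part of `turbine`. The validity check covers only the Compute Engine naming rule (lowercase letter first, then lowercase letters, digits, or dashes, no trailing dash, at most 63 characters).

```python
import re

# Compute Engine resource names (instance templates, instance groups):
# 1-63 chars, lowercase letter first, no trailing dash.
_GCE_NAME_RE = re.compile(r"[a-z]([-a-z0-9]{0,61}[a-z0-9])?")


def resource_names(project_id, engine_name):
    """Resource names derived from the engine identifier, as seen in the notebook output."""
    return {
        "topic": f"projects/{project_id}/topics/{engine_name}",
        "subscription": f"projects/{project_id}/subscriptions/{engine_name}",
        "instance_template": f"projects/{project_id}/global/instanceTemplates/{engine_name}",
        "instance_group": engine_name,
    }


def is_valid_gce_name(name):
    """True if `name` satisfies the Compute Engine resource naming rule."""
    return _GCE_NAME_RE.fullmatch(name) is not None


names = resource_names("turbine-demo", "example-engine")
print(names["topic"])                        # projects/turbine-demo/topics/example-engine
print(is_valid_gce_name("example-engine"))  # True
```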


# Adding tasks
To demonstrate that tasks can interact with Google Cloud Storage, we create a simple blob in a new bucket.

In [4]:
import google.cloud.storage
import random
import string

storage_client = google.cloud.storage.Client(project=config.project_id, credentials=config.credentials)
bucket = storage_client.create_bucket("turbine-demo-" + ''.join(random.choice(string.ascii_lowercase) for _ in range(10)))
bucket.blob("cloud_input_file.txt").upload_from_string("Some input data")
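Bucket names share a single global namespace across all of Cloud Storage, which is why the cell above appends a random suffix. The sketch below factors that step out and checks the result against a subset of the bucket naming rules (dotless names only: 3 to 63 characters drawn from lowercase letters, digits, dashes, and underscores, beginning and ending with a letter or digit). Both helpers are hypothetical and do not contact GCS; a name passing this check can still be taken by someone else.

```python
import random
import re
import string

# Subset of the Cloud Storage bucket naming rules, for names without dots.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def random_bucket_name(prefix, n=10, rng=random):
    """Append n random lowercase letters to prefix, as the cell above does."""
    suffix = "".join(rng.choice(string.ascii_lowercase) for _ in range(n))
    return prefix + suffix


def is_valid_bucket_name(name):
    """Check the dotless-name rules: 3-63 chars of [a-z0-9_-], alphanumeric at both ends."""
    return _BUCKET_RE.fullmatch(name) is not None


name = random_bucket_name("turbine-demo-")
print(name, is_valid_bucket_name(name))
```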

We now assign some tasks to the engine. Each task consists of:
1. A script to execute as the task.
2. A list of files to download from GCS to the script's working directory before executing the task.
3. A list of files to upload to GCS from the script's working directory after executing the task.
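The three parts of a task can be pictured as a small record. In the sketch below, `TaskSpec` is a hypothetical stand-in, not `turbine`'s internal type. Its `(local path, gs:// URL)` pair order follows the `inputs` and `outputs` arguments to `add_task` used in the next cell. The validation rules (local paths relative to the working directory, remote paths as `gs://` URLs) are assumptions of this sketch.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class TaskSpec:
    """Hypothetical record mirroring the three parts of a task described above."""
    script: str
    inputs: List[Tuple[str, str]] = field(default_factory=list)   # (local path, gs:// source)
    outputs: List[Tuple[str, str]] = field(default_factory=list)  # (local path, gs:// destination)

    def __post_init__(self):
        # Assumed rules for this sketch only.
        for local, remote in self.inputs + self.outputs:
            if local.startswith("/"):
                raise ValueError(f"local path must be relative to the working directory: {local!r}")
            if not remote.startswith("gs://"):
                raise ValueError(f"expected a gs:// URL, got {remote!r}")


task = TaskSpec(
    "#!/bin/bash\nwc in.txt > out.txt",
    inputs=[("in.txt", "gs://my-bucket/in.txt")],     # hypothetical bucket
    outputs=[("out.txt", "gs://my-bucket/out.txt")],
)
```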

For example, here we add four tasks, each of which:
1. Downloads the input file we created above from GCS to `abc/xyz/input.txt`.
2. Sleeps for 10 seconds to simulate work.
3. Prints a message to STDOUT.
4. Prints the working directory contents to STDERR.
5. Computes the word count of the input file.
6. Uploads the `abc/xyz` directory, which holds both the input file and the word count, to a task-specific folder (`worker_1` through `worker_4`) in GCS.

In [5]:
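for i in range(4):
    example_engine.add_task(
        # The task script, run from the task's working directory.
        "\n".join([
            "#!/bin/bash",
            "sleep 10",                                   # simulate work
            "echo \"Processing task %d\"" % (i + 1),      # message to STDOUT
            "ls 1>&2",                                    # working directory contents to STDERR
            "wc abc/xyz/input.txt > abc/xyz/output.txt",  # word count of the input
        ]),
        # (local path, GCS source) pairs downloaded before the script runs.
        inputs=[("abc/xyz/input.txt", "gs://{bucket}/cloud_input_file.txt".format(bucket=bucket.name))],
        # (local path, GCS destination) pairs uploaded after the script runs.
        outputs=[("abc/xyz", "gs://{bucket}/worker_{num}".format(bucket=bucket.name, num=(i+1)))],
    )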
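Before spending money on VMs, it can help to dry-run a task script locally. The sketch below is a hypothetical stand-in for what a worker does with one task: stage inputs into a fresh working directory, run the script there, and collect outputs. To stay offline, a local directory plays the role of the bucket, so "remote" paths are plain paths relative to it rather than `gs://` URLs. The `sleep 10` line is dropped for speed. This is not how `turbine`'s workers are implemented, only an approximation of the steps described above.

```python
import os
import shutil
import subprocess
import tempfile


def run_task_locally(script, inputs, outputs, bucket_dir):
    """Hypothetical local worker: stage inputs, run the script, collect outputs.

    `bucket_dir` stands in for the GCS bucket; remote paths are relative to it.
    """
    with tempfile.TemporaryDirectory() as workdir:
        # 1. "Download" each input into the working directory.
        for local, remote in inputs:
            dest = os.path.join(workdir, local)
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            shutil.copy(os.path.join(bucket_dir, remote), dest)
        # 2. Run the script with the working directory as cwd.
        result = subprocess.run(["bash", "-c", script], cwd=workdir,
                                capture_output=True, text=True, check=True)
        # 3. "Upload" each output (file or directory) back into the bucket.
        for local, remote in outputs:
            src = os.path.join(workdir, local)
            dest = os.path.join(bucket_dir, remote)
            if os.path.isdir(src):
                shutil.copytree(src, dest, dirs_exist_ok=True)
            else:
                os.makedirs(os.path.dirname(dest), exist_ok=True)
                shutil.copy(src, dest)
        return result.stdout, result.stderr


# Recreate the example's input in a throwaway local "bucket".
bucket_dir = tempfile.mkdtemp()
with open(os.path.join(bucket_dir, "cloud_input_file.txt"), "w") as f:
    f.write("Some input data")

script = "\n".join([
    "#!/bin/bash",
    'echo "Processing task 1"',
    "ls 1>&2",
    "wc abc/xyz/input.txt > abc/xyz/output.txt",
])
out, err = run_task_locally(script,
                            inputs=[("abc/xyz/input.txt", "cloud_input_file.txt")],
                            outputs=[("abc/xyz", "worker_1")],
                            bucket_dir=bucket_dir)
print(out, end="")  # Processing task 1
print(sorted(os.listdir(os.path.join(bucket_dir, "worker_1"))))  # ['input.txt', 'output.txt']
```

The temporary "bucket" created with `mkdtemp` is not removed automatically; delete it when you are done inspecting it.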

# Executing tasks
Given these tasks, we now want to instantiate VMs on the Google Cloud to execute the tasks. We first specify the type of VMs we want to use:

In [6]:
example_engine.prepare_workers(machine_type="n1-standard-1", preemptible=True)

In this example, we want to use `n1-standard-1` VMs that can be preempted. It is also possible to requisition GPUs at this stage. Note that we have not yet started any VMs and so are not yet being charged for their computation. This command merely creates an instance template. If desired, you can manually provision VMs using this template.

We are now ready to start VMs and complete tasks! With the following command, we start 2 VMs to process tasks. Although we have specified preemptible VMs, each VM will automatically try to recreate itself if it is preempted. Each VM will also automatically deprovision itself when the queue is empty, to save money. (If you would like the VMs to keep running when no tasks remain, for example to process new tasks as they are added, pass `delete_when_done=False` to `prepare_workers`.)
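The pull-until-empty behavior described above can be simulated with threads and an in-memory queue. In this sketch, `worker` is hypothetical and each thread stands in for a VM. When `delete_when_done` is true, a worker stops after the queue has stayed empty for `idle_timeout` seconds (where a real VM would deprovision itself); otherwise it keeps waiting for new tasks. Real Pub/Sub delivery is at-least-once with acknowledgement deadlines, which this simulation ignores. Which worker picks up which task is not deterministic.

```python
import queue
import threading


def worker(name, tasks, processed, lock, delete_when_done=True, idle_timeout=0.1):
    """Hypothetical worker loop: pull tasks until the queue stays empty, then stop."""
    while True:
        try:
            task = tasks.get(timeout=idle_timeout)
        except queue.Empty:
            if delete_when_done:
                return  # a real VM would deprovision itself here
            continue    # otherwise keep waiting for new tasks
        with lock:
            processed.append((name, task))  # stand-in for actually running the task
        tasks.task_done()


tasks = queue.Queue()
for i in range(4):
    tasks.put(f"task {i + 1}")

processed, lock = [], threading.Lock()
threads = [threading.Thread(target=worker, args=(f"vm-{n}", tasks, processed, lock))
           for n in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(processed))  # 4
```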

In the future, once I debug the REST API call, the `gcloud` argument will no longer be required and the code below will start the VMs itself. For now, copy the `gcloud` command produced by the code below into a terminal and run it. This has the side benefit of confirming that you actually want to spend money.

In [7]:
example_engine.start_workers(2, gcloud=True)

'gcloud compute instance-groups managed create example-engine --base-instance-name example-engine --size 2 --template projects/turbine-demo/global/instanceTemplates/example-engine --zone us-central1-a --project turbine-demo'
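The printed command is assembled from values already in the notebook: the engine identifier, the number of workers, `config.zone`, and `config.project_id`, with the instance template named after the engine. The hypothetical helper below rebuilds that exact string, which can be handy when scripting the manual step. It is inferred from the output above, not taken from `turbine`'s source.

```python
import shlex


def mig_create_command(engine_name, size, project_id, zone):
    """Rebuild the `gcloud compute instance-groups managed create` call printed above."""
    template = f"projects/{project_id}/global/instanceTemplates/{engine_name}"
    return shlex.join([
        "gcloud", "compute", "instance-groups", "managed", "create", engine_name,
        "--base-instance-name", engine_name,
        "--size", str(size),  # number of worker VMs
        "--template", template,
        "--zone", zone,
        "--project", project_id,
    ])


print(mig_create_command("example-engine", 2, "turbine-demo", "us-central1-a"))
```

Running the result (for example via `subprocess.run(shlex.split(cmd))`) starts billable VMs, so the copy-and-paste step above remains a reasonable safeguard.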

Each of the 2 VMs will start up, begin to process tasks, and deprovision itself once there are no tasks remaining. We can confirm this by waiting 2 minutes and ensuring no VMs remain.

In [8]:
import time
print("Number of VMs initially in instance group: " + str(example_engine.workers()[0].info["targetSize"]))
time.sleep(120)
print("Number of VMs remaining in instance group: " + str(example_engine.workers()[0].info["targetSize"]))

Number of VMs initially in instance group: 2
Number of VMs remaining in instance group: 0
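A fixed `time.sleep(120)` can be too short if VMs start slowly, or wasteful if they finish early. A polling helper is one alternative. `wait_until` below is hypothetical; in this notebook it might be called as `wait_until(lambda: example_engine.workers()[0].info["targetSize"] == 0, timeout=600)`, assuming each call to `workers()` fetches fresh state. The clock and sleep functions are injectable so the helper can be exercised without real waiting.

```python
import time


def wait_until(predicate, timeout, interval=10.0, clock=time.monotonic, sleep=time.sleep):
    """Poll predicate() every `interval` seconds until it is true or `timeout` elapses.

    Returns True if the predicate became true, False on timeout.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Simulated instance-group sizes shrinking to zero.
sizes = iter([2, 2, 1, 0])
print(wait_until(lambda: next(sizes) == 0, timeout=5, interval=0.01))  # True
```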


By searching for `text:example-engine` in the [Stackdriver Logging](https://console.cloud.google.com/logs/viewer) Global log, we can see the logs from each VM. These include the tasks processed, the STDOUT and STDERR produced by each task, and any errors seen while interacting with GCS. 

The log output from this example can be viewed [here](example_logs.txt). In this case, we see that each of the two workers initialized, processed two messages, and deprovisioned itself.

We can further confirm that the tasks were executed properly by consulting our Google Cloud Storage bucket.

In [9]:
print([blob.name for blob in bucket.list_blobs()])
print(bucket.blob("worker_2/output.txt").download_as_bytes())

['cloud_input_file.txt', 'worker_1/input.txt', 'worker_1/output.txt', 'worker_2/input.txt', 'worker_2/output.txt', 'worker_3/input.txt', 'worker_3/output.txt', 'worker_4/input.txt', 'worker_4/output.txt']
b' 0  3 15 abc/xyz/input.txt\n'
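Rather than eyeballing the listing, the check can be made programmatic. The hypothetical helpers below parse one line of `wc` output into (lines, words, bytes, filename) and report any expected `worker_<n>/` files that are missing. The inputs here are copied from the cell output above; in the notebook you would pass `[blob.name for blob in bucket.list_blobs()]` and the downloaded bytes instead. The result `(0, 3, 15, ...)` matches the input `Some input data`: no newline, three words, fifteen bytes.

```python
def parse_wc(line):
    """Split one line of `wc FILE` output into (lines, words, bytes, filename)."""
    if isinstance(line, bytes):
        line = line.decode()
    lines, words, nbytes, name = line.split(None, 3)
    return int(lines), int(words), int(nbytes), name.strip()


def missing_outputs(blob_names, num_tasks):
    """Return the expected worker_<n>/ files that do not appear in blob_names."""
    expected = {
        f"worker_{n}/{fname}"
        for n in range(1, num_tasks + 1)
        for fname in ("input.txt", "output.txt")
    }
    return sorted(expected - set(blob_names))


# Values copied from the cell output above.
blob_names = ['cloud_input_file.txt', 'worker_1/input.txt', 'worker_1/output.txt',
              'worker_2/input.txt', 'worker_2/output.txt', 'worker_3/input.txt',
              'worker_3/output.txt', 'worker_4/input.txt', 'worker_4/output.txt']
print(parse_wc(b' 0  3 15 abc/xyz/input.txt\n'))  # (0, 3, 15, 'abc/xyz/input.txt')
print(missing_outputs(blob_names, 4))             # []
```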


The following command cleans up the empty managed instance group left behind by the completed workers. If desired, the same `GCEEngine` object can continue to be used to add new tasks and run new workers.

In [10]:
example_engine.cleanup_workers()

Deleting instance group manager example-engine


Finally, the following command deletes all resources provisioned by the engine.

In [11]:
example_engine.cleanup()

Deleted subscription projects/turbine-demo/subscriptions/example-engine
Deleted topic projects/turbine-demo/topics/example-engine
Deleted instance template projects/turbine-demo/global/instanceTemplates/example-engine
