Skip to content

Language Extensibility

Marie Hoeger edited this page Oct 11, 2018 · 14 revisions

Language Extensibility

In order to bring Language Extensibility to functions, we've split the functions runtime into the host (which manages function events), and language worker processes (which runs user functions for a given language). These two pieces communicate using gRPC as a messaging layer.

Language Worker Implementations

Worker Configuration Conventions

We are using Microsoft.Extensions.Configuration in order to handle the worker configuration. Here's some sample configuration for node.

{
  languageWorkers: {
    node: {
      workerDirectoy:  <path to parent folder that has the worker> 
      arguments: <debugging port> // --inspect=5858
      ... other worker specific settings
    }
  }
}

This allows the above json configuration, as well as configuration via:

  • Command line, /languageWorkers:node:arguments= --inspect=5858
  • Environment variable, languageWorkers:node:workerDirectory = "d:/git/worker"

gRPC Contract

For gRPC, we currently have one call which creates a duplex stream of StreamingMessages, which are the protobuf messages defined below. All messages between host and worker flow across this bi-directional stream. The host runs the gRPC server, and the language worker processes act as clients.

service FunctionRpc {
 rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
}

Protobuf Contract

Every message that crosses the channel is a StreamingMessage, which contains a request id and a content union that contains one sub-message.

This contract lives here: https://github.com/Azure/azure-functions-language-worker-protobuf/blob/dev/src/proto/FunctionRpc.proto

message StreamingMessage {
  string request_id = 1;
  oneof content {
    // Worker signals to host that it has been started
    StartStream start_stream = 20; 

    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;

    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;

    // Host sends required metadata to worker to load function
    FunctionLoadRequest function_load_request = 8;

    // Worker responds after loading with the load result
    FunctionLoadResponse function_load_response = 9;
    
    // Host sends invocation information (function id, binding data, parameters) to worker
    InvocationRequest invocation_request = 4;

    // Worker sends response to host
    InvocationResponse invocation_response = 5;

    // Structured log from the worker based off of the ILogger interface
    RpcLog rpc_log = 2;
  }
}

Implementing a Language Worker

At it's core, the language worker needs to provide event handlers for streaming messages. Here's an example in node:

eventStream.on('data', (msg) => {
      let event = <string>msg.content;       // find the type of streaming message
      let eventHandler = (<any>this)[event]; // find the handler on the class for the streaming message type
      if (eventHandler) {
        eventHandler.apply(this, [msg.requestId, msg[event]]);  // invoke the handler with the id and message
      } else {
        console.error(`Worker ${workerId} had no handler for message '${event}'`)
      }
    });
...
  // example of a streaming message handler for workerInitRequest
  public workerInitRequest(requestId: string, msg: rpc.WorkerInitRequest) {
    // nothing to be done, so write the workerInitResponse back to the stream
    this._eventStream.write({
      requestId: requestId,
      workerInitResponse: {
        result: {
          status: Status.Success
        }
      }
    });
  }

More message handler implementations here: https://github.com/Azure/azure-functions-nodejs-worker/blob/dev/src/WorkerChannel.ts

Basic Host/Worker Lifecycle

  1. Start Host
  2. Start gRPC server
  3. For each function, Host creates a Worker (via IWorkerProvider) if necessary. One Worker process per language.
  4. StartStream - Worker connects to Host gRPC server
  5. WorkerInitRequest, WorkerInitResponse - Host and Worker exchange version info & capabilities
  6. FunctionLoadRequest, FunctionLoadResponse - Host sends Worker function metadata, Worker loads function
  7. InvocationRequest, InvocationResponse - Host sends function id, binding data, parameters, Worker response with function result
Clone this wiki locally
You can’t perform that action at this time.