Skip to content

Commit

Permalink
[BEAM-8201] Pass all other endpoints through provisioning service. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Feb 27, 2020
1 parent 8b69513 commit 921a9a8
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.ProvisionApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.runners.core.construction.BeamUrns;
Expand Down Expand Up @@ -521,9 +522,13 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory
GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
GrpcFnServer<ArtifactRetrievalService> retrievalServer =
GrpcFnServer.allocatePortAndCreateFor(artifactRetrievalService, serverFactory);
ProvisionApi.ProvisionInfo.Builder provisionInfo = jobInfo.toProvisionInfo().toBuilder();
provisionInfo.setLoggingEndpoint(loggingServer.getApiServiceDescriptor());
provisionInfo.setArtifactEndpoint(retrievalServer.getApiServiceDescriptor());
provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor());
GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
StaticGrpcProvisionService.create(provisionInfo.build()), serverFactory);
GrpcFnServer<GrpcDataService> dataServer =
GrpcFnServer.allocatePortAndCreateFor(
GrpcDataService.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
String containerImage = dockerPayload.getContainerImage();
// TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not
// work for Docker for Mac.
String loggingEndpoint = loggingServiceServer.getApiServiceDescriptor().getUrl();
String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl();
String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl();
String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl();

ImmutableList.Builder<String> dockerOptsBuilder =
ImmutableList.<String>builder()
Expand All @@ -143,10 +140,7 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
ImmutableList.Builder<String> argsBuilder =
ImmutableList.<String>builder()
.add(String.format("--id=%s", workerId))
.add(String.format("--logging_endpoint=%s", loggingEndpoint))
.add(String.format("--artifact_endpoint=%s", artifactEndpoint))
.add(String.format("--provision_endpoint=%s", provisionEndpoint))
.add(String.format("--control_endpoint=%s", controlEndpoint));
.add(String.format("--provision_endpoint=%s", provisionEndpoint));
if (semiPersistDir != null) {
argsBuilder.add(String.format("--semi_persist_dir=%s", semiPersistDir));
}
Expand Down
35 changes: 24 additions & 11 deletions sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,42 @@ func main() {
if *id == "" {
log.Fatal("No id provided.")
}
if *provisionEndpoint == "" {
log.Fatal("No provision endpoint provided.")
}

ctx := grpcx.WriteWorkerID(context.Background(), *id)

info, err := provision.Info(ctx, *provisionEndpoint)
if err != nil {
log.Fatalf("Failed to obtain provisioning information: %v", err)
}
log.Printf("Provision info:\n%v", info)

// TODO(BEAM-8201): Simplify once flags are no longer used.
if info.GetLoggingEndpoint().GetUrl() != "" {
*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
}
if info.GetArtifactEndpoint().GetUrl() != "" {
*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
}
if info.GetControlEndpoint().GetUrl() != "" {
*controlEndpoint = info.GetControlEndpoint().GetUrl()
}

if *loggingEndpoint == "" {
log.Fatal("No logging endpoint provided.")
}
if *artifactEndpoint == "" {
log.Fatal("No artifact endpoint provided.")
}
if *provisionEndpoint == "" {
log.Fatal("No provision endpoint provided.")
}
if *controlEndpoint == "" {
log.Fatal("No control endpoint provided.")
}

log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " "))

ctx := grpcx.WriteWorkerID(context.Background(), *id)

// (1) Obtain the pipeline options

info, err := provision.Info(ctx, *provisionEndpoint)
if err != nil {
log.Fatalf("Failed to obtain provisioning information: %v", err)
}
options, err := provision.ProtoToJSON(info.GetPipelineOptions())
if err != nil {
log.Fatalf("Failed to convert pipeline options: %v", err)
Expand Down Expand Up @@ -123,7 +136,7 @@ func main() {
"--options=" + options,
}
if info.GetStatusEndpoint() != nil {
args = append(args, "--status_endpoint=" + info.GetStatusEndpoint().GetUrl())
args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
}

log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
Expand Down

0 comments on commit 921a9a8

Please sign in to comment.