Skip to content

Commit

Permalink
[BEAM-7676] uniquely identify SDK workers by the factory that created…
Browse files Browse the repository at this point in the history
… them
  • Loading branch information
ibzib committed Jul 2, 2019
1 parent 6143271 commit c6debfd
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
Expand Down Expand Up @@ -81,7 +82,9 @@
@ThreadSafe
public class DefaultJobBundleFactory implements JobBundleFactory {
private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);
private static AtomicLong factoryIdCounter = new AtomicLong();

private final long factoryId = factoryIdCounter.incrementAndGet();
private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
private final Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap;
private final ExecutorService executor;
Expand Down Expand Up @@ -112,11 +115,11 @@ public static DefaultJobBundleFactory create(

DefaultJobBundleFactory(
JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryMap) {
IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
IdGenerator stageIdSuffixGenerator = IdGenerators.incrementingLongs();
this.environmentFactoryProviderMap = environmentFactoryMap;
this.executor = Executors.newCachedThreadPool();
this.clientPool = MapControlClientPool.create();
this.stageIdGenerator = stageIdGenerator;
this.stageIdGenerator = () -> factoryId + "-" + stageIdSuffixGenerator.getId();
this.environmentExpirationMillis = getEnvironmentExpirationMillis(jobInfo);
this.environmentCache =
createEnvironmentCache(serverFactory -> createServerInfo(jobInfo, serverFactory));
Expand Down

0 comments on commit c6debfd

Please sign in to comment.