Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
private GrpcFnServer<GrpcDataService> fnDataServer;
private GrpcFnServer<GrpcStateService> fnStateServer;
private ControlClientPool controlClientPool;
private ExecutorService dataExecutor;
private IdGenerator idGenerator = IdGenerators.incrementingLongs();

public SamzaExecutionContext(SamzaPipelineOptions options) {
Expand Down Expand Up @@ -92,26 +93,31 @@ public void start() {
if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
try {
controlClientPool = MapControlClientPool.create();
final ExecutorService dataExecutor = Executors.newCachedThreadPool();
dataExecutor = Executors.newCachedThreadPool();

fnControlServer =
GrpcFnServer.allocatePortAndCreateFor(
FnApiControlClientPoolService.offeringClientsToPool(
controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
ServerFactory.createWithPortSupplier(
() -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
LOG.info("Started control server on port {}", fnControlServer.getServer().getPort());

fnDataServer =
GrpcFnServer.allocatePortAndCreateFor(
GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
ServerFactory.createDefault());
LOG.info("Started data server on port {}", fnDataServer.getServer().getPort());

fnStateServer =
GrpcFnServer.allocatePortAndCreateFor(
GrpcStateService.create(), ServerFactory.createDefault());
LOG.info("Started state server on port {}", fnStateServer.getServer().getPort());

final long waitTimeoutMs =
SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
LOG.info("Control client wait timeout config: " + waitTimeoutMs);

final InstructionRequestHandler instructionHandler =
controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
final EnvironmentFactory environmentFactory =
Expand All @@ -120,6 +126,7 @@ public void start() {
jobBundleFactory =
SingleEnvironmentInstanceJobBundleFactory.create(
environmentFactory, fnDataServer, fnStateServer, idGenerator);
LOG.info("Started job bundle factory");
} catch (Exception e) {
throw new RuntimeException(
"Running samza in Beam portable mode but failed to create job bundle factory", e);
Expand All @@ -131,19 +138,29 @@ public void start() {

@Override
public void stop() {
closeFnServer(fnControlServer);
closeAutoClosable(fnControlServer, "controlServer");
fnControlServer = null;
closeFnServer(fnDataServer);
closeAutoClosable(fnDataServer, "dataServer");
fnDataServer = null;
closeFnServer(fnStateServer);
closeAutoClosable(fnStateServer, "stateServer");
fnStateServer = null;
if (dataExecutor != null) {
dataExecutor.shutdown();
dataExecutor = null;
}
controlClientPool = null;
closeAutoClosable(jobBundleFactory, "jobBundle");
jobBundleFactory = null;
}

private void closeFnServer(GrpcFnServer<?> fnServer) {
try (AutoCloseable closer = fnServer) {
// do nothing
private static void closeAutoClosable(AutoCloseable closeable, String name) {
try (AutoCloseable closer = closeable) {
LOG.info("Closed {}", name);
} catch (Exception e) {
LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
LOG.error(
"Failed to close {}. Ignore since this is shutdown process...",
closeable.getClass().getSimpleName(),
e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ public SamzaPipelineResult run(Pipeline pipeline) {

pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());

if (LOG.isDebugEnabled()) {
LOG.debug("Post-processed Beam pipeline:\n{}", PipelineDotRenderer.toDotString(pipeline));
}
LOG.info("Beam pipeline DOT graph:\n{}", PipelineDotRenderer.toDotString(pipeline));

final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);

Expand Down Expand Up @@ -141,6 +139,7 @@ private Map<String, MetricsReporterFactory> getMetricsReporters() {
final MetricsReporter reporter = options.getMetricsReporters().get(i);

reporters.put(name, (MetricsReporterFactory) (nm, processorId, config) -> reporter);
LOG.info(name + ": " + reporter.getClass().getName());
}
return reporters;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,17 @@ void deletePersisted(KeyedTimerData<K> keyedTimerData) {
private void loadEventTimeTimers() {
if (!eventTimerTimerState.isEmpty().read()) {
final Iterator<KeyedTimerData<K>> iter = eventTimerTimerState.readIterator().read();
for (int i = 0; i < timerBufferSize && iter.hasNext(); i++) {
int i = 0;
for (; i < timerBufferSize && iter.hasNext(); i++) {
eventTimeTimers.add(iter.next());
}

LOG.info("Loaded {} event time timers in memory", i);

// manually close the iterator here
final SamzaStoreStateInternals.KeyValueIteratorState iteratorState =
(SamzaStoreStateInternals.KeyValueIteratorState) eventTimerTimerState;

iteratorState.closeIterators();
}
}
Expand All @@ -359,11 +363,15 @@ private void loadProcessingTimeTimers() {
if (!processingTimerTimerState.isEmpty().read()) {
final Iterator<KeyedTimerData<K>> iter = processingTimerTimerState.readIterator().read();
// since the iterator will reach to the end, it will be closed automatically
int count = 0;
while (iter.hasNext()) {
final KeyedTimerData<K> keyedTimerData = iter.next();
timerRegistry.schedule(
keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
++count;
}

LOG.info("Loaded {} processing time timers in memory", count);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public Config build() {
// apply user configs
config.putAll(createUserConfig(options));

config.put(ApplicationConfig.APP_NAME, options.getJobName());
config.put(ApplicationConfig.APP_ID, options.getJobInstance());
config.put(JobConfig.JOB_NAME(), options.getJobName());
config.put(JobConfig.JOB_ID(), options.getJobInstance());

Expand Down