Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
private final AtomicReference<Throwable> failure = new AtomicReference<>();
private final boolean isAppModeBatch;
private final Optional<CoordinationUtils> coordinationUtils;
private final MetadataStoreFactory metadataStoreFactory;
private final Optional<MetadataStoreFactory> metadataStoreFactory;
private Optional<String> runId = Optional.empty();
private Optional<RunIdGenerator> runIdGenerator = Optional.empty();

Expand All @@ -96,7 +96,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
* @param config configuration for the application
*/
public LocalApplicationRunner(SamzaApplication app, Config config) {
this(app, config, new CoordinatorStreamMetadataStoreFactory());
this(app, config, getMetadataStoreFactory(new JobConfig(config)));
}

/**
Expand All @@ -110,7 +110,7 @@ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStore
this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = getCoordinationUtils(config, getClass().getClassLoader());
this.metadataStoreFactory = metadataStoreFactory;
this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
}

/**
Expand All @@ -121,7 +121,16 @@ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStore
this.appDesc = appDesc;
this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = coordinationUtils;
this.metadataStoreFactory = new CoordinatorStreamMetadataStoreFactory();
this.metadataStoreFactory = Optional.ofNullable(getMetadataStoreFactory(new JobConfig(appDesc.getConfig())));
}

static MetadataStoreFactory getMetadataStoreFactory(JobConfig jobConfig) {
if (jobConfig.getCoordinatorSystemNameOrNull() != null) {
return new CoordinatorStreamMetadataStoreFactory();
}
LOG.warn("{} or {} not configured. No coordinator stream metadata store will be created.",
JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
return null;
}

private Optional<CoordinationUtils> getCoordinationUtils(Config config, ClassLoader classLoader) {
Expand Down Expand Up @@ -189,7 +198,9 @@ public void run(ExternalContext externalContext) {
jobConfigs.forEach(jobConfig -> {
LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
MetadataStore coordinatorStreamStore = createCoordinatorStreamStore(jobConfig);
coordinatorStreamStore.init();
if (coordinatorStreamStore != null) {
coordinatorStreamStore.init();
}
StreamProcessor processor = createStreamProcessor(jobConfig, appDesc,
sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig), Optional.ofNullable(externalContext), coordinatorStreamStore);
processors.add(Pair.of(processor, coordinatorStreamStore));
Expand Down Expand Up @@ -262,9 +273,12 @@ CountDownLatch getShutdownLatch() {

@VisibleForTesting
MetadataStore createCoordinatorStreamStore(Config jobConfig) {
MetadataStore coordinatorStreamStore =
metadataStoreFactory.getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
return coordinatorStreamStore;
if (metadataStoreFactory.isPresent()) {
MetadataStore coordinatorStreamStore =
metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
return coordinatorStreamStore;
}
return null;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.task.IdentityStreamTask;
import org.apache.samza.zk.ZkMetadataStore;
Expand All @@ -60,6 +61,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
Expand Down Expand Up @@ -210,6 +212,43 @@ public void testRunComplete() throws Exception {
assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
}

@Test
public void testRunCompleteWithouCoordinatorStreamStore() throws Exception {
Map<String, String> cfgs = new HashMap<>();
cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
config = new MapConfig(cfgs);
ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class);
mockApp = (StreamApplication) appDesc -> {
appDesc.withProcessorLifecycleListenerFactory(mockFactory);
};
prepareTest();

// return the jobConfigs from the planner
doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();

StreamProcessor sp = mock(StreamProcessor.class);
ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);

doAnswer(i ->
{
ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
listener.afterStart();
listener.afterStop();
return null;
}).when(sp).start();

ExternalContext externalContext = mock(ExternalContext.class);
doReturn(sp).when(runner)
.createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), eq(null));
doReturn(null).when(runner).createCoordinatorStreamStore(any(Config.class));

runner.run(externalContext);
runner.waitForFinish();

assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
}

@Test
public void testRunFailure() throws Exception {
Map<String, String> cfgs = new HashMap<>();
Expand Down Expand Up @@ -437,4 +476,24 @@ private void prepareTestForRunId() throws Exception {
doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
}

@Test
public void testGetMetadataStoreFactoryWithoutJobCoordinatorSystem() {
MetadataStoreFactory metadataStoreFactory =
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig()));
assertNull(metadataStoreFactory);
}

@Test
public void testGetMetadataStoreFactoryWithJobCoordinatorSystem() {
MetadataStoreFactory metadataStoreFactory =
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
assertNotNull(metadataStoreFactory);
}

@Test
public void testGetMetadataStoreFactoryWithDefaultSystem() {
MetadataStoreFactory metadataStoreFactory =
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
assertNotNull(metadataStoreFactory);
}
}