Skip to content

Commit

Permalink
SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationR…
Browse files Browse the repository at this point in the history
…unner (#1136)
  • Loading branch information
dnishimura authored and rmatharu-zz committed Aug 30, 2019
1 parent 767a5d4 commit e0b5a32
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.Util;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkMetadataStoreFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,7 +101,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
* @param config configuration for the application
*/
public LocalApplicationRunner(SamzaApplication app, Config config) {
this(app, config, getMetadataStoreFactory(new JobConfig(config)));
this(app, config, getDefaultCoordinatorStreamStoreFactory(new JobConfig(config)));
}

/**
Expand All @@ -121,15 +126,22 @@ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStore
this.appDesc = appDesc;
this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = coordinationUtils;
this.metadataStoreFactory = Optional.ofNullable(getMetadataStoreFactory(new JobConfig(appDesc.getConfig())));
this.metadataStoreFactory = Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig())));
}

static MetadataStoreFactory getMetadataStoreFactory(JobConfig jobConfig) {
if (jobConfig.getCoordinatorSystemNameOrNull() != null) {
static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) {
String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull();
JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(jobConfig);
String jobCoordinatorFactoryClassName = jobCoordinatorConfig.getJobCoordinatorFactoryClassName();

// TODO: Remove restriction to only ZkJobCoordinator after next phase of metadata store abstraction.
if (StringUtils.isNotBlank(coordinatorSystemName) && ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
return new CoordinatorStreamMetadataStoreFactory();
}
LOG.warn("{} or {} not configured. No coordinator stream metadata store will be created.",
JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);

LOG.warn("{} or {} not configured, or {} is not {}. No default coordinator stream metadata store will be created.",
JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM,
JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName());
return null;
}

Expand Down Expand Up @@ -272,15 +284,49 @@ CountDownLatch getShutdownLatch() {
}

@VisibleForTesting
MetadataStore createCoordinatorStreamStore(Config jobConfig) {
MetadataStore createCoordinatorStreamStore(Config config) {
if (metadataStoreFactory.isPresent()) {
MetadataStore coordinatorStreamStore =
metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
return coordinatorStreamStore;
// TODO: Add missing metadata store abstraction for creating the underlying store to address SAMZA-2182
if (metadataStoreFactory.get() instanceof CoordinatorStreamMetadataStoreFactory) {
if (createUnderlyingCoordinatorStream(config)) {
MetadataStore coordinatorStreamStore =
metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
LOG.info("Created coordinator stream store of type: {}", coordinatorStreamStore.getClass().getSimpleName());
return coordinatorStreamStore;
}
} else {
MetadataStore otherMetadataStore =
metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
LOG.info("Created alternative coordinator stream store of type: {}", otherMetadataStore.getClass().getSimpleName());
return otherMetadataStore;
}
}

LOG.warn("No coordinator stream store created.");
return null;
}

@VisibleForTesting
boolean createUnderlyingCoordinatorStream(Config config) {
// TODO: This work around method is necessary due to SAMZA-2182 - Metadata store: disconnect between creation and usage of the underlying storage
// and will be addressed in the next phase of metadata store abstraction
if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
LOG.warn("{} or {} not configured. Coordinator stream not created.",
JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
return false;
}
SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
SystemAdmins systemAdmins = new SystemAdmins(config);
systemAdmins.start();
try {
SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin);
} finally {
systemAdmins.stop();
}
return true;
}

@VisibleForTesting
StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,25 @@
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.coordinator.ClusterMembership;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.DistributedLock;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metadatastore.InMemoryMetadataStore;
import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.processor.StreamProcessor;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.task.IdentityStreamTask;
import org.apache.samza.zk.ZkMetadataStore;
import org.apache.samza.zk.ZkMetadataStoreFactory;
Expand Down Expand Up @@ -476,24 +484,94 @@ private void prepareTestForRunId() throws Exception {
doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
}

/**
* Default metadata store factory should be null if no job coordinator system defined and the default
* ZkJobCoordinator is used.
*/
@Test
public void testGetMetadataStoreFactoryWithoutJobCoordinatorSystem() {
public void testGetCoordinatorStreamStoreFactoryWithoutJobCoordinatorSystem() {
MetadataStoreFactory metadataStoreFactory =
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig()));
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(new MapConfig()));
assertNull(metadataStoreFactory);
}

/**
* Default metadata store factory should not be null if job coordinator system defined and the default
* ZkJobCoordinator is used.
*/
@Test
public void testGetMetadataStoreFactoryWithJobCoordinatorSystem() {
public void testGetCoordinatorStreamStoreFactoryWithJobCoordinatorSystem() {
MetadataStoreFactory metadataStoreFactory =
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
assertNotNull(metadataStoreFactory);
}

/**
* Default metadata store factory should not be null if default system defined and the default
* ZkJobCoordinator is used.
*/
@Test
public void testGetMetadataStoreFactoryWithDefaultSystem() {
public void testGetCoordinatorStreamStoreFactoryWithDefaultSystem() {
MetadataStoreFactory metadataStoreFactory =
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
assertNotNull(metadataStoreFactory);
}

/**
* Default metadata store factory be null if job coordinator system or default system defined and a non ZkJobCoordinator
* job coordinator is used.
*/
@Test
public void testGetCoordinatorStreamStoreFactoryWithNonZkJobCoordinator() {
MapConfig mapConfig = new MapConfig(
ImmutableMap.of(
JobConfig.JOB_DEFAULT_SYSTEM, "test-system",
JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()));
MetadataStoreFactory metadataStoreFactory =
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(mapConfig));
assertNull(metadataStoreFactory);
}

/**
* Underlying coordinator stream should be created if using CoordinatorStreamMetadataStoreFactory
* @throws Exception
*/
@Test
public void testCreateCoordinatorStreamWithCoordinatorFactory() throws Exception {
CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
CoordinatorStreamMetadataStoreFactory coordinatorStreamMetadataStoreFactory = mock(CoordinatorStreamMetadataStoreFactory.class);
doReturn(coordinatorStreamStore).when(coordinatorStreamMetadataStoreFactory).getMetadataStore(anyString(), any(Config.class), any(
MetricsRegistry.class));
SystemAdmins systemAdmins = mock(SystemAdmins.class);
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
LocalApplicationRunner localApplicationRunner =
spy(new LocalApplicationRunner(mockApp, config, coordinatorStreamMetadataStoreFactory));

// create store only if successful in creating the underlying coordinator stream
doReturn(true).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
assertEquals(coordinatorStreamStore, localApplicationRunner.createCoordinatorStreamStore(config));
verify(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));

// do not create store if creating the underlying coordinator stream fails
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
assertNull(localApplicationRunner.createCoordinatorStreamStore(config));
}

/**
* Underlying coordinator stream should not be created if not using CoordinatorStreamMetadataStoreFactory
* @throws Exception
*/
@Test
public void testCreateCoordinatorStreamWithoutCoordinatorFactory() throws Exception {
SystemAdmins systemAdmins = mock(SystemAdmins.class);
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
LocalApplicationRunner localApplicationRunner =
spy(new LocalApplicationRunner(mockApp, config, new InMemoryMetadataStoreFactory()));
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
MetadataStore coordinatorStreamStore = localApplicationRunner.createCoordinatorStreamStore(config);
assertTrue(coordinatorStreamStore instanceof InMemoryMetadataStore);

// creating underlying coordinator stream should not be called for other coordinator stream metadata store types.
verify(localApplicationRunner, never()).createUnderlyingCoordinatorStream(eq(config));
}
}

0 comments on commit e0b5a32

Please sign in to comment.