Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager #912

Merged
merged 42 commits into from Mar 1, 2019

Conversation

rmatharu-zz
Copy link
Contributor

@rmatharu-zz rmatharu-zz commented Feb 8, 2019

This PR makes the following changes:

  • Moves the instantiation, wiring, of TaskSideInputStorageManagers from SamzaContainer to ContainerStorageManager.

  • Creates a separate SystemConsumers and set of sysConsumers in ContainerStorageManager which are responsible to read side inputs, but maintains existing bootstrap semantics for sideInputs.

  • Creates a periodic flush thread in ContainerStorageManager to flush all sideInputs, and made the TaskSideInputManager serialize access to process and flush.
    Note that, currently both the sideInput reads (from SystemConsumers) and flushes are separate single thread.

Tested with existing unit and integration tests, sample side-input and non-sideInput apps on local VPC-dev cluster

@rmatharu-zz rmatharu-zz changed the title Test sideinputrefactor Refactoring sideInputs from SamzaContainer to ContainerStorageManager Feb 8, 2019
@rmatharu-zz rmatharu-zz changed the title Refactoring sideInputs from SamzaContainer to ContainerStorageManager SEP-19 : Refactoring sideInputs from SamzaContainer to ContainerStorageManager Feb 8, 2019
Copy link
Contributor

@mynameborat mynameborat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass. Will look into CSM again

@@ -101,6 +118,21 @@
private final JobContext jobContext;
private final ContainerContext containerContext;

/* Sideinput related parameters */
private final Map<String, List<SystemStream>> sideInputSystemStreams; // Map of side input system-streams indexed by store name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when there are duplicate system stream associated with a store? Can this be Map<String, Set<SystemStram>> instead? By that, you explicitly disallow duplicates and be consistent with the semantics of sideInputSSps which has a task -> store -> set of ssps

private SystemConsumers sideInputSystemConsumers;
private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata = new ConcurrentHashMap<>();
// Recorded sspMetadata of the sideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
private CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it volatile

}

/**
* Creates SystemConsumer objects for store restoration, creating one consumer per system.
*/
private static Map<String, SystemConsumer> createStoreConsumers(Map<String, SystemStream> changelogSystemStreams,
private static Map<String, SystemConsumer> createConsumers(Map<String, List<SystemStream>> systemStreams,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this have to take a Map<String, List<SystemStream>> systemStreams? It seems sufficient to just take in Set<SystemStreams>.

You can also get rid of the up conversion in the calling site for creating store consumers.


/** Maps containing relevant per-task objects */
private final Map<TaskName, Map<String, StorageEngine>> taskStores;
private final Map<TaskName, TaskRestoreManager> taskRestoreManagers;
private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
private final Map<TaskName, TaskInstanceCollector> taskInstanceCollectors;

private final Map<String, SystemConsumer> systemConsumers; // Mapping from storeSystemNames to SystemConsumers
private final Map<String, SystemConsumer> storeConsumers; // Mapping from store name to SystemConsumers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is a map from store system name to consumers from the creation path. Can you update the comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No its indexed by store name.

private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> sideInputSSPs;
private final Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessor;
private final Map<SystemStreamPartition, TaskSideInputStorageManager> sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp, for simpler lookup for process()
private final Map<String, SystemConsumer> sideInputConsumers; // Mapping from storeSystemNames to SystemConsumers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we roll this into storeConsumers above? Store restoration and side inputs restoration happens sequentially, so not sure what is the purpose behind separating out the consumers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overlap of systems and system-consumers was a bit tricky to reason about so i let them the consumers be isolated.

@@ -101,6 +118,21 @@
private final JobContext jobContext;
private final ContainerContext containerContext;

/* Sideinput related parameters */
private final Map<String, List<SystemStream>> sideInputSystemStreams; // Map of side input system-streams indexed by store name
private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> sideInputSSPs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cosmetics: I noticed you wrap side inputs related parameters within a task context or some map.

Can we rename them to make it clear. e.g. sideInputSSPs, makes me think its a set but its a map from task to a map of store to set of ssps.

});
});

// Create a map of changeLogSSP to storeName across all tasks, assuming no stores have the same changelogSSP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "assuming no two stores"

});

// changelogSystemStreams correspond only to active tasks (since those of standby-tasks moved to side inputs above)
this.changelogSystemStreams = changelogSSPToStore.entrySet().stream().collect(Collectors.toMap(x -> x.getValue(), x -> x.getKey().getSystemStream(), (x, y) -> x));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iiuc, you are grouping the SSPs by store and projecting the SSPs to system stream.
you can also use collectors.groupingBy to simplify.

@@ -112,15 +144,49 @@

public ContainerStorageManager(ContainerModel containerModel, StreamMetadataCache streamMetadataCache,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cosmetics: Can we break down constructor functions? Logically group some of the initialization and separate them out to make it more readable.

storageEngine.stop();
});

Map<String, StorageEngine> persistentStores = this.taskStores.entrySet().stream().filter(e -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this stop side input stores? It is possible for side input stores to be persisted to disk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this wont, note that this is inside TaskRestoreManager, which only handles changelogged stores.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed you use taskStores from the outer scope from CSM.
Can we instead use the helper method you have from outer class to get non-side-input stores to be safe?

@rmatharu-zz
Copy link
Contributor Author

Addressed all comments.
Thumbsup => the comment has been applied.

Copy link
Contributor

@mynameborat mynameborat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another pass on the changes

@@ -207,9 +208,11 @@ public static boolean storeExists(File storeDir) {
* @param storeBaseDir the base directory to use
* @param storeName the store name to use
* @param taskName the task name which is referencing the store
* @param taskMode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add documentation

* @return the partition directory for the store
*/
public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName) {
public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
// TODO: use task-Mode to decide the storePartitionDir -- standby's dir should be the same as active
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we doing this in a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i'll do a subsequent once when PR-903 is in

val sideInputSystemStreams = sideInputStoresToSystemStreams.values.flatMap(sideInputs => sideInputs.toStream).toSet

info("Got side input store system streams: %s" format sideInputSystemStreams)

val inputSystemStreams = inputSystemStreamPartitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, input SSPs should no longer have side inputs since its not using run loop anymore.
You can either clean up the side inputs wiring in high level in this PR or as a follow up PR. Also, we need to update the configuration docs to reflect this new behavior where users no longer need to set task.inputs for low level application.

@@ -358,14 +369,6 @@ object SamzaContainer extends Logging {

info("Got intermediate streams: %s" format intermediateStreams)

val sideInputStoresToSystemStreams = config.getStoreNames
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is a git diff thing. I notice some usages of sideInputStoresToSystemStreams below which are not removed as part of this PR. e.g. ln 546, 567
Can you confirm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i just moved it up

@@ -656,13 +593,11 @@ object SamzaContainer extends Logging {
storageManager = storageManager,
tableManager = tableManager,
reporters = reporters,
systemStreamPartitions = taskSSPs,
systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we piggy backing on ssps within task model to pass the side inputs too? Should we have something similar to changelog partition in task model that represents the side inputs separately so that we don't have to handle for side inputs in places where taskSSPs is used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Make the main thread wait until all sideInputs have been caughtup or thrown an exception
this.sideInputsCaughtUp.await();

if (sideInputException.isPresent()) { // Throw exception if there was an exception in catching-up sideInputs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the jira ticket if exists to capture the scenario where we need to communicate to the container in case of the exception after bootstrapping?

sideInputsFlushFuture = sideInputsFlushExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
getSideInputStorageManagers().forEach(sideInputStorageManager -> sideInputStorageManager.flush());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens when flush throws an exception? Can we also capture that into sideInputException?

try {
sideInputsFlushExecutor.awaitTermination(SIDE_INPUT_FLUSH_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new SamzaException("Exception while shutting down side inputs", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to bubble up the sideInputException if set here.

storageEngine.stop();
});

Map<String, StorageEngine> persistentStores = this.taskStores.entrySet().stream().filter(e -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed you use taskStores from the outer scope from CSM.
Can we instead use the helper method you have from outer class to get non-side-input stores to be safe?


// stop all sideinput consumers and stores
if (sideInputSystemConsumers != null) {
this.sideInputSystemConsumers.stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this go higher before shutting down the read? Wondering if the consumers have buffer which also needs to emptied before stopping the read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Along the same lines as SamzaContainer, which first shuts down runloop, then stops consumers

@rmatharu-zz
Copy link
Contributor Author

Addressed all comments.

Copy link
Contributor

@mynameborat mynameborat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. Considered approved once you have addressed the comments.

@@ -239,7 +250,7 @@ private void initializeStoreDirectories() {
* Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum.
*/
@VisibleForTesting
void writeOffsetFiles() {
void writeOffsetFiles() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: revert extra spacing

private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
private volatile boolean shutDownSideInputRead = false;
private final ScheduledExecutorService sideInputsFlushExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you set the thread to be daemon?

}

// stop all sideInputStores -- this will perform one last flush on the KV stores, and write the offset file
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet()).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: reuse getSideInputStorageManagers()

this.sideInputSystemConsumers.start();

// create a thread for sideInput reads
Thread readSideInputs = new Thread(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set this to be a daemon thread? The java default is non-daemon thread.


// creating task restore managers
this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);

// creating side input store processors, one per store per task
taskSideInputProcessors = createSideInputProcessors(new StorageConfig(config), this.containerModel, this.sideInputSystemStreams, this.taskInstanceMetrics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this can be a local variable or done inside the createSideInputStorageManagers()

this.shutDownSideInputRead = true;

// stop all sideinput consumers and stores
if (sideInputSystemConsumers != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simplify these nulls checks and instead have one state that denotes if side inputs are present or absent? I noticed in some places you check if the sideInputConsumers are non-empty and some of them checks for null and some use sideInputSystemStreams is non-empty.

Lets just have one place and one logic to determine the presence of side inputs and if its present then the invariant of some of the variables being non-null should hold since you create empty maps or collection for the most part.

this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, systemFactories, config, this.samzaContainerMetrics.registry());

// create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
if (!this.sideInputConsumers.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please refer to comment below about the checks for presence of side inputs

.get(storeName)
.getStoreProperties()
.isPersistedToDisk()) {
// if this store has been already then re-create and overwrite it only if it is a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing "created in the taskStores"

// have to use the right serde because the sideInput stores are created
Serde keySerde = serdes.get(new StorageConfig(config).getStorageKeySerde(storeName).get());
Serde msgSerde = serdes.get(new StorageConfig(config).getStorageMsgSerde(storeName).get());
sideInputStoresToProcessors.get(taskName).put(storeName, new SideInputsProcessor() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extract into constant since its okay to reuse the same identity processor for all stores.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each store is created with its respective configured serdes, so this processor needs to use that serde.


containerModel.getTasks().forEach((taskName, taskModel) -> {

if (!sideInputSystemStreams.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move it outside the forEach to short circuit earlier?

@rmatharu-zz
Copy link
Contributor Author

Addressed all comments.

@atoomula atoomula merged commit 320359f into apache:master Mar 1, 2019
Zhangyx39 pushed a commit to Zhangyx39/samza that referenced this pull request Apr 3, 2019
…geManager (apache#912)

* Rocksdb bug fix

* Moving TaskSideInputStorageManagers in CSM

* Adding basic side input refactor. Consumption logic works from inside CSM.

* Cleaning up sideInput references from TaskInstances and SamzaContainer

* minor

* Adding periodic flush logic to CSM for sideInputs

* Removing sideInputs from taskInputs

* fixing test

* test fix

* test fix

* Code simplifications

* checkstyle

* Using sideInput storage manager for standby tasks

* Minor store dir integ

* Adding comment

* Adding merge function in case of duplicate keys

* fixing tests

* Addressing cosmetic review comments

* Added bound-flush and exception handling for sideInputs

* Adding documentation

* TestTaskStorageManager : updating test

* Adding taskMode to decide StorePartitionDir

* Adding taskMode to decide StorePartitionDir

minor

* minor

* minor refactor of CSM

* minor

* Addressing review comments

* minor change in CSM
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants