SAMZA-1786: Introduce metadata store abstraction. #583
@prateekm @xinyuiscool @sborya
 *
 * @param containerContext represents the Samza container context.
 */
void init(SamzaContainerContext containerContext);
This can be used outside of SamzaContainer, and as such may not have access to the container context. Should provide access to Config and MetricsRegistry instead.
+1 a CLI tool using this interface to read metadata wouldn't have this context.
<String, byte[]> or <String, Object> might be a simpler/better alternative for a starter impl.
It'd be nice if the metadatastore offered atomic updates across keys, e.g., for diagnostics a container-id and error-occurrence in that container should be recorded atomically.
Does the implementation of this plan to internally use tableAPI implementation(s)?
Makes sense. Removed Context and switched to having config and metricsRegistry as parameters.
I'm not sure it's a good idea to fix String as the key type. It would prevent us from persisting non-string keys in the metadata store in the future (most store APIs offer a byte[], byte[] contract, so this would be an unnecessary limitation) and from reading/writing them across Samza jobs, custom tools, etc.
"It'd be nice if the metadatastore offered atomic updates across keys."
Not sure it's a good idea to add this to the interface contract and mandate it for every store implementation. For the majority of scenarios in Samza, a store implementation that persists the (key, value) pair through a synchronous remote call should be sufficient.
Atomic updates through a write batch (or similar batch accumulation) are not supported by every store. Unless we have prevalent and concrete use cases across different layers in Samza, I think it's a good idea not to introduce them.
cc @rayman7718, in case you have any feedback on the MetadataStore interface from the perspective of a diagnostics data store.
@shanthoosh: Would be preferable to structure the metadata store API to have string keys and byte-array values. Strings offer you nice properties around equality and prefix-checks.
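The shape this thread converges on (an init that takes a config and a metrics registry instead of a container context) can be sketched roughly as below. This is only an illustration under stated assumptions: `SketchConfig`, `SketchMetricsRegistry`, `SketchMetadataStore` and `InMemorySketchStore` are hypothetical stand-ins, not Samza classes, and the String-key/byte[]-value signature is the variant proposed in the last comment rather than the byte[]/byte[] contract used in the patch. The `withPrefix` helper shows the kind of prefix check that string keys make easy.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Samza's Config.
interface SketchConfig {
  String get(String key);
}

// Hypothetical stand-in for Samza's MetricsRegistry.
interface SketchMetricsRegistry {
  void increment(String counterName);
}

// Hypothetical store contract: init needs only config and metrics, so a CLI
// tool without a container context could still use the store.
interface SketchMetadataStore {
  void init(SketchConfig config, SketchMetricsRegistry metricsRegistry);
  byte[] get(String key);
  void put(String key, byte[] value);
  void remove(String key);
  Map<String, byte[]> all();
  void flush();
  void close();
}

// In-memory implementation, for illustration only.
class InMemorySketchStore implements SketchMetadataStore {
  private final TreeMap<String, byte[]> entries = new TreeMap<>();
  private SketchMetricsRegistry metrics = name -> { };

  @Override
  public void init(SketchConfig config, SketchMetricsRegistry metricsRegistry) {
    this.metrics = metricsRegistry;
  }

  @Override
  public byte[] get(String key) {
    return entries.get(key);
  }

  @Override
  public void put(String key, byte[] value) {
    entries.put(key, value);
    metrics.increment("puts");
  }

  @Override
  public void remove(String key) {
    entries.remove(key);
  }

  @Override
  public Map<String, byte[]> all() {
    return Collections.unmodifiableMap(new TreeMap<>(entries));
  }

  @Override
  public void flush() {
    // Nothing is buffered in this in-memory sketch.
  }

  @Override
  public void close() {
    entries.clear();
  }

  // Prefix scan over the sorted keys: every key that starts with the prefix.
  Map<String, byte[]> withPrefix(String prefix) {
    return Collections.unmodifiableMap(entries.subMap(prefix, prefix + Character.MAX_VALUE));
  }
}
```

With byte[] keys the same prefix scan would need a custom comparator, which is part of the trade-off being debated above.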
this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
public LocalityManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> valueSerde) {
  this.config = config;
  MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
Is serde required? More specific - does it need to be a part of the store?
Discussed offline.
Serde is not part of the store; it's used to serialize/deserialize messages before writing to/reading from the store in LocalityManager/TaskAssignmentManager (util classes) and is not passed to the store.
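The split described in that reply (serdes live in the manager, the store only sees bytes) might look like the following minimal sketch. All names here (`SketchSerde`, `Utf8Serde`, `ByteStoreSketch`, `LocalitySketch`) are hypothetical and only mirror the roles discussed; they are not the classes in the patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical serde contract, modeled on toBytes/fromBytes as used in the diff.
interface SketchSerde<T> {
  byte[] toBytes(T value);
  T fromBytes(byte[] bytes);
}

class Utf8Serde implements SketchSerde<String> {
  @Override
  public byte[] toBytes(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String fromBytes(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

// The store deals only in bytes and knows nothing about serdes.
class ByteStoreSketch {
  // ByteBuffer compares by content, unlike a raw byte[] used as a map key.
  private final Map<ByteBuffer, byte[]> entries = new HashMap<>();

  void put(byte[] key, byte[] value) {
    entries.put(ByteBuffer.wrap(key.clone()), value.clone());
  }

  byte[] get(byte[] key) {
    return entries.get(ByteBuffer.wrap(key));
  }

  void remove(byte[] key) {
    entries.remove(ByteBuffer.wrap(key));
  }
}

// The manager owns the serdes and converts at the boundary.
class LocalitySketch {
  private final ByteStoreSketch store;
  private final SketchSerde<String> keySerde;
  private final SketchSerde<String> valueSerde;

  LocalitySketch(ByteStoreSketch store, SketchSerde<String> keySerde, SketchSerde<String> valueSerde) {
    this.store = store;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
  }

  void writeMapping(String containerId, String hostName) {
    store.put(keySerde.toBytes(containerId), valueSerde.toBytes(hostName));
  }

  String readMapping(String containerId) {
    byte[] valueBytes = store.get(keySerde.toBytes(containerId));
    return valueBytes == null ? null : valueSerde.fromBytes(valueBytes);
  }

  void deleteMapping(String containerId) {
    store.remove(keySerde.toBytes(containerId));
  }
}
```

Keeping the serde out of the store means a different backing store can be swapped in without touching the serialization format.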
@@ -105,7 +138,11 @@ public void writeTaskContainerMapping(String taskName, String containerId) {
 */
public void deleteTaskContainerMappings(Iterable<String> taskNames) {
  for (String taskName : taskNames) {
    writeTaskContainerMapping(taskName, null);
    metadataStore.remove(keySerde.toBytes(taskName));
do we need to remove from taskNameToContainerId?
Good catch. Fixed.
}

if (containerId == null) {
  coordinatorStreamManager.send(new Delete(SOURCE, taskName, SetTaskContainerMapping.TYPE));
  metadataStore.remove(keySerde.toBytes(taskName));
what happens if remove() fails (throws an exception)?
We propagate it back to the caller (which was the existing behavior prior to this change as well).
I think it's better to retain that behavior and not alter it as part of this change.
 *
 * @param containerContext represents the Samza container context.
 */
void init(SamzaContainerContext containerContext);
What is the use for this context?
Replaced with config and metricsRegistry as arguments.
@@ -60,29 +97,23 @@ public LocalityManager(CoordinatorStreamManager coordinatorStreamManager) {
 * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
Now LocalityManager shouldn't relate to the actual store implementation, coordinator stream or not (see comments above).
@shanthoosh +1 to removing references to coordinator stream in these classes.
 */
public class CoordinatorStreamKeySerde implements Serde<String> {

  private final Serde<List<?>> keySerde;
should use JsonSerde
Yes, that's right. I instantiate the field to JsonSerde in the constructor.
Moved the assignment to the declaration itself.
if (valueBytes != null) {
  String locationId = valueSerde.fromBytes(valueBytes);
  allMappings.put(keySerde.fromBytes(keyBytes), ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId,
      SetContainerHostMapping.JMX_TUNNELING_URL_KEY, "",
Why are we no longer setting the JMX_TUNNELING_URL_KEY and JMX_URL_KEY?
Historically, JmxUrl and JmxTunnelingUrl were stored in the coordinator stream. The plan for this patch is to use the same API to store container locality for standalone in ZooKeeper. Since we do not want to store jmxUrl in ZooKeeper, I've removed it from here. In any case, we log the JMX properties in the SamzaContainer log file (if we want, we can use that to retrieve the information). YARN retains container log files throughout the lifecycle of the SamzaContainer process. IMHO the coordinator stream should ideally have been used to store the logical container-to-task assignments, not the attributes of the physical process.
@bharathkk Besides, if necessary, we can store the JMX url separately in the metadata store / diagnostics stream in SamzaContainer.
      SetContainerHostMapping.JMX_URL_KEY, ""));
  }
});
containerToHostMapping = allMappings;
Why is this no longer an unmodifiableMap? I don't see any places where you write a value to this map.
It doesn't have to be an unmodifiableMap. We want to return an unmodifiable view to the caller; that does not require the internal structures to be unmodifiable as well.
It is, in general, good practice to enforce immutability wherever possible, especially in concurrent programming. containerToHostMapping is only set in readContainerLocality() and not modified anywhere else.
The other reason I feel strongly about this is that your PR doesn't introduce any changes to the map either, so why modify its properties? It makes it hard to backtrack and rule out changes during debugging.
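The two positions in this thread correspond to two standard-library idioms: handing out a read-only view over a mutable internal map, versus publishing an independent unmodifiable snapshot. A minimal sketch follows; `LocalityViewSketch` and its methods are hypothetical names, and only `Collections.unmodifiableMap` is taken from the JDK.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class LocalityViewSketch {
  // Internal structure stays mutable.
  private final Map<String, String> containerToHost = new HashMap<>();

  void record(String containerId, String hostName) {
    containerToHost.put(containerId, hostName);
  }

  // Read-only live view: callers cannot write, but they see later updates.
  Map<String, String> view() {
    return Collections.unmodifiableMap(containerToHost);
  }

  // Unmodifiable snapshot: a copy that never changes after it is taken.
  Map<String, String> snapshot() {
    return Collections.unmodifiableMap(new HashMap<>(containerToHost));
  }
}
```

The view protects callers from writing but still changes underneath them; the snapshot is the variant that gives the immutability guarantee argued for in the last comment.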
jmxTunnelingAddress));

metadataStore.put(keySerde.toBytes(containerId), valueSerde.toBytes(hostName));

Map<String, String> mappings = new HashMap<>();
mappings.put(SetContainerHostMapping.HOST_KEY, hostName);
mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
mappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
containerToHostMapping.put(containerId, mappings);
I don't see the purpose of this either. writeContainerToHostMapping seems to be invoked only during the start of the container, and this update is not even read subsequently. I am surprised this worked prior to your change, since the old implementation sets containerToHostMapping to an unmodifiableMap inside readContainerLocality.
SamzaContainer stores the container locality to CoordinatorStream (Kafka topic) through localityManager.writeContainerHostMapping as a part of its startup sequence.
The ContainerAllocator thread in JobCoordinator uses localityManager.readContainerLocality() to get the container-to-preferred-host mapping for requesting physical resources from the ClusterResourceManager (within LinkedIn it is YARN).
If we refrain from storing this mapping in SamzaContainer (since it's not read within it), then we will break host affinity for stateful Samza jobs.
I should have been more explicit. I do understand the need for it to be in the coordinator stream. I was referring to why we persist the information in a local map within LocalityManager.
Map<String, String> mappings = new HashMap<>();
mappings.put(SetContainerHostMapping.HOST_KEY, hostName);
...
containerToHostMapping.put(containerId, mappings);
After writing the locality information to the coordinator stream as part of startup, the locality information is not accessed for the rest of the lifecycle of the running container. Subsequent reads happen only on container movements, restarts, or deploys, at which point you create a new LocalityManager and bootstrap from the coordinator stream.
+1, @shanthoosh can you check how this map field is being used? Does it rely on a particular order of operations to be correct (e.g., readLocality before writeLocality)? If so, can we just issue a separate readLocality request within writeLocality to reconstruct the map instead of trying to optimize this?
@prateekm Removed the map field and made it a map with scope local to readContainerLocality.
writeContainerLocality currently invokes readContainerLocality() at the beginning to get the entire map.
try {
  OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, key, value);
  if (value != null) {
    bootstrappedMessages.put(key, value);
Why do we need this if all the read-path get and all methods bootstrap for messages prior to reading?
To maintain the full set of messages from the coordinator stream. Bootstrap doesn't read everything from the beginning (only the delta since the last read).
Agree about the delta part. Is it possible for get or all to be invoked concurrently during a put? If not, then any new message written (appended) to the coordinator stream should be read by a subsequent get or all as part of bootstrapMessagesFromStream(), right?
+1. Also, if this is for concurrent get/put, TreeMap is not thread safe.
Might want to call store.flush before the bootstrapMessagesFromStream to make sure producer flushes previous writes.
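The alternative the reviewers are suggesting (do not update the cache on the write path; instead flush pending writes and then bootstrap only the delta on every read, all under one lock) can be sketched as below. This is a simplified model, not the patch's CoordinatorStreamStore: `LogBackedStoreSketch` is a hypothetical class, the coordinator stream is modeled as an in-memory list, and a null value stands for a delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LogBackedStoreSketch {
  // Stand-in for the coordinator stream: an append-only list of {key, value} records.
  private final List<String[]> log = new ArrayList<>();
  // Writes buffered by the "producer" until flush() is called.
  private final List<String[]> pending = new ArrayList<>();
  // Guards the cache, the read offset, and the buffers.
  private final Object bootstrapLock = new Object();
  private final Map<String, String> bootstrappedMessages = new HashMap<>();
  private int nextOffset = 0;

  // A null value models a delete.
  void put(String key, String value) {
    synchronized (bootstrapLock) {
      pending.add(new String[] {key, value});
    }
  }

  void flush() {
    synchronized (bootstrapLock) {
      log.addAll(pending);
      pending.clear();
    }
  }

  String get(String key) {
    synchronized (bootstrapLock) {
      flush();              // make earlier writes visible before reading
      bootstrapFromLog();   // read only the delta since the last bootstrap
      return bootstrappedMessages.get(key);
    }
  }

  int consumedOffset() {
    synchronized (bootstrapLock) {
      return nextOffset;
    }
  }

  // Caller must hold bootstrapLock.
  private void bootstrapFromLog() {
    while (nextOffset < log.size()) {
      String[] record = log.get(nextOffset++);
      if (record[1] == null) {
        bootstrappedMessages.remove(record[0]);
      } else {
        bootstrappedMessages.put(record[0], record[1]);
      }
    }
  }
}
```

Because every read replays the delta under the same lock that guards writes, the cache needs neither a write-path update nor a thread-safe map in this model.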
private static final Logger LOG = LoggerFactory.getLogger(LocalityManager.class);

private final CoordinatorStreamManager coordinatorStreamManager;
private final Config config;
private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
// Map from containerID -> Map of per-container properties, e.g., {@link SetContainerHostMapping.HOST_KEY}
Done.
Looks good overall.
Some minor refactors possible.
/**
 * Store abstraction responsible for managing the metadata of a Samza job and is agnostic of the
 * deployment model (yarn/standalone) of the Samza job.
"and is agnostic ..." is unnecessary.
Removed.
/**
 * Initializes the metadata store, if applicable, setting up the underlying resources
 * and connections to the store endpoints. Upon successful completion of this method,
Can delete second sentence: "Upon successful completion ..."
Removed.
 * and connections to the store endpoints. Upon successful completion of this method,
 * metadata store is considered available to accept the client operations.
 *
 * @param containerContext represents the Samza container context.
s/represents//
byte[] get(byte[] key);

/**
 * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
Both sentences are saying the same thing, can delete the first.
Removed.
/**
 * Read all messages from the earliest offset, all the way to the latest.
 * Currently, this method only pays attention to config messages.
Is this "only config messages" still correct?
Fixed, removed the comment.
import org.apache.samza.serializers.Serde;

/**
 * Serializer for values written into the coordinator stream(kafka topic). CoordinatorStreamMessage combines both key
s/kafka topic//
Done.
/**
 * Serializer for values written into the coordinator stream(kafka topic). CoordinatorStreamMessage combines both key
 * and value serializer for coordinator stream messages. Since value is relevant to this serializer, coordinator stream
 * key is nuked for different message types.
"nuked for"? Also can you clarify what is this trying to say? Not sure I understand.
nuked --> set to null/empty.
Reworded it.
@@ -186,4 +189,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)

def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)

def getMetadataStoreFactory = getOption(JobConfig.METADATA_STORE_FACTORY).getOrElse(classOf[CoordinatorStreamStoreFactory].getCanonicalName)
Should not be defaulted here. Let's do it at the call site where CoordinatorStreamStoreFactory is used.
Currently this is called from multiple places and defaulting from caller creates unnecessary duplication.
Even in some of the other getClass methods in this config class (getSSPGrouper, getTaskNameGrouper), the defaulting is done here. I'd prefer to keep this here for consistency.
- CoordinatorStreamStore is not necessarily the long-term default implementation, and it's not even the only current implementation. Wouldn't you need to call this with ZkStore for all standalone call sites anyway?
- Use getName() for class names that are meant to be classloaded.
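The getName() point matters mainly for nested classes, where the binary name (with `$`) is what class loading expects and the canonical name (with `.`) is not loadable. For a top-level class such as the factory in this diff the two strings coincide, so the advice is about using the right method by default. A small illustration, with `FactoryNamesSketch` and `NestedFactory` as hypothetical names:

```java
class FactoryNamesSketch {
  // Hypothetical nested class standing in for a configurable factory.
  static class NestedFactory { }

  // Returns true if the given name can be resolved by Class.forName.
  static boolean loadable(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    String binaryName = NestedFactory.class.getName();
    String canonicalName = NestedFactory.class.getCanonicalName();
    System.out.println(binaryName + " loadable: " + loadable(binaryName));
    System.out.println(canonicalName + " loadable: " + loadable(canonicalName));
  }
}
```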
@Override
public byte[] toBytes(String value) {
  if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
    SetContainerHostMapping hostMapping = new SetContainerHostMapping(SOURCE, "", value, config.get("jmx.tunneling.url"), config.get("jmx.url"));
Is the jmx url stuff still relevant? Maybe remove or update class javadoc to clarify that this will not be set anymore?
Not relevant anymore. Removed it and updated java doc.
It's still setting them. Intentional?
If you remove this, you can remove the config from the serde too.
a492b46 to 1f32e24
Mostly LGTM, minor feedback.
 * Closes the metadata store, if applicable, relinquishing all the underlying resources
 * and connections.
 */
void close();
Minor: Move flush above close.
private final SystemProducer systemProducer;
private final SystemConsumer systemConsumer;
private final SystemAdmin systemAdmin;
private final Object bootstrapLock = new Object();
Move next to bootstrappedMessages to clarify what this is guarding.
Done.
  systemProducer.send(SOURCE, envelope);
  flush();
} catch (Exception e) {
  throw new SamzaException(e);
Don't need to wrap exceptions in SamzaException unless we have special handling for them at the call site. Here and everywhere else.
Done.
 * Builds the {@link CoordinatorStreamStore} based upon the provided {@link Config}
 * and {@link MetricsRegistry}.
 */
public class CoordinatorStreamStoreFactory implements MetadataStoreFactory {
Would prefer CoordinatorStreamMetadataStoreFactory etc.
Done.
public LocalityManager(CoordinatorStreamManager coordinatorStreamManager) {
  this.coordinatorStreamManager = coordinatorStreamManager;
  this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
public LocalityManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> valueSerde) {
Is this constructor used anywhere (other than tests)? If not, maybe make protected/package private.
Done.
As a part of SEP-11, this patch adds the MetadataStore interface to store task and container locality in both the YARN and standalone deployment models. Please refer to SEP-11 for more details.
Few important points to note:
Testing: