Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-1856: eventhub descriptors #696

Closed
wants to merge 7 commits into from

Conversation

dxichen
Copy link
Member

@dxichen dxichen commented Oct 6, 2018

@prateekm @lhaiesp
Add system descriptors and input/output descriptors for Eventhubs.
We should make an effort to deprecate the systems.%s.stream.list.

* @param namespace namespace of the Eventhub entity to consume from
* @return this input descriptor
*/
public EventHubInputDescriptor<StreamMessageType> withNamespace(String namespace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "Required to access the Eventhubs entity per stream" mean? Is this a required configuration to consume from an Event Hubs stream? If so, would be better to make this a param in EventHubsSystemDescriptor#getInputStream(streamId, ...).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Namespace and Entity for Event Hubs are similar to topics in Kafka. They are required since they indicate the data destination for the output and source for input. I will add those fields in both input and output descriptors.

}

/**
* Namespace associated with the input stream. Required to access the input Eventhubs entity per stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the official name is "Event Hubs"? s/Eventhubs/Event Hubs/g in javadocs and configuration reference.

* @return this input descriptor
*/
public EventHubInputDescriptor<StreamMessageType> withEntityPath(String path) {
this.entityPath = Optional.of(path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional.ofNullable(StringUtils.stripToNull(str)) for string params everywhere.

import org.apache.samza.serializers.Serde;

/**
* A descriptor for an EventHubs input stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/input/output.

* @param sasKeyName the name of the SAS key required to access the Eventhub entity
* @return this output descriptor
*/
public EventHubOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know what SAS key/token is. If this is a kind of auth token, is it possible to configure it per system instead of per stream?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This token could be configured at both a stream level and a system level. Currently we do not support the system level configuration. I have created a ticket (SAMZA-1939) for that config and will update the system descriptors when that is added.

* @return this system descriptor
*/
public EventHubSystemDescriptor withPartitioningMethod(PartitioningMethod partitioningMethod) {
this.producerEventhubsPartitioningMethod = Optional.of(partitioningMethod);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ofNullable or throw exception if null

}

/**
* Sending each message key to the eventhub in the properties of the AMQP message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't understand the first sentence, seems like an incomplete phrase. Can you clarify, both here and in config reference?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event Hubs uses eventData, which is a AMQP map under to hood. The kv-pair format from Samza is therefore not kept. Users may specify that they want to add the key in the map in addition to the value. I will clarify

* @param sendKeys Set to true if the message key should be send in the EventData properties, false otherwise
* @return this system descriptor
*/
public EventHubSystemDescriptor withSendKeys(Boolean sendKeys) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use boolean so that this can't be null.

* ie. if there are 6 partitions and the key is 9, the message will end up in partition 3.
* Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</li>
* </ul>
* @param partitioningMethod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation for @param and @returns everywhere.

* Sending each message key to the eventhub in the properties of the AMQP message.
* If the Samza Eventhub consumer is used, this field is used as the message key if the partition key is not present.
*
* @param sendKeys Set to true if the message key should be send in the EventData properties, false otherwise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: "set to true" (lower case s)

@lhaiesp
Copy link
Contributor

lhaiesp commented Oct 8, 2018

Thanks for the work. Agree that we should deprecate the stream.list config. But once we make that change, we need to make sure Brooklin connector adjust to that change as well. Also Brooklin EventHubs connector should use these descriptors.

Copy link
Contributor

@prateekm prateekm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for cleaning this up. A few more suggestions.

import org.apache.samza.serializers.Serde;


/**
* A descriptor for the EventHubs output stream
* A descriptor for the Event Hubss output stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Remove extra s in Event Hubss

public EventHubInputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor,
InputTransformer transformer) {
super(streamId, serde, systemDescriptor, transformer);
EventHubInputDescriptor(String streamId, Serde serde, SystemDescriptor systemDescriptor, String namespace,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: move these two param after streamId and before serde. Same for docs.

@@ -48,52 +50,39 @@
* @param streamId id of the stream
* @param serde serde for messages in the stream
* @param systemDescriptor system descriptor this stream descriptor was obtained from
* @param namespace namespace for the Event Hubs entity to produce to, not null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above, reorder params for logical grouping.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is resolved, not sure why it still doesn't show the diff in the preview.

public <StreamMessageType> EventHubInputDescriptor<StreamMessageType> getInputDescriptor(String streamId,
Serde<StreamMessageType> serde) {
return new EventHubInputDescriptor<>(streamId, serde, this, null);
Serde<StreamMessageType> serde, String namespace, String entityPath) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reorder params. Since this is not the regular overridden method anymore, also add javadoc for this method including its params. Same below.

@dxichen dxichen force-pushed the eventhub-system-descriptors branch from 887be94 to 4275487 Compare October 9, 2018 00:52
Copy link
Contributor

@prateekm prateekm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Final comments.

* <p>
* System properties configured using a descriptor override corresponding properties provided in configuration.
*/
public class EventHubSystemDescriptor extends SystemDescriptor<EventHubSystemDescriptor> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Use EventHubsSystemDescriptor (use plural hubs) etc. in all these class names.

super(systemName, FACTORY_CLASS_NAME, null, null);
}

public <StreamMessageType> EventHubInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add javadoc for public methods, since they don't override the base class methods anymore.

@dxichen dxichen force-pushed the eventhub-system-descriptors branch from c3e7e98 to ac244e8 Compare October 9, 2018 22:36
Copy link
Contributor

@prateekm prateekm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, thanks!

@asfgit asfgit closed this in da88096 Oct 11, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants