SAMZA-1438: SystemProducer, Consumer and Admin interfaces for EventHubs #308
Conversation
Thanks for pulling this major refactor. This has tremendous leverage for Samza customers!
private final String sasKey;
private final String consumerName;
private final EventHubConfig config;
private final Map<Integer, PartitionReceiver> receivers = new TreeMap<>();

Currently, the EventHubEntityConnection class has some stateful/mutating operations, including some synchronized methods:

    synchronized void addPartition(int partitionId, String offset, PartitionReceiveHandler handler) {}
    synchronized void connectAndStart() {}
    synchronized void stop() {}

Can we consider separating these and making the class immutable:

    private final String namespace;
    private final String entityPath;
    private final String sasKeyName;
    private final String sasKey;
    private final String consumerName;
    private final EventHubConfig config;

Also, I'd remove these stateful fields from this class:

    private final Map<Integer, PartitionReceiver> receivers = new TreeMap<>();
    private EventHubClientWrapper ehClientWrapper;
    private final EventHubClientFactory eventHubClientFactory = new EventHubClientFactory();

Do you think we can move this map of partition -> PartitionReceiver instances into the Consumer itself, since all of these are related to receiving events? Another nice thing is that you can get rid of the EventHubEntityConnectionFactory completely for wiring these stateful components.
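A minimal sketch of the suggested split (the field names are the ones quoted above; the class name and constructor are illustrative, and getters are elided):

    // Immutable description of an EventHub entity; no connection or receiver state lives here.
    public class EventHubEntityDescriptor {
      private final String namespace;
      private final String entityPath;
      private final String sasKeyName;
      private final String sasKey;
      private final String consumerName;
      private final EventHubConfig config;

      public EventHubEntityDescriptor(String namespace, String entityPath, String sasKeyName,
          String sasKey, String consumerName, EventHubConfig config) {
        this.namespace = namespace;
        this.entityPath = entityPath;
        this.sasKeyName = sasKeyName;
        this.sasKey = sasKey;
        this.consumerName = consumerName;
        this.config = config;
      }
      // getters only; no addPartition/connectAndStart/stop
    }

The mutable partition -> PartitionReceiver map (and the client wrapper) would then be owned, started, and stopped by EventHubSystemConsumer directly.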
Refactored in order to remove EventHubEntityConnection.
1c1677f
@Override
public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
  return new EventHubSystemProducer(systemName, new EventHubConfig(config, systemName), registry);

Can we inject the EventHubWrapperFactory into the EventHubSystemProducer, EventHubSystemConsumer and EventHubSystemAdmin instead of class-loading them via config? It'd be much cleaner with types that way. Use-cases at LinkedIn that require tunneling can inject their tunnel-aware versions of the EventHubClientWrapper.
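A minimal sketch of the injection idea, assuming a hypothetical EventHubSystemProducer constructor that accepts the factory (the real constructors in this PR may differ):

    import org.apache.samza.config.Config;
    import org.apache.samza.metrics.MetricsRegistry;
    import org.apache.samza.system.SystemProducer;

    public class EventHubSystemFactory /* implements SystemFactory */ {
      // Single override point: a tunnel-aware deployment can subclass the factory and return
      // its own wrapper factory here, instead of class-loading an implementation via config.
      protected EventHubWrapperFactory getClientWrapperFactory() {
        return new EventHubWrapperFactory();
      }

      public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
        EventHubConfig ehConfig = new EventHubConfig(config, systemName);
        // Hypothetical constructor taking the factory explicitly.
        return new EventHubSystemProducer(systemName, ehConfig, getClientWrapperFactory(), registry);
      }
    }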
if (events != null) {
  events.forEach(event -> {
    byte[] decryptedBody = event.getBytes();

Remove references to encryption and decryption everywhere, since they seem orthogonal to the record payloads?

Removed the Serde configs and added the Stream-to-Serde mapping to EventHubSystemFactory (pass-through Serde by default). e740088

I did another pass. Will review the test code soon! Let's try wrapping this up by end of week.
 * @return the body of decrypted body of the message. In case no encryption is setup for this topic,
 * just returns the body of the message.
 */
public byte[] getDecryptedBody() {
s/getDecryptedBody/getBody
import com.microsoft.azure.eventhubs.EventHubClient;

public interface EventHubClientWrapper {
Add java docs:
Wraps the {@link EventHubClient} with lifecycle hooks for initialization and close
public static final String DEFAULT_CONFIG_STREAM_CONSUMER_START_POSITION = StartPosition.LATEST.name();

public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method";
public static final String DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD = EventHubSystemProducer

Can't we internally translate calls to send() into one of these partition schemes? For instance, if a specific partitionId is provided, we write to that partition. If not, we use the EventHub's hash partitioner?

The partitionId is obtained from the partitionKey on the OutgoingMessageEnvelope. This id could be used as the partition on EventHub, or as the seed for EventHub's consistent hashing. It could be ambiguous if the user wants to use EventHub hashing by providing the hash via the partitionId when the partition method is not specified.
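For illustration, the translation being discussed might look roughly like this (getPartitionSender and the EventHubs client calls appear elsewhere in this PR; the dispatch itself is hypothetical):

    // Hypothetical dispatch inside the producer: an explicit Integer partition id goes straight
    // to that partition; any other key is handed to EventHub's consistent hashing.
    private CompletableFuture<Void> sendToEventHub(String streamName, EventData eventData,
        Object partitionKey, EventHubClient eventHubClient) {
      if (partitionKey instanceof Integer) {
        PartitionSender sender = getPartitionSender(streamName, (Integer) partitionKey, eventHubClient);
        return sender.send(eventData);
      }
      // No explicit partition id: let EventHub hash the stringified key.
      return eventHubClient.send(eventData, String.valueOf(partitionKey));
    }

As the reply above notes, this is ambiguous when a user supplies an Integer key but actually wants EventHub hashing, which is why the partition method is kept explicit in config.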
public static final String CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = "systems.%s.eventhubs.send.key";
public static final Boolean DEFAULT_CONFIG_SEND_KEY_IN_EVENT_PROPERTIES = false;

public static final String CONFIG_CONNECTION_SHUTDOWN_TIMEOUT_MILLIS = "systems.%s.eventhubs.shutdown.timeout";

Would be good to confirm whether all these configs need to be exposed as top-level configs. If we can simply propagate EventHub configs down to the client, it'll save us from maintaining these configs.
private final String systemName;

public EventHubConfig(Map<String, String> config, String systemName) {

Is there a reason for EventHubConfig to take in a systemName? Seems fairly system-agnostic.
TimeUnit.MILLISECONDS);
streamPartitions.put(streamName, ehInfo.getPartitionIds());
} catch (InterruptedException | ExecutionException e) {
  String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s",

Can't we use the same path for all exceptions? We appear to be handling exceptions the same way in both of these catch code paths.
throw new SamzaException(msg);
}
}
requestedMetadata.put(streamName, new SystemStreamMetadata(streamName,

Where is close called? If it isn't, we should close the client in a finally block to avoid memory leaks.
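A minimal sketch of the finally-based cleanup (the method shape, TIMEOUT_MILLIS, and streamPartitions are illustrative stand-ins for the PR's actual admin code; getRuntimeInformation() is the Azure client call used to fetch the runtime info):

    private void fetchPartitionIds(String streamName, EventHubClient ehClient) {
      try {
        EventHubRuntimeInformation info =
            ehClient.getRuntimeInformation().get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        streamPartitions.put(streamName, info.getPartitionIds());
      } catch (InterruptedException | ExecutionException | TimeoutException e) {
        String msg = String.format("Error while fetching EventHubRuntimeInfo for Stream:%s", streamName);
        throw new SamzaException(msg, e);
      } finally {
        ehClient.close(); // always release the connection, even when the fetch fails
      }
    }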
public static final String READ_LATENCY = "readLatency";
public static final String READ_ERRORS = "readErrors";

private static Counter aggEventReadRate = null;

Can these counters be per-instance variables, or do they have to be shared? Another nice thing is that you can get rid of this aggregate lock.
cc: @lhaiesp to confirm
private static SamzaHistogram aggReadLatency = null;
private static Counter aggReadErrors = null;

private Map<String, Counter> eventReadRates;

Consider making these maps final.
readErrors = streamList.stream()
    .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));

// Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.

Do we really need an aggregate view of the metric?
cc:
This PR has somewhat different formatting (esp. continuation indent and param alignment) than Samza conventions. Would prefer making it consistent for readability.
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.8-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.8-all.zip

Why is this required?

Nope, not sure why it's there.

This actually helps you get all the Gradle dependencies, so that you can browse through the Gradle plugins from your IDE.
build.gradle

}
checkstyle {
  configFile = new File(rootDir, "checkstyle/checkstyle.xml")
  toolVersion = "$checkstyleVersion"
}
test {
  exclude 'org/apache/samza/system/eventhub/producer/**'
Why is this required?
Excluding it since it is an integration test that requires access to an external EventHub instance.
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
Prefer not using powermock unless we really need it.
Unfortunately, I need powermock in order to mock the EventHubClient (private ctor) and PartitionReceiver (final class) for unit tests.
@@ -1,6 +1,6 @@
#Mon Oct 31 23:13:44 PDT 2016
#Thu Sep 07 09:55:54 PDT 2017
Can remove this, not sure why this is here.
/**
 * Simpler wrapper of {@link EventData} events with the decrypted payload
 */
public class EventDataWrapper {

Why is this wrapper class necessary? Is this just for holding the decrypted body? If so, maybe DecryptedEventData extends EventData, or just use an IME?

EventData cannot be extended due to its private ctor. This is used as a workaround to pass both the metadata from EventData and the decrypted body as a message to the IME.
ehClient.getEventHubClient());

Instant endTime = Instant.now();
long latencyMs = Duration.between(startTime, endTime).toMillis();
Maybe just use currentTimeMillis/nanoTime and subtraction for simplicity?
if (partitioningMethod == PartitioningMethod.EVENT_HUB_HASHING) {
  return eventHubClient.send(eventData, convertPartitionKeyToString(partitionKey));
} else if (partitioningMethod == PartitioningMethod.PARTITION_KEY_AS_PARTITION) {
  if (!(partitionKey instanceof Integer)) {
Null check too (or is it guaranteed to be not-null?).
A null check is not necessary for instanceof.
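For reference, the language itself guarantees this; a tiny standalone check:

    public class InstanceOfNullDemo {
      public static void main(String[] args) {
        Object partitionKey = null;
        // "null instanceof T" is defined to be false, so the instanceof test above
        // already rejects null keys without a separate null check.
        System.out.println(partitionKey instanceof Integer); // prints "false"
      }
    }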
}

PartitionSender sender = getPartitionSender(streamName, (int) partitionKey, eventHubClient);
return sender.send(eventData);

Is there a built-in back-pressure mechanism (producer buffer size configuration, etc.) somewhere? Would we run OOM if the producer can't keep up?
}
}

private Object getEnvelopePartitionId(OutgoingMessageEnvelope envelope) {
Unnecessary, can inline.
An internal implementation will override this; will make it protected.
}
}

private PartitionSender getPartitionSender(String streamName, int partition, EventHubClient eventHubClient) {
getOrCreate
Why is the creation happening during send? Should it happen once during start instead?
The partition key is contained in the OME; it is required to determine the partition of the PartitionSender being created. Will create senders for all stream partitions downstream on create.
sendErrors.get(destination).inc();
aggSendErrors.inc();
LOG.error("Send message to event hub: {} failed with exception: ", destination, throwable);
sendExceptionOnCallback = throwable;
If this handler isn't called on the send thread (I assume it's called on the thread that completed the future?), this exception may still not be visible to the sender threads even if the send method is synchronized:
https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#synchronization
"Synchronization ensures that memory writes by a thread before or during a synchronized block are made visible in a predictable manner to other threads which synchronize on the same monitor."
Good catch, I will sync on the sendExceptionOnCallback throwable instead.
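One way to make the callback's failure reliably visible to the sending thread is to publish it through an AtomicReference (or a volatile field) rather than relying on the send method's monitor; a minimal sketch with illustrative names:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;

    public class SendErrorTracker {
      // Written from the thread that completes the send future, read from the send/flush thread.
      private final AtomicReference<Throwable> sendExceptionOnCallback = new AtomicReference<>();

      public void track(CompletableFuture<Void> sendFuture) {
        sendFuture.whenComplete((result, error) -> {
          if (error != null) {
            // AtomicReference provides the happens-before edge a plain field would lack.
            sendExceptionOnCallback.compareAndSet(null, error);
          }
        });
      }

      public void throwIfPreviousSendFailed() {
        Throwable error = sendExceptionOnCallback.get();
        if (error != null) {
          throw new RuntimeException("A previous send to EventHub failed", error);
        }
      }
    }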
Thanks for the patch.
Just a few questions to understand the patch (and minor suggestions).
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventHubSystemAdmin implements SystemAdmin {

@vjagadish1989 Just curious: do we need to implement ExtendedSystemAdmin here? Its usages revolve around TaskStorageManager and StreamMetadataCache. Is it not required for the EventHub scenario?
Seems like there will be no problem in implementing the extended version, but there might be something I'm missing. Maybe we can discuss this offline.
/**
 * Wraps the {@link EventHubClient} with lifestyle hooks for initialization and close.
 */
public interface SamzaEventHubClient {

Just wondering if it's logical to rename this to EventHubManager or EventHubClientManager. SamzaEventHubClient.getEventHubClient looks odd.
I agree it looks a bit odd; EventHubClientManager works!
.collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS)));

// Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
synchronized (AGGREGATE) {

It's not recommended to lock on Strings. If a Samza user class synchronizes on the same string literal, there could be potential deadlocks. It's well documented here: https://docs.oracle.com/javase/specs/jls/se7/html/jls-3.html#jls-3.10.5
"Literal strings within different classes in different packages represent references to the same String object."
It would be safe to create a static Object and lock on that (and use double-checked locking, or make eventReadRate volatile). Or just remove this lock altogether and make the metrics per-instance.
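A minimal sketch of the dedicated-lock alternative, with illustrative names (the counter comes from this PR; AGGREGATE_LOCK and the class name do not):

    import org.apache.samza.metrics.Counter;
    import org.apache.samza.metrics.MetricsRegistry;

    public class EventHubConsumerMetrics {
      // Dedicated lock object; never lock on String literals, which are interned and shared JVM-wide.
      private static final Object AGGREGATE_LOCK = new Object();

      // volatile so that threads skipping the lock still see a fully initialized counter.
      private static volatile Counter aggEventReadRate = null;

      public EventHubConsumerMetrics(MetricsRegistry registry) {
        if (aggEventReadRate == null) {                 // first, unlocked check
          synchronized (AGGREGATE_LOCK) {
            if (aggEventReadRate == null) {             // second, locked check
              aggEventReadRate = registry.newCounter("aggregate", "eventReadRate");
            }
          }
        }
      }
    }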
Nice catch, will use an Object instead.
private final ConcurrentHashMap<String, SamzaEventHubClient> streamEventHubClients = new ConcurrentHashMap<>();
private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>();
private final Map<String, Serde<byte[]>> serdes;
private boolean isStarted = false;
Does this have to be volatile?
 * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir}
 * Keeps a {@link Gauge} for each percentile
 */
public class SamzaHistogram {

Please correct me if I'm wrong here. All Samza metric classes (Timer, Counter, etc.) are subtypes of Metric. Doesn't this have to follow the same class hierarchy? I think some MetricsReporters would not be able to report this metric otherwise.
As mentioned above, I was planning to have another PR for the metrics histogram update, thanks for the comment!
@shanthoosh You are right. This PR is intentionally kept separate from that.
if (aggEventReadRate == null) {
  aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
  aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
  aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);

Please correct me if I'm wrong here. Instead of using new SamzaHistogram here, don't we have to do something like registry.newHistogram(...)? I think we need to introduce a newHistogram method in the MetricsRegistry interface for this new metric and the reporters to function.
I was planning to add the histogram as is for now and have a new PR later on to properly add the histogram metric.
}

// Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
synchronized (AGGREGATE) {

If I understand right, the goal is to achieve one-time aggregate-metrics initialization across all instances of EventHubProducer. Putting synchronized on the method just guarantees a lock per EventHubProducer instance.
}
checkstyle {
  configFile = new File(rootDir, "checkstyle/checkstyle.xml")
  toolVersion = "$checkstyleVersion"
}
test {
Just curious. Any reason to exclude these tests from the build?
They are integration tests that require a connection to an external EventHub. Is there a Samza-specific protocol for integration tests? Will add a comment here, though.
@shanthoosh These tests connect to EventHub, and we'll need to expose credentials to do that. So, we've intentionally disabled them.
config.getStartPosition(systemName, systemStreamPartition.getStream()));
}
}
streamPartitionOffsets.put(systemStreamPartition, offset);

Please correct me if I'm wrong here. I think this has to use offsetComparator and pick the lowest offset. If we have two tasks with checkpointed offsets T1(SP1: START_OF_STREAM) and T2(SP1: END_OF_STREAM), register should choose the lowest one and store it in streamPartitionOffsets for the partition (this register call will be made once for each TaskInstance). If we just overwrite, we could pick END (depending upon register call order).
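A minimal sketch of keeping the earliest registered offset per SSP (the comparator here is a stand-in for the system's real offset ordering and simply assumes numeric offsets; all names are illustrative):

    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.samza.system.SystemStreamPartition;

    public class RegisterOffsetTracker {
      private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets =
          new ConcurrentHashMap<>();

      // Keep the lowest offset seen across all register() calls for the same SSP,
      // instead of letting the last caller win.
      public void register(SystemStreamPartition ssp, String offset) {
        streamPartitionOffsets.merge(ssp, offset,
            (existing, incoming) -> compareOffsets(existing, incoming) <= 0 ? existing : incoming);
      }

      // Stand-in for SystemAdmin#offsetComparator-style ordering; illustrative only.
      private int compareOffsets(String a, String b) {
        return Long.compare(Long.parseLong(a), Long.parseLong(b));
      }
    }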
Good catch, I was relying on register being called only once per SSP.
Looks great, approved. Thank you for the contribution.
import com.microsoft.azure.eventhubs.EventHubClient;

/**
 * Wraps the {@link EventHubClient} with lifestyle hooks for initialization and close.
s/lifestyle/lifecycle
 */
public interface EventHubClientManager {
  /**
   * Initiate the connection to EventHub.

Reword docs: "Lifecycle hook to perform initializations before the {@link EventHubClient} is created."

Since this also creates the EventHubClient, how about: "Lifecycle hook to perform initializations for the creation of the underlying {@link EventHubClient}."
 */
void init();

/**

EventHubClient getEventHubClient();

/**
 * Timed synchronous connection close to the EventHub.
return super.poll(systemStreamPartitions, timeout);
}

private void renewPartitionReceiver(SystemStreamPartition ssp, Throwable throwable) {
Can log the exception at the caller, and not take the throwable here.
// Timeout for EventHubClient receive
receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT);
receiver.setReceiveHandler(streamPartitionHandlers.get(ssp));
streamPartitionReceivers.put(ssp, receiver);

Should the receiver be started? Or is it done automatically?
This is started by setReceiveHandler; will add a comment.
public void onReceive(Iterable<EventData> events) {
  if (events != null) {
    events.forEach(event -> {
      byte[] decryptedBody = event.getBytes();

Remove all references to encryption/decryption. Should define a message interceptor instead:

    public interface Interceptor {
      byte[] intercept(byte[] input);
    }

We will define interceptors for encryption and decryption at LinkedIn, optionally. We'll have a pass-through interceptor for the open source. Another nice thing is that Serde continues to happen inside Samza.
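For example, the default pass-through mentioned above could be as small as this (illustrative; the Interceptor interface is the one sketched in the comment):

    // Default implementation: hands the payload back unchanged, so Serde handling stays inside
    // Samza; an encryption-aware interceptor can be swapped in where needed.
    public class PassThroughInterceptor implements Interceptor {
      @Override
      public byte[] intercept(byte[] input) {
        return input;
      }
    }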
@Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
  if (perminentEventHubError != null) {
s/perminent/permenant
you mean permanent? :)