Skip to content

Commit

Permalink
SAMZA-2192: Add StartpointVisitor implementation for EventHub. (#1030)
Browse files Browse the repository at this point in the history
* Adding initial implementation for EventHubSystemAdmin.

* Code clean up.
  • Loading branch information
shanthoosh committed May 16, 2019
1 parent c4afc2e commit 2a589fc
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 6 deletions.
Expand Up @@ -19,14 +19,28 @@

package org.apache.samza.system.eventhub.admin;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.startpoint.Startpoint;
import org.apache.samza.startpoint.StartpointOldest;
import org.apache.samza.startpoint.StartpointSpecific;
import org.apache.samza.startpoint.StartpointTimestamp;
import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.startpoint.StartpointVisitor;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
Expand Down Expand Up @@ -57,12 +71,27 @@ public class EventHubSystemAdmin implements SystemAdmin {
private final EventHubConfig eventHubConfig;
private final Map<String, EventHubClientManager> eventHubClients = new HashMap<>();
private final Map<String, String[]> streamPartitions = new HashMap<>();
private final EventHubSamzaOffsetResolver eventHubSamzaOffsetResolver;

public EventHubSystemAdmin(String systemName, EventHubConfig eventHubConfig,
EventHubClientManagerFactory eventHubClientManagerFactory) {
this.systemName = systemName;
this.eventHubConfig = eventHubConfig;
this.eventHubClientManagerFactory = eventHubClientManagerFactory;
this.eventHubSamzaOffsetResolver = new EventHubSamzaOffsetResolver(this, eventHubConfig);
}

@Override
public void stop() {
for (Map.Entry<String, EventHubClientManager> entry : eventHubClients.entrySet()) {
EventHubClientManager eventHubClientManager = entry.getValue();
try {
eventHubClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
} catch (Exception e) {
LOG.warn(String.format("Exception occurred when closing EventHubClient of stream: %s.", entry.getKey()), e);
}
}
eventHubClients.clear();
}

@Override
Expand Down Expand Up @@ -121,20 +150,20 @@ public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> str
SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(streamName, sspMetadataMap);
requestedMetadata.put(streamName, systemStreamMetadata);
}
return requestedMetadata;
} catch (Exception e) {
String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s", systemName);
LOG.error(msg, e);
throw new SamzaException(msg, e);
} finally {
// Closing clients
eventHubClients.forEach((streamName, client) -> client.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
eventHubClients.clear();
}
}

return requestedMetadata;
@Override
public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
return startpoint.apply(systemStreamPartition, eventHubSamzaOffsetResolver);
}

private EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
EventHubClientManager getOrCreateStreamEventHubClient(String streamName) {
if (!eventHubClients.containsKey(streamName)) {
LOG.info(String.format("Creating EventHubClient for Stream=%s", streamName));

Expand Down Expand Up @@ -205,4 +234,75 @@ public static Integer compareOffsets(String offset1, String offset2) {
public Integer offsetComparator(String offset1, String offset2) {
return compareOffsets(offset1, offset2);
}

/**
* Offers a eventhub specific implementation of {@link StartpointVisitor} that resolves
* different types of {@link Startpoint} to samza offset.
*/
@VisibleForTesting
static class EventHubSamzaOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {

private final EventHubSystemAdmin eventHubSystemAdmin;
private final EventHubConfig eventHubConfig;

EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) {
this.eventHubSystemAdmin = eventHubSystemAdmin;
this.eventHubConfig = eventHubConfig;
}

@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
return startpointSpecific.getSpecificOffset();
}

@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
String streamName = systemStreamPartition.getStream();
EventHubClientManager eventHubClientManager = eventHubSystemAdmin.getOrCreateStreamEventHubClient(streamName);
EventHubClient eventHubClient = eventHubClientManager.getEventHubClient();

PartitionReceiver partitionReceiver = null;
try {
// 1. Initialize the arguments required for creating the partition receiver.
String partitionId = String.valueOf(systemStreamPartition.getPartition().getPartitionId());
Instant epochInMillisInstant = Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset());
EventPosition eventPosition = EventPosition.fromEnqueuedTime(epochInMillisInstant);
String consumerGroup = eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamName);

// 2. Create a partition receiver with event position defined by the timestamp.
partitionReceiver = eventHubClient.createReceiverSync(consumerGroup, partitionId, eventPosition);

// 3. Read a single message from the partition receiver.
Iterable<EventData> eventHubMessagesIterator = partitionReceiver.receiveSync(1);
ArrayList<EventData> eventHubMessageList = Lists.newArrayList(eventHubMessagesIterator);

// 4. Validate that a single message was fetched from the broker.
Preconditions.checkState(eventHubMessageList.size() == 1, "Failed to read messages from EventHub system.");

// 5. Return the offset present in the metadata of the first message.
return eventHubMessageList.get(0).getSystemProperties().getOffset();
} catch (EventHubException e) {
LOG.error(String.format("Exception occurred when fetching offset for timestamp: %d from the stream: %s", startpointTimestamp.getTimestampOffset(), streamName), e);
throw new SamzaException(e);
} finally {
if (partitionReceiver != null) {
try {
partitionReceiver.closeSync();
} catch (EventHubException e) {
LOG.error(String.format("Exception occurred when closing partition-receiver of the stream: %s", streamName), e);
}
}
}
}

@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
return EventHubSystemConsumer.START_OF_STREAM;
}

@Override
public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
return EventHubSystemConsumer.END_OF_STREAM;
}
}
}
Expand Up @@ -19,11 +19,24 @@

package org.apache.samza.system.eventhub.admin;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import java.util.Arrays;
import org.apache.samza.Partition;
import org.apache.samza.startpoint.StartpointOldest;
import org.apache.samza.startpoint.StartpointSpecific;
import org.apache.samza.startpoint.StartpointTimestamp;
import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.eventhub.EventHubClientManager;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.EventHubSystemFactory;
import org.apache.samza.system.eventhub.MockEventHubConfigFactory;
import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin.EventHubSamzaOffsetResolver;
import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
import org.junit.Assert;
Expand All @@ -33,6 +46,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.mockito.Mockito;

import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*;

Expand Down Expand Up @@ -80,4 +94,65 @@ public void testGetStreamMetadata() {
}
}

@Test
public void testStartpointResolverShouldResolveTheStartpointOldestToCorrectOffset() {
EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));

EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);

Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointOldest()));
}

@Test
public void testStartpointResolverShouldResolveTheStartpointUpcomingToCorrectOffset() {
EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));

EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);

Assert.assertEquals(EventHubSystemConsumer.END_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointUpcoming()));
}

@Test
public void testStartpointResolverShouldResolveTheStartpointSpecificToCorrectOffset() {
EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));

EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);

Assert.assertEquals("100", resolver.visit(systemStreamPartition, new StartpointSpecific("100")));
}

@Test
public void testStartpointResolverShouldResolveTheStartpointTimestampToCorrectOffset() throws EventHubException {
// Initialize variables required for testing.
EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class);
EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class);
SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
String mockedOffsetToReturn = "100";

// Setup the mock variables.
EventHubClientManager mockEventHubClientManager = Mockito.mock(EventHubClientManager.class);
EventHubClient mockEventHubClient = Mockito.mock(EventHubClient.class);
PartitionReceiver mockPartitionReceiver = Mockito.mock(PartitionReceiver.class);
EventData mockEventData = Mockito.mock(EventData.class);
EventData.SystemProperties mockSystemProperties = Mockito.mock(EventData.SystemProperties.class);

// Configure the mock variables to return the appropriate values.
Mockito.when(mockEventHubSystemAdmin.getOrCreateStreamEventHubClient("test-stream")).thenReturn(mockEventHubClientManager);
Mockito.when(mockEventHubClientManager.getEventHubClient()).thenReturn(mockEventHubClient);
Mockito.when(mockEventHubClient.createReceiverSync(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(mockPartitionReceiver);
Mockito.when(mockPartitionReceiver.receiveSync(1)).thenReturn(Arrays.asList(mockEventData));
Mockito.when(mockEventData.getSystemProperties()).thenReturn(mockSystemProperties);
Mockito.when(mockSystemProperties.getOffset()).thenReturn(mockedOffsetToReturn);

// Test the Offset resolver.
EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig);
String resolvedOffset = resolver.visit(systemStreamPartition, new StartpointTimestamp(100L));
Assert.assertEquals(mockedOffsetToReturn, resolvedOffset);
}
}

0 comments on commit 2a589fc

Please sign in to comment.