diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java index 44353d2c8a..2d229292e4 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java @@ -19,14 +19,28 @@ package org.apache.samza.system.eventhub.admin; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.EventHubRuntimeInformation; +import com.microsoft.azure.eventhubs.EventPosition; +import com.microsoft.azure.eventhubs.PartitionReceiver; import com.microsoft.azure.eventhubs.PartitionRuntimeInformation; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import org.apache.samza.Partition; import org.apache.samza.SamzaException; +import org.apache.samza.startpoint.Startpoint; +import org.apache.samza.startpoint.StartpointOldest; +import org.apache.samza.startpoint.StartpointSpecific; +import org.apache.samza.startpoint.StartpointTimestamp; +import org.apache.samza.startpoint.StartpointUpcoming; +import org.apache.samza.startpoint.StartpointVisitor; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; @@ -57,12 +71,27 @@ public class EventHubSystemAdmin implements SystemAdmin { private final EventHubConfig eventHubConfig; private final Map eventHubClients = new HashMap<>(); private final Map streamPartitions = new HashMap<>(); + private final EventHubSamzaOffsetResolver eventHubSamzaOffsetResolver; 
public EventHubSystemAdmin(String systemName, EventHubConfig eventHubConfig, EventHubClientManagerFactory eventHubClientManagerFactory) { this.systemName = systemName; this.eventHubConfig = eventHubConfig; this.eventHubClientManagerFactory = eventHubClientManagerFactory; + this.eventHubSamzaOffsetResolver = new EventHubSamzaOffsetResolver(this, eventHubConfig); + } + + @Override + public void stop() { + for (Map.Entry entry : eventHubClients.entrySet()) { + EventHubClientManager eventHubClientManager = entry.getValue(); + try { + eventHubClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS); + } catch (Exception e) { + LOG.warn(String.format("Exception occurred when closing EventHubClient of stream: %s.", entry.getKey()), e); + } + } + eventHubClients.clear(); } @Override @@ -121,20 +150,20 @@ public Map getSystemStreamMetadata(Set str SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(streamName, sspMetadataMap); requestedMetadata.put(streamName, systemStreamMetadata); } + return requestedMetadata; } catch (Exception e) { String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s", systemName); LOG.error(msg, e); throw new SamzaException(msg, e); - } finally { - // Closing clients - eventHubClients.forEach((streamName, client) -> client.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS)); - eventHubClients.clear(); } + } - return requestedMetadata; + @Override + public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) { + return startpoint.apply(systemStreamPartition, eventHubSamzaOffsetResolver); } - private EventHubClientManager getOrCreateStreamEventHubClient(String streamName) { + EventHubClientManager getOrCreateStreamEventHubClient(String streamName) { if (!eventHubClients.containsKey(streamName)) { LOG.info(String.format("Creating EventHubClient for Stream=%s", streamName)); @@ -205,4 +234,75 @@ public static Integer compareOffsets(String offset1, String offset2) { public 
Integer offsetComparator(String offset1, String offset2) { return compareOffsets(offset1, offset2); } + + /** + * Offers an EventHub-specific implementation of {@link StartpointVisitor} that resolves + * different types of {@link Startpoint} to a Samza offset. + */ + @VisibleForTesting + static class EventHubSamzaOffsetResolver implements StartpointVisitor { + + private final EventHubSystemAdmin eventHubSystemAdmin; + private final EventHubConfig eventHubConfig; + + EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) { + this.eventHubSystemAdmin = eventHubSystemAdmin; + this.eventHubConfig = eventHubConfig; + } + + @Override + public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + return startpointSpecific.getSpecificOffset(); + } + + @Override + public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { + String streamName = systemStreamPartition.getStream(); + EventHubClientManager eventHubClientManager = eventHubSystemAdmin.getOrCreateStreamEventHubClient(streamName); + EventHubClient eventHubClient = eventHubClientManager.getEventHubClient(); + + PartitionReceiver partitionReceiver = null; + try { + // 1. Initialize the arguments required for creating the partition receiver. + String partitionId = String.valueOf(systemStreamPartition.getPartition().getPartitionId()); + Instant epochInMillisInstant = Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset()); + EventPosition eventPosition = EventPosition.fromEnqueuedTime(epochInMillisInstant); + String consumerGroup = eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), streamName); + + // 2. Create a partition receiver with event position defined by the timestamp. + partitionReceiver = eventHubClient.createReceiverSync(consumerGroup, partitionId, eventPosition); + + // 3. Read a single message from the partition receiver. 
+ Iterable eventHubMessagesIterator = partitionReceiver.receiveSync(1); + ArrayList eventHubMessageList = Lists.newArrayList(eventHubMessagesIterator); + + // 4. Validate that a single message was fetched from the broker. + Preconditions.checkState(eventHubMessageList.size() == 1, "Failed to read messages from EventHub system."); + + // 5. Return the offset present in the metadata of the first message. + return eventHubMessageList.get(0).getSystemProperties().getOffset(); + } catch (EventHubException e) { + LOG.error(String.format("Exception occurred when fetching offset for timestamp: %d from the stream: %s", startpointTimestamp.getTimestampOffset(), streamName), e); + throw new SamzaException(e); + } finally { + if (partitionReceiver != null) { + try { + partitionReceiver.closeSync(); + } catch (EventHubException e) { + LOG.error(String.format("Exception occurred when closing partition-receiver of the stream: %s", streamName), e); + } + } + } + } + + @Override + public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) { + return EventHubSystemConsumer.START_OF_STREAM; + } + + @Override + public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) { + return EventHubSystemConsumer.END_OF_STREAM; + } + } } diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java index e45d3f4447..befbf3ac22 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/admin/TestEventHubSystemAdmin.java @@ -19,11 +19,24 @@ package org.apache.samza.system.eventhub.admin; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; +import 
com.microsoft.azure.eventhubs.PartitionReceiver; +import java.util.Arrays; import org.apache.samza.Partition; +import org.apache.samza.startpoint.StartpointOldest; +import org.apache.samza.startpoint.StartpointSpecific; +import org.apache.samza.startpoint.StartpointTimestamp; +import org.apache.samza.startpoint.StartpointUpcoming; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.eventhub.EventHubClientManager; +import org.apache.samza.system.eventhub.EventHubConfig; import org.apache.samza.system.eventhub.EventHubSystemFactory; import org.apache.samza.system.eventhub.MockEventHubConfigFactory; +import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin.EventHubSamzaOffsetResolver; import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer; import org.apache.samza.system.eventhub.producer.EventHubSystemProducer; import org.junit.Assert; @@ -33,6 +46,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.mockito.Mockito; import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*; @@ -80,4 +94,65 @@ public void testGetStreamMetadata() { } } + @Test + public void testStartpointResolverShouldResolveTheStartpointOldestToCorrectOffset() { + EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class); + EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class); + SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); + + EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig); + + Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointOldest())); + } + + @Test + public void testStartpointResolverShouldResolveTheStartpointUpcomingToCorrectOffset() { + 
EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class); + EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class); + SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); + + EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig); + + Assert.assertEquals(EventHubSystemConsumer.END_OF_STREAM, resolver.visit(systemStreamPartition, new StartpointUpcoming())); + } + + @Test + public void testStartpointResolverShouldResolveTheStartpointSpecificToCorrectOffset() { + EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class); + EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class); + SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); + + EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig); + + Assert.assertEquals("100", resolver.visit(systemStreamPartition, new StartpointSpecific("100"))); + } + + @Test + public void testStartpointResolverShouldResolveTheStartpointTimestampToCorrectOffset() throws EventHubException { + // Initialize variables required for testing. + EventHubSystemAdmin mockEventHubSystemAdmin = Mockito.mock(EventHubSystemAdmin.class); + EventHubConfig eventHubConfig = Mockito.mock(EventHubConfig.class); + SystemStreamPartition systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0)); + String mockedOffsetToReturn = "100"; + + // Setup the mock variables. 
+ EventHubClientManager mockEventHubClientManager = Mockito.mock(EventHubClientManager.class); + EventHubClient mockEventHubClient = Mockito.mock(EventHubClient.class); + PartitionReceiver mockPartitionReceiver = Mockito.mock(PartitionReceiver.class); + EventData mockEventData = Mockito.mock(EventData.class); + EventData.SystemProperties mockSystemProperties = Mockito.mock(EventData.SystemProperties.class); + + // Configure the mock variables to return the appropriate values. + Mockito.when(mockEventHubSystemAdmin.getOrCreateStreamEventHubClient("test-stream")).thenReturn(mockEventHubClientManager); + Mockito.when(mockEventHubClientManager.getEventHubClient()).thenReturn(mockEventHubClient); + Mockito.when(mockEventHubClient.createReceiverSync(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(mockPartitionReceiver); + Mockito.when(mockPartitionReceiver.receiveSync(1)).thenReturn(Arrays.asList(mockEventData)); + Mockito.when(mockEventData.getSystemProperties()).thenReturn(mockSystemProperties); + Mockito.when(mockSystemProperties.getOffset()).thenReturn(mockedOffsetToReturn); + + // Test the Offset resolver. + EventHubSamzaOffsetResolver resolver = new EventHubSamzaOffsetResolver(mockEventHubSystemAdmin, eventHubConfig); + String resolvedOffset = resolver.visit(systemStreamPartition, new StartpointTimestamp(100L)); + Assert.assertEquals(mockedOffsetToReturn, resolvedOffset); + } }