Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Geo DR - Preview v2 #43973

Merged
merged 12 commits into from
May 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public partial class EventProcessorClient : Azure.Messaging.EventHubs.Primitives
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, string offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class EventProcessorClientOptions
Expand Down Expand Up @@ -71,7 +72,8 @@ public partial class BlobCheckpointStore : Azure.Messaging.EventHubs.Primitives.
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ EventData eventData = EventHubsModelFactory.EventData(
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offset: "1500",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

// This creates a new instance of ProcessEventArgs to pass into the handler directly.
Expand Down Expand Up @@ -110,7 +110,7 @@ TimerCallback dispatchEvent = async _ =>
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offset: "1500",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

ProcessEventArgs eventArgs = new(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<Version>5.12.0-beta.1</Version>
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually.-->
<ApiCompatVersion>5.11.2</ApiCompatVersion>
<PackageTags>Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<!--TEMP REMOVE BEFORE RELEASE -->
<ProjectReference Include="..\..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Properties\" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<!--<PackageReference Include="Azure.Messaging.EventHubs" />-->
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ static BlobCheckpointStoreInternal()
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with the checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with the checkpoint.</param>
/// <param name="offset">The offset associated with the checkpoint.</param>
/// <param name="exception">The exception that occurred.</param>
///
Expand All @@ -106,10 +105,9 @@ static BlobCheckpointStoreInternal()
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset,
Exception exception) =>
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to update a checkpoint has completed.
Expand All @@ -121,7 +119,6 @@ static BlobCheckpointStoreInternal()
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointComplete(string partitionId,
Expand All @@ -130,9 +127,8 @@ static BlobCheckpointStoreInternal()
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to create/update a checkpoint has started.
Expand All @@ -144,7 +140,6 @@ static BlobCheckpointStoreInternal()
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointStart(string partitionId,
Expand All @@ -153,9 +148,8 @@ static BlobCheckpointStoreInternal()
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to retrieve claim partition ownership has completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,22 +252,20 @@ protected BlobEventStoreEventSource() : base(EventSourceName)
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
[Event(32, Level = EventLevel.Verbose, Message = "Starting to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' ReplicationSegment: '{6}' Offset: '{7}'.")]
[Event(32, Level = EventLevel.Verbose, Message = "Starting to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' Offset: '{6}'.")]
public virtual void UpdateCheckpointStart(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset)
{
if (IsEnabled())
{
WriteEvent(32, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, replicationSegment ?? string.Empty, offset ?? string.Empty);
WriteEvent(32, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, offset ?? string.Empty);
}
}

Expand All @@ -281,22 +279,20 @@ protected BlobEventStoreEventSource() : base(EventSourceName)
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
[Event(33, Level = EventLevel.Verbose, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' ReplicationSegment: '{6}' Offset: '{7}'.")]
[Event(33, Level = EventLevel.Verbose, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{4}'; at SequenceNumber: '{5}' Offset: '{6}'.")]
public virtual void UpdateCheckpointComplete(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset)
{
if (IsEnabled())
{
WriteEvent(33, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, replicationSegment ?? string.Empty, offset ?? string.Empty);
WriteEvent(33, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, offset ?? string.Empty);
}
}

Expand All @@ -310,24 +306,22 @@ protected BlobEventStoreEventSource() : base(EventSourceName)
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the processor that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
///
[Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ClientIdentifier: '{5}'; at SequenceNumber: '{6}' ReplicationSegment '{7}' Offset '{8}'. ErrorMessage: '{4}'.")]
[Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'; ErrorMessage: '{4}'; ClientIdentifier: '{5}'; at SequenceNumber: '{6}' Offset '{7}'.")]
public virtual void UpdateCheckpointError(string partitionId,
string fullyQualifiedNamespace,
string eventHubName,
string consumerGroup,
string clientIdentifier,
string errorMessage,
string sequenceNumber,
string replicationSegment,
string offset)
{
if (IsEnabled())
{
WriteEvent(34, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, replicationSegment ?? string.Empty, offset ?? string.Empty);
WriteEvent(34, partitionId ?? string.Empty, fullyQualifiedNamespace ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty, clientIdentifier ?? string.Empty, sequenceNumber ?? string.Empty, offset ?? string.Empty);
}
}

Expand Down
Loading
Loading