Broadcast processor is dropping telemetry due to race condition #994

Closed
RamjotSingh opened this issue Nov 12, 2018 · 1 comment

RamjotSingh commented Nov 12, 2018


Repro Steps

  1. Run the sample test in the comment below

I observed this in a real telemetry pipeline: I sent about 1,000 events and the pipeline received only about 900 items, with sampling completely disabled.

Actual Behavior

The broadcast processor drops events because of a race condition.

Expected Behavior

Every telemetry sink should receive the same number of events, and that number should equal the number of events actually sent.

Version Info

SDK Version: 2.8
.NET Version: All (tested on .NET Standard and .NET Framework 4.5)
How the application was onboarded with the SDK (Visual Studio/Status Monitor/Azure Extension): N/A
OS: N/A
Hosting Info (IIS/Azure WebApps/etc.): N/A

Potential Issue

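In the following code, the item waiting to be sent is stored in a single instance field, this.nextTelemetryToProcess, which every sending thread shares. When multiple events are sent down the pipeline concurrently, another thread can overwrite or clear that field before this thread's ProcessOffered call gets a chance to send its event.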

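            public void Offer(ITelemetry telemetry)
            {
                // Shared instance field: a concurrent Offer on another thread can overwrite this value.
                this.nextTelemetryToProcess = this.cloneBeforeDispatch ? telemetry.DeepClone() : telemetry;
            }

            public void ProcessOffered()
            {
                // By the time this runs, the field may hold another thread's item,
                // or another thread's ProcessOffered may already have cleared it.
                if (this.nextTelemetryToProcess != null)
                {
                    this.sink.Process(this.nextTelemetryToProcess);
                    this.nextTelemetryToProcess = null;
                }
                else
                {
                    Debug.Fail("We should not be asked to process a telemetry item if none was offered");
                }
            }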
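The interleaving described above can be replayed deterministically on a single thread. This sketch uses a hypothetical SharedSlotProcessor that mimics the quoted Offer/ProcessOffered pair, with a string standing in for ITelemetry; it illustrates the shared-field pattern and is not SDK code. Two "threads" each call Offer and then ProcessOffered, and the steps are ordered the way a thread switch can order them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the per-sink helper quoted above: one instance
// field holds the "next" item, just like nextTelemetryToProcess.
public sealed class SharedSlotProcessor
{
    private string nextItem;                       // shared by every calling thread
    public readonly List<string> Received = new List<string>();
    public int MissedCalls;                        // ProcessOffered found nothing to send

    public void Offer(string item) => this.nextItem = item;

    public void ProcessOffered()
    {
        if (this.nextItem != null)
        {
            this.Received.Add(this.nextItem);
            this.nextItem = null;
        }
        else
        {
            this.MissedCalls++;                    // the original hits Debug.Fail here
        }
    }
}

public static class InterleavingReplay
{
    // Replays, step by step, an ordering that two real threads can produce.
    public static SharedSlotProcessor Run()
    {
        var p = new SharedSlotProcessor();
        p.Offer("A");          // thread 1 offers A
        p.Offer("B");          // thread 2 offers B, overwriting A
        p.ProcessOffered();    // thread 1 sends B (not A), then clears the slot
        p.ProcessOffered();    // thread 2 finds the slot empty and sends nothing
        return p;
    }

    public static void Main()
    {
        var p = Run();
        Console.WriteLine($"received {p.Received.Count} of 2: {string.Join(",", p.Received)}; missed calls: {p.MissedCalls}");
    }
}
```

Running it prints `received 1 of 2: B; missed calls: 1`. Item A is silently lost, and the second ProcessOffered call is exactly the case that trips the Debug.Fail.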

RamjotSingh commented

Here is a sample test that reproduces the issue. If you run it in a Debug build, first comment out line 92 in the broadcast processor: that is the Debug.Fail call, which is actually hit when the race occurs.

        /// <summary>
        /// Ensure broadcast processor does not drop telemetry items.
        /// </summary>
        [TestMethod]
        public void EnsureEventsAreNotDroppedByBroadcastProcessor()
        {
            var configuration = new TelemetryConfiguration();
            var commonChainBuilder = new TelemetryProcessorChainBuilder(configuration);
            configuration.TelemetryProcessorChainBuilder = commonChainBuilder;

            ConcurrentBag<ITelemetry> itemsReceivedBySink1 = new ConcurrentBag<ITelemetry>();
            ConcurrentBag<ITelemetry> itemsReceivedBySink2 = new ConcurrentBag<ITelemetry>();

            ITelemetryChannel firstTelemetryChannel = new StubTelemetryChannel
            {
                OnSend = telemetry =>
                {
                    itemsReceivedBySink1.Add(telemetry);
                }
            };

            ITelemetryChannel secondTelemetryChannel = new StubTelemetryChannel
            {
                OnSend = telemetry =>
                {
                    itemsReceivedBySink2.Add(telemetry);
                }
            };

            configuration.DefaultTelemetrySink.TelemetryChannel = firstTelemetryChannel;
            configuration.TelemetrySinks.Add(new TelemetrySink(configuration, secondTelemetryChannel));

            configuration.TelemetryProcessorChainBuilder.Build();

            TelemetryClient telemetryClient = new TelemetryClient(configuration);

            // Set up TelemetryContext so that it is fully populated.
            telemetryClient.Context.Operation.Id = "OpId";
            telemetryClient.Context.Cloud.RoleName = "UnitTest";
            telemetryClient.Context.Component.Version = "TestVersion";
            telemetryClient.Context.Device.Id = "TestDeviceId";
            telemetryClient.Context.Flags = 1234;
            telemetryClient.Context.InstrumentationKey = Guid.Empty.ToString();
            telemetryClient.Context.Location.Ip = "127.0.0.1";
            telemetryClient.Context.Session.Id = "SessionId";
            telemetryClient.Context.User.Id = "userId";

            Parallel.ForEach(
                new int[100],
                new ParallelOptions
                {
                    MaxDegreeOfParallelism = 100
                },
                (value) =>
                {
                    telemetryClient.TrackAvailability(
                        "Availability",
                        DateTimeOffset.Now,
                        TimeSpan.FromMilliseconds(200),
                        "Local",
                        true,
                        "Message",
                        new Dictionary<string, string>() { { "Key", "Value" } },
                        new Dictionary<string, double>() { { "Dimension1", 0.9865 } });

                    telemetryClient.TrackDependency(
                        "HTTP",
                        "Target",
                        "Test",
                        "https://azure",
                        DateTimeOffset.Now,
                        TimeSpan.FromMilliseconds(100),
                        "200",
                        true);

                    telemetryClient.TrackEvent(
                        "Event",
                        new Dictionary<string, string>() { { "Key", "Value" } },
                        new Dictionary<string, double>() { { "Dimension1", 0.9865 } });

                    telemetryClient.TrackException(
                        new Exception("Test"),
                        new Dictionary<string, string>() { { "Key", "Value" } },
                        new Dictionary<string, double>() { { "Dimension1", 0.9865 } });

                    telemetryClient.TrackMetric("Metric", 0.1, new Dictionary<string, string>() { { "Key", "Value" } });

                    telemetryClient.TrackPageView("PageView");

                    telemetryClient.TrackRequest(
                        new RequestTelemetry("GET https://azure.com", DateTimeOffset.Now, TimeSpan.FromMilliseconds(200), "200", true)
                        {
                            HttpMethod = "GET"
                        });

                    telemetryClient.TrackTrace(
                        "Message",
                        SeverityLevel.Critical,
                        new Dictionary<string, string>() { { "Key", "Value" } });

                });

            Assert.AreEqual(itemsReceivedBySink1.Count, itemsReceivedBySink2.Count);
            Assert.AreEqual(8 * 100, itemsReceivedBySink1.Count);
            Assert.AreEqual(8 * 100, itemsReceivedBySink2.Count);
        }
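The assertions above require every sink to receive all 800 items. One way to meet that, sketched here under assumptions, is to keep the item being dispatched in a local variable of each call instead of an instance field, so concurrent callers never share per-call state. IItem, ISink, CollectingSink, and LocalStateBroadcaster are hypothetical stand-ins, not Application Insights types, and giving every sink except the last its own clone is an illustrative choice, not necessarily how the SDK resolved this issue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical minimal stand-ins; these are NOT the Application Insights types.
public interface IItem { IItem DeepClone(); }

public sealed class Item : IItem
{
    public int Id;
    public IItem DeepClone() => new Item { Id = this.Id };
}

public interface ISink { void Process(IItem item); }

// Thread-safe sink: ConcurrentBag tolerates concurrent Add calls.
public sealed class CollectingSink : ISink
{
    public readonly ConcurrentBag<IItem> Items = new ConcurrentBag<IItem>();
    public void Process(IItem item) => this.Items.Add(item);
}

// Race-free broadcast: the item to send lives in a local variable of each call,
// so no mutable per-call state is shared between threads.
public sealed class LocalStateBroadcaster
{
    private readonly IReadOnlyList<ISink> sinks;

    public LocalStateBroadcaster(IReadOnlyList<ISink> sinks) => this.sinks = sinks;

    public void Process(IItem item)
    {
        for (int i = 0; i < this.sinks.Count; i++)
        {
            // Every sink except the last gets its own clone; the last gets the original.
            IItem toSend = i < this.sinks.Count - 1 ? item.DeepClone() : item;
            this.sinks[i].Process(toSend);
        }
    }
}

public static class BroadcastDemo
{
    // Sends itemCount items from parallel workers and returns each sink's count.
    public static (int First, int Second) Run(int itemCount)
    {
        var sink1 = new CollectingSink();
        var sink2 = new CollectingSink();
        var broadcaster = new LocalStateBroadcaster(new ISink[] { sink1, sink2 });

        Parallel.For(0, itemCount, new ParallelOptions { MaxDegreeOfParallelism = 100 },
            i => broadcaster.Process(new Item { Id = i }));

        return (sink1.Items.Count, sink2.Items.Count);
    }

    public static void Main()
    {
        var (first, second) = Run(800);
        Console.WriteLine($"sink1={first}, sink2={second}");
    }
}
```

With 800 items dispatched from up to 100 parallel workers, both sinks should report 800, mirroring the 8 * 100 expectation in the test. Because no mutable state outlives a single Process call, the broadcaster needs no locking; the sinks themselves must still be thread-safe, which is why CollectingSink uses ConcurrentBag.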
