[CosmosDB] Handling Concurrency when owned entities are updated externally #28166

Closed
JamesBroadberry opened this issue Jun 6, 2022 · 2 comments
Labels: closed-no-further-action, customer-reported

JamesBroadberry commented Jun 6, 2022

Issue we're having

Since Cosmos updates currently replace the entire document, we use a single object (typically under 5 KB) to represent a device. Multiple services interact with the database through EF at the same time. When we process messages from a device, the resulting database updates sometimes overwrite data on the device entity that has been set via our API.

e.g.

Message Processing: Get device entity
API: Get device entity
API: Update properties on device entity
API: Save changes
Message Processing: Update properties on device entity
Message Processing: Save changes (overwriting those done by the API)

We've turned to ETag concurrency to handle this. Because of how the updates are done, we'd like to re-apply ONLY the changes we've made on top of the record as it now exists in Cosmos, but owned/embedded entities are causing us a lot of problems.
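
To make the goal concrete, here is a minimal sketch of the property-level three-way merge described above, written against plain dictionaries rather than EF Core. The ThreeWayMerge class and its Merge method are hypothetical names used only for illustration and are not part of EF Core or the Cosmos SDK. It assumes three snapshots are available: the values as originally loaded, the local values being saved, and the values currently in the database.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not an EF Core API.
public static class ThreeWayMerge
{
    // original: values as they were when the entity was loaded
    // local:    values we are trying to save
    // database: values currently stored (re-read after the ETag conflict)
    public static Dictionary<string, object?> Merge(
        IReadOnlyDictionary<string, object?> original,
        IReadOnlyDictionary<string, object?> local,
        IReadOnlyDictionary<string, object?> database)
    {
        var merged = new Dictionary<string, object?>();

        foreach (var (name, localValue) in local)
        {
            original.TryGetValue(name, out var originalValue);
            database.TryGetValue(name, out var databaseValue);

            // Equals compares values; != on boxed objects would compare references.
            var changedLocally = !Equals(localValue, originalValue);

            // Keep our own change, otherwise take whatever is in the database now.
            merged[name] = changedLocally ? localValue : databaseValue;
        }

        return merged;
    }
}
```

With the Book example, a local change to PublishedDate and an external change to Title would both survive the merge. The open question in this issue is how to get the same behaviour for properties that live on owned entities.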

Is there anything that we can do to better handle this scenario?

For example, can we call GetDatabaseValues on the owned entities and do similar processing on those inside SaveChangesAndHandleConcurrency in the code example? We've tried to take inspiration from the code example here.

Fundamentally, the underlying problem is that the Cosmos EF provider does not allow a model configuration in which owned entities simply remain objects on their owning type and are tracked together with it as a single object, instead of being decomposed into separate tracked objects.

Code example

using System;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.Cosmos;
using Microsoft.EntityFrameworkCore;
using Xunit;

namespace vs.experiments.cosmosconcurrency;

public class CosmosStoreDbContext : DbContext
{
    public const string DatabaseName = "ConcurrencyProblems";
    public const string ContainerName = "CosmosStoreDbContext";

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        => optionsBuilder.UseCosmos(
            "AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
            DatabaseName);

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.HasDefaultContainer(ContainerName);
        modelBuilder.Entity<Book>().OwnsOne(x => x.Author);
        modelBuilder.Entity<Book>().UseETagConcurrency();
    }
}

public record Author
{
    public string Name { get; set; } = string.Empty;
    public DateTime DateOfBirth { get; set; } = DateTime.MinValue;
}

public record Book
{
    public Guid Id { get; set; }
    public string Title { get; set; } = string.Empty;
    public DateTime PublishedDate { get; set; } = DateTime.MinValue;
    public Author Author { get; set; } = new();
}

public class TestCases
{
    private readonly CosmosStoreDbContext _dbContext;
    private readonly Guid _seededBookId = Guid.NewGuid();

    public TestCases()
    {
        _dbContext = new CosmosStoreDbContext();
        _dbContext.Database.EnsureCreated();
        _dbContext.RemoveRange(_dbContext.Set<Book>());
        _dbContext.SaveChanges();

        _dbContext.Add(new Book()
        {
            Id = _seededBookId,
            Title = "The Hobbit", Author = new Author()
            {
                Name = "J. R. R. Tolkien",
                DateOfBirth = new DateTime(1892, 1, 3)
            },
            PublishedDate = new DateTime(1937, 9, 21)
        });
        _dbContext.SaveChanges();
    }

    ~TestCases()
    {
        _dbContext.Dispose();
    }

    // This test passes because of the handling of the concurrency exception and re-applying local changes
    // and re-loading of database values. This is the behaviour we're looking for when working with owned
    // properties also.
    [Fact]
    public async Task UpdatingABookBasePropertyAfterExternalModificationOfBaseProperty_KeepsExternalChanges()
    {
        // Arrange
        var cosmosClient = _dbContext.Database.GetCosmosClient();
        var container =
            cosmosClient.GetContainer(CosmosStoreDbContext.DatabaseName, CosmosStoreDbContext.ContainerName);
        var externallyUpdatedTitle = "Not The Hobbit";
        var efUpdatedPublishedDate = DateTime.Today;

        // Act
        //   Get Book entity locally
        var book = await _dbContext.Set<Book>().FirstAsync();

        //   Update Book externally
        var rawBook = await container.ReadItemAsync<dynamic>($"Book|{book.Id}", PartitionKey.None);
        rawBook.Resource.Title = externallyUpdatedTitle;
        await container.UpsertItemAsync(rawBook.Resource, PartitionKey.None);

        //   Update book using EF
        book.PublishedDate = efUpdatedPublishedDate;
        await SaveChangesAndHandleConcurrency();

        // Assert
        _dbContext.ChangeTracker.Clear();
        var fetchedBook = await _dbContext.Set<Book>().FirstAsync();
        fetchedBook.PublishedDate.Should().Be(efUpdatedPublishedDate);
        fetchedBook.Title.Should().Be(externallyUpdatedTitle);
    }
    
    // This test FAILS because the "Author" on "Book" is an owned property and not picked up in the entity's values
    // So the local "Author" (un-reloaded from database) overwrites the externally modified changes.
    [Fact]
    public async Task UpdatingABookBasePropertyAfterExternalModificationOfOwnedProperty_KeepsExternalChanges()
    {
        // Arrange
        var cosmosClient = _dbContext.Database.GetCosmosClient();
        var container =
            cosmosClient.GetContainer(CosmosStoreDbContext.DatabaseName, CosmosStoreDbContext.ContainerName);
        var efUpdatedPublishedDate = DateTime.Today;
        var externallyUpdatedAuthorName = "Tolkien";

        // Act
        //   Get Book entity locally
        var book = await _dbContext.Set<Book>().FirstAsync();

        //   Update Book externally
        var rawBook = await container.ReadItemAsync<dynamic>($"Book|{book.Id}", PartitionKey.None);
        rawBook.Resource.Author.Name = externallyUpdatedAuthorName;
        await container.UpsertItemAsync(rawBook.Resource, PartitionKey.None);

        //   Update book using EF
        book.PublishedDate = efUpdatedPublishedDate;
        await SaveChangesAndHandleConcurrency();

        // Assert
        _dbContext.ChangeTracker.Clear();
        var fetchedBook = await _dbContext.Set<Book>().FirstAsync();
        fetchedBook.PublishedDate.Should().Be(efUpdatedPublishedDate);
        fetchedBook.Author.Name.Should().Be(externallyUpdatedAuthorName); // FAILS HERE. Expected "Tolkien" but got "J. R. R. Tolkien"
    }
    
    // This test FAILS with a CosmosException due to the internal client returning HTTP 412. This is likely because
    // we've modified the owned "Author" property on the "Book" and the ETag in EF has not been updated. Meanwhile,
    // the ETag has been changed with external modification.
    // May be related to: https://github.com/dotnet/efcore/issues/26625
    [Fact]
    public async Task UpdatingABookOwnedPropertyAfterExternalModificationOfBaseProperty_KeepsExternalChanges()
    {
        // Arrange
        var cosmosClient = _dbContext.Database.GetCosmosClient();
        var container =
            cosmosClient.GetContainer(CosmosStoreDbContext.DatabaseName, CosmosStoreDbContext.ContainerName);
        var externallyUpdateTitle = "Not The Hobbit";
        var efUpdatedAuthorName = "Tolkien";

        // Act
        //   Get Book entity locally
        var book = await _dbContext.Set<Book>().FirstAsync();

        //   Update Book externally
        var rawBook = await container.ReadItemAsync<dynamic>($"Book|{book.Id}", PartitionKey.None);
        rawBook.Resource.Title = externallyUpdateTitle;
        await container.UpsertItemAsync(rawBook.Resource, PartitionKey.None);

        //   Update book using EF
        book.Author.Name = efUpdatedAuthorName;
        // FAILS HERE. Microsoft.Azure.Cosmos.CosmosException: PreconditionFailed (412)
        //             Would expect this to throw an EF DbUpdateConcurrencyException
        await SaveChangesAndHandleConcurrency();

        // Assert
        _dbContext.ChangeTracker.Clear();
        var fetchedBook = await _dbContext.Set<Book>().FirstAsync();
        fetchedBook.Author.Name.Should().Be(efUpdatedAuthorName);
        fetchedBook.Title.Should().Be(externallyUpdateTitle);
    }

    // Inspired by: https://docs.microsoft.com/en-us/ef/core/saving/concurrency#resolving-concurrency-conflicts
    private async Task SaveChangesAndHandleConcurrency()
    {
        var saved = false;
        var attempts = 0;
        while (!saved)
        {
            attempts++;

            try
            {
                // Attempt to save changes to the database
                await _dbContext.SaveChangesAsync();
                saved = true;
            }
            catch (DbUpdateConcurrencyException ex)
            {
                if (attempts > 5)
                {
                    throw;
                }

                foreach (var entry in ex.Entries)
                {
                    if (entry.Entity is Book)
                    {
                        var proposedValues = entry.CurrentValues;
                        var originalValues = entry.OriginalValues;
                        var databaseValues = await entry.GetDatabaseValuesAsync();


                        // This is potentially a start of an attempt to update owned properties but GetDatabaseValuesAsync throws
                        // foreach (var reference in entry.References)
                        // {
                        //     if (reference.TargetEntry != null)
                        //     {
                        //         var referenceDatabaseValues = await reference.TargetEntry.GetDatabaseValuesAsync();
                        //     }
                        // }


                        foreach (var property in proposedValues.Properties)
                        {
                            var proposedValue = proposedValues[property];
                            var originalValue = originalValues[property];
                            var databaseValue = databaseValues[property];

                            // Owned entities aren't in this list
                            // Use Equals: != on boxed objects compares references, not values
                            if (!Equals(proposedValue, originalValue))
                            {
                                // We made this change, keep it
                            }
                            else
                            {
                                // Reset property to that in DB because we've not changed it locally
                                proposedValues[property] = databaseValue;
                            }
                        }

                        // Refresh original values to bypass next concurrency check
                        entry.OriginalValues.SetValues(databaseValues);
                    }
                    else
                    {
                        throw new NotSupportedException(
                            "Don't know how to handle concurrency conflicts for "
                            + entry.Metadata.Name);
                    }
                }
            }
        }
    }
}

Include provider and version information

EF Core version: Tested using 6.0.5 and 7.0.0-preview.4.22229.2
Database provider: Microsoft.EntityFrameworkCore.Cosmos
Target framework: .NET 6.0
Operating system: Windows 11
IDE: JetBrains Rider 2021.3.2

AndriySvyryd (Member) commented Jun 8, 2022

#13559 will probably help you, but meanwhile you can recurse through the owned types like this:

private async static Task MergeDatabaseValuesAsync(EntityEntry entry, EntityEntry databaseEntry)
{
    if (entry == null
        || databaseEntry == null)
    {
        return;
    }

    // Refresh original values
    entry.OriginalValues.SetValues(databaseEntry.OriginalValues);

    foreach (var propertyEntry in entry.Properties)
    {
        if (!propertyEntry.IsModified)
        {
            // Reset property to the value currently in DB because we've not changed it locally
            propertyEntry.CurrentValue = propertyEntry.OriginalValue;
        }
    }

    foreach (var reference in entry.References)
    {            
        await MergeDatabaseValuesAsync(reference.TargetEntry, databaseEntry.Reference(reference.Metadata).TargetEntry);
    }

    foreach (var collection in entry.Collections)
    {
        var databaseCollection = databaseEntry.Collection(collection.Metadata);

        // Assuming the order and length have not changed
        var iterator1 = collection.CurrentValue.GetEnumerator();
        var iterator2 = databaseCollection.CurrentValue.GetEnumerator();
        while (iterator1.MoveNext() && iterator2.MoveNext())
        {
            await MergeDatabaseValuesAsync(
                collection.FindEntry(iterator1.Current), databaseCollection.FindEntry(iterator2.Current));
        }
    }
}

To use it you'd need to create another context instance:

using (var context2 = contextFactory.CreateDbContext())
{
    var databaseEntity = await context2.FindAsync<Book>(entry.Property<string>("id").CurrentValue);
    await MergeDatabaseValuesAsync(entry, context2.Entry(databaseEntity));
}

@AndriySvyryd AndriySvyryd added the closed-no-further-action The issue is closed and no further action is planned. label Jun 8, 2022
@ajcvickers ajcvickers closed this as not planned Oct 4, 2022
TWEESTY commented Feb 12, 2024

You have to reverse the approach above for it to work. If you call "entry.OriginalValues.SetValues(databaseEntry.OriginalValues)", the properties that were modified in the database end up marked as modified, so the stale local values are written over them.

Proposed version:

private async static Task MergeDatabaseValuesAsync(EntityEntry entry, EntityEntry databaseEntry)
{
    if (entry == null
        || databaseEntry == null)
    {
        return;
    }

    foreach (var propertyEntry in entry.Properties)
    {
        if (propertyEntry.IsModified)
        {
            // Set the new property value for the database entry, because it has been changed locally
            databaseEntry.Property(propertyEntry.Metadata.Name).CurrentValue = propertyEntry.CurrentValue;
        }
    }

    foreach (var reference in entry.References)
    {
        await MergeDatabaseValuesAsync(reference.TargetEntry, databaseEntry.Reference(reference.Metadata).TargetEntry);
    }

    foreach (var collection in entry.Collections)
    {
        var databaseCollection = databaseEntry.Collection(collection.Metadata);

        // Assuming the order and length have not changed
        var iterator1 = collection.CurrentValue.GetEnumerator();
        var iterator2 = databaseCollection.CurrentValue.GetEnumerator();
        while (iterator1.MoveNext() && iterator2.MoveNext())
        {
            await MergeDatabaseValuesAsync(
                collection.FindEntry(iterator1.Current), databaseCollection.FindEntry(iterator2.Current));
        }
    }
}
