Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Self-Relationship exception #319

Open
4 of 8 tasks
In-Wake opened this issue Apr 3, 2024 · 1 comment · May be fixed by #351
Open
4 of 8 tasks

Self-Relationship exception #319

In-Wake opened this issue Apr 3, 2024 · 1 comment · May be fixed by #351
Labels
Milestone

Comments

@In-Wake
Copy link

In-Wake commented Apr 3, 2024

Description

Hi
I need to connect objects from the same stream to each other.
Foreign-Key is not prioritized #52
"But I have a workaround with SelectKey(..) combine ˋJoin(..)`."
After writing a test I got an exception
System.ArgumentException: 'An item with the same key has already been added. Key: test-test-driver-app-KSTREAM-TOTABLE-0000000003-repartition'

I use kafka-streams-dotnet from commit 981e9d4

the exception throws from

How to reproduce

Test codes

public class SelfId
{
    public int Id { get; set; }
}

public class SelfRelation
{
    public int Id { get; set; }
    public string Name { get; set; }
    public int? Relation { get; set; }
}

public class Container
{
    public SelfRelation Node { get; set; }
    public SelfRelation Dependency { get; set; }
}

        [Fact]
        public void SelfTes()
        {
            var builder = new StreamBuilder();

            var stream =
                builder.Stream("self", new JsonSerDes<SelfId>(), new JsonSerDes<SelfRelation>());

            var filtrate = stream.Filter((k, v) => v.Relation.HasValue);
            var withRelationKeyStream = filtrate.SelectKey((k, v) => new SelfId { Id = v.Relation!.Value });
            var withRelationKeyTable = withRelationKeyStream.ToTable(
                Materialized<SelfId, SelfRelation, IKeyValueStore<Bytes, byte[]>>
                    .Create<JsonSerDes<SelfId>, JsonSerDes<SelfRelation>>());

            var table = stream.ToTable(
                Materialized<SelfId, SelfRelation, IKeyValueStore<Bytes, byte[]> >
                    .Create<JsonSerDes<SelfId>, JsonSerDes<SelfRelation>>());

            //this line add exception
            var join = withRelationKeyTable.Join(table, (left, right) => new Container { Node = left, Dependency = right });

           var topology = builder.Build();


           var config = new StreamConfig();
           config.ApplicationId = "test-test-driver-app";

           var driver = new TopologyTestDriver(topology, config);

           // create the test input topic
           var inputTopic =
               driver.CreateInputTopic(
                   "self", new JsonSerDes<SelfId>(), new JsonSerDes<SelfRelation>());


           inputTopic.PipeInput(new SelfId { Id = 1 }, new SelfRelation { Id = 1, Name = "self", Relation = 2 });
           inputTopic.PipeInput(new SelfId { Id = 2 }, new SelfRelation { Id = 2, Name = "rel" });

     }

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • A code snippet with your topology builder (ex: builder.Stream<string, string>("topic").to("an-another-topic");)
  • Streamiz.Kafka.Net nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (with in debug mode (log4net and StreamConfig.Debug) as necessary in configuration).
  • Critical issue.
@LGouellec
Copy link
Owner

Hi @In-Wake,

Did you try to run the same topology with a real kafka cluster ? It seems there is a bug in the TopologTestDriver?
Btw, sorry for this late reply.

@LGouellec LGouellec added this to the 1.7.0 milestone Jul 17, 2024
@LGouellec LGouellec linked a pull request Jul 18, 2024 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants