
(Impact on BC/DR - Data loss) - High priority: nodes with a 2D Point field value throw an exception and cannot be successfully sunk to Neo4j #611

Open
yhhongyang opened this issue Dec 26, 2023 · 1 comment


Description

In the source instance, creating a node with a field whose value is a 2D point generates a Kafka payload like this:

{
    "payload":{
        "id":"11",
        "before":null,
        "after":{
            "properties":{
                "address":{
                    "crs":"wgs-84",
                    "latitude":56.7,
                    "longitude":12.78,
                    "height":null
                }
            },
            "labels":[
                "Location"
            ]
        },
        "type":"node"
    },
    "schema":{
        "properties":{
            "address":"PointValue"
        },
        "constraints":[

        ]
    }
}
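
For reference, the source-side event can be reproduced with a plain driver call. A minimal sketch, assuming a local source instance with the streams source plugin enabled (the URI is a placeholder and authentication is omitted for brevity):

import org.neo4j.driver.GraphDatabase

fun main() {
    // Placeholder URI; auth omitted for brevity
    GraphDatabase.driver("bolt://localhost:7687").use { driver ->
        driver.session().use { session ->
            // A 2D wgs-84 point: the stored point has no height, yet the
            // CDC payload above still serializes "height": null
            session.run("CREATE (:Location {address: point({latitude: 56.7, longitude: 12.78})})")
        }
    }
}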

Because the value of height is null, and null values cannot be deserialized into a valid PointValue, the sink instance throws an exception like this:

2023-12-22 07:58:36.476+0000 INFO  [neo4j/2b281484] [Sink] Registering the Streams Sink procedures
org.neo4j.exceptions.InvalidArgumentException: Cannot assign NO_VALUE to field height
	at org.neo4j.values.storable.PointValue$PointBuilder.assignFloatingPoint(PointValue.java:667)
	at org.neo4j.values.storable.PointValue$PointBuilder.assign(PointValue.java:610)
	at org.neo4j.values.storable.PointValue.lambda$fromMap$0(PointValue.java:413)
	at org.neo4j.values.virtual.MapValue$MapWrappingMapValue.foreach(MapValue.java:127)
	at org.neo4j.values.storable.PointValue.fromMap(PointValue.java:413)
	at streams.utils.StreamsTransactionEventDeserializer.convertPoints(JSONUtils.kt:239)
	at streams.utils.StreamsTransactionEventDeserializer.deserialize(JSONUtils.kt:226)
	at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:200)
	at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:181)
	at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:4444)
	at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:4390)
	at streams.utils.JSONUtils.asStreamsTransactionEvent(JSONUtils.kt:388)
	at streams.utils.SchemaUtils.toStreamsTransactionEvent(SchemaUtils.kt:36)
	at streams.service.sink.strategy.SourceIdIngestionStrategy.mergeNodeEvents(SourceIdIngestionStrategy.kt:72)
        .......
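
The failing call can be reproduced directly against the Neo4j values API. A minimal sketch, assuming the neo4j values classes are available on the classpath (versions as listed below):

import org.neo4j.values.AnyValue
import org.neo4j.values.storable.PointValue
import org.neo4j.values.storable.Values
import org.neo4j.values.virtual.VirtualValues

fun main() {
    // Mirror the payload above: height is present but null (NO_VALUE)
    val map = VirtualValues.map(
        arrayOf("crs", "latitude", "longitude", "height"),
        arrayOf<AnyValue>(
            Values.stringValue("wgs-84"),
            Values.doubleValue(56.7),
            Values.doubleValue(12.78),
            Values.NO_VALUE
        )
    )
    // Throws InvalidArgumentException: Cannot assign NO_VALUE to field height
    PointValue.fromMap(map)
}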

Perhaps, on the source side, the Kafka payload should simply omit the height field when the point is 2D (a sketch of that idea follows below).
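
A minimal sketch of that idea (pointPayload is a hypothetical helper, not the project's actual serializer): build the coordinate map from only the fields that are present.

// Hypothetical source-side helper: emit only coordinates that exist,
// so a 2D point never carries "height": null in the payload
fun pointPayload(crs: String, coordinates: Map<String, Double?>): Map<String, Any> {
    val payload = linkedMapOf<String, Any>("crs" to crs)
    for ((name, value) in coordinates) {
        if (value != null) payload[name] = value // drops null height / z
    }
    return payload
}

// pointPayload("wgs-84", mapOf("latitude" to 56.7, "longitude" to 12.78, "height" to null))
// => {crs=wgs-84, latitude=56.7, longitude=12.78}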

Expected Behavior (Mandatory)

Nodes that have a field whose value is a 2D point can be successfully sunk to another Neo4j instance.

Actual Behavior (Mandatory)

Currently, the sink instance throws this kind of exception when it handles a Kafka event that contains a 2D point:

to missing (therefore NULL) value for creator parameter start which is a non-nullable type
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: streams.events.RelationshipPayload["start"]), key="431-0", value={"meta":{"timestamp":1703232298105,"username":"neo4j","txId":431,"txEventId":0,"txEventsCount":1,"operation":"created","source":{"hostname":"dhcp-9-245-199-179.e2y-cn.ibmmobiledemo.com"}},"payload":{", executingClass=class streams.kafka.KafkaAutoCommitEventConsumer)
	at streams.service.errors.KafkaErrorService.report(KafkaErrorService.kt:37) ~[neo4j-streams-local-4.1.3.jar:?]
	at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:98) ~[neo4j-streams-local-4.1.3.jar:?]
	at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:89) ~[neo4j-streams-local-4.1.3.jar:?]
	at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:119) ~[neo4j-streams-local-4.1.3.jar:?]
	at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:163) [neo4j-streams-local-4.1.3.jar:?]
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) [neo4j-streams-local-4.1.3.jar:?]
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108) [neo4j-streams-local-4.1.3.jar:?]
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115) [neo4j-streams-local-4.1.3.jar:?]
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103) [neo4j-streams-local-4.1.3.jar:?]
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584) [neo4j-streams-local-4.1.3.jar:?]
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793) [neo4j-streams-local-4.1.3.jar:?]
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697) [neo4j-streams-local-4.1.3.jar:?]
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684) [neo4j-streams-local-4.1.3.jar:?]

The root cause of this error is the following exception:

2023-12-22 07:58:36.476+0000 INFO  [neo4j/2b281484] [Sink] Registering the Streams Sink procedures
org.neo4j.exceptions.InvalidArgumentException: Cannot assign NO_VALUE to field height
	at org.neo4j.values.storable.PointValue$PointBuilder.assignFloatingPoint(PointValue.java:667)
	at org.neo4j.values.storable.PointValue$PointBuilder.assign(PointValue.java:610)
	at org.neo4j.values.storable.PointValue.lambda$fromMap$0(PointValue.java:413)
	at org.neo4j.values.virtual.MapValue$MapWrappingMapValue.foreach(MapValue.java:127)
	at org.neo4j.values.storable.PointValue.fromMap(PointValue.java:413)
	at streams.utils.StreamsTransactionEventDeserializer.convertPoints(JSONUtils.kt:239)
	at streams.utils.StreamsTransactionEventDeserializer.deserialize(JSONUtils.kt:226)
	at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:200)
	at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:181)
	at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:4444)
	at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:4390)
	at streams.utils.JSONUtils.asStreamsTransactionEvent(JSONUtils.kt:388)
	at streams.utils.SchemaUtils.toStreamsTransactionEvent(SchemaUtils.kt:36)
	at streams.service.sink.strategy.SourceIdIngestionStrategy.mergeNodeEvents(SourceIdIngestionStrategy.kt:72)
	at streams.service.StreamsSinkService.writeWithStrategy(StreamsSinkService.kt:32)
	at streams.service.StreamsSinkService.writeForTopic(StreamsSinkService.kt:40)
	at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:167)
	at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:163)
	at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:95)
	at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:89)
	at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:119)
	at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:163)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
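
Alternatively, the sink could defend itself against such payloads. A sketch of a possible guard (sanitizePointMap is a hypothetical helper, not the current convertPoints implementation) that strips null entries from the point map before it reaches PointValue.fromMap:

// Hypothetical sink-side guard: drop null coordinate entries so a payload
// with "height": null deserializes as a 2D point instead of throwing
fun sanitizePointMap(raw: Map<String, Any?>): Map<String, Any> =
    raw.filterValues { it != null }.mapValues { it.value!! }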

Versions

  • OS:
  • Neo4j: 4.4.28
  • Neo4j-Streams: 4.1.3
yhhongyang (Author) commented:

Possible solution:
Maybe add more data structures for the streams point values, for example:

// Assumed shape of the existing base class in streams.events:
abstract class StreamsPoint { abstract val crs: String }

data class StreamsPointCartesian(override val crs: String, val x: Double, val y: Double): StreamsPoint()
data class StreamsPointCartesian3D(override val crs: String, val x: Double, val y: Double, val z: Double): StreamsPoint()
data class StreamsPointWgs(override val crs: String, val latitude: Double, val longitude: Double): StreamsPoint()
data class StreamsPointWgs3D(override val crs: String, val latitude: Double, val longitude: Double, val height: Double): StreamsPoint()
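
With those classes in place, the sink deserializer could pick the right variant from the payload. A sketch of that dispatch (fromPayload is a hypothetical name, not existing project code):

// Hypothetical dispatch: choose 2D vs 3D based on the crs and on whether
// the third coordinate is actually present in the payload
fun fromPayload(map: Map<String, Any?>): StreamsPoint {
    val crs = map["crs"] as String
    return if (crs.startsWith("wgs-84")) {
        val lat = (map["latitude"] as Number).toDouble()
        val lon = (map["longitude"] as Number).toDouble()
        val height = (map["height"] as Number?)?.toDouble()
        if (height != null) StreamsPointWgs3D(crs, lat, lon, height) else StreamsPointWgs(crs, lat, lon)
    } else {
        val x = (map["x"] as Number).toDouble()
        val y = (map["y"] as Number).toDouble()
        val z = (map["z"] as Number?)?.toDouble()
        if (z != null) StreamsPointCartesian3D(crs, x, y, z) else StreamsPointCartesian(crs, x, y)
    }
}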
