Skip to content

Commit

Permalink
Merge pull request #919: [proxima-direct-io-cassandra] #331 fix deser…
Browse files Browse the repository at this point in the history
…ialization of values written with serializer v2
  • Loading branch information
je-ik authored Jun 17, 2024
2 parents a7e24c4 + 5679978 commit 72d2200
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import cz.o2.proxima.direct.core.randomaccess.RandomOffset;
import cz.o2.proxima.direct.io.cassandra.CassandraDBAccessor.ClusterHolder;
import cz.o2.proxima.direct.io.cassandra.CqlFactory.KvIterable;
import cz.o2.proxima.direct.io.cassandra.Offsets.Raw;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -68,7 +69,7 @@ public <T> Optional<KeyValue<T>> get(
byte[] rowValue = val.array();

try {
return Optional.of(
return Optional.ofNullable(
accessor
.getCqlFactory()
.toKeyValue(
Expand Down Expand Up @@ -133,23 +134,27 @@ public <T> void scanWildcard(
byte[] rowValue = val.array();
// by convention
String name = wildcard.toAttributePrefix() + accessor.asString(attribute);
@Nullable
KeyValue<T> keyValue =
accessor
.getCqlFactory()
.toKeyValue(
getEntityDescriptor(),
wildcard,
key,
name,
System.currentTimeMillis(),
new Raw(name),
rowValue);

Optional<T> parsed = wildcard.getValueSerializer().deserialize(rowValue);

if (parsed.isPresent()) {
consumer.accept(
accessor
.getCqlFactory()
.toKeyValue(
getEntityDescriptor(),
wildcard,
key,
name,
System.currentTimeMillis(),
new Offsets.Raw(name),
rowValue));
if (keyValue != null) {
consumer.accept(keyValue);
} else {
log.error("Failed to parse value for key {} attribute {}.{}", key, wildcard, attribute);
log.error(
"Failed to parse value for key {} attribute {} using class {}",
key,
name,
wildcard.getValueSerializer().getClass());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Statement scanPartition(
List<AttributeDescriptor<?>> attributes, CassandraPartition partition, Session session);

/** Convert the byte[] stored in the database into {@link KeyValue}. */
<T> KeyValue<T> toKeyValue(
<T> @Nullable KeyValue<T> toKeyValue(
EntityDescriptor entityDescriptor,
AttributeDescriptor<T> attributeDescriptor,
String key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.randomaccess.KeyValue;
import cz.o2.proxima.direct.core.randomaccess.RandomOffset;
import cz.o2.proxima.io.serialization.proto.Serialization;
import cz.o2.proxima.io.serialization.proto.Serialization.Cell;
import cz.o2.proxima.io.serialization.shaded.com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class DefaultCqlFactory extends CacheableCqlFactory {
interface Serializer extends Serializable {
byte[] asCellBytes(StreamElement element);

<T> KeyValue<T> fromCellBytes(
<T> @Nullable KeyValue<T> fromCellBytes(
EntityDescriptor entityDescriptor,
AttributeDescriptor<T> attributeDescriptor,
String key,
Expand All @@ -78,7 +79,7 @@ public byte[] asCellBytes(StreamElement element) {
}

@Override
public <T> KeyValue<T> fromCellBytes(
public <T> @Nullable KeyValue<T> fromCellBytes(
EntityDescriptor entityDescriptor,
AttributeDescriptor<T> attributeDescriptor,
String key,
Expand All @@ -104,7 +105,7 @@ public byte[] asCellBytes(StreamElement element) {
}

@Override
public <T> KeyValue<T> fromCellBytes(
public <T> @Nullable KeyValue<T> fromCellBytes(
EntityDescriptor entityDescriptor,
AttributeDescriptor<T> attributeDescriptor,
String key,
Expand All @@ -113,26 +114,36 @@ public <T> KeyValue<T> fromCellBytes(
RandomOffset offset,
byte[] serializedValue) {

Cell cell = ExceptionUtils.uncheckedFactory(() -> Cell.parseFrom(serializedValue));
if (cell.getSeqId() > 0) {
try {
Cell cell = Cell.parseFrom(serializedValue);
if (cell.getSeqId() > 0) {
return KeyValue.of(
entityDescriptor,
attributeDescriptor,
cell.getSeqId(),
key,
attribute,
offset,
cell.getValue().toByteArray(),
stamp);
}
return KeyValue.of(
entityDescriptor,
attributeDescriptor,
cell.getSeqId(),
key,
attribute,
offset,
cell.getValue().toByteArray(),
stamp);
} catch (IOException ex) {
log.warn(
"Failed to parse cell from bytes {} in key {}, entity {}, attribute {}",
Arrays.toString(serializedValue),
key,
entityDescriptor,
attribute);
}
return KeyValue.of(
entityDescriptor,
attributeDescriptor,
key,
attribute,
offset,
cell.getValue().toByteArray(),
stamp);
return null;
}
}

Expand Down Expand Up @@ -403,7 +414,7 @@ public Statement scanPartition(
}

@Override
public <T> KeyValue<T> toKeyValue(
public <T> @Nullable KeyValue<T> toKeyValue(
EntityDescriptor entityDescriptor,
AttributeDescriptor<T> attributeDescriptor,
String key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,4 +549,37 @@ public void testV2SerializerIngestWildcard() {
"INSERT INTO my_table (hgw, device, my_col) VALUES (?, ?, ?) USING TIMESTAMP ?",
preparedStatement.get(0));
}

@Test
public void testV2SerializerRead() {
factory.setup(
entity,
URI.create("cassandra://whatever/my_table?primary=hgw&data=my_col&serializer=v2"),
StringConverter.getDefault());
long now = System.currentTimeMillis();
KeyValue<?> kv =
factory.toKeyValue(
entity,
attrWildcard,
"key",
attrWildcard.toAttributePrefix(true) + "1",
now,
Offsets.empty(),
Cell.newBuilder()
.setSeqId(1L)
.setValue(ByteString.copyFrom(new byte[] {1}))
.build()
.toByteArray());
assertNotNull(kv);
kv =
factory.toKeyValue(
entity,
attrWildcard,
"key",
attrWildcard.toAttributePrefix(true) + "1",
System.currentTimeMillis(),
Offsets.empty(),
new byte[] {(byte) 199, 0});
assertNull(kv);
}
}

0 comments on commit 72d2200

Please sign in to comment.