Skip to content

Commit

Permalink
[proxima-core] #96 fix delete wildcard replication
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Jul 17, 2018
1 parent 16132a2 commit f6833d9
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ private AttributeFamilyProxyDescriptor(
getRandomAccess(lookup, targetFamilyRead),
getPartitionedView(lookup, targetFamilyRead),
getPartitionedCachedView(entity, lookup, targetFamilyRead, targetFamilyWrite),
targetFamilyRead.getType() == StorageType.PRIMARY
? targetFamilyRead.getAccess()
: AccessType.or(targetFamilyRead.getAccess(), AccessType.from("read-only")),
targetFamilyWrite.getType() ==
StorageType.PRIMARY && targetFamilyRead.getType() == StorageType.PRIMARY
? targetFamilyRead.getAccess()
: AccessType.or(
targetFamilyRead.getAccess(), AccessType.from("read-only")),
targetFamilyRead.getFilter(),
null);

Expand Down Expand Up @@ -326,7 +328,10 @@ public void observe(
BatchLogObserver observer) {

reader.observe(
partitions, attributes, wrapTransformed(lookup, observer));
partitions, attributes.stream()
.map(a -> lookup.lookupProxy(a.getName()))
.collect(Collectors.toList()),
wrapTransformed(lookup, observer));
}

};
Expand Down Expand Up @@ -357,14 +362,17 @@ public RandomOffset fetchOffset(
return reader.fetchOffset(type, key);
}

@SuppressWarnings("unchecked")
@Override
public <T> Optional<KeyValue<T>> get(
String key, String attribute, AttributeDescriptor<T> desc) {

AttributeProxyDescriptorImpl<?> targetAttribute = lookup.lookupProxy(
AttributeProxyDescriptorImpl<T> targetAttribute;
targetAttribute = (AttributeProxyDescriptorImpl<T>) lookup.lookupProxy(
desc.getName());
ProxyTransform transform = targetAttribute.getReadTransform();
return reader.get(key, transform.fromProxy(attribute), desc)
return reader.get(
key, transform.fromProxy(attribute), targetAttribute.getReadTarget())
.map(kv -> transformToProxy(kv, targetAttribute));
}

Expand Down
91 changes: 54 additions & 37 deletions core/src/main/java/cz/o2/proxima/repository/ConfigRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,11 @@ private void createReplicationTransformations(
.setFilter(replicated.getFilter())
.setTransformation(
renameTransform(
sourceMapping::get,
source -> Objects.requireNonNull(
sourceMapping.get(source),
() -> String.format(
"Source mapping doesn't contain %s: %s",
source, sourceMapping)),
(input, desc) -> {
String raw = strippingReplPrefix(input);
// each incoming attribute is proxy
Expand Down Expand Up @@ -1236,7 +1240,11 @@ private void createReplicationTransformations(
.setFilter(replicated.getFilter())
.setTransformation(
renameTransform(
sourceMapping::get,
source -> Objects.requireNonNull(
sourceMapping.get(source),
() -> String.format(
"Source mapping doesn't contain %s: %s",
source, sourceMapping)),
(a, desc) -> renameReplicated(replPrefix, a)))
.build());
}
Expand Down Expand Up @@ -1273,11 +1281,13 @@ private void createReplicationTransformations(
.setEntity(entity)
.setFilter(target.getFilter())
.setTransformation(
renameTransform(
sourceMapping::get,
renameTransform(source -> Objects.requireNonNull(
sourceMapping.get(source),
() -> String.format(
"Source mapping doesn't contain %s: %s",
source, sourceMapping)),
(input, desc) -> {
String raw = strippingReplPrefix(input);
// each incoming attribute is proxy
AttributeProxyDescriptorImpl<?> proxyDesc;
proxyDesc = ((AttributeDescriptorBase<?>) sourceToOrig
.get(desc)).toProxy();
Expand Down Expand Up @@ -1344,7 +1354,8 @@ private static String renameReplicated(
int dollar = input.indexOf('$');
if (dollar < 0) {
if (strict) {
throw new IllegalArgumentException("Invalid input, missing $ separator");
throw new IllegalArgumentException(
"Invalid input, missing $ separator in `" + input + "'");
} else {
return prefix + input;
}
Expand Down Expand Up @@ -1373,37 +1384,43 @@ public void setup(Repository repo) {
}

@Override
public int apply(
StreamElement input,
Collector<StreamElement> collector) {

AttributeDescriptor<?> desc = descTransform.apply(
input.getAttributeDescriptor());

collector.collect(input.isDelete()
? input.isDeleteWildcard()
? StreamElement.deleteWildcard(
input.getEntityDescriptor(),
desc,
input.getUuid(),
input.getKey(),
input.getStamp())
: StreamElement.delete(
input.getEntityDescriptor(),
desc,
input.getUuid(),
input.getKey(),
nameTransform.apply(input.getAttribute(), input.getAttributeDescriptor()),
input.getStamp())
: StreamElement.update(
input.getEntityDescriptor(),
desc,
input.getUuid(),
input.getKey(),
nameTransform.apply(input.getAttribute(), input.getAttributeDescriptor()),
input.getStamp(),
input.getValue()));
return 1;
public int apply(StreamElement input, Collector<StreamElement> collector) {
try {
AttributeDescriptor<?> desc = descTransform.apply(
input.getAttributeDescriptor());

collector.collect(input.isDelete()
? input.isDeleteWildcard()
? StreamElement.deleteWildcard(
input.getEntityDescriptor(),
desc,
input.getUuid(),
input.getKey(),
nameTransform.apply(
input.getAttribute(), input.getAttributeDescriptor()),
input.getStamp())
: StreamElement.delete(
input.getEntityDescriptor(),
desc,
input.getUuid(),
input.getKey(),
nameTransform.apply(
input.getAttribute(), input.getAttributeDescriptor()),
input.getStamp())
: StreamElement.update(
input.getEntityDescriptor(),
desc,
input.getUuid(),
input.getKey(),
nameTransform.apply(
input.getAttribute(), input.getAttributeDescriptor()),
input.getStamp(),
input.getValue()));
return 1;
} catch (Exception ex) {
log.warn("Failed to apply rename transform on {}", input, ex);
return 0;
}
}
};
}
Expand Down
30 changes: 26 additions & 4 deletions core/src/main/java/cz/o2/proxima/storage/StreamElement.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public static StreamElement delete(
* @param attributeDesc descriptor of attribute
* @param uuid UUID of the event
* @param key key of entity
* @param attribute string representation of the attribute
* @param stamp timestamp of the event
* @return {@link StreamElement} to be written to the system
*/
Expand All @@ -94,11 +95,33 @@ public static StreamElement deleteWildcard(
AttributeDescriptor<?> attributeDesc,
String uuid,
String key,
String attribute,
long stamp) {

return new StreamElement(
entityDesc, attributeDesc, uuid,
key, attributeDesc.toAttributePrefix() + "*", stamp, null);
key, attribute, stamp, null);
}

/**
* Delete all versions of given wildcard attribute.
* @param entityDesc descriptor of entity
* @param attributeDesc descriptor of attribute
* @param uuid UUID of the event
* @param key key of entity
* @param stamp timestamp of the event
* @return {@link StreamElement} to be written to the system
*/
public static StreamElement deleteWildcard(
EntityDescriptor entityDesc,
AttributeDescriptor<?> attributeDesc,
String uuid,
String key,
long stamp) {

return deleteWildcard(
entityDesc, attributeDesc, uuid, key,
attributeDesc.toAttributePrefix() + "*", stamp);
}

@Getter
Expand All @@ -114,7 +137,6 @@ public static StreamElement deleteWildcard(
private final String key;

@Getter
@Nullable
private final String attribute;

@Getter
Expand All @@ -129,15 +151,15 @@ protected StreamElement(
AttributeDescriptor<?> attributeDesc,
String uuid,
String key,
@Nullable String attribute,
String attribute,
long stamp,
@Nullable byte[] value) {

this.entityDescriptor = Objects.requireNonNull(entityDesc);
this.attributeDescriptor = Objects.requireNonNull(attributeDesc);
this.uuid = Objects.requireNonNull(uuid);
this.key = Objects.requireNonNull(key);
this.attribute = attribute;
this.attribute = Objects.requireNonNull(attribute);
this.stamp = stamp;
this.value = value;
}
Expand Down
Loading

0 comments on commit f6833d9

Please sign in to comment.