Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

96 replication master rebase #105

Merged
merged 24 commits into from
Jul 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cb9d8b9
[proxima-core] #95 skeleton on double-sided proxy transforms
je-ik May 24, 2018
626ac4c
[proxima-core] #96 create implicit private replication attributes and…
je-ik May 28, 2018
092c233
[proxima-core] #96 add tests for loading of master-slave replication
je-ik Jun 11, 2018
af5c7d4
[proxima-core] #96 transform attribute name in commit log observation
je-ik Jun 11, 2018
d2de730
[proxima-kafka] [proxima-pubsub] enable read of protected attributes
je-ik Jun 11, 2018
45e6ebf
[proxima-core] #96 group proxied attributes to single family
je-ik Jun 12, 2018
b93a2cc
[proxima-core] #96 partly working replication, missing some core tests
je-ik Jun 14, 2018
b4a85b6
[proxima-core] #96 add concept of read-only replication
je-ik Jun 18, 2018
9d444a5
[proxima-kafka] bughunting
je-ik Jun 18, 2018
4974351
[proxima-core] #96 add tests for cached view with and without replica…
je-ik Jun 19, 2018
5f5bc6d
[proxima-core] #96 add read from replication local writes
je-ik Jun 20, 2018
4d286e1
[proxima-kafka] clone Properties on KafkaConsumerFactory
je-ik Jun 20, 2018
fd40a45
[proxima-core] #96 functional replication on proxied attributes
je-ik Jun 26, 2018
c84489f
[proxima-core] add validation to attributes on InMemStorage
je-ik Jul 2, 2018
3097680
[proxima-core] add TransformationRunner
je-ik Jul 10, 2018
2e64d27
[proxima-core] #96 finalize replication support
je-ik Jul 10, 2018
86326b7
[proxima-core] #96 correctly bind transformations to replicated attri…
je-ik Jul 11, 2018
38bf4ae
[proxima-kafka] [proxima-ingest-server] small fixes, improved logging
je-ik Jul 11, 2018
e0328c4
[proxima-core] #96 multiple transformations fix
je-ik Jul 13, 2018
3026d6f
[proxima-core] #96 fix delete wildcard replication
je-ik Jul 13, 2018
646dffa
[proxima] move to 0.2.1-SNAPSHOT
je-ik Jul 17, 2018
d36caf6
[proxima-core] Tune sonar issues
je-ik Jul 17, 2018
74ff3ba
[proxima-parent] remove oracle-java8-installer from .travis
je-ik Jul 18, 2018
c7d1ba2
[proxima-kafka] small tuning
je-ik Jul 18, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ jdk:
addons:
apt:
packages:
- oracle-java8-installer
- realpath
sonarcloud:
organization: datadriven
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.repository;
package cz.o2.proxima.functional;

import cz.o2.proxima.annotations.Stable;
import java.io.Serializable;

/**
* A transformation of attribute name applied both on reading and writing attribute.
* Binary function of two arguments.
*/
@Stable
public interface ProxyTransform extends Serializable {
@FunctionalInterface
public interface BiFunction<A, B, OUT> extends Serializable {

/**
* Apply transformation to attribute name from proxy naming.
* @param proxy name of the attribute in proxy namespace
* @return the raw attribute
*/
String fromProxy(String proxy);

/**
* Apply transformation to attribute name to proxy naming.
* @param raw the raw attribute name
* @return the proxy attribute name
*/
String toProxy(String raw);
OUT apply(A first, B second);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package cz.o2.proxima.repository;

import cz.o2.proxima.transform.ProxyTransform;
import cz.o2.proxima.annotations.Stable;
import cz.o2.proxima.scheme.ValueSerializerFactory;
import java.io.Serializable;
Expand Down Expand Up @@ -49,6 +50,9 @@ private Builder(Repository repo) {
@Setter
private URI schemeURI;

@Setter
private boolean replica = false;

@SuppressWarnings("unchecked")
public <T> AttributeDescriptorImpl<T> build() {
Objects.requireNonNull(name, "Please specify name");
Expand All @@ -57,8 +61,13 @@ public <T> AttributeDescriptorImpl<T> build() {

ValueSerializerFactory factory = repo.getValueSerializerFactory(schemeURI.getScheme());

return new AttributeDescriptorImpl<>(name, entity, schemeURI,
factory == null ? null : (ValueSerializer<T>) factory.getValueSerializer(schemeURI));
return new AttributeDescriptorImpl<>(
name, entity,
schemeURI,
factory == null
? null
: factory.getValueSerializer(schemeURI),
replica);
}
}

Expand All @@ -68,9 +77,25 @@ static Builder newBuilder(Repository repo) {

static <T> AttributeDescriptorBase<T> newProxy(
String name,
AttributeDescriptorBase<T> target,
ProxyTransform transform) {
return new AttributeProxyDescriptorImpl<>(name, target, transform);
AttributeDescriptor<T> targetRead,
ProxyTransform transformRead,
AttributeDescriptor<T> targetWrite,
ProxyTransform transformWrite) {

return newProxy(
name, targetRead, transformRead, targetWrite, transformWrite, false);
}

static <T> AttributeDescriptorBase<T> newProxy(
String name,
AttributeDescriptor<T> targetRead,
ProxyTransform transformRead,
AttributeDescriptor<T> targetWrite,
ProxyTransform transformWrite,
boolean replica) {

return new AttributeProxyDescriptorImpl<>(
name, targetRead, transformRead, targetWrite, transformWrite, replica);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package cz.o2.proxima.repository;

import com.google.common.base.Preconditions;
import cz.o2.proxima.annotations.Internal;
import cz.o2.proxima.scheme.ValueSerializer;
import java.net.URI;
Expand Down Expand Up @@ -43,18 +44,23 @@ public abstract class AttributeDescriptorBase<T> implements AttributeDescriptor<
@Getter
protected final boolean wildcard;

@Getter
protected final boolean replica;

protected @Nullable final ValueSerializer<T> valueSerializer;

public AttributeDescriptorBase(
String name, String entity, URI schemeURI,
@Nullable ValueSerializer<T> valueSerializer) {
@Nullable ValueSerializer<T> valueSerializer,
boolean replica) {

this.name = Objects.requireNonNull(name);
this.entity = Objects.requireNonNull(entity);
this.schemeURI = Objects.requireNonNull(schemeURI);
this.wildcard = this.name.endsWith(".*");
this.proxy = false;
this.valueSerializer = valueSerializer;
this.replica = replica;
if (this.wildcard) {
if (name.length() < 3
|| name.substring(0, name.length() - 1).contains("*")
Expand All @@ -69,13 +75,32 @@ public AttributeDescriptorBase(
}
}

public AttributeDescriptorBase(String name, AttributeDescriptorBase<T> target) {
public AttributeDescriptorBase(
String name,
AttributeDescriptor<T> targetRead,
AttributeDescriptor<T> targetWrite,
boolean replica) {

this.name = Objects.requireNonNull(name);
this.entity = target.getEntity();
this.schemeURI = target.getSchemeURI();
Preconditions.checkArgument(
targetRead.getEntity().equals(targetWrite.getEntity()),
String.format(
"Cannot mix entities in proxies, got %s and %s",
targetRead.getEntity(), targetWrite.getEntity()));
Preconditions.checkArgument(
targetRead.getSchemeURI().equals(targetWrite.getSchemeURI()),
String.format(
"Cannot mix attributes with different schemes, got %s and %s",
targetRead.getSchemeURI(), targetWrite.getSchemeURI()));
Preconditions.checkArgument(
targetRead.isWildcard() == targetWrite.isWildcard(),
"Cannot mix non-wildcard and wildcard attributes in proxy");
this.entity = targetRead.getEntity();
this.schemeURI = targetRead.getSchemeURI();
this.proxy = true;
this.wildcard = target.isWildcard();
this.valueSerializer = target.getValueSerializer();
this.replica = replica;
this.wildcard = targetRead.isWildcard();
this.valueSerializer = targetRead.getValueSerializer();
}

@Override
Expand Down Expand Up @@ -123,4 +148,11 @@ public Builder toBuilder(Repository repo) {
.setSchemeURI(getSchemeURI());
}

AttributeProxyDescriptorImpl<T> toProxy() {
Preconditions.checkArgument(
this instanceof AttributeProxyDescriptorImpl,
"Attribute " + this + " is not proxy attribute");
return (AttributeProxyDescriptorImpl<T>) this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ public class AttributeDescriptorImpl<T>

AttributeDescriptorImpl(
String name, String entity,
URI schemeURI, @Nullable ValueSerializer<T> serializer) {
URI schemeURI, @Nullable ValueSerializer<T> serializer,
boolean replica) {

super(name, entity, schemeURI, serializer);
super(name, entity, schemeURI, serializer, replica);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import cz.o2.proxima.storage.randomaccess.RandomAccessReader;
import cz.o2.proxima.view.PartitionedCachedView;
import cz.o2.proxima.view.PartitionedView;
import cz.seznam.euphoria.shadow.com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -83,6 +85,11 @@ public static final class Builder {

private Builder() { }

public Builder clearAttributes() {
attributes.clear();
return this;
}

public Builder addAttribute(AttributeDescriptor<?> desc) {
attributes.add(desc);
return this;
Expand Down Expand Up @@ -143,7 +150,7 @@ public static Builder newBuilder() {

AttributeFamilyDescriptor(String name,
StorageType type,
List<AttributeDescriptor<?>> attributes,
Collection<AttributeDescriptor<?>> attributes,
@Nullable AttributeWriterBase writer,
@Nullable CommitLogReader commitLogReader,
@Nullable BatchLogObservable batchObservable,
Expand All @@ -156,7 +163,7 @@ public static Builder newBuilder() {

this.name = Objects.requireNonNull(name);
this.type = type;
this.attributes = Objects.requireNonNull(attributes);
this.attributes = Lists.newArrayList(Objects.requireNonNull(attributes));
this.writer = writer;
this.commitLogReader = commitLogReader;
this.batchObservable = batchObservable;
Expand All @@ -174,7 +181,7 @@ public List<AttributeDescriptor<?>> getAttributes() {

@Override
public String toString() {
return "AttributeFamily(" + name + ")";
return "AttributeFamily(name=" + name + ", attributes=" + attributes + ")";
}

@Override
Expand Down Expand Up @@ -281,4 +288,29 @@ public Optional<String> getSource() {
return Optional.ofNullable(source);
}

Builder toBuilder() {
Builder ret = new Builder()
.setAccess(access)
.setBatchObservable(batchObservable)
.setCachedView(cachedView)
.setCommitLog(commitLogReader)
.setFilter(filter)
.setName(name)
.setPartitionedView(partitionedView)
.setRandomAccess(randomAccess)
.setSource(source)
.setType(type)
.setWriter(writer);
attributes.forEach(ret::addAttribute);
return ret;
}

/**
* Check if this proxied family.
* @return {@code true} if proxied family
*/
boolean isProxy() {
return false;
}

}
Loading