Skip to content

Commit

Permalink
Merge pull request #30 from datadrivencz/move-to-euphoria-0.8
Browse files Browse the repository at this point in the history
[proxima-core] upgrade to euphoria 0.8.0
  • Loading branch information
je-ik committed Jan 20, 2018
2 parents 14c5ad7 + 5e258a2 commit 5bfc477
Show file tree
Hide file tree
Showing 36 changed files with 511 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.URIUtil;
import cz.seznam.euphoria.shaded.guava.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Strings;
import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shadow.com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import cz.o2.proxima.storage.batch.BatchLogObservable;
import cz.o2.proxima.storage.randomaccess.RandomAccessReader;
import cz.o2.proxima.util.Classpath;
import cz.seznam.euphoria.shaded.guava.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Strings;
import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shadow.com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package cz.o2.proxima.storage.cassandra;

import cz.seznam.euphoria.shaded.guava.com.google.common.base.Strings;
import cz.seznam.euphoria.shadow.com.google.common.base.Strings;
import java.util.Date;
import javax.annotation.Nullable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Joiner;
import cz.seznam.euphoria.shadow.com.google.common.base.Joiner;
import java.io.Serializable;
import lombok.extern.slf4j.Slf4j;

Expand Down
4 changes: 2 additions & 2 deletions client/src/main/java/cz/o2/proxima/client/IngestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import cz.o2.proxima.proto.service.IngestServiceGrpc.IngestServiceStub;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc;
import cz.o2.proxima.proto.service.Rpc;
import cz.seznam.euphoria.shaded.guava.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Strings;
import cz.seznam.euphoria.shadow.com.google.common.annotations.VisibleForTesting;
import cz.seznam.euphoria.shadow.com.google.common.base.Strings;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
Expand Down
6 changes: 3 additions & 3 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
</relocation>
<relocation>
<pattern>com.google.common.</pattern>
<shadedPattern>cz.seznam.euphoria.shaded.guava.com.google.common.</shadedPattern>
<shadedPattern>cz.seznam.euphoria.shadow.com.google.common.</shadedPattern>
</relocation>
</relocations>
</configuration>
Expand Down Expand Up @@ -131,7 +131,7 @@

<dependency>
<groupId>cz.seznam.euphoria</groupId>
<artifactId>euphoria-inmem</artifactId>
<artifactId>euphoria-local</artifactId>
<version>${euphoria.version}</version>
<scope>test</scope>
</dependency>
Expand All @@ -149,7 +149,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
<version>23.0</version>
<scope>test</scope>
</dependency>

Expand Down
5 changes: 2 additions & 3 deletions core/src/main/java/cz/o2/proxima/metrics/AbsoluteMetric.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017 O2 Czech Republic, a.s.
* Copyright 2017-2018 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cz.o2.proxima.metrics;

import cz.seznam.euphoria.shaded.guava.com.google.common.util.concurrent.AtomicDouble;
import cz.seznam.euphoria.shadow.com.google.common.util.concurrent.AtomicDouble;
import java.beans.ConstructorProperties;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.tdunning.math.stats.TDigest;
import cz.o2.proxima.util.Pair;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import java.util.Arrays;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017 O2 Czech Republic, a.s.
* Copyright 2017-2018 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cz.o2.proxima.repository;

import cz.o2.proxima.util.NamePattern;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Maps;
import cz.seznam.euphoria.shadow.com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/cz/o2/proxima/repository/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ private Builder(Config config, boolean test) {
this.executorFactory = () -> Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r);
t.setName("ProximaRepositoryPool");
t.setUncaughtExceptionHandler((thr, exc) -> {
log.error("Error running task in thread {}", thr.getName(), exc);
});
return t;
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017 O2 Czech Republic, a.s.
* Copyright 2017-2018 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@

import cz.o2.proxima.storage.PassthroughFilter;
import cz.o2.proxima.storage.StorageFilter;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down
112 changes: 57 additions & 55 deletions core/src/main/java/cz/o2/proxima/view/input/DataSourceUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
*/
package cz.o2.proxima.view.input;

import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.functional.VoidFunction;
import cz.seznam.euphoria.core.client.io.DataSource;
import cz.seznam.euphoria.core.client.io.Partition;
import cz.seznam.euphoria.core.client.io.Reader;
import cz.seznam.euphoria.core.client.io.UnboundedDataSource;
import cz.seznam.euphoria.core.client.io.UnboundedPartition;
import cz.seznam.euphoria.core.client.io.UnboundedReader;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

/**
Expand All @@ -47,73 +47,75 @@ public static interface Producer extends Serializable {
* @return the single partitioned {@code DataSource}.
*
*/
public static <T> Partition<T> fromBlockingQueue(
public static <T, OFF extends Serializable> UnboundedPartition<T, OFF> fromBlockingQueue(
BlockingQueue<T> queue,
Producer producer) {

return new Partition<T>() {
@Override
public Set<String> getLocations() {
return Collections.singleton("local");
}

@Override
public Reader<T> openReader() throws IOException {
producer.run();
return new Reader<T>() {

T next = null;

@Override
public void close() throws IOException {
// nop
}

@Override
public boolean hasNext() {
try {
next = queue.take();
return true;
} catch (InterruptedException ex) {
log.warn("Interrupted while waiting for next queue element.");
return false;
}
}

@Override
public T next() {
return next;
Producer producer,
VoidFunction<OFF> offsetProducer,
UnaryFunction<OFF, Void> offsetReset,
UnaryFunction<OFF, Void> commitOffset) {

return () -> {
producer.run();
return new UnboundedReader<T, OFF>() {
T next = null;

@Override
public OFF getCurrentOffset() {
return offsetProducer.apply();
}

@Override
public void reset(OFF offset) {
offsetReset.apply(offset);
}

@Override
public void commitOffset(OFF offset) {
commitOffset.apply(offset);
}

@Override
public boolean hasNext() {
try {
next = queue.take();
return true;
} catch (InterruptedException ex) {
log.warn("Interrupted while waiting for next queue element.");
return false;
}
};
}
}

@Override
public T next() {
return next;
}

@Override
public void close() {
// nop
}
};
};
}

/**
* Create {@code Dataset} with given partitions.
*/
@SafeVarargs
public static <T> DataSource<T> fromPartitions(Partition<T>... partitions) {
public static <T, OFF extends Serializable> DataSource<T> fromPartitions(
UnboundedPartition<T, OFF>... partitions) {

return fromPartitions(Arrays.asList(partitions));
}


/**
* Create {@code Dataset} with given partitions.
*/
public static <T> DataSource<T> fromPartitions(List<Partition<T>> partitions) {
return new DataSource<T>() {
@Override
public List<Partition<T>> getPartitions() {
return partitions;
}

@Override
public boolean isBounded() {
return false;
}
public static <T, OFF extends Serializable> DataSource<T> fromPartitions(
List<UnboundedPartition<T, OFF>> partitions) {

};
return (UnboundedDataSource<T, OFF>) () -> partitions;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017 O2 Czech Republic, a.s.
* Copyright 2017-2018 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cz.o2.proxima.repository;

import com.typesafe.config.ConfigFactory;
Expand All @@ -26,7 +25,7 @@
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.io.StdoutSink;
import cz.seznam.euphoria.core.client.operator.MapElements;
import cz.seznam.euphoria.inmem.InMemExecutor;
import cz.seznam.euphoria.executor.local.LocalExecutor;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import org.junit.After;
Expand All @@ -42,7 +41,7 @@ public class PartitionedViewTest implements Serializable {
private final transient Repository repo = Repository.Builder.of(
ConfigFactory.load().resolve()).build();

private transient InMemExecutor executor;
private transient LocalExecutor executor;
private final transient EntityDescriptor entity = repo.findEntity("event").get();
private final transient AttributeDescriptor<?> attr = entity.findAttribute("data").get();

Expand All @@ -51,7 +50,7 @@ public class PartitionedViewTest implements Serializable {

@Before
public void setUp() {
executor = new InMemExecutor();
executor = new LocalExecutor();
AttributeFamilyDescriptor family = repo.getAllFamilies()
.filter(af -> af.getName().equals("event-storage-stream"))
.findAny()
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/cz/o2/proxima/storage/InMemStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public void close() throws Exception {

return flow.createInput(
DataSourceUtils.fromPartitions(
DataSourceUtils.fromBlockingQueue(queue, producer)));
DataSourceUtils.fromBlockingQueue(queue, producer, () -> 0,
a -> null, a -> null)));

}

Expand All @@ -228,8 +229,7 @@ public <T> Dataset<T> observe(
String name,
PartitionedLogObserver<T> observer) {

// FIXME
throw new UnsupportedOperationException("Unsupported yet.");
return observePartitions(flow, getPartitions(), observer);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017 O2 Czech Republic, a.s.
* Copyright 2017-2018 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,15 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cz.o2.proxima.util;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

/**
* This is pseudo serializable {@code CountDownLatch}.
* This is just for testing consumption in InMemExecutor
* This is just for testing consumption in LocalExecutor
*/
public class SerializableCountDownLatch implements Serializable {

Expand Down
Loading

0 comments on commit 5bfc477

Please sign in to comment.