Skip to content

Commit

Permalink
Incremental producer.
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Sandoz authored and PaulSandoz committed Apr 9, 2019
1 parent 4d962da commit a17693c
Show file tree
Hide file tree
Showing 14 changed files with 1,847 additions and 1,059 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

/**
* Beta API subject to change.
* @deprecated see {@link com.netflix.hollow.api.producer.listener.IncrementalPopulateListener}
* @see com.netflix.hollow.api.producer.listener.IncrementalPopulateListener
*/
@Deprecated
public class AbstractIncrementalCycleListener implements IncrementalCycleListener {
@Override
public void onCycleComplete(IncrementalCycleStatus status, long elapsed, TimeUnit unit) { }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.hollow.api.producer;

import static com.netflix.hollow.api.producer.HollowIncrementalCyclePopulator.AddIfAbsent;
import static com.netflix.hollow.api.producer.HollowIncrementalCyclePopulator.DELETE_RECORD;

import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper;
Expand All @@ -9,7 +10,7 @@

// @@@ Move into HollowIncrementalCyclePopulator since the event map is a shared resource
// HollowIncrementalCyclePopulator constructed from the write state instance
final class CloseableIncrementalWriteState implements HollowProducer.IncrementalWriteState, AutoCloseable {
final class CloseableIncrementalWriteState implements HollowProducer.Incremental.IncrementalWriteState, AutoCloseable {
private final ConcurrentHashMap<RecordPrimaryKey, Object> events;
private final HollowObjectMapper objectMapper;
private volatile boolean closed;
Expand All @@ -21,41 +22,42 @@ public CloseableIncrementalWriteState(
this.objectMapper = objectMapper;
}

@Override public void addOrModify(Object o) {
@Override
public void addOrModify(Object o) {
ensureNotClosed();

RecordPrimaryKey key;
if (o instanceof FlatRecord) {
FlatRecord fr = (FlatRecord) o;
key = fr.getRecordPrimaryKey();
} else {
key = objectMapper.extractPrimaryKey(o);
}

events.put(key, o);
events.put(getKey(o), o);
}

@Override public void delete(Object o) {
@Override
public void addIfAbsent(Object o) {
ensureNotClosed();

RecordPrimaryKey key;
if (o instanceof FlatRecord) {
FlatRecord fr = (FlatRecord) o;
key = fr.getRecordPrimaryKey();
} else {
key = objectMapper.extractPrimaryKey(o);
}
events.putIfAbsent(getKey(o), new AddIfAbsent(o));
}

delete(key);
@Override
public void delete(Object o) {
delete(getKey(o));
}

@Override public void delete(RecordPrimaryKey key) {
@Override
public void delete(RecordPrimaryKey key) {
ensureNotClosed();

// @@@ Deletion is silently ignored if no object exists for the key
events.put(key, DELETE_RECORD);
}

private RecordPrimaryKey getKey(Object o) {
if (o instanceof FlatRecord) {
FlatRecord fr = (FlatRecord) o;
return fr.getRecordPrimaryKey();
} else {
return objectMapper.extractPrimaryKey(o);
}
}

private void ensureNotClosed() {
if (closed) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
/**
* Used by HollowIncrementalProducer for Delta-Based Producer Input
* @since 2.9.9
* @deprecated see {@link HollowProducer.Incremental#runIncrementalCycle(HollowProducer.Incremental.IncrementalPopulator)}
* @see HollowProducer.Incremental#runIncrementalCycle(HollowProducer.Incremental.IncrementalPopulator)
*/
@Deprecated
public class HollowIncrementalCyclePopulator implements HollowProducer.Populator {

public static final Object DELETE_RECORD = new Object();
Expand Down Expand Up @@ -150,7 +153,8 @@ private void addRecords(final HollowProducer.WriteState newState) {
List<Map.Entry<RecordPrimaryKey, Object>> entryList = new ArrayList<>(mutations.entrySet());

AtomicInteger nextMutation = new AtomicInteger(0);


// @@@ Use parallel stream
SimultaneousExecutor executor = new SimultaneousExecutor(threadsPerCpu, getClass(), "add-records");
for(int i=0;i<executor.getCorePoolSize();i++) {
executor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@

/**
* Warning: This is a BETA API and is subject to breaking changes.
* @deprecated see {@link HollowProducer.Incremental}
* @see HollowProducer.Incremental
*/
@Deprecated
public class HollowIncrementalProducer {

private static final long FAILED_VERSION = Long.MIN_VALUE;
Expand Down
Loading

0 comments on commit a17693c

Please sign in to comment.