Skip to content

Commit

Permalink
[Backport] Fix Appenderator.push() to commit the metadata of all segm…
Browse files Browse the repository at this point in the history
…ents (#5756)
  • Loading branch information
jihoonson authored and gianm committed May 8, 2018
1 parent fc716fa commit 7dc4f4b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
* both of those. It can also push data to deep storage. But, it does not decide which segments data should go into.
* It also doesn't publish segments to the metadata store or monitor handoff; you have to do that yourself!
* <p>
* You can provide a {@link Committer} or a Supplier of one when you call one of the methods that adds, persists, or
* pushes data. The Committer should represent all data you have given to the Appenderator so far. This Committer will
* be used when that data has been persisted to disk.
* You can provide a {@link Committer} or a Supplier of one when you call one of the methods that {@link #add},
* {@link #persistAll}, or {@link #push}. The Committer should represent all data you have given to the Appenderator so
* far. This Committer will be used when that data has been persisted to disk.
*/
public interface Appenderator extends QuerySegmentWalker, Closeable
{
Expand Down Expand Up @@ -73,8 +73,9 @@ default AppenderatorAddResult add(SegmentIdentifier identifier, InputRow row, Su
* Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
* asynchronously.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param identifier the segment into which this row should be added
* @param row the row to add
Expand Down Expand Up @@ -128,8 +129,8 @@ AppenderatorAddResult add(
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first.
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
* {@link #add}, {@link #clear}, {@link #persistAll}, and {@link #push} methods should all be called from the same
* thread to keep the metadata committed by Committer in sync.
*/
void clear() throws InterruptedException;

Expand All @@ -147,50 +148,31 @@ AppenderatorAddResult add(
*/
ListenableFuture<?> drop(SegmentIdentifier identifier);

/**
* Persist any in-memory indexed data for segments of the given identifiers to durable storage. This may be only
* somewhat durable, e.g. the machine's local disk. The Committer will be made synchronously with the call to
* persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with
* the data persisted to disk.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
*
* @param identifiers segment identifiers to be persisted
* @param committer a committer associated with all data that has been added to segments of the given identifiers so
* far
*
* @return future that resolves when all pending data to segments of the identifiers has been persisted, contains
* commit metadata for this persist
*/
ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer);

/**
* Persist any in-memory indexed data to durable storage. This may be only somewhat durable, e.g. the
* machine's local disk. The Committer will be made synchronously with the call to persistAll, but will actually
* be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
* disk.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param committer a committer associated with all data that has been added so far
*
* @return future that resolves when all pending data has been persisted, contains commit metadata for this persist
*/
default ListenableFuture<Object> persistAll(@Nullable Committer committer)
{
return persist(getSegments(), committer);
}
ListenableFuture<Object> persistAll(@Nullable Committer committer);

/**
* Merge and push particular segments to deep storage. This will trigger an implicit
* {@link #persist(Collection, Committer)} using the provided Committer.
* {@link #persistAll(Committer)} using the provided Committer.
* <p>
* After this method is called, you cannot add new data to any segments that were previously under construction.
* <p>
* If committer is not provided, no metadata is persisted. If it's provided, the add, clear, persist, persistAll,
* and push methods should all be called from the same thread to keep the metadata committed by Committer in sync.
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
*
* @param identifiers list of segments to push
* @param committer a committer associated with all data that has been added so far
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,12 @@ public ListenableFuture<?> drop(final SegmentIdentifier identifier)
}

@Override
public ListenableFuture<Object> persist(Collection<SegmentIdentifier> identifiers, @Nullable Committer committer)
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
for (SegmentIdentifier identifier : identifiers) {
for (SegmentIdentifier identifier : sinks.keySet()) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new ISE("No sink for identifier: %s", identifier);
Expand Down Expand Up @@ -496,13 +496,6 @@ public Object doCall()
return future;
}

@Override
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{
// Submit persistAll task to the persistExecutor
return persist(sinks.keySet(), committer);
}

@Override
public ListenableFuture<SegmentsAndMetadata> push(
final Collection<SegmentIdentifier> identifiers,
Expand All @@ -521,7 +514,9 @@ public ListenableFuture<SegmentsAndMetadata> push(
}

return Futures.transform(
persist(identifiers, committer),
// We should always persist all segments regardless of the input because metadata should be committed for all
// segments.
persistAll(committer),
(Function<Object, SegmentsAndMetadata>) commitMetadata -> {
final List<DataSegment> dataSegments = Lists.newArrayList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,9 @@ public void run()
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
}
}

Expand Down Expand Up @@ -238,12 +237,12 @@ public void run()
Assert.assertEquals(4, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier, false);
Assert.assertEquals(5, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(3, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
appenderator.persistAll(committerSupplier.get());
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
}
}

@Test
public void testRestoreFromDisk() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,15 +454,13 @@ public ListenableFuture<?> drop(SegmentIdentifier identifier)
}

@Override
public ListenableFuture<Object> persist(
Collection<SegmentIdentifier> identifiers, Committer committer
)
public ListenableFuture<Object> persistAll(Committer committer)
{
if (persistEnabled) {
// do nothing
return Futures.immediateFuture(committer.getMetadata());
} else {
return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", identifiers));
return Futures.immediateFailedFuture(new ISE("Fail test while persisting segments[%s]", rows.keySet()));
}
}

Expand All @@ -488,7 +486,7 @@ public ListenableFuture<SegmentsAndMetadata> push(
)
.collect(Collectors.toList());
return Futures.transform(
persist(identifiers, committer),
persistAll(committer),
(Function<Object, SegmentsAndMetadata>) commitMetadata -> new SegmentsAndMetadata(segments, commitMetadata)
);
} else {
Expand Down

0 comments on commit 7dc4f4b

Please sign in to comment.