Skip to content

Commit

Permalink
feat: use faillog for handling write errors (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwalkiewicz committed Oct 14, 2021
1 parent 667eb28 commit 24395bf
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@
import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegion;
import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegionRule;
import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.DefaultAppender;
import com.google.common.base.Predicate;
import com.google.common.primitives.Longs;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
Expand All @@ -43,6 +48,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.shaded.org.apache.commons.io.FileUtils;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import org.junit.Assume;
import org.junit.ClassRule;
Expand Down Expand Up @@ -1223,17 +1229,53 @@ private void catchIOExceptionsIfWillThrow(byte[] rowKey, RunnableThrowingIO runn
}
}

/**
 * Counts the lines contained in all write error log files found on disk.
 *
 * <p>The prefix path is read from the test configuration under {@link
 * DefaultAppender#PREFIX_PATH_KEY}. Its last {@code /}-separated component is used as the file
 * name prefix and the preceding components as the directory to scan. Every file in that
 * directory whose name starts with the prefix is read and its lines are counted.
 *
 * @return total number of lines in the matching files, or {@code 0} if the directory cannot be
 *     listed (for example because it does not exist yet)
 * @throws IOException if a matching file cannot be read
 * @throws IllegalStateException if the prefix path is not present in the configuration
 */
private static int getSecondaryWriteErrorLogMessagesWritten() throws IOException {
  Configuration configuration = ConfigurationHelper.newConfiguration();
  String prefixPath = configuration.get(DefaultAppender.PREFIX_PATH_KEY);
  if (prefixPath == null) {
    // Fail with a descriptive message instead of a bare NullPointerException on split().
    throw new IllegalStateException(
        "Missing configuration value for " + DefaultAppender.PREFIX_PATH_KEY);
  }

  String[] prefixParts = prefixPath.split("/");
  final String fileNamePrefix = prefixParts[prefixParts.length - 1];
  String[] directoryParts = Arrays.copyOf(prefixParts, prefixParts.length - 1);
  StringBuilder sb = new StringBuilder();
  for (String directoryPart : directoryParts) {
    sb.append(directoryPart);
    sb.append("/");
  }
  String directoryPath = sb.toString();

  File dir = new File(directoryPath);
  File[] files =
      dir.listFiles(
          new FileFilter() {
            @Override
            public boolean accept(File file) {
              return file.getName().startsWith(fileNamePrefix);
            }
          });
  if (files == null) {
    // listFiles() returns null when the path is not a directory or cannot be read;
    // in that case there are no log files to count.
    return 0;
  }

  int numberOfLines = 0;
  for (File f : files) {
    String fileStr = FileUtils.readFileToString(f);
    // An empty file holds no messages; "".split("\n") would otherwise count as one line.
    if (!fileStr.isEmpty()) {
      numberOfLines += fileStr.split("\n").length;
    }
  }
  return numberOfLines;
}

/**
 * Snapshot of the secondary write error counters taken at construction time, used to verify how
 * many new errors have been reported since then.
 */
static class ReportedErrorsContext {
  final int initialErrorsConsumed;
  final int initialErrorsWritten;

  /**
   * Captures the current error count of {@code TestWriteErrorConsumer} and the current number of
   * write error log lines.
   *
   * @throws IOException if counting the write error log lines fails
   */
  public ReportedErrorsContext() throws IOException {
    initialErrorsConsumed = TestWriteErrorConsumer.getErrorCount();
    initialErrorsWritten = getSecondaryWriteErrorLogMessagesWritten();
  }

  /**
   * Asserts that both counters grew by exactly {@code expectedNewErrors} since construction.
   *
   * @param expectedNewErrors expected increase of each counter
   * @throws IOException if counting the write error log lines fails
   */
  public void assertNewErrorsReported(int expectedNewErrors) throws IOException {
    int newlyConsumed = TestWriteErrorConsumer.getErrorCount() - initialErrorsConsumed;
    int newlyWritten = getSecondaryWriteErrorLogMessagesWritten() - initialErrorsWritten;

    assertThat(newlyConsumed).isEqualTo(expectedNewErrors);
    assertThat(newlyWritten).isEqualTo(expectedNewErrors);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package com.google.cloud.bigtable.hbase.mirroring.utils;

import com.google.cloud.bigtable.mirroring.hbase1_x.utils.DefaultSecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.SecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.Logger;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -25,6 +27,11 @@

public class TestWriteErrorConsumer implements SecondaryWriteErrorConsumer {
// Running total of errors passed to consume(...); static, so it is shared by all instances.
static AtomicInteger errorCount = new AtomicInteger(0);
// Consumer that each consume(...) call is forwarded to after the error has been counted.
private final DefaultSecondaryWriteErrorConsumer secondaryWriteErrorConsumer;

/**
 * Creates a counting consumer that forwards every error to a newly created {@link
 * DefaultSecondaryWriteErrorConsumer}.
 *
 * @param writeErrorLogger faillog logger handed to the wrapped default consumer
 */
public TestWriteErrorConsumer(Logger writeErrorLogger) {
  DefaultSecondaryWriteErrorConsumer delegate =
      new DefaultSecondaryWriteErrorConsumer(writeErrorLogger);
  this.secondaryWriteErrorConsumer = delegate;
}

public static int getErrorCount() {
return errorCount.get();
Expand All @@ -37,11 +44,13 @@ public static void clearErrors() {
/** Counts one error, then forwards the failed mutation to the wrapped default consumer. */
@Override
public void consume(HBaseOperation operation, Mutation r, Throwable cause) {
  errorCount.incrementAndGet();
  secondaryWriteErrorConsumer.consume(operation, r, cause);
}

/** Counts one error, then forwards the failed row mutations to the wrapped default consumer. */
@Override
public void consume(HBaseOperation operation, RowMutations r, Throwable cause) {
  errorCount.incrementAndGet();
  secondaryWriteErrorConsumer.consume(operation, r, cause);
}

@Override
Expand All @@ -50,5 +59,6 @@ public void consume(HBaseOperation operation, List<? extends Row> operations, Th
assert row instanceof Mutation || row instanceof RowMutations;
}
errorCount.addAndGet(operations.size());
this.secondaryWriteErrorConsumer.consume(operation, operations, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,19 @@
<name>use-hbase-mini-cluster</name>
<value>true</value>
</property>

<property>
<name>google.bigtable.mirroring.write-error-log.appender.prefix-path</name>
<value>/tmp/write-error-log</value>
</property>

<property>
<name>google.bigtable.mirroring.write-error-log.appender.max-buffer-size</name>
<value>8388608</value>
</property>

<property>
<name>google.bigtable.mirroring.write-error-log.appender.drop-on-overflow</name>
<value>false</value>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,19 @@
<name>use-hbase-mini-cluster</name>
<value>true</value>
</property>

<property>
<name>google.bigtable.mirroring.write-error-log.appender.prefix-path</name>
<value>/tmp/write-error-log</value>
</property>

<property>
<name>google.bigtable.mirroring.write-error-log.appender.max-buffer-size</name>
<value>8388608</value>
</property>

<property>
<name>google.bigtable.mirroring.write-error-log.appender.drop-on-overflow</name>
<value>false</value>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.AccumulatedExceptions;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.CallableThrowingIOException;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.ListenableReferenceCounter;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.ReadSampler;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.SecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.SecondaryWriteErrorConsumerWithMetrics;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.Appender;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.Logger;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.Serializer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowControlStrategy;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
Expand All @@ -47,15 +49,16 @@
import org.apache.hadoop.hbase.security.User;

public class MirroringConnection implements Connection {

private static final Logger Log = new Logger(MirroringConnection.class);
private static final com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger Log =
new com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger(MirroringConnection.class);
private final FlowController flowController;
private final ExecutorService executorService;
private final MismatchDetector mismatchDetector;
private final ListenableReferenceCounter referenceCounter;
private final MirroringTracer mirroringTracer;
private final SecondaryWriteErrorConsumer secondaryWriteErrorConsumer;
private final ReadSampler readSampler;
private final Logger failedWritesLogger;
private final MirroringConfiguration configuration;
private final Connection primaryConnection;
private final Connection secondaryConnection;
Expand Down Expand Up @@ -97,9 +100,18 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
ReflectionConstructor.construct(
this.configuration.mirroringOptions.mismatchDetectorClass, this.mirroringTracer);

this.failedWritesLogger =
new Logger(
ReflectionConstructor.<Appender>construct(
this.configuration.mirroringOptions.writeErrorLogAppenderClass,
Configuration.class,
this.configuration),
ReflectionConstructor.<Serializer>construct(
this.configuration.mirroringOptions.writeErrorLogSerializerClass));

final SecondaryWriteErrorConsumer secondaryWriteErrorConsumer =
ReflectionConstructor.construct(
this.configuration.mirroringOptions.writeErrorConsumerClass);
this.configuration.mirroringOptions.writeErrorConsumerClass, this.failedWritesLogger);

this.secondaryWriteErrorConsumer =
new SecondaryWriteErrorConsumerWithMetrics(
Expand Down Expand Up @@ -195,6 +207,12 @@ public void close() throws IOException {
IOException wrapperException = new InterruptedIOException();
wrapperException.initCause(e);
throw wrapperException;
} finally {
try {
this.failedWritesLogger.close();
} catch (Exception e) {
throw new IOException(e);
}
}

AccumulatedExceptions exceptions = new AccumulatedExceptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_MISMATCH_DETECTOR_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_READ_VERIFICATION_RATE_PERCENT;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_CONSUMER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_APPENDER_CLASS;
import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.DefaultSecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.DefaultAppender;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.DefaultSerializer;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestCountingFlowControlStrategy;
import com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector;
import com.google.common.base.Preconditions;
Expand All @@ -39,6 +43,9 @@ public class MirroringOptions {
public final String writeErrorConsumerClass;
public final int readSamplingRate;

public final String writeErrorLogAppenderClass;
public final String writeErrorLogSerializerClass;

public MirroringOptions(Configuration configuration) {
this.mismatchDetectorClass =
configuration.get(
Expand All @@ -63,5 +70,11 @@ public MirroringOptions(Configuration configuration) {
Integer.parseInt(configuration.get(MIRRORING_READ_VERIFICATION_RATE_PERCENT, "100"));
Preconditions.checkArgument(this.readSamplingRate >= 0);
Preconditions.checkArgument(this.readSamplingRate <= 100);
this.writeErrorLogAppenderClass =
configuration.get(
MIRRORING_WRITE_ERROR_LOG_APPENDER_CLASS, DefaultAppender.class.getCanonicalName());
this.writeErrorLogSerializerClass =
configuration.get(
MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS, DefaultSerializer.class.getCanonicalName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,47 @@
*/
package com.google.cloud.bigtable.mirroring.hbase1_x.utils;

import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.Logger;
import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;

public class DefaultSecondaryWriteErrorConsumer implements SecondaryWriteErrorConsumer {
// Diagnostic logger for this class. Referenced by its fully qualified name because the simple
// name Logger is imported from the faillog package in this file.
private static final com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger Log =
new com.google.cloud.bigtable.mirroring.hbase1_x.utils.Logger(
DefaultSecondaryWriteErrorConsumer.class);
// Faillog logger that receives every mutation passed to consume(...) via mutationFailed(...).
private final Logger writeErrorLogger;

/**
 * Creates a consumer that reports failed mutations to the given faillog logger.
 *
 * @param writeErrorLogger faillog logger whose {@code mutationFailed} method is called for each
 *     consumed mutation
 */
public DefaultSecondaryWriteErrorConsumer(Logger writeErrorLogger) {
this.writeErrorLogger = writeErrorLogger;
}

/**
 * Handles a single mutation that could not be written to the secondary database.
 *
 * <p>Prints the row key to standard output and passes the mutation and its failure cause to the
 * faillog logger. If that call is interrupted, the problem is logged and the interrupt flag of
 * the current thread is set again.
 */
@Override
public void consume(HBaseOperation operation, Mutation r, Throwable cause) {
  String rowKey = new String(r.getRow());
  System.out.printf("Couldn't write row to secondary database %s", rowKey);
  try {
    writeErrorLogger.mutationFailed(r, cause);
  } catch (InterruptedException interrupted) {
    Log.error(
        "Writing mutation that failed on secondary database to faillog interrupted: mutation=%s, failure_cause=%s, exception=%s",
        r, cause, interrupted);
    // Set the flag again so callers further up the stack can observe the interruption.
    Thread.currentThread().interrupt();
  }
}

/**
 * Handles row mutations that could not be applied to the secondary database.
 *
 * <p>Prints the row key to standard output and passes each contained mutation, together with
 * the failure cause, to the faillog logger. If a call is interrupted, the affected mutation is
 * logged, the interrupt flag of the current thread is set again, and the loop moves on to the
 * remaining mutations.
 */
@Override
public void consume(HBaseOperation operation, RowMutations r, Throwable cause) {
  System.out.printf(
      "Couldn't apply row mutations to secondary database %s", new String(r.getRow()));
  for (Mutation m : r.getMutations()) {
    try {
      writeErrorLogger.mutationFailed(m, cause);
    } catch (InterruptedException e) {
      // Report the individual mutation (not the enclosing RowMutations) so the message
      // identifies exactly which mutation was not handed to the faillog.
      Log.error(
          "Writing mutation that failed on secondary database to faillog interrupted: mutation=%s, failure_cause=%s, exception=%s",
          m, cause, e);
      Thread.currentThread().interrupt();
    }
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public class MirroringConfigurationHelper {
public static final String MIRRORING_WRITE_ERROR_CONSUMER_CLASS =
"google.bigtable.mirroring.write-error-consumer.impl";

/** Configuration key holding the class name of the write error log appender implementation. */
public static final String MIRRORING_WRITE_ERROR_LOG_APPENDER_CLASS =
"google.bigtable.mirroring.write-error-log.appender.impl";
/** Configuration key holding the class name of the write error log serializer implementation. */
public static final String MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS =
"google.bigtable.mirroring.write-error-log.serializer.impl";

/**
* Integer value representing percentage of read operations performed on primary database that
* should be verified against secondary. Each call to {@link Table#get(Get)}, {@link
Expand Down
Loading

0 comments on commit 24395bf

Please sign in to comment.