
Commit

Add very basic schema compact check
yirutang committed Apr 15, 2020
1 parent 5e76b4d commit 85b5606
Showing 9 changed files with 327 additions and 77 deletions.
DirectWriter.java
@@ -98,7 +98,8 @@ public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
}

@VisibleForTesting
public static void testSetStub(BigQueryWriteClient stub, int maxTableEntry) {
cache = WriterCache.getTestInstance(stub, maxTableEntry);
public static void testSetStub(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
}
}
SchemaCompact.java (new file)
@@ -0,0 +1,99 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A class that checks the schema compatibility between the user schema, given as a proto
* descriptor, and the BigQuery table schema. If this check passes, the user can write to the
* BigQuery table using the user schema; otherwise the write will fail.
*
* <p>The implementation as of now is not complete, which means that even if this check passes,
* there is still a possibility that the write will fail.
*/
public class SchemaCompact {
private BigQuery bigquery;
private static SchemaCompact compact;
private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)";
private static Pattern tablePattern = Pattern.compile(tablePatternString);

private SchemaCompact(BigQuery bigquery) {
this.bigquery = bigquery;
}

/**
* Gets a singleton {@code SchemaCompact} object.
*
* @return the singleton {@code SchemaCompact} instance
*/
public static SchemaCompact getInstance() {
if (compact == null) {
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
compact = new SchemaCompact(bigqueryHelper.getOptions().getService());
}
return compact;
}

/**
* Gets a {@code SchemaCompact} object with a custom BigQuery stub.
*
* @param bigquery the BigQuery stub to use for table lookups
* @return a {@code SchemaCompact} backed by the given stub
*/
@VisibleForTesting
public static SchemaCompact getInstance(BigQuery bigquery) {
return new SchemaCompact(bigquery);
}

private TableId getTableId(String tableName) {
Matcher matcher = tablePattern.matcher(tableName);
if (!matcher.matches() || matcher.groupCount() != 3) {
throw new IllegalArgumentException("Invalid table name: " + tableName);
}
return TableId.of(matcher.group(1), matcher.group(2), matcher.group(3));
}

/**
* Checks if the userSchema is compatible with the table's current schema for writing. The
* current implementation is not complete; if the check fails, the write cannot succeed.
*
* @param tableName The name of the table to write to.
* @param userSchema The schema the user uses to append data.
* @throws IllegalArgumentException if the check fails.
*/
public void check(String tableName, Descriptors.Descriptor userSchema)
throws IllegalArgumentException {
Table table = bigquery.getTable(getTableId(tableName));
Schema schema = table.getDefinition().getSchema();
// TODO: We only have very limited check here. More checks to be added.
if (schema.getFields().size() != userSchema.getFields().size()) {
throw new IllegalArgumentException(
"User schema doesn't have expected field number with BigQuery table schema, expected: "
+ schema.getFields().size()
+ " actual: "
+ userSchema.getFields().size());
}
}
}
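
For orientation, a minimal usage sketch of the new check (not part of this commit). The table path is illustrative, and FooType is the test proto used in the tests further down; check() is called exactly as declared above:

import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1alpha2.SchemaCompact;

// Illustrative only: exercise SchemaCompact.check() against a hypothetical table path.
public class SchemaCompactExample {
  public static void main(String[] args) {
    SchemaCompact compact = SchemaCompact.getInstance();
    try {
      // Throws IllegalArgumentException if the table name is malformed or the
      // descriptor's field count differs from the table schema's field count.
      compact.check(
          "projects/my-project/datasets/my_dataset/tables/my_table",
          FooType.getDescriptor());
      System.out.println("Schema check passed.");
    } catch (IllegalArgumentException e) {
      System.out.println("Schema check failed: " + e.getMessage());
    }
  }
}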
StreamWriter.java
@@ -121,13 +121,10 @@ public static long getApiMaxInflightRequests() {
}

private StreamWriter(Builder builder)
throws InvalidArgumentException, IOException, InterruptedException {
throws IllegalArgumentException, IOException, InterruptedException {
Matcher matcher = streamPattern.matcher(builder.streamName);
if (!matcher.matches()) {
throw new InvalidArgumentException(
new Exception("Invalid stream name: " + builder.streamName),
GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
false);
throw new IllegalArgumentException("Invalid stream name: " + builder.streamName);
}
streamName = builder.streamName;
tableName = matcher.group(1);
@@ -224,7 +221,7 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
setupAlarm();
if (!batchesToSend.isEmpty()) {
for (final InflightBatch batch : batchesToSend) {
LOG.info("Scheduling a batch for immediate sending.");
LOG.fine("Scheduling a batch for immediate sending.");
writeBatch(batch);
}
}
@@ -696,7 +693,7 @@ public Builder setEndpoint(String endpoint) {
}

/** Builds the {@code StreamWriter}. */
public StreamWriter build() throws InvalidArgumentException, IOException, InterruptedException {
public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException {
return new StreamWriter(this);
}
}
WriterCache.java
@@ -15,11 +15,8 @@
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import io.grpc.Status;
import com.google.protobuf.Descriptors.Descriptor;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
@@ -38,34 +35,36 @@ public class WriterCache {
private static Pattern tablePattern = Pattern.compile(tablePatternString);

private static WriterCache instance;
private LRUCache<String, LRUCache<Descriptors.Descriptor, StreamWriter>> writerCache;
private LRUCache<String, LRUCache<Descriptor, StreamWriter>> writerCache;

// Maximum number of tables to hold in the cache. Once the maximum is exceeded, entries are
// evicted on a least-recently-used basis.
private static final int MAX_TABLE_ENTRY = 100;
private static final int MAX_WRITERS_PER_TABLE = 2;

private final BigQueryWriteClient stub;
private final SchemaCompact compact;

private WriterCache(BigQueryWriteClient stub, int maxTableEntry) {
private WriterCache(BigQueryWriteClient stub, int maxTableEntry, SchemaCompact compact) {
this.stub = stub;
writerCache =
new LRUCache<String, LRUCache<Descriptors.Descriptor, StreamWriter>>(maxTableEntry);
this.compact = compact;
writerCache = new LRUCache<String, LRUCache<Descriptor, StreamWriter>>(maxTableEntry);
}

public static WriterCache getInstance() throws IOException {
if (instance == null) {
BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder().build();
BigQueryWriteClient stub = BigQueryWriteClient.create(stubSettings);
instance = new WriterCache(stub, MAX_TABLE_ENTRY);
instance = new WriterCache(stub, MAX_TABLE_ENTRY, SchemaCompact.getInstance());
}
return instance;
}

/** Returns a cache with custom stub used by test. */
@VisibleForTesting
public static WriterCache getTestInstance(BigQueryWriteClient stub, int maxTableEntry) {
return new WriterCache(stub, maxTableEntry);
public static WriterCache getTestInstance(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompact compact) {
return new WriterCache(stub, maxTableEntry, compact);
}

/** Returns an entry with {@code StreamWriter} and expiration time in millis. */
@@ -83,7 +82,7 @@ private String CreateNewStream(String tableName) {
}

StreamWriter CreateNewWriter(String streamName)
throws InvalidArgumentException, IOException, InterruptedException {
throws IllegalArgumentException, IOException, InterruptedException {
return StreamWriter.newBuilder(streamName)
.setChannelProvider(stub.getSettings().getTransportChannelProvider())
.setCredentialsProvider(stub.getSettings().getCredentialsProvider())
@@ -98,41 +97,38 @@ StreamWriter CreateNewWriter(String streamName)
* @return a {@code StreamWriter} for the given table and user schema
* @throws Exception
*/
public StreamWriter getTableWriter(String tableName, Descriptors.Descriptor userSchema)
throws InvalidArgumentException, IOException, InterruptedException {
public StreamWriter getTableWriter(String tableName, Descriptor userSchema)
throws IllegalArgumentException, IOException, InterruptedException {
Matcher matcher = tablePattern.matcher(tableName);
if (!matcher.matches()) {
throw new InvalidArgumentException(
new Exception("Invalid table name: " + tableName),
GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
false);
throw new IllegalArgumentException("Invalid table name: " + tableName);
}

String streamName = null;
Boolean streamExpired = false;
StreamWriter writer = null;
LRUCache<Descriptors.Descriptor, StreamWriter> tableEntry = null;
LRUCache<Descriptor, StreamWriter> tableEntry = null;

synchronized (this) {
tableEntry = writerCache.get(tableName);
if (tableEntry != null) {
writer = tableEntry.get(userSchema);
if (writer != null && !writer.expired()) {
return writer;
} else {
if (writer != null && writer.expired()) {
if (writer != null) {
if (!writer.expired()) {
return writer;
} else {
writer.close();
}
streamName = CreateNewStream(tableName);
writer = CreateNewWriter(streamName);
// Schema compat check should be done here!
tableEntry.put(userSchema, writer);
}
compact.check(tableName, userSchema);
streamName = CreateNewStream(tableName);
writer = CreateNewWriter(streamName);
tableEntry.put(userSchema, writer);
} else {
compact.check(tableName, userSchema);
streamName = CreateNewStream(tableName);
tableEntry = new LRUCache<Descriptors.Descriptor, StreamWriter>(MAX_WRITERS_PER_TABLE);
tableEntry = new LRUCache<Descriptor, StreamWriter>(MAX_WRITERS_PER_TABLE);
writer = CreateNewWriter(streamName);
// Schema compat check should be done here!
tableEntry.put(userSchema, writer);
writerCache.put(tableName, tableEntry);
}
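
The LRUCache type that getTableWriter leans on is not shown in this diff. For reference, a minimal sketch of the semantics the lookup above assumes (an access-ordered map that evicts the least recently used entry once capacity is exceeded), written against plain java.util; the real LRUCache may differ:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for LRUCache, which is not part of this commit: an
// access-ordered LinkedHashMap that drops the least recently used entry once
// the configured capacity is exceeded.
class LruSketch<K, V> extends LinkedHashMap<K, V> {
  private final int capacity;

  LruSketch(int capacity) {
    super(16, 0.75f, /* accessOrder= */ true);
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > capacity;
  }
}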
DirectWriterTest.java
@@ -16,13 +16,13 @@
package com.google.cloud.bigquery.storage.v1alpha2;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Timestamp;
@@ -31,9 +31,13 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.*;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.threeten.bp.Instant;

@RunWith(JUnit4.class)
@@ -47,6 +51,8 @@ public class DirectWriterTest {
private BigQueryWriteClient client;
private LocalChannelProvider channelProvider;

@Mock private static SchemaCompact schemaCheck;

@BeforeClass
public static void startStaticServer() {
mockBigQueryWrite = new MockBigQueryWrite();
@@ -71,6 +77,7 @@ public void setUp() throws IOException {
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
client = BigQueryWriteClient.create(settings);
MockitoAnnotations.initMocks(this);
}

@After
@@ -106,12 +113,13 @@ void WriterCreationResponseMock(String testStreamName, List<Long> responseOffset

@Test
public void testWriteSuccess() throws Exception {
DirectWriter.testSetStub(client, 10);
DirectWriter.testSetStub(client, 10, schemaCheck);
FooType m1 = FooType.newBuilder().setFoo("m1").build();
FooType m2 = FooType.newBuilder().setFoo("m2").build();

WriterCreationResponseMock(TEST_STREAM, Arrays.asList(Long.valueOf(0L)));
ApiFuture<Long> ret = DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1, m2));
verify(schemaCheck).check(TEST_TABLE, FooType.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
List<AbstractMessage> actualRequests = mockBigQueryWrite.getRequests();
Assert.assertEquals(3, actualRequests.size());
@@ -154,6 +162,7 @@ public void testWriteSuccess() throws Exception {
WriterCreationResponseMock(TEST_STREAM_2, Arrays.asList(Long.valueOf(0L)));
AllSupportedTypes m3 = AllSupportedTypes.newBuilder().setStringValue("s").build();
ret = DirectWriter.<AllSupportedTypes>append(TEST_TABLE, Arrays.asList(m3));
verify(schemaCheck).check(TEST_TABLE, AllSupportedTypes.getDescriptor());
assertEquals(Long.valueOf(0L), ret.get());
dataBuilder = Storage.AppendRowsRequest.ProtoData.newBuilder();
dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(AllSupportedTypes.getDescriptor()));
@@ -176,21 +185,21 @@

@Test
public void testWriteBadTableName() throws Exception {
DirectWriter.testSetStub(client, 10);
DirectWriter.testSetStub(client, 10, schemaCheck);
FooType m1 = FooType.newBuilder().setFoo("m1").build();
FooType m2 = FooType.newBuilder().setFoo("m2").build();

try {
ApiFuture<Long> ret = DirectWriter.<FooType>append("abc", Arrays.asList(m1, m2));
fail("should fail");
} catch (InvalidArgumentException expected) {
assertEquals("java.lang.Exception: Invalid table name: abc", expected.getMessage());
} catch (IllegalArgumentException expected) {
assertEquals("Invalid table name: abc", expected.getMessage());
}
}
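
Alongside the bad-table-name test above, a hypothetical sketch (not in this commit) of the schema-check failure path: stub the mocked SchemaCompact to throw, then assert that append is rejected before any RPC is issued. doThrow comes from the org.mockito.Mockito static import already present in this file; that the exception propagates synchronously out of append is an assumption:

@Test
public void testWriteSchemaCheckFails() throws Exception {
  DirectWriter.testSetStub(client, 10, schemaCheck);
  FooType m1 = FooType.newBuilder().setFoo("m1").build();
  // Hypothetical: make the mocked schema check reject this table/descriptor pair.
  doThrow(new IllegalArgumentException("Field count mismatch"))
      .when(schemaCheck)
      .check(TEST_TABLE, FooType.getDescriptor());
  try {
    DirectWriter.<FooType>append(TEST_TABLE, Arrays.asList(m1));
    fail("should fail");
  } catch (IllegalArgumentException expected) {
    assertEquals("Field count mismatch", expected.getMessage());
  }
}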

@Test
public void testConcurrentAccess() throws Exception {
WriterCache cache = WriterCache.getTestInstance(client, 2);
WriterCache cache = WriterCache.getTestInstance(client, 2, schemaCheck);
final FooType m1 = FooType.newBuilder().setFoo("m1").build();
final FooType m2 = FooType.newBuilder().setFoo("m2").build();
final List<Long> expectedOffset =
@@ -203,9 +212,10 @@ public void testConcurrentAccess() throws Exception {
// Make sure getting the same table writer in multiple thread only cause create to be called
// once.
WriterCreationResponseMock(TEST_STREAM, expectedOffset);
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
Thread t =
new Thread() {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
@@ -216,8 +226,7 @@ public void run() {
fail(e.getMessage());
}
}
};
t.start();
});
}
}
}
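
One caveat on the rewritten concurrency test: in the visible hunk the executor is never shut down or awaited, so the test method may return before all five tasks have run. A hedged sketch of the missing wait, assuming nothing below the fold already handles it, placed after the for loop:

// Hypothetical addition: block until all five lookup tasks have finished.
// TimeUnit would need to be imported; the one-minute bound is arbitrary.
executor.shutdown();
assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));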
