Skip to content

Commit

Permalink
feat: add JsonWriterCache.java and added JsonWriterCache in DirectWri…
Browse files Browse the repository at this point in the history
…ter to allow JsonWrites (#489)

* feat: add JsonDirectWriter

* Added JsonWriterCache.java, added JsonWriterCache in DirectWriter, added test cases and changed some test file errors.

* Add IT test

* Fixed error in IT test

* Fixed spelling in DirectWriter, fixed non static import * to be single class imports
  • Loading branch information
allenc3 committed Aug 16, 2020
1 parent b875340 commit 34193b8
Show file tree
Hide file tree
Showing 7 changed files with 730 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import org.json.JSONArray;

/**
* Writer that can help user to write data to BigQuery. This is a simplified version of the Write
* API. For users writing with COMMITTED stream and don't care about row deduplication, it is
* recommended to use this Writer.
* recommended to use this Writer. The DirectWriter can be used to write both JSON and protobuf
* data.
*
* <pre>{@code
* DataProto data;
Expand All @@ -50,7 +52,9 @@
public class DirectWriter {
private static final Logger LOG = Logger.getLogger(DirectWriter.class.getName());
private static WriterCache cache = null;
private static JsonWriterCache jsonCache = null;
private static Lock cacheLock = new ReentrantLock();
private static Lock jsonCacheLock = new ReentrantLock();

/**
* Append rows to the given table.
Expand Down Expand Up @@ -103,10 +107,53 @@ public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
MoreExecutors.directExecutor());
}

/**
* Append rows to the given table.
*
* @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}"
* @param json A JSONArray
* @return A future that contains the offset at which the append happened. Only when the future
* returns with valid offset, then the append actually happened.
* @throws IOException, InterruptedException, InvalidArgumentException,
* Descriptors.DescriptorValidationException
*/
public static ApiFuture<Long> append(String tableName, JSONArray json)
throws IOException, InterruptedException, InvalidArgumentException,
Descriptors.DescriptorValidationException {
Preconditions.checkNotNull(tableName, "TableName is null.");
Preconditions.checkNotNull(json, "JSONArray is null.");

if (json.length() == 0) {
throw new InvalidArgumentException(
new Exception("Empty JSONArrays are not allowed"),
GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
false);
}
try {
jsonCacheLock.lock();
if (jsonCache == null) {
jsonCache = JsonWriterCache.getInstance();
}
} finally {
jsonCacheLock.unlock();
}
JsonStreamWriter writer = jsonCache.getTableWriter(tableName);
return ApiFutures.<Storage.AppendRowsResponse, Long>transform(
writer.append(json, /* offset = */ -1, /*allowUnknownFields = */ false),
new ApiFunction<Storage.AppendRowsResponse, Long>() {
@Override
public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
return Long.valueOf(appendRowsResponse.getOffset());
}
},
MoreExecutors.directExecutor());
}

@VisibleForTesting
public static void testSetStub(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompatibility schemaCheck) {
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
jsonCache = JsonWriterCache.getTestInstance(stub, maxTableEntry);
}

/** Clears the underlying cache and all the transport connections. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ public void close() {
this.streamWriter.close();
}

/** Returns if a stream has expired. */
public Boolean expired() {
return this.streamWriter.expired();
}

private class JsonStreamWriterOnSchemaUpdateRunnable extends OnSchemaUpdateRunnable {
private JsonStreamWriter jsonStreamWriter;
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.Descriptors;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A cache of JsonStreamWriters that can be looked up by Table Name. The entries will expire after 5
* minutes if not used. Code sample: JsonWriterCache cache = JsonWriterCache.getInstance();
* JsonStreamWriter writer = cache.getWriter(); // Use... cache.returnWriter(writer);
*/
public class JsonWriterCache {
private static final Logger LOG = Logger.getLogger(JsonWriterCache.class.getName());

private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
private static Pattern tablePattern = Pattern.compile(tablePatternString);

private static JsonWriterCache instance;
private Cache<String, JsonStreamWriter> jsonWriterCache;

// Maximum number of tables to hold in the cache, once the maxium exceeded, the cache will be
// evicted based on least recent used.
private static final int MAX_TABLE_ENTRY = 100;
private static final int MAX_WRITERS_PER_TABLE = 1;

private final BigQueryWriteClient stub;

private JsonWriterCache(BigQueryWriteClient stub, int maxTableEntry) {
this.stub = stub;
jsonWriterCache =
CacheBuilder.newBuilder().maximumSize(maxTableEntry).<String, JsonStreamWriter>build();
}

public static JsonWriterCache getInstance() throws IOException {
if (instance == null) {
BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder().build();
BigQueryWriteClient stub = BigQueryWriteClient.create(stubSettings);
instance = new JsonWriterCache(stub, MAX_TABLE_ENTRY);
}
return instance;
}

/** Returns a cache with custom stub used by test. */
@VisibleForTesting
public static JsonWriterCache getTestInstance(BigQueryWriteClient stub, int maxTableEntry) {
Preconditions.checkNotNull(stub, "Stub is null.");
return new JsonWriterCache(stub, maxTableEntry);
}

private Stream.WriteStream CreateNewWriteStream(String tableName) {
Stream.WriteStream stream =
Stream.WriteStream.newBuilder().setType(Stream.WriteStream.Type.COMMITTED).build();
stream =
stub.createWriteStream(
Storage.CreateWriteStreamRequest.newBuilder()
.setParent(tableName)
.setWriteStream(stream)
.build());
LOG.info("Created write stream:" + stream.getName());
return stream;
}

JsonStreamWriter CreateNewWriter(Stream.WriteStream writeStream)
throws IllegalArgumentException, IOException, InterruptedException,
Descriptors.DescriptorValidationException {
return JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setChannelProvider(stub.getSettings().getTransportChannelProvider())
.setCredentialsProvider(stub.getSettings().getCredentialsProvider())
.setExecutorProvider(stub.getSettings().getExecutorProvider())
.build();
}
/**
* Gets a writer for a given table with the given tableName
*
* @param tableName
* @return
* @throws Exception
*/
public JsonStreamWriter getTableWriter(String tableName)
throws IllegalArgumentException, IOException, InterruptedException,
Descriptors.DescriptorValidationException {
Preconditions.checkNotNull(tableName, "TableName is null.");
Matcher matcher = tablePattern.matcher(tableName);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid table name: " + tableName);
}

Stream.WriteStream writeStream = null;
JsonStreamWriter writer = null;

synchronized (this) {
writer = jsonWriterCache.getIfPresent(tableName);
if (writer != null) {
if (!writer.expired()) {
return writer;
} else {
writer.close();
}
}
writeStream = CreateNewWriteStream(tableName);
writer = CreateNewWriter(writeStream);
jsonWriterCache.put(tableName, writer);
}
return writer;
}

/** Clear the cache and close all the writers in the cache. */
public void clear() {
synchronized (this) {
ConcurrentMap<String, JsonStreamWriter> map = jsonWriterCache.asMap();
for (String key : map.keySet()) {
JsonStreamWriter entry = jsonWriterCache.getIfPresent(key);
entry.close();
}
jsonWriterCache.cleanUp();
}
}

@VisibleForTesting
public long cachedTableCount() {
synchronized (jsonWriterCache) {
return jsonWriterCache.size();
}
}
}
Loading

0 comments on commit 34193b8

Please sign in to comment.