
Commit

parent 90e2d0b
author    Yuval Medina <ymed@google.com>    Jun 19, 2020, 21:58 UTC
committer Yuval Medina <ymed@google.com>    Jul 9, 2020, 23:08 UTC

Created ProtobufUtils.java in order to convert data and schemas from Spark format into protobuf format for ingestion into BigQuery. Created Spark-to-BigQuery schema conversion suites in SchemaConverters.java. Created ProtobufUtilsTest.java and SchemaConverterTest.java to comprehensively test both classes. Translated the Scala testing code in SchemaConvertersSuite.scala into Java and merged it with SchemaConverters.java.
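For context on the conversion direction introduced here, the following is a minimal, hypothetical sketch of a Spark-to-BigQuery schema mapping. It is not the connector's actual SchemaConverters code: the class name, the handful of handled types, and the Spark-to-LegacySQLTypeName mapping are illustrative assumptions.

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparkToBigQuerySchemaSketch {

  // Convert a flat Spark StructType into a BigQuery Schema.
  public static Schema toBigQuerySchema(StructType sparkSchema) {
    List<Field> fields = new ArrayList<>();
    for (StructField sparkField : sparkSchema.fields()) {
      Field.Mode mode = sparkField.nullable() ? Field.Mode.NULLABLE : Field.Mode.REQUIRED;
      fields.add(
          Field.newBuilder(sparkField.name(), toBigQueryType(sparkField.dataType()))
              .setMode(mode)
              .build());
    }
    return Schema.of(fields);
  }

  // Assumed mapping of a few leaf types; nested structs and arrays are omitted here.
  public static LegacySQLTypeName toBigQueryType(DataType sparkType) {
    if (sparkType == DataTypes.StringType) {
      return LegacySQLTypeName.STRING;
    } else if (sparkType == DataTypes.LongType || sparkType == DataTypes.IntegerType) {
      return LegacySQLTypeName.INTEGER;
    } else if (sparkType == DataTypes.DoubleType) {
      return LegacySQLTypeName.FLOAT;
    } else if (sparkType == DataTypes.BooleanType) {
      return LegacySQLTypeName.BOOLEAN;
    } else if (sparkType == DataTypes.BinaryType) {
      return LegacySQLTypeName.BYTES;
    } else if (sparkType == DataTypes.TimestampType) {
      return LegacySQLTypeName.TIMESTAMP;
    }
    throw new IllegalArgumentException("Unsupported Spark type: " + sparkType);
  }
}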

Fixing SparkBigQueryConnectorUserAgentProvider initialization bug (GoogleCloudDataproc#186)

prepare release 0.16.1

prepare for next development iteration

Sectioned the schema converter file for easier readability. Added a Table creation method.
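The Table creation method itself is not shown in this log; a minimal sketch of creating a BigQuery table from an already-converted schema might look like the following, assuming a BigQuery client handle and caller-chosen dataset and table names (all names here are hypothetical).

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

public class TableCreationSketch {
  // Create a standard table whose schema came from a Spark-to-BigQuery conversion.
  public static Table createTable(BigQuery bigquery, String dataset, String name, Schema schema) {
    TableId tableId = TableId.of(dataset, name);
    TableInfo tableInfo = TableInfo.of(tableId, StandardTableDefinition.of(schema));
    return bigquery.create(tableInfo);
  }
}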

Wrote comprehensive tests to check YuvalSchemaConverters. Equality testing still needs improvement: assertEquals checks only superficial equality, so if further testing is to be done without relying on logs, it would be useful to write an equality function for schemas.
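A deep-equality helper of the kind the message calls for could be shaped like the sketch below, comparing names, types, modes, and nested sub-fields recursively. This is an illustration, not code from the repository.

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Schema;

public class SchemaEquality {
  // Deep comparison of two BigQuery schemas.
  public static boolean schemasEqual(Schema expected, Schema actual) {
    return fieldListsEqual(expected.getFields(), actual.getFields());
  }

  private static boolean fieldListsEqual(FieldList expected, FieldList actual) {
    if (expected.size() != actual.size()) {
      return false;
    }
    for (int i = 0; i < expected.size(); i++) {
      if (!fieldsEqual(expected.get(i), actual.get(i))) {
        return false;
      }
    }
    return true;
  }

  private static boolean fieldsEqual(Field expected, Field actual) {
    if (!expected.getName().equals(actual.getName())
        || !expected.getType().equals(actual.getType())
        || expected.getMode() != actual.getMode()) {
      return false;
    }
    FieldList expectedSub = expected.getSubFields();
    FieldList actualSub = actual.getSubFields();
    if (expectedSub == null || actualSub == null) {
      return expectedSub == actualSub; // both must be leaf fields
    }
    return fieldListsEqual(expectedSub, actualSub);
  }
}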

Spark->BQ schema conversion is working correctly. Blocked out Map functionality, as it is not supported. Made SchemaConverters and the schema unit tests more readable. Improved the use of BigQuery library functions and iteration in SchemaConverters.

Adding acceptance test on Dataproc (GoogleCloudDataproc#193)

In order to run the test: `sbt package acceptance:test`

Added support for materialized views (GoogleCloudDataproc#192)

Applying Google Java format on compile (GoogleCloudDataproc#203)

Created a Spark-BigQuery schema converter and a BigQuery schema -> ProtoSchema converter. Now awaiting comprehensive tests before merging with master.

Renamed the SchemaConverters file; about to merge it into David's SchemaConverters. Improved unit tests to check the toBigQueryColumn method instead of the more abstract toBigQuerySchema (in order to check that each data type is converted correctly). Tackling the toProtoRows converter.
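The per-type testing style described here, illustrated as a JUnit 4 sketch. The real tests target the commit's toBigQueryColumn method, whose exact signature is not shown in this log, so the sketch asserts against the hypothetical toBigQueryType helper from the earlier sketch instead.

import static org.junit.Assert.assertEquals;

import com.google.cloud.bigquery.LegacySQLTypeName;
import org.apache.spark.sql.types.DataTypes;
import org.junit.Test;

public class PerTypeConversionTest {
  // One focused assertion per Spark data type, rather than one schema-level check.
  @Test
  public void stringMapsToString() {
    assertEquals(
        LegacySQLTypeName.STRING,
        SparkToBigQuerySchemaSketch.toBigQueryType(DataTypes.StringType));
  }

  @Test
  public void longMapsToInteger() {
    assertEquals(
        LegacySQLTypeName.INTEGER,
        SparkToBigQuerySchemaSketch.toBigQueryType(DataTypes.LongType));
  }

  @Test
  public void timestampMapsToTimestamp() {
    assertEquals(
        LegacySQLTypeName.TIMESTAMP,
        SparkToBigQuerySchemaSketch.toBigQueryType(DataTypes.TimestampType));
  }
}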

BigQuery->ProtoSchema converter is passing all unit tests.

Merged my (YuvalMedina) schema converters with David's (davidrabinowitz) SchemaConverters under spark.bigquery. Renamed my schema converters to SchemaConvertersDevelopment, in which I will continue working on a ProtoRows converter.

SchemaConvertersDevelopment is passing all tests on Spark -> Protobuf Descriptor conversion, even on nested structs. Unit tests need to be written to test actual row conversion (Spark values -> Protobuf values). Minor fixes to SchemaConverters.java: the code needs to be smoothed out.

The ProtoRows converter is passing 10 unit tests; the sparkRowToProtoRow test must be revised to confirm that ProtoRows conversion is fully working. All functions doing Spark InternalRow -> ProtoRow and BigQuery Schema -> ProtoSchema conversions were migrated from SchemaConverters.java to ProtoBufUtils.java. SchemaConverters.java now contains both the Spark -> BigQuery and the original BigQuery -> Spark conversions. ProtoBufUtilsTests.java was created to test the functions in ProtoBufUtils separately.
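For orientation, a sketch of the BigQuery Schema -> protobuf descriptor direction that ProtoBufUtils covers. This is not the ProtoBufUtils implementation: the message name, the flat-schema restriction, and the type mapping are assumptions, and nested RECORD fields plus the ProtoSchema wrapper used by the Storage Write API are omitted.

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;

public class BigQueryToProtoSketch {
  // Build a protobuf message descriptor whose fields mirror a flat BigQuery schema.
  public static DescriptorProto toDescriptorProto(Schema schema) {
    DescriptorProto.Builder descriptor = DescriptorProto.newBuilder().setName("RowMessage");
    int number = 1;
    for (Field field : schema.getFields()) {
      descriptor.addField(
          FieldDescriptorProto.newBuilder()
              .setName(field.getName())
              .setNumber(number++)
              .setLabel(
                  field.getMode() == Field.Mode.REPEATED
                      ? FieldDescriptorProto.Label.LABEL_REPEATED
                      : FieldDescriptorProto.Label.LABEL_OPTIONAL)
              .setType(toProtoType(field.getType())));
    }
    return descriptor.build();
  }

  private static FieldDescriptorProto.Type toProtoType(LegacySQLTypeName type) {
    if (LegacySQLTypeName.STRING.equals(type)) {
      return FieldDescriptorProto.Type.TYPE_STRING;
    } else if (LegacySQLTypeName.INTEGER.equals(type)) {
      return FieldDescriptorProto.Type.TYPE_INT64;
    } else if (LegacySQLTypeName.FLOAT.equals(type)) {
      return FieldDescriptorProto.Type.TYPE_DOUBLE;
    } else if (LegacySQLTypeName.BOOLEAN.equals(type)) {
      return FieldDescriptorProto.Type.TYPE_BOOL;
    } else if (LegacySQLTypeName.BYTES.equals(type)) {
      return FieldDescriptorProto.Type.TYPE_BYTES;
    }
    throw new IllegalArgumentException("Unsupported BigQuery type: " + type);
  }
}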

All conversion suites for Spark -> BigQuery, BigQuery -> ProtoSchema, and Spark rows -> ProtoRows are working correctly, and comprehensive tests were written. SchemaConvertersSuite.scala, which tests the BigQuery -> Spark conversions, was translated into Java and merged with SchemaConvertersTests.java.

Cleaned up the SchemaConverter tests that were translated from Scala. Added a nesting-depth limit to Records created by the Spark->BigQuery converter.
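The nesting-depth limit could be as simple as the recursive check sketched below. This is an assumption: the connector's actual limit and the placement of the check may differ; BigQuery's documented maximum of 15 nested levels is used here as the constant.

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class NestingDepthCheck {
  static final int MAX_NESTING_DEPTH = 15;

  // Reject schemas whose struct nesting exceeds the limit before attempting conversion.
  public static void checkDepth(DataType type, int depth) {
    if (depth > MAX_NESTING_DEPTH) {
      throw new IllegalArgumentException("Schema nesting is too deep: " + depth);
    }
    if (type instanceof StructType) {
      for (StructField field : ((StructType) type).fields()) {
        checkDepth(field.dataType(), depth + 1);
      }
    } else if (type instanceof ArrayType) {
      checkDepth(((ArrayType) type).elementType(), depth);
    }
  }
}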

Deleted unnecessary comments

Deleted a leftover TODO comment in SchemaConvertersTests

Deleted some unnecessary tests.

Last commit before write-support implementation

Made minor edits according to davidrab@'s comments.
Added license headers to all files that were created. Still need to test whether binary types are converted correctly to protobuf format.
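On the open question about binary types, the expected shape of the conversion is small; a sketch, assuming the values surface from Spark as byte[]:

import com.google.protobuf.ByteString;

public class BinaryConversionSketch {
  // Spark BinaryType values arrive as byte[]; protobuf bytes fields take a ByteString.
  public static ByteString toProtoBytes(byte[] sparkBinaryValue) {
    return ByteString.copyFrom(sparkBinaryValue);
  }
}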

Integrated all of DavidRab's suggestions

Adds implementation for supporting columnar batch reads from Spark. (GoogleCloudDataproc#198)

This bypasses most of the existing translation code for the following reasons:
1.  I think there might be a memory leak because the existing code doesn't close the allocator (see the sketch after this message).
2.  This avoids continuously recopying the schema.

I didn't delete the old code because it appears the BigQueryRDD still relies on it partially.

I also couldn't find instructions on formatting/testing (I couldn't find explicit unit tests for the existing Arrow code; I'll update accordingly if pointers can be provided).
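On the allocator point above, a minimal sketch of the lifecycle in question. The class name is hypothetical, and a real columnar reader would also close its VectorSchemaRoot and any child allocators it created.

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class AllocatorLifecycleSketch implements AutoCloseable {
  // The point is that the allocator itself must be released when the reader is done,
  // otherwise the direct memory it tracks is never reclaimed.
  private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

  public BufferAllocator allocator() {
    return allocator;
  }

  @Override
  public void close() {
    allocator.close();
  }
}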

Changed tests as well

Added functionality to support more complex Spark types (such as StructTypes within ArrayTypes) in SchemaConverters and ProtobufUtils. There are known issues with Timestamp conversion into BigQuery format when integrating with the BigQuery Storage Write API.
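The known Timestamp issue is not diagnosed in this log. For reference, a sketch of the unit conversion that typically matters here, under the assumption that TIMESTAMP columns are carried as int64 microseconds since the Unix epoch on the write path (Spark's InternalRow already stores TimestampType in that unit, while external java.sql.Timestamp values need an explicit conversion).

import java.sql.Timestamp;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class TimestampConversionSketch {
  // Convert a java.sql.Timestamp into microseconds since the Unix epoch.
  public static long toEpochMicros(Timestamp timestamp) {
    Instant instant = timestamp.toInstant();
    return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + instant.getNano() / 1_000;
  }
}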

Revert "Merge branch 'writesupport' of https://github.com/YuvalMedina/spark-bigquery-connector into writesupport"

This reverts commit 65294d8, reversing
changes made to 814a1bf.

Integrated David Rab's second round of suggestions.

Ran sbt build
Yuval Medina committed Jul 9, 2020
1 parent 74c8099 commit d2d0556
Showing 2 changed files with 172 additions and 172 deletions.
@@ -47,159 +47,159 @@ public class ReadSessionCreator {
   */
  private static final int DEFAULT_BYTES_PER_PARTITION = 400 * 1000 * 1000;

  private static final Logger log = LoggerFactory.getLogger(ReadSessionCreator.class);

  private static Cache<String, TableInfo> destinationTableCache =
      CacheBuilder.newBuilder().expireAfterWrite(15, TimeUnit.MINUTES).maximumSize(1000).build();

  private final ReadSessionCreatorConfig config;
  private final BigQueryClient bigQueryClient;
  private final BigQueryReadClientFactory bigQueryReadClientFactory;

  public ReadSessionCreator(
      ReadSessionCreatorConfig config,
      BigQueryClient bigQueryClient,
      BigQueryReadClientFactory bigQueryReadClientFactory) {
    this.config = config;
    this.bigQueryClient = bigQueryClient;
    this.bigQueryReadClientFactory = bigQueryReadClientFactory;
  }

  static int getMaxNumPartitionsRequested(
      OptionalInt maxParallelism, StandardTableDefinition tableDefinition) {
    return maxParallelism.orElse(
        Math.max((int) (tableDefinition.getNumBytes() / DEFAULT_BYTES_PER_PARTITION), 1));
  }

  public ReadSessionResponse create(
      TableId table,
      ImmutableList<String> selectedFields,
      Optional<String> filter,
      OptionalInt maxParallelism) {
    TableInfo tableDetails = bigQueryClient.getTable(table);

    TableInfo actualTable = getActualTable(tableDetails, selectedFields, filter);
    StandardTableDefinition tableDefinition = actualTable.getDefinition();

    try (BigQueryReadClient bigQueryReadClient =
        bigQueryReadClientFactory.createBigQueryReadClient()) {
      ReadSession.TableReadOptions.Builder readOptions =
          ReadSession.TableReadOptions.newBuilder().addAllSelectedFields(selectedFields);
      filter.ifPresent(readOptions::setRowRestriction);

      String tablePath = toTablePath(actualTable.getTableId());

      ReadSession readSession =
          bigQueryReadClient.createReadSession(
              CreateReadSessionRequest.newBuilder()
                  .setParent("projects/" + bigQueryClient.getProjectId())
                  .setReadSession(
                      ReadSession.newBuilder()
                          .setDataFormat(config.readDataFormat)
                          .setReadOptions(readOptions)
                          .setTable(tablePath))
                  .setMaxStreamCount(getMaxNumPartitionsRequested(maxParallelism, tableDefinition))
                  .build());

      return new ReadSessionResponse(readSession, actualTable);
    }
  }

  String toTablePath(TableId tableId) {
    return format(
        "projects/%s/datasets/%s/tables/%s",
        tableId.getProject(), tableId.getDataset(), tableId.getTable());
  }

  TableInfo getActualTable(
      TableInfo table, ImmutableList<String> requiredColumns, Optional<String> filter) {
    String[] filters = filter.map(Stream::of).orElseGet(Stream::empty).toArray(String[]::new);
    return getActualTable(table, requiredColumns, filters);
  }

  TableInfo getActualTable(
      TableInfo table, ImmutableList<String> requiredColumns, String[] filters) {
    TableDefinition tableDefinition = table.getDefinition();
    TableDefinition.Type tableType = tableDefinition.getType();
    if (TableDefinition.Type.TABLE == tableType) {
      return table;
    }
    if (TableDefinition.Type.VIEW == tableType
        || TableDefinition.Type.MATERIALIZED_VIEW == tableType) {
      if (!config.viewsEnabled) {
        throw new BigQueryConnectorException(
            UNSUPPORTED,
            format(
                "Views are not enabled. You can enable views by setting '%s' to true. Notice additional cost may occur.",
                config.viewEnabledParamName));
      }
      // get it from the view
      String querySql = bigQueryClient.createSql(table.getTableId(), requiredColumns, filters);
      log.debug("querySql is %s", querySql);
      try {
        return destinationTableCache.get(
            querySql,
            new DestinationTableBuilder(bigQueryClient, config, querySql, table.getTableId()));
      } catch (ExecutionException e) {
        throw new BigQueryConnectorException(
            BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED, "Error creating destination table", e);
      }
    } else {
      // not regular table or a view
      throw new BigQueryConnectorException(
          UNSUPPORTED,
          format(
              "Table type '%s' of table '%s.%s' is not supported",
              tableType, table.getTableId().getDataset(), table.getTableId().getTable()));
    }
  }

  static class DestinationTableBuilder implements Callable<TableInfo> {
    final BigQueryClient bigQueryClient;
    final ReadSessionCreatorConfig config;
    final String querySql;
    final TableId table;

    DestinationTableBuilder(
        BigQueryClient bigQueryClient,
        ReadSessionCreatorConfig config,
        String querySql,
        TableId table) {
      this.bigQueryClient = bigQueryClient;
      this.config = config;
      this.querySql = querySql;
      this.table = table;
    }

    @Override
    public TableInfo call() {
      return createTableFromQuery();
    }

    TableInfo createTableFromQuery() {
      TableId destinationTable = bigQueryClient.createDestinationTable(table);
      log.debug("destinationTable is %s", destinationTable);
      JobInfo jobInfo =
          JobInfo.of(
              QueryJobConfiguration.newBuilder(querySql)
                  .setDestinationTable(destinationTable)
                  .build());
      log.debug("running query %s", jobInfo);
      Job job = waitForJob(bigQueryClient.create(jobInfo));
      log.debug("job has finished. %s", job);
      if (job.getStatus().getError() != null) {
        throw convertToBigQueryException(job.getStatus().getError());
      }
      // add expiration time to the table
      TableInfo createdTable = bigQueryClient.getTable(destinationTable);
      long expirationTime =
          createdTable.getCreationTime()
              + TimeUnit.HOURS.toMillis(config.viewExpirationTimeInHours);
      Table updatedTable =
          bigQueryClient.update(createdTable.toBuilder().setExpirationTime(expirationTime).build());
      return updatedTable;
    }

    Job waitForJob(Job job) {
      try {
        return job.waitFor();
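As a worked example of the stream-count sizing in getMaxNumPartitionsRequested above (the table size is a hypothetical input, not data from this commit): with DEFAULT_BYTES_PER_PARTITION of 400 * 1000 * 1000 bytes and no user-supplied maxParallelism, a 10,000,000,000-byte table requests max(10,000,000,000 / 400,000,000, 1) = 25 read streams.

public class StreamCountExample {
  public static void main(String[] args) {
    long tableBytes = 10_000_000_000L; // hypothetical 10 GB table
    int bytesPerPartition = 400 * 1000 * 1000;
    int streams = Math.max((int) (tableBytes / bytesPerPartition), 1);
    System.out.println(streams); // prints 25
  }
}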
