Skip to content

Commit

Permalink
Fixed 'select *' case when ColumnarBatch is used (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
mingjialiu committed Nov 6, 2020
1 parent 4d3bd1b commit 4c2ec94
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,25 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
readSessionCreator.create(
tableId, selectedFields, filter, readSessionCreatorConfig.getMaxParallelism());
ReadSession readSession = readSessionResponse.getReadSession();

if (selectedFields.isEmpty()) {
// means select *
Schema tableSchema = readSessionResponse.getReadTableInfo().getDefinition().getSchema();
selectedFields =
tableSchema.getFields().stream()
.map(Field::getName)
.collect(ImmutableList.toImmutableList());
}

ImmutableList<String> partitionSelectedFields = selectedFields;
return readSession.getStreamsList().stream()
.map(
stream ->
new ArrowInputPartition(
bigQueryReadClientFactory,
stream.getName(),
readSessionCreatorConfig.getMaxReadRowsRetries(),
selectedFields,
partitionSelectedFields,
readSessionResponse))
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ class SparkBigQueryEndToEndITSuite extends FunSuite
assert(row(1).isInstanceOf[String])
}

test("select all columns from a table. DataSource %s. Data Format %s"
.format(dataSourceFormat, dataFormat)) {
val row = spark.read.format(dataSourceFormat)
.option("table", SHAKESPEARE_TABLE)
.option("readDataFormat", dataFormat).load()
.select("word_count", "word", "corpus", "corpus_date").head
assert(row(0).isInstanceOf[Long])
assert(row(1).isInstanceOf[String])
assert(row(2).isInstanceOf[String])
assert(row(3).isInstanceOf[Long])
}

test("cache data frame in DataSource %s. Data Format %s".format(dataSourceFormat, dataFormat)) {
val allTypesTable = readAllTypesTable("bigquery")
writeToBigQuery(allTypesTable, SaveMode.Overwrite, "avro")
Expand Down

0 comments on commit 4c2ec94

Please sign in to comment.