[BEAM-8844] Missing commit with suggested review changes #10547

Merged · 1 commit · Jan 9, 2020
.test-infra/jenkins/job_PerformanceTests_SQLIO_Java.groovy (0 additions, 2 deletions)
@@ -17,8 +17,6 @@
  */
 import CommonJobProperties as common
 
-def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
-
 def jobConfigs = [
     [
         title : 'SQL BigQueryIO with push-down Batch Performance Test Java',
BigQueryIOPushDownIT.java
@@ -35,6 +35,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.io.common.IOITHelper;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testutils.NamedTestResult;
@@ -57,6 +58,8 @@
 public class BigQueryIOPushDownIT {
   private static final String READ_FROM_TABLE = "bigquery-public-data:hacker_news.full";
   private static final String NAMESPACE = BigQueryIOPushDownIT.class.getName();
+  private static final String FIELDS_READ_METRIC = "fields_read";
+  private static final String READ_TIME_METRIC = "read_time";
   private static final String CREATE_TABLE_STATEMENT =
       "CREATE EXTERNAL TABLE HACKER_NEWS( \n"
           + " title VARCHAR, \n"
@@ -97,17 +100,17 @@ public static void setUp() {
 
   @Before
   public void before() {
-    sqlEnv = BeamSqlEnv.inMemory(new BigQueryPerfTableProvider(NAMESPACE, "fields_read"));
+    sqlEnv = BeamSqlEnv.inMemory(new BigQueryPerfTableProvider(NAMESPACE, FIELDS_READ_METRIC));
   }
 
   @Test
   public void readUsingDirectReadMethodPushDown() {
-    sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, "DIRECT_READ"));
+    sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, Method.DIRECT_READ.toString()));
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(SELECT_STATEMENT);
     PCollection<Row> output =
         BeamSqlRelUtils.toPCollection(pipeline, beamRelNode)
-            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")));
+            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
 
     PipelineResult result = pipeline.run();
     result.waitUntilFinish();
@@ -124,18 +127,19 @@ public void readUsingDirectReadMethod() {
     ruleList.remove(BeamIOPushDownRule.INSTANCE);
 
     InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
-    inMemoryMetaStore.registerProvider(new BigQueryPerfTableProvider(NAMESPACE, "fields_read"));
+    inMemoryMetaStore.registerProvider(
+        new BigQueryPerfTableProvider(NAMESPACE, FIELDS_READ_METRIC));
     sqlEnv =
         BeamSqlEnv.builder(inMemoryMetaStore)
             .setPipelineOptions(PipelineOptionsFactory.create())
             .setRuleSets(new RuleSet[] {RuleSets.ofList(ruleList)})
             .build();
-    sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, "DIRECT_READ"));
+    sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, Method.DIRECT_READ.toString()));
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(SELECT_STATEMENT);
     PCollection<Row> output =
         BeamSqlRelUtils.toPCollection(pipeline, beamRelNode)
-            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")));
+            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
 
     PipelineResult result = pipeline.run();
     result.waitUntilFinish();
@@ -144,12 +148,12 @@
 
   @Test
   public void readUsingDefaultMethod() {
-    sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, "DEFAULT"));
+    sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, Method.DEFAULT.toString()));
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(SELECT_STATEMENT);
     PCollection<Row> output =
         BeamSqlRelUtils.toPCollection(pipeline, beamRelNode)
-            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")));
+            .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
 
     PipelineResult result = pipeline.run();
     result.waitUntilFinish();
@@ -171,14 +175,15 @@ private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(
     Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
     suppliers.add(
         reader -> {
-          long readStart = reader.getStartTimeMetric("read_time");
-          long readEnd = reader.getEndTimeMetric("read_time");
-          return NamedTestResult.create(uuid, timestamp, "read_time", (readEnd - readStart) / 1e3);
+          long readStart = reader.getStartTimeMetric(READ_TIME_METRIC);
+          long readEnd = reader.getEndTimeMetric(READ_TIME_METRIC);
+          return NamedTestResult.create(
+              uuid, timestamp, READ_TIME_METRIC, (readEnd - readStart) / 1e3);
         });
     suppliers.add(
         reader -> {
-          long fieldsRead = reader.getCounterMetric("fields_read");
-          return NamedTestResult.create(uuid, timestamp, "fields_read", fieldsRead);
+          long fieldsRead = reader.getCounterMetric(FIELDS_READ_METRIC);
+          return NamedTestResult.create(uuid, timestamp, FIELDS_READ_METRIC, fieldsRead);
         });
     return suppliers;
   }
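
A minimal sketch (hypothetical scaffolding, not part of this PR) of why the enum swap above is safe: an enum's toString() defaults to its declared name, so Method.DIRECT_READ.toString() yields exactly the "DIRECT_READ" the removed literals hard-coded. The generated DDL is unchanged, while a misspelled method name now fails at compile time instead of at runtime.

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;

// Hypothetical check, not part of the PR: the enum names match the old literals.
public class MethodLiteralCheck {
  public static void main(String[] args) {
    System.out.println(Method.DIRECT_READ.toString()); // prints DIRECT_READ
    System.out.println(Method.DEFAULT.toString()); // prints DEFAULT
  }
}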
BigQueryPerfTableProvider.java
@@ -34,35 +34,22 @@
  * limitations under the License.
  */
 
-import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.MoreObjects.firstNonNull;
-
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps;
 
 /** A test table provider for BigQueryIOPushDownIT. */
 public class BigQueryPerfTableProvider extends BigQueryTableProvider {
   private final String namespace;
   private final String metric;
 
   BigQueryPerfTableProvider(String namespace, String metric) {
     super();
     this.namespace = namespace;
     this.metric = metric;
   }
 
   @Override
   public BeamSqlTable buildBeamSqlTable(Table table) {
     return new BigQueryPerfTable(
-        table,
-        ConversionOptions.builder()
-            .setTruncateTimestamps(
-                firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false)
-                    ? TruncateTimestamps.TRUNCATE
-                    : TruncateTimestamps.REJECT)
-            .build(),
-        namespace,
-        metric);
+        table, getConversionOptions(table.getProperties()), namespace, metric);
   }
 }
BigQueryTableProvider.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.MoreObjects.firstNonNull;
 
+import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
@@ -52,13 +53,15 @@ public String getTableType() {
 
   @Override
   public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new BigQueryTable(
-        table,
-        ConversionOptions.builder()
-            .setTruncateTimestamps(
-                firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false)
-                    ? TruncateTimestamps.TRUNCATE
-                    : TruncateTimestamps.REJECT)
-            .build());
+    return new BigQueryTable(table, getConversionOptions(table.getProperties()));
   }
 
+  protected static ConversionOptions getConversionOptions(JSONObject properties) {
+    return ConversionOptions.builder()
+        .setTruncateTimestamps(
+            firstNonNull(properties.getBoolean("truncateTimestamps"), false)
+                ? TruncateTimestamps.TRUNCATE
+                : TruncateTimestamps.REJECT)
+        .build();
+  }
 }
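
A minimal sketch (hypothetical scaffolding, not part of this PR) of the defaulting behavior the new shared helper preserves: fastjson's JSONObject.getBoolean returns a nullable Boolean, so for a table with no "truncateTimestamps" property firstNonNull falls back to false and the helper selects TruncateTimestamps.REJECT.

import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.MoreObjects.firstNonNull;

import com.alibaba.fastjson.JSONObject;

// Hypothetical check, not part of the PR: a missing "truncateTimestamps" key
// makes getBoolean return null, and firstNonNull substitutes false (REJECT).
public class TruncateDefaultCheck {
  public static void main(String[] args) {
    JSONObject properties = new JSONObject();
    boolean truncate = firstNonNull(properties.getBoolean("truncateTimestamps"), false);
    System.out.println(truncate ? "TRUNCATE" : "REJECT"); // prints REJECT
  }
}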