Skip to content

Commit

Permalink
add insert support
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott Kuehn committed Aug 28, 2021
1 parent 4685db1 commit cae8e7b
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<sonar.version>3.6.1.1688</sonar.version>
<sonar.version>3.6.1.1688</sonar.version>
<zetasql.version>2021.02.1</zetasql.version>
<zetasql.version>2021.03.2</zetasql.version>
<protoc-jar-maven-plugin.version>3.11.4</protoc-jar-maven-plugin.version>
<truth.version>1.1.2</truth.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package com.google.cloud.solutions.datalineage;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.zetasql.Analyzer.extractTableNamesFromStatement;

import com.google.cloud.solutions.datalineage.extractor.ColumnLineageExtractor;
import com.google.cloud.solutions.datalineage.extractor.ColumnLineageExtractorFactory;
import com.google.cloud.solutions.datalineage.extractor.FunctionExpressionsExtractor;
import com.google.cloud.solutions.datalineage.extractor.GroupByExtractor;
import com.google.cloud.solutions.datalineage.extractor.InsertStatementExtractor;
import com.google.cloud.solutions.datalineage.extractor.SimpleAggregateExtractor;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnLineage;
Expand All @@ -48,7 +48,7 @@ public class BigQuerySqlParser {
static {
ColumnLineageExtractorFactory.register(
FunctionExpressionsExtractor.class, GroupByExtractor.class,
SimpleAggregateExtractor.class);
SimpleAggregateExtractor.class, InsertStatementExtractor.class);
}

private final ZetaSqlSchemaLoader tableSchemaLoader;
Expand Down Expand Up @@ -110,7 +110,7 @@ private ImmutableSet<ImmutableMap<ColumnEntity, ColumnLineage>> extractLineages(
}

private static ImmutableSet<String> extractReferencedTables(String sql) {
return extractTableNamesFromStatement(sql).stream()
return Analyzer.extractTableNamesFromStatement(sql, enableAllFeatures()).stream()
.flatMap(List::stream)
.collect(toImmutableSet());
}
Expand All @@ -119,9 +119,11 @@ private ResolvedStatement resolve(String sql) {
return Analyzer.analyzeStatement(sql, enableAllFeatures(), buildCatalogWithQueryTables(sql));
}

private AnalyzerOptions enableAllFeatures() {
private static AnalyzerOptions enableAllFeatures() {
LanguageOptions languageOptions = new LanguageOptions().enableMaximumLanguageFeatures();
languageOptions.setSupportsAllStatementKinds();
AnalyzerOptions analyzerOptions = new AnalyzerOptions();
analyzerOptions.setLanguageOptions(new LanguageOptions().enableMaximumLanguageFeatures());
analyzerOptions.setLanguageOptions(languageOptions);
analyzerOptions.setPruneUnusedColumns(true);

return analyzerOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.zetasql.SimpleCatalog;
import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedStatement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

/**
Expand Down Expand Up @@ -91,22 +90,10 @@ public QueryColumns outputColumns() {
}

/**
* Returns a set of applicable Extractors for the provided SQL query.
* Returns a set of applicable Extractors
*/
public ImmutableSet<ColumnLineageExtractor> buildExtractors() {
return outputColumns()
.getProcessedColumnTypes().stream()
.map(this::buildExtractorFor)
.flatMap(Collection::stream)
.distinct()
.collect(toImmutableSet());
}

/**
* Returns a set of extractors for given columnType or empty set, if not find.
*/
public ImmutableSet<ColumnLineageExtractor> buildExtractorFor(String columnType) {
return extractorTypeMap.get(columnType).stream()
return extractorTypeMap.values().stream()
.map(clazz -> buildExtractor(clazz, resolvedStatement))
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.google.cloud.solutions.datalineage.extractor;

import static com.google.cloud.solutions.datalineage.converter.ResolvedColumnToColumnEntityConverter.convertToColumnEntity;

import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnLineage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.zetasql.resolvedast.ResolvedColumn;
import com.google.zetasql.resolvedast.ResolvedNodes;
import java.util.HashMap;
import java.util.stream.IntStream;

public class InsertStatementExtractor extends ColumnLineageExtractor {

public InsertStatementExtractor(ResolvedNodes.ResolvedStatement resolvedStatement) {
super(resolvedStatement);
}

@Override
public String getSupportedColumnType() {
// TODO skuehn this PR determine how to support column types and the extractor instantiation
// optimizations
return null;
}

@Override
public ImmutableMap<ColumnEntity, ColumnLineage> extract() {
HashMap<ColumnEntity, ColumnLineage> exprLineageMapBuilder = new HashMap<>();

resolvedStatement.accept(
new ResolvedNodes.Visitor() {
@Override
public void visit(ResolvedNodes.ResolvedInsertStmt insertStmt) {
if (!insertStmt.getQueryOutputColumnList().isEmpty()) {
// For inserts, BQ requires values to be added in the same order as the specified
// columns, and the number of values added must match the number of specified columns.
IntStream.range(0, insertStmt.getInsertColumnList().size()).forEach(columnIndex -> {
ColumnEntity outputColumn =
convertToColumnEntity(insertStmt.getInsertColumnList().get(columnIndex));
ResolvedColumn sourceColumn = insertStmt.getQueryOutputColumnList()
.get(columnIndex);

exprLineageMapBuilder.put(
outputColumn,
ColumnLineage.newBuilder()
.setTarget(outputColumn)
.addAllParents(ImmutableList.of(convertToColumnEntity(sourceColumn)))
.build());
});
}
super.visit(insertStmt);
}
});

return ImmutableMap.copyOf(exprLineageMapBuilder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.QueryColumns;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.zetasql.resolvedast.ResolvedColumn;
import com.google.zetasql.resolvedast.ResolvedNodes;
import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedStatement;
Expand All @@ -42,7 +41,6 @@ public OutputColumnExtractor(ResolvedStatement resolvedStatement) {
*/
public QueryColumns extract() {
ImmutableMap.Builder<String, ColumnEntity> outputColumnBuilder = ImmutableMap.builder();
ImmutableSet.Builder<String> nonTableColumnTypes = ImmutableSet.builder();

resolvedStatement.accept(
new ResolvedNodes.Visitor() {
Expand All @@ -53,12 +51,16 @@ public void visit(ResolvedNodes.ResolvedOutputColumn outputColumn) {
outputColumn.getName(),
convertToColumnEntity(resolvedColumn));

if (resolvedColumn.getTableName().startsWith("$")) {
nonTableColumnTypes.add(resolvedColumn.getTableName());
}

super.visit(outputColumn);
}

@Override
public void visit(ResolvedNodes.ResolvedInsertStmt insertStmt) {
insertStmt.getInsertColumnList().forEach(resolvedColumn ->
outputColumnBuilder.put(resolvedColumn.getName(),
convertToColumnEntity(resolvedColumn)));
super.visit(insertStmt);
}
});

return QueryColumns.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@

package com.google.cloud.solutions.datalineage.model;

import static com.google.common.collect.ImmutableSet.toImmutableSet;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.auto.value.AutoValue;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.LineageMessages.DataEntity.DataEntityTypes;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
Expand All @@ -40,15 +36,6 @@ public abstract class QueryColumns {

public abstract ImmutableMap<String, ColumnEntity> getColumnMap();

public final ImmutableSet<String> getProcessedColumnTypes() {
return getColumnMap().values().stream()
.filter(columnEntity -> columnEntity.getTable().getKind()
.equals(DataEntityTypes.QUERY_LEVEL_TABLE))
.map(columnEntity -> columnEntity.getTable().getSqlResource())
.filter(tableName -> tableName.startsWith("$"))
.collect(toImmutableSet());
}

@SchemaCreate
public static QueryColumns create(ImmutableMap<String, ColumnEntity> columnMap) {
return builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;


@RunWith(JUnit4.class)
public final class BigQuerySqlParserTest {

@Test
public void extractColumnLineage_concatColumns_correctColumnNames() {
public void queryExtractColumnLineage_concatColumns_correctColumnNames() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory
.forTableSchemas(
Expand Down Expand Up @@ -72,7 +73,7 @@ public void extractColumnLineage_concatColumns_correctColumnNames() {
}

@Test
public void extractColumnLineage_multipleOutputColumnsWithAlias_correctColumnLineage() {
public void queryExtractColumnLineage_multipleOutputColumnsWithAlias_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
Expand Down Expand Up @@ -116,7 +117,7 @@ public void extractColumnLineage_multipleOutputColumnsWithAlias_correctColumnLin
}

@Test
public void extractColumnLineage_multipleOutputColumnsWithoutAlias_correctColumnLineage() {
public void queryExtractColumnLineage_multipleOutputColumnsWithoutAlias_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
Expand Down Expand Up @@ -169,7 +170,7 @@ public void extractColumnLineage_multipleOutputColumnsWithoutAlias_correctColumn

@Test
public void
extractColumnLineage_bigQuerySchemaMultipleOutputColumnsWithoutAlias_correctColumnLineage() {
queryExtractColumnLineage_bigQuerySchemaMultipleOutputColumnsWithoutAlias_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/daily_report_table_schema.json"),
Expand Down Expand Up @@ -278,4 +279,86 @@ public void extractColumnLineage_publicDatasetQuery_correctColumnLineage() {
.setColumn("color").build()))
.build());
}
}

@Test
public void insertExtractColumnLineage_tableA_specifyColumns_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigQueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
TestResourceLoader.load("schemas/tableB_schema.json"));
BigQueryZetaSqlSchemaLoader fakeSchemaLoader =
new BigQueryZetaSqlSchemaLoader(
BigQueryTableLoadService.usingServiceFactory(fakeBigQueryFactory));

ImmutableSet<ColumnLineage> resolvedStatement =
new BigQuerySqlParser(fakeSchemaLoader)
.extractColumnLineage(
TestResourceLoader
.load(
"sql/tableA_specify_columns_insert.sql"));

assertThat(resolvedStatement)
.containsExactly(
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colA").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colB").build()))
.build(),
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colC").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colC").build()))
.build());
}

@Test
public void insertExtractColumnLineage_tableA_inferColumns_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigQueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
TestResourceLoader.load("schemas/tableB_schema.json"));
BigQueryZetaSqlSchemaLoader fakeSchemaLoader =
new BigQueryZetaSqlSchemaLoader(
BigQueryTableLoadService.usingServiceFactory(fakeBigQueryFactory));

ImmutableSet<ColumnLineage> resolvedStatement =
new BigQuerySqlParser(fakeSchemaLoader)
.extractColumnLineage(
TestResourceLoader
.load(
"sql/tableA_infer_columns_insert.sql"));

assertThat(resolvedStatement)
.containsExactly(
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colA").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colB").build()))
.build(),
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colC").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colC").build()))
.build());
}
}
36 changes: 36 additions & 0 deletions src/test/resources/sql/tableA_infer_columns_insert.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

#standardSQL
INSERT
`project1.datasetA.TableA`
SELECT
colB,
colC
FROM
`project2.datasetB.TableB`
Loading

0 comments on commit cae8e7b

Please sign in to comment.