spark-bigquery: fix a few of the common errors (#1377)
* spark-bigquery: do not create relation on output dataset

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

* spark: do not collect metrics on spark 2

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
mobuchowski committed Dec 8, 2022
1 parent 8d09259 commit f69c240
Showing 34 changed files with 636 additions and 172 deletions.
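The second fix in the commit message works by reading the Spark version through package$.MODULE$.SPARK_VERSION() and skipping the job-metrics bookkeeping when the application runs on Spark 2, as the OpenLineageSparkListener diff further down shows. Below is a minimal, self-contained Java sketch of that guard pattern; the class and method names are hypothetical, and the version string is passed in directly instead of being read from the Spark runtime.

// Sketch only: mirrors the startsWith("2") / startsWith("3") guards added in this commit.
// In the real listener the version comes from org.apache.spark.package$.MODULE$.SPARK_VERSION().
public class SparkVersionGuardSketch {

  private final String sparkVersion;

  public SparkVersionGuardSketch(String sparkVersion) {
    this.sparkVersion = sparkVersion;
  }

  // Task metrics are only aggregated on Spark 3; on Spark 2 the collection is skipped entirely.
  public boolean shouldCollectTaskMetrics(boolean listenerDisabled) {
    return !listenerDisabled && !sparkVersion.startsWith("2");
  }

  public static void main(String[] args) {
    System.out.println(new SparkVersionGuardSketch("2.4.6").shouldCollectTaskMetrics(false)); // false
    System.out.println(new SparkVersionGuardSketch("3.3.1").shouldCollectTaskMetrics(false)); // true
  }
}

The same kind of version check gates the addJobStages and cleanUp calls in onJobStart and onJobEnd in the listener diff below.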
3 changes: 3 additions & 0 deletions .circleci/continue_config.yml
@@ -453,6 +453,9 @@ jobs:
JDK8_HOME: /usr/lib/jvm/java-8-openjdk-amd64
steps:
- *checkout_project_root
- gcp-cli/install
- gcp-cli/initialize
- run: mkdir -p app/build/gcloud && echo $GCLOUD_SERVICE_KEY > app/build/gcloud/gcloud-service-key.json && chmod 644 app/build/gcloud/gcloud-service-key.json
- restore_cache:
keys:
- v1-integration-spark-{{ .Branch }}-{{ .Revision }}
1 change: 1 addition & 0 deletions .circleci/workflows/openlineage-java.yml
@@ -30,6 +30,7 @@ workflows:
parameters:
spark-version: [ '2.4.6', '3.1.3', '3.2.2', '3.3.1' ]
- integration-test-integration-spark:
context: integration-tests
matrix:
parameters:
spark-version: [ '2.4.6', '3.1.3', '3.2.2', '3.3.1' ]
@@ -83,7 +83,6 @@ public void onSuccess(UserRecordResult result) {
public void onFailure(Throwable t) {
log.error("Failed to send to Kinesis lineage event: {}", eventAsJson, t);
}
;
};

Futures.addCallback(future, callback, this.listeningExecutor);
14 changes: 8 additions & 6 deletions integration/spark/app/build.gradle
@@ -44,6 +44,7 @@ configurations {
spark3.extendsFrom testImplementation
spark32.extendsFrom testImplementation
spark33.extendsFrom testImplementation
pysparkContainerOnly
}

archivesBaseName='openlineage-spark-app'
@@ -58,10 +59,10 @@ ext {
shortVersion = sparkVersion.substring(0,3)

versionsMap = [
"3.3": ["module": "spark33", "scala": "2.12", "delta": "NA", "snowflake": "2.11.0-spark_3.3"],
"3.2": ["module": "spark32", "scala": "2.12", "delta": "1.1.0", "snowflake": "2.11.0-spark_3.2"],
"3.1": ["module": "spark3", "scala": "2.12", "delta": "1.0.0", "snowflake": "2.11.0-spark_3.1"],
"2.4": ["module": "spark2", "scala": "2.11", "delta": "NA", "snowflake": "2.9.3-spark_2.4"]
"3.3": ["module": "spark33", "scala": "2.12", "delta": "NA", "gcs": "hadoop3-2.2.9", "snowflake": "2.11.0-spark_3.3"],
"3.2": ["module": "spark32", "scala": "2.12", "delta": "1.1.0", "gcs": "hadoop3-2.2.9", "snowflake": "2.11.0-spark_3.2"],
"3.1": ["module": "spark3", "scala": "2.12", "delta": "1.0.0", "gcs": "hadoop3-2.2.9", "snowflake": "2.11.0-spark_3.1"],
"2.4": ["module": "spark2", "scala": "2.11", "delta": "NA", "gcs": "hadoop2-2.2.9", "snowflake": "2.9.3-spark_2.4"]
]
versions = versionsMap[shortVersion]
}
@@ -89,7 +90,7 @@ dependencies {

compileOnly "org.apache.spark:spark-core_${versions.scala}:${sparkVersion}"
compileOnly "org.apache.spark:spark-sql_${versions.scala}:${sparkVersion}"
compileOnly ("com.google.cloud.spark:spark-bigquery_${versions.scala}:${bigqueryVersion}") {
compileOnly ("com.google.cloud.spark:spark-bigquery-with-dependencies_${versions.scala}:${bigqueryVersion}") {
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.module'
exclude group: 'com.sun.jmx'
@@ -124,6 +125,7 @@ dependencies {
}

testFixturesApi("org.apache.hadoop:hadoop-client:2.10.2") { force=true }
pysparkContainerOnly "com.google.cloud.bigdataoss:gcs-connector:${versions.gcs}:shaded"

if(versions.delta != "NA") { testFixturesApi "io.delta:delta-core_2.12:${versions.delta}" }

@@ -171,7 +173,7 @@ task copyDependencies(type: Copy) {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
delete layout.buildDirectory.dir("dependencies")
def config = configurations."${versions.module}"
from config.getFiles()
from config.getFiles() + configurations.pysparkContainerOnly.getFiles()
include "*.jar"
into layout.buildDirectory.dir("dependencies")
}
@@ -0,0 +1,59 @@
{
"eventType": "COMPLETE",
"job": {
"namespace": "testReadAndWriteFromBigquery",
"name": "open_lineage_spark_bigquery.execute_insert_into_hadoop_fs_relation_command"
},
"inputs": [{
"namespace": "bigquery",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"facets": {
"dataSource": {
"name": "bigquery",
"uri": "bigquery"
},
"schema": {
"fields": [{
"name": "a",
"type": "long"
}, {
"name": "b",
"type": "long"
}]
}
},
"inputFacets": {}
}],
"outputs": [{
"namespace": "gs://openlineage-spark-bigquery-integration",
"facets": {
"schema": {
"fields": [{
"name": "a",
"type": "long"
}, {
"name": "b",
"type": "long"
}]
},
"columnLineage": {
"fields": {
"a": {
"inputFields": [{
"namespace": "namespace",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"field": "a"
}]
},
"b": {
"inputFields": [{
"namespace": "namespace",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"field": "b"
}]
}
}
}
}
}]
}
@@ -0,0 +1,54 @@
{
"eventType": "START",
"job": {
"namespace": "testReadAndWriteFromBigquery",
"name": "open_lineage_spark_bigquery.execute_insert_into_hadoop_fs_relation_command"
},
"inputs": [{
"namespace": "bigquery",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"facets": {
"schema": {
"fields": [{
"name": "a",
"type": "long"
}, {
"name": "b",
"type": "long"
}]
}
}
}],
"outputs": [{
"namespace": "gs://openlineage-spark-bigquery-integration",
"facets": {
"schema": {
"fields": [{
"name": "a",
"type": "long"
}, {
"name": "b",
"type": "long"
}]
},
"columnLineage": {
"fields": {
"a": {
"inputFields": [{
"namespace": "namespace",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"field": "a"
}]
},
"b": {
"inputFields": [{
"namespace": "namespace",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"field": "b"
}]
}
}
}
}
}]
}
@@ -0,0 +1,26 @@
{
"eventType": "COMPLETE",
"job": {
"namespace": "testReadAndWriteFromBigquery",
"name": "open_lineage_spark_bigquery.execute_save_into_data_source_command"
},
"inputs": [{
"namespace": "bigquery",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"facets": {
"schema": {
"fields": [{
"name": "a",
"type": "long"
}, {
"name": "b",
"type": "long"
}]
}
}
}],
"outputs": [{
"namespace": "bigquery",
"name": "openlineage-ci.airflow_integration.3_3_1_target"
}]
}
@@ -0,0 +1,26 @@
{
"eventType": "START",
"job": {
"namespace": "testReadAndWriteFromBigquery",
"name": "open_lineage_spark_bigquery.execute_save_into_data_source_command"
},
"inputs": [{
"namespace": "bigquery",
"name": "openlineage-ci.airflow_integration.3_3_1_source",
"facets": {
"schema": {
"fields": [{
"name": "a",
"type": "long"
}, {
"name": "b",
"type": "long"
}]
}
}
}],
"outputs": [{
"namespace": "bigquery",
"name": "openlineage-ci.airflow_integration.3_3_1_target"
}]
}
@@ -91,7 +91,7 @@ public void emit(OpenLineage.RunEvent event) {
log.debug(
"Emitting lineage completed successfully: {}", OpenLineageClientUtils.toJson(event));
} catch (OpenLineageClientException exception) {
log.error("Could not emit lineage w/ exception", exception);
log.error("Could not emit lineage w/ exception", exception.getCause());
}
}

@@ -37,6 +37,7 @@
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.package$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
@@ -81,6 +82,8 @@ public class OpenLineageSparkListener extends org.apache.spark.scheduler.SparkLi
private final Function0<Option<SparkContext>> activeSparkContext =
ScalaConversionUtils.toScalaFn(SparkContext$.MODULE$::getActive);

String sparkVersion = package$.MODULE$.SPARK_VERSION();

private static final boolean isDisabled = checkIfDisabled();

/** called by the tests */
@@ -137,7 +140,10 @@ public void onJobStart(SparkListenerJobStart jobStart) {
ScalaConversionUtils.fromSeq(jobStart.stageIds()).stream()
.map(Integer.class::cast)
.collect(Collectors.toSet());
jobMetrics.addJobStages(jobStart.jobId(), stages);

if (sparkVersion.startsWith("3")) {
jobMetrics.addJobStages(jobStart.jobId(), stages);
}

Optional.ofNullable(getSqlExecutionId(jobStart.properties()))
.map(Optional::of)
@@ -178,12 +184,14 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) {
if (context != null) {
context.end(jobEnd);
}
jobMetrics.cleanUp(jobEnd.jobId());
if (sparkVersion.startsWith("3")) {
jobMetrics.cleanUp(jobEnd.jobId());
}
}

@Override
public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
if (isDisabled) {
if (isDisabled || sparkVersion.startsWith("2")) {
return;
}
jobMetrics.addMetrics(taskEnd.stageId(), taskEnd.taskMetrics());
@@ -154,13 +154,16 @@ public Collection<PartialFunction<Object, List<OutputDataset>>> createOutputData
@Override
public Collection<CustomFacetBuilder<?, ? extends OutputDatasetFacet>>
createOutputDatasetFacetBuilders(OpenLineageContext context) {
return ImmutableList.<CustomFacetBuilder<?, ? extends OutputDatasetFacet>>builder()
.addAll(
generate(
eventHandlerFactories,
factory -> factory.createOutputDatasetFacetBuilders(context)))
.add(new OutputStatisticsOutputDatasetFacetBuilder(context))
.build();
ImmutableList.Builder<CustomFacetBuilder<?, ? extends OutputDatasetFacet>> builder =
ImmutableList.<CustomFacetBuilder<?, ? extends OutputDatasetFacet>>builder()
.addAll(
generate(
eventHandlerFactories,
factory -> factory.createOutputDatasetFacetBuilders(context)));
if (context.getSparkVersion().startsWith("3")) {
builder.add(new OutputStatisticsOutputDatasetFacetBuilder(context));
}
return builder.build();
}

@Override
@@ -3,23 +3,31 @@
/* SPDX-License-Identifier: Apache-2.0
*/

package com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery;
package com.google.cloud.bigquery;

import com.google.cloud.bigquery.connector.common.BigQueryClient;
import com.google.cloud.bigquery.connector.common.MockBigQueryClientModule;
import com.google.cloud.spark.bigquery.BigQueryRelation;
import com.google.cloud.spark.bigquery.BigQueryRelationProvider;
import com.google.cloud.spark.bigquery.DataSourceVersion;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.SparkBigQueryConnectorModule;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.common.MockBigQueryClientModule;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQuery;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Table;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableId;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.TableInfo;
import com.google.cloud.spark.bigquery.repackaged.com.google.inject.Binder;
import com.google.cloud.spark.bigquery.repackaged.com.google.inject.Guice;
import com.google.cloud.spark.bigquery.repackaged.com.google.inject.Injector;
import com.google.cloud.spark.bigquery.repackaged.com.google.inject.Key;
import com.google.cloud.spark.bigquery.repackaged.com.google.inject.Module;
import java.math.BigInteger;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.Optional;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -41,11 +49,17 @@ public class MockBigQueryRelationProvider extends BigQueryRelationProvider {
public static final BigQuery BIG_QUERY = Mockito.mock(BigQuery.class);
public static final MockInjector INJECTOR = new MockInjector();

public static Table makeTable(TableId id, StandardTableDefinition tableDefinition) {
return new Table.Builder(BIG_QUERY, id, tableDefinition)
.setNumBytes(tableDefinition.getNumBytes())
.setNumRows(BigInteger.valueOf(tableDefinition.getNumRows()))
.build();
public static Table makeTable(TableId id, StandardTableDefinition tableDefinition)
throws InstantiationException, IllegalAccessException, InvocationTargetException,
NoSuchMethodException {
Constructor<Table.Builder> constructor =
Table.Builder.class.getDeclaredConstructor(
BigQuery.class, TableId.class, TableDefinition.class);
constructor.setAccessible(true);

Table.Builder builder = constructor.newInstance(BIG_QUERY, id, tableDefinition);
MethodUtils.invokeMethod(builder, true, "setNumBytes", tableDefinition.getNumBytes());
return builder.build();
}

@Override
@@ -100,7 +114,7 @@ public StructType apply() {
}
})),
DataSourceVersion.V1,
true));
false));
}

public void setTestModule(Module testModule) {
