Skip to content
Permalink
Browse files
[GOBBLIN-1335] Publish GMCE(GobblinMetadataChangeEvent) publisher and…
… iceberg retention job to Gobblin OSS

[GOBBLIN-1335] Publish
GMCE(GobblinMetadataChangeEvent) publisher and
iceberg retention job to Gobblin OSS

add OSS comments

add unit test

Add icebergWriter to be able to consume GMCE and
register into iceberg table

Make it re-writable for catalog inside
icebergWriter

address comments

trigger build

trigger build

address comments

Merge branch 'master' into GOBBLIN-1335

Merge branch 'master' into GOBBLIN-1335

define iceberg version

address comments

include recent changes

Merge remote-tracking branch 'origin' into
GOBBLIN-1335

Merge remote-tracking branch 'origin' into
GOBBLIN-1335

merge master and modify method privacy level to
make them be re-writable

Closes #3172 from ZihanLi58/GOBBLIN-1335
  • Loading branch information
ZihanLi58 authored and autumnust committed Feb 4, 2021
1 parent 23f085a commit 4e2dc94eb570086c768c247f669255d62f84f3d6
Showing 31 changed files with 3,786 additions and 3 deletions.
@@ -159,7 +159,7 @@ gobblin.aws.master.s3.jars.uri="https://s3-us-west-2.amazonaws.com/gobblin-libs/
## All Gobblin Jars and Configuration on S3
gobblin.aws.master.s3.conf.files="application.conf,log4j.properties,quartz.properties"
gobblin.aws.master.s3.jars.uri="https://s3-us-west-2.amazonaws.com/gobblin-libs/latest-jars/"
gobblin.aws.master.s3.jars.files="ST4-4.0.4.jar,activation-1.1.1.jar,annotations-2.0.1.jar,ant-1.9.1.jar,ant-launcher-1.9.1.jar,antlr-runtime-3.5.2.jar,aopalliance-1.0.jar,apache-log4j-extras-1.2.17.jar,asm-3.1.jar,asm-commons-3.1.jar,asm-tree-3.1.jar,avro-1.7.7.jar,avro-ipc-1.7.7-tests.jar,avro-ipc-1.7.7.jar,avro-mapred-1.7.7-hadoop2.jar,aws-java-sdk-applicationautoscaling-1.11.8.jar,aws-java-sdk-autoscaling-1.11.8.jar,aws-java-sdk-core-1.11.8.jar,aws-java-sdk-ec2-1.11.8.jar,aws-java-sdk-iam-1.11.8.jar,aws-java-sdk-kms-1.11.8.jar,aws-java-sdk-s3-1.11.8.jar,aws-java-sdk-sts-1.11.8.jar,azkaban-2.5.0.jar,bcpg-jdk15on-1.52.jar,bcprov-jdk15on-1.52.jar,bonecp-0.8.0.RELEASE.jar,bsh-2.0b4.jar,c3p0-0.9.1.1.jar,calcite-avatica-1.2.0-incubating.jar,calcite-core-1.2.0-incubating.jar,calcite-linq4j-1.2.0-incubating.jar,cglib-nodep-2.2.jar,codemodel-2.2.jar,commons-cli-1.3.1.jar,commons-codec-1.10.jar,commons-collections-3.2.1.jar,commons-compiler-2.7.6.jar,commons-compress-1.10.jar,commons-configuration-1.10.jar,commons-daemon-1.0.13.jar,commons-dbcp-1.4.jar,commons-el-1.0.jar,commons-email-1.4.jar,commons-httpclient-3.1.jar,commons-io-2.5.jar,commons-lang-2.6.jar,commons-lang3-3.4.jar,commons-logging-1.2.jar,commons-math3-3.5.jar,commons-net-3.1.jar,commons-pool-1.5.4.jar,commons-pool2-2.4.2.jar,commons-vfs2-2.0.jar,config-1.2.1.jar,curator-client-2.10.0.jar,curator-framework-2.10.0.jar,curator-recipes-2.10.0.jar,d2-1.15.9.jar,data-1.15.9.jar,data-transform-1.15.9.jar,datanucleus-api-jdo-3.2.6.jar,datanucleus-core-3.2.10.jar,datanucleus-rdbms-3.2.9.jar,degrader-1.15.9.jar,derby-10.12.1.1.jar,eigenbase-properties-1.1.5.jar,flyway-core-3.2.1.jar,generator-1.15.9.jar,geronimo-annotation_1.0_spec-1.1.1.jar,geronimo-jaspic_1.0_spec-1.0.jar,geronimo-jpa_3.0_spec-1.0.jar,geronimo-jta_1.1_spec-1.1.1.jar,gobblin-admin-"${gobblin.aws.version}".jar,gobblin-api-"${gobblin.aws.version}".jar,gobblin-aws-"${gobblin.aws.version}".jar,gobblin-azkaban-"${gobblin.aws.version}".jar,gobblin-cluster-"${gobblin.aws.version}".jar,gobblin-compaction-"${gobblin.aws.version}".jar,gobblin-config-client-"${gobblin.aws.version}".jar,gobblin-config-core-"${gobblin.aws.version}".jar,gobblin-core-"${gobblin.aws.version}".jar,gobblin-data-management-"${gobblin.aws.version}".jar,gobblin-example-"${gobblin.aws.version}".jar,gobblin-hive-registration-"${gobblin.aws.version}".jar,gobblin-kafka-"${gobblin.aws.version}".jar,gobblin-metastore-"${gobblin.aws.version}".jar,gobblin-metrics-"${gobblin.aws.version}".jar,gobblin-rest-api-"${gobblin.aws.version}".jar,gobblin-rest-api-data-template-"${gobblin.aws.version}".jar,gobblin-rest-api-rest-client-"${gobblin.aws.version}".jar,gobblin-rest-client-"${gobblin.aws.version}".jar,gobblin-rest-server-"${gobblin.aws.version}".jar,gobblin-runtime-"${gobblin.aws.version}".jar,gobblin-test-harness-"${gobblin.aws.version}".jar,gobblin-tunnel-"${gobblin.aws.version}".jar,gobblin-utility-"${gobblin.aws.version}".jar,gobblin-yarn-"${gobblin.aws.version}".jar,groovy-all-2.1.6.jar,gson-2.6.2.jar,guava-15.0.jar,guava-retrying-2.0.0.jar,guice-4.0.jar,guice-servlet-3.0.jar,hadoop-annotations-2.3.0.jar,hadoop-auth-2.3.0.jar,hadoop-common-2.3.0.jar,hadoop-hdfs-2.3.0.jar,hadoop-mapreduce-client-common-2.3.0.jar,hadoop-mapreduce-client-core-2.3.0.jar,hadoop-yarn-api-2.3.0.jar,hadoop-yarn-client-2.3.0.jar,hadoop-yarn-common-2.3.0.jar,hadoop-yarn-server-common-2.3.0.jar,hamcrest-core-1.1.jar,helix-core-0.6.6-SNAPSHOT.jar,hive-ant-1.0.1.jar,hive-common-1.0.1.jar,hive-exec-1.0.1-core.jar,hive-jdbc-1.0.1.jar,hive-metastore-1.0.1.jar,hive-serde-1.0.1.jar,hive-service-1.0.1.jar,hive-shims-0.20-1.0.1.jar,hive-shims-0.20S-1.0.1.jar,hive-shims-0.23-1.0.1.jar,hive-shims-1.0.1.jar,hive-shims-common-1.0.1.jar,hive-shims-common-secure-1.0.1.jar,httpclient-4.5.2.jar,httpcore-4.4.4.jar,influxdb-java-2.1.jar,jackson-annotations-2.6.0.jar,jackson-core-2.6.6.jar,jackson-core-asl-1.9.13.jar,jackson-databind-2.6.6.jar,jackson-dataformat-cbor-2.6.6.jar,jackson-jaxrs-1.8.3.jar,jackson-mapper-asl-1.9.13.jar,jackson-xc-1.8.3.jar,janino-2.7.6.jar,jansi-1.11.jar,jasper-compiler-5.5.23.jar,jasper-runtime-5.5.23.jar,jasypt-1.9.2.jar,java-xmlbuilder-0.4.jar,javassist-3.18.2-GA.jar,javax.inject-1.jar,javax.mail-1.5.2.jar,javax.servlet-api-3.1.0.jar,jaxb-api-2.2.2.jar,jaxb-impl-2.2.3-1.jar,jcommander-1.48.jar,jdo-api-3.0.1.jar,jdo2-api-2.1.jar,jersey-core-1.9.jar,jersey-guice-1.9.jar,jersey-json-1.9.jar,jersey-server-1.9.jar,jets3t-0.9.0.jar,jettison-1.1.jar,jetty-6.1.26.jar,jetty-all-7.6.0.v20120127.jar,jetty-http-9.2.14.v20151106.jar,jetty-io-9.2.14.v20151106.jar,jetty-security-9.2.14.v20151106.jar,jetty-server-9.2.14.v20151106.jar,jetty-servlet-9.2.14.v20151106.jar,jetty-util-6.1.26.jar,jetty-util-9.2.14.v20151106.jar,jline-0.9.94.jar,joda-time-2.9.3.jar,jopt-simple-3.2.jar,jpam-1.1.jar,jsch-0.1.53.jar,json-20070829.jar,jsp-api-2.1.jar,jsr305-3.0.0.jar,jta-1.1.jar,junit-3.8.1.jar,kafka-clients-0.8.2.2.jar,kafka_2.11-0.8.2.2.jar,li-jersey-uri-1.15.9.jar,libfb303-0.9.0.jar,libthrift-0.9.3.jar,log4j-1.2.17.jar,lombok-1.16.8.jar,lz4-1.2.0.jar,mail-1.4.1.jar,maven-scm-api-1.4.jar,maven-scm-provider-svn-commons-1.4.jar,maven-scm-provider-svnexe-1.4.jar,metrics-core-2.2.0.jar,metrics-core-3.1.0.jar,metrics-graphite-3.1.0.jar,metrics-jvm-3.1.0.jar,mina-core-1.1.7.jar,mockito-core-1.10.19.jar,mysql-connector-java-5.1.38.jar,netty-3.2.3.Final.jar,netty-3.7.0.Final.jar,objenesis-2.1.jar,okhttp-2.4.0.jar,okio-1.4.0.jar,opencsv-2.3.jar,paranamer-2.3.jar,parseq-1.3.6.jar,pegasus-common-1.15.9.jar,pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar,plexus-utils-1.5.6.jar,protobuf-java-2.5.0.jar,quartz-2.2.3.jar,r2-1.15.9.jar,reflections-0.9.10.jar,regexp-1.3.jar,restli-client-1.15.9.jar,restli-common-1.15.9.jar,restli-docgen-1.15.9.jar,restli-netty-standalone-1.15.9.jar,restli-server-1.15.9.jar,restli-tools-1.15.9.jar,retrofit-1.9.0.jar,scala-library-2.11.8.jar,scala-parser-combinators_2.11-1.0.2.jar,scala-xml_2.11-1.0.2.jar,servlet-api-2.5-20081211.jar,servlet-api-2.5.jar,slf4j-api-1.7.21.jar,slf4j-log4j12-1.7.21.jar,snappy-0.3.jar,snappy-java-1.1.1.7.jar,stax-api-1.0-2.jar,stax-api-1.0.1.jar,testng-6.9.10.jar,transaction-api-1.1.jar,velocity-1.7.jar,xmlenc-0.52.jar,zkclient-0.5.jar,zookeeper-3.4.6.jar"
gobblin.aws.master.s3.jars.files="ST4-4.0.4.jar,activation-1.1.1.jar,annotations-2.0.1.jar,ant-1.9.1.jar,ant-launcher-1.9.1.jar,antlr-runtime-3.5.2.jar,aopalliance-1.0.jar,apache-log4j-extras-1.2.17.jar,asm-3.1.jar,asm-commons-3.1.jar,asm-tree-3.1.jar,avro-1.7.7.jar,avro-ipc-1.7.7-tests.jar,avro-ipc-1.7.7.jar,avro-mapred-1.7.7-hadoop2.jar,aws-java-sdk-applicationautoscaling-1.11.8.jar,aws-java-sdk-autoscaling-1.11.8.jar,aws-java-sdk-core-1.11.8.jar,aws-java-sdk-ec2-1.11.8.jar,aws-java-sdk-iam-1.11.8.jar,aws-java-sdk-kms-1.11.8.jar,aws-java-sdk-s3-1.11.8.jar,aws-java-sdk-sts-1.11.8.jar,azkaban-2.5.0.jar,bcpg-jdk15on-1.52.jar,bcprov-jdk15on-1.52.jar,bonecp-0.8.0.RELEASE.jar,bsh-2.0b4.jar,c3p0-0.9.1.1.jar,calcite-avatica-1.2.0-incubating.jar,calcite-core-1.2.0-incubating.jar,calcite-linq4j-1.2.0-incubating.jar,cglib-nodep-2.2.jar,codemodel-2.2.jar,commons-cli-1.3.1.jar,commons-codec-1.10.jar,commons-collections-3.2.1.jar,commons-compiler-2.7.6.jar,commons-compress-1.10.jar,commons-configuration-1.10.jar,commons-daemon-1.0.13.jar,commons-dbcp-1.4.jar,commons-el-1.0.jar,commons-email-1.4.jar,commons-httpclient-3.1.jar,commons-io-2.5.jar,commons-lang-2.6.jar,commons-lang3-3.4.jar,commons-logging-1.2.jar,commons-math3-3.5.jar,commons-net-3.1.jar,commons-pool-1.5.4.jar,commons-pool2-2.4.2.jar,commons-vfs2-2.0.jar,config-1.2.1.jar,curator-client-2.10.0.jar,curator-framework-2.10.0.jar,curator-recipes-2.10.0.jar,d2-1.15.9.jar,data-1.15.9.jar,data-transform-1.15.9.jar,datanucleus-api-jdo-3.2.6.jar,datanucleus-core-3.2.10.jar,datanucleus-rdbms-3.2.9.jar,degrader-1.15.9.jar,derby-10.12.1.1.jar,eigenbase-properties-1.1.5.jar,flyway-core-3.2.1.jar,generator-1.15.9.jar,geronimo-annotation_1.0_spec-1.1.1.jar,geronimo-jaspic_1.0_spec-1.0.jar,geronimo-jpa_3.0_spec-1.0.jar,geronimo-jta_1.1_spec-1.1.1.jar,gobblin-admin-"${gobblin.aws.version}".jar,gobblin-api-"${gobblin.aws.version}".jar,gobblin-aws-"${gobblin.aws.version}".jar,gobblin-azkaban-"${gobblin.aws.version}".jar,gobblin-cluster-"${gobblin.aws.version}".jar,gobblin-compaction-"${gobblin.aws.version}".jar,gobblin-config-client-"${gobblin.aws.version}".jar,gobblin-config-core-"${gobblin.aws.version}".jar,gobblin-core-"${gobblin.aws.version}".jar,gobblin-data-management-"${gobblin.aws.version}".jar,gobblin-example-"${gobblin.aws.version}".jar,gobblin-hive-registration-"${gobblin.aws.version}".jar,gobblin-iceberg-"${gobblin.aws.version}".jar,gobblin-kafka-"${gobblin.aws.version}".jar,gobblin-metastore-"${gobblin.aws.version}".jar,gobblin-metrics-"${gobblin.aws.version}".jar,gobblin-rest-api-"${gobblin.aws.version}".jar,gobblin-rest-api-data-template-"${gobblin.aws.version}".jar,gobblin-rest-api-rest-client-"${gobblin.aws.version}".jar,gobblin-rest-client-"${gobblin.aws.version}".jar,gobblin-rest-server-"${gobblin.aws.version}".jar,gobblin-runtime-"${gobblin.aws.version}".jar,gobblin-test-harness-"${gobblin.aws.version}".jar,gobblin-tunnel-"${gobblin.aws.version}".jar,gobblin-utility-"${gobblin.aws.version}".jar,gobblin-yarn-"${gobblin.aws.version}".jar,groovy-all-2.1.6.jar,gson-2.6.2.jar,guava-15.0.jar,guava-retrying-2.0.0.jar,guice-4.0.jar,guice-servlet-3.0.jar,hadoop-annotations-2.3.0.jar,hadoop-auth-2.3.0.jar,hadoop-common-2.3.0.jar,hadoop-hdfs-2.3.0.jar,hadoop-mapreduce-client-common-2.3.0.jar,hadoop-mapreduce-client-core-2.3.0.jar,hadoop-yarn-api-2.3.0.jar,hadoop-yarn-client-2.3.0.jar,hadoop-yarn-common-2.3.0.jar,hadoop-yarn-server-common-2.3.0.jar,hamcrest-core-1.1.jar,helix-core-0.6.6-SNAPSHOT.jar,hive-ant-1.0.1.jar,hive-common-1.0.1.jar,hive-exec-1.0.1-core.jar,hive-jdbc-1.0.1.jar,hive-metastore-1.0.1.jar,hive-serde-1.0.1.jar,hive-service-1.0.1.jar,hive-shims-0.20-1.0.1.jar,hive-shims-0.20S-1.0.1.jar,hive-shims-0.23-1.0.1.jar,hive-shims-1.0.1.jar,hive-shims-common-1.0.1.jar,hive-shims-common-secure-1.0.1.jar,httpclient-4.5.2.jar,httpcore-4.4.4.jar,influxdb-java-2.1.jar,jackson-annotations-2.6.0.jar,jackson-core-2.6.6.jar,jackson-core-asl-1.9.13.jar,jackson-databind-2.6.6.jar,jackson-dataformat-cbor-2.6.6.jar,jackson-jaxrs-1.8.3.jar,jackson-mapper-asl-1.9.13.jar,jackson-xc-1.8.3.jar,janino-2.7.6.jar,jansi-1.11.jar,jasper-compiler-5.5.23.jar,jasper-runtime-5.5.23.jar,jasypt-1.9.2.jar,java-xmlbuilder-0.4.jar,javassist-3.18.2-GA.jar,javax.inject-1.jar,javax.mail-1.5.2.jar,javax.servlet-api-3.1.0.jar,jaxb-api-2.2.2.jar,jaxb-impl-2.2.3-1.jar,jcommander-1.48.jar,jdo-api-3.0.1.jar,jdo2-api-2.1.jar,jersey-core-1.9.jar,jersey-guice-1.9.jar,jersey-json-1.9.jar,jersey-server-1.9.jar,jets3t-0.9.0.jar,jettison-1.1.jar,jetty-6.1.26.jar,jetty-all-7.6.0.v20120127.jar,jetty-http-9.2.14.v20151106.jar,jetty-io-9.2.14.v20151106.jar,jetty-security-9.2.14.v20151106.jar,jetty-server-9.2.14.v20151106.jar,jetty-servlet-9.2.14.v20151106.jar,jetty-util-6.1.26.jar,jetty-util-9.2.14.v20151106.jar,jline-0.9.94.jar,joda-time-2.9.3.jar,jopt-simple-3.2.jar,jpam-1.1.jar,jsch-0.1.53.jar,json-20070829.jar,jsp-api-2.1.jar,jsr305-3.0.0.jar,jta-1.1.jar,junit-3.8.1.jar,kafka-clients-0.8.2.2.jar,kafka_2.11-0.8.2.2.jar,li-jersey-uri-1.15.9.jar,libfb303-0.9.0.jar,libthrift-0.9.3.jar,log4j-1.2.17.jar,lombok-1.16.8.jar,lz4-1.2.0.jar,mail-1.4.1.jar,maven-scm-api-1.4.jar,maven-scm-provider-svn-commons-1.4.jar,maven-scm-provider-svnexe-1.4.jar,metrics-core-2.2.0.jar,metrics-core-3.1.0.jar,metrics-graphite-3.1.0.jar,metrics-jvm-3.1.0.jar,mina-core-1.1.7.jar,mockito-core-1.10.19.jar,mysql-connector-java-5.1.38.jar,netty-3.2.3.Final.jar,netty-3.7.0.Final.jar,objenesis-2.1.jar,okhttp-2.4.0.jar,okio-1.4.0.jar,opencsv-2.3.jar,paranamer-2.3.jar,parseq-1.3.6.jar,pegasus-common-1.15.9.jar,pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar,plexus-utils-1.5.6.jar,protobuf-java-2.5.0.jar,quartz-2.2.3.jar,r2-1.15.9.jar,reflections-0.9.10.jar,regexp-1.3.jar,restli-client-1.15.9.jar,restli-common-1.15.9.jar,restli-docgen-1.15.9.jar,restli-netty-standalone-1.15.9.jar,restli-server-1.15.9.jar,restli-tools-1.15.9.jar,retrofit-1.9.0.jar,scala-library-2.11.8.jar,scala-parser-combinators_2.11-1.0.2.jar,scala-xml_2.11-1.0.2.jar,servlet-api-2.5-20081211.jar,servlet-api-2.5.jar,slf4j-api-1.7.21.jar,slf4j-log4j12-1.7.21.jar,snappy-0.3.jar,snappy-java-1.1.1.7.jar,stax-api-1.0-2.jar,stax-api-1.0.1.jar,testng-6.9.10.jar,transaction-api-1.1.jar,velocity-1.7.jar,xmlenc-0.52.jar,zkclient-0.5.jar,zookeeper-3.4.6.jar"

gobblin.aws.worker.s3.conf.uri=${gobblin.aws.master.s3.conf.uri}
gobblin.aws.worker.s3.conf.files=${gobblin.aws.master.s3.conf.files}
@@ -53,6 +53,7 @@ dependencies {
compile project(':gobblin-modules:gobblin-grok')
compile project(':gobblin-modules:gobblin-helix')
compile project(':gobblin-hive-registration')
compile project(':gobblin-iceberg')
compile project(':gobblin-modules:gobblin-http')
compile project(':gobblin-modules:gobblin-kafka-08')
//gobblin-kafka-09 is blacklisted for jacocoBuilds. check settings.xml
@@ -1051,6 +1051,12 @@ public class ConfigurationKeys {
public static final String AVRO_SCHEMA_CHECK_STRATEGY_DEFAULT =
"org.apache.gobblin.util.schema_check.AvroSchemaCheckDefaultStrategy";

/**
* Configuration and constant vale for GobblinMetadataChangeEvent
*/
public static final String LIST_DELIMITER_KEY = ",";
public static final String RANGE_DELIMITER_KEY = "-";

/**
* Configuration for emitting task events
*/
@@ -24,6 +24,7 @@ dependencies {
compile project(":gobblin-metrics-libs:gobblin-metrics")
compile project(":gobblin-core")
compile project(":gobblin-data-management")
compile project(":gobblin-iceberg")
compile project(":gobblin-runtime")
compile project(":gobblin-modules:gobblin-kafka-common")
compile externalDependency.orcMapreduce
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.compaction.action;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.iceberg.GobblinMCEProducer;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.shaded.org.apache.orc.TypeDescription;
import org.apache.orc.OrcConf;


/**
* Class responsible for emitting GMCE after compaction is complete
*/
@Slf4j
public class CompactionGMCEPublishingAction implements CompactionCompleteAction<FileSystemDataset> {

public static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
public static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
private final State state;
private final CompactionJobConfigurator configurator;
private final Configuration conf;
private EventSubmitter eventSubmitter;

public CompactionGMCEPublishingAction(State state, CompactionJobConfigurator configurator) {
if (!(state instanceof WorkUnitState)) {
throw new UnsupportedOperationException(this.getClass().getName() + " only supports workunit state");
}
this.state = state;
this.configurator = configurator;
this.conf = HadoopUtils.getConfFromState(state);
}

public void onCompactionJobComplete(FileSystemDataset dataset) throws IOException {
if (dataset.isVirtual()) {
return;
}
try (GobblinMCEProducer producer = GobblinMCEProducer.getGobblinMCEProducer(state)) {

CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
String datasetDir = Joiner.on("/").join(result.getDstBaseDir(), result.getDatasetName());
state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir);
producer.sendGMCE(getNewFileMetrics(result), null, Lists.newArrayList(this.configurator.getOldFiles()), null,
OperationType.rewrite_files, SchemaSource.NONE);
}
//clear old files to release memory
this.configurator.getOldFiles().clear();
}

private Map<Path, Metrics> getNewFileMetrics(CompactionPathParser.CompactionParserResult result) {
NameMapping mapping = null;
try {
if (IcebergUtils.getIcebergFormat(state) == FileFormat.ORC) {
String s =
this.configurator.getConfiguredJob().getConfiguration().get(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute());
TypeDescription orcSchema = TypeDescription.fromString(s);
for (int i = 0; i <= orcSchema.getMaximumId(); i++) {
orcSchema.findSubtype(i).setAttribute(ICEBERG_ID_ATTRIBUTE, Integer.toString(i));
orcSchema.findSubtype(i).setAttribute(ICEBERG_REQUIRED_ATTRIBUTE, Boolean.toString(true));
}
Schema icebergSchema = ORCSchemaUtil.convert(orcSchema);
state.setProp(GobblinMCEPublisher.AVRO_SCHEMA_WITH_ICEBERG_ID,
AvroSchemaUtil.convert(icebergSchema.asStruct()).toString());
mapping = MappingUtil.create(icebergSchema);
}
} catch (Exception e) {
log.warn(
"Table {} contains complex union type which is not compatible with iceberg, will not calculate the metrics for it",
result.getDatasetName());
}
Map<Path, Metrics> newFiles = new HashMap<>();
for (Path filePath : this.configurator.getDstNewFiles()) {
newFiles.put(filePath, GobblinMCEPublisher.getMetrics(state, filePath, conf, mapping));
}
return newFiles;
}

public void addEventSubmitter(EventSubmitter submitter) {
this.eventSubmitter = submitter;
}
}

0 comments on commit 4e2dc94

Please sign in to comment.