From 0330b533b41aad115c915465756b4eb289d3b40f Mon Sep 17 00:00:00 2001
From: tcodehuber
Date: Thu, 22 Feb 2024 11:18:19 +0800
Subject: [PATCH] [AMORO-1812] Support spark-based external optimizer (#2421)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [AMORO-1812] support spark-based external optimizer
* resolve code style error

* [AMORO-1951] Support parallelized planning in one optimizer group (#2282)
* [AMORO-1951] Support parallelized planning in one optimizer group
* [AMORO-1951] add unit test for OptimizingQueue and DefaultOptimizingService
* [AMORO-1951] optimize default parameters
* fix bugs
* fix warnings and spotless issues
* merge from #2290
* add apache license and fix spotless
* fix config error
* Update ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java

Co-authored-by: ZhouJinsong

* add annotations
* fix compile errors
* fix import problem
* remove isDebugEnabled()
* spotless apply
* Update ArcticManagementConf.java
* fix reboot bug and supply document content
* use MoreObjects.toStringHelper for OptimizerThread.java
* Merged from [AMORO-2376] Print right log info after calculating and sorting tables
* fix import problem
* remove unused codes
* spotless
* remove incorrect comments
* add max-planning-parallelism to config

---------

Co-authored-by: majin1102
Co-authored-by: ZhouJinsong

* [AMORO-2378] The optimizer based on Flink on YARN should prioritize loading the optimizer-job.jar (#2379)
* load optimizer jar first
* fix code style
* change config name
* add config taskmanager.memory.managed.fraction
* fix

* [AMORO-2222] [Improvement]: Skip cleaning up dangling delete files for Iceberg V1 table (#2361)
* [AMORO-2222] [Improvement]: Skip cleaning up dangling delete files for Iceberg V1 table
* Update IcebergTableMaintainer.java

The `total-delete-files` could be 0.

---------

Co-authored-by: wangtaohz <103108928+wangtaohz@users.noreply.github.com>

* [AMORO-2404] fix Mixed Hive table mistakenly deletes hive files during expiring snapshots (#2405)
get hive locations return the uri path

* [AMORO-2407] Fix access data file from dashboard of non-partitioned table (#2408)
* fix null partition
* fix listing files of non-partitioned iceberg table

* [AMORO-2383] Add serialVersionUID to RewriteFilesInput (#2384)
* add serialVersionUID
* fix comment

* [AMORO-1720] Fix Mixed Format KeyedTable expiring all the snapshots with optimized sequence (#2394)
* should not expire the latest snapshot contains optimized sequence
* add visible for testing
* add fetchLatestNonOptimizedSnapshotTime for base store
* get hive locations return the uri path
* refactor codes and fix comments
* improve for exclude files is empty for expiring snapshots

---------

Co-authored-by: ZhouJinsong

* [AMORO-2386][AMS] Configure `iceberg.worker.num-threads` in the config.yaml (#2393)
* [AMORO-2386][AMS] Configure `iceberg.worker.num-threads` in the config.yaml
* Fix
* [AMORO-2386][AMS] reuse config `table-manifest-io.thread-count` and reuse thread pool
* Add comment

* [AMORO-1716] [Improvement]: sort the table list returned by server (#2362)
* improve: sort the table list returned by server
* optimize: sort tables by format
* optimize: optimize tables sorting
* style: update comment

---------

Co-authored-by: chenyuzhi
Co-authored-by: ZhouJinsong

* [HotFix] Re-add table-filter to Server ExternalCatalog (#2310)
* re add table filter
* implement in external catalog
* add ut case
* fix comment
* fix comment
* fix comment
* fix ut
* fix update properties
* roll back the engine side's filter
* resolve conflicts
* add ut

---------

Co-authored-by: baiyangtx
Co-authored-by: ZhouJinsong

* [AMORO-2299]: Cancel the running optimizing process from ams web console (#2297)
* cancel the running optimizing process from ams web console
* refactor code to avoid NPE
* add a comment for com.netease.arctic.server.table.TableService#getServerTableIdentifier
* change the cancel post api to be more restful style

* [AMORO-2415] Print GC date stamps (#2416)
add gc timestamp

* Update wrong comments in SnapshotsExpiringExecutor.java (#2422)

* [AMORO-2276]: UnifiedCatalog for Spark Engine (#2269)
* Add UnifiedSparkCatalog under spark common module
* Extract MixedSparkCatalogBase and MixedSparkSessionCatalogBase to spark common module
* Refactor spark unit test framework to adapt unified catalog tests and mixed format tests.

* [AMORO-2261] Extract the deleting dangling files from the cleaning orphan files (#2403)
* [Improvement]: Extract the deleting dangling files from the cleaning orphan files
* [Improvement]: Extract the deleting dangling files from the cleaning orphan files
* [Improvement]: Extract the deleting dangling files from the cleaning orphan files

* [AMORO-1341] [Flink]: Support UnifiedCatalog to contain Mixed format table in Flink Engine (#2370)
* [AMORO-1341] [Flink]: Support UnifiedCatalog to contain Mixed format table in Flink Engine

* [AMORO-2413] Need to select the first db after switching to another Catalog (#2419)
* fix: If the current catalog is not the one in the query, the first db is selected by default.
* build dashboard frontend

---------

Co-authored-by: wangtao

* [HotFix] Fix loading the optimizing snapshot id of change store for Mixed Format KeyedTable (#2430)
fix load target change snapshot id

* [AMORO-2260] Show the format version of iceberg table (#2425)
[AMORO-2260] Show the format version of Iceberg Table

Signed-off-by: tcodehuber

* [AMORO-2115] Support displaying Optimizing tasks (#2322)
* dashboard: rename optimized to optimizing
* dashboard: support optimizing tasks
* add optimizer token
* dashboard: modify column width
* dashboard: build
* sort the metrics field and change record cnt to long
* modify MetricsSummary Compatibility
* dashboard: build
* Update ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java

Co-authored-by: Qishang Zhong

* fix
* support input metrics and output metrics for optimizing process
* dashboard: support optimizing metrics
* dashboard: build
* dashboard: rebuild
* support MetricsSummary to map
* optimizing task supports input output
* dashboard: optimizing tasks support input and output
* dashboard: not display seconds when longer than 1 hour
* dashboard: optimizing process show summary
* remove useless import
* dashboard: build
* as head
* dashboard: build
* change process status to CLOSED after cancel process
* remove useless log
* dashboard: refresh after cancelled
* support cancel optimizing tasks
* dashboard: handle exception when can't cancel optimizing process
* throw exception when can't cancel optimizing process
* dashboard: build
* dashboard: refresh optimizing process when exiting optimizing detail page
* dashboard: build
* fix cost time is 0ms
* change metrics name
* fix task startTime and endTime
* fix costTime
* using Preconditions.checkArgument
* fix task reset
* add comments
* cancel tasks before closing optimizing process
* fix unit test
* fix cancel task
* as head
* Revert "as head"

This reverts commit e469e718ad34b0daf903e58bf00fa874aca2d6a2.

* dashboard: build

---------

Co-authored-by: Qishang Zhong

* [AMORO-2385] Make the maximum input file size per optimize thread configurable (#2387)
* add config self-optimizing.max-input-file-size-per-thread
* add doc
* add resource group property max-input-file-size-per-thread
* add doc
* fix compile

* [Hotfix] Add database filter to Server ExternalCatalog (#2414)
* [Hotfix] Add database filter to Server ExternalCatalog
* [Hotfix] Add database filter to Server ExternalCatalog
* Rename config database.filter-regular-expression to database-filter

---------

Co-authored-by: baiyangtx

* [AMORO-2423] [Flink]: Using 'mixed_iceberg' and 'mixed_hive' identifier to CREATE CATALOG and deprecate 'arctic' identifier (#2424)
* [AMORO-2423] [Flink]: Using 'mixed_iceberg' and 'mixed_hive' identifiers to CREATE CATALOG and deprecate 'arctic' identifier

* [AMORO-2316] The Files page supports filtering by partition name and sorting by dictionary value. (#2420)
* AMORO-2316: The Files page supports filtering by partition name and sorting by dictionary value.
* build dashboard frontend
* dashboard: build

---------

Co-authored-by: wangtao

* [AMORO-1892] Improve the SQL Shortcuts in Terminal Web UI (#2434)
* [AMORO-1892] Improve the SQL Shortcuts in Terminal Web UI
* refactor some code

---------

Signed-off-by: tcodehuber

* [AMORO-2440] Fix the batch deletion of Change Store files for Mixed Format Table (#2439)
fix remove change files

* [AMORO-2418] Exclude kryo dependency from flink-optimizer (#2437)
exclude kryo

* [AMORO-2441] Fix `TableEntriesScan` when no file format is specified in the file name (#2442)
* fix TableEntriesScan without format suffix
* using the file format from entries

* [Hotfix] Fix fetchLatestNonOptimizedSnapshotTime (#2396)
* fix fetchLatestNonOptimizedSnapshotTime
* fix
* spotless
* fix ut
* rename data-expire.since
* using UTC zone for snapshot timestamp
* resolve conflict
* review
* spotless
* review
* review

* [AMORO-2344] [Flink]: Support UnifiedCatalog to contain Iceberg format table in Flink Engine (#2427)

* [AMORO-2330] Improve major plan (#2332)
* [AMORO-2330][AMS] Improve major plan
* [AMORO-2330][AMS] remove filter dataFileWith1Pos
* Fix comments
* Fix
* Move the rollback logic of undersized segment to the split task stage
* Rename
* Fix
* Rollback method name `fileShouldRewrite`
* Rollback mixed table format full trigger condition & reuse `isUndersizedSegmentFile`
* Rollback testSegmentFilesBase()
* TreeNodeTaskSplitter logic keep same with bin-pack
* Improve code duplicate

---------

Co-authored-by: ZhouJinsong

* [AMORO-1810] Check the validity of the heartbeat interval when an opti… (#2432)
* [AMORO-1810] Check the validity of the heartbeat interval when an optimizer starts
* adjust the import way
* resolve some logic code
* refactor code
* refactor code
* fix ut error
* resolve ut error
* fix ut error
* fix ut error
* fix ut error
* fix ci error
* refactor code
* refactor code
* refactor code
* [AMORO-1812] support spark-based external optimizer
* resolve code style error
* refactor code
* refactor code
* bugfix
* refactor code
* refactor code
* code style
* bugfix
* bugfix

---------

Signed-off-by: tcodehuber
Co-authored-by: JinMat
Co-authored-by: majin1102
Co-authored-by: ZhouJinsong
Co-authored-by: wangzeyu
Co-authored-by: ConradJam
Co-authored-by: wangtaohz <103108928+wangtaohz@users.noreply.github.com>
Co-authored-by: yeatsliao
Co-authored-by: Qishang Zhong
Co-authored-by: chenyuzhi459 <553673833@qq.com>
Co-authored-by: chenyuzhi
Co-authored-by: HuangFru <68625618+HuangFru@users.noreply.github.com>
Co-authored-by: baiyangtx
Co-authored-by: xujiangfeng001 <104614523+xujiangfeng001@users.noreply.github.com>
Co-authored-by: Xianxun Ye
Co-authored-by: liuweimin
Co-authored-by: wangtao
Co-authored-by: Xavier Bai
---
 ams/dist/pom.xml                                   |   6 +
 ams/dist/src/main/assemblies/bin.xml               |  10 +-
 .../common/AbstractOptimizerOperator.java          |   4 +-
 .../arctic/optimizer/common/Optimizer.java         |  13 +-
 .../optimizer/common/OptimizerExecutor.java        |  73 ++++---
 ams/optimizer/pom.xml                              |   1 +
 ams/optimizer/spark-optimizer/pom.xml              | 186 ++++++++++++++++++
 .../optimizer/spark/SparkOptimizer.java            |  65 ++++++
 .../spark/SparkOptimizerExecutor.java              |  92 +++++++++
 .../spark/SparkOptimizingTaskFunction.java         |  47 +++++
 .../arctic/optimizing/RewriteFilesInput.java       |  20 +-
 11 files changed, 473 insertions(+), 44 deletions(-)
 create mode 100644 ams/optimizer/spark-optimizer/pom.xml
 create mode 100644 ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java
 create mode 100644 ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizerExecutor.java
 create mode 100644 ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizingTaskFunction.java
diff --git a/ams/dist/pom.xml b/ams/dist/pom.xml
index 1cd8e05162..9ab4fbfac7 100644
--- a/ams/dist/pom.xml
+++ b/ams/dist/pom.xml
@@ -52,6 +52,12 @@
       <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.netease.amoro</groupId>
+      <artifactId>spark-optimizer</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/ams/dist/src/main/assemblies/bin.xml b/ams/dist/src/main/assemblies/bin.xml
index a96042a934..816c1ed9d3 100644
--- a/ams/dist/src/main/assemblies/bin.xml
+++ b/ams/dist/src/main/assemblies/bin.xml
@@ -36,6 +36,14 @@
       <destName>optimizer-job.jar</destName>
       <fileMode>0644</fileMode>
     </file>
+
+    <file>
+      <source>../optimizer/spark-optimizer/target/spark-optimizer-${project.version}.jar
+      </source>
+      <outputDirectory>plugin/optimizer/spark</outputDirectory>
+      <destName>optimizer-job.jar</destName>
+      <fileMode>0644</fileMode>
+    </file>
     <file>
       <source>../server/target/amoro-ams-server-${project.version}.jar</source>
       <outputDirectory>lib/</outputDirectory>
@@ -81,4 +89,4 @@
       <fileMode>0644</fileMode>
     </file>
   </files>
-</assembly>
+</assembly>
\ No newline at end of file
diff --git a/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/AbstractOptimizerOperator.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/AbstractOptimizerOperator.java
index 7d9fb72efd..f71552bd47 100644
--- a/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/AbstractOptimizerOperator.java
+++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/AbstractOptimizerOperator.java
@@ -163,11 +163,11 @@ protected void waitAShortTime(long waitTime) {
     }
   }
 
-  interface AmsCallOperation<T> {
+  protected interface AmsCallOperation<T> {
     T call(OptimizingService.Iface client) throws TException;
   }
 
-  interface AmsAuthenticatedCallOperation<T> {
+  protected interface AmsAuthenticatedCallOperation<T> {
     T call(OptimizingService.Iface client, String token) throws TException;
   }
 }
diff --git a/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/Optimizer.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/Optimizer.java
index 4daca72b1e..b00c2ce026 100644
--- a/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/Optimizer.java
+++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/Optimizer.java
@@ -23,6 +23,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
 public class Optimizer {
@@ -33,11 +35,18 @@ public class Optimizer {
   private final OptimizerExecutor[] executors;
 
   public Optimizer(OptimizerConfig config) {
+    this(config, () -> new OptimizerToucher(config), (i) -> new OptimizerExecutor(config, i));
+  }
+
+  protected Optimizer(
+      OptimizerConfig config,
+      Supplier<OptimizerToucher> toucherFactory,
+      IntFunction<OptimizerExecutor> executorFactory) {
     this.config = config;
-    this.toucher = new OptimizerToucher(config);
+    this.toucher = toucherFactory.get();
     this.executors = new OptimizerExecutor[config.getExecutionParallel()];
     IntStream.range(0, config.getExecutionParallel())
-        .forEach(i -> executors[i] = new OptimizerExecutor(config, i));
+        .forEach(i -> executors[i] = executorFactory.apply(i));
     if (config.getResourceId() != null) {
       toucher.withRegisterProperty(OptimizerProperties.RESOURCE_ID, config.getResourceId());
     }
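The factory-based constructor above is the extension point for engine-specific optimizers: a subclass keeps the default toucher while supplying its own executor per thread slot. A minimal sketch under that assumption (MyOptimizer and MyOptimizerExecutor are hypothetical names, used here only for illustration; SparkOptimizer below follows exactly this pattern):

    // Sketch: plugging a custom executor into the new protected constructor.
    // MyOptimizerExecutor is assumed to extend OptimizerExecutor.
    public class MyOptimizer extends Optimizer {
      public MyOptimizer(OptimizerConfig config) {
        super(
            config,
            () -> new OptimizerToucher(config),          // keep the default toucher
            (i) -> new MyOptimizerExecutor(config, i));  // one custom executor per thread
      }
    }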
diff --git a/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerExecutor.java b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerExecutor.java
index 5182ef524f..c0f75459ed 100644
--- a/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerExecutor.java
+++ b/ams/optimizer/common/src/main/java/com/netease/arctic/optimizer/common/OptimizerExecutor.java
@@ -99,24 +99,50 @@ private boolean ackTask(OptimizingTask task) {
     }
   }
 
+  protected OptimizingTaskResult executeTask(OptimizingTask task) {
+    return executeTask(getConfig(), getThreadId(), task, LOG);
+  }
+
+  protected void completeTask(OptimizingTaskResult optimizingTaskResult) {
+    try {
+      callAuthenticatedAms(
+          (client, token) -> {
+            client.completeTask(token, optimizingTaskResult);
+            return null;
+          });
+      LOG.info(
+          "Optimizer executor[{}] completed task[{}] to ams",
+          threadId,
+          optimizingTaskResult.getTaskId());
+    } catch (TException exception) {
+      LOG.error(
+          "Optimizer executor[{}] completed task[{}] failed",
+          threadId,
+          optimizingTaskResult.getTaskId(),
+          exception);
+    }
+  }
+
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private OptimizingTaskResult executeTask(OptimizingTask task) {
+  public static OptimizingTaskResult executeTask(
+      OptimizerConfig config, int threadId, OptimizingTask task, Logger logger) {
+    long startTime = System.currentTimeMillis();
+    TableOptimizing.OptimizingInput input = null;
     try {
       OptimizingInputProperties properties = OptimizingInputProperties.parse(task.getProperties());
+      input = SerializationUtil.simpleDeserialize(task.getTaskInput());
       String executorFactoryImpl = properties.getExecutorFactoryImpl();
-      TableOptimizing.OptimizingInput input =
-          SerializationUtil.simpleDeserialize(task.getTaskInput());
       DynConstructors.Ctor<OptimizingExecutorFactory> ctor =
           DynConstructors.builder(OptimizingExecutorFactory.class)
               .impl(executorFactoryImpl)
               .buildChecked();
       OptimizingExecutorFactory factory = ctor.newInstance();
 
-      if (getConfig().isExtendDiskStorage()) {
+      if (config.isExtendDiskStorage()) {
         properties.enableSpillMap();
       }
-      properties.setMaxSizeInMemory(getConfig().getMemoryStorageSize() * 1024 * 1024);
-      properties.setSpillMapPath(getConfig().getDiskStoragePath());
+      properties.setMaxSizeInMemory(config.getMemoryStorageSize() * 1024 * 1024);
+      properties.setSpillMapPath(config.getDiskStoragePath());
       factory.initialize(properties.getProperties());
 
       OptimizingExecutor executor = factory.createExecutor(input);
@@ -125,33 +151,24 @@ private OptimizingTaskResult executeTask(OptimizingTask task) {
       OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), threadId);
       result.setTaskOutput(outputByteBuffer);
       result.setSummary(output.summary());
-      LOG.info("Optimizer executor[{}] executed task[{}]", threadId, task.getTaskId());
+      logger.info(
+          "Optimizer executor[{}] executed task[{}]({}) and cost {}",
+          threadId,
+          task.getTaskId(),
+          input,
+          System.currentTimeMillis() - startTime);
       return result;
     } catch (Throwable t) {
-      LOG.error("Optimizer executor[{}] executed task[{}] failed", threadId, task.getTaskId(), t);
+      logger.error(
+          "Optimizer executor[{}] executed task[{}]({}) failed and cost {}",
+          threadId,
+          task.getTaskId(),
+          input,
+          System.currentTimeMillis() - startTime,
+          t);
       OptimizingTaskResult errorResult = new OptimizingTaskResult(task.getTaskId(), threadId);
       errorResult.setErrorMessage(ExceptionUtil.getErrorMessage(t, 4000));
       return errorResult;
     }
   }
-
-  private void completeTask(OptimizingTaskResult optimizingTaskResult) {
-    try {
-      callAuthenticatedAms(
-          (client, token) -> {
-            client.completeTask(token, optimizingTaskResult);
-            return null;
-          });
-      LOG.info(
-          "Optimizer executor[{}] completed task[{}] to ams",
-          threadId,
-          optimizingTaskResult.getTaskId());
-    } catch (TException exception) {
-      LOG.error(
-          "Optimizer executor[{}] completed task[{}] failed",
-          threadId,
-          optimizingTaskResult.getTaskId(),
-          exception);
-    }
-  }
 }
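Making executeTask a static method keyed only on (config, threadId, task, logger) is what lets the task body run outside the polling thread, or even outside the JVM that polled it, which the Spark optimizer relies on. A direct-use sketch, assuming a task already obtained from AMS and a parsed OptimizerConfig:

    // Sketch: the static entry point needs no OptimizerExecutor instance,
    // only serializable inputs, so it can run wherever the task is shipped.
    Logger log = LoggerFactory.getLogger("task-runner-demo");
    OptimizingTaskResult result = OptimizerExecutor.executeTask(config, 0, task, log);
    if (result.getErrorMessage() != null) {
      log.error("Task {} failed: {}", task.getTaskId(), result.getErrorMessage());
    }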
diff --git a/ams/optimizer/pom.xml b/ams/optimizer/pom.xml
index 8bf6ea6d11..edce4ab786 100644
--- a/ams/optimizer/pom.xml
+++ b/ams/optimizer/pom.xml
@@ -34,6 +34,7 @@
   <modules>
     <module>flink-optimizer</module>
+    <module>spark-optimizer</module>
     <module>standalone-optimizer</module>
     <module>common</module>
   </modules>
diff --git a/ams/optimizer/spark-optimizer/pom.xml b/ams/optimizer/spark-optimizer/pom.xml
new file mode 100644
index 0000000000..a0f6cc7897
--- /dev/null
+++ b/ams/optimizer/spark-optimizer/pom.xml
@@ -0,0 +1,186 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>amoro-optimizer</artifactId>
+    <groupId>com.netease.amoro</groupId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>spark-optimizer</artifactId>
+  <name>Amoro Project AMS Spark Optimizer</name>
+  <url>https://amoro.netease.com</url>
+
+  <properties>
+    <spark-optimizer.spark-version>3.3.2</spark-optimizer.spark-version>
+    <spark-optimizer.scala-version>2.12</spark-optimizer.scala-version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.netease.amoro</groupId>
+      <artifactId>optimizer-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${spark-optimizer.scala-version}</artifactId>
+      <version>${spark-optimizer.spark-version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-api</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${spark-optimizer.scala-version}</artifactId>
+      <version>${spark-optimizer.spark-version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-column</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>arrow-memory-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>arrow-memory-netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>arrow-vector</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.orc</groupId>
+          <artifactId>orc-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.orc</groupId>
+          <artifactId>orc-mapreduce</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.parquet</pattern>
+                  <shadedPattern>com.netease.arctic.shade.org.apache.parquet</shadedPattern>
+                </relocation>
+              </relocations>
+              <finalName>${project.artifactId}-${project.version}</finalName>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>optimizer.spark3.3</id>
+      <activation>
+        <property>
+          <name>!optimizer.spark</name>
+        </property>
+      </activation>
+      <properties>
+        <spark-optimizer.spark-version>3.3.2</spark-optimizer.spark-version>
+        <spark-optimizer.scala-version>2.12</spark-optimizer.scala-version>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>optimizer.spark3.2</id>
+      <activation>
+        <property>
+          <name>optimizer.spark</name>
+          <value>3.2</value>
+        </property>
+      </activation>
+      <properties>
+        <spark-optimizer.spark-version>3.2.2</spark-optimizer.spark-version>
+        <spark-optimizer.scala-version>2.12</spark-optimizer.scala-version>
+      </properties>
+    </profile>
+  </profiles>
+</project>
\ No newline at end of file
diff --git a/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java
new file mode 100644
index 0000000000..102b7f26b0
--- /dev/null
+++ b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.optimizer.spark;
+
+import com.netease.arctic.ams.api.resource.Resource;
+import com.netease.arctic.optimizer.common.Optimizer;
+import com.netease.arctic.optimizer.common.OptimizerConfig;
+import com.netease.arctic.optimizer.common.OptimizerToucher;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@code SparkOptimizer} acts as an entrypoint of the spark program */
+public class SparkOptimizer extends Optimizer {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizer.class);
+  private static final String APP_NAME = "amoro-spark-optimizer";
+
+  public SparkOptimizer(OptimizerConfig config, JavaSparkContext jsc) {
+    super(
+        config,
+        () -> new OptimizerToucher(config),
+        (i) -> new SparkOptimizerExecutor(jsc, config, i));
+  }
+
+  public static void main(String[] args) throws Exception {
+    SparkSession spark = SparkSession.builder().appName(APP_NAME).getOrCreate();
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    OptimizerConfig config = new OptimizerConfig(args);
+    if (!jsc.getConf().getBoolean("spark.dynamicAllocation.enabled", false)) {
+      LOG.warn(
+          "To better utilize computing resources, it is recommended to enable 'spark.dynamicAllocation.enabled' "
+              + "and set 'spark.dynamicAllocation.maxExecutors' equal to 'OPTIMIZER_EXECUTION_PARALLEL'");
+    }
+
+    // calculate optimizer memory allocation
+    int driverMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.driver.memory", "1g"));
+    int executorMemory = Utils.memoryStringToMb(jsc.getConf().get("spark.executor.memory", "1g"));
+    config.setMemorySize(driverMemory + config.getExecutionParallel() * executorMemory);
+
+    SparkOptimizer optimizer = new SparkOptimizer(config, jsc);
+    OptimizerToucher toucher = optimizer.getToucher();
+    toucher.withRegisterProperty(Resource.PROPERTY_JOB_ID, spark.sparkContext().applicationId());
+
+    LOG.info("Starting the spark optimizer with configuration:{}", config);
+    optimizer.startOptimizing();
+  }
+}
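main() only warns when dynamic allocation is disabled; the settings themselves come from the submitter. A sketch of submission-side configuration that follows the recommendation (the values are invented examples, and maxExecutors should match the optimizer's execution parallelism):

    // Sketch: recommended settings, normally passed as spark-submit --conf
    // options rather than built in code.
    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.maxExecutors", "4") // = execution parallelism
        .set("spark.driver.memory", "2g")
        .set("spark.executor.memory", "4g");

With these example values the memory registered with AMS above would be 2048 + 4 * 4096 = 18432 MB: the driver counted once, plus one executor per parallel slot.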
diff --git a/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizerExecutor.java b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizerExecutor.java
new file mode 100644
index 0000000000..9f6dbc9747
--- /dev/null
+++ b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizerExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.optimizer.spark;
+
+import com.netease.arctic.ams.api.OptimizingTask;
+import com.netease.arctic.ams.api.OptimizingTaskResult;
+import com.netease.arctic.optimizer.common.OptimizerConfig;
+import com.netease.arctic.optimizer.common.OptimizerExecutor;
+import com.netease.arctic.optimizing.RewriteFilesInput;
+import com.netease.arctic.optimizing.TableOptimizing;
+import com.netease.arctic.utils.ExceptionUtil;
+import com.netease.arctic.utils.SerializationUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * The {@code SparkOptimizerExecutor} takes an {@code OptimizingTask} from AMS, wraps it as a spark
+ * job, then submits it to the spark environment.
+ */
+public class SparkOptimizerExecutor extends OptimizerExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizerExecutor.class);
+  private final JavaSparkContext jsc;
+  private final int threadId;
+
+  public SparkOptimizerExecutor(JavaSparkContext jsc, OptimizerConfig config, int threadId) {
+    super(config, threadId);
+    this.jsc = jsc;
+    this.threadId = threadId;
+  }
+
+  @Override
+  protected OptimizingTaskResult executeTask(OptimizingTask task) {
+    OptimizingTaskResult result;
+    String threadName = Thread.currentThread().getName();
+    long startTime = System.currentTimeMillis();
+    try {
+      ImmutableList<OptimizingTask> of = ImmutableList.of(task);
+      jsc.setJobDescription(jobDescription(task));
+      SparkOptimizingTaskFunction taskFunction =
+          new SparkOptimizingTaskFunction(getConfig(), threadId);
+      List<OptimizingTaskResult> results = jsc.parallelize(of, 1).map(taskFunction).collect();
+      result = results.get(0);
+      LOG.info(
+          "Optimizer executor[{}] executed task[{}] and cost {}",
+          threadName,
+          task.getTaskId(),
+          System.currentTimeMillis() - startTime);
+      return result;
+    } catch (Throwable r) {
+      LOG.error(
+          "Optimizer executor[{}] executed task[{}] failed, and cost {}",
+          threadName,
+          task.getTaskId(),
+          System.currentTimeMillis() - startTime,
+          r);
+      result = new OptimizingTaskResult(task.getTaskId(), threadId);
+      result.setErrorMessage(ExceptionUtil.getErrorMessage(r, 4000));
+      return result;
+    }
+  }
+
+  private String jobDescription(OptimizingTask task) {
+    String description;
+    TableOptimizing.OptimizingInput input =
+        SerializationUtil.simpleDeserialize(task.getTaskInput());
+    if (input instanceof RewriteFilesInput) {
+      description =
+          String.format(
+              "Amoro rewrite files task, table name:%s, task id:%s",
+              ((RewriteFilesInput) input).getTable().name(), task.getTaskId());
+    } else {
+      throw new IllegalArgumentException("Unsupported task:" + input.getClass());
+    }
+    return description;
+  }
+}
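Each polled task becomes a one-partition Spark job here, so at most execution-parallel single-task jobs run concurrently and Spark is free to place each on any live executor. With the default FIFO scheduler a long rewrite job can delay the jobs queued behind it; FAIR scheduling is one stock Spark option to smooth that out. A sketch only, not part of this patch:

    // Sketch: FAIR scheduling lets the concurrent one-partition jobs
    // submitted by the executor threads share cluster slots evenly.
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
        .appName("amoro-spark-optimizer")
        .config("spark.scheduler.mode", "FAIR")
        .getOrCreate();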
diff --git a/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizingTaskFunction.java b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizingTaskFunction.java
new file mode 100644
index 0000000000..e3c95547fd
--- /dev/null
+++ b/ams/optimizer/spark-optimizer/src/main/java/com/netease/arctic/optimizer/spark/SparkOptimizingTaskFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netease.arctic.optimizer.spark;
+
+import com.netease.arctic.ams.api.OptimizingTask;
+import com.netease.arctic.ams.api.OptimizingTaskResult;
+import com.netease.arctic.optimizer.common.OptimizerConfig;
+import com.netease.arctic.optimizer.common.OptimizerExecutor;
+import org.apache.spark.api.java.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@code SparkOptimizingTaskFunction} defines the whole processing logic of how to execute an
+ * {@code OptimizingTask}.
+ */
+public class SparkOptimizingTaskFunction
+    implements Function<OptimizingTask, OptimizingTaskResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizingTaskFunction.class);
+  private final OptimizerConfig config;
+  private final int threadId;
+
+  public SparkOptimizingTaskFunction(OptimizerConfig config, int threadId) {
+    this.config = config;
+    this.threadId = threadId;
+  }
+
+  @Override
+  public OptimizingTaskResult call(OptimizingTask task) {
+    return OptimizerExecutor.executeTask(config, threadId, task, LOG);
+  }
+}
diff --git a/core/src/main/java/com/netease/arctic/optimizing/RewriteFilesInput.java b/core/src/main/java/com/netease/arctic/optimizing/RewriteFilesInput.java
index 7b65ac80f9..2377e20be2 100644
--- a/core/src/main/java/com/netease/arctic/optimizing/RewriteFilesInput.java
+++ b/core/src/main/java/com/netease/arctic/optimizing/RewriteFilesInput.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -153,16 +154,13 @@ public ArcticTable getTable() {
 
   @Override
   public String toString() {
-    return "RewriteFilesInput{"
-        + "rewrittenDataFilesSize="
-        + (rewrittenDataFiles == null ? 0 : rewrittenDataFiles.length)
-        + ", rePosDeletedDataFilesSize="
-        + (rePosDeletedDataFiles == null ? 0 : rePosDeletedDataFiles.length)
-        + ", readOnlyDeleteFilesSize="
-        + (readOnlyDeleteFiles == null ? 0 : readOnlyDeleteFiles.length)
-        + ", rewrittenDeleteFilesSize="
-        + (rewrittenDeleteFiles == null ? 0 : rewrittenDeleteFiles.length)
-        + "} "
-        + super.toString();
+    return MoreObjects.toStringHelper(this)
+        .add("rewrittenDataFilesSize", rewrittenDataFiles.length)
+        .add("rePosDeletedDataFilesSize", rePosDeletedDataFiles.length)
+        .add("readOnlyDeleteFilesSize", readOnlyDeleteFiles.length)
+        .add("rewrittenDeleteFilesSize", rewrittenDeleteFiles.length)
+        .add("table", table.name())
+        .addValue(super.toString())
+        .toString();
   }
 }
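For reference, Guava's toStringHelper renders ClassName{name=value, ...}, with addValue entries appended unnamed. A standalone sketch of the shape the new toString produces (all values invented):

    // Prints: RewriteFilesInput{rewrittenDataFilesSize=3, table=db.sample, BaseInput{...}}
    import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

    String s = MoreObjects.toStringHelper("RewriteFilesInput")
        .add("rewrittenDataFilesSize", 3)
        .add("table", "db.sample")
        .addValue("BaseInput{...}")
        .toString();

Note that unlike the old string concatenation, the helper calls here read the array lengths directly and so assume the file arrays are non-null.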