Skip to content

Commit

Permalink
Merge branch 'master' into thrift
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujinsong committed May 6, 2024
2 parents 0873fc4 + 75ac177 commit 0cb0e08
Show file tree
Hide file tree
Showing 26 changed files with 1,766 additions and 116 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/docker-images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ jobs:
echo "MVN_HADOOP=-Dhadoop=${{ matrix.hadoop }}" >> $GITHUB_ENV
- name: Build dist module with Maven
run: mvn clean install -pl 'ams/dist' -am -e ${MVN_HADOOP} -DskipTests -B -ntp
run: mvn clean install -pl 'amoro-ams/dist' -am -e ${MVN_HADOOP} -DskipTests -B -ntp

- name: Build and Push Amoro Docker Image
uses: docker/build-push-action@v4
Expand Down Expand Up @@ -160,7 +160,7 @@ jobs:
&& echo "AMORO_VERSION=${AMORO_VERSION}" >> $GITHUB_OUTPUT
- name: Build optimizer module with Maven
run: mvn clean package -pl 'ams/optimizer/flink-optimizer' -am -e ${OPTIMIZER_FLINK} -DskipTests -B -ntp
run: mvn clean package -pl 'amoro-ams/amoro-ams-optimizer/flink-optimizer' -am -e ${OPTIMIZER_FLINK} -DskipTests -B -ntp

- name: Build and Push Flink Optimizer Docker Image
uses: docker/build-push-action@v4
Expand All @@ -174,7 +174,7 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
build-args: |
FLINK_VERSION=${{ matrix.flink }}
OPTIMIZER_JOB=ams/optimizer/flink-optimizer/target/flink-optimizer-${{ env.AMORO_VERSION }}-jar-with-dependencies.jar
OPTIMIZER_JOB=amoro-ams/amoro-ams-optimizer/flink-optimizer/target/flink-optimizer-${{ env.AMORO_VERSION }}-jar-with-dependencies.jar
docker-optimizer-spark:
name: Push Amoro Optimizer-Spark Docker Image to Docker Hub
Expand Down Expand Up @@ -232,7 +232,7 @@ jobs:
&& echo "AMORO_VERSION=${AMORO_VERSION}" >> $GITHUB_OUTPUT
- name: Build optimizer module with Maven
run: mvn clean package -pl 'ams/optimizer/spark-optimizer' -am -e ${OPTIMIZER_SPARK} -DskipTests -B -ntp
run: mvn clean package -pl 'amoro-ams/amoro-ams-optimizer/spark-optimizer' -am -e ${OPTIMIZER_SPARK} -DskipTests -B -ntp

- name: Build and Push Spark Optimizer Docker Image
uses: docker/build-push-action@v4
Expand All @@ -246,7 +246,7 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
build-args: |
SPARK_VERSION=${{ matrix.spark }}
OPTIMIZER_JOB=ams/optimizer/spark-optimizer/target/spark-optimizer-${{ env.AMORO_VERSION }}-jar-with-dependencies.jar
OPTIMIZER_JOB=amoro-ams/amoro-ams-optimizer/spark-optimizer/target/spark-optimizer-${{ env.AMORO_VERSION }}-jar-with-dependencies.jar
docker-quickdemo:
Expand Down Expand Up @@ -295,7 +295,7 @@ jobs:
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Build optimizer module with Maven
run: mvn clean package -pl 'mixed/flink/v1.17/flink-runtime' -am -e -DskipTests -B -ntp
run: mvn clean package -pl 'amoro-mixed-format/amoro-mixed-format-flink/v1.17/amoro-mixed-format-flink-runtime-1.17' -am -e -DskipTests -B -ntp

- name: Build and Push Flink Optimizer Docker Image
uses: docker/build-push-action@v4
Expand Down
60 changes: 60 additions & 0 deletions .github/workflows/helm-charts-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


# This workflow will execute Tests when submit a PR.

name: "CI for Helm Charts"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

on:
pull_request:
paths:
- 'charts/**'
push:
paths:
- 'charts/**'
merge_group:
workflow_dispatch:

jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Install Helm
uses: azure/setup-helm@v4

- name: Install helm-unittests
run: |
helm plugin install https://github.com/helm-unittest/helm-unittest.git
- name: Rebuild Chart with Dependency
run: |
cd charts/amoro
helm dependency build
- name: Run Unit Tests
run: |
helm unittest charts/amoro
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class AbstractOptimizerOperator implements Serializable {

private final OptimizerConfig config;
private final AtomicReference<String> token = new AtomicReference<>();
private boolean stopped = false;
private volatile boolean stopped = false;

public AbstractOptimizerOperator(OptimizerConfig config) {
Preconditions.checkNotNull(config);
Expand Down
5 changes: 5 additions & 0 deletions amoro-ams/amoro-ams-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** The controller that handles catalog requests. */
public class CatalogController {
Expand Down Expand Up @@ -357,18 +358,18 @@ private CatalogMeta constructCatalogMeta(CatalogRegisterInfo info, CatalogMeta o
.put(
CatalogMetaProperties.TABLE_PROPERTIES_PREFIX + TableProperties.SELF_OPTIMIZING_GROUP,
info.getOptimizerGroup());
StringBuilder tableFormats = new StringBuilder();
String tableFormats;
try {
// validate table format
info.getTableFormatList()
.forEach(item -> tableFormats.append(TableFormat.valueOf(item).name()));
tableFormats =
info.getTableFormatList().stream()
.map(item -> TableFormat.valueOf(item).name())
.collect(Collectors.joining(","));
} catch (Exception e) {
throw new RuntimeException(
"Invalid table format list, " + String.join(",", info.getTableFormatList()));
}
catalogMeta
.getCatalogProperties()
.put(CatalogMetaProperties.TABLE_FORMATS, tableFormats.toString());
catalogMeta.getCatalogProperties().put(CatalogMetaProperties.TABLE_FORMATS, tableFormats);
fillAuthConfigs2CatalogMeta(catalogMeta, info.getAuthConfig(), oldCatalogMeta);
// change fileId to base64Code
Map<String, String> metaStorageConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.manager;

import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import org.apache.amoro.api.resource.Resource;
import org.apache.commons.io.IOUtils;
import org.apache.curator.shaded.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

/** Kubernetes Optimizer Container with Standalone Optimizer */
public class KubernetesOptimizerContainer extends AbstractResourceContainer {

private static final Logger LOG = LoggerFactory.getLogger(KubernetesOptimizerContainer.class);

public static final String MEMORY_PROPERTY = "memory";
public static final String CPU_FACTOR_PROPERTY = "cpu.factor";
public static final String NAMESPACE = "namespace";
public static final String IMAGE = "image";
public static final String KUBE_CONFIG_PATH = "kube-config-path";

private static final String NAME_PREFIX = "amoro-optimizer-";

private static final String KUBERNETES_NAME_PROPERTIES = "name";

private KubernetesClient client;

@Override
public void init(String name, Map<String, String> containerProperties) {
super.init(name, containerProperties);
// start k8s job using k8s client
String kubeConfigPath = checkAndGetProperty(containerProperties, KUBE_CONFIG_PATH);
Config config = Config.fromKubeconfig(getKubeConfigContent(kubeConfigPath));
this.client = new KubernetesClientBuilder().withConfig(config).build();
}

@Override
protected Map<String, String> doScaleOut(Resource resource) {
Map<String, String> groupProperties = Maps.newHashMap();
groupProperties.putAll(getContainerProperties());
groupProperties.putAll(resource.getProperties());

// generate pod start args
long memoryPerThread = Long.parseLong(checkAndGetProperty(groupProperties, MEMORY_PROPERTY));
long memory = memoryPerThread * resource.getThreadCount();
// point at amoro home in docker image
String startUpArgs =
String.format(
"/entrypoint.sh optimizer %s %s",
memory, super.buildOptimizerStartupArgsString(resource));
LOG.info("Starting k8s optimizer using k8s client with start command : {}", startUpArgs);

String namespace = groupProperties.getOrDefault(NAMESPACE, "default");
String image = checkAndGetProperty(groupProperties, IMAGE);
String cpuLimitFactorString = groupProperties.getOrDefault(CPU_FACTOR_PROPERTY, "1.0");
double cpuLimitFactor = Double.parseDouble(cpuLimitFactorString);
int cpuLimit = (int) (Math.ceil(cpuLimitFactor * resource.getThreadCount()));

String resourceId = resource.getResourceId();
String groupName = resource.getGroupName();
String kubernetesName = NAME_PREFIX + resourceId;
Deployment deployment =
new DeploymentBuilder()
.withNewMetadata()
.withName(NAME_PREFIX + resourceId)
.endMetadata()
.withNewSpec()
.withReplicas(1)
.withNewTemplate()
.withNewMetadata()
.addToLabels("app", NAME_PREFIX + resourceId)
.addToLabels("AmoroOptimizerGroup", groupName)
.addToLabels("AmoroResourceId", resourceId)
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName("optimizer")
.withImage(image)
.withCommand("sh", "-c", startUpArgs)
.withResources(
new ResourceRequirementsBuilder()
.withLimits(
ImmutableMap.of(
"memory",
new Quantity(memory + "Mi"),
"cpu",
new Quantity(cpuLimit + "")))
.withRequests(
ImmutableMap.of(
"memory",
new Quantity(memory + "Mi"),
"cpu",
new Quantity(cpuLimit + "")))
.build())
.endContainer()
.endSpec()
.endTemplate()
.withNewSelector()
.addToMatchLabels("app", NAME_PREFIX + resourceId)
.endSelector()
.endSpec()
.build();
client.apps().deployments().inNamespace(namespace).resource(deployment).create();
Map<String, String> startupProperties = Maps.newHashMap();
startupProperties.put(NAMESPACE, namespace);
startupProperties.put(KUBERNETES_NAME_PROPERTIES, kubernetesName);
return startupProperties;
}

@Override
public void releaseOptimizer(Resource resource) {
String resourceId = resource.getResourceId();
LOG.info("release Kubernetes Optimizer Container {}", resourceId);
String namespace = resource.getProperties().get(NAMESPACE);
String name = resource.getProperties().get(KUBERNETES_NAME_PROPERTIES);
client.apps().deployments().inNamespace(namespace).withName(name).delete();
}

private static String checkAndGetProperty(Map<String, String> properties, String key) {
Preconditions.checkState(
properties != null && properties.containsKey(key), "Cannot find %s in properties", key);
return properties.get(key);
}

private String getKubeConfigContent(String path) {
try {
return IOUtils.toString(Files.newInputStream(Paths.get(path)), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
long memoryPerThread = Long.parseLong(resource.getRequiredProperty(JOB_MEMORY_PROPERTY));
long memory = memoryPerThread * resource.getThreadCount();
return String.format(
"%s/bin/optimizer.sh %s %s",
"%s/bin/optimizer.sh start %s %s",
amsHome, memory, super.buildOptimizerStartupArgsString(resource));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@

org.apache.amoro.server.manager.SparkOptimizerContainer
org.apache.amoro.server.manager.FlinkOptimizerContainer
org.apache.amoro.server.manager.LocalOptimizerContainer
org.apache.amoro.server.manager.LocalOptimizerContainer
org.apache.amoro.server.manager.KubernetesOptimizerContainer
Loading

0 comments on commit 0cb0e08

Please sign in to comment.