Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Support Flink RSS. #1366

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions client-flink/common/.baseline/copyright/apache-license-header.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

61 changes: 61 additions & 0 deletions client-flink/common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven module descriptor for the Flink-common client of Apache Uniffle.
  Holds code shared by the version-specific Flink client modules.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Inherit dependency management and plugin configuration from the Uniffle root POM. -->
<parent>
<artifactId>uniffle-parent</artifactId>
<groupId>org.apache.uniffle</groupId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>rss-client-flink-common</artifactId>
<version>0.9.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Apache Uniffle Client (Flink Common)</name>

<dependencies>
<!-- Versions below without an explicit <version> are managed by the parent POM. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client</artifactId>
</dependency>
<!-- Test-jar dependency: reuses shuffle-storage test fixtures in this module's tests. -->
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>shuffle-storage</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): flink-runtime version is hard-coded to 1.14.6 here instead of being
     managed by the parent POM or a property; confirm whether a <flink.version> property
     should be used so version-specific modules can override it. Scope "provided" because
     the Flink runtime is supplied by the cluster at execution time. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<scope>provided</scope>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<!-- jersey-json pulls in conflicting JAXB/Jackson artifacts; excluded to avoid classpath clashes. -->
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.shuffle;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;

import org.apache.uniffle.shuffle.resource.RssShuffleResourceDescriptor;

/**
 * Per-job bookkeeping that maps Flink's identifier scheme (JobID, IntermediateDataSetID,
 * partition numbers) onto the integer-based identifier scheme Uniffle already uses for its
 * MR and Spark clients (application id, shuffle id, partition id, attempt id).
 *
 * <p>Thread-safety: id allocation methods may be called concurrently; the multi-map
 * registration in {@link #getUniffleShuffleId(String)} is guarded by a lock, and the
 * counters use atomic {@link ConcurrentHashMap} operations.
 */
public class RssFlinkApplication {

  private String uniffleApplicationId;
  private String flinkShuffleId;

  ////////////////////////////////
  // FlinkId2UniffleId mapping relationship
  ////////////////////////////////

  // When we implemented Uniffle Flink, Uniffle already fully supported the related functions of MR
  // and Spark. In order to be compatible with this part of the code, we need to map Flink's Id.
  private final Map<String, Integer> flinkShuffleId2UniffleId = new ConcurrentHashMap<>();
  private final Map<Integer, String> uniffleShuffleId2FlinkShuffleId = new ConcurrentHashMap<>();
  private final Map<Integer, Integer> uniffleShuffleId2PartitionId = new ConcurrentHashMap<>();
  private final Map<Integer, Map<Integer, AtomicInteger>> uniffleShuffleIdMapIdAttemptId =
      new ConcurrentHashMap<>();
  // Monotonically increasing source of newly allocated Uniffle shuffle ids.
  private final AtomicInteger shuffleIndex = new AtomicInteger(0);

  public RssFlinkApplication() {}

  /**
   * Derive (and cache) the Uniffle application id for this Flink job. Idempotent: the first
   * call fixes the id; later calls return the cached value regardless of the argument.
   *
   * @param jobId Unique (at least statistically unique) identifier for a Flink Job.
   * @return the Uniffle application id, of the form {@code "uniffle_" + jobId}.
   */
  public String genUniffleApplicationId(JobID jobId) {
    if (StringUtils.isNotBlank(uniffleApplicationId)) {
      return uniffleApplicationId;
    }
    uniffleApplicationId = "uniffle_" + jobId.toString();
    return uniffleApplicationId;
  }

  /**
   * Generate ShuffleResourceDescriptor based on jobId, partitionDescriptor, producerDescriptor.
   *
   * @param jobId Unique (at least statistically unique) identifier for a Flink Job.
   * @param partitionDescriptor Partition descriptor for {@link ShuffleMaster} to obtain {@link
   *     ShuffleDescriptor}.
   * @return RssShuffleResourceDescriptor.
   */
  public RssShuffleResourceDescriptor genShuffleResourceDescriptor(
      JobID jobId, PartitionDescriptor partitionDescriptor) {
    // Step1. Generate Flink ShuffleId.
    this.flinkShuffleId = getFlinkShuffleId(jobId, partitionDescriptor);

    // Step2. Generate Uniffle ShuffleId\Uniffle PartitionId\Uniffle mapPartitionId\Uniffle
    // AttemptId.
    int uniffleShuffleId = getUniffleShuffleId(flinkShuffleId);
    int unifflePartitionId = getUnifflePartitionId(uniffleShuffleId);
    int mapPartitionId = partitionDescriptor.getPartitionId().getPartitionNumber();
    int uniffleAttemptId = genUniffleAttemptId(uniffleShuffleId, mapPartitionId);

    // Step3. Generate RssShuffleResourceDescriptor.
    return new RssShuffleResourceDescriptor(
        uniffleShuffleId, mapPartitionId, uniffleAttemptId, unifflePartitionId);
  }

  /**
   * Get Flink ShuffleId. We will concatenate jobId and dataSetId together.
   *
   * @param jobId Unique (at least statistically unique) identifier for a Flink Job.
   * @param partitionDescriptor Partition descriptor for {@link ShuffleMaster} to obtain {@link
   *     ShuffleDescriptor}.
   * @return Flink ShuffleId.
   */
  public String getFlinkShuffleId(JobID jobId, PartitionDescriptor partitionDescriptor) {
    IntermediateDataSetID dataSetId = partitionDescriptor.getResultId();
    return jobId.toString() + "_" + dataSetId.toString();
  }

  /**
   * Get (or allocate) the Uniffle ShuffleId for a Flink shuffle id. The first call for a given
   * Flink shuffle id allocates the next integer id and registers it in all lookup maps; later
   * calls return the same id.
   *
   * @param shuffleId flink shuffle id
   * @return Uniffle ShuffleId, Value of type Int.
   */
  public int getUniffleShuffleId(String shuffleId) {
    // Several maps must be registered together for a new id, so a single lock keeps them
    // mutually consistent under concurrent requests.
    synchronized (flinkShuffleId2UniffleId) {
      Integer existing = flinkShuffleId2UniffleId.get(shuffleId);
      if (existing != null) {
        return existing;
      }
      // Allocate the new id exactly once; getAndIncrement returns the pre-increment value,
      // which is the id we register and return.
      int newUniffleShuffleId = shuffleIndex.getAndIncrement();
      flinkShuffleId2UniffleId.put(shuffleId, newUniffleShuffleId);
      uniffleShuffleId2FlinkShuffleId.put(newUniffleShuffleId, shuffleId);
      uniffleShuffleIdMapIdAttemptId.put(newUniffleShuffleId, new ConcurrentHashMap<>());
      uniffleShuffleId2PartitionId.put(newUniffleShuffleId, 0);
      return newUniffleShuffleId;
    }
  }

  /**
   * Generate Uniffle PartitionId. Returns the next unused partition id for the given shuffle,
   * starting from 0.
   *
   * @param uniffleShuffleId Uniffle ShuffleId.
   * @return Uniffle PartitionId.
   */
  private int getUnifflePartitionId(int uniffleShuffleId) {
    // merge() performs the read-increment-write as one atomic step on the ConcurrentHashMap,
    // so no external lock is needed. It returns the post-increment value; the id handed out
    // is the previous value.
    int next = uniffleShuffleId2PartitionId.merge(uniffleShuffleId, 1, Integer::sum);
    return next - 1;
  }

  /**
   * Generate Uniffle AttemptId. Returns 0 for the first attempt of a (shuffle, map partition)
   * pair and increments on every subsequent call.
   *
   * @param uniffleShuffleId Uniffle ShuffleId.
   * @param mapPartitionId Flink Map PartitionId.
   * @return Uniffle AttemptId.
   */
  private int genUniffleAttemptId(int uniffleShuffleId, int mapPartitionId) {
    // computeIfAbsent stores the freshly created counter in the map, so increments survive
    // across calls. (The previous getOrDefault-based code incremented a counter that was
    // never stored, so every attempt id came back as 0.)
    Map<Integer, AtomicInteger> mapIdToAttemptId =
        uniffleShuffleIdMapIdAttemptId.computeIfAbsent(
            uniffleShuffleId, k -> new ConcurrentHashMap<>());
    AtomicInteger attemptId =
        mapIdToAttemptId.computeIfAbsent(mapPartitionId, k -> new AtomicInteger(0));
    return attemptId.getAndIncrement();
  }

  /** @return the cached Uniffle application id, or null if not yet generated. */
  public String getUniffleApplicationId() {
    return uniffleApplicationId;
  }

  /** @return the most recently generated Flink shuffle id, or null if none yet. */
  public String getFlinkShuffleId() {
    return flinkShuffleId;
  }

  /**
   * Build the Flink ResultPartitionID identifying this producer's result partition.
   *
   * @param partitionDescriptor descriptor carrying the partition id.
   * @param producerDescriptor descriptor carrying the producer execution id.
   * @return the combined ResultPartitionID.
   */
  public ResultPartitionID genResultPartitionId(
      PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
    return new ResultPartitionID(
        partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId());
  }
}