Conversation

@JunRuiLee
Contributor

What is the purpose of the change

Change the default config file to config.yaml in flink-dist.

Brief change log

  • Change the default config file to config.yaml in flink-dist.
  • Update references to 'flink-conf.yaml' in the code to use 'config.yaml'
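For context, the two file names imply different syntaxes: the legacy `flink-conf.yaml` is parsed line by line as flat `key: value` pairs, while the new `config.yaml` is parsed as standard YAML, which also allows nested keys. A minimal sketch of the two styles (the option names are common Flink options used only for illustration):

```yaml
# legacy flink-conf.yaml style: flat keys, one "key: value" pair per line
#   jobmanager.rpc.address: localhost
#   taskmanager.numberOfTaskSlots: 2

# new config.yaml style: standard YAML, nested keys allowed
jobmanager:
  rpc:
    address: localhost
taskmanager:
  numberOfTaskSlots: 2
```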

Verifying this change

This change is already covered by existing tests, including all e2e and IT cases.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@JunRuiLee JunRuiLee force-pushed the change_default_config_file branch from 92052dc to e9748af on January 23, 2024 13:13
@flinkbot
Collaborator

flinkbot commented Jan 23, 2024

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@JunRuiLee JunRuiLee force-pushed the change_default_config_file branch 3 times, most recently from 4219c46 to a68940e on January 24, 2024 03:57
@JunRuiLee
Contributor Author

Hi @HuangXingBo, this PR includes some PyFlink changes related to the adoption of the new configuration file, config.yaml. Could you please help review it?

@JunRuiLee JunRuiLee force-pushed the change_default_config_file branch from 8bdd43f to 0d63427 on January 25, 2024 03:43
@JunRuiLee
Contributor Author

Thanks @zhuzhurk for the review. I've updated this PR accordingly, PTAL.

Contributor

@HuangXingBo HuangXingBo left a comment

@JunRuiLee Thanks a lot for the great work. I have left some comments.


def read_from_config(key, default_value, flink_conf_file):
def read_from_config(key, default_value, flink_conf_directory):
import yaml
Contributor

I feel the logic of this block of code can be refactored:

```
if config.yaml exists:
    parse with standard YAML (yaml.safe_load)
elif flink-conf.yaml exists:
    old parse logic
else:
    return default value
```

```
if key in [jars_key, classpaths_key]:
    add_jars_to_context_class_loader(value.split(";"))
```

isStandardYaml = jvm.org.apache.flink.configuration. \
Contributor

`isStandardYaml` -> `is_standard_yaml` is more Pythonic.

Comment on lines 111 to 117
```
if isStandardYaml:
    import yaml
    jar_urls_list = yaml.safe_load(value)
    if isinstance(jar_urls_list, list):
        add_jars_to_context_class_loader(jar_urls_list)
else:
    add_jars_to_context_class_loader(value.split(";"))
```
Contributor

I feel the logic of this block of code can be refactored:

```
if key in [jars_key, classpaths_key]:
    jar_urls = parse_jars_value(value)
    add_jars_to_context_class_loader(jar_urls)

def parse_jars_value(value: str):
    is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
    if is_standard_yaml:
        import yaml
        jar_urls_list = yaml.safe_load(value)
        if isinstance(jar_urls_list, list):
            return jar_urls_list
    return value
```
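As a stand-alone illustration of the suggested helper, here is a runnable sketch. To keep it dependency-free, `ast.literal_eval` stands in for `yaml.safe_load` (it handles flow-style lists such as `['a.jar', 'b.jar']`); the real code would use PyYAML and the JVM-backed `isStandardYaml()` check:

```python
import ast


def parse_jars_value(value, is_standard_yaml):
    # In standard-YAML mode the option value is a YAML list; in legacy
    # mode it is a ';'-separated string, which is returned unchanged.
    if is_standard_yaml:
        try:
            parsed = ast.literal_eval(value)  # stand-in for yaml.safe_load
        except (ValueError, SyntaxError):
            parsed = None
        if isinstance(parsed, list):
            return parsed
    return value
```

Callers can then treat the result uniformly: a list is used as-is, a string is split on `";"`.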

jvm = get_gateway().jvm
jar_urls = self.get_config().get(config_key, None)

isStandardYaml = jvm.org.apache.flink.configuration. \
Contributor

`isStandardYaml` -> `is_standard_yaml`

Comment on lines 1550 to 1593
```
# normalize
if not isStandardYaml:
    self._parse_url_by_legacy_parser(jar_urls, jar_urls_list, jvm)
    j_configuration = get_j_env_configuration(self._get_j_env())
    self._parse_j_env_url_by_legacy_parser(config_key, j_configuration, jar_urls_list)
else:
    import yaml
    parsed_jar_urls = yaml.safe_load(jar_urls)
    if isinstance(parsed_jar_urls, list):
        for url in parsed_jar_urls:
            url = url.strip()
            if url != "":
                jar_urls_list.append(jvm.java.net.URL(url).toString())
    else:
        self._parse_url_by_legacy_parser(jar_urls, jar_urls_list, jvm)

    j_configuration = get_j_env_configuration(self._get_j_env())
    if j_configuration.containsKey(config_key):
        jar_urls_from_j_env = yaml.safe_load(j_configuration.getString(config_key, ""))
        if isinstance(jar_urls_from_j_env, list):
            for url in jar_urls_from_j_env:
                url = url.strip()
                if url != "" and url not in jar_urls_list:
                    jar_urls_list.append(url)
        else:
            self._parse_j_env_url_by_legacy_parser(
                config_key, j_configuration, jar_urls_list)

j_configuration.setString(config_key, ";".join(jar_urls_list))

def _parse_j_env_url_by_legacy_parser(self, config_key, j_configuration, jar_urls_list):
    if j_configuration.containsKey(config_key):
        for url in j_configuration.getString(config_key, "").split(";"):
            url = url.strip()
            if url != "" and url not in jar_urls_list:
                jar_urls_list.append(url)

def _parse_url_by_legacy_parser(self, jar_urls, jar_urls_list, jvm):
    for url in jar_urls.split(";"):
        url = url.strip()
        if url != "":
            jar_urls_list.append(jvm.java.net.URL(url).toString())
```

Contributor

I feel the logic of this block of code can be refactored:

```
def _add_jars_to_j_env_config(self, config_key):
    jar_urls = self.get_config().get(config_key, None)

    if jar_urls:
        jvm = get_gateway().jvm
        jar_urls_list = []
        self._parse_urls(parse_jars_value(jar_urls, jvm), jar_urls_list)

        j_configuration = get_j_env_configuration(self._get_j_env())
        self._parse_urls(
            parse_jars_value(j_configuration.getString(config_key, ""), jvm),
            jar_urls_list)

        j_configuration.setString(config_key, ";".join(jar_urls_list))

def _parse_urls(self, jar_urls, jar_urls_list):
    # parse_jars_value returns a list (standard YAML) or a ';'-separated string
    urls = jar_urls if isinstance(jar_urls, list) else jar_urls.split(";")
    for url in urls:
        url = url.strip()
        if url != "" and url not in jar_urls_list:
            jar_urls_list.append(url)
```
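The order-preserving, de-duplicating merge used in both branches above can be sketched as a small stand-alone function. The helper name `merge_urls` and the list-or-string input convention are assumptions for illustration, not the PyFlink API:

```python
def merge_urls(urls, jar_urls_list):
    # Accepts either a list of URLs (standard YAML mode) or a ';'-separated
    # string (legacy mode); appends non-empty, not-yet-seen entries in order.
    items = urls if isinstance(urls, list) else urls.split(";")
    for url in items:
        url = url.strip()
        if url != "" and url not in jar_urls_list:
            jar_urls_list.append(url)
    return jar_urls_list
```

Calling it first with the Python-side value and then with the value already stored in the Java configuration yields the combined, duplicate-free list to write back.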

@JunRuiLee JunRuiLee force-pushed the change_default_config_file branch from 450b7eb to faec8b3 on January 25, 2024 06:59
@JunRuiLee JunRuiLee force-pushed the change_default_config_file branch from faec8b3 to 77182d8 on January 25, 2024 08:15
@JunRuiLee
Contributor Author

Thanks @HuangXingBo for the review. I've updated this PR following your comments, PTAL.

Contributor

@HuangXingBo HuangXingBo left a comment

@JunRuiLee LGTM

Contributor

@zhuzhurk zhuzhurk left a comment

LGTM.
The CI is green. I will squash and merge the commits locally.

@zhuzhurk zhuzhurk closed this in 9721ce8 Jan 25, 2024
RexXiong pushed a commit to apache/celeborn that referenced this pull request Mar 20, 2024
### What changes were proposed in this pull request?

Support Flink 1.19.

### Why are the changes needed?

The release of Flink 1.19.0 has been announced: [Announcing the Release of Apache Flink 1.19](https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19).

The main changes include:

- The `org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel` constructor parameters changed:
   - `consumedSubpartitionIndex` was replaced by `consumedSubpartitionIndexSet`: [[FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel](apache/flink#23927).
   - `partitionRequestListenerTimeout` was added: [[FLINK-25055][network] Support listen and notify mechanism for partition request](apache/flink#23565).
- The `org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate` constructor removed the parameters `subpartitionIndexRange`, `tieredStorageConsumerClient`, `nettyService` and `tieredStorageConsumerSpecs`: [[FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel](apache/flink#23927).
- Change the default config file to `config.yaml` in `flink-dist`: [[FLINK-33577][dist] Change the default config file to config.yaml in flink-dist](apache/flink#24177).
- `org.apache.flink.configuration.RestartStrategyOptions` uses `org.apache.commons.compress.utils.Sets` from the `commons-compress` dependency: [[FLINK-33865][runtime] Adding an ITCase to ensure exponential-delay.attempts-before-reset-backoff works well](apache/flink#23942).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Local test:

- Flink batch job submission

```
$ ./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e9fb659991a9c29d376151783bdf6de
Program execution finished
Job with JobID 2e9fb659991a9c29d376151783bdf6de has finished.
Job Runtime: 1912 ms
```

- Flink batch job execution

![image](https://github.com/apache/incubator-celeborn/assets/10048174/18b60861-cafc-4df3-b94d-93307e728be2)

- Celeborn master log
```
24/03/18 20:52:47,513 INFO [celeborn-dispatcher-42] Master: Offer slots successfully for 1 reducers of 1710766312631-2e9fb659991a9c29d376151783bdf6de-0 on 1 workers.
```

- Celeborn worker log
```
24/03/18 20:52:47,704 INFO [celeborn-dispatcher-1] StorageManager: created file at /Users/nicholas/Software/Celeborn/apache-celeborn-0.5.0-SNAPSHOT/shuffle/celeborn-worker/shuffle_data/1710766312631-2e9fb659991a9c29d376151783bdf6de/0/0-0-0
24/03/18 20:52:47,707 INFO [celeborn-dispatcher-1] Controller: Reserved 1 primary location and 0 replica location for 1710766312631-2e9fb659991a9c29d376151783bdf6de-0
24/03/18 20:52:47,874 INFO [celeborn-dispatcher-2] Controller: Start commitFiles for 1710766312631-2e9fb659991a9c29d376151783bdf6de-0
24/03/18 20:52:47,890 INFO [worker-rpc-async-replier] Controller: CommitFiles for 1710766312631-2e9fb659991a9c29d376151783bdf6de-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
```

Closes #2399 from SteNicholas/CELEBORN-1310.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>