Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] [Core]In Spark multi-table synchronization, there is inconsistent behavior compared to the Zeta engine. #7517

Closed
3 tasks done
FuYouJ opened this issue Aug 28, 2024 · 1 comment

Comments

@FuYouJ
Copy link
Contributor

FuYouJ commented Aug 28, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

I am developing a multi-table synchronization plugin and currently running unit tests. I have noticed that configurations that work in the Zeta engine do not pass the test cases in Spark mode.
Here is my configuration:

source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index_list = [
       {
           index = "read_index1"
           query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
           source = [
           c_map,
           c_array,
           c_string,
           c_boolean,
           c_tinyint,
           c_smallint,
           c_bigint,
           c_float,
           c_double,
           c_decimal,
           c_bytes,
           c_int,
           c_date,
           c_timestamp]
           array_column = {
           c_array = "array<tinyint>"
           }
       }
       {
           index = "read_index1"
           query = {"match_all": {}}
           source = [
           c_map,
           c_array,
           c_string,
           c_boolean,
           c_tinyint,
           c_smallint,
           c_bigint,
           c_float,
           c_double,
           c_decimal,
           c_bytes,
           c_int,
           c_date,
           c_timestamp]
           array_column = {
           c_array = "array<tinyint>"
           }
       }

    ]

  }
}

transform {
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "multi_source_write_test_index"
    index_type = "st"
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode"="APPEND_DATA"
  }
}

There seems to be an issue with my configuration. I shouldn't have configured two identical indices, but this configuration runs without issues in the Zeta engine, while it throws an error in Spark mode.
image
I believe that if configuring two identical tables has a valid use case in user-defined partitioning scenarios, then Spark needs to be changed. Alternatively, if it is not valid, then the Zeta engine needs to be modified.

SeaTunnel Version

2.3.8 dev

SeaTunnel Config

source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index_list = [
       {
           index = "read_index1"
           query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
           source = [
           c_map,
           c_array,
           c_string,
           c_boolean,
           c_tinyint,
           c_smallint,
           c_bigint,
           c_float,
           c_double,
           c_decimal,
           c_bytes,
           c_int,
           c_date,
           c_timestamp]
           array_column = {
           c_array = "array<tinyint>"
           }
       }
       {
           index = "read_index2"
           query = {"match_all": {}}
           source = [
           c_map,
           c_array,
           c_string,
           c_boolean,
           c_tinyint,
           c_smallint,
           c_bigint,
           c_float,
           c_double,
           c_decimal,
           c_bytes,
           c_int,
           c_date,
           c_timestamp]
           array_column = {
           c_array = "array<tinyint>"
           }
       }

    ]

  }
}

transform {
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "multi_source_write_test_index"
    index_type = "st"
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode"="APPEND_DATA"
  }
}

Running Command

default

Error Exception

.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@FuYouJ FuYouJ added the bug label Aug 28, 2024
Copy link

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Sep 28, 2024
@FuYouJ FuYouJ closed this as completed Sep 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant