[Improve][Connector-V2][SelectDB Cloud]Refactor some SelectDB Cloud Sink code as well as support copy into batch and async flush and cdc #4312

Merged: 5 commits merged into apache:dev on Mar 9, 2023

@zy-kkk zy-kkk commented Mar 9, 2023

Purpose of this pull request

Refactor parts of the SelectDB Cloud sink and add support for batched COPY INTO, asynchronous flush, and CDC.
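
To make the batching and asynchronous flush idea concrete, here is a minimal Python sketch. It is not the connector's code (the connector is written in Java), and `BatchBuffer`, `batch_size`, and the `upload` callback are hypothetical names. The sketch assumes rows are buffered until a size threshold is reached and are then handed to a background worker, so the writer does not block on the upload.

```python
import queue
import threading
from typing import Callable, List


class BatchBuffer:
    """Buffers rows and uploads full batches on a background thread.

    Hypothetical illustration only; not the SelectDB Cloud sink implementation.
    """

    def __init__(self, batch_size: int, upload: Callable[[List[str]], None]):
        self._batch_size = batch_size
        self._upload = upload
        self._rows: List[str] = []
        self._pending: "queue.Queue" = queue.Queue()
        self._error = None
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        while True:
            batch = self._pending.get()
            try:
                if batch is None:  # sentinel: stop the worker
                    return
                self._upload(batch)
            except Exception as exc:  # remember the failure for the writer
                self._error = exc
            finally:
                self._pending.task_done()

    def write(self, row: str) -> None:
        self._raise_if_failed()
        self._rows.append(row)
        if len(self._rows) >= self._batch_size:
            self._enqueue()

    def _enqueue(self) -> None:
        if self._rows:
            self._pending.put(self._rows)
            self._rows = []

    def flush(self) -> None:
        """Hand over the partial batch and wait until all uploads finish."""
        self._enqueue()
        self._pending.join()
        self._raise_if_failed()

    def close(self) -> None:
        try:
            self.flush()
        finally:
            self._pending.put(None)
            self._worker.join()

    def _raise_if_failed(self) -> None:
        if self._error is not None:
            raise RuntimeError("async flush failed") from self._error


# Usage: collect uploaded batches in a list instead of sending them anywhere.
uploaded: List[List[str]] = []
buffer = BatchBuffer(batch_size=3, upload=uploaded.append)
for i in range(7):
    buffer.write('{"c_int": %d}' % i)
buffer.close()
print([len(batch) for batch in uploaded])
```

A single worker thread and a FIFO queue keep batches in write order; a failed upload is surfaced on the next `write` or `flush` rather than being lost.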

Test procedure

SelectDB Table Schema

CREATE TABLE `TypeTest` (
    c_map varchar(1000),
    c_array array<int>,
    c_string String,
    c_boolean boolean,
    c_tinyint tinyint,
    c_smallint smallint,
    c_int int,
    c_bigint bigint,
    c_float float,
    c_double double,
    c_decimal decimal(27, 9),
    c_null String,
    c_bytes String,
    c_date date,
    c_timestamp datetime
) ENGINE=OLAP
DUPLICATE KEY(`c_map`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c_map`) BUCKETS 4;
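
Comparing this table with the FakeSource schema used in the test configs gives a field-to-column pairing. The Python sketch below records that pairing as a dictionary and renders column definitions from it. The names `SOURCE_TO_COLUMN` and `render_columns` are hypothetical, and the pairing reflects only this test table; it should not be read as a general type-conversion rule of the connector.

```python
# Pairing of FakeSource field types to the column types of the TypeTest
# table, read off the test setup in this PR description.
SOURCE_TO_COLUMN = {
    "map<string, string>": "varchar(1000)",
    "array<int>": "array<int>",
    "string": "String",
    "boolean": "boolean",
    "tinyint": "tinyint",
    "smallint": "smallint",
    "int": "int",
    "bigint": "bigint",
    "float": "float",
    "double": "double",
    "decimal(27, 9)": "decimal(27, 9)",
    "null": "String",
    "bytes": "String",
    "date": "date",
    "timestamp": "datetime",
}


def render_columns(fields):
    """Render `name type` column definitions for a dict of field -> source type."""
    lines = []
    for name, source_type in fields.items():
        try:
            column_type = SOURCE_TO_COLUMN[source_type]
        except KeyError:
            raise ValueError(f"no column type recorded for {source_type!r}") from None
        lines.append(f"    {name} {column_type}")
    return ",\n".join(lines)


print(render_columns({
    "c_map": "map<string, string>",
    "c_bytes": "bytes",
    "c_timestamp": "timestamp",
}))
```

Note that the map, null, and bytes fields all land in string-typed columns in this test table.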
  1. ST Engine
    • config
      env {
        execution.parallelism = 1
        job.mode = "BATCH"
        checkpoint.interval = 5000
      }

      source {
        FakeSource {
          row.num = 10
          parallelism = 1
          schema = {
            fields {
              c_map = "map<string, string>"
              c_array = "array<int>"
              c_string = string
              c_boolean = boolean
              c_tinyint = tinyint
              c_smallint = smallint
              c_int = int
              c_bigint = bigint
              c_float = float
              c_double = double
              c_decimal = "decimal(27, 9)"
              c_null = "null"
              c_bytes = bytes
              c_date = date
              c_timestamp = timestamp
            }
          }
          result_table_name = "fake"
        }
      }
      
      transform {
      }
      
      sink {
        SelectDBCloud {
          load-url="ip:port"
          jdbc-url="ip:port"
          cluster-name="SeaTunnel"
          table.identifier="test.TypeTest"
          username="admin"
          password="******"
          selectdb.config {
              file.type="json"
          }
        }
      }
    • result: see the two screenshots attached to the pull request
  2. Flink
    • config
      env {
        execution.parallelism = 1
        job.mode = "BATCH"
      }

      source {
        FakeSource {
          row.num = 10
          parallelism = 1
          schema = {
            fields {
              c_map = "map<string, string>"
              c_array = "array<int>"
              c_string = string
              c_boolean = boolean
              c_tinyint = tinyint
              c_smallint = smallint
              c_int = int
              c_bigint = bigint
              c_float = float
              c_double = double
              c_decimal = "decimal(27, 9)"
              c_null = "null"
              c_bytes = bytes
              c_date = date
              c_timestamp = timestamp
            }
          }
          result_table_name = "fake"
        }
      }
      
      transform {
      }
      
      sink {
        SelectDBCloud {
          load-url="ip:port"
          jdbc-url="ip:port"
          cluster-name="SeaTunnel"
          table.identifier="test.TypeTest"
          username="admin"
          password="******"
          selectdb.config {
              file.type="json"
          }
        }
      }
    • result: see the two screenshots attached to the pull request
  3. Spark
    • config
      env {
        job.name = "SeaTunnel"
        spark.executor.instances = 1
        spark.executor.cores = 1
        spark.executor.memory = "1g"
        spark.master = local
      }
      
      source {
        FakeSource {
          row.num = 10
          parallelism = 1
          schema = {
            fields {
              c_map = "map<string, string>"
              c_array = "array<int>"
              c_string = string
              c_boolean = boolean
              c_tinyint = tinyint
              c_smallint = smallint
              c_int = int
              c_bigint = bigint
              c_float = float
              c_double = double
              c_decimal = "decimal(27, 9)"
              c_null = "null"
              c_bytes = bytes
              c_date = date
              c_timestamp = timestamp
            }
          }
          result_table_name = "fake"
        }
      }
      
      transform {
      }
      
      sink {
        SelectDBCloud {
          load-url="ip:port"
          jdbc-url="ip:port"
          cluster-name="SeaTunnel"
          table.identifier="test.TypeTest"
          username="admin"
          password="******"
          selectdb.config {
              file.type="json"
          }
        }
      }
    • result: see the two screenshots attached to the pull request
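
All three configs pass the same sink options. As a rough illustration of how such options might be checked before a job starts, the Python sketch below verifies that the keys used above are present and splits `table.identifier` into database and table. `REQUIRED_KEYS`, `validate_sink_options`, and the choice of which keys are mandatory are assumptions made for this example; the connector's actual option rules may differ.

```python
# Keys that appear in every SelectDBCloud sink block of the test configs.
# Treating all of them as required is an assumption of this sketch.
REQUIRED_KEYS = (
    "load-url",
    "jdbc-url",
    "cluster-name",
    "table.identifier",
    "username",
    "password",
)


def validate_sink_options(options):
    """Return (database, table) after checking the options used in the test configs."""
    missing = [key for key in REQUIRED_KEYS if not options.get(key)]
    if missing:
        raise ValueError(f"missing sink options: {', '.join(missing)}")
    # Split on the first dot only, e.g. "test.TypeTest" -> ("test", "TypeTest").
    database, sep, table = options["table.identifier"].partition(".")
    if not sep or not database or not table:
        raise ValueError("table.identifier must look like 'database.table'")
    return database, table


sink_options = {
    "load-url": "ip:port",
    "jdbc-url": "ip:port",
    "cluster-name": "SeaTunnel",
    "table.identifier": "test.TypeTest",
    "username": "admin",
    "password": "******",
}
print(validate_sink_options(sink_options))
```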

Check list

@zy-kkk zy-kkk marked this pull request as ready for review March 9, 2023 06:04

@TyrantLucifer TyrantLucifer left a comment

@CalvinKirs CalvinKirs merged commit 11e94b2 into apache:dev Mar 9, 2023