
[Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

Closed · 3 tasks done · eyys opened this issue Aug 5, 2022 · 13 comments

eyys (Contributor) commented Aug 5, 2022

Search before asking

  • I have searched the existing features and found no similar feature requirement.

Description

The original Kafka connector supports a custom schema; the current V2 connector does not.

Usage Scenario

User-defined data structures for the Kafka source.

Related issues

none

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Hisoka-X assigned Hisoka-X and eyys and unassigned Hisoka-X on Aug 5, 2022
Hisoka-X (Member) commented Aug 5, 2022

Yes, this is a very important function we should implement. It would be very helpful if you could do this.

hailin0 (Member) commented Aug 7, 2022

I want to discuss how to define the field schema configuration for connectors in SeaTunnel.

I see this use case from Flink:
create table x ( field1 bigint, field2 string, ...... ) with ( 'format' = 'xxx' )

The connector selects the xxx format factory to convert the message data into row[field1<bigint>, field2<string>].

How should it be defined in SeaTunnel? e.g.:

source {
  KafkaSource {
    format = xxx
    format_schema = [
      {"name": "field1", "data_type": "bigint"},
      {"name": "field2", "data_type": "string"}
    ]
  }
}

Hisoka-X (Member) commented Aug 8, 2022

Yes, deciding how to define the schema is an important thing. Please send a discussion mail to dev@seatunnel.apache.org and attach your idea.

Hisoka-X (Member) commented Aug 8, 2022

cc @ashulin

ashulin (Member) commented Aug 8, 2022

#2299

EricJoy2048 (Member) commented:

cc @TyrantLucifer

ashulin (Member) commented Aug 8, 2022

This is a common feature for connectors such as Kafka, HTTP, File, Pulsar, etc. My opinion is to conform to SQL conventions:

# conf file
Source {
    schema {
        field = "STRING"
        field2 = "INT"
        field3 = "DECIMAL(30, 3)"
    }
}

TyrantLucifer (Member) commented:

I think it's a good idea to add a common feature to connectors to support a user-defined schema. In my opinion, we can add a new config option, schema, to every source connector's config, and each connector can parse its own schema in its getProducedType method. The schema-parsing implementation can be added to the seatunnel-common module. And I agree with your advice, @ashulin, to conform the schema types to SQL conventions, the same as defined in code.

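A minimal Java sketch of what such a shared parser in seatunnel-common might look like. The SchemaParser class and its parse method are hypothetical names, only a few SQL-style types are handled, and SeaTunnelRowType, BasicType, and DecimalType are assumed from the seatunnel-api module; a connector could return the parsed row type from getProducedType.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

public final class SchemaParser {

    private static final Pattern DECIMAL = Pattern.compile("DECIMAL\\((\\d+),\\s*(\\d+)\\)");

    // fields: ordered field name -> SQL-style type string, e.g. "DECIMAL(30, 3)".
    public static SeaTunnelRowType parse(LinkedHashMap<String, String> fields) {
        String[] names = new String[fields.size()];
        SeaTunnelDataType<?>[] types = new SeaTunnelDataType<?>[fields.size()];
        int i = 0;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            names[i] = e.getKey();
            types[i] = toDataType(e.getValue().trim().toUpperCase());
            i++;
        }
        return new SeaTunnelRowType(names, types);
    }

    private static SeaTunnelDataType<?> toDataType(String sqlType) {
        Matcher m = DECIMAL.matcher(sqlType);
        if (m.matches()) {
            return new DecimalType(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
        }
        switch (sqlType) {
            case "STRING": return BasicType.STRING_TYPE;
            case "INT":    return BasicType.INT_TYPE;
            case "BIGINT": return BasicType.LONG_TYPE;
            default: throw new IllegalArgumentException("Unsupported type: " + sqlType);
        }
    }
}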

hailin0 (Member) commented Aug 8, 2022

Another problem is the missing metadata of MQ connector row data, e.g. partition, timestamp, key, headers:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

I see a use case from Flink:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L119
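
For reference, a minimal Java sketch (the class and method names are hypothetical, not SeaTunnel's implementation) of collecting this metadata from a Kafka ConsumerRecord so a schema could expose it as extra columns, similar to Flink's metadata columns:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public final class KafkaMetadataExtractor {

    // Collects the per-record metadata a schema could expose alongside the payload.
    public static Map<String, Object> extract(ConsumerRecord<byte[], byte[]> record) {
        Map<String, Object> meta = new HashMap<>();
        meta.put("partition", record.partition());
        meta.put("offset", record.offset());
        meta.put("timestamp", record.timestamp());
        meta.put("key", record.key());
        Map<String, byte[]> headers = new HashMap<>();
        for (Header h : record.headers()) {
            headers.put(h.key(), h.value());
        }
        meta.put("headers", headers);
        return meta;
    }
}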

TyrantLucifer (Member) commented Aug 8, 2022

By the way, the row data format information should also be defined in the source connector. The final source config would look like the following:

Source {
  Kafka {
    schema {
      fields {
        field = "STRING"
        field2 = "INT"
        field3 = "DECIMAL(30, 3)"
      }
      format = "json" // or format = "text"
    }
  }
}

In most use cases, I think we can support json or text; if text is assigned, we should also assign a delimiter in the config file.
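
A hypothetical sketch of the text branch: a delimiter-based deserializer that splits one message payload into the schema's columns (the class name and behavior are illustrative, not SeaTunnel's actual implementation):

import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

public final class DelimitedTextDeserializer {

    private final Pattern delimiter;

    public DelimitedTextDeserializer(String delimiter) {
        // Quote the delimiter so characters like "|" are treated literally.
        this.delimiter = Pattern.compile(Pattern.quote(delimiter));
    }

    // Splits one Kafka message payload into columns; the -1 limit keeps
    // trailing empty columns so the arity matches the declared schema.
    public String[] deserialize(byte[] message) {
        return delimiter.split(new String(message, StandardCharsets.UTF_8), -1);
    }
}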

eyys (Contributor, Author) commented Aug 15, 2022

Where should the text-type column delimiter be defined here? Is something like the following okay?

Kafka {
  consumer.bootstrap.servers = "127.0.0.1:9092"
  consumer.group.id = "seatunnel5"
  topic = test
  result_table_name = test
  schema {
    fields {
      name = string
      age = int
    }
    format = "json" // format = "text"
    text.delimiter = ","
  }
}
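
If the options are nested like this, a hedged sketch of reading them (assuming the Typesafe Config API that SeaTunnel's HOCON files are based on; the FormatOptions class and the defaults are illustrative):

import com.typesafe.config.Config;

public final class FormatOptions {

    public final String format;
    public final String delimiter;

    public FormatOptions(Config pluginConfig) {
        Config schema = pluginConfig.getConfig("schema");
        // Defaulting to "json" is an assumption, not settled behavior.
        this.format = schema.hasPath("format") ? schema.getString("format") : "json";
        // Only meaningful when format = "text"; "," is an assumed default.
        this.delimiter = schema.hasPath("text.delimiter")
                ? schema.getString("text.delimiter")
                : ",";
    }
}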

TyrantLucifer (Member) commented:

This feature is in development; please wait for the PR.

TyrantLucifer (Member) commented:

@eyys Please refer to #2439 and #2436; we look forward to your contribution.

eyys pushed a commit to eyys/incubator-seatunnel that referenced this issue Sep 30, 2022
eyys added a commit to eyys/incubator-seatunnel that referenced this issue Oct 15, 2022
eyys added a commit to eyys/incubator-seatunnel that referenced this issue Oct 16, 2022
eyys added a commit to eyys/incubator-seatunnel that referenced this issue Oct 16, 2022
eyys added a commit to eyys/incubator-seatunnel that referenced this issue Oct 17, 2022
eyys added a commit to eyys/incubator-seatunnel that referenced this issue Oct 18, 2022
eyys added a commit to eyys/incubator-seatunnel that referenced this issue Oct 18, 2022
eyys closed this as completed Jan 4, 2023