-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feature](catalog) support trino connector catalog sink #49407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
| return new IcebergTransactionManager(ops); | ||
| } | ||
|
|
||
| public static TransactionManager createTrinoConnectorTransactionManager() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No one call this?
| super(); | ||
| } | ||
|
|
||
| public TrinoConnectorInsertCommandContext(Map<String, String> trinoConnectorOptions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constructor is not used
| )); | ||
|
|
||
| // Set transaction flag | ||
| tTrinoConnectorTableSink.setUseTransaction(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is diff between true and false?
| private void initConnector() { | ||
| String connectorName = trinoConnectorOptionParams.remove("connector.name"); | ||
|
|
||
| TrinoConnectorCacheKey cacheKey = new TrinoConnectorCacheKey(catalogNameString, connectorName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think better use catalog id instead of createtime
| LOG.error("Exception: " + stringWriter); | ||
| } | ||
|
|
||
| private String mapTrinoTypeToDorisType(String trinoType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about complex type?
| } else if (trinoType.startsWith("varbinary")) { | ||
| return "BINARY"; | ||
| } else { | ||
| return "VARCHAR"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to use varchar for all other types?
| return status; | ||
| } | ||
|
|
||
| LOG(INFO) << "Successfully wrote " << num_rows << " rows to Trino connector"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
debug level
| return status; | ||
| } | ||
|
|
||
| LOG(INFO) << "Successfully finished writing to Trino connector"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
debug
| : AsyncResultWriter(output_exprs, dep, fin_dep) { | ||
| const TTrinoConnnectorTableSink& t_trino_sink = t_sink.trino_connector_table_sink; | ||
|
|
||
| // 收集列名和类型 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
English
| return Status::OK(); | ||
| } | ||
|
|
||
| Status VTrinoConnectorTableWriter::finish(RuntimeState* state) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some profile metric for these phases. like write, finish
| std::unique_ptr<long[]> meta_data; | ||
| RETURN_IF_ERROR(JniConnector::to_java_table(block, meta_data)); | ||
| long meta_address = (long)meta_data.get(); | ||
| auto table_schema = JniConnector::parse_table_schema(block); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
schema 复用
| return Status::OK(); | ||
| } | ||
|
|
||
| Status JniConnector::_init_jni_writer(JNIEnv* env) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
拆分子类
| ConnectorSession connectorSession = session.toConnectorSession(catalogHandle); | ||
| metadata = connector.getMetadata(connectorSession, connectorTransactionHandle); | ||
|
|
||
| insertTableHandle = metadata.beginInsert( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
insertTableHandle 是否需要放到 FE
| Objects.requireNonNull(connectorPageSinkProvider, | ||
| String.format("Connector '%s' returned a null page sink provider", catalogNameString)); | ||
| } catch (UnsupportedOperationException e) { | ||
| LOG.debug("exception when getPageSinkProvider: {}", e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是否 debug 日志?
| new ConnectorPageSinkId() { | ||
| @Override | ||
| public long getId() { | ||
| return 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0 id 存疑?
| String typeName = trinoType.getDisplayName(); | ||
| BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(colIdx); | ||
|
|
||
| if (typeName.equals("boolean")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
改成 switch
|
We're closing this PR because it hasn't been updated in a while. |
No description provided.