Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added new connector for OpenSearch data source #1335

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

akuzin1
Copy link
Contributor

@akuzin1 akuzin1 commented Jun 15, 2023

DO NOT MERGE !!! - depends on pending features in OpenSearch sql and sql-jdbc

Description of changes:
This pull request introduces a new OpenSearch connector that implemented the Athena JDBC interface, leveraging the the OpenSearch driver, to enable metadata and data retrieval for the Athena query engine.

Added slight changes to Federation SDK, which should be reviewed and determined whether it is fitting.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@@ -849,6 +849,12 @@ else if (value instanceof Integer) {
if (value == null) {
float4Writer.writeNull();
}
else if (value instanceof java.lang.Integer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do these changes have implications for any of the other connectors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should not. I used similar structure to other data types in BlockUtils::writeSimpleValue, checking if value is of specific instance helps with writing value to block, doesn't change core behavior.

@@ -42,6 +42,7 @@ public class FieldBuilder
private final boolean nullable;
//Using LinkedHashMap because Apache Arrow makes field order important so honoring that contract here
private final Map<String, Field> children = new LinkedHashMap<>();
private final Map<String, FieldBuilder> nestedChildren = new LinkedHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you mind explaining this addition? isn't nesting already available because we can have children which are Fields, and those children can themselves have children, etc? or does the existing children behavior not actually work like that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after reading through the rest of the PR, let me know if my understanding is correct.

if i have a field like:

"topObj": {
  "innerObj": {
    "intField": 1,
    "strField": "asdf"
  },
  "int": 5
}

then opensearch will send back each of these fields as their own columns, and our result set will have this:
topObj.int
topObj.innerObj.intField
topObj.innerObj.strField

so the reason for this new nested children field and it's corresponding logic with OpensearchUtils class is so you can say createNestedStruct andd pass in topObj.innerObj.intField (for example), have it find the struct topObj, and continue to build to the struct definition for topObj?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's correct. It makes it more intuitive to make complex nested fields in my opinion.

@@ -89,6 +89,18 @@ public SchemaBuilder addStructField(String fieldName)
return this;
}

/**
* Adds a new Nested STRUCT Field to the Schema as a top-level Field.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is for nested structs, did we not support nested structs in any connectors before? Does this impact functionality in any existing connectors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't believe that existing connectors should be impacted, its independent. We do support nested structs in other connectors. The added functions were kind of tailored to OpenSearch use case and I thought it made sense to expand the SchemaBuilder functionality.

readRecordsRequest.getSplit().getProperties());
try (Connection connection = this.jdbcConnectionFactory.getConnection(getCredentialProvider())) {
connection.setAutoCommit(false); // For consistency. This is needed to be false to enable streaming for some database types.
if (this.getAutoCommit().isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand if this modification is adding any value. Are you expecting getAutoCommit() to not always return false in the future?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i see, in the opensearch record handler you have it return Optional.empty(). But is there a reason to even have that as an optional wrapping around a boolean, instead of it just being a boolean?

Copy link
Contributor Author

@akuzin1 akuzin1 Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, so the reason it's an Optional is because, the presence of a value does not signify true, we want to maintain the original behavior.
I can go back and do additional testing, however, we did not want to setAutoCommit for the OpenSearch driver.

@@ -168,8 +185,9 @@ public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest rea
rowsReturnedFromDatabase++;
}
LOGGER.info("{} rows returned by database.", rowsReturnedFromDatabase);

connection.commit();
if (this.getAutoCommit().isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also not sure what this adds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need to commit connection if we setAutoCommit, and we don't need to do that for the OpenSearch driver.

Comment on lines 381 to 379
// update schema from null because Opensearch doesn't have schema but change later
// resultSet.updateString("TABLE_SCHEM", databaseName);

// Temporary in schemaName
list.add(getSchemaTableName(resultSet, databaseName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having a hard time understanding the comments here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left over comments. Will get rid of.

Comment on lines +396 to +397
Pattern pattern = Pattern.compile(DATA_STREAM_REGEX);
Matcher matcher = pattern.matcher(tableName);
if (matcher.find()) {
tableName = matcher.group(2);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain this? I thought you mentioned data streams are not yet supported. Also, let's avoid magic numbers or add an example in the comment for what this looks like to explain why we match on group 2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes will provide embedded example for reference.
But to give a brief explanation here is that a data stream lets you store time series data across multiple indices while giving you a single named resource for requests. The multiple indices that correlate with one data stream have the following naming convention: .ds-<data-stream>-<generation> where generation is a six-digit, zero-padded integer.
Therefore, instead of populating the many indices correlating to a single data stream, I only want to display the one data stream and I achieve that through pattern matching.

PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, null, tableName.getTableName(), schema, constraints, split);

// Disable fetching all rows.
preparedStatement.setFetchSize(10000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put this in a constant.

what happens if the input query has over 10k results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting fetch size Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for ResultSet objects generated by this Statement.
So what this really controls is the number of network calls, as well as enforces pagination so that SQL Plugin uses v2 instead of legacy engine. Otherwise legacy version only returns 200 rows per table.

@macohen
Copy link
Contributor

macohen commented Jan 23, 2024

What’s the sql/jdbc feature that’s needed from opensearch to bring this closer to merging? Also, are there any performance comparisons done between sql/jdbc vs using a Java or another client?

@akuzin1
Copy link
Contributor Author

akuzin1 commented Jan 24, 2024

Hi, @macohen! We were waiting for the implementation of shard partitioning, which would allow to parallelize the fetching of data with the JDBC driver. Currently the connector implements a single partition solution, which resulted in performance regression compared to current connector version.
I had opened an feature request and was working with the OpenSearch team to track item, however, due to shifting priorities the item got backlogged.

@macohen
Copy link
Contributor

macohen commented Feb 5, 2024

@akuzin1 can you link to the feature request here, please?

@akuzin1
Copy link
Contributor Author

akuzin1 commented Feb 5, 2024

@macohen linking issue here fromopensearch-project/sqlrepository

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants