Skip to content

Commit

Permalink
Remove unused schema parameter and cleanup. (#2780)
Browse files Browse the repository at this point in the history
  • Loading branch information
davinchia authored Apr 7, 2021
1 parent 94093f0 commit a7b9cd5
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 32 deletions.
10 changes: 9 additions & 1 deletion airbyte-integrations/bases/base-python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@
url="https://github.com/airbytehq/airbyte",
packages=setuptools.find_packages(),
package_data={"": ["models/yaml/*.yaml"]},
install_requires=["PyYAML==5.4", "pydantic==1.6.1", "airbyte-protocol", "jsonschema==2.6.0", "requests==2.25.1", "backoff==1.10.0", "pytest"],
install_requires=[
"PyYAML==5.4",
"pydantic==1.6.1",
"airbyte-protocol",
"jsonschema==2.6.0",
"requests==2.25.1",
"backoff==1.10.0",
"pytest",
],
entry_points={
"console_scripts": ["base-python=base_python.entrypoint:main"],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
["ProperCased", "proper_cased"],
["camelCased", "camel_cased"],
["veryVeryLongCamelCasedName", "very_very_long_camel_cased_name"],
["throw2NumbersH3re", "throw2_numbers_h3re"]
]
["throw2NumbersH3re", "throw2_numbers_h3re"],
],
)
def test_camel_to_snake(camel_cased, snake_cased):
assert camel_to_snake(camel_cased) == snake_cased
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,16 @@ public AirbyteConnectionStatus check(JsonNode config) {
public AirbyteCatalog discover(JsonNode config) throws Exception {
try (final JdbcDatabase database = createDatabase(config)) {
return new AirbyteCatalog()
.withStreams(getTables(
database,
Optional.ofNullable(config.get("database")).map(JsonNode::asText),
Optional.ofNullable(config.get("schema")).map(JsonNode::asText))
.stream()
.map(t -> CatalogHelpers.createAirbyteStream(t.getName(), t.getSchemaName(), t.getFields())
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(t.getPrimaryKeys()
.stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList())))
.collect(Collectors.toList()));
.withStreams(getTables(database, Optional.ofNullable(config.get("database")).map(JsonNode::asText))
.stream()
.map(t -> CatalogHelpers.createAirbyteStream(t.getName(), t.getSchemaName(), t.getFields())
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(t.getPrimaryKeys()
.stream()
.filter(Objects::nonNull)
.map(Collections::singletonList)
.collect(Collectors.toList())))
.collect(Collectors.toList()));
}
}

Expand All @@ -158,10 +155,8 @@ public AutoCloseableIterator<AirbyteMessage> read(JsonNode config, ConfiguredAir

final JdbcDatabase database = createDatabase(config);

final Map<String, TableInfoInternal> tableNameToTable = discoverInternal(
database,
Optional.ofNullable(config.get("database")).map(JsonNode::asText),
Optional.ofNullable(config.get("schema")).map(JsonNode::asText))
final Map<String, TableInfoInternal> tableNameToTable =
discoverInternal(database, Optional.ofNullable(config.get("database")).map(JsonNode::asText))
.stream()
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getSchemaName(), t.getName()), Function.identity()));

Expand Down Expand Up @@ -332,12 +327,9 @@ private static AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(JdbcDa
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private List<TableInfo> getTables(final JdbcDatabase database,
final Optional<String> databaseOptional,
final Optional<String> schemaOptional)
throws Exception {
final List<TableInfoInternal> tableInfos = discoverInternal(database, databaseOptional, schemaOptional);
final Map<String, List<String>> tablePrimaryKeys = discoverPrimaryKeys(database, databaseOptional, schemaOptional, tableInfos);
private List<TableInfo> getTables(final JdbcDatabase database, final Optional<String> databaseOptional) throws Exception {
final List<TableInfoInternal> tableInfos = discoverInternal(database, databaseOptional);
final Map<String, List<String>> tablePrimaryKeys = discoverPrimaryKeys(database, databaseOptional, tableInfos);
return tableInfos.stream()
.map(t -> {
// some databases return multiple copies of the same record for a column (e.g. redshift) because
Expand Down Expand Up @@ -367,12 +359,11 @@ private List<TableInfo> getTables(final JdbcDatabase database,
*/
private Map<String, List<String>> discoverPrimaryKeys(JdbcDatabase database,
Optional<String> databaseOptional,
Optional<String> schemaOptional,
List<TableInfoInternal> tableInfos) {
try {
// Get all primary keys without specifying a table name
final Map<String, List<String>> tablePrimaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery(
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), schemaOptional.orElse(null), null),
conn -> conn.getMetaData().getPrimaryKeys(databaseOptional.orElse(null), null, null),
r -> {
final String schemaName =
r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString(JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME);
Expand Down Expand Up @@ -437,13 +428,11 @@ private static void assertColumnsWithSameNameAreSame(String schemaName, String t
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
final Optional<String> databaseOptional,
final Optional<String> schemaOptional)
private List<TableInfoInternal> discoverInternal(final JdbcDatabase database, final Optional<String> databaseOptional)
throws Exception {
final Set<String> internalSchemas = new HashSet<>(getExcludedInternalSchemas());
return database.bufferedResultSetQuery(
conn -> conn.getMetaData().getColumns(databaseOptional.orElse(null), schemaOptional.orElse(null), null, null),
conn -> conn.getMetaData().getColumns(databaseOptional.orElse(null), null, null, null),
resultSet -> Jsons.jsonNode(ImmutableMap.<String, Object>builder()
// we always want a namespace, if we cannot get a schema, use db name.
.put(INTERNAL_SCHEMA_NAME,
Expand Down

0 comments on commit a7b9cd5

Please sign in to comment.