diff --git a/docs/content/developer_guide/configurationtables.md b/docs/content/developer_guide/configurationtables.md
index b317d3d..d1f1273 100644
--- a/docs/content/developer_guide/configurationtables.md
+++ b/docs/content/developer_guide/configurationtables.md
@@ -18,7 +18,12 @@ Table used for adding databases/paths/individual table to be tracked by DeltaOMS
 Typical usage for adding input source configuration :
 
 ```$sql
--- Adding a Database. OMS will discover all Delta tables in the database.
+-- Adding a Catalog or Catalogs by Pattern
+INSERT INTO ..sourceconfig values('', false)
+INSERT INTO ..sourceconfig values('*', false)
+INSERT INTO ..sourceconfig values('*|*', false)
+
+-- Adding a Database/Schema. OMS will discover all Delta tables in the schema/database.
 -- ConfigurePaths process could take longer depending on number of tables in the database.
 INSERT INTO .sourceconfig values('.', false)
diff --git a/docs/content/faq/execution.md b/docs/content/faq/execution.md
index 44dd8d7..1ed9b13 100644
--- a/docs/content/faq/execution.md
+++ b/docs/content/faq/execution.md
@@ -16,7 +16,7 @@
 using simple SQL `INSERT` statement.
 
 Example:
 
-`INSERT INTO .sourceconfig VALUES('',false, Map('wildCardLevel','0'))`
+`INSERT INTO ..sourceconfig VALUES('',false)`
 
 For more details on the configurations and parameters, refer to [Getting Started]({{%relref "getting_started/_index.md" %}}) and [Developer Guide]({{%relref "developer_guide/_index.md" %}})
diff --git a/src/main/scala/com/databricks/labs/deltaoms/utils/UtilityOperations.scala b/src/main/scala/com/databricks/labs/deltaoms/utils/UtilityOperations.scala
index c6b08b6..f3bd463 100644
--- a/src/main/scala/com/databricks/labs/deltaoms/utils/UtilityOperations.scala
+++ b/src/main/scala/com/databricks/labs/deltaoms/utils/UtilityOperations.scala
@@ -37,22 +37,39 @@ trait UtilityOperations extends Serializable with Logging {
     import spark.implicits._
 
     if (!sourceConfig.path.contains("/") && !sourceConfig.path.contains(".")) {
-      // CATALOG
-      if (sourceConfig.path.equalsIgnoreCase("hive_metastore")) {
-        throw new RuntimeException(s"${sourceConfig.path} catalog is not supported. " +
-          s"Configure databases from ${sourceConfig.path} catalog instead")
-      }
-      val tableList = spark.sql(s"SELECT (table_catalog || '.`' || " +
-        s"table_schema || '`.`' || table_name || '`' ) as qualifiedName " +
-        s"FROM ${sourceConfig.path}.information_schema.tables " +
-        s"WHERE table_schema <> 'information_schema' " +
-        s"AND table_type <> 'VIEW' " +
-        s"AND (data_source_format = 'DELTA' OR data_source_format = 'delta')").as[String].collect
-      tableList.map { tl =>
-        getTableLocation(tl) match {
-          case (t, l) => (t, l, Map("wildCardLevel" -> "1"))
+      val catalogs: Array[String] =
+      // PATTERN MATCHED CATALOGS
+      if (sourceConfig.path.contains("*") || sourceConfig.path.contains("|")) {
+        val catalogPattern = sourceConfig.path
+        spark.sql(s"SHOW CATALOGS LIKE '${catalogPattern}'")
+          .as[String].collect.filterNot(_ == "hive_metastore").filterNot(_.startsWith("_"))
+      } else {
+        // SINGLE CATALOG
+        if (sourceConfig.path.equalsIgnoreCase("hive_metastore")) {
+          throw new RuntimeException(s"${sourceConfig.path} catalog is not supported. " +
+            s"Configure individual schemas/databases from ${sourceConfig.path} catalog instead")
         }
+        Array(sourceConfig.path)
+      }
+      val tableList = catalogs.flatMap(cat => {
+        val catalogTablesTry = Try { spark.sql(s"SELECT ('`' || table_catalog || '`.`' || " +
+          s"table_schema || '`.`' || table_name || '`' ) as qualifiedName " +
+          s"FROM `${cat}`.information_schema.tables " +
+          s"WHERE table_schema <> 'information_schema' " +
+          s"AND table_type <> 'VIEW' " +
+          s"AND (data_source_format = 'DELTA' OR data_source_format = 'delta')").as[String].collect
+        }
+        catalogTablesTry match {
+          case Success(tabs) => tabs
+          case Failure(ex) => logError(s"Error while accessing catalog ${cat} : $ex")
+            Array.empty[String]
+        }
+      })
+      tableList.map { tl =>
+        getTableLocation(tl) match {
+          case (t, l) => (t, l, Map("wildCardLevel" -> "1"))
+        }
       }
     } else if (!sourceConfig.path.contains("/") && sourceConfig.path.count(_ == '.') == 1) {
@@ -70,9 +87,9 @@ trait UtilityOperations extends Serializable with Logging {
           lit("`.`"), col("tableName"), lit("`")).as("qualifiedName")).as[String].collect()
       } else {
-        spark.sql(s"SELECT (table_catalog || '.`' " +
+        spark.sql(s"SELECT ('`' || table_catalog || '`.`' " +
           s"|| table_schema || '`.`' || table_name || '`' ) as qualifiedName " +
-          s"FROM ${catalogName}.information_schema.tables " +
+          s"FROM `${catalogName}`.information_schema.tables " +
           s"WHERE table_schema = '${schemaName}' AND table_type <> 'VIEW' " +
           s"AND (data_source_format = 'DELTA' OR data_source_format = 'delta')").as[String].collect
       }