Added support for monitoring tables using wildcard Catalog pattern
himanishk committed May 24, 2023
1 parent 1292075 commit a1bff01
Showing 3 changed files with 40 additions and 18 deletions.
docs/content/developer_guide/configurationtables.md (7 changes: 6 additions & 1 deletion)
@@ -18,7 +18,12 @@ Table used for adding databases/paths/individual table to be tracked by DeltaOMS

Typical usage for adding an input source configuration:
```$sql
-- Adding a Database. OMS will discover all Delta tables in the database.
-- Adding a Catalog or Catalogs by Pattern
INSERT INTO <OMSCATALOGNAME>.<OMSDBNAME>.sourceconfig values('<Catalog Name>', false)
INSERT INTO <OMSCATALOGNAME>.<OMSDBNAME>.sourceconfig values('<Catalog Pattern A>*', false)
INSERT INTO <OMSCATALOGNAME>.<OMSDBNAME>.sourceconfig values('<Catalog Pattern A>*|<Catalog Pattern B>*', false)
-- Adding a Database/Schema. OMS will discover all Delta tables in the schema/database.
-- The ConfigurePaths process could take longer depending on the number of tables in the database.
INSERT INTO <OMSDBNAME>.sourceconfig values('<Catalog Name>.<Schema Name>', false)
```
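
For concreteness, here is a minimal sketch of these inserts from a notebook, assuming a hypothetical OMS location `oms.oms_db` and hypothetical catalogs `prod_sales`, `prod_finance`, and `dev_sandbox`:

```scala
// All names below are illustrative, not part of the commit.
val sourceConfig = "oms.oms_db.sourceconfig"

// Exact catalog: OMS discovers every Delta table inside prod_sales.
spark.sql(s"INSERT INTO $sourceConfig VALUES ('prod_sales', false)")

// Wildcard pattern: matches prod_sales and prod_finance.
spark.sql(s"INSERT INTO $sourceConfig VALUES ('prod_*', false)")

// Alternation: matches every prod_* and dev_* catalog.
spark.sql(s"INSERT INTO $sourceConfig VALUES ('prod_*|dev_*', false)")
```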
docs/content/faq/execution.md (2 changes: 1 addition & 1 deletion)
@@ -16,7 +16,7 @@ using simple SQL `INSERT` statement.

Example:

`INSERT INTO <omsDBName>.sourceconfig VALUES('<Database Name>',false, Map('wildCardLevel','0'))`
`INSERT INTO <omsCatalogName>.<omsDBName>.sourceconfig VALUES('<Database Name>',false)`

For more details on the configurations and parameters, refer to [Getting Started]({{%relref "getting_started/_index.md" %}})
and [Developer Guide]({{%relref "developer_guide/_index.md" %}})
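A quick hypothetical check that the entry landed, using the same placeholders resolved to `oms.oms_db`:

```scala
// Lists all configured sources; the schema follows the sourceconfig table.
spark.sql("SELECT * FROM oms.oms_db.sourceconfig").show(truncate = false)
```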
@@ -37,22 +37,39 @@ trait UtilityOperations extends Serializable with Logging {
import spark.implicits._

if (!sourceConfig.path.contains("/") && !sourceConfig.path.contains(".")) {
// CATALOG
if (sourceConfig.path.equalsIgnoreCase("hive_metastore")) {
throw new RuntimeException(s"${sourceConfig.path} catalog is not supported. " +
s"Configure databases from ${sourceConfig.path} catalog instead")
}
val tableList = spark.sql(s"SELECT (table_catalog || '.`' || " +
s"table_schema || '`.`' || table_name || '`' ) as qualifiedName " +
s"FROM ${sourceConfig.path}.information_schema.tables " +
s"WHERE table_schema <> 'information_schema' " +
s"AND table_type <> 'VIEW' " +
s"AND (data_source_format = 'DELTA' OR data_source_format = 'delta')").as[String].collect

tableList.map { tl =>
getTableLocation(tl) match {
case (t, l) => (t, l, Map("wildCardLevel" -> "1"))
val catalogs: Array[String] =
// PATTERN MATCHED CATALOGS
if (sourceConfig.path.contains("*") || sourceConfig.path.contains("|")) {
val catalogPattern = sourceConfig.path
spark.sql(s"SHOW CATALOGS LIKE '${catalogPattern}'")
.as[String].collect.filterNot(_ == "hive_metastore").filterNot(_.startsWith("_"))
} else {
// SINGLE CATALOG
if (sourceConfig.path.equalsIgnoreCase("hive_metastore")) {
throw new RuntimeException(s"${sourceConfig.path} catalog is not supported. " +
s"Configure individual schemas/databases from ${sourceConfig.path} catalog instead")
}
Array(sourceConfig.path)
}
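// For each resolved catalog, list its Delta tables from information_schema.
// A failure on one catalog (e.g. missing permissions or a concurrently
// dropped catalog) is logged and skipped so the remaining catalogs still resolve.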
val tableList = catalogs.flatMap(cat => {
val catalogTablesTry = Try { spark.sql(s"SELECT ('`' || table_catalog || '`.`' || " +
s"table_schema || '`.`' || table_name || '`' ) as qualifiedName " +
s"FROM `${cat}`.information_schema.tables " +
s"WHERE table_schema <> 'information_schema' " +
s"AND table_type <> 'VIEW' " +
s"AND (data_source_format = 'DELTA' OR data_source_format = 'delta')").as[String].collect
}
catalogTablesTry match {
case Success(tabs) => tabs
case Failure(ex) => logError(s"Error while accessing catalog ${cat} : $ex")
Array.empty[String]
}
})
tableList.map { tl =>
getTableLocation(tl) match {
case (t, l) => (t, l, Map("wildCardLevel" -> "1"))
}
}
} else if (!sourceConfig.path.contains("/")
&& sourceConfig.path.count(_ == '.') == 1) {
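// CATALOG.SCHEMA: a path like <Catalog Name>.<Schema Name>; resolve the
// Delta tables within that single schema.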
@@ -70,9 +87,9 @@ trait UtilityOperations extends Serializable with Logging {
lit("`.`"), col("tableName"),
lit("`")).as("qualifiedName")).as[String].collect()
} else {
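// Backtick-quote the catalog identifier so catalog names with special
// characters still parse in the information_schema lookup.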
spark.sql(s"SELECT (table_catalog || '.`' " +
spark.sql(s"SELECT ('`' || table_catalog || '`.`' " +
s"|| table_schema || '`.`' || table_name || '`' ) as qualifiedName " +
s"FROM ${catalogName}.information_schema.tables " +
s"FROM `${catalogName}`.information_schema.tables " +
s"WHERE table_schema = '${schemaName}' AND table_type <> 'VIEW' " +
s"AND (data_source_format = 'DELTA' OR data_source_format = 'delta')").as[String].collect
}
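Taken together, the new flow expands the configured path into concrete catalogs, then collects each catalog's Delta tables while tolerating per-catalog failures. Below is a self-contained sketch of that flow; the helper names (`resolveCatalogs`, `deltaTables`, `allProdTables`) and the `prod_*` pattern are assumptions for illustration, not part of the commit:

```scala
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.SparkSession

object CatalogPatternSketch {
  // Expand a sourceconfig path into concrete catalog names (sketch of the
  // commit's logic; the real code lives in UtilityOperations above).
  def resolveCatalogs(spark: SparkSession, path: String): Array[String] = {
    import spark.implicits._
    if (path.contains("*") || path.contains("|")) {
      spark.sql(s"SHOW CATALOGS LIKE '$path'")
        .as[String].collect()
        .filterNot(_ == "hive_metastore")
        .filterNot(_.startsWith("_"))
    } else if (path.equalsIgnoreCase("hive_metastore")) {
      throw new RuntimeException(
        "hive_metastore catalog is not supported. Configure individual schemas/databases instead")
    } else {
      Array(path)
    }
  }

  // List fully qualified Delta table names for one catalog; a failure is
  // treated as an empty result instead of aborting the whole run.
  def deltaTables(spark: SparkSession, catalog: String): Array[String] = {
    import spark.implicits._
    Try {
      spark.sql(
        s"SELECT ('`' || table_catalog || '`.`' || table_schema || '`.`' || table_name || '`') AS qualifiedName " +
          s"FROM `$catalog`.information_schema.tables " +
          s"WHERE table_schema <> 'information_schema' AND table_type <> 'VIEW' " +
          s"AND (data_source_format = 'DELTA' OR data_source_format = 'delta')")
        .as[String].collect()
    } match {
      case Success(tables) => tables
      case Failure(_) => Array.empty[String]
    }
  }

  // Usage: every Delta table across all prod_* catalogs.
  def allProdTables(spark: SparkSession): Array[String] =
    resolveCatalogs(spark, "prod_*").flatMap(deltaTables(spark, _))
}
```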
