diff --git a/online/src/main/scala/ai/chronon/online/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/MetadataStore.scala index f89314855..9b38db757 100644 --- a/online/src/main/scala/ai/chronon/online/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/MetadataStore.scala @@ -39,7 +39,7 @@ case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])]) class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, timeoutMillis: Long) { @transient implicit lazy val logger = LoggerFactory.getLogger(getClass) private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis) - private val CONF_BATCH_SIZE = 50 + private val CONF_BATCH_SIZE = 100 // Note this should match with the format used in the warehouse def setPartitionMeta(format: String, spanMillis: Long): Unit = { @@ -149,43 +149,16 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, }, { gb => Metrics.Context(environment = "group_by.serving_info.fetch", groupBy = gb) }) - // upload the materialized JSONs to KV store: - // key = // in bytes e.g joins/team/team.example_join.v1 value = materialized json string in bytes - def putConf(configPath: String): Future[Seq[Boolean]] = { - val configFile = new File(configPath) - assert(configFile.exists(), s"$configFile does not exist") - logger.info(s"Uploading Chronon configs from $configPath") - val fileList = listFiles(configFile) + // derive a key from path to file + def pathToKey(confPath: String): String = { + // capture // as key e.g joins/team/team.example_join.v1 + confPath.split("/").takeRight(3).mkString("/") + } - val puts = fileList - .filter { file => - val name = parseName(file.getPath) - if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}") - name.isDefined - } - .flatMap { file => - val path = file.getPath - val confJsonOpt = path match { - case value if value.contains("staging_queries/") => loadJson[StagingQuery](value) - 
case value if value.contains("joins/") => loadJson[Join](value) - case value if value.contains("group_bys/") => loadJson[GroupBy](value) - case _ => logger.info(s"unknown config type in file $path"); None - } - val key = path.confPathToKey - confJsonOpt.map { conf => - logger.info(s"""Putting metadata for - |key: $key - |conf: $conf""".stripMargin) - PutRequest(keyBytes = key.getBytes(), - valueBytes = conf.getBytes(), - dataset = dataset, - tsMillis = Some(System.currentTimeMillis())) - } - } - val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq - logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset") - val futures = putsBatches.map(batch => kvStore.multiPut(batch)) - Future.sequence(futures).map(_.flatten) + // derive team from path to file + def pathToTeam(confPath: String): String = { + // capture / as key e.g group_bys/host_churn + confPath.split("/").takeRight(3).take(2).mkString("/") } // list file recursively @@ -233,4 +206,83 @@ class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, None } } + + // upload the metadata to KV store: + def putConf(configPath: String): Future[Seq[Boolean]] = { + val configFile = new File(configPath) + assert(configFile.exists(), s"$configFile does not exist") + logger.info(s"Uploading Chronon configs from $configPath") + val fileList = listFiles(configFile).filter { file => + val name = parseName(file.getPath) + if (name.isEmpty) logger.info(s"Skipping invalid file ${file.getPath}") + name.isDefined + } + + // Upload materialized JSONs to KV store by name to KV store + val putsByName: Seq[PutRequest] = putConfByName(fileList) + + // Upload entities list to KV store by team to KV store + val putsByTeam: Seq[PutRequest] = putConfByTeam(fileList) + + val puts = putsByName ++ putsByTeam + + val putsBatches = puts.grouped(CONF_BATCH_SIZE).toSeq + logger.info(s"Putting ${puts.size} configs to KV Store, dataset=$dataset") + val futures = putsBatches.map(batch => kvStore.multiPut(batch)) 
+ Future.sequence(futures).map(_.flatten) + } + + // upload the materialized JSONs to KV store: + // key = <conf_type>/<team>/<conf_name> in bytes e.g joins/team/team.example_join.v1 value = materialized json string in bytes + def putConfByName(fileList: Seq[File]): Seq[PutRequest] = { + fileList + .flatMap { file => + val path = file.getPath + val key = pathToKey(path) + val confJsonOpt = path match { + case value if value.contains("staging_queries/") => loadJson[StagingQuery](value) + case value if value.contains("joins/") => loadJson[Join](value) + case value if value.contains("group_bys/") => loadJson[GroupBy](value) + case _ => logger.info(s"unknown config type in file $path"); None + } + + confJsonOpt.map { value => + logger.info(s"""Putting metadata for + |key: $key + |conf: $value""".stripMargin) + PutRequest(keyBytes = key.getBytes(), + valueBytes = value.getBytes(), + dataset = dataset, + tsMillis = Some(System.currentTimeMillis())) + } + } + } + + // upload the configs list by team to KV store: + // key = <conf_type>/<team> in bytes, e.g group_bys/host_churn + // value = list of config names in bytes, e.g List(group_bys/host_churn/agg_p2_impressions.v1, group_bys/host_churn/agg_deactivations.v1) + def putConfByTeam(fileList: Seq[File]): Seq[PutRequest] = { + val kvPairs: Map[String, List[String]] = fileList.foldLeft(Map.empty[String, List[String]]) { (map, file) => + { + val path = file.getPath + val key = pathToTeam(path) + val value = pathToKey(path) + val updatedList = map.getOrElse(key, List()) :+ value + map + (key -> updatedList) + } + } + + kvPairs.map { + case (key, list) => { + val jsonString = "[" + list.map(name => "\"" + name + "\"").mkString(",") + "]" + logger.info(s"""Putting metadata for + |key: $key + |conf: $jsonString""".stripMargin) + PutRequest(keyBytes = key.getBytes(), + valueBytes = jsonString.getBytes(), + dataset = dataset, + tsMillis = Some(System.currentTimeMillis())) + } + }.toSeq + } } diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala 
b/spark/src/main/scala/ai/chronon/spark/Driver.scala index e17536c08..a0c4b7902 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -732,10 +732,10 @@ object Driver { } def run(args: Args): Unit = { - val putRequest = args.metaDataStore.putConf(args.confPath()) - val res = Await.result(putRequest, 1.hour) - logger.info( - s"Uploaded Chronon Configs to the KV store, success count = ${res.count(v => v)}, failure count = ${res.count(!_)}") + val putRequests = args.metaDataStore.putConf(args.confPath()) + val res = Await.result(putRequests, 1.hour) + logger.info(s"Uploaded Chronon ${args.confPath()} Configs to the KV store, success count = ${res.count(v => + v)}, failure count = ${res.count(!_)}") } } diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 3f10a9ed5..834bd6a2b 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -74,7 +74,7 @@ class FetcherTest extends TestCase { val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000) inMemoryKvStore.create(singleFileDataSet) // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing - val singleFilePut = singleFileMetadataStore.putConf(confResource.getPath) + val singleFilePut = singleFileMetadataStore.putConf(confResource.getPath) Await.result(singleFilePut, Duration.Inf) val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet)) val res = Await.result(response, Duration.Inf) @@ -86,7 +86,7 @@ class FetcherTest extends TestCase { val directoryDataSetDataSet = ChrononMetadataKey + "_directory_test" val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000)
inMemoryKvStore.create(directoryDataSetDataSet) - val directoryPut = directoryMetadataStore.putConf(confResource.getPath.replace(s"/$joinPath", "")) + val directoryPut = directoryMetadataStore.putConf(confResource.getPath.replace(s"/$joinPath", "")) Await.result(directoryPut, Duration.Inf) val dirResponse = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet))