#5 [SV] - Master dataset processing pipeline
SanthoshVasabhaktula committed Apr 10, 2023
1 parent bbbbd36 commit 8a6e896
Showing 32 changed files with 858 additions and 81 deletions.
27 changes: 5 additions & 22 deletions dataset-registry/src/main/resources/dataset-registry.sql
@@ -1,11 +1,13 @@
CREATE TABLE IF NOT EXISTS datasets (
id text PRIMARY KEY,
type text NOT NULL,
validation_config json,
extraction_config json,
dedup_config json,
data_schema json,
denorm_config json,
router_config json NOT NULL,
dataset_config json NOT NULL,
status text NOT NULL,
created_by text NOT NULL,
updated_by text NOT NULL,
@@ -16,10 +18,10 @@ CREATE TABLE IF NOT EXISTS datasets (
CREATE INDEX IF NOT EXISTS datasets_status ON datasets(status);

CREATE TABLE IF NOT EXISTS datasources (
id text PRIMARY KEY,
datasource text PRIMARY KEY,
dataset_id text REFERENCES datasets (id),
ingestion_spec json NOT NULL,
datasource text NOT NULL,
datasource_ref text NOT NULL,
retention_period json,
archival_policy json,
purge_policy json,
@@ -48,31 +50,12 @@ CREATE TABLE IF NOT EXISTS dataset_transformations (
CREATE INDEX IF NOT EXISTS dataset_transformations_status ON dataset_transformations(status);
CREATE INDEX IF NOT EXISTS dataset_transformations_dataset ON dataset_transformations(dataset_id);

CREATE TABLE IF NOT EXISTS datasets_denorm_config (
id SERIAL PRIMARY KEY,
field_key text NOT NULL,
dataset_id text NOT NULL REFERENCES datasets (id),
connector_type text NOT NULL,
connector_config json NOT NULL,
data_schema json NOT NULL,
field_out_key text NOT NULL,
exclude_fields text[],
field_transformations json,
status text NOT NULL,
created_by text NOT NULL,
updated_by text NOT NULL,
created_date Date NOT NULL,
updated_date Date NOT NULL,
UNIQUE(field_key, dataset_id)
);
CREATE INDEX IF NOT EXISTS datasets_denorm_config_status ON datasets_denorm_config(status);
CREATE INDEX IF NOT EXISTS datasets_denorm_config_dataset ON datasets_denorm_config(dataset_id);

CREATE TABLE IF NOT EXISTS datasets_source_config (
id SERIAL PRIMARY KEY,
dataset_id text NOT NULL REFERENCES datasets (id),
connector_type text NOT NULL,
connector_config json NOT NULL,
connector_stats json NOT NULL,
status text NOT NULL,
created_by text NOT NULL,
updated_by text NOT NULL,
@@ -30,16 +30,15 @@ object DatasetModels {

case class RouterConfig(@JsonProperty("topic") topic: String)

case class Dataset(@JsonProperty("id") id: String, @JsonProperty("extraction_config") extractionConfig: Option[ExtractionConfig],
case class DatasetConfig(@JsonProperty("data_key") key: String, @JsonProperty("timestamp_key") tsKey: String,
@JsonProperty("entry_topic") entryTopic: String, @JsonProperty("exclude_fields") excludeFields: Option[List[String]] = None,
@JsonProperty("redis_db_host") redisDBHost: Option[String] = None, @JsonProperty("redis_db_port") redisDBPort: Option[Int] = None,
@JsonProperty("redis_db") redisDB: Option[Int] = None)

case class Dataset(@JsonProperty("id") id: String, @JsonProperty("type") datasetType: String , @JsonProperty("extraction_config") extractionConfig: Option[ExtractionConfig],
@JsonProperty("dedup_config") dedupConfig: Option[DedupConfig], @JsonProperty("validation_config") validationConfig: Option[ValidationConfig],
@JsonProperty("data_schema") jsonSchema: Option[String], @JsonProperty("denorm_config") denormConfig: Option[DenormConfig],
@JsonProperty("router_config") routerConfig: RouterConfig, @JsonProperty("status") status: String)

case class Datasource(@JsonProperty("id") id: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource") datasource: String,
@JsonProperty("retention_policy") retentionPolicy: Option[String], @JsonProperty("archival_policy") archivalPolicy: Option[String],
@JsonProperty("purge_policy") purgePolicy: Option[String], @JsonProperty("backup_config") backupConfig: Option[String],
@JsonProperty("status") status: String)
@JsonProperty("router_config") routerConfig: RouterConfig, datasetConfig: DatasetConfig, @JsonProperty("status") status: String)

case class Condition(@JsonProperty("type") `type`: String, @JsonProperty("expr") expr: String)

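Note (not part of this diff): a minimal sketch of how the new DatasetConfig nests inside Dataset after this change. The field names come from the case classes above; the values are illustrative, borrowed from the test fixtures added later in this commit.

import org.sunbird.obsrv.model.DatasetModels.{Dataset, DatasetConfig, RouterConfig}

// Illustrative values; roughly mirrors the 'd2' test row inserted further down in this commit
val datasetConfig = DatasetConfig(key = "id", tsKey = "date", entryTopic = "ingest")
val dataset = Dataset(
  id = "d2",
  datasetType = "dataset",
  extractionConfig = None,
  dedupConfig = None,
  validationConfig = None,
  jsonSchema = None,
  denormConfig = None,
  routerConfig = RouterConfig("d1-events"),
  datasetConfig = datasetConfig,
  status = "ACTIVE"
)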
@@ -8,8 +8,8 @@ object DatasetRegistry {
private val datasets: Map[String, Dataset] = DatasetRegistryService.readAllDatasets()
private val datasetTransformations: Map[String, List[DatasetTransformation]] = DatasetRegistryService.readAllDatasetTransformations()

def getAllDatasets(): List[Dataset] = {
datasets.values.toList
def getAllDatasets(datasetType: String): List[Dataset] = {
datasets.filter(f => f._2.datasetType.equals(datasetType)).values.toList
}

def getDataset(id: String): Option[Dataset] = {
@@ -20,8 +20,8 @@
datasetTransformations.get(id)
}

def getDataSetIds(): List[String] = {
datasets.keySet.toList
def getDataSetIds(datasetType: String): List[String] = {
datasets.filter(f => f._2.datasetType.equals(datasetType)).keySet.toList
}

}
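Note (not part of this diff): with the type filter added above, callers of the registry now pass the dataset type explicitly, so each pipeline only sees its own datasets. A minimal usage sketch; "dataset" is the default type used elsewhere in this commit, while "master-dataset" is an assumed value for the master-data pipeline and does not appear in this diff.

import org.sunbird.obsrv.registry.DatasetRegistry

// Regular pipeline: only datasets of type "dataset"
val datasets = DatasetRegistry.getAllDatasets("dataset")
val datasetIds = DatasetRegistry.getDataSetIds("dataset")

// Master-data pipeline: assumed type string, shown here only for illustration
val masterDatasets = DatasetRegistry.getAllDatasets("master-dataset")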
@@ -2,14 +2,14 @@ package org.sunbird.obsrv.service

import com.typesafe.config.{Config, ConfigFactory}
import org.sunbird.obsrv.core.util.{JSONUtil, PostgresConnect, PostgresConnectionConfig}
import org.sunbird.obsrv.model.DatasetModels.{Dataset, DatasetTransformation, DedupConfig, DenormConfig, ExtractionConfig, RouterConfig, TransformationFunction, ValidationConfig}
import org.sunbird.obsrv.model.DatasetModels.{Dataset, DatasetConfig, DatasetTransformation, DedupConfig, DenormConfig, ExtractionConfig, RouterConfig, TransformationFunction, ValidationConfig}

import java.io.File
import java.sql.ResultSet

object DatasetRegistryService {

val configFile = new File("/data/conf/base-config.conf")
private val configFile = new File("/data/conf/base-config.conf")
val config: Config = if (configFile.exists()) {
ConfigFactory.parseFile(configFile).resolve()
} else {
@@ -58,22 +58,25 @@ object DatasetRegistryService {

private def parseDataset(rs: ResultSet): Dataset = {
val datasetId = rs.getString("id")
val datasetType = rs.getString("type")
val validationConfig = rs.getString("validation_config")
val extractionConfig = rs.getString("extraction_config")
val dedupConfig = rs.getString("dedup_config")
val jsonSchema = rs.getString("data_schema")
val denormConfig = rs.getString("denorm_config")
val routerConfig = rs.getString("router_config")
val datasetConfig = rs.getString("dataset_config")
val status = rs.getString("status")

Dataset(datasetId,
Dataset(datasetId, datasetType,
if (extractionConfig == null) None else Some(JSONUtil.deserialize[ExtractionConfig](extractionConfig)),
if (dedupConfig == null) None else Some(JSONUtil.deserialize[DedupConfig](dedupConfig)),
if (validationConfig == null) None else Some(JSONUtil.deserialize[ValidationConfig](validationConfig)),
// if (jsonSchema == null) None else Some(jsonSchema),
Option(jsonSchema),
if (denormConfig == null) None else Some(JSONUtil.deserialize[DenormConfig](denormConfig)),
JSONUtil.deserialize[RouterConfig](routerConfig),
JSONUtil.deserialize[DatasetConfig](datasetConfig),
status
)
}
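Note (not part of this diff): the new dataset_config column is deserialized through the same JSONUtil path as the other JSON columns. A small sketch using the sample JSON from the test fixtures below; the field-to-property mapping follows the DatasetConfig annotations.

import org.sunbird.obsrv.core.util.JSONUtil
import org.sunbird.obsrv.model.DatasetModels.DatasetConfig

val json = """{"data_key":"id","timestamp_key":"date","entry_topic":"ingest","redis_db_host":"localhost","redis_db_port":"6340","redis_db":2}"""
val datasetConfig = JSONUtil.deserialize[DatasetConfig](json)
// datasetConfig.key == "id", datasetConfig.tsKey == "date", datasetConfig.entryTopic == "ingest"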
@@ -30,19 +30,19 @@ class BaseSpecWithDatasetRegistry extends BaseSpecWithPostgres {

private def createSchema(postgresConnect: PostgresConnect) {

postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets ( id text PRIMARY KEY, validation_config json, extraction_config json, dedup_config json, data_schema json, denorm_config json, router_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date timestamp NOT NULL, updated_date timestamp NOT NULL );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets ( id text PRIMARY KEY, type text NOT NULL, validation_config json, extraction_config json, dedup_config json, data_schema json, denorm_config json, router_config json NOT NULL, dataset_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date timestamp NOT NULL, updated_date timestamp NOT NULL );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL, retention_period json, archival_policy json, purge_policy json, backup_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_transformations ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), field_key text NOT NULL, transformation_function text NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(field_key, dataset_id) );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets_denorm_config ( id SERIAL PRIMARY KEY, field_key text NOT NULL, dataset_id text NOT NULL REFERENCES datasets (id), connector_type text NOT NULL, connector_config json NOT NULL, data_schema json NOT NULL, field_out_key text NOT NULL, exclude_fields text[], field_transformations json, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(field_key, dataset_id) );")
postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets_source_config ( id SERIAL PRIMARY KEY, dataset_id text NOT NULL REFERENCES datasets (id), connector_type text NOT NULL, connector_config json NOT NULL, status text NOT NULL, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(dataset_id) );")
}

private def insertTestData(postgresConnect: PostgresConnect): Unit = {
postgresConnect.execute("insert into datasets(id, data_schema, validation_config, extraction_config, dedup_config, router_config, status, created_by, updated_by, created_date, updated_date) values ('d1', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', 'ACTIVE', 'System', 'System', now(), now());")
postgresConnect.execute("insert into datasets(id, type, data_schema, validation_config, extraction_config, dedup_config, router_config, dataset_config, status, created_by, updated_by, created_date, updated_date) values ('d1', 'dataset', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"validate\": true, \"mode\": \"Strict\"}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}}', '{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 3}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\",\"redis_db_host\":\"localhost\",\"redis_db_port\":\"6340\",\"redis_db\":2}', 'ACTIVE', 'System', 'System', now(), now());")
postgresConnect.execute("update datasets set denorm_config = '{\"redis_db_host\":\"localhost\",\"redis_db_port\":\"6340\",\"denorm_fields\":[{\"denorm_key\":\"vehicleCode\",\"redis_db\":2,\"denorm_out_field\":\"vehicleData\"}]}' where id='d1';")
postgresConnect.execute("insert into dataset_transformations values('tf1', 'd1', 'dealer.email', '{\"type\":\"mask\",\"expr\":\"dealer.email\"}', 'active', 'System', 'System', now(), now());")
postgresConnect.execute("insert into dataset_transformations values('tf2', 'd1', 'dealer.maskedPhone', '{\"type\":\"mask\",\"expr\": \"dealer.phone\"}', 'active', 'System', 'System', now(), now());")
postgresConnect.execute("insert into datasets(id, data_schema, router_config, status, created_by, updated_by, created_date, updated_date) values ('d2', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"topic\":\"d1-events\"}', 'ACTIVE', 'System', 'System', now(), now());")
postgresConnect.execute("insert into datasets(id, type, data_schema, router_config, dataset_config, status, created_by, updated_by, created_date, updated_date) values ('d2', 'dataset', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"topic\":\"d1-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'ACTIVE', 'System', 'System', now(), now());")
}

}
@@ -1,6 +1,5 @@
package org.sunbird.obsrv.spec

import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.Matchers
import org.scalatestplus.mockito.MockitoSugar
import org.sunbird.obsrv.registry.DatasetRegistry
@@ -18,14 +17,14 @@ class TestDatasetRegistrySpec extends BaseSpecWithDatasetRegistry with Matchers
d2Opt.get.id should be ("d2")
d2Opt.get.denormConfig should be (None)

val allDatasets = DatasetRegistry.getAllDatasets()
val allDatasets = DatasetRegistry.getAllDatasets("dataset")
allDatasets.size should be (2)

val d1Tfs = DatasetRegistry.getDatasetTransformations("d1")
d1Tfs should not be (None)
d1Tfs.get.size should be (2)

val ids = DatasetRegistry.getDataSetIds()
val ids = DatasetRegistry.getDataSetIds("dataset")
ids.head should be ("d1")
ids.last should be ("d2")
}
@@ -53,6 +53,8 @@ abstract class BaseJobConfig[T](val config: Config, val jobName: String) extends
val checkpointingBaseUrl: Option[String] = if (config.hasPath("job")) Option(config.getString("job.statebackend.base.url")) else None

// Base Methods
def datasetType(): String = if(config.hasPath("dataset.type")) config.getString("dataset.type") else "dataset"

def inputTopic(): String

def inputConsumer(): String
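Note (not part of this diff): datasetType() reads the new optional dataset.type key and falls back to "dataset". A minimal sketch of overriding it through Typesafe config; the override value "master-dataset" is an assumption, not shown in this diff.

import com.typesafe.config.ConfigFactory

// Override the dataset type for a master-data job; the default remains "dataset"
val config = ConfigFactory.parseString("dataset.type = \"master-dataset\"")
// A BaseJobConfig subclass constructed with this config would then return
// "master-dataset" from datasetType() instead of the default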
@@ -168,7 +168,11 @@ abstract class WindowBaseProcessFunction[I, O, K](config: BaseJobConfig[O]) exte
metrics: Metrics): Unit

override def process(key: K, context: ProcessWindowFunction[I, O, K, TimeWindow]#Context, elements: lang.Iterable[I], out: Collector[O]): Unit = {
process(key, context, elements, metrics)
try {
process(key, context, elements, metrics)
} catch {
case exception: Exception => exception.printStackTrace()
}
}

}
@@ -1,4 +1,4 @@
package org.sunbird.obsrv.denormalizer.util;
package org.sunbird.obsrv.core.util;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
@@ -25,7 +25,7 @@ public TriggerResult onElement(Object element, long timestamp, TimeWindow window
count.add(1L);
if ((Long) count.get() >= this.maxCount) {
count.clear();
return TriggerResult.FIRE;
return TriggerResult.FIRE_AND_PURGE;
} else {
return TriggerResult.CONTINUE;
}
@@ -36,7 +36,7 @@ public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerCo
}

public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
return TriggerResult.FIRE;
return TriggerResult.FIRE_AND_PURGE;
}

public void clear(TimeWindow window, Trigger.TriggerContext ctx) {
@@ -0,0 +1,12 @@
package org.sunbird.obsrv.core.util

import org.apache.flink.api.java.functions.KeySelector

import scala.collection.mutable

class DatasetKeySelector() extends KeySelector[mutable.Map[String, AnyRef], String] {

override def getKey(in: mutable.Map[String, AnyRef]): String = {
in("dataset").asInstanceOf[String]
}
}
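Note (not part of this diff): DatasetKeySelector pulls the "dataset" field out of each event map so streams can be keyed per dataset before windowing. A hypothetical wiring sketch; the keyByDataset helper and the events stream are assumptions, not code from this commit.

import org.apache.flink.streaming.api.scala._
import org.sunbird.obsrv.core.util.DatasetKeySelector

import scala.collection.mutable

// Group events by their "dataset" id so downstream windowed operators
// (e.g. the denormalizer) process one dataset per key
def keyByDataset(events: DataStream[mutable.Map[String, AnyRef]]): KeyedStream[mutable.Map[String, AnyRef], String] =
  events.keyBy(new DatasetKeySelector())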
@@ -1,4 +1,4 @@
package org.sunbird.obsrv.denormalizer.util;
package org.sunbird.obsrv.core.util;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -21,13 +21,13 @@ class DenormalizerFunction(config: DenormalizerConfig)

override def getMetricsList(): MetricsList = {
val metrics = List(config.denormSuccess, config.denormTotal, config.denormFailed, config.eventsSkipped)
MetricsList(DatasetRegistry.getDataSetIds(), metrics)
MetricsList(DatasetRegistry.getDataSetIds(config.datasetType()), metrics)
}

override def open(parameters: Configuration): Unit = {
super.open(parameters)
denormCache = new DenormCache(config)
denormCache.open(DatasetRegistry.getAllDatasets())
denormCache.open(DatasetRegistry.getAllDatasets(config.datasetType()))
}

override def close(): Unit = {
