Skip to content

Commit

Permalink
test: add test for timestamp-to-string with join (MINOR) (#5834)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Jul 16, 2020
1 parent b2dd772 commit bc5e5fd
Show file tree
Hide file tree
Showing 4 changed files with 460 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (K STRING KEY, WLAN_SA ARRAY<STRING>) WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`K` STRING KEY, `WLAN_SA` ARRAY<STRING>",
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE DEVICES (ROWKEY STRING PRIMARY KEY, NAME STRING) WITH (KAFKA_TOPIC='devices', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "DEVICES",
"schema" : "`ROWKEY` STRING KEY, `NAME` STRING",
"topicName" : "devices",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE OUTPUT AS SELECT\n D.NAME DEVICE_NAME,\n TIMESTAMPTOSTRING(MIN(P.ROWTIME), 'yyyy-MM-dd HH:mm:ss zzz', 'Europe/London') KSQL_COL_0\nFROM INPUT P\nINNER JOIN DEVICES D ON ((P.WLAN_SA[1] = D.ROWKEY))\nGROUP BY D.NAME\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "OUTPUT",
"schema" : "`DEVICE_NAME` STRING KEY, `KSQL_COL_0` STRING",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "DEVICES", "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "tableSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "Aggregate/Project"
},
"source" : {
"@type" : "streamAggregateV1",
"properties" : {
"queryContext" : "Aggregate/Aggregate"
},
"source" : {
"@type" : "streamGroupByV1",
"properties" : {
"queryContext" : "Aggregate/GroupBy"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Aggregate/Prepare"
},
"source" : {
"@type" : "streamTableJoinV1",
"properties" : {
"queryContext" : "Join"
},
"joinType" : "INNER",
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"leftSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasLeft"
},
"source" : {
"@type" : "streamSelectKeyV2",
"properties" : {
"queryContext" : "LeftSourceKeyed"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Left/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`K` STRING KEY, `WLAN_SA` ARRAY<STRING>"
},
"keyExpression" : "WLAN_SA[1]"
},
"keyColumnNames" : [ "P_KSQL_COL_0" ],
"selectExpressions" : [ "WLAN_SA AS P_WLAN_SA", "ROWTIME AS P_ROWTIME", "K AS P_K", "KSQL_COL_0 AS P_KSQL_COL_0" ]
},
"rightSource" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "PrependAliasRight"
},
"source" : {
"@type" : "tableSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Right/Source"
},
"topicName" : "devices",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ROWKEY` STRING KEY, `NAME` STRING",
"forceChangelog" : true
},
"keyColumnNames" : [ "D_ROWKEY" ],
"selectExpressions" : [ "NAME AS D_NAME", "ROWTIME AS D_ROWTIME", "ROWKEY AS D_ROWKEY" ]
},
"keyColName" : "D_ROWKEY"
},
"keyColumnNames" : [ "D_ROWKEY" ],
"selectExpressions" : [ "D_NAME AS D_NAME", "P_ROWTIME AS P_ROWTIME" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"groupByExpressions" : [ "D_NAME" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"nonAggregateColumns" : [ "D_NAME", "P_ROWTIME" ],
"aggregationFunctions" : [ "MIN(P_ROWTIME)" ]
},
"keyColumnNames" : [ "DEVICE_NAME" ],
"selectExpressions" : [ "TIMESTAMPTOSTRING(KSQL_AGG_VARIABLE_0, 'yyyy-MM-dd HH:mm:ss zzz', 'Europe/London') AS KSQL_COL_0" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CTAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
{
"version" : "6.1.0",
"timestamp" : 1594853229724,
"path" : "query-validation-tests/timestamp-to-string.json",
"schemas" : {
"CTAS_OUTPUT_0.KafkaTopic_Right.Source" : "STRUCT<NAME VARCHAR> NOT NULL",
"CTAS_OUTPUT_0.KafkaTopic_Left.Source" : "STRUCT<WLAN_SA ARRAY<VARCHAR>> NOT NULL",
"CTAS_OUTPUT_0.Join.Left" : "STRUCT<P_WLAN_SA ARRAY<VARCHAR>, P_ROWTIME BIGINT, P_K VARCHAR, P_KSQL_COL_0 VARCHAR> NOT NULL",
"CTAS_OUTPUT_0.Aggregate.GroupBy" : "STRUCT<D_NAME VARCHAR, P_ROWTIME BIGINT> NOT NULL",
"CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : "STRUCT<D_NAME VARCHAR, P_ROWTIME BIGINT, KSQL_AGG_VARIABLE_0 BIGINT> NOT NULL",
"CTAS_OUTPUT_0.OUTPUT" : "STRUCT<KSQL_COL_0 VARCHAR> NOT NULL"
},
"testCase" : {
"name" : "timestamp to string in join",
"inputs" : [ {
"topic" : "devices",
"key" : "a",
"value" : {
"name" : "device"
},
"timestamp" : 1526075912000
}, {
"topic" : "input",
"key" : "foo",
"value" : {
"WLAN_SA" : [ "a" ]
},
"timestamp" : 1526075913000
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "device",
"value" : {
"KSQL_COL_0" : "2018-05-11 22:58:33 BST"
},
"timestamp" : 1526075913000
} ],
"topics" : [ {
"name" : "input",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "devices",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT (K STRING KEY, WLAN_SA ARRAY<VARCHAR>) WITH (kafka_topic='input', value_format='JSON');", "CREATE TABLE DEVICES (ROWKEY STRING PRIMARY KEY, NAME VARCHAR) WITH (kafka_topic='devices', value_format='JSON');", "CREATE TABLE OUTPUT AS SELECT D.NAME AS DEVICE_NAME, TIMESTAMPTOSTRING(MIN(P.ROWTIME),'yyyy-MM-dd HH:mm:ss zzz','Europe/London') FROM INPUT P INNER JOIN DEVICES D ON P.WLAN_SA[1] = D.ROWKEY GROUP BY D.NAME emit changes;" ],
"post" : {
"topics" : {
"topics" : [ {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-Aggregate-Materialize-changelog",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
}
}, {
"name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-GroupBy-repartition",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
}
}, {
"name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Join-repartition",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
}
}, {
"name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-KafkaTopic_Right-Reduce-changelog",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
}
}, {
"name" : "devices",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "input",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Loading

0 comments on commit bc5e5fd

Please sign in to comment.