Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion _data/harnesses/anomaly-detection/ksql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,19 @@ dev:
- title: Write your statements to a file
content:
- action: make_file
file: src/test-statements.sql
file: src/statements.sql
render:
file: tutorials/anomaly-detection/ksql/markup/dev/make-src-file.adoc

test:
steps:
- title: Create the test data
content:
- action: make_file
file: test/input.json
render:
file: tutorials/anomaly-detection/ksql/markup/test/make-test-input.adoc

- action: make_file
file: test/output.json
render:
Expand Down
2 changes: 1 addition & 1 deletion _data/tutorials.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ anomaly-detection:
meta-description: "detect anomalies in a stream of Kafka events"
slug: "/anomaly-detection"
question: "If you have time series events in a Kafka topic, how can you find anomalous events?"
introduction: "A common technique of fraudsters is to disguise transactions under the name of a popular company, the idea being that the chances of them being recognized is very low. For example, transactions labeled Verizon, Citibank, or USPS are likely to look similar and blend in with legitimate transactions. This tutorial shows you how to identify this pattern of behavior by detecting 'abnormal' transactions that occur within a window of time.
introduction: "A common technique of fraudsters is to disguise transactions under the name of a popular company, the idea being that the chances of them being recognized is very low. For example, transactions labeled Verizon, Citibank, or USPS are likely to look similar and blend in with legitimate transactions. This tutorial shows you how to identify this pattern of behavior by detecting abnormal transactions that occur within a window of time.

Normally, a group of these transactions will occur within a 24 hour period. In fraud detection, financial institutions will categorize this behavior as unusual and alert their fraud team to investigate immediately. Other example use cases include detecting ATM fraud or unusual credit card activity."
status:
Expand Down
1 change: 1 addition & 0 deletions _includes/tutorials/anomaly-detection/ksql/code/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ tutorial:
harness-runner ../../../../../_data/harnesses/anomaly-detection/ksql.yml $(TEMP_DIR) $(SEQUENCE)
diff --strip-trailing-cr $(STEPS_DIR)/dev/find-suspicious-transactions-output.log $(DEV_OUTPUTS_DIR)/find-suspicious-transactions/output-0.log
diff --strip-trailing-cr $(STEPS_DIR)/dev/print-accounts-to-monitor.log $(DEV_OUTPUTS_DIR)/print-accounts-to-monitor/output-0.log
bash -c "diff --strip-trailing-cr <(cut -d ',' -f 2 $(STEPS_DIR)/dev/print-accounts-to-monitor.log) <(cut -d ',' -f 2- $(DEV_OUTPUTS_DIR)/print-accounts-to-monitor/output-0.log)"
diff --strip-trailing-cr $(STEPS_DIR)/test/expected-results.log $(TEST_OUTPUTS_DIR)/test-results.log
reset
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ services:
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_LOG_RETENTION_MS: -1

schema-registry:
image: confluentinc/cp-schema-registry:7.1.0
Expand All @@ -43,9 +44,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

ksqldb-server:
# In ksqlDB 0.14+, ksql-test-runner can experience a flaky timing issue with joins.
# This test case is temporarily left at an earlier version.
image: confluentinc/ksqldb-server:0.13.0
image: confluentinc/ksqldb-server:0.24.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
Expand All @@ -64,7 +63,7 @@ services:
KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: "earliest"

ksqldb-cli:
image: confluentinc/ksqldb-cli:0.13.0
image: confluentinc/ksqldb-cli:0.24.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on both of these

container_name: ksqldb-cli
depends_on:
- broker
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
CREATE TABLE suspicious_names (CREATED_DATE VARCHAR,
CREATE TABLE suspicious_names (CREATED_TS VARCHAR,
COMPANY_NAME VARCHAR PRIMARY KEY,
COMPANY_ID INT)
WITH (kafka_topic='suspicious_names',
partitions=1,
value_format='JSON',
timestamp='CREATED_DATE',
timestamp_format='yyyy-MM-dd HH:mm:ss');
timestamp='CREATED_TS',
timestamp_format = 'yyyy-MM-dd HH:mm:ss');

CREATE STREAM transactions (TXN_ID BIGINT, USERNAME VARCHAR, RECIPIENT VARCHAR, AMOUNT DOUBLE, TIMESTAMP VARCHAR)
CREATE STREAM transactions (TXN_ID BIGINT, USERNAME VARCHAR, RECIPIENT VARCHAR, AMOUNT DOUBLE, TS VARCHAR)
WITH (kafka_topic='transactions',
partitions=1,
value_format='JSON',
timestamp='TIMESTAMP',
timestamp_format='yyyy-MM-dd HH:mm:ss');
timestamp='TS',
timestamp_format = 'yyyy-MM-dd HH:mm:ss');

CREATE STREAM suspicious_transactions
WITH (kafka_topic='suspicious_transactions', partitions=1, value_format='JSON') AS
SELECT T.TXN_ID, T.USERNAME, T.RECIPIENT, T.AMOUNT, T.TIMESTAMP
SELECT T.TXN_ID, T.USERNAME, T.RECIPIENT, T.AMOUNT, T.TS
FROM transactions T
INNER JOIN
suspicious_names S
ON T.RECIPIENT = S.COMPANY_NAME;

CREATE TABLE accounts_to_monitor
WITH (kafka_topic='accounts_to_monitor', partitions=1, value_format='JSON') AS
SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_END,
USERNAME
SELECT USERNAME,
TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_END
FROM suspicious_transactions
WINDOW TUMBLING (SIZE 24 HOURS)
GROUP BY USERNAME
Expand Down

This file was deleted.

18 changes: 18 additions & 0 deletions _includes/tutorials/anomaly-detection/ksql/code/test/input.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"inputs": [
{ "topic": "suspicious_names", "key": "Verizon", "value": { "CREATED_TS": "2019-03-08 00:00:00", "COMPANY_NAME": "Verizon", "COMPANY_ID": 1 } },
{ "topic": "suspicious_names", "key": "Spirit Halloween", "value": { "CREATED_TS": "2019-10-31 00:00:00", "COMPANY_NAME": "Spirit Halloween", "COMPANY_ID": 2 } },
{ "topic": "suspicious_names", "key": "Best Buy", "value": { "CREATED_TS": "2019-12-15 00:00:00", "COMPANY_NAME": "Best Buy", "COMPANY_ID": 3 } },
{ "topic": "transactions", "value": { "TXN_ID": 9900, "USERNAME": "Abby Normal", "RECIPIENT": "Verizon", "AMOUNT": 22.0, "TS": "2020-10-20 13:05:36" } },
{ "topic": "transactions", "value": { "TXN_ID": 12, "USERNAME": "Victor von Frankenstein", "RECIPIENT": "Tattered Cover", "AMOUNT": 7.0, "TS": "2020-10-20 13:07:59" } },
{ "topic": "transactions", "value": { "TXN_ID": 13, "USERNAME": "Frau Blücher", "RECIPIENT": "Peebles", "AMOUNT": 70.0, "TS": "2020-10-20 13:15:00" } },
{ "topic": "transactions", "value": { "TXN_ID": 9903, "USERNAME": "Abby Normal", "RECIPIENT": "Verizon", "AMOUNT": 61.0, "TS": "2020-10-20 13:31:02" } },
{ "topic": "transactions", "value": { "TXN_ID": 9901, "USERNAME": "Abby Normal", "RECIPIENT": "Spirit Halloween", "AMOUNT": 83.0, "TS": "2020-10-20 13:44:41" } },
{ "topic": "transactions", "value": { "TXN_ID": 9902, "USERNAME": "Abby Normal", "RECIPIENT": "Spirit Halloween", "AMOUNT": 46.0, "TS": "2020-10-20 13:44:43" } },
{ "topic": "transactions", "value": { "TXN_ID": 9904, "USERNAME": "Abby Normal", "RECIPIENT": "Spirit Halloween", "AMOUNT": 59.0, "TS": "2020-10-20 13:44:44" } },
{ "topic": "transactions", "value": { "TXN_ID": 6, "USERNAME": "Victor von Frankenstein", "RECIPIENT": "Confluent Cloud", "AMOUNT": 21.0, "TS": "2020-10-20 13:47:51" } },
{ "topic": "transactions", "value": { "TXN_ID": 18, "USERNAME": "Frau Blücher", "RECIPIENT": "Target", "AMOUNT": 70.0, "TS": "2020-10-20 13:52:01" } },
{ "topic": "transactions", "value": { "TXN_ID": 7, "USERNAME": "Victor von Frankenstein", "RECIPIENT": "Verizon", "AMOUNT": 100.0, "TS": "2020-10-20 13:55:06" } },
{ "topic": "transactions", "value": { "TXN_ID": 19, "USERNAME": "Frau Blücher", "RECIPIENT": "Goodwill", "AMOUNT": 7.0, "TS": "2020-10-20 14:12:32" } }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"TXN_ID": 9900,
"USERNAME": "Abby Normal",
"AMOUNT": 22.0,
"TIMESTAMP": "2020-10-20 13:05:36"
"TS": "2020-10-20 13:05:36"
}
},
{
Expand All @@ -17,7 +17,7 @@
"TXN_ID": 9903,
"USERNAME": "Abby Normal",
"AMOUNT": 61.0,
"TIMESTAMP": "2020-10-20 13:31:02"
"TS": "2020-10-20 13:31:02"
}
},
{
Expand All @@ -27,7 +27,7 @@
"TXN_ID": 9901,
"USERNAME": "Abby Normal",
"AMOUNT": 83.0,
"TIMESTAMP": "2020-10-20 13:44:41"
"TS": "2020-10-20 13:44:41"
}
},
{
Expand All @@ -37,7 +37,7 @@
"TXN_ID": 9902,
"USERNAME": "Abby Normal",
"AMOUNT": 46.0,
"TIMESTAMP": "2020-10-20 13:44:43"
"TS": "2020-10-20 13:44:43"
}
},
{
Expand All @@ -47,7 +47,7 @@
"TXN_ID": 9904,
"USERNAME": "Abby Normal",
"AMOUNT": 59.0,
"TIMESTAMP": "2020-10-20 13:44:44"
"TS": "2020-10-20 13:44:44"
}
},
{
Expand All @@ -57,42 +57,9 @@
"TXN_ID": 7,
"USERNAME": "Victor von Frankenstein",
"AMOUNT": 100.0,
"TIMESTAMP": "2020-10-20 13:55:06"
"TS": "2020-10-20 13:55:06"
}
},
{
"topic": "accounts_to_monitor",
"key": "Abby Normal",
"window": {
"start": 1603152000000,
"end": 1603238400000,
"type": "time"
},
"value": null,
"timestamp": 1603199136000
},
{
"topic": "accounts_to_monitor",
"key": "Abby Normal",
"window": {
"start": 1603152000000,
"end": 1603238400000,
"type": "time"
},
"value": null,
"timestamp": 1603200662000
},
{
"topic": "accounts_to_monitor",
"key": "Abby Normal",
"window": {
"start": 1603152000000,
"end": 1603238400000,
"type": "time"
},
"value": null,
"timestamp": 1603201481000
},
{
"topic": "accounts_to_monitor",
"key": "Abby Normal",
Expand Down Expand Up @@ -120,17 +87,6 @@
"WINDOW_END": "2020-10-21 00:00:00 +0000"
},
"timestamp": 1603201484000
},
{
"topic": "accounts_to_monitor",
"key": "Victor von Frankenstein",
"window": {
"start": 1603152000000,
"end": 1603238400000,
"type": "time"
},
"value": null,
"timestamp": 1603202106000
}

]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
CREATE TABLE accounts_to_monitor
WITH (kafka_topic='accounts_to_monitor', partitions=1, value_format='JSON') AS
SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_END,
USERNAME
SELECT USERNAME,
TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_END
FROM suspicious_transactions
WINDOW TUMBLING (SIZE 24 HOURS)
GROUP BY USERNAME
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
CREATE TABLE suspicious_names (CREATED_DATE VARCHAR,
CREATE TABLE suspicious_names (CREATED_TS VARCHAR,
COMPANY_NAME VARCHAR PRIMARY KEY,
COMPANY_ID INT)
WITH (kafka_topic='suspicious_names',
partitions=1,
value_format='JSON',
timestamp='CREATED_DATE',
timestamp_format='yyyy-MM-dd HH:mm:ss');
timestamp='CREATED_TS',
timestamp_format = 'yyyy-MM-dd HH:mm:ss');
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE STREAM suspicious_transactions
WITH (kafka_topic='suspicious_transactions', partitions=1, value_format='JSON') AS
SELECT T.TXN_ID, T.USERNAME, T.RECIPIENT, T.AMOUNT, T.TIMESTAMP
SELECT T.TXN_ID, T.USERNAME, T.RECIPIENT, T.AMOUNT, T.TS
FROM transactions T
INNER JOIN
suspicious_names S
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE STREAM transactions (TXN_ID BIGINT, USERNAME VARCHAR, RECIPIENT VARCHAR, AMOUNT DOUBLE, TIMESTAMP VARCHAR)
CREATE STREAM transactions (TXN_ID BIGINT, USERNAME VARCHAR, RECIPIENT VARCHAR, AMOUNT DOUBLE, TS VARCHAR)
WITH (kafka_topic='transactions',
partitions=1,
value_format='JSON',
timestamp='TIMESTAMP',
timestamp_format='yyyy-MM-dd HH:mm:ss');
timestamp='TS',
timestamp_format = 'yyyy-MM-dd HH:mm:ss');
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
INSERT INTO suspicious_names (CREATED_DATE, COMPANY_NAME, COMPANY_ID) VALUES ('2019-03-08 00:00:00', 'Verizon', 1);
INSERT INTO suspicious_names (CREATED_DATE, COMPANY_NAME, COMPANY_ID) VALUES ('2019-10-31 00:00:00', 'Spirit Halloween', 2);
INSERT INTO suspicious_names (CREATED_DATE, COMPANY_NAME, COMPANY_ID) VALUES ('2019-12-15 00:00:00', 'Best Buy', 3);
INSERT INTO suspicious_names (CREATED_TS, COMPANY_NAME, COMPANY_ID) VALUES (FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (5 * 24 * 60 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'), 'Verizon', 1);
INSERT INTO suspicious_names (CREATED_TS, COMPANY_NAME, COMPANY_ID) VALUES (FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (4 * 24 * 60 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'), 'Spirit Halloween', 2);
INSERT INTO suspicious_names (CREATED_TS, COMPANY_NAME, COMPANY_ID) VALUES (FORMAT_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP() - (3 * 24 * 60 * 60 * 1000)),'yyyy-MM-dd HH:mm:ss'), 'Best Buy', 3);
Loading