# Task 2 - Timely stream processing
Karen from the security department has a new request for you: she needs to find suspicious transfers. Her first idea is to look at transfers sent shortly after an app activation. Help her by writing the right job!

## Project initialization

In [None]:
%reload_ext streaming_jupyter_integrations.magics
%flink_connect

In [None]:
%env KAFKA_PASSWORD=passwordNxZrJ4JhdDUn

## Data definition
You have access to two Kafka topics: `transfers` and `app_activations`. First, prepare table definitions for them. You can find an example Kafka table [HERE](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/). To check the event schemas, look at the sample messages in the `data/` directory.
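One quick way to turn a sample message into column types is sketched below in Python, the notebook's kernel language. `suggest_column_types` is a hypothetical helper, not part of Flink or `streaming_jupyter_integrations`, and the sample message is made up; paste a real message from `data/` instead. The type choices are heuristics: the sketch assumes floats are monetary amounts (hence `DECIMAL(38,2)`, matching the DDL below) and treats strings in the listed layouts as `TIMESTAMP(3)`.

```python
import json
from datetime import datetime

# Timestamp layouts this sketch recognizes. Flink's JSON format parses the
# space-separated "SQL" layout by default; the "T"-separated ISO-8601 layout
# needs 'json.timestamp-format.standard' = 'ISO-8601' in the table options.
TIMESTAMP_LAYOUTS = (
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S",
)


def looks_like_timestamp(value: str) -> bool:
    """Return True if the string parses with one of TIMESTAMP_LAYOUTS."""
    for layout in TIMESTAMP_LAYOUTS:
        try:
            datetime.strptime(value, layout)
            return True
        except ValueError:
            continue
    return False


def suggest_column_types(message: str) -> dict:
    """Hypothetical helper: map each field of one JSON message to a candidate Flink SQL type."""
    suggestions = {}
    for field, value in json.loads(message).items():
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            suggestions[field] = "BOOLEAN"
        elif isinstance(value, int):
            suggestions[field] = "INT"
        elif isinstance(value, float):
            # Assumption: floats in these topics are money, so use fixed-point DECIMAL.
            suggestions[field] = "DECIMAL(38,2)"
        elif isinstance(value, str) and looks_like_timestamp(value):
            suggestions[field] = "TIMESTAMP(3)"
        else:
            suggestions[field] = "STRING"
    return suggestions


# Hypothetical message shaped like the `transfers` table; the values are made up.
sample = (
    '{"transaction_id": "tx-0001", "user_id": 42, "amount": 120.5, '
    '"currency": "EUR", "IBAN": "XX00HYPOTHETICAL", "ts": "2023-05-01 10:15:30.123"}'
)
print(suggest_column_types(sample))
# {'transaction_id': 'STRING', 'user_id': 'INT', 'amount': 'DECIMAL(38,2)',
#  'currency': 'STRING', 'IBAN': 'STRING', 'ts': 'TIMESTAMP(3)'}
```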

In [None]:
%%flink_execute_sql
CREATE TABLE transfers (
    transaction_id STRING,
    user_id INT,
    amount DECIMAL(38,2),
    currency STRING,
    IBAN STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'transfers',
    'properties.bootstrap.servers' = '34.82.24.157:9092',
    'properties.group.id' = 'task2',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.plain.username' = 'user',
    'properties.sasl.plain.password' = '${KAFKA_PASSWORD}',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="${KAFKA_PASSWORD}";',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)

In [None]:
%%flink_execute_sql
CREATE TABLE app_activations (
    user_id INT,
    device_fingerprint STRING,
    app_version STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'app_activations',
    'properties.bootstrap.servers' = '34.82.24.157:9092',
    'properties.group.id' = 'task2',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.plain.username' = 'user',
    'properties.sasl.plain.password' = '${KAFKA_PASSWORD}',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="${KAFKA_PASSWORD}";',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
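Both tables declare `WATERMARK FOR ts AS ts`. Flink's documentation describes this as the strictly ascending strategy: the watermark is the maximum timestamp observed so far, so only rows with a strictly larger timestamp are on time. The Python sketch below is a simplified model (`flag_late_rows` is a hypothetical name). It shows what this means for out-of-order events and how a bounded delay, as in `WATERMARK FOR ts AS ts - INTERVAL '5' SECOND`, changes the outcome. It updates the watermark after every row, whereas Flink emits watermarks periodically.

```python
from datetime import datetime, timedelta


def flag_late_rows(timestamps, delay=timedelta(0)):
    """Simplified model of `WATERMARK FOR ts AS ts - <delay>`.

    The watermark is the largest `ts - delay` seen so far; a row whose ts is not
    strictly greater than the current watermark is flagged as late. Real Flink
    emits watermarks periodically, so this per-row update is an approximation.
    """
    watermark = None
    flags = []
    for ts in timestamps:
        flags.append(watermark is not None and ts <= watermark)
        candidate = ts - delay
        if watermark is None or candidate > watermark:
            watermark = candidate
    return flags


base = datetime(2023, 5, 1, 10, 0, 0)
# The third event arrives out of order: it is 2 seconds older than the second one.
events = [base, base + timedelta(seconds=5), base + timedelta(seconds=3)]
print(flag_late_rows(events))                              # [False, False, True]
print(flag_late_rows(events, delay=timedelta(seconds=5)))  # [False, False, False]
```

Rows behind the watermark can be dropped by time-based operators such as the interval join and the window aggregation used below, so if the topics turn out to contain out-of-order events, a small delay may be worth considering.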

## Join and query data
Now it's time to write the `SELECT` query. Find the transfers that happened at most 2 minutes after an app activation by the same user. You can find the join syntax [HERE](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/).

In [None]:
%%flink_execute_sql
SELECT
    t.*
FROM
    app_activations a,
    transfers t 
WHERE
    a.user_id = t.user_id
AND t.ts BETWEEN a.ts AND a.ts + INTERVAL '2' MINUTES
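To make the interval join's semantics concrete, here is a batch Python sketch of what the query computes, assuming all rows arrive on time. The names `Activation`, `Transfer`, and `suspicious_transfers` are hypothetical. Note that a transfer following two activations of the same user is emitted twice, just as it would appear twice in the SQL `t.*` output.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Activation:
    user_id: int
    ts: datetime


@dataclass(frozen=True)
class Transfer:
    transaction_id: str
    user_id: int
    ts: datetime


def suspicious_transfers(activations, transfers, max_delay=timedelta(minutes=2)):
    """Batch version of the interval join: emit a transfer once per activation it follows."""
    activation_times = defaultdict(list)  # user_id -> activation timestamps
    for activation in activations:
        activation_times[activation.user_id].append(activation.ts)

    matches = []
    for transfer in transfers:
        for activated_at in activation_times[transfer.user_id]:
            # SQL BETWEEN is inclusive on both ends.
            if activated_at <= transfer.ts <= activated_at + max_delay:
                matches.append(transfer)
    return matches


base = datetime(2023, 5, 1, 10, 0, 0)
activations = [Activation(1, base), Activation(1, base + timedelta(minutes=1)), Activation(2, base)]
transfers = [
    Transfer("a", 1, base + timedelta(seconds=90)),  # follows both activations of user 1
    Transfer("b", 1, base + timedelta(minutes=4)),   # too late
    Transfer("c", 2, base - timedelta(seconds=1)),   # before the activation
    Transfer("d", 2, base + timedelta(minutes=2)),   # exactly on the upper bound
]
print([t.transaction_id for t in suspicious_transfers(activations, transfers)])  # ['a', 'a', 'd']
```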

## Windows and aggregations
Karen has one more request. Every hour, she would like to get a report with the number of suspicious transfers since the beginning of the current day. Please help her. You can find more about windows [HERE](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/).

In [None]:
%%flink_execute_sql
WITH suspicious AS (
    SELECT
        t.*
    FROM
        app_activations a,
        transfers t 
    WHERE a.user_id = t.user_id
    AND t.ts BETWEEN a.ts AND a.ts + INTERVAL '2' MINUTES
)
SELECT
    window_start,
    window_end,
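    -- A transfer that follows several activations appears more than once in suspicious,
    -- so each transaction is counted only once.
    COUNT(DISTINCT transaction_id) AS t_cnt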
FROM TABLE(
    CUMULATE(
        TABLE suspicious,
        DESCRIPTOR(ts),
        INTERVAL '1' HOUR,
        INTERVAL '1' DAY))
GROUP BY
    window_start,
    window_end
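A `CUMULATE` window with a 1-hour step and a 1-day size assigns each row to every window `[day_start, day_start + k hours)` whose end lies after the row's timestamp, so each window holds a running total for the day. The Python sketch below mimics that assignment and the `COUNT(DISTINCT ...)` aggregation. It assumes the default epoch alignment (no offset) for a `TIMESTAMP` column, and `cumulate_windows` / `cumulative_distinct_counts` are hypothetical names. In Flink, each window result is emitted once the watermark passes its end, which gives Karen one updated row per hour.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def cumulate_windows(ts, step, size):
    """Windows a row falls into under CUMULATE(step, size), aligned to the epoch."""
    window_start = ts - (ts - EPOCH) % size  # start of the enclosing day when size is 1 day
    windows = []
    window_end = window_start + step
    while window_end <= window_start + size:
        if ts < window_end:  # window_end is exclusive
            windows.append((window_start, window_end))
        window_end += step
    return windows


def cumulative_distinct_counts(rows, step=timedelta(hours=1), size=timedelta(days=1)):
    """Equivalent of COUNT(DISTINCT transaction_id) grouped by (window_start, window_end)."""
    ids = defaultdict(set)
    for transaction_id, ts in rows:
        for window in cumulate_windows(ts, step, size):
            ids[window].add(transaction_id)
    return {window: len(ids[window]) for window in sorted(ids)}


day = datetime(2023, 5, 1)
rows = [
    ("a", day + timedelta(minutes=30)),  # duplicate row, e.g. from two matching activations
    ("a", day + timedelta(minutes=30)),
    ("b", day + timedelta(hours=1, minutes=10)),
]
counts = cumulative_distinct_counts(rows)
print(counts[(day, day + timedelta(hours=1))])  # 1
print(counts[(day, day + timedelta(hours=2))])  # 2
print(counts[(day, day + timedelta(days=1))])   # 2
```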