In [0]:
%flink.conf
execution.checkpointing.interval 60000

In [1]:
%flink
val brokers = "Bootstrap_Brokers_Go_Here"
z.put("brokers", brokers)

In [2]:
%flink.ssql(type=update, interpolate=true)
DROP TABLE IF EXISTS stock;
DROP TABLE IF EXISTS stock_datagen;
DROP TABLE IF EXISTS transactions;
DROP TABLE IF EXISTS transactions_datagen;
DROP TABLE IF EXISTS output;

CREATE TABLE IF NOT EXISTS stock ( 
    ticker_symbol STRING NOT NULL PRIMARY KEY,
    price DOUBLE,
    s_timestamp TIMESTAMP_LTZ(3),
    WATERMARK FOR s_timestamp AS s_timestamp
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'stock-topic',
  'properties.bootstrap.servers' = '{brokers}',
  'key.format' = 'raw',
  'value.format' = 'json'
);


CREATE TABLE IF NOT EXISTS transactions ( 
    id STRING,
    transaction_ticker_symbol STRING,
    shares_purchased DOUBLE,
    t_timestamp TIMESTAMP_LTZ(3),
    WATERMARK FOR t_timestamp AS t_timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'transaction-topic',
  'properties.bootstrap.servers' = '{brokers}',
  'value.format' = 'json'
);

CREATE TABLE IF NOT EXISTS stock_datagen
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.ticker_symbol.length' = '2',
    'fields.price.min' = '0.00',
    'fields.price.max' = '10000.00'
   -- 'fields.s_timestamp.max-past' = '150000' -- only in 1.15 > 
    )
    LIKE stock (EXCLUDING ALL);
    
CREATE TABLE IF NOT EXISTS transactions_datagen
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1000',
    'fields.transaction_ticker_symbol.length' = '2',
    'fields.shares_purchased.min' = '1',
    'fields.shares_purchased.max' = '100'
   -- 'fields.t_timestamp.max-past' = '30000' -- only in 1.15 > 
    )
    LIKE transactions (EXCLUDING ALL);
    
CREATE TABLE IF NOT EXISTS output (
    data STRING
    ) WITH (
  'connector' = 'kafka',
  'topic' = 'output',
  'properties.bootstrap.servers' = '{brokers}',
  'value.format' = 'json',
  'format' = 'raw')

In [3]:
%flink.ssql(type=update)
INSERT INTO STOCK SELECT * FROM STOCK_DATAGEN;


In [4]:
%flink.ssql(type=update)
INSERT INTO TRANSACTIONS SELECT * FROM TRANSACTIONS_DATAGEN;


In [5]:
%flink.ssql(type=update)
SELECT 
     id,
     price,
     transaction_ticker_symbol,
     shares_purchased,
     t_timestamp
FROM transactions
LEFT JOIN stock FOR SYSTEM_TIME AS OF transactions.t_timestamp
ON transactions.transaction_ticker_symbol = stock.ticker_symbol
WHERE ticker_symbol IS NOT NULL;