In [1]:
! pip3 install confluent_kafka
! pip3 install pika

Collecting confluent_kafka
  Downloading https://files.pythonhosted.org/packages/48/8c/01c71291da9722756304675d992eddba1c88fe3f6a4662f9c788c3f0c263/confluent_kafka-1.2.0-cp36-cp36m-manylinux1_x86_64.whl (7.4MB)
[K    100% |################################| 7.4MB 223kB/s ta 0:00:011
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-1.2.0
Collecting pika
  Downloading https://files.pythonhosted.org/packages/a1/ae/8bedf0e9f1c0c5d046db3a7428a4227fe36ec1b8e25607f3c38ac9bf513c/pika-1.1.0-py2.py3-none-any.whl (148kB)
[K    100% |################################| 153kB 1.7MB/s ta 0:00:01
[?25hInstalling collected packages: pika
Successfully installed pika-1.1.0


In [11]:
from confluent_kafka import Consumer, KafkaException
import sys
import logging

In [12]:
# Create logger for consumer (logs will be emitted when poll() is called)
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)

# logging.getLogger() returns the same object for the same name on every call,
# so attach the stream handler only once; otherwise each re-run of this cell
# adds another handler and every log record is printed multiple times.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

In [13]:
# Settings handed to the Kafka consumer: the broker list, the consumer group
# id, and the offset-reset policy.
conf = {
    'bootstrap.servers': "localhost:9092,localhost:9093,localhost:9094",
    'group.id': "flinkConsumer",
    'auto.offset.reset': 'earliest',
}

In [22]:
# Build the Kafka consumer from `conf`, passing it the 'consumer' logger
# configured earlier in this notebook.
consumer = Consumer(conf, logger=logger)

In [15]:
def print_assignment(consumer, partitions):
    """Print the partitions handed to this callback.

    Used as the ``on_assign`` callback of ``consumer.subscribe``.

    Args:
        consumer: accepted to match the callback signature; not used here.
        partitions: the value to print after the ``Assignment:`` label.
    """
    label = 'Assignment:'
    print(label, partitions)

In [29]:
def dict_list_to_csv(data_list, path='trades.csv'):
    """Write trade records to a CSV file, preceded by a header row.

    The header row lists every column name in double quotes. In data rows the
    ``code``, ``ask_bid`` and ``change`` values are wrapped in double quotes
    and all other values are written bare.

    Args:
        data_list: iterable of dicts, each containing every column listed in
            ``columns`` below.
        path: file to (over)write. Defaults to ``'trades.csv'`` in the current
            working directory.

    Raises:
        KeyError: if a record is missing one of the columns.
    """
    columns = (
        "code", "timestamp", "trade_timestamp", "trade_price", "trade_volume",
        "ask_bid", "prev_closing_price", "change", "change_price", "sequential_id",
    )
    # Columns whose values are emitted inside double quotes.
    quoted_columns = {"code", "ask_bid", "change"}

    def render(name, value):
        # Format a single cell according to whether its column is quoted.
        template = '"{}"' if name in quoted_columns else '{}'
        return template.format(value)

    # newline='\n' keeps rows terminated by a bare '\n' on every platform,
    # matching the line delimiter the table source in this notebook reads with.
    with open(path, 'w', encoding='utf-8', newline='\n') as f:
        f.write(','.join('"{}"'.format(name) for name in columns) + '\n')
        for data in data_list:
            f.write(','.join(render(name, data[name]) for name in columns) + '\n')


In [23]:
# Subscribe to the "trade" topic; print_assignment is registered as the
# on_assign callback, so the assigned partitions get printed.
consumer.subscribe(["trade"], on_assign=print_assignment)

In [None]:
import json

# Number of messages to collect before stopping.
MAX_MESSAGES = 1000

data_list = []
try:
    while len(data_list) < MAX_MESSAGES:
        # Wait up to one second for the next message.
        msg = consumer.poll(1.0)

        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        if msg.error():
            raise KafkaException(msg.error())

        # Proper message: the payload is decoded as JSON.
        data = json.loads(msg.value())
        data_list.append(data)
        print(data)
except KeyboardInterrupt:
    sys.stderr.write('%% Aborted by user\n')
finally:
    # Close the consumer on every exit path (limit reached, user interrupt,
    # or Kafka error), not only on interrupt, so it is never left open.
    # A closed consumer cannot be polled again: re-create it and re-subscribe
    # before re-running this cell.
    consumer.close()

In [30]:
# Dump the collected messages to trades.csv (the file name is fixed inside
# dict_list_to_csv); the table source registered further down reads this file.
dict_list_to_csv(data_list)

In [31]:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

In [32]:
# Obtain the batch execution environment and run everything with parallelism 1.
# NOTE(review): presumably parallelism 1 is chosen so the file sink produces a
# single output file — confirm.
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)

In [33]:
# Table configuration with no options changed; passed to the table environment below.
t_config = TableConfig()

In [34]:
# Batch table environment built on top of exec_env, using t_config.
t_env = BatchTableEnvironment.create(exec_env, t_config)

In [40]:
# Register ./trades.csv as the table source 'tradeSource'.
# The format describes how each line of the file is parsed; the schema declares
# the resulting table columns. Both list the same ten fields in file order.
#
# trades.csv begins with a header row of column names, so the first line is
# skipped; without that the header would be parsed as a data row and, e.g.,
# the text "timestamp" would have to be read as a BIGINT.
# NOTE(review): the string columns are written wrapped in double quotes; no
# quote character is configured here, so the quotes presumably stay part of
# the parsed values — confirm whether .quote_character('"') is wanted.
t_env.connect(FileSystem().path('./trades.csv')) \
    .with_format(OldCsv()
                 .line_delimiter('\n')
                 .ignore_first_line()
                 .field('code', DataTypes.STRING())
                 .field('timestamp', DataTypes.BIGINT())
                 .field('trade_timestamp', DataTypes.BIGINT())
                 .field('trade_price', DataTypes.DOUBLE())
                 .field('trade_volume', DataTypes.DOUBLE())
                 .field('ask_bid', DataTypes.STRING())
                 .field('prev_closing_price', DataTypes.DOUBLE())
                 .field('change', DataTypes.STRING())
                 .field('change_price', DataTypes.DOUBLE())
                 .field('sequential_id', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('code', DataTypes.STRING())
                 .field('timestamp', DataTypes.BIGINT())
                 .field('trade_timestamp', DataTypes.BIGINT())
                 .field('trade_price', DataTypes.DOUBLE())
                 .field('trade_volume', DataTypes.DOUBLE())
                 .field('ask_bid', DataTypes.STRING())
                 .field('prev_closing_price', DataTypes.DOUBLE())
                 .field('change', DataTypes.STRING())
                 .field('change_price', DataTypes.DOUBLE())
                 .field('sequential_id', DataTypes.BIGINT())) \
    .register_table_source('tradeSource')


<pyflink.table.descriptors.BatchTableDescriptor at 0x7f743205f908>

In [41]:
# Register ./trades_result.csv as the table sink 'tradeSink' with two
# comma-separated columns: code (STRING) and count (BIGINT).
sink_descriptor = t_env.connect(FileSystem().path('./trades_result.csv'))

# How each result row is laid out in the file.
sink_csv_format = (
    OldCsv()
    .field_delimiter(',')
    .field('code', DataTypes.STRING())
    .field('count', DataTypes.BIGINT())
)
sink_descriptor = sink_descriptor.with_format(sink_csv_format)

# Column names and types of the sink table.
sink_table_schema = (
    Schema()
    .field('code', DataTypes.STRING())
    .field('count', DataTypes.BIGINT())
)
sink_descriptor = sink_descriptor.with_schema(sink_table_schema)

# Kept as the final expression so the cell still displays the returned descriptor.
sink_descriptor.register_table_sink('tradeSink')

<pyflink.table.descriptors.BatchTableDescriptor at 0x7f743185ec50>

In [42]:
# Count the rows of 'tradeSource' per code and route the result to 'tradeSink'.
trades_table = t_env.scan('tradeSource')
trades_by_code = trades_table.group_by('code')
count_per_code = trades_by_code.select('code, count(1)')
count_per_code.insert_into('tradeSink')

In [43]:
# Execute the table program defined on t_env; "trade_count_job" is the job name.
t_env.execute("trade_count_job")

In [44]:
# Start the local Flink cluster via the start-cluster.sh script shipped with
# the Flink build under /flink/build-target.
# NOTE(review): this cell sits after t_env.execute(...) — confirm whether the
# cluster is meant to be started before the job runs. The absolute path is
# specific to the machine this notebook was written on.
! /flink/build-target/bin/start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host koock-Blade.
Starting taskexecutor daemon on host koock-Blade.
