In [None]:
!pip install confluent-kafka

In [None]:
! pip list | grep Faker

global configuration

In [None]:
CONCURRENCY = 8
BOOTSTRAP_SERVER = 'my-kafka.kafka:9092'
GROUP_ID = 'group0'
TOPIC_NAME = 'test20'

kafka_conf = {'bootstrap.servers': BOOTSTRAP_SERVER, 
                'group.id': GROUP_ID, 
                'default.topic.config': {
                    'auto.offset.reset': 'smallest'
                }
            }

delete exist topics and recreate it

In [None]:
from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient(kafka_conf)

def delete_topic(admin, topics):
    fs = admin.delete_topics(topics, operation_timeout=30)

    # Wait for operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} deleted".format(topic))
        except Exception as e:
            print("Failed to delete topic {}: {}".format(topic, e))
            

def create_topic(admin, topic, parttition):
    topic = NewTopic(topic, parttition, 1)

    created_topics = admin.create_topics([topic])

    for t in created_topics:
        print(t)

delete_topic(admin, ['test1'])
#create_topic(admin, 'test', 8)

In [39]:
generator_config = {
    "batch_size" : 100,
    "repeat" : 20,
    "concurrency" : 1,
    "interval" : 1,
    "name" : "cpu",
    "start_date" : "",
    "precision" : "s",
    "realtime" : False,
    "fields" : [
        {"name":"time", "type":"timestamp", "precision" : "s"},
        {"name":"tag_id", "type" : "string", "format" : "####"},
        {"name": "usage_user", "type" : "number", "format" : "##"},
        {"name": "usage_system", "type" : "number", "format" : "##"},
        {"name": "usage_idle", "type" : "number", "format" : "##"},
        {"name": "usage_nice", "type" : "number", "format" : "##"},
        {"name": "usage_iowait", "type" : "number", "format" : "##"},
        {"name": "usage_irq", "type" : "number", "format" : "##"},
        {"name": "usage_softirq", "type" : "number", "format" : "##"},
        {"name": "usage_steal", "type" : "number", "format" : "##"},
        {"name": "usage_guest", "type" : "number", "format" : "##"},
        {"name": "usage_guest_nice", "type" : "number", "format" : "##"},
        {"name": "additional_tags", "type" : "string", "format" : "##"},
        {"name": "hostname", "type" : "string", "format" : "timeplus.io"}
        
    ],
    "sinks":[
        {
            "type":"clickhouse",
            "url" :"http://clickhouse:8123/",
            "schema" : {
                "create_table" : "auto",
                "create_table_sql" : "",
                "table_name" : "cpu",
                "database_name" : "devops",
                "partition_fields": "time"
            }
        }
    ]
}

sink defines where to send the generated data

In [9]:
from confluent_kafka import Producer
import socket
from faker import Faker
import requests

fake = Faker()

def is_number(n):
    is_number = True
    try:
        num = float(n)
        # check for "nan" floats
        is_number = num == num   # or use `math.isnan(num)`
    except ValueError:
        is_number = False
    return is_number


class Sink:
    def __init__(self, config, fields):
        self.config = config
        self.fields = fields
        self.name = 'sink'
        
    def send(self, data):
        print(data)
    
    def init(self, config):
        pass
    
    def name(self):
        return self.name
        
class KafkaSink(Sink):
    def __init__(self, config, fields):
        Sink.__init__(self, config, fields)
        self.kafka_config = {'bootstrap.servers': self.config["broker"], 
                'client.id': socket.gethostname()
            }
        self.topic_name = self.config["topic"]
        self.producer = Producer(self.kafka_config)
        self.name = f'kafka:{self.config["broker"]}'
        
    
    def send(self, data): 
        ts = time.time() 
        self.producer.produce(self.topic_name, key=fake.lexify(), value=data)
        self.producer.flush()
        te = time.time() 
        return te-ts
        
        
class ConsoleSink(Sink):
    def __init__(self, config, fields):
        Sink.__init__(self, config, fields)
        

class ClickhouseSink(Sink):
    def __init__(self, config, fields):
        Sink.__init__(self, config, fields)
        self.url = self.config["url"]
        
        self.schema_config = self.config["schema"]
        self.db = self.schema_config["database_name"]
        self.table = self.schema_config["table_name"]
        
        if self.schema_config["create_table"] == "auto":
            self.create_table = True
            self.create_table_sql = self.generate_schema()
        elif self.schema_config["create_table"] == "yes" :
            self.create_table = True
            self.create_table_sql = self.schema_config["create_table_sql"]
        else:
            self.create_table = False
            
        self.name = f'clickhouse:{self.config["url"]}'
        print('init ck sink()')  
        self.init()
        
    def send(self, data):
        load_sql = f'INSERT INTO {self.db}.{self.table} VALUES '
        rows=data.strip().split('\n')
        for row in rows:
            fields = row.split('|')
            processed_fields = self.process_fields(fields)
            load_sql = load_sql + '(' + ','.join(processed_fields) + ') '
        
        #print(load_sql)
        ts = time.time() 
        r = self.query(load_sql)
        te = time.time() 
        return te-ts
    
    def process_fields(self, fields):
        processed_fields = []
        for t,v in zip(self.fields_types, fields):
            if t == 'Float32':
                processed_fields.append(v)
            elif t.startswith('DateTime') and is_number(v):
                processed_fields.append(v)
            else:
                # adding ' for no number
                processed_fields.append(f"'{v}'")
        return processed_fields 
    
    def init(self):
        sql_create_db = f'CREATE DATABASE IF NOT EXISTS {self.db}'
        print(sql_create_db)
        r = self.query(sql_create_db)
        print(r.text)
        
        if self.create_table:
            print(self.create_table_sql)
            r = self.query(self.create_table_sql)
            print(r.text)
            
    def generate_schema(self):
        sql_create_table = f'CREATE TABLE IF NOT EXISTS {self.db}.{self.table} ( '
        fields = [ f'{field["name"]} {self.get_db_type(field["type"])}' for field in self.fields]
        self.fields_types = [ self.get_db_type(field["type"]) for field in self.fields]
        sql_create_table = sql_create_table + ','.join(fields)
        sql_create_table = sql_create_table + ') ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time) ORDER BY time'
        return sql_create_table
    
    def get_db_type(self, field): 
        if field == "date" :
            return "Date"
        
        if field == "datetime" :
            return "DateTime('UTC')"
            
        if field == "timestamp" :
            return "DateTime('UTC')"
            
        if field == "number" :
            return "Float32"
            
        if field == "string" :
            return "String"
        
        return "String"
    
    def query(self, sql):
        print(sql)
        print(self.url)
        return requests.post(self.url, data = sql)
        
    def clean(self):        
        sql_drop_db = f'DROP DATABASE IF EXISTS {self.table}'
        r = self.query(sql_drop_db)
        print(r.text)
    
    def count(self):
        sql_count_table = f'SELECT COUNT(*) FROM {self.db}.{self.table}'
        r = self.query(sql_count_table)
        result = r.text
        print(result)
        return int(result)
    
class InfluxSink(Sink):
    def __init__(self, config, fields):
        Sink.__init__(self, config, fields)
        self.config = config
        self.url = f'{self.config["url"]}api/v2/write'
        self.org = self.config["org"]
        self.bucket = self.config["bucket"]
        self.token = self.config["token"]
        self.measure = self.config["measure"]
        self.precision = self.config["precision"]
        self.tags = []
        self.fields = []
        
        
        self.timestamp_index = -1
        
        for i, v in enumerate(fields):
            if v["type"] == 'timestamp':
                self.timestamp_index = i
            elif v["type"] == 'number':
                field = {"name": v["name"], "index":i }
                self.fields.append(field)
            else:
                tag = {"name": v["name"], "index":i }
                self.tags.append(tag)
        
    def query(self, flux):
        headers = {'Authorization' : f'Token {self.token}'}
        params = {'org': self.org, 'bucket': self.bucket, 'precision': self.precision}
        print(headers)
        print(params)
        print(self.url)
        return requests.post(self.url, headers=headers, params=params, data = flux)
        
    def send(self, data):
        line = ''
        for i in data.strip().split('\n'):  
            print(f'line is {i}')
            row = i.split('|')
            measure = self.measure 
            fields = ",".join([ f'{field["name"]}={row[field["index"]] }'  for field in self.fields])
            tags = ",".join([ f'{tag["name"]}={row[tag["index"]].replace(" ","")}'  for tag in self.tags])
            timestamp = int(float(row[self.timestamp_index]))
            
            line = line + f'{measure},{tags} {fields} {timestamp}' + '\n'
        
        print(line)
        
        ts = time.time() 
        r = self.query(line)
        te = time.time() 
        print(r.text)
        #time.sleep(1)
        return te-ts
        
    
    def count(self):
        return 0
        


generator will generarate events and send to sinks

In [23]:
import time
from datetime import datetime
from multiprocessing import Process

from faker import Faker
fake = Faker()

class DataGenerator:
    def __init__(self, config):
        print("creating generator here!")
        self.config = config
        self.t = time.time()
        self.t_unit = {
            's': 1,
            'ms': 1/1000,
            'us': 1/1000/1000,
            'ns': 1/1000/1000/1000
        }[self.config["precision"]]
        
        self.sinks_config = self.config["sinks"]
        print("create sinks!")
        self.sinks = self.create_sinks(self.sinks_config, self.config["fields"])
        
        print("genertor created!")
        
    def generate(self):
        for i in range(self.config["batch_size"]):
            self.t = self.t + self.t_unit
            yield self.generate_row()
            
    def generate_row(self):
        record = []
        for field in self.config["fields"]:
            record.append(self.generate_item(field))
        return record
    
    def generate_item(self, field):
        if field["type"] == 'date':
            return self.generate_date()
        
        if field["type"] == 'datetime':
            return self.generate_datetime()
        
        if field["type"] == 'timestamp':
            return self.generate_timestamp(precision=field["precision"])
        
        if field["type"] == 'number':
            return self.generate_number(format_string=field["format"])
        
        if field["type"] == 'string':
            return self.generate_string(format_string=field["format"])
    
    def generate_date(self):
        dt = datetime.fromtimestamp(self.t)
        return dt.strftime("%Y-%m-%d")
    
    def generate_datetime(self):
        dt = datetime.fromtimestamp(self.t)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    
    def generate_timestamp(self, precision="ms"):
        if self.config["realtime"]:
            t = time.time()
        else:
            t = self.t
            
        if precision == "s":
            print(f'gen s {t}')
            return str(t)
        
        if precision == "ms":
            print(f'gen ms {t}')
            return str(t*1000)
        
        if precision == "us":
            return str(t*1000*1000)
        
        if precision == "ns":
            return str(t*1000*1000*1000)
    
    def generate_string(self, format_string="????"):
        return fake.bothify(text=format_string)
    
    def generate_number(self, format_string="####"):
        return fake.numerify(text=format_string)

    # shuffle the data to test insert with dis-ordered data
    def shuffle(self, data):
        result = [ x for x in data]
        random.shuffle(result)
        return result
    
    def csv(self):
        data = self.generate()
        result = ''
        for i in data:
            result = result + '|'.join(i) + '\n'
        return result
    
    def observe(self, sink):
        count = sink.count()
        while True:
            time.sleep(1)
            new_count = sink.count()
            count_diff = new_count - count
            print(f'iops for {sink.name} is {count_diff}')
            if count_diff == 0:
                break;
            count = new_count
    
    def load_single(self):
        print('in load single')
        for i in range(self.config["repeat"]):
            for sink in self.sinks:
                print("send")
                query_latency = sink.send(self.csv())
                print(f'data send to {sink.name} with {query_latency}')
            
    def load_concurrent(self):
        print('load concurrent')
        workers = []
        for _ in range(self.config["concurrency"]):
            print('create one worker')
            w = Process(target=self.load_single)
            w.start()
            workers.append(w)
        
        obs = []
        for sink in self.sinks:
            ob = Process(target=self.observe, args=(sink,))
            ob.start()
            obs.append(ob)
        
        for ob in obs:
            ob.join()
        
        for w in workers:
            w.join()
        
                
    def create_sinks(self, config, fields):
        
        print("in create sinks")
        sinks = []

        for t in config:
            print("create sinks", t)
            if t["type"] == "kafka":
                sinks.append(KafkaSink(t,fields))
                
            if t["type"] == "console":
                sinks.append(ConsoleSink(t,fields))
                
            if t["type"] == "clickhouse":
                print("create ck sink!")
                sinks.append(ClickhouseSink(t,fields))
                
            if t["type"] == "influx":
                sinks.append(InfluxSink(t,fields))
                
            # register customer sink here
        return sinks
        

generate a batch

In [None]:
m = DataGenerator(generator_config)
#print(m.csv())
print([ x for x in m.generate()])

save config to file

In [40]:
import json
with open('config.json', 'w') as f:
    json.dump(generator_config, f, indent=4)

In [None]:
with open('config.json') as f:
    config = json.load(f) 
    m = DataGenerator(config)
    print(m.csv())

In [7]:
with open('config.json') as f:
    config = json.load(f) 
    print(config)
    m = DataGenerator(config)
    m.load_single()

{'batch_size': 10, 'repeat': 20, 'concurrency': 10000, 'interval': 1, 'name': 'cpu', 'start_date': '', 'precision': 'ms', 'realtime': False, 'fields': [{'name': 'time', 'type': 'timestamp', 'precision': 'ms'}, {'name': 'tag_id', 'type': 'string', 'format': '####'}, {'name': 'usage_user', 'type': 'number', 'format': '##'}, {'name': 'usage_system', 'type': 'number', 'format': '##'}, {'name': 'usage_idle', 'type': 'number', 'format': '##'}, {'name': 'usage_nice', 'type': 'number', 'format': '##'}, {'name': 'usage_iowait', 'type': 'number', 'format': '##'}, {'name': 'usage_irq', 'type': 'number', 'format': '##'}, {'name': 'usage_softirq', 'type': 'number', 'format': '##'}, {'name': 'usage_steal', 'type': 'number', 'format': '##'}, {'name': 'usage_guest', 'type': 'number', 'format': '##'}, {'name': 'usage_guest_nice', 'type': 'number', 'format': '##'}, {'name': 'additional_tags', 'type': 'string', 'format': '##'}], 'sinks': [{'type': 'clickhouse', 'url': 'http://clickhouse:8123/', 'schema':

data send to clickhouse:http://clickhouse:8123/ with 0.029042720794677734
send
INSERT INTO devops.cpu VALUES (1626308826937.6965,'2051',75,68,96,32,54,44,46,21,34,51,'01') (1626308826938.6963,'2911',80,02,34,26,27,63,66,84,78,77,'57') (1626308826939.6963,'1655',14,34,64,36,24,79,81,75,22,01,'10') (1626308826940.6963,'3430',74,10,16,72,83,74,07,59,76,92,'81') (1626308826941.6963,'3320',23,61,92,19,63,36,06,44,19,05,'30') (1626308826942.696,'4204',99,81,50,66,46,51,84,36,73,94,'73') (1626308826943.696,'0027',65,90,69,95,69,41,69,57,55,80,'85') (1626308826944.696,'2024',97,80,19,14,62,93,44,38,81,69,'30') (1626308826945.6958,'7894',69,35,36,91,92,63,61,11,42,19,'32') (1626308826946.6958,'1121',58,57,25,03,61,56,29,18,79,36,'11') 
http://clickhouse:8123/
data send to clickhouse:http://clickhouse:8123/ with 0.02666306495666504
send
INSERT INTO devops.cpu VALUES (1626308826947.6958,'7973',14,78,58,05,15,29,46,51,16,46,'75') (1626308826948.6956,'5951',57,55,42,15,98,13,05,53,75,54,'95') (1626

In [3]:
with open('config.json') as f:
    config = json.load(f) 
    m = DataGenerator(config)
    m.load_concurrent()

NameError: name 'DataGenerator' is not defined

In [41]:
with open('config.json') as f:
    config = json.load(f) 
    m = DataGenerator(config)
    print(m.csv())

creating generator here!
create sinks!
in create sinks
create sinks {'type': 'clickhouse', 'url': 'http://clickhouse:8123/', 'schema': {'create_table': 'auto', 'create_table_sql': '', 'table_name': 'cpu', 'database_name': 'devops', 'partition_fields': 'time'}}
create ck sink!
init ck sink()
CREATE DATABASE IF NOT EXISTS devops
CREATE DATABASE IF NOT EXISTS devops
http://clickhouse:8123/

CREATE TABLE IF NOT EXISTS devops.cpu ( time DateTime('UTC'),tag_id String,usage_user Float32,usage_system Float32,usage_idle Float32,usage_nice Float32,usage_iowait Float32,usage_irq Float32,usage_softirq Float32,usage_steal Float32,usage_guest Float32,usage_guest_nice Float32,additional_tags String,hostname String) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(time) ORDER BY time
CREATE TABLE IF NOT EXISTS devops.cpu ( time DateTime('UTC'),tag_id String,usage_user Float32,usage_system Float32,usage_idle Float32,usage_nice Float32,usage_iowait Float32,usage_irq Float32,usage_softirq Float32,usage_steal

In [28]:
with open('config.json') as f:
    config = json.load(f) 
    print(",".join([ field["name"] for field in config["fields"]]))

time,tag_id,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice,additional_tags,hostname


In [14]:
time.time()

1626553761.4174144