## Back up messages from the main Kafka cluster to the backup cluster, or restore them from backup to main
Topics:
SensorThings Entities + beatsGlobal

In [1]:
import json
import multiprocessing
import threading, logging, time

from kafka import KafkaProducer,KafkaConsumer

In [2]:
# Echo every received / forwarded message to stdout when True.
printLog = True

# Topics the agents subscribe to.
topics = [
    "beats",
    "beats2",
    "beatsGlobal",
]

# Bootstrap servers passed to the agents as the main cluster.
KFC_Main = [
    'smartaqnet-0-node1:9092',
    'smartaqnet-0-node2:9092',
    'smartaqnet-0-node3:9092',
]

# Bootstrap servers passed to the agents as the backup cluster.
KFC_Backup = [
    'smartaqnet-1-node1:9092',
]

In [3]:
class BackupAgent(multiprocessing.Process):
    """Worker process that copies messages from the main cluster to the backup cluster.

    Every message consumed from ``topics`` on ``KFC_Main`` is re-produced to
    ``KFC_Backup`` with the same topic, key, value, partition and timestamp.

    Args:
        KFC_Main: bootstrap servers of the cluster to read from.
        KFC_Backup: bootstrap servers of the cluster to write to.
        topics: topic names to subscribe to on the main cluster.
        printLog: when true, print each received and forwarded message.
    """

    def __init__(self, KFC_Main, KFC_Backup, topics, printLog = False):
        multiprocessing.Process.__init__(self)
        # multiprocessing.Event so that stop() called on this object is
        # visible to run() executing in the child process.
        self.stop_event = multiprocessing.Event()
        self.KFC_Main = KFC_Main
        self.KFC_Backup = KFC_Backup
        self.topics = topics
        self.printLog = printLog

    def stop(self):
        """Ask the forwarding loop in run() to finish."""
        self.stop_event.set()

    def run(self):
        """Forward messages until stop() is called, then release both clients."""
        consumer = KafkaConsumer(bootstrap_servers=self.KFC_Main,
                                 auto_offset_reset='latest',
                                 consumer_timeout_ms=1000)
        producer = None
        try:
            producer = KafkaProducer(bootstrap_servers=self.KFC_Backup)
            # Use the instance configuration, not the notebook globals, so the
            # agent behaves according to what it was constructed with.
            consumer.subscribe(self.topics)
            while not self.stop_event.is_set():
                # Iteration ends once consumer_timeout_ms passes without a
                # message, which lets the outer loop re-check stop_event.
                for message in consumer:
                    if self.printLog:
                        print("Receive message from main:" + str(message.value))
                    producer.send(topic=message.topic, value=message.value,
                                  key=message.key, partition=message.partition,
                                  timestamp_ms=message.timestamp)
                    if self.printLog:
                        print("Backing up message to backup:{} : {}".format(str(message.topic), str(message.value)))
                    if self.stop_event.is_set():
                        break
        finally:
            try:
                if producer is not None:
                    # send() only queues records; flush so nothing buffered is
                    # dropped when the process exits.
                    producer.flush()
                    producer.close()
            finally:
                consumer.close()
class RestoreAgent(multiprocessing.Process):
    """Worker process that copies messages from the backup cluster back to the main cluster.

    Every message consumed from ``topics`` on ``KFC_Backup`` is re-produced to
    ``KFC_Main`` with the same topic, key, value, partition and timestamp.

    Args:
        KFC_Main: bootstrap servers of the cluster to write to.
        KFC_Backup: bootstrap servers of the cluster to read from.
        topics: topic names to subscribe to on the backup cluster.
        printLog: when true, print each received and forwarded message.
    """

    def __init__(self, KFC_Main, KFC_Backup, topics, printLog = False):
        multiprocessing.Process.__init__(self)
        # multiprocessing.Event so that stop() called on this object is
        # visible to run() executing in the child process.
        self.stop_event = multiprocessing.Event()
        self.KFC_Main = KFC_Main
        self.KFC_Backup = KFC_Backup
        self.topics = topics
        self.printLog = printLog

    def stop(self):
        """Ask the forwarding loop in run() to finish."""
        self.stop_event.set()

    def run(self):
        """Forward messages until stop() is called, then release both clients."""
        # Restore direction: read from the backup cluster, write to main.
        # NOTE(review): 'latest' only picks up messages that arrive after the
        # agent starts; confirm whether a restore should replay from 'earliest'.
        consumer = KafkaConsumer(bootstrap_servers=self.KFC_Backup,
                                 auto_offset_reset='latest',
                                 consumer_timeout_ms=1000)
        producer = None
        try:
            producer = KafkaProducer(bootstrap_servers=self.KFC_Main)
            # Use the instance configuration, not the notebook globals.
            consumer.subscribe(self.topics)
            while not self.stop_event.is_set():
                # Iteration ends once consumer_timeout_ms passes without a
                # message, which lets the outer loop re-check stop_event.
                for message in consumer:
                    if self.printLog:
                        print("Receive message from backup:" + str(message.value))
                    # Write back to the topic the message came from rather
                    # than a fixed topic name.
                    producer.send(topic=message.topic, value=message.value,
                                  key=message.key, partition=message.partition,
                                  timestamp_ms=message.timestamp)
                    if self.printLog:
                        # str() because the payload may be bytes, which cannot
                        # be concatenated to a text string.
                        print("Restoring message to main:" + str(message.value))
                    if self.stop_event.is_set():
                        break
        finally:
            try:
                if producer is not None:
                    # send() only queues records; flush so nothing buffered is
                    # dropped when the process exits.
                    producer.flush()
                    producer.close()
            finally:
                consumer.close()

In [4]:
# Agents to launch; only the main -> backup direction is enabled here.
tasks = [BackupAgent(KFC_Main, KFC_Backup, topics, printLog)]

In [5]:
# Launch each agent in its own child process.
for agent in tasks:
    agent.start()

Receive message from main:2018-07-18 17:47:57.726475
Backing up message to backup:beatsGlobal : 2018-07-18 17:47:57.726475
Receive message from main:2018-07-18 17:48:00.164957
Backing up message to backup:beatsGlobal : 2018-07-18 17:48:00.164957
Receive message from main:2018-07-18 17:48:02.501208
Backing up message to backup:beatsGlobal : 2018-07-18 17:48:02.501208
Receive message from main:2018-07-18 17:48:04.844280
Backing up message to backup:beatsGlobal : 2018-07-18 17:48:04.844280
Receive message from main:2018-07-18 17:48:07.134969
Backing up message to backup:beatsGlobal : 2018-07-18 17:48:07.134969
Receive message from main:2018-07-18 17:48:09.482473
Backing up message to backup:beatsGlobal : 2018-07-18 17:48:09.482473
Receive message from main:2018-07-18 17:48:11.926638
Backing up message to backup:beatsGlobal : 2018-07-18 17:48:11.926638
Receive message from main:2018-07-18 17:48:14.277651
Backing up message to backup:beatsGlobal : 2018-07-18 17:48:14.277651
Receive message 

Receive message from main:2018-07-18 17:50:37.024781
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:37.024781
Receive message from main:2018-07-18 17:50:39.369734
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:39.369734
Receive message from main:2018-07-18 17:50:41.716989
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:41.716989
Receive message from main:2018-07-18 17:50:44.086598
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:44.086598
Receive message from main:2018-07-18 17:50:46.431756
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:46.431756
Receive message from main:2018-07-18 17:50:48.782105
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:48.782105
Receive message from main:2018-07-18 17:50:51.221671
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:51.221671
Receive message from main:2018-07-18 17:50:53.669023
Backing up message to backup:beatsGlobal : 2018-07-18 17:50:53.669023
Receive message 

Receive message from main:2018-07-18 17:53:16.073057
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:16.073057
Receive message from main:2018-07-18 17:53:18.515919
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:18.515919
Receive message from main:2018-07-18 17:53:20.864219
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:20.864219
Receive message from main:2018-07-18 17:53:23.212041
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:23.212041
Receive message from main:2018-07-18 17:53:25.654849
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:25.654849
Receive message from main:2018-07-18 17:53:28.092670
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:28.092670
Receive message from main:2018-07-18 17:53:30.433482
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:30.433482
Receive message from main:2018-07-18 17:53:32.778582
Backing up message to backup:beatsGlobal : 2018-07-18 17:53:32.778582
Receive message 

Receive message from main:2018-07-18 17:55:55.066367
Backing up message to backup:beatsGlobal : 2018-07-18 17:55:55.066367
Receive message from main:2018-07-18 17:55:57.504006
Backing up message to backup:beatsGlobal : 2018-07-18 17:55:57.504006
Receive message from main:2018-07-18 17:55:59.944219
Backing up message to backup:beatsGlobal : 2018-07-18 17:55:59.944219
Receive message from main:2018-07-18 17:56:02.289916
Backing up message to backup:beatsGlobal : 2018-07-18 17:56:02.289916
Receive message from main:2018-07-18 17:56:04.638200
Backing up message to backup:beatsGlobal : 2018-07-18 17:56:04.638200
Receive message from main:2018-07-18 17:56:07.010771
Backing up message to backup:beatsGlobal : 2018-07-18 17:56:07.010771
Receive message from main:2018-07-18 17:56:09.357776
Backing up message to backup:beatsGlobal : 2018-07-18 17:56:09.357776
Receive message from main:2018-07-18 17:56:11.800008
Backing up message to backup:beatsGlobal : 2018-07-18 17:56:11.800008
Receive message 

Receive message from main:2018-07-18 17:58:35.017354
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:35.017354
Receive message from main:2018-07-18 17:58:37.314975
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:37.314975
Receive message from main:2018-07-18 17:58:39.755408
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:39.755408
Receive message from main:2018-07-18 17:58:42.197989
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:42.197989
Receive message from main:2018-07-18 17:58:44.640803
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:44.640803
Receive message from main:2018-07-18 17:58:46.987204
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:46.987204
Receive message from main:2018-07-18 17:58:49.333088
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:49.333088
Receive message from main:2018-07-18 17:58:51.723267
Backing up message to backup:beatsGlobal : 2018-07-18 17:58:51.723267
Receive message 

Receive message from main:2018-07-18 18:01:14.448378
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:14.448378
Receive message from main:2018-07-18 18:01:16.882801
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:16.882801
Receive message from main:2018-07-18 18:01:19.310115
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:19.310115
Receive message from main:2018-07-18 18:01:21.653025
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:21.653025
Receive message from main:2018-07-18 18:01:24.094063
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:24.094063
Receive message from main:2018-07-18 18:01:26.535964
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:26.535964
Receive message from main:2018-07-18 18:01:28.974597
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:28.974597
Receive message from main:2018-07-18 18:01:31.323106
Backing up message to backup:beatsGlobal : 2018-07-18 18:01:31.323106
Receive message 

Receive message from main:2018-07-18 18:03:53.809311
Backing up message to backup:beatsGlobal : 2018-07-18 18:03:53.809311
Receive message from main:2018-07-18 18:03:56.156194
Backing up message to backup:beatsGlobal : 2018-07-18 18:03:56.156194
Receive message from main:2018-07-18 18:03:58.505030
Backing up message to backup:beatsGlobal : 2018-07-18 18:03:58.505030
Receive message from main:2018-07-18 18:04:00.849398
Backing up message to backup:beatsGlobal : 2018-07-18 18:04:00.849398
Receive message from main:2018-07-18 18:04:03.285587
Backing up message to backup:beatsGlobal : 2018-07-18 18:04:03.285587
Receive message from main:2018-07-18 18:04:05.632745
Backing up message to backup:beatsGlobal : 2018-07-18 18:04:05.632745
Receive message from main:2018-07-18 18:04:08.073767
Backing up message to backup:beatsGlobal : 2018-07-18 18:04:08.073767
Receive message from main:2018-07-18 18:04:10.418750
Backing up message to backup:beatsGlobal : 2018-07-18 18:04:10.418750
Receive message 

Receive message from main:2018-07-18 18:06:33.252816
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:33.252816
Receive message from main:2018-07-18 18:06:35.600641
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:35.600641
Receive message from main:2018-07-18 18:06:37.952442
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:37.952442
Receive message from main:2018-07-18 18:06:40.394896
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:40.394896
Receive message from main:2018-07-18 18:06:42.833316
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:42.833316
Receive message from main:2018-07-18 18:06:45.180538
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:45.180538
Receive message from main:2018-07-18 18:06:47.627391
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:47.627391
Receive message from main:2018-07-18 18:06:50.016476
Backing up message to backup:beatsGlobal : 2018-07-18 18:06:50.016476
Receive message 

Receive message from main:2018-07-18 18:09:12.362121
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:12.362121
Receive message from main:2018-07-18 18:09:14.809753
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:14.809753
Receive message from main:2018-07-18 18:09:17.247735
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:17.247735
Receive message from main:2018-07-18 18:09:19.593907
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:19.593907
Receive message from main:2018-07-18 18:09:21.936623
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:21.936623
Receive message from main:2018-07-18 18:09:24.281657
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:24.281657
Receive message from main:2018-07-18 18:09:26.719373
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:26.719373
Receive message from main:2018-07-18 18:09:29.162674
Backing up message to backup:beatsGlobal : 2018-07-18 18:09:29.162674
Receive message 

Receive message from main:2018-07-18 18:11:52.336482
Backing up message to backup:beatsGlobal : 2018-07-18 18:11:52.336482
Receive message from main:2018-07-18 18:11:54.779672
Backing up message to backup:beatsGlobal : 2018-07-18 18:11:54.779672
Receive message from main:2018-07-18 18:11:57.123233
Backing up message to backup:beatsGlobal : 2018-07-18 18:11:57.123233
Receive message from main:2018-07-18 18:11:59.470696
Backing up message to backup:beatsGlobal : 2018-07-18 18:11:59.470696
Receive message from main:2018-07-18 18:12:01.817070
Backing up message to backup:beatsGlobal : 2018-07-18 18:12:01.817070
Receive message from main:2018-07-18 18:12:04.213257
Backing up message to backup:beatsGlobal : 2018-07-18 18:12:04.213257
Receive message from main:2018-07-18 18:12:06.557582
Backing up message to backup:beatsGlobal : 2018-07-18 18:12:06.557582
Receive message from main:2018-07-18 18:12:08.898995
Backing up message to backup:beatsGlobal : 2018-07-18 18:12:08.898995
Receive message 

Process BackupAgent-1:
Traceback (most recent call last):
  File "/home/wang/anaconda2/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "<ipython-input-3-9f4e6a5e4905>", line 21, in run
    for message in consumer:
  File "/home/wang/anaconda2/lib/python2.7/site-packages/kafka/vendor/six.py", line 561, in next
    return type(self).__next__(self)
  File "/home/wang/anaconda2/lib/python2.7/site-packages/kafka/consumer/group.py", line 1075, in __next__
    return next(self._iterator)
  File "/home/wang/anaconda2/lib/python2.7/site-packages/kafka/consumer/group.py", line 1013, in _message_generator
    self._client.poll(timeout_ms=poll_ms)
  File "/home/wang/anaconda2/lib/python2.7/site-packages/kafka/client_async.py", line 554, in poll
    responses.extend(self._poll(timeout))
  File "/home/wang/anaconda2/lib/python2.7/site-packages/kafka/client_async.py", line 568, in _poll
    ready = self._selector.select(timeout)
  File "/home/wang/anaconda2/li

In [6]:
# Signal every agent first so they all wind down at the same time.
for t in tasks:
    t.stop()

# Then wait for each child to exit so its cleanup can finish and the process
# is reaped. The timeout (seconds) keeps the cell from hanging if an agent is
# stuck; is_alive() skips agents that were never started or already exited.
for t in tasks:
    if t.is_alive():
        t.join(5)

In [6]:
# Test producer against a single main-cluster node: values are JSON-encoded to
# UTF-8 bytes and batches are gzip-compressed.
producer = KafkaProducer(
    bootstrap_servers='smartaqnet-0-node3:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    compression_type='gzip',
)

In [19]:
# Publish one test value; the returned future is displayed as the cell output.
producer.send(topic='beats2', value="someThing2")

<kafka.producer.future.FutureRecordMetadata at 0x7f10b0f89a10>

In [20]:
# Inspection consumer for a single main-cluster node. The reset policy must be
# spelled 'earliest'; the misspelling 'earlist' is not a recognised value.
aconsumer = KafkaConsumer(bootstrap_servers='smartaqnet-0-node3:9092',
                          auto_offset_reset='earliest')

In [21]:
from kafka import TopicPartition
# assign() replaces any previous assignment, so both partitions have to be
# passed in a single call; two consecutive calls would leave only 'beats2'.
aconsumer.assign([TopicPartition('beats', 0), TopicPartition('beats2', 0)])
# Rewind every assigned partition to its first available offset.
aconsumer.seek_to_beginning()

In [None]:
# Dump each record's value and topic; this consumer has no iteration timeout,
# so the loop keeps waiting for new messages until the cell is interrupted.
for record in aconsumer:
    line = "Receive message from node3:" + " ".join([str(record.value), str(record.topic)])
    print(line)

Receive message from node3:1 beats2
Receive message from node3:2 beats2
Receive message from node3:3 beats2
Receive message from node3:4 beats2
Receive message from node3:5 beats2
Receive message from node3:6 beats2
Receive message from node3:7 beats2
Receive message from node3:8 beats2
Receive message from node3:9 beats2
Receive message from node3:10 beats2
Receive message from node3:11 beats2
Receive message from node3:12 beats2
Receive message from node3:13 beats2
Receive message from node3:14 beats2
Receive message from node3:15 beats2
Receive message from node3:16 beats2
Receive message from node3:17 beats2
Receive message from node3:18 beats2
Receive message from node3:19 beats2
Receive message from node3:20 beats2
Receive message from node3:21 beats2
Receive message from node3:2 beats2
Receive message from node3:3 beats2
Receive message from node3:4 beats2
Receive message from node3:5 beats2
Receive message from node3:6 beats2
Receive message from node3:7 beats2
Receive message 

In [11]:
# Scratch cell: renders a log-style format string with literal placeholder
# text (not real message fields) to check how the two slots are filled.
"Backing up message to backup:{}: {}".format('message.topic', 'message.value')

'Backing up message to backup:message.topic: message.value'