diff --git a/ad-insertion/kafka2db/main.py b/ad-insertion/kafka2db/main.py index 3b9c1cee..07a8d272 100755 --- a/ad-insertion/kafka2db/main.py +++ b/ad-insertion/kafka2db/main.py @@ -3,51 +3,41 @@ from messaging import Consumer from db import DataBase -from threading import Lock, Timer +from threading import Thread, Condition import json import time import os kafka_topic = "seg_analytics_data" kafka_group = "kafka_to_db_converter" -ingest_batch=int(os.environ["INGEST_BATCH"]) -ingest_duration=float(os.environ["INGEST_DURATION"]) - -class IntervalTimer(Timer): - def run(self): - while not self.finished.is_set(): - self.finished.wait(self.interval) - self.function(*self.args, **self.kwargs) - self.finished.set() class KafkaToDB(object): def __init__(self): super(KafkaToDB,self).__init__() self._db=DataBase() self._cache=[] - self._lock=Lock() - self._timer=IntervalTimer(ingest_duration, self._ingest) - self._timer.start() + self._cond=Condition() + Thread(target=self._ingest).start() def _ingest(self): - if not len(self._cache): return - self._lock.acquire() - bulk=self._cache - self._cache=[] - self._lock.release() - if not len(bulk): return - try: - self._db.save(bulk) - print("SaveToDB #"+str(len(bulk)), flush=True) - except Exception as e: - print("Exception: "+str(e), flush=True) + while True: + self._cond.acquire() + self._cond.wait() + bulk=self._cache + self._cache=[] + self._cond.release() + + try: + self._db.save(bulk) + print("SaveToDB #"+str(len(bulk)), flush=True) + except Exception as e: + print("Exception: "+str(e), flush=True) def _send(self, data): - self._lock.acquire() + self._cond.acquire() self._cache.append(data) - self._lock.release() - if len(self._cache)>ingest_batch: - self._ingest() + self._cond.notify() + self._cond.release() def listen(self): while True: diff --git a/deployment/docker-swarm/kafka2db.m4 b/deployment/docker-swarm/kafka2db.m4 index 49df79aa..ae17f888 100644 --- a/deployment/docker-swarm/kafka2db.m4 +++ b/deployment/docker-swarm/kafka2db.m4 @@ -2,8 +2,6 @@ kafka2db: image: ssai_kafka2db:latest environment: - INGEST_DURATION: "0.1" - INGEST_BATCH: "50" NO_PROXY: "*" no_proxy: "*" networks: diff --git a/deployment/kubernetes/kafka2db.yaml.m4 b/deployment/kubernetes/kafka2db.yaml.m4 index d7adb3b8..3ccd3efe 100644 --- a/deployment/kubernetes/kafka2db.yaml.m4 +++ b/deployment/kubernetes/kafka2db.yaml.m4 @@ -22,10 +22,6 @@ spec: image: ssai_kafka2db:latest imagePullPolicy: IfNotPresent env: - - name: INGEST_DURATION - value: "0.1" - - name: INGEST_BATCH - value: "50" - name: NO_PROXY value: "*" - name: no_proxy