In [14]:
import os
from river import datasets
from river import ensemble
from confluent_kafka import Producer,Consumer
import certifi
import time
import json

In [2]:
# Kafka credentials and broker list are read from the environment so no secret
# is stored in the notebook; a missing variable raises KeyError right here.
user, password, bsts = (
    os.environ[var_name]
    for var_name in ('kafka_username', 'kafka_password', 'kafka_bootstrap_servers')
)

In [3]:
# Topic shared by the producer and consumer cells; each message is one JSON-encoded sample.
feature_topic = "test_2"

In [4]:
# Producer settings: SASL/PLAIN authentication over TLS, using certifi's CA
# bundle to verify the brokers. Batching is tuned for throughput:
# batch.num.messages caps a batch at 2048 messages, and linger.ms lets the
# client wait up to 100 ms to fill one.
conf = dict(
    [
        ('bootstrap.servers', bsts),
        ('security.protocol', 'SASL_SSL'),
        ('sasl.mechanism', 'PLAIN'),
        ('sasl.username', user),
        ('sasl.password', password),
        ('ssl.ca.location', certifi.where()),
        ('batch.num.messages', 2048),
        ('linger.ms', 100),
        ('client.id', 'producer-icde-2023'),
    ]
)
producer = Producer(conf)

In [None]:
# Sample source: river's MaliciousURL dataset, truncated to its first `max_size`
# samples. The dataset is downloaded and unpacked lazily, on first iteration.
# NOTE(review): `take` presumably returns a one-shot iterator, so re-run this
# cell before re-running the producer loop, or it will publish nothing.
max_size = 4_096
dataset = datasets.MaliciousURL()
data = dataset.take(max_size)


In [6]:
# Publish each (features, label) sample from `data` to `feature_topic` as a JSON
# message keyed by its 1-based sequence number. Every `flush_every` messages:
#   - print the time spent enqueueing that batch,
#   - flush the producer,
#   - sleep `throttle_seconds` to throttle the stream.
flush_every = 1024
throttle_seconds = 1

end = 0
cnt = 0  # stays 0 if `data` is empty, so the final report still works
st = time.time()
abs_st = st
for cnt, (f, y) in enumerate(data, start=1):
    # 'y' is sent as 'true'/'false'; 'st' is the wall-clock time the record was built.
    d = {'f': f, 'y': str(y).lower(), 'st': time.time()}
    v = json.dumps(d).encode('utf-8')
    key = str(cnt)
    try:
        producer.produce(feature_topic, value=v, key=key)
    except BufferError:
        # confluent_kafka raises BufferError when the local send queue is full:
        # drain it, then retry this same message once. Other errors propagate.
        print(f'Queue full, flushing {cnt}')
        producer.flush()
        producer.produce(feature_topic, value=v, key=key)
    if cnt % flush_every == 0:
        end = time.time()
        print(f'flushing count - {cnt}, time taken in seconds- {end-st} ')
        producer.flush()
        time.sleep(throttle_seconds)
        st = time.time()

# Deliver anything still queued before reporting the overall duration.
producer.flush()
end = time.time()
print(f'final flushing count - {cnt}, time taken in seconds- {end-abs_st} ')

Downloading http://www.sysnet.ucsd.edu/projects/url/url_svmlight.tar.gz (233.66 MB)
Uncompressing into /home/ubuntu/river_data/MaliciousURL
flushing count - 128, time taken in seconds- 8.735549211502075 
flushing count - 256, time taken in seconds- 0.009711265563964844 
flushing count - 384, time taken in seconds- 0.009943246841430664 
flushing count - 512, time taken in seconds- 0.009957313537597656 
flushing count - 640, time taken in seconds- 0.009808540344238281 
flushing count - 768, time taken in seconds- 0.010274887084960938 
flushing count - 896, time taken in seconds- 0.010121345520019531 
flushing count - 1024, time taken in seconds- 0.010085821151733398 
flushing count - 1152, time taken in seconds- 0.009898900985717773 
flushing count - 1280, time taken in seconds- 0.01019430160522461 
flushing count - 1408, time taken in seconds- 0.01014566421508789 
flushing count - 1536, time taken in seconds- 0.009747743606567383 
flushing count - 1664, time taken in seconds- 0.00986742

In [18]:
def consume_messages(group_id, model, max_messages=None, idle_timeout=30.0):
    """Consume feature messages from Kafka and evaluate ``model`` prequentially.

    Each message on ``feature_topic`` is expected to be JSON with an ``'f'``
    feature dict and a ``'y'`` label string (``'true'``/``'false'``), as written
    by the producer cell. For every message the model first predicts (test) and
    then learns (train) on it. The latency of each step is recorded.

    Args:
        group_id: Kafka consumer group id.
        model: river classifier exposing ``predict_one``, ``learn_one`` and
            ``_raw_memory_usage``. It is updated in place.
        max_messages: stop after this many messages have been decoded
            (``None`` = no limit).
        idle_timeout: stop once no message has arrived for this many seconds
            after at least one message was processed (``None`` = wait forever).
            Interrupting the kernel also stops consumption cleanly.

    Returns:
        Tuple ``(score_and_truth, processing_durations, prediction_durations,
        learning_durations, mem_usage, total_time)``:
            - ``score_and_truth``: list of ``{'y': bool, 'score': prediction}``
              dicts.
            - the three ``*_durations`` lists: per-message seconds.
            - ``mem_usage``: ``model._raw_memory_usage`` sampled every 10
              messages.
            - ``total_time``: seconds since the first processed message
              (0.0 if none).
    """
    # Local imports: these names are not imported at the top of the notebook.
    import sys
    from confluent_kafka import KafkaError, KafkaException

    features_consumer_conf = {'bootstrap.servers': bsts,
                              'sasl.username': user,
                              'sasl.password': password,
                              'sasl.mechanism': 'PLAIN',
                              'security.protocol': 'SASL_SSL',
                              'ssl.ca.location': certifi.where(),
                              'group.id': group_id,
                              'enable.auto.commit': True,
                              'auto.commit.interval.ms': 1000,
                              # A group with no committed offset starts at the
                              # end of the topic, i.e. only sees new messages.
                              'auto.offset.reset': 'latest'}
    features_consumer = Consumer(features_consumer_conf)

    cnt = 0          # messages successfully decoded
    error_cnt = 0    # error events returned by poll()
    ignored = 0      # messages whose processing raised
    st_processing_time = None
    last_msg_time = None

    learning_durations = []
    prediction_durations = []
    processing_durations = []
    score_and_truth = []
    mem_usage = []
    try:
        print(f'\nNow subscribing to features topic:{feature_topic}')
        features_consumer.subscribe([feature_topic])
        while max_messages is None or cnt < max_messages:
            msg = features_consumer.poll(timeout=0.1)
            if msg is None:
                # Stop when the stream goes quiet, but only after data has
                # started flowing, so the consumer may be started first.
                if (idle_timeout is not None and last_msg_time is not None
                        and time.time() - last_msg_time > idle_timeout):
                    print(f'No message received for {idle_timeout} s, stopping')
                    break
                continue
            if msg.error():
                error_cnt += 1
                # NOTE(review): _PARTITION_EOF events are presumably only
                # delivered when 'enable.partition.eof' is set (not set here).
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    if error_cnt % 1000 == 0:
                        print('error')
                        print(msg)
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Report any other consumer error instead of silently counting it.
                    sys.stderr.write(f'Consumer error: {msg.error()}\n')
                continue

            msg_arrival_time = time.time()
            last_msg_time = msg_arrival_time
            try:
                message = json.loads(msg.value().decode("utf-8"))
                cnt += 1

                f = message['f']
                y = (message['y'] == 'true')
                if st_processing_time is None:
                    st_processing_time = time.time()

                # Prequential evaluation: predict on the sample before learning it.
                st_prediction_time = time.time()
                score = model.predict_one(f)
                prediction_durations.append(time.time() - st_prediction_time)
                score_and_truth.append({'y': y, 'score': score})

                st_learn_ts = time.time()
                # Updates the model in place. Do not rebind `model` to the
                # return value: recent river versions return None here.
                model.learn_one(f, y)
                learning_durations.append(time.time() - st_learn_ts)

                processing_durations.append(time.time() - msg_arrival_time)
                if cnt % 10 == 0:
                    mem_usage.append(model._raw_memory_usage)
            except Exception as e:
                # Best effort: report and skip the bad message. Print the raw
                # payload, because re-parsing it here could raise again.
                ignored += 1
                print(msg.value())
                print(e, file=sys.stdout)
                print(f'ignored ={ignored} total = {cnt}')
    except KeyboardInterrupt:
        # Interrupting the kernel ends consumption but still returns results.
        print('Interrupted, stopping consumption')
    finally:
        print('CLOSING')
        try:
            features_consumer.commit(asynchronous=False)
        except KafkaException as e:
            # e.g. nothing was consumed, so there is no offset to commit.
            sys.stderr.write(f'Final commit skipped: {e}\n')
        features_consumer.close()

    total_time = (time.time() - st_processing_time
                  if st_processing_time is not None else 0.0)
    return (score_and_truth, processing_durations, prediction_durations,
            learning_durations, mem_usage, total_time)

In [19]:
import statistics
from river import metrics

def print_time_results(durations, type_of_duration):
    """Print the mean, median, max and min of a list of durations.

    Args:
        durations: sequence of numbers (per-message seconds in this notebook).
        type_of_duration: label printed in the header line.

    An empty ``durations`` prints a note instead of raising, since
    ``statistics.mean`` and ``max`` both fail on empty input.
    """
    print(f'Type of durations : {type_of_duration} ')
    if not durations:
        print('\tno samples recorded')
        return
    print(f'\tAVG : {statistics.mean(durations)}')
    print(f'\tMEDIAN : {statistics.median(durations)}')
    print(f'\tMAX : {max(durations)}')
    print(f'\tMIN : {min(durations)}')
    
def print_results(score_and_truth, processing_durations, prediction_durations,
                  learning_durations, mem_usage, total_time):
    """Print classification metrics and latency statistics for a consumer run.

    Args:
        score_and_truth: list of ``{'y': true_label, 'score': prediction}`` dicts,
            as returned by ``consume_messages``.
        processing_durations: per-message end-to-end seconds.
        prediction_durations: per-message ``predict_one`` seconds.
        learning_durations: per-message ``learn_one`` seconds.
        mem_usage: sampled model memory-usage values (may be empty).
        total_time: wall-clock seconds spent consuming.
    """
    auc = metrics.ROCAUC()
    f1 = metrics.F1()
    recall = metrics.MicroRecall()
    skipped = 0
    for record in score_and_truth:
        y_true = record['y']
        y_pred = record['score']
        if y_pred is None:
            # Presumably the model had not learned anything yet when it
            # predicted, so there is nothing to score for this row.
            skipped += 1
            continue
        # river metrics update in place (newer versions return None), so the
        # metric objects must not be rebound to update()'s return value.
        auc.update(y_true, y_pred)
        f1.update(y_true, y_pred)
        recall.update(y_true, y_pred)

    total_records = len(score_and_truth)
    avg_memory_usage = statistics.mean(mem_usage) if mem_usage else None
    print(f'Messages consumed:{total_records},Total Cumulative Time: {total_time}')
    if skipped:
        print(f'Predictions skipped (no prediction available): {skipped}')
    print(f'AUC {auc}')
    print(f'F1 {f1}')
    print(f'RECALL {recall}')
    print(f'AVERAGE MEMORY USAGE {avg_memory_usage}')
    print_time_results(processing_durations, 'PROCESSING DURATIONS')
    print_time_results(prediction_durations, 'PREDICTION DURATIONS')
    print_time_results(learning_durations, 'LEARNING DURATIONS')



In [None]:
# Evaluate a seeded Adaptive Random Forest on the Kafka feature stream, then
# report metrics. NOTE(review): with 'auto.offset.reset' = 'latest', a new
# consumer group presumably only sees messages produced after it subscribes,
# so run the producer (e.g. from another process) after starting this cell.
group_id = 'ARFC_8'
model = ensemble.AdaptiveRandomForestClassifier(leaf_prediction="mc", seed=8)
(score_and_truth, processing_durations, prediction_durations,
 learning_durations, mem_usage, total_time) = consume_messages(group_id, model)
print_results(score_and_truth, processing_durations, prediction_durations,
              learning_durations, mem_usage, total_time)


Now subscribing to features topic:test_2
