In [None]:
%%bash
pip install kafka-python

In [None]:
import csv
import json
import pickle
import re
from datetime import datetime
from itertools import islice

import nltk
import numpy as np
from kafka import KafkaConsumer, KafkaProducer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
# Kafka consumer for the cluster's bootstrap service; each message value is parsed as JSON.
# JSON text is UTF-8, so decode it as UTF-8: decoding as ASCII with 'ignore' would silently
# drop every non-ASCII character (accents, emoji, ...) from user comments. Undecodable
# bytes are replaced with U+FFFD rather than raising, so one bad byte cannot stop the stream.
consumer = KafkaConsumer(
    bootstrap_servers='kafka-cluster-kafka-bootstrap.kafka-cluster-project.svc.cluster.local:9092',
    auto_offset_reset='latest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8', 'replace'))
)


In [None]:
import json
from json import JSONEncoder
import numpy

class NumpyArrayEncoder(JSONEncoder):
    """JSON encoder that additionally serialises NumPy arrays and NumPy scalars.

    Arrays become (nested) Python lists via ``ndarray.tolist()``.  NumPy scalar types
    that the standard encoder rejects (e.g. ``numpy.int64``, ``numpy.float32``,
    ``numpy.bool_``) become the equivalent Python scalar via ``.item()``.  Anything
    else is passed to ``JSONEncoder.default``, which raises ``TypeError``.
    """

    def default(self, obj):
        if isinstance(obj, numpy.ndarray):
            return obj.tolist()
        if isinstance(obj, numpy.generic):
            # NumPy scalar -> native Python scalar; the encoder then serialises that
            # (or calls default() again and raises if it is still not JSON-able).
            return obj.item()
        return JSONEncoder.default(self, obj)


In [None]:
# Kafka producer for the same bootstrap service. Payloads are serialised to UTF-8 JSON,
# with NumpyArrayEncoder converting ndarray values into nested lists.
producer = KafkaProducer(
    bootstrap_servers='kafka-cluster-kafka-bootstrap.kafka-cluster-project.svc.cluster.local:9092',
    value_serializer=lambda payload: json.dumps(payload, cls=NumpyArrayEncoder).encode('utf-8'),
)

In [None]:
# Subscribe to the span topic (a one-element list; kafka-python would wrap a bare string the same way).
consumer.subscribe(topics=['lad-jaeger-spans'])

In [None]:

# Fit the TF-IDF vocabulary on labelled historical log rows so that live comments can
# later be vectorised into the same feature space.
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')
# Despite its name this is a WordNet *lemmatizer*, not a stemmer; the name is kept
# because the streaming cell refers to `stemmer`.
stemmer = WordNetLemmatizer()


def normalize_comment(text):
    """Lowercase `text`, split it on runs of whitespace, lemmatize each token, re-join with single spaces."""
    # str.split() already collapses any whitespace run, so no separate regex pass is needed.
    return ' '.join(stemmer.lemmatize(word) for word in text.lower().split())


num_of_comments = 1000  # number of data rows (after the header) used to fit the vectorizer
comments = []
durations = []
labels = []
# `with` closes the file even if a row is malformed; the csv module expects newline=''.
with open('final_log_data.csv', 'r', newline='') as log_file:
    csv_reader = csv.reader(log_file)
    next(csv_reader, None)  # skip the header row
    for row in islice(csv_reader, num_of_comments):
        # Columns used: 0 = duration, 2 = comment text, 3 = label.
        # Read all fields first so a short row fails before any list is appended to.
        duration, comment, label = row[0], row[2], row[3]
        comments.append(normalize_comment(comment))
        durations.append(duration)
        labels.append(label)

tfidfconverter = TfidfVectorizer(max_features=1000, stop_words=stopwords.words('english'))
tfidfconverter = tfidfconverter.fit(comments)

In [None]:
# Stream spans from Kafka and forward one feature row per frontend comment POST:
#   1. keep spans whose process.serviceName is 'frontend' and whose http.method/http.url
#      tags show a POST to a URL containing "comment";
#   2. pull the comment text, userId and movieId out of the 'body' field of the first log;
#   3. normalise the comment like the training rows and vectorise it with `tfidfconverter`;
#   4. send [duration, tf-idf features...] plus metadata to
#      'data_to_predict_logistic_regression'.
# Uses `consumer`, `producer`, `stemmer` and `tfidfconverter` from earlier cells.
# Tag values are scraped with regexes over str() of the span dict, so the patterns depend
# on how Python renders it (e.g. "{'key': 'http.method', 'vStr': 'POST'}").


def _first_match(pattern, text):
    """Return capture group 1 of the first match of `pattern` in `text`, or None if there is none."""
    match = re.search(pattern, text)
    return match.group(1) if match else None


def _parse_comment_body(fields):
    """Extract (description, userId, movieId) from the 'body' entry of a log's fields.

    Each element is the raw matched string, or None when it is not present, so a
    missing id never leaks a value from a previous message.
    """
    body = _first_match(r"{'key': 'body', 'vStr': '(.+?)'}", str(fields))
    if body is None:
        return None, None, None
    return (
        _first_match(r'"description":"(.+?)","_links":', body),
        _first_match(r'"userId":(.+?),"movieId":', body),
        _first_match(r'"movieId":(.+?),"description":', body),
    )


counter = 1  # 1 + number of payloads sent so far

try:
    for message in consumer:
        span = message.value  # already a dict: the consumer's value_deserializer runs json.loads
        if span['process']['serviceName'] != 'frontend':
            continue

        span_text = str(span)
        http_method = _first_match(r"{'key': 'http\.method', 'vStr': '(.+?)'}", span_text)
        http_url = _first_match(r"{'key': 'http\.url', 'vStr': '(.+?)'}", span_text)
        if http_method != 'POST' or http_url is None or "comment" not in http_url:
            continue

        # The regex expects a value followed by 's' (e.g. "0.0123s"). Skip spans whose
        # duration cannot be parsed instead of crashing the whole stream.
        duration = _first_match(r"(.+?)s", str(span['duration']))
        try:
            duration_seconds = float(duration)
        except (TypeError, ValueError):
            continue

        # A span without logs simply has no body: it is sent with an empty comment.
        logs = span.get('logs') or []
        fields = logs[0].get('fields', []) if logs else []
        description, user_id, movie_id = _parse_comment_body(fields)
        comment_orig = description or ""

        # Same normalisation as the training rows: lowercase, split on whitespace, lemmatize.
        comment = ' '.join(stemmer.lemmatize(word) for word in comment_orig.lower().split())
        comment_features = tfidfconverter.transform([comment]).toarray()

        # One feature row: the duration column first, then the tf-idf columns.
        data = np.concatenate((np.array([[duration_seconds]]), comment_features), axis=1)

        numpyData = {
            "array": data,
            "comment_date": datetime.now().strftime('%Y-%m-%d'),
            'comment': comment,
            'spanId': span['spanId'],
            'userId': user_id,
            'movieId': movie_id,
        }
        producer.send('data_to_predict_logistic_regression', numpyData)
        counter = counter + 1
finally:
    # send() only buffers; flush so pending payloads are delivered when the loop is interrupted.
    producer.flush()
