In [1]:
#Input: 2-Columns from crawler
#Output: 7-Columns preprocessed data to elasticsearch and gaam5 and cassandra

import pandas as pd
import datetime
import re
import json
import shutil
from pyspark.sql.types import StructType, StructField, StringType

#if this code is running for first time, it needs to train the model using old data.
FIRST_TIME=True #ino bezarin True bemoone. bazi computera nemitoone modele spark ro load kone. buge sparke. model to train konin tori nist.
pathModel = r"C:\Users\AhmaDGoly\Desktop\Arshad\TERM 2\Big Data\Final\Codes\ScoreModel.pysparkmodel\\"   #ye addrese pooshe bedin ke khali bashe.
pathOldData6Column = r"C:\Users\AhmaDGoly\Desktop\Arshad\TERM 2\Big Data\Final\Codes\6ColumnOldData.csv" #inja save mishe. ye esme alaki bedin mohem nis.
pathOldData = r'C:\Users\AhmaDGoly\Desktop\Arshad\TERM 2\Big Data\Final\Codes\OldData.csv'               #in mohem hast. addresse OldData.csv ke ferestadamo bedin.
topicToReceive = 'crawler'
topicToSend1 = "elasticsearch"
topicToSend2 = "gaam5"
topicToSend3 = "cassandra"
kafkaPort = 19092

def current_time():
    current_datetime = datetime.datetime.now()
    return current_datetime.strftime("[%Y-%m-%d_%H:%M:%S]")

In [2]:
def persian_num_to_eng_num(number):
    return number.replace('۰', '0').replace('۱', '1')\
                                   .replace('۲', '2')\
                                   .replace('۳', '3')\
                                   .replace('۴', '4')\
                                   .replace('۵', '5')\
                                   .replace('۶', '6')\
                                   .replace('۷', '7')\
                                   .replace('۸', '8')\
                                   .replace('۹', '9')


def extract_hashtags(message):
    if pd.isna(message):
        return []
    hashtags = re.findall(r'\#\w+', message)
    modified_hashtags = []
    for hashtag in hashtags:
        if hashtag.startswith('#ب') and hashtag.endswith('ن'):
            modified_hashtags.append('#BTC')
        elif hashtag.startswith('#اتر'):
            modified_hashtags.append('#ETH')
        elif hashtag.startswith('#بیت'):
            modified_hashtags.append('#BTC')
        else:
            modified_hashtags.append(hashtag)
    return modified_hashtags


def assign_score(message):
    if 'فوری' in message and 'فاندای منفی' in message:
        return 10
    elif 'فاندای منفی' in message:
        return 30
    elif 'فوری' in message and 'فاندای مثبت' in message:
        return 90
    elif 'فاندای مثبت' in message:
        return 70
    if '#لیکوئید' in message and '۲۴' in message:
        if 'Long' in message:
            percent_index = message.find('٪')
            if percent_index != -1:
                start_index = message.rfind(' ', 0, percent_index) + 1
                percentage = message[start_index:percent_index]
                return 100 - int(round(float(persian_num_to_eng_num(percentage))))
        elif 'Short' in message:
            percent_index = message.find('٪')
            if percent_index != -1:
                start_index = message.rfind(' ', 0, percent_index) + 1
                percentage = message[start_index:percent_index]
                return int(round(float(persian_num_to_eng_num(percentage))))
    return 50

def assign_category(message):
    if message.startswith('📊'):
        return 'Chart'
    elif message.startswith('✍') or message.startswith('📉') or message.startswith('🔰'):
        return 'News'
    elif message.startswith('🚨 فوری'):
        return 'Immediate News'
    elif message.startswith('🚨'):
        return 'Special News'
    else:
        return 'News'
def clean_message_text(message):
    message = message.replace('\n', '')
    message = message.replace('@NEWS_FUNDA', '')
    message = message.replace('NEWS_FUNDA', '')
    message = message.strip()
    return message

In [3]:
def TwoColumn_to_SixColumn(df):
    df_copy = df.copy()
    df_copy['hashtags'] = df_copy['message'].apply(extract_hashtags)
    df_copy = df_copy.dropna(subset=['message'])
    df_copy['score'] = df_copy['message'].apply(assign_score)
    df_copy['category'] = df_copy['message'].apply(assign_category)
    df_copy['clean_message'] = df_copy['message'].apply(clean_message_text)
    return df_copy

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegressionModel
spark = SparkSession.builder.appName("MessageScorePrediction").getOrCreate()

def pysparkScoreModel(df,spark):
    print(f"{current_time()}: Training ScoreModel based on old dataset.")
    data = [(row.clean_message, row.score) for row in df.itertuples(index=False)]
    spark_df = spark.createDataFrame(data, ["clean_message", "score"])
    tokenizer = Tokenizer(inputCol="clean_message", outputCol="words")
    tokenized_df = tokenizer.transform(spark_df)
    hashingTF = HashingTF(inputCol="words", outputCol="features")
    featured_df = hashingTF.transform(tokenized_df)
    (training_data, testing_data) = featured_df.randomSplit([0.9, 0.1], seed=42)
    lr = LinearRegression(labelCol="score", featuresCol="features")
    model = lr.fit(training_data)
    #Test:
    predictions = model.transform(testing_data)
    evaluator = RegressionEvaluator(labelCol="score", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"{current_time()}: Training ScoreModel completed. Root Mean Squared Error (RMSE): {rmse}")
    #return trained model
    return model

In [5]:
def predict(pandas_df, spark, scoreModel):
    #add some prints mb?
    data = [(row.clean_message,) for row in pandas_df.itertuples(index=False)]
    schema = StructType([StructField("clean_message", StringType())])
    spark_df = spark.createDataFrame(data, schema)
    tokenizer = Tokenizer(inputCol="clean_message", outputCol="words")
    tokenized_df = tokenizer.transform(spark_df)
    hashingTF = HashingTF(inputCol="words", outputCol="features")
    featured_df = hashingTF.transform(tokenized_df)
    predictions = scoreModel.transform(featured_df)
    predictions = predictions.drop("features")
    predictions = predictions.drop("words")
    predictions = predictions.drop("clean_message")
    predictions = predictions.withColumnRenamed("prediction", "ML_score")
    pandas_predictions = predictions.toPandas()
    return pandas_df.merge(pandas_predictions, left_index=True, right_index=True)

In [6]:
if FIRST_TIME:
    print(f"{current_time()}: FIRST_TIME is on. loading old data.")
    df = pd.read_csv(pathOldData, encoding='utf-8-sig')
    df = TwoColumn_to_SixColumn(df)
    print(f"{current_time()}: Preprocessed old data. made 'hashtag','score','category' and 'clean_message' columns from message column.")
    scoreModel = pysparkScoreModel(df,spark)
    shutil.rmtree(pathModel, ignore_errors=True)
    scoreModel.save(pathModel)
    df.to_csv(pathOldData6Column, index=False, encoding='utf-8-sig')
    print(f"{current_time()}: Saved Score Model and 6Column old_data.")
else:
    print(f"{current_time()}: FIRST_TIME is off. loading ScoreModel.")
    scoreModel = LinearRegressionModel.load(pathModel)
    df = pd.read_csv(pathOldData6Column, encoding='utf-8-sig')

[2023-07-15_16:34:50]: FIRST_TIME is on. loading old data.
[2023-07-15_16:34:51]: Preprocessed old data. made 'hashtag','score','category' and 'clean_message' columns from message column.
[2023-07-15_16:34:51]: Training ScoreModel based on old dataset.
[2023-07-15_16:35:37]: Training ScoreModel completed. Root Mean Squared Error (RMSE): 3.865693387791679
[2023-07-15_16:35:39]: Saved Score Model and 6Column old_data.


In [7]:
#Gerenating Preprocessed Stream_data for group.
#streamdf = pd.read_csv(r'C:\Users\AhmaDGoly\Desktop\Arshad\TERM 2\Big Data\Final\Codes\StreamData.csv', encoding='utf-8-sig')
#streamdf = TwoColumn_to_SixColumn(streamdf)
#streamdf = predict(streamdf, spark, scoreModel)
#streamdf.to_csv(r'C:\Users\AhmaDGoly\Desktop\Arshad\TERM 2\Big Data\Final\Codes\StreamData_preprocessed.csv', index=False, encoding='utf-8-sig')

In [7]:
from kafka import KafkaConsumer,KafkaProducer
consumer = KafkaConsumer(topicToReceive, bootstrap_servers=[f'127.0.0.1:{kafkaPort}'])
producer = KafkaProducer(bootstrap_servers=[f'127.0.0.1:{kafkaPort}'])

In [8]:
import threading
import queue
import time
receivedMsgs_queue = queue.Queue()
ThreadOutput_queue = queue.Queue()
x = 1
y = 1
def TwoColumn_To_SevenColumn_Thread():
    global receivedMsgs_queue
    global ThreadOutput_queue
    global producer
    global x
    while True:
        time.sleep(0.1)
        if not receivedMsgs_queue.empty():
            message = receivedMsgs_queue.get()
            json_row = message.value.decode('utf-8')
            row = json.loads(json_row)
            cachedf = pd.DataFrame([row])
            #2 to 6
            cachedf = TwoColumn_to_SixColumn(cachedf)
            #6 to 7 (ML_score from ScoreModel)
            cachedf = predict(cachedf, spark, scoreModel)
            #print(f"{current_time()}: Processing complete. sending preprocessed news to topic={topicToSend1},{topicToSend2},{topicToSend3}")
            ThreadOutput_queue.put(f"{current_time()}: Processing {x} complete. sending preprocessed news to topic={topicToSend1},{topicToSend2},{topicToSend3}")
            for index, row in cachedf.iterrows():
                json_row = json.dumps(row.to_dict())
                producer.send(topicToSend1, value=json_row.encode('utf-8'))
                producer.send(topicToSend2, value=json_row.encode('utf-8'))
                producer.send(topicToSend3, value=json_row.encode('utf-8'))
            x = x + 1
            del cachedf,row,json_row

print(f"{current_time()}: {x}: preProcessingThread is running.")
preProcessingThread = threading.Thread(target=TwoColumn_To_SevenColumn_Thread)
preProcessingThread.start()

[2023-07-13_18:56:20]: 1: preProcessingThread is running.


In [9]:
for message in consumer:
    print(f"{current_time()}: {y}: Received a news from Crawler. Processing...")
    y = y + 1
    receivedMsgs_queue.put(message)
    while not ThreadOutput_queue.empty():
        print(ThreadOutput_queue.get())

[2023-07-13_18:56:38]: 1: Received a news from Crawler. Processing...
[2023-07-13_18:57:08]: 2: Received a news from Crawler. Processing...
[2023-07-13_18:56:45]: Processing 1 complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-13_18:57:38]: 3: Received a news from Crawler. Processing...
[2023-07-13_18:57:14]: Processing 2 complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-13_18:58:08]: 4: Received a news from Crawler. Processing...
[2023-07-13_18:57:43]: Processing 3 complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-13_18:58:47]: 5: Received a news from Crawler. Processing...
[2023-07-13_18:58:13]: Processing 4 complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-13_18:58:47]: 6: Received a news from Crawler. Processing...
[2023-07-13_18:58:47]: 7: Received a news from Crawler. Processing...
[2023-07-13_18:58:47]: 8: Received a news from Crawler. Processing

KeyboardInterrupt: 

In [41]:
#Old output without using thread and queue

#x = 1
# let this cell run forever.
#for message in consumer:
#    print(f"{current_time()}: {x}: Received a news from Crawler. Processing...")
#    json_row = message.value.decode('utf-8')
#    row = json.loads(json_row)
#    cachedf = pd.DataFrame([row])
#    #2 to 6
#    cachedf = TwoColumn_to_SixColumn(cachedf)
#    #6 to 7 (ML_score from ScoreModel)
#    cachedf = predict(cachedf, spark, scoreModel)
#    print(f"{current_time()}: Processing complete. sending preprocessed news to topic={topicToSend1},{topicToSend2},{topicToSend3}")
#    for index, row in cachedf.iterrows():
#        json_row = json.dumps(row.to_dict())
#        producer.send(topicToSend1, value=json_row.encode('utf-8'))
#        producer.send(topicToSend2, value=json_row.encode('utf-8'))
#        producer.send(topicToSend3, value=json_row.encode('utf-8'))
#    x = x + 1
#    del cachedf,row,json_row

[2023-07-11_20:46:39]: 1: Received a news from Crawler. Processing...




[2023-07-11_20:46:44]: Processing complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra




[2023-07-11_20:46:46]: 2: Received a news from Crawler. Processing...
[2023-07-11_20:46:51]: Processing complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-11_20:46:51]: 3: Received a news from Crawler. Processing...
[2023-07-11_20:46:56]: Processing complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-11_20:46:56]: 4: Received a news from Crawler. Processing...
[2023-07-11_20:47:01]: Processing complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-11_20:47:01]: 5: Received a news from Crawler. Processing...
[2023-07-11_20:47:06]: Processing complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-11_20:47:06]: 6: Received a news from Crawler. Processing...
[2023-07-11_20:47:10]: Processing complete. sending preprocessed news to topic=elasticsearch,gaam5,cassandra
[2023-07-11_20:47:10]: 7: Received a news from Crawler. Processing...
[2023-07-11_20:47:15]: Processing c

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\AhmaDGoly\AppData\Local\Programs\Python\Python39\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\AhmaDGoly\AppData\Local\Programs\Python\Python39\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\AhmaDGoly\AppData\Local\Programs\Python\Python39\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 