In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [2]:
import numpy as np
import transformers
import pandas as pd
# import matplotlib.pyplot as plt
import torch
# import seaborn as sns
from pyspark.sql.types import ArrayType, IntegerType

from torch.utils.data import Dataset
import torch.nn.utils.rnn as rnn_utils
# import matplotlib.pyplot as plt
import time
from pyspark.sql.functions import *
# import seaborn as sns
import statsmodels.api as sm
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

from pyspark.ml.feature import StringIndexer, VectorIndexer, StringIndexerModel, IndexToString
from pyspark.sql import SparkSession
#conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
#sc = SparkContext(conf=conf)


  from .autonotebook import tqdm as notebook_tqdm
Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [3]:
scala_version = '2.12'  # your scala version
spark_version = '3.5.0' # your spark version
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.6.0' #your kafka version
]
spark = SparkSession.builder.master("local").appName("kafka-example").config("spark.jars.packages", ",".join(packages)).getOrCreate()
spark

In [4]:
sqlContext = spark.sparkContext
sqlContext

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, FloatType

# Define the schema for your DataFrame
schema = StructType([
    StructField("post_message", StringType(), True),
])

In [6]:

# Specify the Kafka parameters
kafka_params = {
    "kafka.bootstrap.servers": "localhost:9092",  # Change this to your Kafka broker
    "subscribe": "FakeNewDT",               # Change this to your Kafka topic
    "startingOffsets": "earliest"
}

In [7]:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")

In [8]:
# Pytorch's nn module has lots of useful feature
import torch.nn as nn

class LSTMNet(nn.Module):

    def __init__(self,vocab_size,embedding_dim,hidden_dim,output_dim,n_layers,bidirectional,dropout):

        super(LSTMNet,self).__init__()

        # Embedding layer converts integer sequences to vector sequences
        self.embedding = nn.Embedding(vocab_size,embedding_dim)

        # LSTM layer process the vector sequences
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_dim,
                            num_layers = n_layers,
                            bidirectional = bidirectional,
                            dropout = dropout,
                            batch_first = True
                           )

        # Dense layer to predict
        self.fc = nn.Linear(hidden_dim * 2,output_dim)
        # Prediction activation function
        self.sigmoid = nn.Sigmoid()


    def forward(self,text,text_lengths):
        embedded = self.embedding(text).to(device)

        # Thanks to packing, LSTM don't see padding tokens
        # and this makes our model better
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths,batch_first=True)

        packed_output,(hidden_state,cell_state) = self.lstm(packed_embedded)

        # Concatenating the final forward and backward hidden states
        hidden = torch.cat((hidden_state[-2,:,:], hidden_state[-1,:,:]), dim = 1)

        dense_outputs=self.fc(hidden)

        #Final activation function
        outputs=self.sigmoid(dense_outputs)

        return outputs


In [9]:
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")
vocab_size = tokenizer.vocab_size
embedding_dim = 300
hidden_dim = 64
output_dim = 1
n_layers = 2
bidirectional = True
dropout = 0.2


In [10]:
load_model = LSTMNet(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout)

In [11]:
device = torch.device("cpu")


In [12]:
load_model = load_model.to(device)


In [13]:
load_model.load_state_dict(torch.load('../model/model_0.9178659543395042.pth',map_location=torch.device('cpu')))


<All keys matched successfully>

In [14]:
load_model.eval()


LSTMNet(
  (embedding): Embedding(64000, 300)
  (lstm): LSTM(300, 64, num_layers=2, batch_first=True, dropout=0.2, bidirectional=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)

In [15]:
def stream_predict(text, tokenizer, model, device):
    tokens = tokenizer.tokenize(text)
    ids = tokenizer.convert_tokens_to_ids(tokens)
    ids = torch.tensor(ids, dtype=torch.long).unsqueeze(0)

    ids = ids.to(device)

    # Get the length of the input sequence
    length = torch.tensor(ids.shape[1], dtype=torch.long).unsqueeze(0)

    # Evaluate the model on the input text
    with torch.no_grad():
        model.eval()
        predictions = model(ids, length)


    binary_predictions = torch.round(predictions).cpu().numpy()

    return binary_predictions[0][0]

In [16]:
kafka_stream_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", kafka_params["kafka.bootstrap.servers"])
    .option("subscribe", kafka_params["subscribe"])
    .option("startingOffsets", kafka_params["startingOffsets"])
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", schema).alias("data"))
    .select("data.*")
)
# Apply the UDF to create a new column 'predictions'
data = kafka_stream_df.toPandas()['post_message']
data = data.head(10)

In [17]:
len(data)

10

In [18]:
# T·∫°o m·ªôt DataFrame r·ªóng
df = pd.DataFrame(columns=['post_message', 'predictions'])

In [19]:
# from time import sleep
# from IPython.display import display, clear_output
# for i in range(len(data)):
#     result = stream_predict(data[i], tokenizer, load_model, device)
#     df = df.append({'post_message': data[i], 'predictions': result}, ignore_index=True)
#     print(df)
#     sleep(3)
#     clear_output(wait=True)
# # In ra DataFrame

import pandas as pd
from time import sleep
from IPython.display import display, clear_output

# T·∫°o DataFrame tr·ªëng
df = pd.DataFrame(columns=['post_message', 'predictions'])
sent = []
pred = []
for i in range(len(data)):
    result = stream_predict(data[i], tokenizer, load_model, device)
    # df = df.append({'post_message': data[i], 'predictions': result}, ignore_index=True)
    sent.append(data[i])
    pred.append(result)
    print(sent)
    print(pred)
    sleep(3)
    clear_output(wait=True)


['V·ª• Tr∆∞·ªùng qu·ªëc_t·∫ø Singapore thu 80% h·ªçc_ph√≠ khi d·∫°y online : Kh√¥ng ƒë∆∞·ª£c ph√©p khi ch∆∞a tho·∫£_thu·∫≠n ƒë∆∞·ª£c v·ªõi ph·ª•_huynh', 'D·ª± b√°o th·ªùi ti·∫øt h√¥m nay 5/5: N·∫Øng n√≥ng gia tƒÉng ·ªü B·∫Øc B·ªô v√† Trung B·ªô, c√≥ n∆°i tr√™n 38 ƒë·ªô C', 'M·∫•y ch√∫ Ba T√†u b√°n b√°nh bao v√† m·ª≥ v·∫±n th·∫Øn ch∆°i B√† ƒë·∫ßm gi√† qu·∫£ n√†y v·ª´a m·∫•t ti·ªÅn oan:\r\n\r\nhttps://zingnews.vn/anh-bo-20-trieu-usd-mua-xet-nghiem-tu-tq-nhung-khong-dung-duoc-post1074158.html\r\n\r\n L·∫°i th√™m n·ªói lo d√¢n m·∫•t m·∫°ng..\r\n\r\nhttps://vnexpress.net/anh-thua-dau-vi-danh-cuoc-vao-kit-xet-nghiem-trung-quoc-4085956.html', 'Ô∏èüéâ KHAI_TR∆Ø∆†NG TUY·∫æN C√ÅP V∆Ø·ª¢T BI·ªÇN C√ì TR·ª§ C√ÅP TREO CAO NH·∫§T TH·∫æ_GI·ªöI ·ªû H·∫£i_Ph√≤ng , Ph√≥ Th·ªß_t∆∞·ªõng Th∆∞·ªùng_tr·ª±c Tr∆∞∆°ng_Ho√†_B√¨nh ƒë√£ t·ªõi d·ª± : üè© L·ªÖ ƒë·ªông_th·ªï kh√°ch_s·∫°n 5 sao v√† khai_tr∆∞∆°ng b√£i bi·ªÉn nh√¢n_t·∫°o Dragon t·∫°i T·ªï_h·ª£p du_l·ªãch Qu·ªëc_t·∫ø ƒê·ªìi_R·ªìng ·ªü qu·∫≠n ƒê·ªì_S∆°n 