In [1]:
import csv
from tqdm import tqdm
import pandas as pd
# from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
# import torch
# from transformers import AutoModel, AutoTokenizer
from vncorenlp import VnCoreNLP


In [2]:
from pyspark import SparkContext, SparkConf
SparkContext.setSystemProperty('spark.executor.memory', '2g')
conf = SparkConf().setAppName("Process Comment").setMaster("spark://25.15.27.228:7077")
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Process Comment').getOrCreate()

In [4]:
class ProcssComment:
    def __init__(self,sc,spark,type = 'Comment'):
        self.sc = sc
        self.type = type
        self.spark = spark
        self.URI           = self.sc._gateway.jvm.java.net.URI
        self.Path          = self.sc._gateway.jvm.org.apache.hadoop.fs.Path
        self.FileSystem    = self.sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
        self.Configuration = self.sc._gateway.jvm.org.apache.hadoop.conf.Configuration
        # self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # self.phobert = AutoModel.from_pretrained("vinai/phobert-base").to(self.device)
        # self.tokenizer_phobert = AutoTokenizer.from_pretrained("vinai/phobert-base", use_fast=False)
        self.rdrsegmenter = VnCoreNLP("vncorenlp/VnCoreNLP-1.1.1.jar", annotators="wseg", max_heap_size='-Xmx500m')
        self.getList()
        self.rates = ['1','2','3','4','5']
        self.dictRate = {}

    def getList(self):
        fs = self.FileSystem.get(self.URI("hdfs://cris:9000"), self.Configuration())
        status = fs.listStatus(self.Path(f'/shopee/{self.type}'))
        pathProducts = []
        for fileStatus in status:
            pathProducts.append(str(fileStatus.getPath()))
        self.path = pathProducts[1:]
    
    def processComment(self):
        for i in tqdm(range(len(self.path))):
            url = self.path[i]

            parDF2 = self.spark.read.parquet(url)
            cols = ['comment','rating_star']
            data = parDF2[cols].toPandas()
            for rate in self.rates:
                index = data[data['rating_star']==rate].index
                if index.shape[0] !=0:
                    if rate not in self.dictRate.keys():
                        self.dictRate[rate] = data.loc[index,:]
                    elif self.dictRate[rate].shape[0] < 50000:
                        self.dictRate[rate] = pd.concat([self.dictRate[rate],data.loc[index,:]],axis = 0)
        for rate in self.rates:
            self.dictRate[rate] = self.dictRate[rate].reset_index(drop=True)

            data = self.getFeature(self.dictRate[rate],'comment')
            try:
                self.uploadHdfs(data,rate)
            except Exception as e:
                print(f'Error when upload to HDFS {e}')

    
    def getFeature(self,data,col):
        for x in tqdm(range(data.shape[0])):
            sentences = self.rdrsegmenter.tokenize(data[col].iloc[x])
            tmp = []
            for sentence in sentences:
                arr = " ".join(sentence)
                tmp.append(arr)
            data.loc[x,col] = ' </s> <s> '.join(tmp)

        return data
    


    def uploadHdfs(self,data,rate):
        data = spark.createDataFrame(data.astype(str))
        data.coalesce(1).write.mode('append').parquet(f'hdfs://cris:9000/ProcessShopee/{self.type}/rate_{rate}')
        print('Done upload HDFS')
    
    

In [5]:
comment = ProcssComment(sc,spark)

In [6]:
comment.processComment()

100%|██████████| 44/44 [01:38<00:00,  2.24s/it]
100%|██████████| 21707/21707 [01:07<00:00, 322.64it/s]


Done upload HDFS


100%|██████████| 11046/11046 [00:26<00:00, 423.58it/s]


Done upload HDFS


100%|██████████| 27177/27177 [01:04<00:00, 424.51it/s]


Done upload HDFS


100%|██████████| 50046/50046 [01:58<00:00, 423.57it/s]


Done upload HDFS


100%|██████████| 74363/74363 [03:03<00:00, 405.35it/s]


Done upload HDFS


In [7]:
rates = ['1','2','3','4','5']
data = comment.dictRate['1']
for rate in rates[1:]:
    data = pd.concat([data,comment.dictRate[rate]],axis = 0)
comment.uploadHdfs(data,'sum')

Done upload HDFS


In [9]:
rates = ['1','2','3','4','5']
data = comment.dictRate['1']
for rate in rates[1:]:
    data = pd.concat([data,comment.dictRate[rate]],axis = 0)
data = data.reset_index(drop=True)
comment.uploadHdfs(data,'sum')

Done upload HDFS


In [19]:
data = spark.read.parquet('hdfs://cris:9000/ProcessShopee/Comment/rate_sum/part-00000-2bb5d99d-0093-4c2e-bc46-0b8487d2c0db-c000.snappy.parquet')

In [22]:
data = data.toPandas()


In [24]:
data['rating_star'] = data['rating_star'].astype(int) - 1

In [26]:
data = spark.createDataFrame(data)

In [27]:
data.show()

+--------------------+-----------+
|             comment|rating_star|
+--------------------+-----------+
|Thực_sự thất_vọng...|          0|
|Hàng nhận đc khá ...|          0|
|đặt 1 đùi và 1 ng...|          0|
|Chất_liệu quá tồi...|          0|
|Giao vải bị lỗi ....|          0|
|Mỏng te ko như mô...|          0|
|Shop cố_tình giao...|          0|
|Shop ngoài cái ba...|          0|
|Sản_phẩm kém chất...|          0|
|Mua áo áp_dụng mã...|          0|
| Vải xấu đau_đớn ạ ,|          0|
|        Vải quá mỏng|          0|
|Thất_vọng tôi đốt...|          0|
|Mua 2 tấm lưới vả...|          0|
|Quần quá xấu so v...|          0|
|      chất_lượng kém|          0|
|Không tốt như quả...|          0|
|Em đặt 1 bộ mà sh...|          0|
|Sản_phẩm màu ko g...|          0|
|Rõ_ràng không mua...|          0|
+--------------------+-----------+
only showing top 20 rows



In [28]:
from pyspark.ml.feature import Word2Vec

In [8]:
from pyspark.sql.functions import lower, col, split

In [29]:
dataset = data.select(lower(col('comment')).alias('comment'), 'rating_star')

In [30]:
dataset = dataset.select(split(dataset.comment, ' ').alias('comment'), 'rating_star')

In [52]:
word2Vec = Word2Vec(vectorSize=100, seed=42, inputCol="comment", outputCol="feature")
word2Vec.setMaxIter(5)
model = word2Vec.fit(dataset)

In [53]:
res = model.transform(dataset)
res.show()

+--------------------+-----------+--------------------+
|             comment|rating_star|             feature|
+--------------------+-----------+--------------------+
|[thực_sự, thất_vọ...|          0|[-0.0390716940927...|
|[hàng, nhận, đc, ...|          0|[-0.0165625282563...|
|[đặt, 1, đùi, và,...|          0|[0.05198483966971...|
|[chất_liệu, quá, ...|          0|[-0.0672429638776...|
|[giao, vải, bị, l...|          0|[-0.0196016904382...|
|[mỏng, te, ko, nh...|          0|[0.01527424864470...|
|[shop, cố_tình, g...|          0|[0.04810728304002...|
|[shop, ngoài, cái...|          0|[-0.0524565114526...|
|[sản_phẩm, kém, c...|          0|[-0.0587866885600...|
|[mua, áo, áp_dụng...|          0|[-0.0251391950296...|
|[vải, xấu, đau_đớ...|          0|[-0.0165521422401...|
|    [vải, quá, mỏng]|          0|[-0.2366287211577...|
|[thất_vọng, tôi, ...|          0|[-0.0203908173633...|
|[mua, 2, tấm, lướ...|          0|[0.00853809400608...|
|[quần, quá, xấu, ...|          0|[-0.0190395279

In [None]:
res.show()

In [54]:
data1 = res.select('feature', 'rating_star')

In [55]:
data1 = data1.withColumnRenamed('feature', 'features')
data1 = data1.withColumnRenamed('rating_star', 'label')

In [41]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
splits = data1.randomSplit([0.6, 0.4], 1234)

In [45]:
splits[1].show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.9046872377395...|    0|
|[-0.8961412668228...|    1|
|[-0.7522539639224...|    0|
|[-0.6465824196736...|    0|
|[-0.6002849489450...|    0|
|[-0.5608168616890...|    0|
|[-0.5276502370834...|    0|
|[-0.5199735164642...|    0|
|[-0.5196515843272...|    0|
|[-0.5084541020914...|    0|
|[-0.4971521297203...|    0|
|[-0.4914908634223...|    0|
|[-0.4811887666583...|    0|
|[-0.4713759645819...|    0|
|[-0.4700897104210...|    0|
|[-0.4645945083828...|    0|
|[-0.4563015364110...|    1|
|[-0.4452980682253...|    0|
|[-0.4372306317090...|    0|
|[-0.4172227638463...|    0|
+--------------------+-----+
only showing top 20 rows



In [48]:
trainingData = splits[0]
testData = splits[1]

In [49]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [56]:
data = data1

In [58]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=50)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|             4|    1|[-0.8577913641929...|
|             4|    0|[-0.5640047788619...|
|             4|    0|[-0.3846752941608...|
|             3|    0|[-0.3684847354888...|
|             3|    1|[-0.3684847354888...|
|             4|    0|[-0.3640398979187...|
|             4|    0|[-0.3337194621562...|
|             4|    0|[-0.3337194621562...|
|             4|    0|[-0.3311986215412...|
|             4|    0|[-0.3116985559463...|
+--------------+-----+--------------------+
only showing top 10 rows

Test Error = 0.49441
RandomForestClassificationModel: uid=RandomForestClassifier_a9e1fb7a90e1, numTrees=50, numClasses=5, numFeatures=100


In [59]:
testData.toPandas().label.value_counts()

4    22326
3    14997
2     8197
0     6556
1     3295
Name: label, dtype: int64

In [60]:
trainingData.toPandas().label.value_counts()

4    52037
3    35049
2    18980
0    15151
1     7751
Name: label, dtype: int64

In [47]:
# from pyspark.ml.classification import MultilayerPerceptronClassifier
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# splits = data1.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [50, 60, 60, 5]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=500, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.5911463629584435


In [13]:
sent = ("a b " * 100 + "a c " * 10).split(" ")
doc = spark.createDataFrame([(sent,), (sent,)], ["sentence"])
word2Vec = Word2Vec(vectorSize=5, seed=42, inputCol="sentence", outputCol="model")
word2Vec.setMaxIter(10)

Word2Vec_5c0b34e806c0

In [30]:
doc.show()

+--------------------+
|            sentence|
+--------------------+
|[a, b, a, b, a, b...|
|[a, b, a, b, a, b...|
+--------------------+



In [None]:
sent

In [14]:
word2Vec.getMaxIter()

10

In [15]:
word2Vec.clear(word2Vec.maxIter)
model = word2Vec.fit(doc)
model.getMinCount()

5

In [16]:
model.setInputCol("sentence")
model.getVectors().show()



+----+--------------------+
|word|              vector|
+----+--------------------+
|   a|[0.09511695802211...|
|   b|[-1.2028766870498...|
|   c|[0.30153274536132...|
+----+--------------------+



In [32]:
res = model.transform(doc)

In [34]:
res = res.toPandas()

In [35]:
res

Unnamed: 0,sentence,model
0,"[a, b, a, b, a, b, a, b, a, b, a, b, a, b, a, ...","[-0.4833007957870605, 0.18547806319911286, -0...."
1,"[a, b, a, b, a, b, a, b, a, b, a, b, a, b, a, ...","[-0.4833007957870605, 0.18547806319911286, -0...."


In [36]:
res.iloc[0].model

DenseVector([-0.4833, 0.1855, -0.273, -0.0509, -0.4769])

In [17]:
t = model.getVectors()



In [23]:
df = t.select('vector').toPandas()

In [27]:
df.vector.iloc[0]

DenseVector([0.0951, 0.3911, -0.43, -0.1411, -0.0656])

In [37]:
data

Unnamed: 0,comment,rating_star
0,Thực_sự thất_vọng về sản_phẩm . </s> <s> Giao ...,1
1,Hàng nhận đc khá lâu chắc do dịch . </s> <s> H...,1
2,đặt 1 đùi và 1 ngố size M thì gửi 2 ngố 2 size...,1
3,"Chất_liệu quá tồi , trơn và bóng y_như cao_su ...",1
4,Giao vải bị lỗi . 1 đường dài 40cm . </s> <s> ...,1
...,...,...
184334,"Ốp xink , chắc_chắn 🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰🥰...",5
184335,Xinh lắm,5
184336,ốp đẹp quá_trời mọi ngừi nênmua nha vì là đồ t...,5
184337,"Đẹp lắm , cầm_chắc tay , khum phí tìnnn Nma gi...",5
