In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as func

import os

# The cluster address can be overridden via the SPARK_MASTER_URL environment
# variable so the notebook is not tied to one Docker network; the default keeps
# the original address. (The old `.config('spark-master', '7077')` line was
# removed: it is not a Spark property and the port is already in the URL.)
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://172.18.0.1:7077")

spark = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName("Model")
    # Let the context cleaner delete checkpoint files once they are no longer referenced.
    .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
    .getOrCreate()
)

In [None]:
from functools import reduce
from pyspark.sql import DataFrame

# Register the review parquet data as the SQL view `tiki_review_data`
# so later cells can query it with spark.sql.
review_parquet = spark.read.parquet("hdfs://namenode:9000/hadoop/dfs/data/review/")
review_parquet.createOrReplaceTempView("tiki_review_data")

In [None]:
# Register the item parquet data as the SQL view `tiki_item_data`.
item_parquet = spark.read.parquet("hdfs://namenode:9000/hadoop/dfs/data/item/")
item_parquet.createOrReplaceTempView("tiki_item_data")

In [None]:
from pyspark.ml.feature import Word2Vec, Tokenizer, VectorIndexer, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

# Task: predict the rating from the review comment (dự đoán rating từ comment)

In [None]:
# Pull every column of the registered review view into a DataFrame.
review_query = """
SELECT * FROM tiki_review_data
"""
review_df = spark.sql(review_query)

In [None]:
# Keep only rows usable for training. Tokenizer cannot process a null "review",
# and a null "rating" cannot serve as a regression label. Filtering before the
# limit means the sample holds up to MAX_REVIEWS *usable* rows.
# NOTE(review): limit() without an ordering picks arbitrary rows, so the subset
# may differ between runs.
MAX_REVIEWS = 10000
review_df = review_df.dropna(subset=["review", "rating"]).limit(MAX_REVIEWS)

In [None]:
# Peek at the first few review rows.
review_df.show(n=5)

In [None]:
# Text featurization for the review task: split "review" into tokens, then map
# the tokens to 5-dimensional Word2Vec embeddings (seeded for repeatability).
tokenizer = Tokenizer(
    inputCol="review",
    outputCol="review_tokenized",
)
word2vec_model = Word2Vec(
    inputCol="review_tokenized",
    outputCol="embedding",
    vectorSize=5,
    seed=42,
)

In [None]:
# Chain tokenization and embedding so a single fit/transform handles both.
review_stages = [tokenizer, word2vec_model]
pipeline = Pipeline(stages=review_stages)

In [None]:
# Learn the Word2Vec vocabulary/embeddings from the sampled reviews.
pipeline_model = pipeline.fit(dataset=review_df)

In [None]:
# Attach the token and embedding columns to every review row.
features_labels = pipeline_model.transform(dataset=review_df)

In [None]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor

In [None]:
# Hold out part of the reviews so the model is scored on rows it never saw.
# Fitting and scoring on the same rows only measures training fit.
# The split ratio mirrors the price task; the seed matches the Word2Vec seed.
review_train, review_test = features_labels.randomSplit([0.7, 0.3], seed=42)

lr = LinearRegression(featuresCol='embedding', labelCol='rating')
lr_model = lr.fit(review_train)

# Root-mean-squared error of predicted vs. actual rating on the held-out reviews.
review_test_rmse = RegressionEvaluator(
    labelCol='rating', predictionCol='prediction', metricName='rmse'
).evaluate(lr_model.transform(review_test))
review_test_rmse

In [None]:
# Score the reviews with the fitted regression model and display the result.
review_predictions = lr_model.transform(features_labels)
review_predictions

# Task: predict the product's value, i.e. its price (dự đoán giá trị sản phẩm)

In [None]:
item_df = spark.sql("""
SELECT * FROM tiki_item_data
""")

# Clean the item table for modelling:
# - rating_value is not used downstream, so drop it rather than fill-then-drop.
# - cast review_count BEFORE filling. fillna(0) only fills numeric columns, so a
#   string-typed review_count would otherwise keep nulls that VectorAssembler
#   rejects later. Casting first also turns unparseable values into null, and
#   those nulls are then filled with 0.
# - text columns get a placeholder so Tokenizer/StringIndexer never see null.
# - rows without a price cannot be used as training labels.
# NOTE(review): RandomForestRegressor needs a numeric label; confirm `price`
# is numeric in the parquet schema.
item_df = (
    item_df
    .drop("rating_value")
    .withColumn("review_count", F.col("review_count").cast("float"))
    .fillna("no information", subset=["item", "seller", "category", "brand"])
    .fillna(0, subset=["review_count"])
    .dropna(subset=["price"])
)

In [None]:
# Embed the free-text `item` column: tokenize it, then learn 5-dimensional
# Word2Vec vectors (seeded) over the tokens. Distinct names keep the review-task
# tokenizer/model objects from being shadowed.
item_tokenizer = Tokenizer(inputCol="item", outputCol="item_description_tokenized")
item_word2vec = Word2Vec(
    inputCol="item_description_tokenized",
    outputCol="item_description_emb",
    vectorSize=5,
    seed=42,
)
item_text_model = Pipeline(stages=[item_tokenizer, item_word2vec]).fit(item_df)
item_df = item_text_model.transform(item_df)

In [None]:
# Turn each categorical string column into a numeric index column
# named "<column>_indexed" so it can be packed into the feature vector.
categorical_cols = ["brand", "seller", "category"]
indexer_pipeline = Pipeline(stages=[
    StringIndexer(inputCol=name, outputCol=name + "_indexed")
    for name in categorical_cols
])
item_df = indexer_pipeline.fit(item_df).transform(item_df)

In [None]:
# Concatenate the indexed categories, the review count and the item-text
# embedding into a single "features" vector column.
feature_cols = [
    "brand_indexed",
    "review_count",
    "item_description_emb",
    "seller_indexed",
    "category_indexed",
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
item_df = assembler.transform(item_df)
item_df

In [None]:
# Fixed seed so the train/test split and the forest (and therefore the reported
# metrics) are reproducible. Same value as the Word2Vec seeds.
SEED = 42
# VectorIndexer treats features with at most this many distinct values as
# categorical; features with more distinct values stay continuous.
MAX_CATEGORIES = 40
# Must be >= the largest category count of any categorical feature.
RF_MAX_BINS = 1000

# Fitted on the full item_df so every category value is known to the indexer,
# including values that only occur in the test split.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=MAX_CATEGORIES
).fit(item_df)

# Split the data into training and test sets (30% held out for testing).
(trainingData, testData) = item_df.randomSplit([0.7, 0.3], seed=SEED)

rf = RandomForestRegressor(
    featuresCol="indexedFeatures", labelCol="price", maxBins=RF_MAX_BINS, seed=SEED
)

# featureIndexer is already a fitted model, so fitting the pipeline only
# trains the forest; the indexer is simply applied as a transformer.
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(trainingData)

In [None]:
# Score the held-out items and quantify the error instead of only building
# a prediction DataFrame.
rf_predictions = model.transform(testData)
rf_predictions.select("price", "prediction").show(5)

# Root-mean-squared error of predicted vs. actual price on the test split.
rf_test_rmse = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse"
).evaluate(rf_predictions)
rf_test_rmse