- Question

  - 영화리뷰 긍/부정 예측 Estimator pipeline을 테스트 (trainRatio=0.8)
  - Word2Vec의 파라미터 vectorSize를 5, 10, 20, 40으로 바꾸며 정확도를 측정하여 출력
  - 매뉴얼 및 웹검색을 통해 문제해결!

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('my app').master('local').getOrCreate()
sc = spark.sparkContext

22/10/27 16:41:51 WARN Utils: Your hostname, bagdoyeong-ui-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 192.168.0.13 instead (on interface en0)
22/10/27 16:41:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/27 16:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/27 16:41:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, Word2Vec, VectorAssembler
from pyspark.ml.classification import LinearSVC
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param

In [20]:
stopwords = ['i', 'the', 'she', 'he', 'have', 'not']

df = spark.createDataFrame([
    ('This movie was so poorly written and directed I fell asleep 30minutes through the movie.', 0),
    ('The most interesting thing about Miryang (Secret Sunshine) is the actores.', 1),
    ('William Hurt may not be an American matinee idol anymorem but he still has pretty good talent.', 1)
], ['text', 'sentiment'])

In [3]:
tokenizer = Tokenizer(inputCol='text', outputCol='words')
df = tokenizer.transform(df)
df.select('words').show(5, truncate=100)


[Stage 0:>                                                          (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------+
|                                                                                               words|
+----------------------------------------------------------------------------------------------------+
|[this, movie, was, so, poorly, written, and, directed, i, fell, asleep, 30minutes, through, the, ...|
|              [the, most, interesting, thing, about, miryang, (secret, sunshine), is, the, actores.]|
|[william, hurt, may, not, be, an, american, matinee, idol, anymorem, but, he, still, has, pretty,...|
+----------------------------------------------------------------------------------------------------+



                                                                                

In [28]:
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='features')
df = hashingTF.transform(df)
df.select('features').show(5, truncate=100)


+----------------------------------------------------------------------------------------------------+
|                                                                                            features|
+----------------------------------------------------------------------------------------------------+
|(262144,[19036,42181,52800,61318,90636,95889,99211,108541,112584,117491,121809,188235,210223,2199...|
|(262144,[18700,61470,70065,95889,106841,124227,189082,195413,234706,259618],[1.0,1.0,1.0,2.0,1.0,...|
|(262144,[5429,23071,25718,31536,33917,67846,68474,91192,113432,121981,132786,138836,143202,145207...|
+----------------------------------------------------------------------------------------------------+



In [14]:
# udf 이용하여 tranform
def f(s):
    return [ ''.join(e for e in token if e.isalnum()) for token in s if token not in stopwords ]

func = udf(f, ArrayType(StringType()))
df = df.withColumn('clean_words', func(df['words']))
df.select('clean_words').show(5, truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                         clean_words|
+----------------------------------------------------------------------------------------------------+
|[this, movie, was, so, poorly, written, and, directed, fell, asleep, 30minutes, through, the, movie]|
|                 [the, most, interesting, thing, about, miryang, secret, sunshine, is, the, actores]|
|[william, hurt, may, be, an, american, matinee, idol, anymorem, but, still, has, pretty, good, ta...|
+----------------------------------------------------------------------------------------------------+



                                                                                

In [11]:
class RemoveStopWordsAndSpecialCharacters(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(RemoveStopWordsAndSpecialCharacters, self).__init__()
        self.stopwords = Param(self, 'stopwords', '')
        self._setDefault(stopwords=set())
        kwargs = self._input_kwargs
        self._set(**kwargs)
        
    def setStopwords(self, value):
        self._paramMap[self.stopwords] = value
        return self
    
    def getStopwords(self):
        return self.getOrDefault(self.stopwords)
    
    def _transform(self, dataset):
        stopwords = self.getStopwords()
        
        def f(s):
            return [ ''.join(e for e in token if e.isalnum()) for token in s if token not in stopwords]
        
        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

In [26]:
tokenizer = Tokenizer(inputCol='text', outputCol='words')
cleaning = RemoveStopWordsAndSpecialCharacters(inputCol='words', outputCol='clean_words', stopwords=stopwords)
hashingTF = HashingTF(inputCol='clean_words', outputCol='tf')
w2v = Word2Vec(vectorSize=2, inputCol='clean_words', outputCol='w2v', minCount=1, maxIter=10)
asm = VectorAssembler(inputCols = [hashingTF.getOutputCol(), w2v.getOutputCol()], outputCol='features')
svm = LinearSVC(labelCol='sentiment')

mypipeline = Pipeline(stages=[tokenizer, cleaning, hashingTF, w2v, asm, svm])
df = mypipeline.fit(df).transform(df)
df.select('prediction').show()

22/10/27 17:04:46 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB


                                                                                

22/10/27 17:04:47 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB


                                                                                

22/10/27 17:04:48 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB
22/10/27 17:04:49 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB
22/10/27 17:04:49 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB
22/10/27 17:04:50 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB
22/10/27 17:04:51 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB
22/10/27 17:04:51 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB
22/10/27 17:04:52 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB
22/10/27 17:04:53 WARN DAGScheduler: Broadcasting large task binary with size 13.2 MiB


                                                                                

22/10/27 17:04:54 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
+----------+
|prediction|
+----------+
|       0.0|
|       1.0|
|       1.0|
+----------+

