In [4]:
# findspark must run before any pyspark import so the Spark installation
# is located and put on sys.path.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Build a session against a local master, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("my app")
    .master("local")
    .getOrCreate()
)

# Low-level SparkContext that belongs to the session above.
sc = spark.sparkContext

# Define the stopword list

In [3]:
# Lower-case English stop words, including contractions and their fragments.
# NOTE(review): looks like NLTK's English stop-word list -- confirm the source.
stopwords = [
    "i", "me", "my", "myself", "we", "our", "ours", "ourselves",
    "you", "you're", "you've", "you'll", "you'd", "your", "yours", "yourself", "yourselves",
    "he", "him", "his", "himself", "she", "she's", "her", "hers", "herself",
    "it", "it's", "its", "itself", "they", "them", "their", "theirs", "themselves",
    "what", "which", "who", "whom", "this", "that", "that'll", "these", "those",
    "am", "is", "are", "was", "were", "be", "been", "being",
    "have", "has", "had", "having", "do", "does", "did", "doing",
    "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while",
    "of", "at", "by", "for", "with", "about", "against", "between", "into", "through",
    "during", "before", "after", "above", "below", "to", "from", "up", "down",
    "in", "out", "on", "off", "over", "under", "again", "further", "then", "once",
    "here", "there", "when", "where", "why", "how",
    "all", "any", "both", "each", "few", "more", "most", "other", "some", "such",
    "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very",
    "s", "t", "can", "will", "just", "don", "don't", "should", "should've", "now",
    "d", "ll", "m", "o", "re", "ve", "y",
    "ain", "aren", "aren't", "couldn", "couldn't", "didn", "didn't", "doesn", "doesn't",
    "hadn", "hadn't", "hasn", "hasn't", "haven", "haven't", "isn", "isn't",
    "ma", "mightn", "mightn't", "mustn", "mustn't", "needn", "needn't", "shan", "shan't",
    "shouldn", "shouldn't", "wasn", "wasn't", "weren", "weren't", "won", "won't",
    "wouldn", "wouldn't",
]

# Without Pipeline

In [5]:
# Three-row toy corpus: review text plus a 0/1 sentiment label.
df = spark.createDataFrame(
    data=[
        ('This movie was so poorly written and directed I fell asleep 30 minutes through the movie.', 0),
        ('The most interesting thing about Miryang (Secret Sunshine) is the actors.', 1),
        ('William Hurt may not be an American matinee idol anymore, but he still has pretty good taste in B-movie.', 1),
    ],
    schema=['text', 'sentiment'],
)

### Tokenizer

In [136]:
from pyspark.ml.feature import Tokenizer
# Split each review into a "words" array column; the tokens come out
# lower-cased and still carry their punctuation (see the output below).
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
df = tokenizer.transform(df)
df.select(tokenizer.getOutputCol()).show(n=5, truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                               words|
+----------------------------------------------------------------------------------------------------+
|[this, movie, was, so, poorly, written, and, directed, i, fell, asleep, 30, minutes, through, the...|
|               [the, most, interesting, thing, about, miryang, (secret, sunshine), is, the, actors.]|
|[william, hurt, may, not, be, an, american, matinee, idol, anymore,, but, he, still, has, pretty,...|
+----------------------------------------------------------------------------------------------------+



### Removing stopwords and special chars

In [137]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf

def f(s):
    """Drop stop words and strip non-alphanumeric characters from tokens.

    Stop words are taken from the module-level ``stopwords`` collection and
    matched exactly (case-sensitive).

    Args:
        s: iterable of string tokens, or ``None`` (a null array cell).

    Returns:
        A list of cleaned tokens in their original order, or ``None`` when
        ``s`` is ``None``. A token is dropped when it is a stop word either
        as written (e.g. "don't") or after cleaning (e.g. "the." -> "the"),
        or when nothing alphanumeric remains (e.g. "--").
    """
    if s is None:
        # Null array in the source column: propagate the null instead of raising.
        return None

    # Hash-based lookup instead of scanning the stop-word list for every token.
    blocked = set(stopwords)

    kept = []
    for token in s:
        # Test the raw token first so stop words containing apostrophes
        # are caught before the apostrophe is stripped.
        if token in blocked:
            continue
        cleaned = ''.join(ch for ch in token if ch.isalnum())
        # Skip empty leftovers and stop words that were hidden by punctuation.
        if cleaned and cleaned not in blocked:
            kept.append(cleaned)
    return kept

# Wrap the cleaner as a Spark UDF that returns array<string>, then add the
# cleaned tokens as a new "clean_words" column.
func = udf(f, returnType=ArrayType(StringType()))
df = df.withColumn('clean_words', func(df.words))
df.select('clean_words').show(n=5, truncate=100)

+------------------------------------------------------------------------------------------+
|                                                                               clean_words|
+------------------------------------------------------------------------------------------+
|                      [movie, poorly, written, directed, fell, asleep, 30, minutes, movie]|
|                                   [interesting, thing, miryang, secret, sunshine, actors]|
|[william, hurt, may, american, matinee, idol, anymore, still, pretty, good, taste, bmovie]|
+------------------------------------------------------------------------------------------+



### HashingTF

In [138]:
from pyspark.ml.feature import HashingTF
# Hash the cleaned tokens into sparse term-frequency vectors ("tf" column).
hashingTF = HashingTF(
    inputCol="clean_words",
    outputCol="tf",
)
df = hashingTF.transform(df)
df.select(hashingTF.getOutputCol()).show(n=5, truncate=100)

+------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                      tf|
+------------------------------------------------------------------------------------------------------------------------+
|                      (262144,[52800,98627,103409,152575,155321,223227,249111,262048],[1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0])|
|                                            (262144,[46512,86875,107499,237079,240227,252717],[1.0,1.0,1.0,1.0,1.0,1.0])|
|(262144,[6258,24817,36200,68474,101702,113432,121981,138836,140586,164508,175449,181389],[1.0,1.0,1.0,1.0,1.0,1.0,1.0...|
+------------------------------------------------------------------------------------------------------------------------+



### Word2Vec

In [139]:
from pyspark.ml.feature import Word2Vec
# Word2Vec training is stochastic. Pin the seed so the vectors (and everything
# built on them) are reproducible under Restart & Run All; the default seed
# is not guaranteed to be stable across Python processes.
w2v = Word2Vec(vectorSize=2, inputCol="clean_words", outputCol="w2v",
               minCount=1, maxIter=10, seed=42)
model = w2v.fit(df)
model.getVectors().show(5, truncate=False)

# create an average word vector for each document (works well according to Zeyu & Shu)
df = model.transform(df)
df.select("w2v").show()

+-----------+---------------------------------------------+
|word       |vector                                       |
+-----------+---------------------------------------------+
|interesting|[-0.1858113706111908,-0.10948356240987778]   |
|secret     |[0.17072227597236633,0.1150684505701065]     |
|asleep     |[0.05812466889619827,-0.1537899225950241]    |
|hurt       |[-0.028090475127100945,-0.010738671757280827]|
|taste      |[-0.22129961848258972,-0.1893204003572464]   |
+-----------+---------------------------------------------+
only showing top 5 rows



Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/home/pyspark/pkg/spark-2.4.5-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


### VectorAssembler

In [141]:
from pyspark.ml.feature import VectorAssembler
# Concatenate the term-frequency vector and the document embedding into a
# single "features" vector column.
asm = VectorAssembler(
    inputCols=[stage.getOutputCol() for stage in (hashingTF, w2v)],
    outputCol="features",
)
df = asm.transform(df)
df.select(asm.getOutputCol()).show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(262146,[52800,98627,103409,152575,155321,223227,249111,262048,262144,262145],[1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,-0.028119811167319615,0.036776250435246356])                                         |
|(262146,[46512,86875,107499,237079,240227,252717,262144,262145],[1.0,1.0,1.0,1.0,1.0,1.0,-0.03396759647876024,-0.030281569343060255])                                                               |
|(262

### LinearSVC

In [142]:
from pyspark.ml.classification import LinearSVC
# Fit a linear SVM on the labelled rows and score the same rows.
# This is a demo on the training data, not an evaluation.
svm = LinearSVC().setLabelCol("sentiment")
df = svm.fit(df).transform(df)
df.select(svm.getPredictionCol()).show()

+----------+
|prediction|
+----------+
|       0.0|
|       1.0|
|       1.0|
+----------+



# With Pipeline

In [6]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class RemoveStopWordsAndSpecialCharacters(Transformer, HasInputCol, HasOutputCol):
    """Pipeline stage that removes stop words and strips special characters.

    Reads the array-of-strings column named by ``inputCol`` and adds the
    column named by ``outputCol``, also an array of strings. For each token:

    * it is dropped if it is a stop word, either as written or after cleaning;
    * every non-alphanumeric character is removed from it;
    * it is dropped if nothing remains after cleaning.

    Stop-word matching is exact and case-sensitive.

    Params:
        inputCol: name of the column holding the token arrays.
        outputCol: name of the column to create.
        stopwords: collection of tokens to discard (default: empty set).
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(RemoveStopWordsAndSpecialCharacters, self).__init__()
        # The Param is declared per instance, so it must exist before
        # _setDefault/_set look it up by name.
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=set())
        # @keyword_only records the keyword arguments that were passed.
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def setStopwords(self, value):
        """Set the stop-word collection and return ``self`` for chaining."""
        # Use the Params API rather than writing to _paramMap directly.
        return self._set(stopwords=value)

    def getStopwords(self):
        """Return the configured stop words, or the default if unset."""
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        """Return ``dataset`` with the cleaned-token column added.

        Args:
            dataset: DataFrame containing the ``inputCol`` column.

        Returns:
            A new DataFrame with the extra ``outputCol`` column. Null input
            arrays yield null output arrays.
        """
        # Snapshot into a frozenset: fast lookups, and the UDF closure
        # captures only this local value, not ``self``. ``or ()`` tolerates
        # an explicit stopwords=None.
        blocked = frozenset(self.getStopwords() or ())

        def clean_tokens(tokens):
            if tokens is None:
                return None
            kept = []
            for token in tokens:
                # Raw check first so stop words with apostrophes ("don't")
                # are caught before the apostrophe is stripped.
                if token in blocked:
                    continue
                cleaned = ''.join(ch for ch in token if ch.isalnum())
                # Skip empty leftovers ("--") and stop words that were
                # hidden by punctuation ("the.").
                if cleaned and cleaned not in blocked:
                    kept.append(cleaned)
            return kept

        cleaner = udf(clean_tokens, ArrayType(StringType()))
        source_col = dataset[self.getInputCol()]
        return dataset.withColumn(self.getOutputCol(), cleaner(source_col))

In [None]:
# Rebuild the raw three-row corpus so the pipeline starts from untransformed data.
df = spark.createDataFrame(
    data=[
        ('This movie was so poorly written and directed I fell asleep 30 minutes through the movie.', 0),
        ('The most interesting thing about Miryang (Secret Sunshine) is the actors.', 1),
        ('William Hurt may not be an American matinee idol anymore, but he still has pretty good taste in B-movie.', 1),
    ],
    schema=['text', 'sentiment'],
)

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, Word2Vec, VectorAssembler
from pyspark.ml.classification import LinearSVC

# Stage definitions, in the order the pipeline runs them.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
cleaning = RemoveStopWordsAndSpecialCharacters(inputCol="words", outputCol="clean_words",
                                               stopwords=stopwords)
hashingTF = HashingTF(inputCol="clean_words", outputCol="tf")
# Word2Vec training is stochastic. Pin the seed so the fitted pipeline and its
# predictions are reproducible under Restart & Run All.
w2v = Word2Vec(vectorSize=2, inputCol="clean_words", outputCol="w2v",
               minCount=1, maxIter=10, seed=42)
asm = VectorAssembler(inputCols=[hashingTF.getOutputCol(), w2v.getOutputCol()],
                      outputCol="features")
svm = LinearSVC(labelCol="sentiment")

# Fit every stage on the corpus, then score the same rows.
# This is a demo on the training data, not an evaluation.
mypipeline = Pipeline(stages=[tokenizer, cleaning, hashingTF, w2v, asm, svm])
df = mypipeline.fit(df).transform(df)
df.select("prediction").show()

+----------+
|prediction|
+----------+
|       0.0|
|       1.0|
|       1.0|
+----------+

