In [2]:
import pyspark as ps
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import Transformer
from spacy.en import English

from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF

## Read data.json into a Spark DataFrame

In [3]:
# Location of the corpus, relative to the notebook's working directory.
data_file = 'data/data.json'

# No visible cell creates `spark`, so obtain the session explicitly to keep
# this cell runnable on a fresh kernel. getOrCreate() returns the session
# that is already active when there is one, and builds a new one otherwise.
spark = ps.sql.SparkSession.builder.getOrCreate()

# One row per JSON record.
df = spark.read.json(data_file)

In [4]:
# printSchema() writes the schema tree to stdout itself and returns None,
# so it is called bare; wrapping it in print() would add a stray "None" line.
df.printSchema()

# Total number of rows loaded.
print(df.count())

# Peek at the first few rows (long cells are truncated by show()).
df.show(3)

root
 |-- author: string (nullable = true)
 |-- excerpt: string (nullable = true)
 |-- title: string (nullable = true)

None
9050
+----------+--------------------+-----------------+
|    author|             excerpt|            title|
+----------+--------------------+-----------------+
|JaneAusten|Chapter 1 || It i...|PrideAndPrejudice|
|JaneAusten|“What is his name...|PrideAndPrejudice|
|JaneAusten|“In such cases, a...|PrideAndPrejudice|
+----------+--------------------+-----------------+
only showing top 3 rows



## Create pipeline

In [29]:
# David's stemmer

from pyspark import keyword_only
from pyspark.ml.util import Identifiable
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from nltk.stem import SnowballStemmer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Custom stemming transformer class for pyspark
class Stemming_Transformer(Transformer, HasInputCol, HasOutputCol):
    """Pipeline stage that stems every token in an array-of-strings column.

    Each token of ``inputCol`` is passed through nltk's ``SnowballStemmer`` and
    the resulting list is written to ``outputCol``.

    Params:
        inputCol:  name of the column holding the token arrays.
        outputCol: name of the column that receives the stemmed arrays.
        language:  language name handed to ``SnowballStemmer``
                   (default ``'english'``).
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, language='english'):
        super(Stemming_Transformer, self).__init__()
        # Declare `language` as a real Param so that setParams(language=...)
        # has something to set; without it _set() cannot resolve the name.
        self.language = Param(self, "language",
                              "language name passed to nltk's SnowballStemmer")
        self._setDefault(language='english')
        # keyword_only records only the keyword arguments that were actually
        # supplied, so omitted ones fall back to the defaults set above.
        # NOTE(review): newer PySpark releases appear to expose these kwargs
        # as self._input_kwargs instead - confirm against the installed version.
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, language='english', ):
        """Set any of the stage's params by keyword and return ``self``."""
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setLanguage(self, value):
        """Set the stemming language and return ``self``."""
        return self._set(language=value)

    def getLanguage(self):
        """Return the configured stemming language (or its default)."""
        return self.getOrDefault(self.language)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` holding the stemmed tokens."""
        # Built once here and captured by the UDF below.
        stemmer = SnowballStemmer(self.getLanguage())

        def stem_tokens(tokens):
            # A null array (e.g. from a null source text) would otherwise
            # raise inside the UDF; treat it as "no tokens".
            if tokens is None:
                return []
            return [stemmer.stem(word) for word in tokens]

        udfStemmer = udf(stem_tokens, ArrayType(StringType()))

        inCol = self.getInputCol()
        outCol = self.getOutputCol()

        return dataset.withColumn(outCol, udfStemmer(inCol))

In [27]:
from spacy.en import English

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class SpacyTokenizer(Transformer, HasInputCol, HasOutputCol):
    """Pipeline stage that tokenizes a text column with spaCy's English model.

    The text in ``inputCol`` is run through ``English()``; every resulting
    token is converted with ``str`` and kept unless its lower-cased form is in
    ``stopwords``. The surviving tokens are written to ``outputCol`` as an
    array of strings.

    Params:
        inputCol:  name of the column holding the raw text.
        outputCol: name of the column that receives the token arrays.
        stopwords: collection of tokens to drop; compared against the
                   lower-cased token, so entries should be lower case.
                   Defaults to an empty set.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(SpacyTokenizer, self).__init__()
        # Declared per instance, after the parent constructor has run.
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=set())
        # keyword_only records only the keyword arguments that were actually
        # supplied, so omitted ones fall back to the defaults set above.
        # NOTE(review): newer PySpark releases appear to expose these kwargs
        # as self._input_kwargs instead - confirm against the installed version.
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        """Set any of the stage's params by keyword and return ``self``."""
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        """Set the stopword collection and return ``self``."""
        self._paramMap[self.stopwords] = value
        return self

    def getStopwords(self):
        """Return the configured stopwords (or the empty default)."""
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` holding the filtered tokens."""
        # An explicit stopwords=None is stored as None, which would make the
        # membership test below raise; normalise it to "no stopwords". The
        # frozenset also gives constant-time lookups whatever was passed in.
        stopwords = frozenset(self.getStopwords() or ())

        # Constructing English() loads the spaCy model, which is far too
        # expensive to repeat for every row. This dict travels (empty) with
        # the function, so each shipped copy of `f` builds the parser on first
        # use and reuses it for the remaining rows it processes.
        parser_cache = {}

        def f(s):
            # Null text would crash the parser; treat it as "no tokens".
            if s is None:
                return []
            parser = parser_cache.get("parser")
            if parser is None:
                parser = parser_cache["parser"] = English()
            tokens = [str(tok) for tok in parser(s)]
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

In [30]:
# Assemble the stages in the order they run; each stage reads the column the
# previous one wrote: excerpt -> words -> stems -> termfreq -> tfidf.
tokenizer = SpacyTokenizer(
    inputCol='excerpt',
    outputCol='words',
)
stemmer = Stemming_Transformer(
    inputCol='words',
    outputCol='stems',
)
countvec = CountVectorizer(
    inputCol='stems',
    outputCol='termfreq',
)
idf = IDF(
    inputCol='termfreq',
    outputCol='tfidf',
)
pipeline = Pipeline(
    stages=[
        tokenizer,
        stemmer,
        countvec,
        idf,
    ],
)

In [31]:
# Fit the pipeline on the full frame, then apply the fitted stages to that
# same frame so every intermediate column is appended next to the raw text.
data = (
    pipeline
    .fit(df)
    .transform(df)
)

# Show a few rows to confirm each stage produced its column.
data.show(3)

+----------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|    author|             excerpt|            title|               words|               stems|            termfreq|               tfidf|
+----------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
|JaneAusten|Chapter 1 || It i...|PrideAndPrejudice|[Chapter, 1, ||, ...|[chapter, 1, ||, ...|(27332,[0,1,2,3,4...|(27332,[0,1,2,3,4...|
|JaneAusten|“What is his name...|PrideAndPrejudice|[“, What, is, his...|[“, what, is, his...|(27332,[0,1,2,3,4...|(27332,[0,1,2,3,4...|
|JaneAusten|“In such cases, a...|PrideAndPrejudice|[“, In, such, cas...|[“, in, such, cas...|(27332,[0,1,2,3,4...|(27332,[0,1,2,3,4...|
+----------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows

