In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, FloatType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import Transformer
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

from spacy.en import English

from src.spacy_transformer import SpacyTokenize_Transformer
from src.NLP_pipeline import get_pipeline

# from pyspark.ml.feature import Word2Vec
# from pyspark.ml.feature import NGram
# from spacy.symbols import ORTH, LEMMA, POS, SYM, TAG
# import pandas as pd
# from pyspark.ml.feature import RegexTokenizer

%autoreload 2

In [2]:
# when starting jupyter with the sparkjupyter script, pyspark is already imported

print("sql session setup by script:\t", spark)
print("spark context setup by script:\t", sc)
print("pyspark imported by script:\t", str(pyspark)[:56], "...")

sql session setup by script:	 <pyspark.sql.session.SparkSession object at 0x10a112fd0>
spark context setup by script:	 <pyspark.context.SparkContext object at 0x1022dc278>
pyspark imported by script:	 <module 'pyspark' from '/usr/local/Cellar/apache-spark/2 ...


In [3]:
data_file = 'data/data.json'
raw_df = spark.read.json(data_file)

raw_df.printSchema()
print("row count: ", raw_df.count())
raw_df.show(3)


# create copy of raw_df incase I mess things up :P
df = raw_df


root
 |-- author: string (nullable = true)
 |-- excerpt: string (nullable = true)
 |-- title: string (nullable = true)

row count:  9050
+--------------+--------------------+---------------+
|        author|             excerpt|          title|
+--------------+--------------------+---------------+
|CharlesDickens|A CHRISTMAS CAROL...|AChristmasCarol|
|CharlesDickens|Mind! I don't mea...|AChristmasCarol|
|CharlesDickens|Scrooge never pai...|AChristmasCarol|
+--------------+--------------------+---------------+
only showing top 3 rows



In [4]:
# a tiny sample dataframe for testing
tiny_df = df.sample(False, 1/1000).limit(5)
print(type(tiny_df))
print(tiny_df.count())
tiny_df.show()

<class 'pyspark.sql.dataframe.DataFrame'>
5
+--------------+--------------------+----------------+
|        author|             excerpt|           title|
+--------------+--------------------+----------------+
|CharlesDickens|He shook his head...|ATaleOfTwoCities|
|CharlesDickens|‘I am very poor,’...|DavidCopperfield|
|CharlesDickens|‘It’s work enough...|DavidCopperfield|
|    JaneAusten|There was no occa...|            Emma|
|    JaneAusten|“Oh! shame, shame...|   MansfieldPark|
+--------------+--------------------+----------------+



## Spacy: a brief aside

Spacy is a production oriented Natural Language Processing package with (among other things) very nice tokenization options. I use spaCy here because it tokenizes punctuation and contractions better than spark's tokenizer.

Here we will wrap the tokenization in a Spark UDF. Later we will include it in our customized transformer.

In [5]:
%%time
# timing to ensure spaCy is set up properly (should take ~100ms)

parser = English()


CPU times: user 105 ms, sys: 4.05 ms, total: 109 ms
Wall time: 115 ms


In [6]:
# Grab a couple excerpts for testing

excerpt = df.take(100)[80]['excerpt']
excerpt2 = df.take(100)[99]['excerpt']

In [7]:
%%time
parsedData = parser(excerpt)

# sentences = [sent.string.strip() for sent in parsedData.sents]
# for s in sentences:
#     print(s, '\n')

tokens = [tok.lower_ for tok in parsedData]
# print(type(token_lower[1]))
print(tokens[:8])

['but', 'they', 'did', "n't", 'devote', 'the', 'whole', 'evening']
CPU times: user 13.6 ms, sys: 1.94 ms, total: 15.5 ms
Wall time: 19.9 ms


## UDF demonstration
A quick way to create a User Defined Function (UDF) in spark:

Get (or create a function) in python and use a lambda function to insert it in to "udf(  )".

Don't forget to define your Spark DataType!

```
Other excerpt metadata to include via UDF:
num_chars, num_words, num_sent, num_para
(use these to calc word_len, word_per_sent, word_per_para, sent_per_para . . . etc.
per excerpt, book and author)
```

In [8]:
%%time

def tokenize(text):
    parser = English()
    return [tok.lower_ for tok in parser(text)]

tokenize_udf = udf(lambda x: tokenize(x), ArrayType(StringType()))

df_tokens = tiny_df.withColumn("tokens", tokenize_udf(df.excerpt))
df_tokens.show(3)

+--------------+--------------------+----------------+--------------------+
|        author|             excerpt|           title|              tokens|
+--------------+--------------------+----------------+--------------------+
|CharlesDickens|He shook his head...|ATaleOfTwoCities|[he, shook, his, ...|
|CharlesDickens|‘I am very poor,’...|DavidCopperfield|[‘, i, am, very, ...|
|CharlesDickens|‘It’s work enough...|DavidCopperfield|[‘, it, ’s, work,...|
+--------------+--------------------+----------------+--------------------+
only showing top 3 rows

CPU times: user 20.2 ms, sys: 4.77 ms, total: 25 ms
Wall time: 4.69 s


# Transformers
add explanation and example of transformers

## Native Transformers

Many of the transformers in Spark's ML lib are great. (Sadly tokenizer leaves punctuation attached to the preceding word.)

In [9]:
tokenizer = Tokenizer(inputCol="excerpt", outputCol="tokenized")
df_spark_tokens = tokenizer.transform(tiny_df)
df_spark_tokens.show()

+--------------+--------------------+----------------+--------------------+
|        author|             excerpt|           title|           tokenized|
+--------------+--------------------+----------------+--------------------+
|CharlesDickens|He shook his head...|ATaleOfTwoCities|[he, shook, his, ...|
|CharlesDickens|‘I am very poor,’...|DavidCopperfield|[‘i, am, very, po...|
|CharlesDickens|‘It’s work enough...|DavidCopperfield|[‘it’s, work, eno...|
|    JaneAusten|There was no occa...|            Emma|[there, was, no, ...|
|    JaneAusten|“Oh! shame, shame...|   MansfieldPark|[“oh!, shame,, sh...|
+--------------+--------------------+----------------+--------------------+



## Customized Transformers
Luckily, we can make our own transformers as well.

Here we build the spaCy tokenizer into a spark transformer

### Spacy Transformer:

In [10]:
%%time
tokenizer = SpacyTokenize_Transformer(inputCol='excerpt', outputCol='words')

CPU times: user 558 µs, sys: 161 µs, total: 719 µs
Wall time: 613 µs


In [11]:
%%time

df_tokens = tokenizer.transform(df)
df_tokens.show(3)


+--------------+--------------------+---------------+--------------------+
|        author|             excerpt|          title|               words|
+--------------+--------------------+---------------+--------------------+
|CharlesDickens|A CHRISTMAS CAROL...|AChristmasCarol|[A, CHRISTMAS, CA...|
|CharlesDickens|Mind! I don't mea...|AChristmasCarol|[Mind, !, I, do, ...|
|CharlesDickens|Scrooge never pai...|AChristmasCarol|[Scrooge, never, ...|
+--------------+--------------------+---------------+--------------------+
only showing top 3 rows

CPU times: user 23 ms, sys: 4.76 ms, total: 27.8 ms
Wall time: 15.7 s


# Pipeline

Pipelines allow for multiple transformers to be strung together efficiently.

By using ".getOutputCol( )" column names can be set in a single location.

Columns can then be added/dropped simply by adding or removing them from the "stages" list  in the Pipeline


```python

tokenizer = RegexTokenizer(inputCol="parsed_text", outputCol="raw_tokens"
            , pattern="\\W", minTokenLength=3)
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='tokens_stop')
stemmer = Stemming_Transformer(inputCol=remover.getOutputCol(), outputCol='tokens')
bigram = NGram(inputCol=stemmer.getOutputCol(), outputCol='bigrams'
         , n=2)
trigram = NGram(inputCol=stemmer.getOutputCol(), outputCol='trigrams'
          , n=3)
cv = CountVectorizer(inputCol=stemmer.getOutputCol(), outputCol='token_countvector'
     , minDF=10.0)
idf = IDF(inputCol=cv.getOutputCol(), outputCol='token_idf'
      , minDocFreq=10)
w2v_2d = Word2Vec(vectorSize=2, minCount=2, inputCol=stemmer.getOutputCol()
         , outputCol='word2vec_2d')
w2v_large = Word2Vec(vectorSize=250, minCount=2, inputCol=stemmer.getOutputCol()
            , outputCol='word2vec_large')

pipe = Pipeline(stages=[tokenizer, remover, stemmer, cv, idf, w2v_2d, w2v_large])
```

In [12]:
# Set up transformers
tokenizer = SpacyTokenize_Transformer(inputCol='excerpt', outputCol='words')
countvec = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol='termfreq')
idf = IDF(inputCol=countvec.getOutputCol(), outputCol='tfidf')


In [40]:
%%time
# How to have only tfidf col added to df?

pipeline = Pipeline(stages=[tokenizer, countvec, idf])
data = pipeline.fit(tiny_df).transform(tiny_df)

data.show()

+--------------+--------------------+----------------+--------------------+--------------------+--------------------+
|        author|             excerpt|           title|               words|            termfreq|               tfidf|
+--------------+--------------------+----------------+--------------------+--------------------+--------------------+
|CharlesDickens|He shook his head...|ATaleOfTwoCities|[He, shook, his, ...|(517,[0,1,2,3,4,5...|(517,[0,1,2,3,4,5...|
|CharlesDickens|‘I am very poor,’...|DavidCopperfield|[‘, I, am, very, ...|(517,[0,1,2,3,4,5...|(517,[0,1,2,3,4,5...|
|CharlesDickens|‘It’s work enough...|DavidCopperfield|[‘, It, ’s, work,...|(517,[0,1,2,3,4,5...|(517,[0,1,2,3,4,5...|
|    JaneAusten|There was no occa...|            Emma|[There, was, no, ...|(517,[0,1,2,4,5,6...|(517,[0,1,2,4,5,6...|
|    JaneAusten|“Oh! shame, shame...|   MansfieldPark|[“, Oh, !, shame,...|(517,[0,1,2,3,4,5...|(517,[0,1,2,3,4,5...|
+--------------+--------------------+----------------+--

### Note:
%%time output for full df:
```
CPU times: user 136 ms, sys: 80.6 ms, total: 217 ms
Wall time: 22min 14s
```

# Building the full dataframe:

With our full pipeline defined in a script we can now save the resulting dataframe to a parquet file for easy access in future notebooks

In [34]:
%%time

# started 2:07 4/12
# save_loc = "data/dataframe.parquet"

pipeline = get_pipeline()
df = pipeline.fit(raw_df).transform(raw_df)
df.write.mode('overwrite').save(save_loc, format="parquet")

# Consider: Add in spark sql querries for some of the more interesting columns (just for kicks)

CPU times: user 578 ms, sys: 358 ms, total: 936 ms
Wall time: 2h 12min 45s


In [35]:
df.createOrReplaceTempView("data")
spark.sql('''
            SELECT *
            FROM data
            GROUP BY author
            LIMIT 5
            ''').show()

ERROR:root:An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line string', (1, 12))



AnalysisException: "expression 'data.`excerpt`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;\nGlobalLimit 5\n+- LocalLimit 5\n   +- Aggregate [author#0], [author#0, excerpt#1, title#2, words#573, termfreq#579, tfidf#586, w2v#594, w2v_2D#603]\n      +- SubqueryAlias data\n         +- Project [author#0, excerpt#1, title#2, words#573, termfreq#579, tfidf#586, w2v#594, UDF(words#573) AS w2v_2D#603]\n            +- Project [author#0, excerpt#1, title#2, words#573, termfreq#579, tfidf#586, UDF(words#573) AS w2v#594]\n               +- Project [author#0, excerpt#1, title#2, words#573, termfreq#579, UDF(termfreq#579) AS tfidf#586]\n                  +- Project [author#0, excerpt#1, title#2, words#573, UDF(words#573) AS termfreq#579]\n                     +- Project [author#0, excerpt#1, title#2, f(excerpt#1) AS words#573]\n                        +- Relation[author#0,excerpt#1,title#2] json\n"