### Text Processing for Author Recognition using Spark

#### Import statements

In [74]:
import pyspark as ps    # import the spark suite
import warnings         # display warning if spark context already exists
import os
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, FloatType
import string
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

#### Initialized Spark Context

In [3]:
try:
    sc = ps.SparkContext('local[4]') # create spark context to work locally on all available cpus
    print('created SparkContext')
except ValueError:
    warnings.warn('SparkContext already exists')    # issue a warning if context already exists

created SparkContext


### Read data.json into Spark SQL context

In [4]:
spark = ps.SQLContext(sc)
print('created SQLContext')

created SQLContext


In [5]:
data_file = 'data/data.json'
df = spark.read.json(data_file)

CONSIDER:

for fun use RDD and map reduce to remove the double bars I put into the excerpts!

In [6]:
print df.printSchema()
print df.count()
df.show(3)

root
 |-- author: string (nullable = true)
 |-- excerpt: string (nullable = true)
 |-- title: string (nullable = true)

None
9316
+--------------+--------------------+---------------+
|        author|             excerpt|          title|
+--------------+--------------------+---------------+
|CharlesDickens|﻿The Project Gute...|AChristmasCarol|
|CharlesDickens|Dickens gave his ...|AChristmasCarol|
|CharlesDickens|Dickens's greates...|AChristmasCarol|
+--------------+--------------------+---------------+
only showing top 3 rows



### Here we import some sql funtions and give our dataframe a SQL table name

In [7]:
from pyspark.sql.functions import length
from pyspark.sql.functions import count

df.createOrReplaceTempView("excerpts")

In [8]:
sqlDF = spark.sql("SELECT count(*) FROM excerpts WHERE author = 'MarkTwain'")
sqlDF.show()

+--------+
|count(1)|
+--------+
|    2349|
+--------+



### Lambda functions are created to explore the the character count, word count, and average word length and then add them to the dataframe.

In [181]:
def char_count(text):
    return len(text)

def word_count(text):
    return len(text.split())

def avg_word_length(text):
    return sum([len(t) for t in text.split()]) / float(len(text.split()))

def sentence_count(text):
    return len(text.split('.'))

def sentence_length(text):
    return sum([len(t.split()) for t in text.split('.')]) / float(len(text.split('.'))) 

def tokenize_excerpt(text):
    stops = set(stopwords.words('english'))  # get a set of english stop words
    unpunctuated_text = text.translate(None, string.punctuation)  # remove punctuation
    tokens = word_tokenize(unpunctuated_text)  # tokenize
    cleaned_tokens = [t.lower() for t in tokens if t.lower() not in stops]  # remove stopwords, lowercase everything
    return cleaned_tokens

def avg_word_length2(tokens):
    return sum([len(t) for t in tokens.split(' ')]) / float(len(tokens.split(' ')))

In [182]:
charcount_udf = udf(lambda x : char_count(x))
wordcount_udf = udf(lambda x: word_count(x))
avgwordlen_udf = udf(lambda x: avg_word_length(x))
sentencecount_udf = udf(lambda x: sentence_count(x))
sentencelength_udf = udf(lambda x: sentence_length(x))
tokenize_udf = udf(lambda x: tokenize_excerpt(x))
wordlen2_udf = udf(lambda x: avg_word_length2(x))

df2 = df.withColumn("character_count", charcount_udf(df.excerpt).cast(FloatType())) \
        .withColumn("word_count", wordcount_udf(df.excerpt).cast(FloatType())) \
        .withColumn("avg_wordlen", avgwordlen_udf(df.excerpt).cast(FloatType())) \
        .withColumn("sent_count", sentencecount_udf(df.excerpt).cast(FloatType())) \
        .withColumn("sent_length", sentencelength_udf(df.excerpt).cast(FloatType())) \
        .withColumn("tokenized", tokenize_udf(df.excerpt))
    
df3 = df2.withColumn("wordlen_tokens", wordlen2_udf(df.excerpt).cast(FloatType()))
df3.printSchema()

root
 |-- author: string (nullable = true)
 |-- excerpt: string (nullable = true)
 |-- title: string (nullable = true)
 |-- character_count: float (nullable = true)
 |-- word_count: float (nullable = true)
 |-- avg_wordlen: float (nullable = true)
 |-- sent_count: float (nullable = true)
 |-- sent_length: float (nullable = true)
 |-- tokenized: string (nullable = true)
 |-- wordlen_tokens: float (nullable = true)



In [191]:
df3.createOrReplaceTempView("excerpts")
sqlDF = spark.sql('''SELECT author
                    , ROUND(AVG(avg_wordlen),3) AS AvgWordLength
                    , ROUND(AVG(word_count),1) AS AvgWordsPerParagraph
                    , ROUND(AVG(sent_length),1) AS AvgWordsPerSentence
                    , ROUND(AVG(wordlen_tokens),3) AS AvgTokenizedWordLength
                    FROM excerpts GROUP BY author''')
sqlDF.show()

+--------------+-------------+--------------------+-------------------+----------------------+
|        author|AvgWordLength|AvgWordsPerParagraph|AvgWordsPerSentence|AvgTokenizedWordLength|
+--------------+-------------+--------------------+-------------------+----------------------+
|     MarkTwain|        4.474|               282.9|               24.9|                 4.478|
|      JohnMuir|        4.701|               318.5|               26.5|                 4.698|
|    JaneAusten|        4.569|               271.4|               21.8|                 4.567|
|CharlesDickens|        4.439|               246.2|               21.1|                 4.438|
+--------------+-------------+--------------------+-------------------+----------------------+

