In [None]:
%%html
<style>
.h1_cell, .just_text {
    box-sizing: border-box;
    padding-top:5px;
    padding-bottom:5px;
    font-family: "Times New Roman", Georgia, Serif;
    font-size: 125%;
    line-height: 22px; /* 5px +12px + 5px */
    text-indent: 25px;
    background-color: #fbfbea;
    padding: 10px;
}
.code_block {
    box-sizing: border-box;
    padding-top:5px;
    padding-bottom:5px;
    font-size: 75%;
    line-height: 22px; /* 5px +12px + 5px */
    #text-indent: 25px;
    #background-color: #fbfbea;
    padding: 5px;
}

hr { 
    display: block;
    margin-top: 0.5em;
    margin-bottom: 0.5em;
    margin-left: auto;
    margin-right: auto;
    border-style: inset;
    border-width: 2px;
}
</style>

<h2>
<center>
Building a Bag a Words with Spark
</center>
</h2>

<div class=h1_cell>
<p>
This notebook assumes you have already ran and understand the code in setup_spark.ipynb. Lets connect to our already installed spark cluster.
</div>

In [1]:
import os
import sys
import subprocess

In [2]:
os.environ['SPARK_HOME'] = os.environ['HOME'] + '/spark'
os.environ['PATH'] += ':' + os.environ['SPARK_HOME'] + '/bin'
sys.path.append(os.environ['SPARK_HOME'] + '/python')
sys.path.append(os.environ['SPARK_HOME'] + '/python/lib/py4j-0.10.6-src.zip')

In [3]:
# run start-all.sh
subprocess.call(os.environ['SPARK_HOME'] + "/sbin/start-all.sh", env=os.environ)

0

<div class=h1_cell>
<p>
Lets first configure the spark environment start the spark jvm application. We can then set some of spark's cluster settings like <i>spark.executor.memory</i>, which controls how much RAM an spark worker process gets. Finally, we can connect to the spark app and get our spark session object.
</div>

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().set('spark.executor.cores', 2).set('spark.executor.memory', '8g')
spark = SparkSession(SparkContext(master='spark://instance-6:7077', conf=conf))
spark

<h3>
<center>
Why Spark?
</center>
</h3>

<div class=h1_cell>
<p>
Building a bag of words requires us to parse every sentence in each essay for every row, identifying unique words and adding them with the correct class label. We'll call this function F. We can speed F up by parallelizing it, i.e. using multithreading. While this may satisfy a need for processing power, it is common to also require large amounts of memory and disk space to apply F or another function to a large dataset, creating a large bag of words.
<p>
Spark is exellent for this reason. Spark virtualizes both the compute and memory resources of a group of computers (a cluster) and allows users to use the cluster throught a single 'spark' session object.
<p>
This notebook is a walkthrough of using Spark to apply F (bag-of-words) to a dataset.
<ol>
    <li>We'll the load dataset into a pandas dataframe.</li>
    <li>We'll distribute this dataset across our spark cluster by passing the dataframe to spark.</li>
    <li>We'll register a F with spark, allowing spark to call F on each of its worker nodes</li>
    <li>We'll use spark to apply F to the distributed dataset.</li>
</ol>
<p>
The result will be a single table of unique words (the joining of the distributed tables) and their class label counts, summarizing the vocabulary used between two different sources.
</div>

In [5]:
import pandas as pd
url = 'https://www.dropbox.com/s/2hdbltrl8bh6kbu/train.csv?raw=1'
donate_table = pd.read_csv(url, encoding='utf-8')

In [6]:
len(donate_table.index)

182080

In [6]:
donate_table = donate_table[['project_essay_1', 'project_essay_2', 'project_title', 'project_is_approved']]
donate_table.head(5)

Unnamed: 0,project_essay_1,project_essay_2,project_title,project_is_approved
0,Most of my kindergarten students come from low...,I currently have a differentiated sight word c...,Super Sight Word Centers,1
1,Our elementary school is a culturally rich sch...,We strive to provide our diverse population of...,Keep Calm and Dance On,0
2,Hello;\r\nMy name is Mrs. Brotherton. I teach ...,We are looking to add some 3Doodler to our cla...,Lets 3Doodle to Learn,1
3,My students are the greatest students but are ...,"The student's project which is totally \""kid-i...","\""Kid Inspired\"" Equipment to Increase Activit...",0
4,My students are athletes and students who are ...,For some reason in our kitchen the water comes...,We need clean water for our culinary arts class!,1


In [7]:
from pyspark.sql.types import ByteType, StringType, StructType, StructField

schema = StructType([StructField('project_essay_1', StringType()),
                     StructField('project_essay_2', StringType()),
                     StructField('project_title', StringType()),
                     StructField('project_is_approved', ByteType())])

dtdf = spark.createDataFrame(donate_table.head(100), schema=schema, verifySchema=False)
dtdf.show(5)

+--------------------+--------------------+--------------------+-------------------+
|     project_essay_1|     project_essay_2|       project_title|project_is_approved|
+--------------------+--------------------+--------------------+-------------------+
|Most of my kinder...|I currently have ...|Super Sight Word ...|                  1|
|Our elementary sc...|We strive to prov...|Keep Calm and Dan...|                  0|
|Hello;\r\nMy name...|We are looking to...|Lets 3Doodle to L...|                  1|
|My students are t...|The student's pro...|\"Kid Inspired\" ...|                  0|
|My students are a...|For some reason i...|We need clean wat...|                  1|
+--------------------+--------------------+--------------------+-------------------+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import concat_ws

dtdf = dtdf.withColumn('text', concat_ws(' ', dtdf.project_title, dtdf.project_essay_1, dtdf.project_essay_2))
dtdf = dtdf.drop(*['project_essay_1', 'project_essay_2', 'project_title'])
dtdf.show(5)

+-------------------+--------------------+
|project_is_approved|                text|
+-------------------+--------------------+
|                  1|Super Sight Word ...|
|                  0|Keep Calm and Dan...|
|                  1|Lets 3Doodle to L...|
|                  0|\"Kid Inspired\" ...|
|                  1|We need clean wat...|
+-------------------+--------------------+
only showing top 5 rows



In [9]:
from nltk.tokenize import WordPunctTokenizer
from nltk.corpus import stopwords
from nltk import download

from string import punctuation

tokenizer = WordPunctTokenizer()
download('stopwords')
swords = stopwords.words('english')

b_tokenizer = spark.sparkContext.broadcast(WordPunctTokenizer())
b_swords = spark.sparkContext.broadcast(swords)

def sentence_wrangler(text):
    word_list = b_tokenizer.value.tokenize(text.lower())
    result = []
    for word in word_list:
        if word in b_swords.value:
            continue
        check = False
        for char in word:
            if char in punctuation:
                check = True
                break
        if not check: result.append(word)
    return result

def build_bag(words):
    d = {}
    for word in words:
        if word in d:
            d[word] += 1
        else:
            d[word] = 1
    return d

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/jakeu123/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# If nltk is not install on worker nodes
!ssh instance-7 'pip install nltk'
!ssh instance-8 'pip install nltk'

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

sentence_wrangler_udf = udf(sentence_wrangler, ArrayType(StringType()))
dtdf = dtdf.withColumn('words', sentence_wrangler_udf('text'))
dtdf.show(5)

+-------------------+--------------------+--------------------+
|project_is_approved|                text|               words|
+-------------------+--------------------+--------------------+
|                  1|Super Sight Word ...|[super, sight, wo...|
|                  0|Keep Calm and Dan...|[keep, calm, danc...|
|                  1|Lets 3Doodle to L...|[lets, 3doodle, l...|
|                  0|\"Kid Inspired\" ...|[kid, inspired, e...|
|                  1|We need clean wat...|[need, clean, wat...|
+-------------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, IntegerType


build_bag_udf = udf(build_bag, MapType(StringType(), IntegerType()))
dtdf = dtdf.withColumn('bag', build_bag_udf('words'))
dtdf.show(5)

+-------------------+--------------------+--------------------+--------------------+
|project_is_approved|                text|               words|                 bag|
+-------------------+--------------------+--------------------+--------------------+
|                  1|Super Sight Word ...|[super, sight, wo...|[practice -> 3, d...|
|                  0|Keep Calm and Dan...|[keep, calm, danc...|[temporary -> 1, ...|
|                  1|Lets 3Doodle to L...|[lets, 3doodle, l...|[88 -> 1, big -> ...|
|                  0|\"Kid Inspired\" ...|[kid, inspired, e...|[00 -> 1, educati...|
|                  1|We need clean wat...|[need, clean, wat...|[reason -> 1, fee...|
+-------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [12]:
from pyspark.sql.functions import explode

bagdf = dtdf.select('project_is_approved', explode(dtdf.bag).alias('word', 'count'))
bagdf.cache()
bagdf.show(5)

+-------------------+--------------+-----+
|project_is_approved|          word|count|
+-------------------+--------------+-----+
|                  1|      practice|    3|
|                  1|differentiated|    1|
|                  1|          year|    3|
|                  1|    considered|    1|
|                  1|     obstacles|    1|
+-------------------+--------------+-----+
only showing top 5 rows



In [13]:
bagdf = bagdf.groupBy('word', 'project_is_approved').agg({'count': 'sum'})
bagdf.show(5)

+-----------+-------------------+----------+
|       word|project_is_approved|sum(count)|
+-----------+-------------------+----------+
|       last|                  1|         2|
|   normally|                  0|         1|
|      ahead|                  1|         4|
|progressing|                  1|         1|
|    qualify|                  1|         5|
+-----------+-------------------+----------+
only showing top 5 rows



In [None]:
!pip install pyarrow==0.9.*
!ssh instance-7 'pip install pyarrow==0.9.*'
!ssh instance-8 'pip install pyarrow==0.9.*'

In [None]:
!ssh instance-7 'pip install pandas'
!ssh instance-8 'pip install pandas'

In [15]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

schema = StructType([StructField('word', StringType()),
                     StructField('counts', ArrayType(IntegerType()))])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def combine_sums(pdf):
    counts = [0, 0]
    if len(pdf.index) == 1:
        idx = pdf.loc[0].project_is_approved
        counts[idx] = pdf.loc[0].count
    else:
        if pdf.loc[0].project_is_approved == 0:
            counts[0] = pdf.loc[0].count
            counts[1] = pdf.loc[1].count
        else:
            counts[0] = pdf.loc[1].count
            counts[1] = pdf.loc[0].count
    return pd.DataFrame([pdf.loc[0].word, counts], columns=['word', 'counts'])

bagdf.groupBy('word').apply(combine_sums).show(5)

Py4JJavaError: An error occurred while calling o172.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 431, 10.142.0.2, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 149, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 97, in wrapped
    result = f(pd.concat(series, axis=1))
  File "<ipython-input-15-f0125b557cd4>", line 19, in combine_sums
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/frame.py", line 403, in __init__
    copy=copy)
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/frame.py", line 536, in _init_ndarray
    return create_block_manager_from_blocks([values], [columns, index])
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/internals.py", line 4866, in create_block_manager_from_blocks
    construction_error(tot_items, blocks[0].shape[1:], axes, e)
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/internals.py", line 4843, in construction_error
    passed, implied))
ValueError: Shape of passed values is (1, 2), indices imply (2, 2)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:114)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 149, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/home/jakeu123/spark/python/lib/pyspark.zip/pyspark/worker.py", line 97, in wrapped
    result = f(pd.concat(series, axis=1))
  File "<ipython-input-15-f0125b557cd4>", line 19, in combine_sums
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/frame.py", line 403, in __init__
    copy=copy)
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/frame.py", line 536, in _init_ndarray
    return create_block_manager_from_blocks([values], [columns, index])
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/internals.py", line 4866, in create_block_manager_from_blocks
    construction_error(tot_items, blocks[0].shape[1:], axes, e)
  File "/home/jakeu123/.local/lib/python2.7/site-packages/pandas/core/internals.py", line 4843, in construction_error
    passed, implied))
ValueError: Shape of passed values is (1, 2), indices imply (2, 2)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:114)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
