In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
print(sc)
print("Ready to go!")

<SparkContext master=local[*] appName=pyspark-shell>
Ready to go!


In [2]:
words = ["We", "love", "Coding"]
words_rdd = sc.parallelize(words)
words_rdd.count()

3

In [5]:
words_rdd.collect()

['We', 'love', 'Coding']

In [6]:
#To analyse large datasets using Spark you will load them into Resilient Distributed Datasets (RDDs). 
#There are a number of ways in which you can create RDDs. Use the parallelize() function to create one from a Python collection,
#and use the textFile() function to create an RDD from the file data/war-and-peace.txt. 

data = sc.textFile("data/war-and-peace.txt")

In [7]:
data.take(3)

['                                      1869',
 '                                 WAR AND PEACE',
 '                                 by Leo Tolstoy']

In [8]:
data.collect()

['                                      1869',
 '                                 WAR AND PEACE',
 '                                 by Leo Tolstoy',
 'BK1',
 '                                 BOOK ONE: 1805',
 'BK1|CH1',
 '  CHAPTER I',
 '',
 '  "Well, Prince, so Genoa and Lucca are now just family estates of the',
 "Buonapartes. But I warn you, if you don't tell me that this means war,",
 'if you still try to defend the infamies and horrors perpetrated by',
 'that Antichrist- I really believe he is Antichrist- I will have',
 'nothing more to do with you and you are no longer my friend, no longer',
 "my 'faithful slave,' as you call yourself! But how do you do? I see",
 'I have frightened you- sit down and tell me all the news."',
 '  It was in July, 1805, and the speaker was the well-known Anna',
 'Pavlovna Scherer, maid of honor and favorite of the Empress Marya',
 'Fedorovna. With these words she greeted Prince Vasili Kuragin, a man',
 'of high rank and importance, who was the firs

In [10]:
## first line
data.first()

'                                      1869'

In [11]:
## number of lines
data.count()

54223

In [12]:
for line in data.take(15):
    print(line)

                                      1869
                                 WAR AND PEACE
                                 by Leo Tolstoy
BK1
                                 BOOK ONE: 1805
BK1|CH1
  CHAPTER I

  "Well, Prince, so Genoa and Lucca are now just family estates of the
Buonapartes. But I warn you, if you don't tell me that this means war,
if you still try to defend the infamies and horrors perpetrated by
that Antichrist- I really believe he is Antichrist- I will have
nothing more to do with you and you are no longer my friend, no longer
my 'faithful slave,' as you call yourself! But how do you do? I see
I have frightened you- sit down and tell me all the news."


In [13]:
### transformations onto RDDs. The following helper function will be useful to select the words from a line.

# A helper function to compute the list of words in a line of text
import re
def get_words(line):
    return re.compile('\w+').findall(line)

print(get_words("This, is a test!"))

['This', 'is', 'a', 'test']


In [14]:
## Use filter() to count the number of lines which mention war and the number of lines which mention peace.
warcount = data.filter(lambda line: "war" in get_words(line)).count()
warcount

265

In [16]:
for line in data.filter(lambda line: "war" in get_words(line)).take(5):
    print(line)

Buonapartes. But I warn you, if you don't tell me that this means war,
things, but Austria never has wished, and does not wish, for war.
to get himself killed. Tell me what this wretched war is for?" she
  "You are off to the war, Prince?" said Anna Pavlovna.
tell you. There is a war now against Napoleon. If it were a war for


In [17]:
## how often is peace mentioned?
peacecount = data.filter(lambda line: "peace" in get_words(line)).count()
peacecount

104

In [18]:
# Capitalize each line in the RDD
for line in data.map(lambda line: line.upper()).take(5):
    print(line)

                                      1869
                                 WAR AND PEACE
                                 BY LEO TOLSTOY
BK1
                                 BOOK ONE: 1805


In [19]:
### Use flatMap() to create an RDD of the words in War and Peace and count the number of words.

# Split each line into words using get_words()
words = data.flatMap(lambda line: get_words(line))
words.take(10)

['1869', 'WAR', 'AND', 'PEACE', 'by', 'Leo', 'Tolstoy', 'BK1', 'BOOK', 'ONE']

In [20]:
words.count()

573322

In [21]:
words.distinct().count()

19206

In [23]:
# Split each line into words using get_words() 
words = data.map(lambda line: get_words(line)) ### used map and not flatmap!
words.take(5)

[['1869'],
 ['WAR', 'AND', 'PEACE'],
 ['by', 'Leo', 'Tolstoy'],
 ['BK1'],
 ['BOOK', 'ONE', '1805']]

In [24]:
words.count()

54223

In [25]:
words.distinct().collect() ### doesnt work with map!!!

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 37, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Anaconda3\envs\ads6ldn\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\Anaconda3\envs\ads6ldn\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 362, in func
    return f(iterator)
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 1857, in combineLocally
    merger.mergeValues(iterator)
  File "C:\Anaconda3\envs\ads6ldn\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Anaconda3\envs\ads6ldn\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\Anaconda3\envs\ads6ldn\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 362, in func
    return f(iterator)
  File "C:\Anaconda3\envs\ads6ldn\lib\site-packages\pyspark\rdd.py", line 1857, in combineLocally
    merger.mergeValues(iterator)
  File "C:\Anaconda3\envs\ads6ldn\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [27]:
for line in data.flatMap(lambda line: get_words(line)).take(5):
    print(line)

1869
WAR
AND
PEACE
by


In [28]:
# Count the number of distinct words
words = data.flatMap(lambda line: get_words(line))
words.distinct().count() 

19206

In [29]:
#Use the function union() to create an RDD of lines with either war or peace mentioned. Count how many lines.
warLines = data.filter(lambda line: "war" in get_words(line))
peaceLines = data.filter(lambda line: "peace" in get_words(line))
warOrPeaceLines = warLines.union(peaceLines)
warOrPeaceLines.count()
warOrPeaceLines.take(5)

["Buonapartes. But I warn you, if you don't tell me that this means war,",
 'things, but Austria never has wished, and does not wish, for war.',
 'to get himself killed. Tell me what this wretched war is for?" she',
 '  "You are off to the war, Prince?" said Anna Pavlovna.',
 'tell you. There is a war now against Napoleon. If it were a war for']

In [32]:
warOrPeaceLines.collect()

["Buonapartes. But I warn you, if you don't tell me that this means war,",
 'things, but Austria never has wished, and does not wish, for war.',
 'to get himself killed. Tell me what this wretched war is for?" she',
 '  "You are off to the war, Prince?" said Anna Pavlovna.',
 'tell you. There is a war now against Napoleon. If it were a war for',
 '  "Well, why are you going to the war?" asked Pierre.',
 'wants to go to the war," replied Pierre, addressing the princess',
 'war and have no pity for me. Why is it?"',
 'am now going to the war, the greatest war there ever was, and I know',
 '  "But they say that war has been declared," replied the visitor.',
 'war that had been announced in a manifesto, and about the',
 'sir, for shame! It would be better if you went to the war."',
 'animated. The colonel told them that the declaration of war had',
 'Nicholas, who when he heard that the war was being discussed had',
 '  "It\'s all about the war," the count shouted down the table. "You',
 '

In [33]:
warAndPeaceLines = warLines.intersection(peaceLines)
warAndPeaceLines.count()

7

In [34]:
warAndPeaceLines.collect()

['  "To enter Russia without declaring war! I will not make peace as',
 '"but they are not even that! They are neither fit for war nor peace!',
 'perpetual peace and the abolition of war, and secondly, by the fact',
 'war and the peace that had been concluded. "Yes, I have been much',
 "'Boyars,' I will say to them, 'I do not desire war, I desire the peace",
 'peace nor war, neither an advance nor a defensive camp at the Drissa',
 'blamed," he said, "both for that war and the peace... but everything']

In [38]:
a = data.filter(lambda line: "war" in get_words(line) and "peace" in get_words(line))#.take(5)
a.count()

7

In [39]:
a.take(1)

['  "To enter Russia without declaring war! I will not make peace as']

In [40]:
#You have already seen three actions: collect() which returns all elements in the RDD, take(n), 
#which return the first n elements of the RDD, and count() which returns the number of elements in the RDD.
#The action reduce() takes as input a function which collapses two elements into one. Use it to find the 
#longest word in War and Peace.

data.flatMap(lambda line: get_words(line)).reduce(lambda acc, line: line if len(line) > len(acc) else acc)

'characteristically'

In [41]:
#find the set of words in War and Peace which only appear titlecased.

words = data.flatMap(lambda line: get_words(line)).distinct()
titled_words = words.filter(lambda word: word.istitle()) # maybe there is a "But", "It" => titlecased because begins a sentence
print(titled_words.count())

untitled_words = words.filter(lambda word: not word.istitle()) ### all lowercase ones and UPPERCASE ones
print(untitled_words.count())

lower_titles = titled_words.map(lambda word: word.lower()) ## changed to "but", "it" ...
nouns = lower_titles.subtract(untitled_words) ### removed the "but", "it" ...!
nouns.take(50)


3068
16138


['leo',
 'lucca',
 'july',
 'buonaparte',
 'funke',
 'mary',
 'bolkonskaya',
 'kutuzov',
 'enghien',
 'andrew',
 'boris',
 'michael',
 'conde',
 'jacobin',
 'jaffa',
 'englishman',
 'rostovs',
 'rostova',
 'vasilevich',
 'mimi',
 'ilynichna',
 'orlov',
 'straits',
 'sparrow',
 'dmitrievna',
 'madeira',
 'suvorovs',
 'russians',
 'egyptian',
 'mikhelson',
 'pomerania',
 'stralsund',
 'achilles',
 'frederick',
 'ferdinand',
 'poland',
 'prussians',
 'theresa',
 'denisov',
 'bondarenko',
 'rook',
 'bogdanich',
 'bilibin',
 'franz',
 'auersperg',
 'formio',
 'amelie',
 'bohemian',
 'buxhowden',
 'hark']

In [46]:
############# Key/Value pairs in Spark

words = data.flatMap(lambda line: get_words(line))
words.take(5)

['1869', 'WAR', 'AND', 'PEACE', 'by']

In [47]:
word_pairs = words.map(lambda word: (word, 1))
word_pairs.take(5)

[('1869', 1), ('WAR', 1), ('AND', 1), ('PEACE', 1), ('by', 1)]

In [48]:
word_counts = word_pairs.reduceByKey(lambda c1, c2: c1 + c2)
word_counts.take(10)

[('PEACE', 1),
 ('Leo', 1),
 ('BOOK', 15),
 ('ONE', 2),
 ('1805', 24),
 ('Prince', 1578),
 ('Genoa', 3),
 ('Lucca', 2),
 ('are', 1222),
 ('now', 1158)]

In [49]:
word_pairs = words.map(lambda word: (word, 1))
word_pairs.groupByKey().map(lambda pair: (pair[0], len(pair[1]))).take(10)

[('PEACE', 1),
 ('Leo', 1),
 ('BOOK', 15),
 ('ONE', 2),
 ('1805', 24),
 ('Prince', 1578),
 ('Genoa', 3),
 ('Lucca', 2),
 ('are', 1222),
 ('now', 1158)]

In [74]:
## Bazic groupByKey example in python
x = sc.parallelize([
    ("USA", 1), ("USA", 2), ("India", 1),
    ("UK", 1), ("India", 4), ("India", 9),
    ("USA", 8), ("USA", 3), ("India", 4),
    ("UK", 6), ("UK", 9), ("UK", 5)], 3)
 

In [75]:
## groupByKey with default partitions
y = x.groupByKey()
 
## Check partitions
print('Output: ',y.getNumPartitions()) 
## Output: 3
 

Output:  3


In [76]:
## Print Output
for t in y.collect():
#    print(t[0], [v for v in t[1]])
    print(t[0], len(t[1]))
    
## USA [1, 2, 8, 3]
## India [1, 4, 9, 4]
## UK [1, 6, 9, 5]

USA 4
UK 4
India 4
