In [1]:
import re
from operator import add

from pyspark.sql import SparkSession

# New API: build the SparkSession (getOrCreate() reuses an existing one if present).
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.119:7077")
    .appName("GZ")
    .config("spark.executor.cores", 16)
    # Dynamic allocation with shuffle tracking, external shuffle service disabled.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Fixed ports for the driver and the block manager.
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Old API (RDD): the SparkContext that belongs to the session above.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/20 19:11:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/20 19:11:28 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


# Part A - Working with the RDD API

## Question A.1

A.1.1 Read the English transcripts with Spark, and count the number of lines.

In [None]:
# Load the English side of the corpus; textFile() yields one record per line.
lines_english = spark_context.textFile("hdfs://192.168.2.119:9000/europarl/europarl-v7.de-en.en")
print(lines_english.first())
# Because every record already is a single line, count() is the number of lines.
# (The earlier split('\n') -> len -> reduce(add) chain always added 1 per record.)
total_english_lines = lines_english.count()
print(f'total number of lines = {total_english_lines}')

[Stage 0:>                                                          (0 + 0) / 1]

A.1.2 Do the same with the other language (so that you have a separate lineage of RDDs for each).

In [5]:
# Load the German side of the corpus; textFile() yields one record per line.
lines_de = spark_context.textFile("hdfs://192.168.2.119:9000/europarl/europarl-v7.de-en.de")
print(lines_de.first())
# Because every record already is a single line, count() is the number of lines.
# (The earlier split('\n') -> len -> reduce(add) chain always added 1 per record.)
total_de_lines = lines_de.count()
print(f'total number of lines = {total_de_lines}')

Wiederaufnahme der Sitzungsperiode




total number of lines = 1920209


                                                                                

A.1.3 Verify that the line counts are the same for the two languages.
In this case, the English transcripts contain 1920209 lines, which is equal to the line count of the other-language (German) transcripts.

A.1.4 Count the number of partitions.

In [6]:
# Look up the partition count of each source RDD, then report both.
english_partition_count = lines_english.getNumPartitions()
de_partition_count = lines_de.getNumPartitions()
print("number of partitions of the english:", english_partition_count)
print("number of partitions of the original:", de_partition_count)

number of partitions of the english: 3
number of partitions of the original: 3


## Question A.2

A.2.1 Pre-process the text from both RDDs by doing the following:

● Lowercase the text

● Tokenize the text (split on space)

Hint: define a function to run in your driver application to avoid writing this code twice.

In [7]:
from pyspark.sql.functions import lower, col
def preprocess(lines):
    """Lowercase every line and tokenize it.

    Args:
        lines: an RDD-like collection of strings exposing ``map`` and
            ``flatMap``.

    Returns:
        A pair ``(lowercase_lines, words)``: the lowercased lines, and the
        tokens obtained by splitting each lowercased line on single spaces
        and then splitting every piece on newlines.
    """
    def tokenize(text):
        # Split on ' ' first, then break each piece on '\n', keeping order.
        return [token for piece in text.split(' ') for token in piece.split('\n')]

    lowercase_lines = lines.map(lambda text: text.lower())
    words = lowercase_lines.flatMap(tokenize)
    return lowercase_lines, words

A.2.2 Inspect 10 entries from each of your RDDs to verify your pre-processing.

In [8]:
# English: only the lowercased lines are needed here, the token RDD is ignored.
english_lowercase_lines, _ = preprocess(lines_english)
english_sample = english_lowercase_lines.take(10)
print(english_sample)
print("----------------------------------------------")
# Other language (German): same inspection.
de_lowercase_lines, _ = preprocess(lines_de)
de_sample = de_lowercase_lines.take(10)
print(de_sample)

['resumption of the session', 'i declare resumed the session of the european parliament adjourned on friday 17 december 1999, and i would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.', "although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful.", 'you have requested a debate on this subject in the course of the next few days, during this part-session.', "in the meantime, i should like to observe a minute' s silence, as a number of members have requested, on behalf of all the victims concerned, particularly those of the terrible storms, in the various countries of the european union.", "please rise, then, for this minute' s silence.", "(the house rose and observed a minute' s silence)", 'madam president, on a point of order.', 'you will be aware from the press and television that there have been a num

A.2.3 Verify that the line counts still match after the pre-processing.

In [9]:
# english
# Each record of the pre-processed RDD is one lowercased line, so count() is
# the line count; no need to split on '\n' and sum lengths of 1.
total_english_lines = english_lowercase_lines.count()
print(f'total number of lines = {total_english_lines}')



total number of lines = 1920209


                                                                                

In [10]:
# original
# Each record of the pre-processed RDD is one lowercased line, so count() is
# the line count; no need to split on '\n' and sum lengths of 1.
total_de_lines = de_lowercase_lines.count()
print(f'total number of lines = {total_de_lines}')



total number of lines = 1920209


                                                                                

A.2.3 Verify that the line counts still match after the pre-processing.

After verification, the line counts are exactly the same as they were before pre-processing.

Total number of lines = 1920209

## Question A.3

A.3.1 Use Spark to compute the 10 most frequently occurring words in the English language corpus. Repeat for the other language.

In [11]:
# English: tokens -> (stripped token, 1) pairs -> summed counts per token.
_, english_words = preprocess(lines_english)
english_word_key = english_words.map(lambda token: (token.strip(), 1))
english_word_counts = english_word_key.reduceByKey(add)
# Ordering by the negated count returns the 10 largest counts first.
top_english_words = english_word_counts.takeOrdered(10, key=lambda pair: -pair[1])
print(top_english_words)



[('the', 3663193), ('of', 1736975), ('to', 1611788), ('and', 1345073), ('in', 1134026), ('that', 835874), ('a', 810540), ('is', 792564), ('for', 557349), ('we', 551244)]


                                                                                

In [12]:
# original 
[_, de_words] = preprocess(lines_de)
de_word_key = de_words.map(lambda w: w.strip()).map(lambda w: (w, 1))
de_word_counts = de_word_key.reduceByKey(add)
print(de_word_counts.takeOrdered(10, key=lambda x: -x[1]))



[('die', 1980477), ('der', 1710353), ('und', 1337721), ('in', 781362), ('zu', 618872), ('den', 577654), ('wir', 489036), ('für', 478326), ('ich', 469025), ('das', 466127)]


                                                                                

In [13]:
# Only the last expression of a cell is displayed, so print the first sample
# explicitly; otherwise its result is computed and silently thrown away.
print(english_words.take(3))
english_word_key.take(3)

[('resumption', 1), ('of', 1), ('the', 1)]

A.3.2 Verify that your results are reasonable.

The pipeline to get the 10 most frequently occurring words:

1. Get the split words using the 'preprocess' function, e.g. ['resumption', 'of', 'the'].
2. Map step: strip surrounding whitespace from each word and turn it into a key-value pair, e.g. [('resumption', 1), ('of', 1), ('the', 1)].
3. Reduce step: combine the pairs that share a key by adding up their values, e.g. ('of', 1), ('of', 1) --> ('of', 2).
4. Output the 10 pairs with the highest counts.

## Question A.4

A.4.1 Use this parallel corpus to mine some translations in the form of word pairs, for the two languages. Do this by pairing words found on short lines with the same number of words respectively. We (incorrectly) assume the words stay in the same order when translated.

Follow this approach. Work with the pair of RDDs you created in question A.2.
Hint: make a new pair of RDDs for each step, sv_1, en_1, sv_2, en_2, ...

1. Key the lines by their line number (hint: ZipWithIndex()).
2. Swap the key and value - so that the line number is the key.
3. Join the two RDDs together according to the line number key, so you have pairs of matching lines.

In [24]:
# English: pair every lowercased line with its position, then flip each
# (line, index) pair so the line number becomes the key.
english_lowercase_lines, _ = preprocess(lines_english)
english_lines_index = english_lowercase_lines.zipWithIndex()
english_index_lines = english_lines_index.map(lambda line_and_index: (line_and_index[1], line_and_index[0]))
english_index_sample = english_index_lines.take(10)
print(english_index_sample)

# Other language (German): identical steps.
de_lowercase_lines, _ = preprocess(lines_de)
de_lines_index = de_lowercase_lines.zipWithIndex()
de_index_lines = de_lines_index.map(lambda line_and_index: (line_and_index[1], line_and_index[0]))
de_index_sample = de_index_lines.take(10)
print(de_index_sample)

                                                                                

[(0, 'resumption of the session'), (1, 'i declare resumed the session of the european parliament adjourned on friday 17 december 1999, and i would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.'), (2, "although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful."), (3, 'you have requested a debate on this subject in the course of the next few days, during this part-session.'), (4, "in the meantime, i should like to observe a minute' s silence, as a number of members have requested, on behalf of all the victims concerned, particularly those of the terrible storms, in the various countries of the european union."), (5, "please rise, then, for this minute' s silence."), (6, "(the house rose and observed a minute' s silence)"), (7, 'madam president, on a point of order.'), (8, 'you will be aware from the pre



[(0, 'wiederaufnahme der sitzungsperiode'), (1, 'ich erkläre die am freitag, dem 17. dezember unterbrochene sitzungsperiode des europäischen parlaments für wiederaufgenommen, wünsche ihnen nochmals alles gute zum jahreswechsel und hoffe, daß sie schöne ferien hatten.'), (2, 'wie sie feststellen konnten, ist der gefürchtete "millenium-bug " nicht eingetreten. doch sind bürger einiger unserer mitgliedstaaten opfer von schrecklichen naturkatastrophen geworden.'), (3, 'im parlament besteht der wunsch nach einer aussprache im verlauf dieser sitzungsperiode in den nächsten tagen.'), (4, 'heute möchte ich sie bitten - das ist auch der wunsch einiger kolleginnen und kollegen -, allen opfern der stürme, insbesondere in den verschiedenen ländern der europäischen union, in einer schweigeminute zu gedenken.'), (5, 'ich bitte sie, sich zu einer schweigeminute zu erheben.'), (6, '(das parlament erhebt sich zu einer schweigeminute.)'), (7, 'frau präsidentin, zur geschäftsordnung.'), (8, 'wie sie sich

                                                                                

In [31]:
# Join the two keyed RDDs on the line number, giving records of the form
# (line_number, (de_line, english_line)).
# union(...).reduceByKey(add) would instead concatenate the two strings in no
# guaranteed order and lose which text belongs to which language.
# NOTE(review): the recorded run of this cell failed with "No space left on
# device" during the shuffle write; join also shuffles, so confirm the workers
# have free scratch space before re-running.
de_en_join = de_index_lines.join(english_index_lines)
de_en_join.take(1)

22/02/19 18:10:45 ERROR TaskSetManager: Task 5 in stage 38.0 failed 4 times; aborting job


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 38.0 failed 4 times, most recent failure: Lost task 5.3 in stage 38.0 (TID 158) (192.168.2.119 executor 1): com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:188)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:283)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:243)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:186)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:188)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
	at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:283)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:243)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:186)
	... 20 more


In [None]:
# NOTE(review): `en` is not defined in any visible cell, so running this cell
# would raise NameError — looks like a leftover; confirm and remove or complete.
en

In [None]:
## Example #1 - Filter by Top_level Domain and compute most common words ##
# NOTE(review): `rdd` is not defined in any visible cell — presumably an RDD of
# (key, WARC record text) pairs carried over from another notebook; confirm
# before running.

# Try .ac.uk, .ru, .se, .com
# Raw string so that `\S` and `\.` reach the regex engine unchanged; both dots
# of the domain are escaped so they only match a literal '.'.
p = re.compile(r'WARC-Target-URI: \S+\.ac\.uk', re.IGNORECASE)


# Note: .partition(..) returns a 3-tuple: the string before the separator (index 0),
# the separator (index 1), and the part of the string afterwards (index 2) -- which is the part we want.
# Parentheses replace the backslash continuations (the original chain ended in
# a dangling backslash followed by a blank line).
all_words = (
    rdd
    .filter(lambda doc: bool(p.search(doc[1])))
    .map(lambda web_text: web_text[1].partition('\r\n\r\n')[2])
    .flatMap(lambda t: t.split(' '))
    .flatMap(lambda w: w.split('\n'))
)

# Strip surrounding whitespace from each token and pair it with a count of 1.
all_words_and_count = all_words.map(lambda w: w.strip())\
    .map(lambda w: (w,1))

# Sum the counts per distinct token.
word_counts = all_words_and_count.reduceByKey(add)

# Ordering by the negated count returns the 60 largest counts first.
print(word_counts.takeOrdered(60, key=lambda x: -x[1]))
