# (Py)Spark Exercises
@stravanni



----
# Exercises
### Try to solve the following exercises using the API presented above

## A. Wordcount
1. Read the file containing "THE DIVINE COMEDY" (the solution below reads `DivineComedy.txt`).
2. Select the 10 most frequent words, excluding the stopwords.

In [3]:
import nltk
nltk.download("stopwords")
FILE_PATH = "file:///notebooks/cineca/data/"

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [4]:
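try:
    import nltk.corpus as corpus
    # words() without a language argument returns the stopwords of all languages in the NLTK corpus
    stopwords = set(corpus.stopwords.words())
    # add text-specific tokens (split() keeps punctuation attached to words)
    stopwords = stopwords.union(["dante","etc._:","dante's","_the","(_inf._"])
except (ImportError, LookupError):
    # nltk or its stopwords corpus is not available: filter nothing
    stopwords = set()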
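
`line.split()` splits only on whitespace, so punctuation stays attached to words. That is why odd tokens such as `"etc._:"` have to be added to the stopword set by hand. One possible refinement is to strip leading and trailing punctuation from each token before filtering, sketched here in plain Python. `normalize` is a hypothetical helper and the sample line is made up. In the Spark pipeline it could be used as `.map(normalize).filter(lambda w: w and w not in stopwords)`, which would change the counts shown in the output below.

```python
import string

def normalize(token):
    """Hypothetical helper: lowercase a token and strip leading/trailing punctuation."""
    return token.strip(string.punctuation).lower()

line = "Thou, O Muse, wilt aid me; thou!"  # made-up sample line

raw = [w.strip().lower() for w in line.split()]         # what the pipeline below produces
clean = [w for w in map(normalize, line.split()) if w]  # drop tokens that were only punctuation

print(raw)    # ['thou,', 'o', 'muse,', 'wilt', 'aid', 'me;', 'thou!']
print(clean)  # ['thou', 'o', 'muse', 'wilt', 'aid', 'me', 'thou']
```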

In [5]:
# Write the solution here
# Most common words in "THE DIVINE COMEDY"
rdd = sc.textFile(FILE_PATH + "DivineComedy.txt")
res = rdd.flatMap(lambda line: line.split()) \
        .map(lambda word: word.strip().lower()) \
        .filter(lambda word: word not in stopwords) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda pair: pair[1], ascending=False) \
        .take(10)
# Alternative that avoids a full sort, instead of .sortBy(...).take(10):
#     .top(10, key=lambda pair: pair[1])
res

[(u'one', 413),
 (u'thou', 284),
 (u'may', 238),
 (u'great', 199),
 (u'made', 183),
 (u'would', 171),
 (u'upon', 166),
 (u'like', 166),
 (u'time', 159),
 (u'thy', 148)]
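
You can check the logic on a small input without a cluster by mirroring the pipeline in plain Python. `collections.Counter` plays the role of `map((word, 1))` plus `reduceByKey`, and `most_common(n)` plays the role of `sortBy(...).take(n)`. This is a local sketch, not Spark code. `top_words` and the sample data are hypothetical.

```python
from collections import Counter

def top_words(lines, stopwords, n=10):
    """Hypothetical local equivalent of the Spark wordcount above."""
    # flatMap + map: split each line on whitespace and lowercase each token
    words = (w.strip().lower() for line in lines for w in line.split())
    # filter: drop stopwords
    kept = (w for w in words if w not in stopwords)
    # map to (word, 1) + reduceByKey: count occurrences of each word
    counts = Counter(kept)
    # sortBy(count, descending) + take(n)
    return counts.most_common(n)

lines = ["the cat and the dog", "The dog and the bird", "a dog"]
stop = {"the", "and", "a"}
print(top_words(lines, stop, n=2))  # 'dog' comes first, with 3 occurrences
```

Ties between equal counts may come out in a different order in Spark, because `sortBy` does not guarantee a stable order across partitions.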

## B. Estimating Pi
This code estimates π by "throwing darts" at a circle.

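1. We pick random points in the unit square (from (0, 0) to (1, 1)) and count how many fall inside the unit circle, i.e. satisfy x² + y² < 1.
2. The part of the unit circle inside the square is a quarter disc of area π / 4, and the square has area 1. The fraction of points that land inside should therefore approach π / 4, and multiplying it by 4 gives our estimate.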
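
Here is the reasoning in step 2 written out. N is the number of darts (`NUM_SAMPLES`) and count is the number of hits. The standard-error term is an addition: it treats each dart as an independent trial that hits with probability p = π / 4.

```latex
\Pr\!\left[x^2 + y^2 < 1\right]
  = \frac{\text{area of the quarter disc}}{\text{area of the unit square}}
  = \frac{\pi \cdot 1^2 / 4}{1}
  = \frac{\pi}{4}
\quad\Longrightarrow\quad
\hat{\pi} = 4 \cdot \frac{\mathrm{count}}{N},
\qquad
\mathrm{SE}(\hat{\pi}) = 4\sqrt{\frac{p\,(1-p)}{N}}, \quad p = \frac{\pi}{4}
```

For N = 100 this gives a standard error of about 0.16.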

In [6]:
# SOLUTION
from random import random
NUM_SAMPLES = 100

def sample(p):
    # p (the element index) is ignored; each call throws one dart
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

# Distribute the sample indices, score each dart, then sum the hits
sample_nums = sc.parallelize(range(0, NUM_SAMPLES))
hits = sample_nums.map(sample)
count = hits.reduce(lambda a, b: a + b)

print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Pi is roughly 2.720000
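
The 2.72 above is far from π most likely because 100 darts is a very small sample. If each dart is an independent trial that hits with probability π/4, the standard error of the estimate is about 0.16 at N = 100 and shrinks as 1/√N. The local sketch below makes that visible. It is plain Python with no Spark, and `estimate_pi` and `standard_error` are hypothetical helpers. Raising `NUM_SAMPLES` in the Spark cell should tighten the estimate in the same way.

```python
import math
import random

def estimate_pi(num_samples, seed=None):
    """Local (non-Spark) Monte Carlo estimate of pi, mirroring sample() above."""
    rng = random.Random(seed)  # private, seedable generator for reproducible runs
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return 4.0 * hits / num_samples

def standard_error(num_samples):
    """Standard error 4 * sqrt(p * (1 - p) / N) of the estimate, with p = pi / 4."""
    p = math.pi / 4
    return 4 * math.sqrt(p * (1 - p) / num_samples)

# The error shrinks roughly as 1 / sqrt(N)
for n in (100, 10000, 100000):
    print(n, estimate_pi(n, seed=42), round(standard_error(n), 4))
```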


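## C. TMax

Find the maximum temperature recorded for each year in the fixed-width weather records of `1902.txt`.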

In [9]:
import re

In [10]:
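# Extract (year, temperature) from a fixed-width record by character position,
# dropping missing readings ("+9999") and records whose quality code is not 0, 1, 4, 5 or 9.
# Returns a list so that flatMap simply drops invalid records (empty list).
def extractData(line):
    val = line.strip()
    (year, temp, q) = (str(val[15:19]), str(val[87:92]), str(val[92:93]))
    if temp != "+9999" and re.match("[01459]", q):
        return [(year, temp)]
    else:
        return []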
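
You can check the parser without Spark by feeding it synthetic records. The layout built by `make_record` (a hypothetical helper) is artificial: only the offsets that `extractData` reads (year at 15-19, temperature at 87-92, quality at 92) carry meaning, and everything else is filler. The function is copied from the cell above so the block runs on its own.

```python
import re

def extractData(line):
    # Same logic as the notebook cell above
    val = line.strip()
    (year, temp, q) = (str(val[15:19]), str(val[87:92]), str(val[92:93]))
    if temp != "+9999" and re.match("[01459]", q):
        return [(year, temp)]
    return []

def make_record(year, temp, quality):
    """Hypothetical helper: synthetic fixed-width line with year, temp and quality at the parsed offsets."""
    return "0" * 15 + year + "0" * (87 - 19) + temp + quality

print(extractData(make_record("1902", "+0244", "1")))  # [('1902', '+0244')]: valid reading
print(extractData(make_record("1902", "+9999", "1")))  # []: missing value
print(extractData(make_record("1902", "-0011", "2")))  # []: quality code 2 is rejected
```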

In [13]:
# Create an RDD from the input file (FILE_PATH points to the local file system, not HDFS)
weatherData = sc.textFile(FILE_PATH + "1902.txt")

In [14]:
# Extract/filter the records, then keep the maximum temperature per year
# (values are strings such as '+0244', so they are compared as integers)
temperature_per_year = weatherData.flatMap(extractData)
max_temperature_per_year = temperature_per_year.reduceByKey(lambda a,b : a if int(a) > int(b) else b)
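
`reduceByKey` folds together all values that share a key using a two-argument function. Spark applies that function within each partition and then across partitions, so it should be associative and commutative; "keep the warmer reading" is both. The plain-Python stand-in below shows these semantics and why the lambda converts to `int`: comparing the raw strings would rank `'-0011'` above `'+0244'`, because `'-'` sorts after `'+'`. `reduce_by_key` and `warmer` are hypothetical names, not Spark APIs, and the readings are made up.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey: fold the values of each key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

def warmer(a, b):
    # Same comparison as the notebook lambda: compare as integers, keep the original string
    return a if int(a) > int(b) else b

pairs = [("1902", "-0011"), ("1902", "+0244"), ("1902", "+0078"),
         ("1903", "-0050"), ("1903", "-0005")]

print(reduce_by_key(pairs, warmer))  # {'1902': '+0244', '1903': '-0005'}
print(max("-0011", "+0244"))         # '-0011': plain string comparison gets it wrong
```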

In [15]:
# Sanity check: list the distinct years present in the data
years = temperature_per_year.keys()
years.distinct().collect()

['1902']

In [16]:
# Save the result to HDFS (this fails if hdfs:///output already exists)
max_temperature_per_year.saveAsTextFile("hdfs:///output")

#### PySpark's `saveAsTextFile` doesn't support overwrite or append

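- The function `saveAsTextFile` is a wrapper around `saveAsHadoopFile`, which refuses to write into an existing output directory. You therefore cannot overwrite existing files: the job fails if the output path already exists.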

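#### In Scala
You can, however, delete the existing output through the HDFS `FileSystem` API directly from Scala:
```scala
val hadoopConf = new org.apache.hadoop.conf.Configuration()
// adjust the URI to your NameNode address
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
// recursively delete the old output directory before saving again
hdfs.delete(new org.apache.hadoop.fs.Path("/output"), true)
```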
#### In the shell
- To merge the part files of an HDFS output directory into a single local file, use
[`hadoop fs -getmerge`](https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge).
- To simply delete the output directory:
```
hdfs dfs -rm -R "hdfs:///output"
```
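
When re-running the notebook, you can also do the delete from Python right before calling `saveAsTextFile`. Here is a minimal sketch, assuming the `hdfs` command is on the `PATH` of the driver machine. `remove_hdfs_path` is a hypothetical helper that shells out to the same command shown above, adding `-f` so it does not fail when the directory is absent. The `run` parameter exists only so the helper can be exercised without a cluster.

```python
import subprocess

def remove_hdfs_path(path, run=subprocess.run):
    """Hypothetical helper: recursively delete an HDFS path via `hdfs dfs -rm -f -R`.

    -f suppresses the error when the path does not exist; check=True raises
    CalledProcessError if the command itself fails.
    """
    cmd = ["hdfs", "dfs", "-rm", "-f", "-R", path]
    run(cmd, check=True)
    return cmd
```

Usage: `remove_hdfs_path("hdfs:///output")`, followed by `max_temperature_per_year.saveAsTextFile("hdfs:///output")`.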

In [17]:
weatherData_ = sc.textFile("hdfs:///output")

In [18]:
weatherData_.collect()

[u"('1902', '+0244')"]
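
`saveAsTextFile` writes `str()` of each element, so the saved line is the text `"('1902', '+0244')"` rather than a tuple. One way to rebuild the tuples is `ast.literal_eval`, e.g. `sc.textFile("hdfs:///output").map(ast.literal_eval)` in PySpark. Another is to format each pair yourself before saving, for example as tab-separated text. The sketch below runs locally on the collected line. The conversion to degrees assumes the file follows the NCDC record layout used in the classic Hadoop max-temperature example, where air temperature is stored in tenths of a degree Celsius.

```python
import ast

saved_lines = ["('1902', '+0244')"]  # the line collected from hdfs:///output above

records = [ast.literal_eval(line) for line in saved_lines]  # back to (year, temp) tuples
year, temp = records[0]
celsius = int(temp) / 10.0  # assumption: NCDC stores air temperature in tenths of a degree Celsius
print(records, celsius)     # [('1902', '+0244')] 24.4
```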

In [19]:
%%bash
hdfs dfs -ls /
#hdfs dfs -ls /output

Found 3 items
drwxr-xr-x   - root supergroup          0 2015-04-08 20:56 /output
drwxr-xr-x   - root supergroup          0 2015-04-08 14:50 /spark
drwxr-xr-x   - root supergroup          0 2014-12-12 12:05 /user


In [20]:
%%bash
hdfs dfs -rm -R "hdfs:///output"

# http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#rm

Deleted hdfs:///output


15/04/08 20:56:51 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
