## Actions

- Aggregate data or return it to the driver.
- They trigger the actual computation and load the result into the interpreter.
    - collect() returns all elements of the RDD as a list
    - reduce(f) reduces the RDD to a single value using the function f
    - count() returns the number of elements in the dataset
    - first() returns the first element of the dataset (similar to take(1), which returns it inside a list)
    - take(n) returns a list with the first n elements of the dataset
    - countByKey() returns a dict mapping each key of a (key, value) RDD to its count
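
To make the return types concrete, here is a plain-Python sketch of what each action above hands back. It does not use Spark: the lists `data` and `pairs` are hypothetical stand-ins for the contents of an RDD, and the built-ins only mimic the shape of the results.

```python
from collections import Counter
from functools import reduce

# Hypothetical stand-ins for the contents of an RDD (no Spark involved).
data = [3, 1, 4, 1, 5]
pairs = [("a", 1), ("b", 2), ("a", 3)]

collected = list(data)                            # like collect(): every element, as a list
total = reduce(lambda x, y: x + y, data)          # like reduce(f): a single value
n = len(data)                                     # like count()
head = data[0]                                    # like first(): a bare element
first_two = data[:2]                              # like take(2): a list
key_counts = dict(Counter(k for k, _ in pairs))   # like countByKey(): key -> count

print(collected, total, n, head, first_two, key_counts)
```

Note that first() gives a bare element while take(1) gives a one-element list.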

## Transformations
- Create a new RDD from an existing one.
- They are lazy: nothing is computed until an action requests the result.
- They are recomputed every time an action is run against them (call .cache() on the RDD to avoid this)
    - map(f) applies a function to every element of the RDD
    - flatMap(f) same as map, but f returns a sequence for each element and the results are flattened into a single RDD
    - filter(f) keeps only the elements for which the predicate f returns True
    - union(rdd2) concatenates two RDDs, keeping duplicates
    - intersection(rdd2) returns the elements common to both RDDs, without duplicates
    - reduceByKey(f) treats each entry of the RDD as a (key, value) tuple and merges the values of each key with f
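
The laziness and recomputation described above can be illustrated with a toy model. This is only a sketch in plain Python, not how Spark is implemented: the class `LazyMap` and its `calls` counter are hypothetical names made up for the illustration.

```python
class LazyMap(object):
    """Toy model of a lazy map transformation; hypothetical, not Spark code."""

    def __init__(self, source, f):
        self.source = list(source)
        self.f = f
        self.calls = 0          # number of times f was actually evaluated
        self.use_cache = False
        self._cached = None

    def cache(self):
        # Only marks the result for reuse; nothing is computed here.
        self.use_cache = True
        return self

    def collect(self):
        # The "action": this is the point where work actually happens.
        if self._cached is not None:
            return list(self._cached)
        result = []
        for x in self.source:
            self.calls += 1
            result.append(self.f(x))
        if self.use_cache:
            self._cached = result
        return list(result)


doubled = LazyMap(range(3), lambda x: x * 2)
calls_before_action = doubled.calls   # 0: defining the map computed nothing
doubled.collect()
doubled.collect()
calls_without_cache = doubled.calls   # 6: each action recomputed all 3 elements

cached = LazyMap(range(3), lambda x: x * 2).cache()
cached.collect()
cached.collect()
calls_with_cache = cached.calls       # 3: the second action reused the result
```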


In [None]:
sc

In [None]:
rdd1 = sc.parallelize(range(10))

In [None]:
rdd1

In [None]:
print(rdd1)

In [None]:
rdd1.collect()

In [None]:
rdd2 = sc.parallelize(range(20))

In [None]:
rdd2.collect()

In [None]:
rdd3 = rdd1.union(rdd2)

In [None]:
rdd3.collect()

In [None]:
rdd3.distinct().collect()

In [None]:
rdd1.intersection(rdd2).collect()

In [None]:
rdd4 = rdd1.intersection(rdd2).sortBy(lambda x: x)

In [None]:
rdd4.take(9)
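
As a cross-check on the cells above, the same set operations can be sketched with plain Python lists and sets. This assumes nothing about Spark beyond the documented behaviour that union keeps duplicates while distinct and intersection remove them; the variable names are made up for the illustration.

```python
a = list(range(10))   # same values as rdd1
b = list(range(20))   # same values as rdd2

union_ab = a + b                             # union: plain concatenation, duplicates kept
distinct_ab = sorted(set(union_ab))          # distinct: duplicates removed
intersection_ab = sorted(set(a) & set(b))    # intersection: common elements, no duplicates

print(len(union_ab), len(distinct_ab), len(intersection_ab))  # 30 20 10
```

The explicit `sorted` mirrors the `sortBy(lambda x: x)` call: without it, the order of the result is not guaranteed.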

In [None]:
rdd1 = sc.parallelize(range(10))

In [None]:
def sum_1(x):
    # add 1 to each element
    return x + 1
a = rdd1.map(sum_1)

In [None]:
a.collect()

In [None]:
sentence = ['ciao come','va? io bene']

In [None]:
rdd_sentence = sc.parallelize(sentence)

In [None]:
rdd_sentence.first()

In [None]:
rdd_sentence_flat = rdd_sentence.flatMap(lambda x: x.split())

In [None]:
rdd_sentence_flat.collect()

In [126]:
# data http://www.gutenberg.org/files/236/236-0.txt
lines = sc.textFile('./data/book.txt')
lines.take(1)

[u'The Project Gutenberg EBook of The Jungle Book, by Rudyard Kipling']

In [None]:
lines.count()

In [None]:
words = lines.map(lambda x: x.split())
words.first()

In [None]:
words = lines.flatMap(lambda x: x.split())
words.first()
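
The difference between the last two cells is only map versus flatMap. A minimal plain-Python sketch of that difference, using the two toy sentences from earlier rather than the book (the variable names are made up for the illustration):

```python
from itertools import chain

lines_sample = ["ciao come", "va? io bene"]

# map: one output element per input element, so the result is a list of lists.
mapped = [line.split() for line in lines_sample]

# flatMap: the per-line lists are concatenated into one flat sequence of words.
flat_mapped = list(chain.from_iterable(line.split() for line in lines_sample))

print(mapped[0])       # ['ciao', 'come']
print(flat_mapped[0])  # ciao
```

This is why `first()` gives a whole list of words after map and a single word after flatMap.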

In [None]:
words_mapped = words.map(lambda x:(x,1))

In [None]:
words_frequency = words_mapped.reduceByKey(lambda v1,v2: v1+v2)

In [None]:
most_used_words = words_frequency.filter(lambda pair: pair[1]>20)

In [None]:
most_used_words.take(10)

In [None]:
most_used_sorted = most_used_words.sortBy(lambda pair: pair[1], ascending=False)

In [None]:
most_used_sorted.take(20)
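
The word-count steps above (map to pairs, reduceByKey, filter, sort) can be sketched in plain Python to show what each stage produces. The helper `reduce_by_key` is a hypothetical stand-in written for this illustration, and the sample sentence is invented; Spark performs the same merge per key, but distributed across partitions.

```python
def reduce_by_key(pairs, f):
    """Hypothetical helper: merge the values that share a key using f."""
    merged = {}
    for key, value in pairs:
        merged[key] = f(merged[key], value) if key in merged else value
    return merged


words_sample = "the jungle the book the end".split()
pairs = [(w, 1) for w in words_sample]                     # like map(lambda x: (x, 1))
frequency = reduce_by_key(pairs, lambda v1, v2: v1 + v2)   # like reduceByKey
frequent = [p for p in frequency.items() if p[1] > 1]      # like filter on the count
ranked = sorted(frequency.items(), key=lambda pair: pair[1], reverse=True)

print(ranked[0])  # ('the', 3)
```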

In [None]:
words_frequency.filter(lambda pair: pair[0].startswith('time')).take(10)

In [None]:
import re
# matches any character that is neither a word character nor whitespace
notwre = re.compile(r'[^\w\s]')

freqs = lines.map(lambda line: line.replace('--', ' '))\
            .flatMap(lambda x: x.split()) \
            .map(lambda w: notwre.sub('', w)) \
            .map(lambda w: w.lower()) \
            .filter(lambda w: len(w) > 2) \
            .map(lambda w: (w, 1)) \
            .reduceByKey(lambda v1,v2: v1+v2)

freqs.filter(lambda pair: pair[0].startswith('time')).take(10)
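
To see why the cleaning steps matter, here is the same normalisation applied to one invented line in plain Python. The sample text and variable names are assumptions made for the illustration; the pattern and the steps are the ones from the cell above.

```python
import re

not_word = re.compile(r'[^\w\s]')   # same pattern as above

line = "Time--and time, TIME. it is"
tokens = line.replace('--', ' ').split()                  # split the '--' joined words
cleaned = [not_word.sub('', t).lower() for t in tokens]   # strip punctuation, lowercase
kept = [t for t in cleaned if len(t) > 2]                 # drop very short words

print(kept)  # ['time', 'and', 'time', 'time']
```

Without these steps, 'Time', 'time,' and 'TIME.' would be counted as three different words.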

## Babies born in New York State since 2007

(https://www.healthdata.gov/dataset/baby-names-beginning-2007)


In [None]:
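# keep only complete rows with a numeric count in the fifth column
# (this also skips a header row, if the file has one)
babies_rdd = sc.textFile("./data/Baby_Names__Beginning_2007.csv")\
    .map(lambda line: line.split(","))\
    .filter(lambda line: len(line) > 4 and line[4].strip().isdigit())\
    .map(lambda line: (line[0],line[1],line[2],line[3],int(line[4])))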

In [None]:
babies_rdd.take(10)

In [None]:
names_to_counties = babies_rdd.map(lambda d: (d[1], d[2]))

In [None]:
names_to_counties.take(10)

In [None]:
# turn the (name, county) tuples into the form (name, [county1, county2, ...])
grouped_names_to_counties = names_to_counties.groupByKey()

In [None]:
# mapValues(list) turns the grouped iterable into a readable list
grouped_names_to_counties.filter(lambda d: d[0]=="MICHAEL").mapValues(list).collect()

In [None]:
# number of county records for each name (the RDD is keyed by name, not by county)
grouped_names_to_counties.map(lambda x: (x[0], len(x[1]))).take(10)

In [None]:
# alternatively, use reduceByKey to get the overall frequency of each name
names_frequencies = babies_rdd.map(lambda d: (d[1], d[4])).reduceByKey(lambda x,y: x+y)
names_frequencies.take(10)
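
The two approaches above, groupByKey and reduceByKey, differ in what they keep per key. A plain-Python sketch of that difference follows; the rows are invented, and the meaning of the unused columns (shown here as year and sex) is an assumption, since the notebook only uses d[1], d[2] and d[4].

```python
from collections import defaultdict

# Hypothetical rows shaped like babies_rdd: (year, name, county, sex, count)
rows = [
    ("2013", "MICHAEL", "KINGS", "M", 10),
    ("2013", "MICHAEL", "QUEENS", "M", 7),
    ("2014", "EMMA", "KINGS", "F", 5),
]

# groupByKey style: keep every value seen for a key.
grouped = defaultdict(list)
for _, name, county, _, _ in rows:
    grouped[name].append(county)

# reduceByKey style: fold values into a running total, never storing the list.
totals = defaultdict(int)
for _, name, _, _, count in rows:
    totals[name] += count

print(dict(grouped))  # {'MICHAEL': ['KINGS', 'QUEENS'], 'EMMA': ['KINGS']}
print(dict(totals))   # {'MICHAEL': 17, 'EMMA': 5}
```

When only an aggregate is needed, the reduce style avoids materialising the full list of values for each key.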

In [None]:
names_frequencies.sortByKey().take(10)

In [None]:
names_frequencies.sortBy(lambda x: x[1], ascending=False).take(10)
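
The last two cells sort the same pairs in two different ways: by key, and by value in descending order. A minimal plain-Python sketch of the two orderings, on invented (name, total) pairs:

```python
# Hypothetical (name, total) pairs standing in for names_frequencies.
name_totals = [("MICHAEL", 17), ("EMMA", 5), ("AVA", 9)]

# like sortByKey(): ascending by name
by_name = sorted(name_totals, key=lambda x: x[0])

# like sortBy(lambda x: x[1], False): descending by total
by_total = sorted(name_totals, key=lambda x: x[1], reverse=True)

print(by_name[0])   # ('AVA', 9)
print(by_total[0])  # ('MICHAEL', 17)
```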

In [None]:
babies_rdd.map(lambda d: (d[0], d[4])) \
          .reduceByKey(lambda v1, v2: v1+v2) \
          .sortBy(lambda x: x[1], ascending=False) \
          .collect()