# Spark Cache

In [None]:
# Display the `sc` object that the later cells use (sc.textFile, sc.broadcast).
# NOTE(review): presumably the SparkContext injected by the notebook kernel —
# it is not created in any visible cell; confirm.
sc

In [None]:
%%writefile simple.txt
a 1 first
b 2 second
c 3 third

In [None]:
%%bash
# Refresh the HDFS copy of simple.txt (written locally by the %%writefile cell).
# -f suppresses the diagnostic and the non-zero exit status when the file does
# not exist yet, e.g. on the very first run of the notebook.
hdfs dfs -rm -f -r -skipTrash simple.txt
# Upload to the relative path "." on HDFS.
hdfs dfs -put simple.txt .

In [None]:
# Read the file uploaded by the `hdfs dfs -put simple.txt .` cell.
# That upload uses a relative destination, so the read uses a relative path as
# well instead of a hard-coded /user/aadral home directory; Hadoop resolves
# both against the default filesystem's working directory.
# NOTE(review): assumes the Spark driver runs as the same user as the %%bash cell.
simple_rdd = sc.textFile("simple.txt")

In [None]:
# Tokenise every line of simple.txt on whitespace, so that a line such as
# "a 1 first" becomes the list ["a", "1", "first"].
favourite_letters_rdd = simple_rdd.map(
    lambda line: line.split()
)

# Peek at the first tokenised record.
favourite_letters_rdd.first()

In [None]:
# Cache-related calls to experiment with on simple_rdd. They are left
# commented out, so running this cell as-is does nothing.
# simple_rdd.cache()
# simple_rdd.unpersist()
# simple_rdd.is_cached

# Spark Implicit Cache

In [None]:
# Line-oriented RDD over the Wikipedia articles sample stored on HDFS.
wiki_path = "hdfs:///data/wiki/en_articles_part"
wiki_rdd = sc.textFile(wiki_path)

In [None]:
# Each input line is split once on the first tab; the part after the tab is
# lower-cased and then broken into whitespace-separated words.
lowercase_text_rdd = wiki_rdd.map(
    lambda line: line.split('\t', 1)[1].lower()
)
words_rdd = lowercase_text_rdd.flatMap(lambda text: text.split())

In [None]:
# Classic word count: emit (word, 1) for every word, then sum the ones per word.
word_ones_rdd = words_rdd.map(lambda word: (word, 1))
word_count_rdd = word_ones_rdd.reduceByKey(
    lambda left, right: left + right
)

In [None]:
# Request caching of word_count_rdd; the following %%time cell prints
# is_cached and times the first action on it.
word_count_rdd.cache()

In [None]:
%%time
print(word_count_rdd.is_cached)
print(word_count_rdd.first())
print()

* https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

In [None]:
# Undo the earlier word_count_rdd.cache() request. Later cells still use
# word_count_rdd after this point.
word_count_rdd.unpersist()

# Spark Broadcast

In [None]:
# Load the stop-word list and publish it as a broadcast variable.
# (Assumes one stop word per line — TODO confirm against the data file.)
stop_words_rdd = sc.textFile("hdfs:///data/stop_words")

# Broadcast a set rather than the raw list returned by collect(): every
# `in` / `not in` test against stop_words_broadcast.value is then a hash
# lookup instead of a linear scan, and that test runs once per record.
stop_words_broadcast = sc.broadcast(set(stop_words_rdd.collect()))

In [None]:
# Drop every (word, count) pair whose word is in the broadcast stop-word
# collection, then show the 10 pairs with the highest counts (ordering by the
# negated count puts the largest counts first).
non_stop_words_rdd = word_count_rdd.filter(
    lambda pair: (pair[0] not in stop_words_broadcast.value)
)
non_stop_words_rdd.takeOrdered(10, key=lambda pair: -pair[1])

# Spark Joins and PairRDD

* https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html

In [None]:
# Re-key every (word, count) pair by the word's first character, producing a
# flat 3-tuple: (first_letter, word, count).
tripple_dataset = word_count_rdd.map(
    lambda pair: (pair[0][0], pair[0], pair[1])
)

# Peek at one record.
tripple_dataset.first()

In [None]:
# Show one record of favourite_letters_rdd again, right before it is joined.
favourite_letters_rdd.first()

In [None]:
# Join on the first field of each record and look at five results.
# NOTE(review): both sides carry three fields per record here; pair-RDD joins
# work on (key, value) records, so the third field is presumably not part of
# the joined output — confirm against the cell output.
joined_rdd = favourite_letters_rdd.join(tripple_dataset)
joined_rdd.take(5)

In [None]:
# Same re-keying by first character, but the word and its count are packed
# into a single nested value: (first_letter, (word, count)).
tripple_dataset = word_count_rdd.map(
    lambda pair: (pair[0][0], (pair[0], pair[1]))
)

# Peek at one record.
tripple_dataset.first()

In [None]:
# Join favourite_letters_rdd with tripple_dataset on the first field of each
# record and look at five results.
joined_rdd = favourite_letters_rdd.join(tripple_dataset)
joined_rdd.take(5)

# Last but not least

In [None]:
# Shut down the Spark context once the notebook is finished.
# The original cell called `sc.top()`, but `top` is an RDD method and does not
# exist on the context object, so that call raised AttributeError.
sc.stop()