## Apache Spark 101 @ DevFest 2020

This notebook contains the demo script I live-coded during the talk Apache Spark 101 in the DevFest 2020 season.

There may be minor variations from the live talk, but all the content is here.

It first shows that both `spark` and `sc` (the Spark session and the Spark context, respectively) are available natively in a Dataproc Jupyter notebook.

Then it loads the `t8.shakespeare.txt` file from Google Cloud Storage and performs the classic word count algorithm with the RDD API. Later, we convert the result to a DataFrame and register it as a temporary view for Spark SQL.

The final step in the demo is to load the `stopwords` dataset from NLTK and perform two types of joins, the traditional hash join and a broadcast join, and compare their execution plans.

In [None]:
spark

In [None]:
sc

In [None]:
text = sc.textFile("gs://danicat/devfest2020/t8.shakespeare.txt")

In [None]:
text.take(10)

In [None]:
words = text.flatMap(lambda line: line.lower().split())

In [None]:
words.take(10)

In [None]:
ones = words.map(lambda w: (w,1))

In [None]:
ones.take(10)

In [None]:
counts = ones.reduceByKey(lambda x, y: x + y)

In [None]:
counts.take(10)
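
The three RDD steps above (`flatMap`, `map`, `reduceByKey`) can be mirrored in plain Python to see what each one does to the data. This is only a single-machine sketch, not how Spark executes the job: the sample lines and the `py_`-prefixed names are hypothetical and chosen so they do not overwrite the notebook's own variables.

```python
from collections import defaultdict
from itertools import chain

# Hypothetical sample lines standing in for the Shakespeare text.
py_lines = ["To be or not to be", "that is the question", "To sleep perchance to dream"]

# flatMap: split every line into words and flatten the results into one sequence.
py_words = list(chain.from_iterable(line.lower().split() for line in py_lines))

# map: pair each word with the number 1.
py_ones = [(w, 1) for w in py_words]

# reduceByKey: combine the values that share a key using x + y.
py_counts = defaultdict(int)
for word, one in py_ones:
    py_counts[word] += one
py_counts = dict(py_counts)

print(py_counts)
```

In Spark the same reduction runs per partition first and the partial sums are then merged across the cluster, which works because addition is associative and commutative.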

In [None]:
print(counts.toDebugString().decode('utf-8'))

In [None]:
df = counts.toDF(["word", "count"])

In [None]:
df.show()

In [None]:
df.printSchema()

In [None]:
sorted_df = df.orderBy(df["count"].desc())  # named sorted_df to avoid shadowing Python's built-in sorted()

In [None]:
sorted_df.show()

In [None]:
df.createOrReplaceTempView("wordcount")

In [None]:
sorted2 = spark.sql("select * from wordcount order by 2 desc")

In [None]:
sorted2.show()

In [None]:
import nltk

In [None]:
nltk.download('stopwords')

In [None]:
from nltk.corpus import stopwords

In [None]:
sw = stopwords.words('english')

In [None]:
type(sw)

In [None]:
sw[0:10]

In [None]:
swdf = spark.createDataFrame(sw, ["word"])  # will not work: each element must be row-like (e.g. a tuple), not a bare string

In [None]:
# createDataFrame expects row-like elements, so wrap each word in a 1-tuple
swdf = spark.createDataFrame([(w,) for w in sw], ["word"])
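
The wrapping step is easy to get wrong because parentheses alone do not create a tuple in Python. The plain-Python sketch below shows the difference, using a hypothetical three-word sample (`sample_sw`) in place of the NLTK list.

```python
# Hypothetical sample standing in for the NLTK stop word list.
sample_sw = ["i", "me", "my"]

# Parentheses alone do not make a tuple; the trailing comma does.
not_a_tuple = ("i")
one_tuple = ("i",)
print(type(not_a_tuple).__name__, type(one_tuple).__name__)

# Wrap every word in a 1-tuple so that each element is a one-column row.
sample_rows = [(w,) for w in sample_sw]
print(sample_rows)
```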

In [None]:
swdf.show()

In [None]:
swdf.createOrReplaceTempView("stopwords")

In [None]:
nostop = spark.sql("select * from wordcount w where not exists (select 1 from stopwords s where s.word = w.word) order by 2 desc")

In [None]:
nostop.show()

In [None]:
nostop.explain()

In [None]:
from pyspark.sql.functions import broadcast  # a SQL hint can be used instead
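
As a sketch of the SQL hint alternative mentioned in the comment above, the query below uses Spark SQL's `BROADCAST` hint. It assumes the `wordcount` and `stopwords` temp views registered earlier in this notebook, and it rewrites the `not exists` filter as an explicit `left anti join` so that the hint can name the alias `s`. The variable name `hinted_query` is hypothetical, and the block only builds the query string; whether the planner honors the hint is best confirmed with `explain()`.

```python
# Assumes the temp views `wordcount` and `stopwords` from earlier cells.
# The NOT EXISTS filter is written as a LEFT ANTI JOIN so the hint can reference alias `s`.
hinted_query = """
select /*+ BROADCAST(s) */ w.*
from wordcount w
left anti join stopwords s on s.word = w.word
order by 2 desc
"""

# In the notebook this would be run with: spark.sql(hinted_query).explain()
print(hinted_query)
```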

In [None]:
bcsw = broadcast(swdf)

In [None]:
bcsw.createOrReplaceTempView("bc_sw")

In [None]:
bc_nostop = spark.sql("select * from wordcount w where not exists (select 1 from bc_sw b where b.word = w.word) order by 2 desc")

In [None]:
bc_nostop.show()

In [None]:
bc_nostop.explain()

In [None]:
bc_nostop.explain(True)
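
The difference between the two plans can be sketched in plain Python. This is a simplified model under stated assumptions, not Spark's implementation: the sample data, the partition count, and both function names are hypothetical, and the operator Spark actually picks for the non-broadcast case may differ (for example a sort-merge join), so the `explain()` output remains the authority.

```python
from collections import defaultdict

# Hypothetical sample data standing in for the wordcount and stopwords views.
word_counts = [("the", 5), ("king", 3), ("and", 4), ("crown", 2), ("of", 6)]
stop_words = ["the", "and", "of"]
NUM_PARTITIONS = 3  # assumed partition count, for illustration only


def shuffle_anti_join(left, right, n):
    """Repartition both sides by key hash, then anti join bucket by bucket."""
    left_buckets = defaultdict(list)
    right_buckets = defaultdict(set)
    for word, count in left:
        left_buckets[hash(word) % n].append((word, count))
    for word in right:
        right_buckets[hash(word) % n].add(word)
    result = []
    for bucket, rows in left_buckets.items():
        # Equal keys always land in the same bucket, so a local lookup is enough.
        result.extend(r for r in rows if r[0] not in right_buckets[bucket])
    return result


def broadcast_anti_join(left_partitions, right):
    """Copy the small side to every partition; the large side never moves."""
    lookup = frozenset(right)  # the "broadcast" copy of the small table
    return [[r for r in part if r[0] not in lookup] for part in left_partitions]


shuffled = shuffle_anti_join(word_counts, stop_words, NUM_PARTITIONS)
partitions = [word_counts[:2], word_counts[2:]]  # the large side as it already sits
broadcasted = [r for part in broadcast_anti_join(partitions, stop_words) for r in part]

# Both strategies keep the same rows; only the data movement differs.
print(set(shuffled) == set(broadcasted))
```

The shuffle version moves every row of both tables to a bucket chosen by key, while the broadcast version moves only the small stop word list. That is why broadcasting tends to pay off when one side is small enough to fit in memory on each executor.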