# If HDFS is not running

In [None]:
%%bash
# Run this cell only when HDFS is not already running: it invokes start-dfs.sh to bring HDFS up
start-dfs.sh

# Goals of this practice
* 1) Learn to write Spark applications 
* 2) Understand how Spark executes our applications on multiple machines 

# Wikipedia Pageview statistics

https://dumps.wikimedia.org/other/pagecounts-raw/

* Hourly Wikipedia statistics are stored in each file
* In this practice, we'll use 3 files (= 3 hours of data) that are around 1GB in total

In [None]:
%%bash
# Show the 3 input files (all entries, long format, human-readable sizes)
INPUT_DIR=/home/ubuntu/spark_inputs
ls -alh "$INPUT_DIR"

# Each 'file' has many 'line's of data
* Each 'line' has 4 columns
* project | title | number of pageviews | size of the content

In [None]:
%%bash
# Show the final 10 'line's of one hourly 'file'
SAMPLE_FILE='/home/ubuntu/spark_inputs/pagecounts-20160101-000000'
tail -n 10 "$SAMPLE_FILE"

# Upload the files to HDFS

In [None]:
%%bash
# Copy the input directory from the local disk to the HDFS path below
LOCAL_INPUTS="/home/ubuntu/spark_inputs/"
HDFS_TARGET=hdfs://master:9000/wiki
hdfs dfs -put "$LOCAL_INPUTS" "$HDFS_TARGET"

# View the files being uploaded at MASTER_PUBLIC_IP:50070 
* (Utilities tab -> 'Browse the file system')

In [21]:
%%bash
# Show what now lives in the HDFS directory the inputs were uploaded to
WIKI_DIR="hdfs://master:9000/wiki"
hdfs dfs -ls "$WIKI_DIR"

Found 3 items
-rw-r--r--   3 ubuntu supergroup  309500921 2018-04-09 06:23 hdfs://master:9000/wiki/pagecounts-20160101-000000
-rw-r--r--   3 ubuntu supergroup  369835721 2018-04-09 06:24 hdfs://master:9000/wiki/pagecounts-20160101-010000
-rw-r--r--   3 ubuntu supergroup  333752520 2018-04-09 06:24 hdfs://master:9000/wiki/pagecounts-20160101-020000


# Set up Spark standalone cluster 
* (instead of YARN for this practice, to easily see the Spark Web UI)
* Check the cluster UI => MASTER_PUBLIC_IP:8080

In [22]:
%%bash
# Bring up the Spark standalone cluster through the helper script.
# NOTE(review): the script itself is not shown here; presumably it starts the master and the workers — confirm.
# If the daemons are already running, the script reports "Stop it first." for each of them.
/home/ubuntu/spark_scripts/start_cluster.sh

org.apache.spark.deploy.master.Master running as process 2335.  Stop it first.
[1] 01:43:35 [FAILURE] worker2 Exited with error code 1
org.apache.spark.deploy.worker.Worker running as process 1550.  Stop it first.
[2] 01:43:35 [FAILURE] worker1 Exited with error code 1
org.apache.spark.deploy.worker.Worker running as process 1547.  Stop it first.


# Deploy 2 Spark Executors to the standalone cluster
* (i.e., initialize Spark Session)

In [23]:
import findspark
import os

# findspark has to be initialised with the Spark install path
# before `import pyspark` can succeed
findspark.init('/home/ubuntu/spark')

import pyspark

# Executor configuration: request 20g of memory per executor
sparkconf = pyspark.SparkConf()
sparkconf.set('spark.executor.memory', '20g')

# Creating (or re-using) the session against the standalone master
# is the step that deploys the Spark executors
ss = (
    pyspark.sql.SparkSession.builder
    .appName("DS2")
    .master("spark://master:7077")
    .config(conf=sparkconf)
    .getOrCreate()
)

# Congratulations! 2 Spark executors are now up and running
* Check the Spark Web UI => MASTER_PUBLIC_IP:4040
* (Check 'Executors')

# Launch a Spark MapReduce job
* We'll count the sum of pageviews per project

In [24]:
import time

# The RDD (MapReduce-style) API is reached through the SparkContext, not the SparkSession
sc = ss.sparkContext

# Read the data from HDFS: every file under the directory, one RDD element per text line
lines = sc.textFile("hdfs://master:9000/wiki/")

# Split each 'line' into its space-separated columns
# (project, title, number of pageviews, size of the content)
columns = lines.map(lambda line: tuple(line.split(" ")))

# Create (project, count) tuples
# The count arrives as text, so it must be converted to a number before it can be summed.
# int() is used rather than long(): long() exists only in Python 2, whereas int() works on
# both Python 2 and Python 3 and does not overflow on large values in either.
project_count_tuples = columns.map(lambda column: (column[0], int(column[2])))

# For each project, compute the sum of counts
# The reducer has to ADD the two partial results; taking max() of them would only
# yield the largest single pageview count of the project, not its total.
project_sum_tuples = project_count_tuples.reduceByKey(lambda left, right: left + right)

# Write the output to HDFS
# The Spark job starts here! (LAZY EXECUTION: everything above only describes the computation)
# The output directory is named after the current Unix timestamp, so each run writes to a new path.
project_sum_tuples.saveAsTextFile("hdfs://master:9000/out/" + str(int(round(time.time()))))

# Congratulations! You've just launched Spark 'Tasks' to the 2 executors
* Check the Spark Web UI => MASTER_PUBLIC_IP:4040
* (Job DAG visualizer / Stages / Task metrics )

# Check the output of the MapReduce job

In [None]:
%%bash
# List the job output directories under /out/ (the MapReduce cell writes to /out/<unix timestamp>)
hdfs dfs -ls /out/
# To look inside one run, replace the '...' placeholders below with real names and uncomment:
#   first line  -> lists the part files of one output directory
#   second line -> prints the tail of one part file
#hdfs dfs -ls /out/...
#hdfs dfs -tail /out/.../...

# Lessons learned: 
* 1) Spark makes big data easy
  * (We ran 6 lines of 'Python code' to analyze big data)
  * (Spark took care of the hard problems of distributed execution)
* 2) We can handle bigger data by simply adding more EC2 instances
  * (Spark will use the additional resources to execute more tasks in parallel)

==================================================

* Q: I don't even want to write Python code, can I just use SQL on Spark?
* A: Yes :)

# Launch a SparkSQL job

In [28]:
# Turn the RDD of column tuples into a Spark DataFrame
# (Spark's counterpart of a 'SQL table'), naming the 4 columns
df = ss.createDataFrame(
    columns,
    schema=['project', 'title', 'count', 'size'],
)

# Expose the DataFrame to SQL under the view name "WikipediaTable"
df.createOrReplaceTempView("WikipediaTable")

# SQL query over the view:
# count the titles of each project and sort the projects by that count, largest first
selected = ss.sql(
    "SELECT project, COUNT(title) as num_of_title "
    "FROM WikipediaTable "
    "GROUP BY project "
    "ORDER BY num_of_title DESC"
)

# Display the result in this console (show() prints the top 20 rows)
selected.show()

+---------+------------+
|  project|num_of_title|
+---------+------------+
|       en|     5843326|
|commons.m|      982905|
|       zh|      763713|
|       de|      696433|
|       fr|      692983|
|       ru|      642028|
|       ja|      610854|
|       es|      607773|
|   www.wd|      443484|
|       it|      356246|
|       pl|      310897|
|       pt|      286648|
|     en.d|      219174|
|       nl|      181850|
|       tr|      178015|
|       sv|      138963|
|       ko|      127152|
|       ar|      122273|
|       vi|      108635|
|       fa|      100613|
+---------+------------+
only showing top 20 rows



# Surprise! You've just launched Spark 'Tasks' to the 2 executors (again!)
* How did our SQL query become Spark 'Tasks'?
* Check the Spark Web UI again! => MASTER_PUBLIC_IP:4040
* Check the SQL tab (SparkSQL physical plan operators: Scan, HashAggregate, Project, Exchange, Filter, TakeOrdered, ...)

# Lessons learned: 
* 1) We can use SQL queries (higher-level abstraction) to use Spark
  * (Spark automatically translated our query to parallel Spark 'Tasks' to promptly return the result we requested)
* 2) Similarly, Spark can automatically translate graph processing, streaming, and machine learning workloads to Spark 'Tasks'
  * (some of these topics will be covered in future lectures) 

# (QUIZ 1) 각 project 당 'count' column에 있는 숫자들의 총합이 20 이상인 (project, sum_of_count)을 구하시오
### - 결과값 2개의 column => (project, sum_of_count)
# (QUIZ 2) 다음의 table을 'WikipediaTable'과 Join하여, grade가 'A'에 해당하는 project에 속하는 title들을 구하시오
### - 결과값 1개의 column => (title)

In [26]:
# Column names and rows of the small (project, grade) lookup table used by QUIZ 2
cols = ['project', 'grade']
vals = [('en', 'C'), ('he', 'A'), ('zh', 'B'), ('no', 'A')]

# Build the DataFrame, make it queryable from SQL as "TitleGradeTable", and print it
title_grade = ss.createDataFrame(vals, schema=cols)
title_grade.createOrReplaceTempView("TitleGradeTable")
title_grade.show()

+-------+-----+
|project|grade|
+-------+-----+
|     en|    C|
|     he|    A|
|     zh|    B|
|     no|    A|
+-------+-----+



In [27]:
# QUIZ 2: join the two views on their shared column (NATURAL JOIN)
# and keep only the titles whose project has grade 'A'
selected = ss.sql(
    "SELECT title "
    "FROM WikipediaTable NATURAL JOIN TitleGradeTable "
    "WHERE grade='A'"
)
selected.show()

+--------------------+
|               title|
+--------------------+
|                    |
|                   $|
|%22/fr/Discussion...|
|%22/fr/Sp%C3%A9ci...|
|                %25s|
|  %27Phags-pa-skrift|
|%28567%29_Eleutheria|
|             %3DOslo|
|%60Abdu%27l-Bah%C...|
|%60Abdu%27l-Bah%C...|
|%C2%AB%C3%86ger%C...|
|%C2%ABAdmiral_Hip...|
|%C2%ABAdmiral_Sch...|
|%C2%ABAdmiral_Sch...|
| %C2%ABAltmark%C2%BB|
|%C2%ABAnglo_Norse...|
|%C2%ABBarbarossa%...|
|%C2%ABCutty_Sark%...|
|%C2%ABDeutschland...|
|%C2%ABEidsvold_18...|
+--------------------+
only showing top 20 rows



In [None]:
# QUIZ 1: total of the 'count' column per project, keeping the projects whose total is 20 or more.
# ">=" (not ">") is required so that a total of exactly 20 is included, as the quiz asks.
# NOTE(review): 'count' was loaded as text, so Spark has to coerce it for SUM —
# consider SUM(CAST(count AS BIGINT)) if exact integer totals are wanted; confirm.
selected = ss.sql("SELECT project, SUM(count) as sum_of_count FROM WikipediaTable GROUP BY project HAVING sum_of_count >= 20")
selected.show()

In [None]:
# QUIZ 2 answer: titles belonging to the projects graded 'A'
# (NATURAL JOIN matches the two views on their common column)
selected = ss.sql(
    "SELECT title "
    "FROM WikipediaTable NATURAL JOIN TitleGradeTable "
    "WHERE grade='A'"
)
selected.show()

# Stop Spark executors and the Spark cluster using the following commands

In [None]:
# Stop the SparkSession `ss` created earlier in this notebook
ss.stop()

In [None]:
%%bash
# Shut down the Spark standalone cluster through the helper script.
# NOTE(review): the script itself is not shown here; presumably it stops the master and the workers — confirm.
/home/ubuntu/spark_scripts/stop_cluster.sh