# COM6012 Scalable Machine Learning 2019 - Haiping Lu
# Lab 1 - Introduction to (Py)Spark and (Sheffield)HPC

## Objectives

* Task 1: To finish in the lab session. **Critical**
* Task 2: To finish in the lab session. **Critical**
* Task 3: To finish in the lab session. **Essential**
* Task 4: To finish in the lab session. **Essential**
* Task 5: To explore by yourself before the next session. **Optional but recommended**

**Suggested reading**: 
* Chapters 2 to 4 of [PySpark tutorial](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf) (several sections in Chapter 3 can be safely skipped)
* [Spark Quick Start](https://spark.apache.org/docs/2.3.2/quick-start.html)

All slides and notebooks are at: https://github.com/haipinglu/ScalableML/

**Note**: We will use pyspark **2.3.2** rather than the latest **2.4.0** due to a JVM problem on Windows. See this [discussion](https://stackoverflow.com/questions/53161939/pyspark-error-does-not-exist-in-the-jvm-error-when-initializing-sparkcontext).

## 1. Install Spark

### 1.1. HPC - ShARC (help: `hpc@sheffield.ac.uk`)

#### Connect to ShARC (`host: sharc.sheffield.ac.uk`) via SSH: [MobaXterm on Windows](https://www.sheffield.ac.uk/cics/research/hpc/using/access/windowspc),  [Linux/Unix](https://www.sheffield.ac.uk/cics/research/hpc/using/access/linux), [Apple/Mac](https://www.sheffield.ac.uk/cics/research/hpc/using/access/apple). See [Connecting to a cluster using SSH](http://docs.hpc.shef.ac.uk/en/latest/hpc/connecting.html), and also [Intro_to_HPC by Mike](https://github.com/mikecroucher/Intro_to_HPC) (note that we are not using Scala though)

Start an interactive session on a node: `qrshx`


#### Load java and conda
`module load apps/java/jdk1.8.0_102/binary`

`module load apps/python/conda`

#### Create a virtual environment called myspark
`conda create -n myspark python=3.6`

#### Activate the environment
`source activate myspark`

#### Install spark and pyspark 2.3.2 from conda-forge
`conda install -c conda-forge pyspark=2.3.2`

#### Run spark
`pyspark`

#### [Transfer files to HPC](https://www.sheffield.ac.uk/cics/research/hpc/using/access)

#### Not familiar with terminal?: [A tutorial from Mike Croucher](https://github.com/mikecroucher/Intro_to_HPC/blob/gh-pages/terminal_tutorial.md)

### 1.2. Windows/Linux/Mac: on your own / lab machine

You should already have `conda` installed from COM6509 (the MLAI module). `pyspark 2.3.2` can then be installed with the same command as above:

`conda install -c conda-forge pyspark=2.3.2`
* Windows: 1) With video - [Install Spark on Windows (PySpark)](https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c) 2) [How to install Spark on Windows in 5 steps](https://medium.com/@dvainrub/how-to-install-apache-spark-2-x-in-your-pc-e2047246ffc3) **Note:** The following may be needed. Go to your System Environment Variables and add `PYTHONPATH` with the following value: `%SPARK_HOME%\python;%SPARK_HOME%\python\lib\py4j-<version>-src.zip;%PYTHONPATH%`, replacing `<version>` with the py4j version found in your `spark/python/lib` folder ([source](https://stackoverflow.com/questions/53161939/pyspark-error-does-not-exist-in-the-jvm-error-when-initializing-sparkcontext?noredirect=1&lq=1)).

* Linux: With video - [Install PySpark on Ubuntu](https://medium.com/@GalarnykMichael/install-spark-on-ubuntu-pyspark-231c45677de0)

* Mac: [Install Spark/PySpark on Mac](https://medium.com/@yajieli/installing-spark-pyspark-on-mac-and-fix-of-some-common-errors-355a9050f735)
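For the Windows note above, the py4j version can be read off the file name of the zip in Spark's `python/lib` folder. The following is a minimal sketch that lists matching files; the helper name `find_py4j_zips` is made up for illustration, and it assumes the `SPARK_HOME` environment variable points at your Spark installation.

```python
import glob
import os


def find_py4j_zips(spark_home):
    """Return the py4j source zips found under <spark_home>/python/lib, sorted."""
    pattern = os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")
    return sorted(glob.glob(pattern))


# SPARK_HOME is an assumption here; nothing is searched if it is not set.
spark_home = os.environ.get("SPARK_HOME")
if spark_home:
    for path in find_py4j_zips(spark_home):
        print(path)
else:
    print("SPARK_HOME is not set")
```

The version string in the printed file name is what replaces `<version>` in the `PYTHONPATH` value.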

## 2. Run Spark

### Connect and activate

`qrshx`

`module load apps/java/jdk1.8.0_102/binary`

`module load apps/python/conda`

`source activate myspark`


### Interactive

#### If running spark in a shell, `spark` (SparkSession) and `sc` (SparkContext) are automatically created.
Run `pyspark`, or `pyspark --master local[2]` to use two cores.

#### If running Spark in a notebook on your local machine

In [1]:
#import findspark
#findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("COM6012 Spark Intro") \
    .getOrCreate()

If `import pyspark` reports an error, try `pip install findspark`, then run `import findspark` and `findspark.init()` before `import pyspark`; the import should then work.
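Before reaching for `findspark`, it can help to check whether Python can locate `pyspark` at all. The sketch below uses only the standard library; the helper name `is_importable` is our own and it only handles top-level module names.

```python
import importlib.util


def is_importable(module_name):
    """Return True if a top-level module can be found on the current sys.path."""
    return importlib.util.find_spec(module_name) is not None


for name in ("pyspark", "findspark"):
    status = "found" if is_importable(name) else "NOT found"
    print(f"{name}: {status}")
```

If `pyspark` is reported as not found, the active conda environment is probably not the one where it was installed.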

Check your SparkSession object

In [3]:
spark

Create and check sc (SparkContext)

In [4]:
sc = spark.sparkContext
sc

In [5]:
nums = sc.parallelize([1,2,3,4])
nums.map(lambda x: x*x).collect()

[1, 4, 9, 16]
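As a rough mental model of the cell above, the plain-Python sketch below splits a list into chunks, applies the function to each chunk, and gathers the results. This is only an analogy on a single machine, not how Spark is implemented; `split_into_partitions` is a hypothetical helper and the choice of two partitions mirrors `local[2]`.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(items), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


# Roughly what parallelize does: distribute the data over partitions.
partitions = split_into_partitions([1, 2, 3, 4], 2)
# Roughly what map does: apply the function to every element, partition by partition.
mapped = [[x * x for x in part] for part in partitions]
# Roughly what collect does: bring all results back into one list.
collected = [y for part in mapped for y in part]
print(collected)  # → [1, 4, 9, 16]
```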

## 3. Log Mining with Spark - Example


This example deals with **Semi-Structured** data

In [6]:
logFile=spark.read.text("Data/NASA_Aug95_100.txt")
logFile

DataFrame[value: string]

In [7]:
logFile.count()

100

In [8]:
logFile.first()

Row(value='in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839')

**Question**: How many accesses are from Japan?

In [9]:
hostsJapan = logFile.filter(logFile.value.contains(".jp"))

Check whether you are getting what you want.

In [10]:
hostsJapan.show(5,False)

+--------------------------------------------------------------------------------------------------------------+
|value                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
|kgtyk4.kj.yamagata-u.ac.jp - - [01/Aug/1995:00:00:17 -0400] "GET / HTTP/1.0" 200 7280                         |
|kgtyk4.kj.yamagata-u.ac.jp - - [01/Aug/1995:00:00:18 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 200 5866|
|kgtyk4.kj.yamagata-u.ac.jp - - [01/Aug/1995:00:00:21 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0   |
|kgtyk4.kj.yamagata-u.ac.jp - - [01/Aug/1995:00:00:21 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 304 0 |
|kgtyk4.kj.yamagata-u.ac.jp - - [01/Aug/1995:00:00:22 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 304 0    |
+--------------------------------------------------------------------------------------------------------------+

In [11]:
hostsJapan.count()

11
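Note that `contains(".jp")` matches `.jp` anywhere in the line, including the requested path. A stricter test looks only at the host, which is the first space-separated field. The plain-Python sketch below illustrates the difference on two lines taken from the outputs above plus one made-up line; `host_of` and `is_from_domain` are hypothetical helper names.

```python
def host_of(log_line):
    """Return the host, i.e. the first space-separated field of a log line."""
    return log_line.split(" ", 1)[0]


def is_from_domain(log_line, suffix):
    """True if the host (not the whole line) ends with the given suffix."""
    return host_of(log_line).endswith(suffix)


lines = [
    'kgtyk4.kj.yamagata-u.ac.jp - - [01/Aug/1995:00:00:17 -0400] "GET / HTTP/1.0" 200 7280',
    'in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] '
    '"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839',
    # Made-up line: a .com host requesting a path that happens to contain ".jp".
    'example.com - - [01/Aug/1995:00:00:02 -0400] "GET /images/photo.jpg HTTP/1.0" 200 100',
]

substring_hits = [".jp" in line for line in lines]           # whole-line test
host_hits = [is_from_domain(line, ".jp") for line in lines]  # host-only test
print(substring_hits)  # → [True, False, True]
print(host_hits)       # → [True, False, False]
```

On the 100-line sample the two tests may well agree; the distinction matters more on the full log.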

### Self-contained Application

Create a file `LogMining100.py`

~~~~
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("COM6012 Spark Intro") \
    .getOrCreate()

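sc = spark.sparkContext
sc.setLogLevel("WARN")  # reduce console noise from Spark's INFO logging

# Each line of the text file becomes one row with a single string column named `value`.
logFile = spark.read.text("Data/NASA_Aug95_100.txt")
# Note: hostsJapan holds a count of matching log lines (requests), not of distinct hosts.
hostsJapan = logFile.filter(logFile.value.contains(".jp")).count()

print("\n\nHello Spark: There are %i requests from Japan.\n\n" % (hostsJapan))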

spark.stop()
~~~~


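Then run it with `spark-submit Code/LogMining100.py`. This assumes the file is saved in your `Code` folder and that you run the command from the folder containing both `Code` and `Data`, so that the relative path `Data/NASA_Aug95_100.txt` resolves.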



## 4. Big Data Log Mining with Spark 

**Data**: Download the August data in gzip format (NASA_access_log_Aug95.gz) from [NASA HTTP server access log](http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html) and put it into your `Data` folder. `NASA_Aug95_100.txt` above contains the first 100 lines of the August data.

**Question**: How many accesses are from Japan and the UK, respectively?

Create a file `LogMiningBig.py`

~~~~
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("COM6012 Spark Intro") \
    .getOrCreate()

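sc = spark.sparkContext
sc.setLogLevel("WARN")  # reduce console noise from Spark's INFO logging

# cache() keeps the DataFrame in memory after the first action so the second count can reuse it.
logFile = spark.read.text("../Data/NASA_access_log_Aug95.gz").cache()

# These are counts of matching log lines (requests), not of distinct hosts.
hostsJapan = logFile.filter(logFile.value.contains(".jp")).count()
hostsUK = logFile.filter(logFile.value.contains(".uk")).count()

print("\n\nHello Spark: There are %i requests from the UK.\n" % (hostsUK))
print("Hello Spark: There are %i requests from Japan.\n\n" % (hostsJapan))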

spark.stop()
~~~~
**Spark can read a gzip file directly. You do not need to unzip it into a big file first.**
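For a quick look at the compressed log outside Spark, Python's standard `gzip` module can likewise stream it without unpacking it to disk. This is a small sketch only; the helper name `head_gzip` is illustrative, the `latin-1` encoding is an assumption chosen so that any byte decodes, and the path in the usage comment is the one from the script above.

```python
import gzip
from itertools import islice


def head_gzip(path, n=5, encoding="latin-1"):
    """Return the first n lines of a gzip-compressed text file as a list of strings."""
    # Lines are decompressed on the fly; only the first n are ever read.
    with gzip.open(path, mode="rt", encoding=encoding) as handle:
        return [line.rstrip("\n") for line in islice(handle, n)]


# Example usage (path assumed to exist relative to where you run this):
# for line in head_gzip("../Data/NASA_access_log_Aug95.gz", n=3):
#     print(line)
```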

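**Note the use of `cache()` above**: `logFile` is used by two separate counts, so caching it lets the second count reuse the data loaded by the first (if it fits in memory) instead of reading and decompressing the file again.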

### Run a program in batch mode

[How to submit batch jobs to ShARC](https://www.sheffield.ac.uk/cics/research/hpc/sharc/batch). **The more resources you request, the longer you need to queue.**

Interactive mode is good for learning, exploring and debugging with smaller data. For big data, batch processing is more convenient: you submit the job to the scheduler, where it joins a queue. Once resources are allocated, your job runs and its output is recorded in a file. This is done via a shell script.

Create a file `Lab1_SubmitBatch.sh`

~~~~
#!/bin/bash
#$ -l h_rt=2:00:00  #time needed
#$ -pe smp 2 #number of cores
#$ -l rmem=4G #amount of memory
#$ -o COM6012_Lab1
#$ -j y # normal and error outputs into a single file
#$ -M youremail@shef.ac.uk #Notify you by email, remove this line if you do not want emails
#$ -m ea #Email you when the job has finished or aborted
#$ -cwd # Run job from current directory

module load apps/java/jdk1.8.0_102/binary

module load apps/python/conda

source activate myspark

spark-submit ../Code/LogMiningBig.py
~~~~

Copy the necessary files to ShARC. From the appropriate directory, submit your job via the `qsub` command:

`qsub Lab1_SubmitBatch.sh`

Check the status of your queuing/running job(s) with `qstat` (jobs not shown have already finished).

Check your output file (`COM6012_Lab1`, as set by the `-o` option in the script).


## 5. Exercises

### More mining questions (completing three or more questions is considered as completion of this exercise):

#### Easier questions (recommended)
* How many requests in total?
* How many requests on a particular day (e.g., 15th August)?
* How many 404 (page not found) errors in total?
* How many 404 (page not found) errors on a particular day (e.g., 15th August)?
* How many requests from a particular host (e.g., uplherc.upl.com)?
* Any other question that you are interested in.
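Most of the questions above come down to pulling individual fields out of each log line. The sketch below parses one line in plain Python with a regular expression. The pattern and the field names are our own assumptions based on the sample lines shown earlier, not an official format specification; lines that do not match return `None`.

```python
import re

# Assumed layout: host, two ignored fields, [dd/Mon/yyyy:hh:mm:ss zone], "request", status, size
LOG_PATTERN = re.compile(
    r'^(?P<host>\S+) \S+ \S+ '
    r'\[(?P<day>\d{2})/(?P<month>\w{3})/(?P<year>\d{4}):(?P<time>[\d:]+) (?P<tz>[+-]\d{4})\] '
    r'"(?P<request>.*)" '
    r'(?P<status>\d{3}) (?P<size>\S+)$'
)


def parse_log_line(line):
    """Return a dict of named fields for one log line, or None if it does not match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None


sample = ('in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] '
          '"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839')
fields = parse_log_line(sample)
print(fields["host"], fields["day"], fields["month"], fields["status"])
# → in24.inetnebr.com 01 Aug 200
```

With fields separated like this, a question such as "404 errors on 15th August" becomes a test on `status` and `day` rather than a substring search over the whole line.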

#### More challenging questions that will become easier to answer in Session 2 (optional for Session 1)
* How many **unique** hosts on a particular day (e.g., 15th August)?
* How many **unique** hosts in total (i.e., in August 1995)?
* Which host is the most frequent visitor?
* How many different types of return codes?
* How many requests per day on average?
* How many requests per host on average?
* Any other question that you are interested in.

### The effects of caching (recommended)
* **Compare** the time taken to complete your jobs **with and without** `cache()`.
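One way to make this comparison is to time the same pair of counts with and without caching. The sketch below is an outline under stated assumptions: `timed` and `count_jp_and_uk` are names of our own, the path in the usage comment is the one from `LogMiningBig.py`, and the Spark part only runs if you call it with an existing `SparkSession`.

```python
import time


def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds) using a monotonic clock."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def count_jp_and_uk(spark, path, use_cache):
    """Count lines containing '.jp' and '.uk', optionally caching the DataFrame."""
    log_file = spark.read.text(path)
    if use_cache:
        log_file = log_file.cache()
    jp = log_file.filter(log_file.value.contains(".jp")).count()
    uk = log_file.filter(log_file.value.contains(".uk")).count()
    if use_cache:
        log_file.unpersist()  # release the cached data before the next run
    return jp, uk


# Usage inside a script or notebook where `spark` already exists:
# for use_cache in (False, True):
#     counts, seconds = timed(count_jp_and_uk, spark,
#                             "../Data/NASA_access_log_Aug95.gz", use_cache)
#     print(f"cache={use_cache}: counts={counts}, {seconds:.1f} s")
```

Timings on a shared machine vary from run to run, so it is worth repeating each variant a few times before drawing conclusions.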

# Acknowledgements

Many thanks to Twin, Will, and Mike for their kind help and all those kind contributors of open resources.

The log mining problem is adapted from [UC Berkeley cs105x L3](https://www.edx.org/course/introduction-apache-spark-uc-berkeleyx-cs105x).