# Basic Configuration

## Required installations
1. Java
    - download from http://www.oracle.com/technetwork/java/javase/downloads/index.html
    - install using the downloaded executable file
    - install Java to a path other than *Program Files*, because the space in that folder name causes issues
2. Anaconda
    - download from https://www.anaconda.com/download/
    - install using the downloaded executable file
3. Apache Spark 
    - download from https://spark.apache.org/downloads.html
    - unzip to a new folder e.g. *C:\spark*
4. Apache Hadoop 
    - only *winutils.exe* is required
    - download from https://github.com/steveloughran/winutils
    - create a new folder, e.g. *C:\hadoop\bin*, and place this file in it
    - run Command Prompt as Administrator and execute
    ```bash
    winutils.exe chmod 777 \tmp\hive
    ```
5. Findspark
    - a utility to locate and initialize pyspark
    - install using
    ```bash
    conda install -c conda-forge findspark
    ```

## Environment variables
- HADOOP_HOME = *path\to\hadoop* (the folder that contains *bin\winutils.exe*, e.g. *C:\hadoop*, not the *bin* folder itself)
- SPARK_HOME = *path\to\spark*
- JAVA_HOME = *path\to\JavaJDK*

# Import base packages

In [1]:
# Standard-library modules used by later cells: sys for the interpreter
# version, os for path handling, shutil for removing a saved model directory.
import sys, os, shutil
import findspark
# Order matters here: findspark.init() has to run BEFORE `import pyspark`.
# findspark is the utility that locates the Spark installation and initializes
# pyspark so that the import on the next line can succeed.
findspark.init()
import pyspark

# Check environment

In [2]:
# Report the interpreter and Spark versions this notebook is running against.
python_version = sys.version
spark_version = pyspark.__version__
print("Python Version:", python_version)
print("Spark Version:", spark_version)

Python Version: 3.6.3 |Anaconda, Inc.| (default, Oct 15 2017, 03:27:45) [MSC v.1900 64 bit (AMD64)]
Spark Version: 2.1.1+hadoop2.7


# Example 1: Calculate value of Pi

Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py

In [None]:
from random import random
from operator import add
from pyspark.sql import SparkSession

def f(_):
    """Draw one random point in the square [-1, 1) x [-1, 1) and test it.

    The argument is ignored; it only exists so the function can be mapped
    over a sequence of sample indices.

    Returns 1 when the point lies inside or on the unit circle, otherwise 0.
    """
    # random() yields a float in [0, 1); scale and shift it to [-1, 1).
    px = 2 * random() - 1
    py = 2 * random() - 1
    inside_circle = px ** 2 + py ** 2 <= 1
    return int(inside_circle)
    
# Start (or reuse) the Spark session used for the Monte Carlo estimate of Pi.
spark = SparkSession \
        .builder \
        .appName("PythonPi") \
        .getOrCreate()

try:
    # Number of RDD partitions the sample indices are spread across.
    partitions = 10
    # Total number of random points to draw.
    num_samples = 1000000

    # Map every sample index through f (1 = point inside the unit circle,
    # 0 = outside) and sum the hits across all partitions.
    count = spark.sparkContext.parallelize(range(1, num_samples + 1), partitions).map(f).reduce(add)

    # hits / samples approximates (circle area) / (square area) = pi / 4.
    print("Pi is roughly %f" % (4.0 * count / num_samples))
finally:
    # Stop the session even when the job above fails; otherwise the context
    # stays alive and leaks into the cells that follow.
    spark.stop()

# Example 2: Perform Binary Classification using Decision Tree

Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/ml/decision_tree_classification_example.py

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for the decision tree example.
spark = SparkSession\
        .builder\
        .appName("DecisionTreeClassificationExample")\
        .getOrCreate()

try:
    # locate data file: <parent of the working directory>/data/sample_libsvm_data.txt
    basepath = os.path.dirname(os.getcwd())
    data_dir = os.path.dirname(os.getcwd()) + '/data/sample_libsvm_data.txt'
    print(data_dir)
    print(basepath)
    filepath = os.path.abspath(os.path.join(basepath, "data/sample_libsvm_data.txt"))
    print(filepath)

    # Build the file URI from the path computed above instead of a hard-coded,
    # machine-specific absolute path, so the cell also works on other machines.
    # Separators are normalised to "/" and a leading "/" is dropped so that
    # both "D:\\dir\\file" and "/dir/file" become "file:///<path>".
    data_uri = "file:///" + filepath.replace(os.sep, "/").lstrip("/")
    # NOTE(review): the recorded run of this cell failed with
    # "Input path does not exist" for this location (a path containing
    # spaces). Confirm that the data file really lives under the parent of
    # the working directory and that this Spark version copes with spaces.
    data = spark.read.format("libsvm").load(data_uri)
finally:
    # Stop the session even when load() raises; a session left running would
    # still be active when the next cell creates its own SparkContext.
    spark.stop()

D:\local\0 git repos\spark/data/sample_libsvm_data.txt
D:\local\0 git repos\spark
D:\local\0 git repos\spark\data\sample_libsvm_data.txt


Py4JJavaError: An error occurred while calling o544.load.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/D:/local/0%20git%20repos/spark/data/sample_libsvm_data.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1988)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
	at org.apache.spark.mllib.util.MLUtils$.computeNumFeatures(MLUtils.scala:92)
	at org.apache.spark.ml.source.libsvm.LibSVMFileFormat$$anonfun$2.apply$mcI$sp(LibSVMRelation.scala:112)
	at org.apache.spark.ml.source.libsvm.LibSVMFileFormat$$anonfun$2.apply(LibSVMRelation.scala:99)
	at org.apache.spark.ml.source.libsvm.LibSVMFileFormat$$anonfun$2.apply(LibSVMRelation.scala:99)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.inferSchema(LibSVMRelation.scala:99)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:184)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:183)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.base/java.lang.Thread.run(Thread.java:844)


In [None]:
from pyspark import SparkContext

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

sc = SparkContext(appName="PythonDecisionTreeClassificationExample")

# Directory the trained model is saved to and reloaded from.
model_path = "model/myDecisionTreeClassificationModel"

try:
    # Load and parse the data file into an RDD of LabeledPoint.
    data = MLUtils.loadLibSVMFile(sc, 'data/sample_libsvm_data.txt')

    # Split the data into training and test sets (30% held out for testing)
    (trainingData, testData) = data.randomSplit([0.7, 0.3])

    # Train a DecisionTree model.
    # Empty categoricalFeaturesInfo indicates all features are continuous.
    model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                         impurity='gini', maxDepth=5, maxBins=32)

    # Evaluate model on test instances and compute test error:
    # pair every true label with its prediction, then take the fraction of
    # pairs that disagree.
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testErr = labelsAndPredictions.filter(
            lambda lp: lp[0] != lp[1]).count() / float(testData.count())

    print('Test Error = ' + str(testErr))
    print('Learned classification tree model:')
    print(model.toDebugString())

    # Save model (delete previous model if exists)
    if os.path.exists(model_path):
        shutil.rmtree(model_path)
    model.save(sc, model_path)

    # load model back from disk to show the round trip
    sameModel = DecisionTreeModel.load(sc, model_path)
finally:
    # Stop the context even when a step above fails; otherwise it stays
    # active and a re-run of this cell cannot create a new SparkContext.
    sc.stop()

# Example 3: Perform Binary Classification using Logistic Regression