Data Mining and Visualisation 2019-2020
Practical 4 - Big Data Mining


This practical uses Apache Spark and Spark ML in local mode to run machine learning pipelines: first a Keras neural network trained through Elephas, then a linear regression model. The datasets provided let you test and experiment with a good chunk of the Apache ecosystem.
Some parts of the code are provided; others you need to complete yourselves. In short, you are going to use linear regression to predict game sales.

Use this notebook with Google Colab.

A GPU is not required for this practical, but you can enable one: select Edit > Notebook settings and choose GPU from the Hardware accelerator drop-down menu. The notebook will then reconnect and assign you a GPU that you can use for free.

The GPU-information cell below shows which GPU you have been allocated.

Elephas Spark ML pipeline example (reference for the Elephas part of this notebook): https://github.com/maxpumperla/elephas/blob/master/examples/Spark_ML_Pipeline.ipynb

In [1]:
import os       # used to set the JAVA_HOME environment variable
def install_java():
  !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null      # install OpenJDK 8 (Spark 2.4 runs on Java 8)
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     # point Spark at the Java 8 installation
  !java -version       # prints the default java on PATH, which can differ from JAVA_HOME (Java 11 in the output below)
install_java()

openjdk version "11.0.9.1" 2020-11-04
OpenJDK Runtime Environment (build 11.0.9.1+1-Ubuntu-0ubuntu1.18.04)
OpenJDK 64-Bit Server VM (build 11.0.9.1+1-Ubuntu-0ubuntu1.18.04, mixed mode, sharing)


In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz  # old Spark releases are kept on the Apache archive

In [3]:
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

In [4]:
!pip install -q findspark
!pip install -q elephas
!pip install --ignore-installed PyYAML

  Building wheel for elephas (setup.py) ... done
  Building wheel for pyspark (setup.py) ... done
  Building wheel for gast (setup.py) ... done
ERROR: tensorflow-probability 0.12.1 has requirement gast>=0.3.2, but you'll have gast 0.2.2 which is incompatible.
ERROR: datascience 0.10.6 has requirement folium==0.2.1, but you'll have folium 0.8.3 which is incompatible.
ERROR: albumentations 0.1.12 has requir

In [5]:
# Run this cell to display the RAM and GPU allocated to this Colab session
!ln -sf /opt/bin/nvidia-smi /usr/bin/nvidia-smi
!pip install gputil
!pip install psutil
!pip install humanize
import os
import psutil
import humanize
import GPUtil as GPU

GPUs = GPU.getGPUs()
# Colab provides at most one GPU, and only if one was enabled in Notebook settings
gpu = GPUs[0] if GPUs else None

def printm():
    process = psutil.Process(os.getpid())
    print("Gen RAM Free: " + humanize.naturalsize(psutil.virtual_memory().available),
          " | Proc size: " + humanize.naturalsize(process.memory_info().rss))
    if gpu is None:
        print("No GPU allocated to this session")
        return
    print("GPU RAM Free: {0:.0f}MB | Used: {1:.0f}MB | Util {2:3.0f}% | Total {3:.0f}MB".format(
        gpu.memoryFree, gpu.memoryUsed, gpu.memoryUtil * 100, gpu.memoryTotal))

printm()

Collecting gputil
  Downloading https://files.pythonhosted.org/packages/ed/0e/5c61eedde9f6c87713e89d794f01e378cfd9565847d4576fa627d758c554/GPUtil-1.4.0.tar.gz
Building wheels for collected packages: gputil
  Building wheel for gputil (setup.py) ... done
  Created wheel for gputil: filename=GPUtil-1.4.0-cp36-none-any.whl size=7411 sha256=e32cb4ad0e2cd9c49afce65978c0a428d5024ea70b5986f6225c5fe8668d26a3
  Stored in directory: /root/.cache/pip/wheels/3d/77/07/80562de4bb0786e5ea186911a2c831fdd0018bda69beab71fd
Successfully built gputil
Installing collected packages: gputil
Successfully installed gputil-1.4.0
Gen RAM Free: 12.8 GB  | Proc size: 143.3 MB
GPU RAM Free: 15079MB | Used: 0MB | Util   0% | Total 15079MB


In [6]:
import os
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
import findspark
findspark.init()

In [7]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
sc = SparkContext(conf=conf)

In [None]:
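# Alternative set-up (not used here): create a SparkSession instead of a bare SparkContext
#from pyspark.sql import SparkSession
#spark = SparkSession.builder \
#    .master("local[*]") \
#    .appName("Learning_Spark") \
#    .getOrCreate()
#sc = spark.sparkContext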

In [8]:
from __future__ import print_function
print(sc)

<SparkContext master=local[8] appName=Elephas_App>


In [10]:
from google.colab import files
files.upload()

Saving test.csv to test.csv
Saving train.csv to train.csv


In [11]:
data_path = "./"

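Up to this point you have simply imported and installed everything needed to create a local Spark cluster. The files.upload() command lets you upload files from your computer to the Colab session: the Elephas example below reads train.csv and test.csv, and the linear regression exercise at the end uses the dataset provided, vgsales.csv. Once the files are uploaded you can start the data mining part.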

In [12]:
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors
import numpy as np
import random

sqlcontext = SQLContext(sc)

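def shuffle_csv(csv_file):
    # Shuffle the rows of the file in place. The header row is shuffled too,
    # but load_data_frame filters it out again. Newlines are normalised so a
    # final line without "\n" cannot be glued onto another row.
    with open(csv_file) as f:
        lines = [line.rstrip("\n") for line in f if line.strip()]
    random.shuffle(lines)
    with open(csv_file, "w") as f:
        f.write("\n".join(lines) + "\n")

def load_data_frame(csv_file, shuffle=True, train=True):
    if shuffle:
        shuffle_csv(data_path + csv_file)
    data = sc.textFile(data_path + csv_file) # This is an RDD, which will later be transformed to a data frame
    # Drop the header row (its first field is 'id') and split each line into fields
    data = data.filter(lambda x: x.split(',')[0] != 'id').map(lambda line: line.split(','))
    if train:
        # Column 0 is the id, the middle columns are features, the last column is the class label
        data = data.map(
            lambda line: (Vectors.dense(np.asarray(line[1:-1]).astype(np.float32)),
                          str(line[-1])))
    else:
        # Test data gets dummy labels. We need the same structure as in Train data
        data = data.map(lambda line: (Vectors.dense(np.asarray(line[1:]).astype(np.float32)), "Class_1"))
    return sqlcontext.createDataFrame(data, ['features', 'category'])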

In [13]:
train_df = load_data_frame("train.csv")
test_df = load_data_frame("test.csv", shuffle=False, train=False) # No need to shuffle test data

print("Train data frame:")
train_df.show(10)

print("Test data frame (note the dummy category):")
test_df.show(10)

Train data frame:
+--------------------+--------+
|            features|category|
+--------------------+--------+
|[2.0,0.0,0.0,0.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_5|
|[0.0,0.0,0.0,0.0,...| Class_7|
|[0.0,0.0,1.0,0.0,...| Class_5|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[3.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_3|
+--------------------+--------+
only showing top 10 rows

Test data frame (note the dummy category):
+--------------------+--------+
|            features|category|
+--------------------+--------+
|[0.0,0.0,0.0,0.0,...| Class_1|
|[2.0,2.0,14.0,16....| Class_1|
|[0.0,1.0,12.0,1.0...| Class_1|
|[0.0,0.0,0.0,1.0,...| Class_1|
|[1.0,0.0,0.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[2.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
+--------------------+--------+
only showing top 

In [14]:
from pyspark.ml.feature import StringIndexer

string_indexer = StringIndexer(inputCol="category", outputCol="index_category")
fitted_indexer = string_indexer.fit(train_df)
indexed_df = fitted_indexer.transform(train_df)
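By default StringIndexer orders labels by frequency (stringOrderType="frequencyDesc"), so the most frequent category gets index 0.0; this is why Class_2 maps to 0.0 in the indexed output further down. The plain-Python sketch below illustrates that mapping; it is not Spark code, the helper name frequency_index is hypothetical, and its alphabetical tie-breaking is an assumption of the sketch rather than a statement about Spark.

```python
from collections import Counter

def frequency_index(labels):
    """Map each distinct label to a float index, most frequent label first.

    Illustrates StringIndexer's default frequencyDesc ordering; ties are
    broken alphabetically here, which is an assumption of this sketch.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

labels = ["Class_2", "Class_8", "Class_2", "Class_5", "Class_2", "Class_8"]
mapping = frequency_index(labels)
print(mapping)  # {'Class_2': 0.0, 'Class_8': 1.0, 'Class_5': 2.0}

# Replace every label by its index, as the index_category column does
indexed = [mapping[label] for label in labels]
```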

In [15]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
fitted_scaler = scaler.fit(indexed_df)
scaled_df = fitted_scaler.transform(indexed_df)
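With withMean=True and withStd=True, StandardScaler subtracts each feature's mean and divides by its standard deviation, and Spark computes that standard deviation as the corrected sample standard deviation (dividing by n - 1). The NumPy sketch below reproduces the arithmetic on a small made-up matrix; it is an illustration, not Spark code, and mapping zero-variance columns to 0.0 is an assumption about how such columns are handled.

```python
import numpy as np

def standard_scale(X):
    """Centre each column and divide by its sample standard deviation (ddof=1),
    mirroring StandardScaler(withMean=True, withStd=True)."""
    X = np.asarray(X, dtype=float)
    mean = X.mean(axis=0)
    std = X.std(axis=0, ddof=1)  # corrected sample standard deviation
    # Constant columns would divide by zero; this sketch maps them to 0.0
    safe_std = np.where(std == 0.0, 1.0, std)
    return (X - mean) / safe_std

X = [[2.0, 0.0],
     [0.0, 0.0],
     [4.0, 0.0]]
scaled = standard_scale(X)
print(scaled)
```

In the first column the mean is 2 and the sample standard deviation is 2, so the values 2, 0 and 4 become 0, -1 and 1.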

In [16]:
print("The result of indexing and scaling. Each transformation adds new columns to the data frame:")
scaled_df.show(10)

The result of indexing and scaling. Each transformation adds new columns to the data frame:
+--------------------+--------+--------------+--------------------+
|            features|category|index_category|     scaled_features|
+--------------------+--------+--------------+--------------------+
|[2.0,0.0,0.0,0.0,...| Class_8|           2.0|[1.05768602129565...|
|[0.0,0.0,0.0,0.0,...| Class_2|           0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_5|           6.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_7|           5.0|[-0.2535060296260...|
|[0.0,0.0,1.0,0.0,...| Class_5|           6.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2|           0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2|           0.0|[-0.2535060296260...|
|[3.0,0.0,0.0,0.0,...| Class_2|           0.0|[1.71328204675652...|
|[0.0,0.0,0.0,0.0,...| Class_2|           0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_3|           3.0|[-0.2535060296260...|
+--------------------+--

In [17]:
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers.core import Dense, Dropout, Activation
from tensorflow.python.keras.utils import np_utils, generic_utils

nb_classes = train_df.select("category").distinct().count()
input_dim = len(train_df.select("features").first()[0])

model = Sequential()
model.add(Dense(512, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam')

In [19]:
from elephas.ml_model import ElephasEstimator
from tensorflow.python.keras import optimizers


adam = optimizers.Adam(lr=0.003)
opt_conf = optimizers.serialize(adam)

# Initialize SparkML Estimator and set all relevant properties
estimator = ElephasEstimator()
estimator.setFeaturesCol("scaled_features")             # These two come directly from pyspark,
estimator.setLabelCol("index_category")                 # hence the camel case. Sorry :)
estimator.set_keras_model_config(model.to_yaml())       # Provide serialized Keras model
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1)  # We just use one worker here. Feel free to adapt it.
estimator.set_epochs(20) 
estimator.set_batch_size(128)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

ElephasEstimator_152bc2649c63

In [20]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[string_indexer, scaler, estimator])

In [21]:
from pyspark.mllib.evaluation import MulticlassMetrics

fitted_pipeline = pipeline.fit(train_df) # Fit model to data

prediction = fitted_pipeline.transform(train_df) # Evaluate on train data.
#prediction = fitted_pipeline.transform(test_df) # <-- The same code evaluates test data.
pnl = prediction.select("index_category", "prediction")
pnl.show(100)

# MulticlassMetrics expects an RDD of (prediction, label) pairs of floats
prediction_and_label = pnl.rdd.map(lambda row: (float(row.prediction), float(row.index_category)))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.accuracy)  # overall accuracy; precision() without a label is deprecated since Spark 2.0

  config = yaml.load(yaml_string)


>>> Fit model
>>> Synchronous training complete.
+--------------+----------+
|index_category|prediction|
+--------------+----------+
|           8.0|       0.0|
|           8.0|       2.0|
|           8.0|       1.0|
|           8.0|       0.0|
|           8.0|       4.0|
|           8.0|       3.0|
|           8.0|       2.0|
|           8.0|       0.0|
|           8.0|       0.0|
|           8.0|       0.0|
|           8.0|       0.0|
|           8.0|       4.0|
|           8.0|       6.0|
|           8.0|       0.0|
|           8.0|       0.0|
|           8.0|       2.0|
|           8.0|       1.0|
|           8.0|       1.0|
|           8.0|       6.0|
|           8.0|       0.0|
|           8.0|       1.0|
|           8.0|       2.0|
|           8.0|       0.0|
|           8.0|       2.0|
|           8.0|       4.0|
|           8.0|       2.0|
|           8.0|       0.0|
|           8.0|       0.0|
|           8.0|       0.0|
|           8.0|       3.0|
|           8.0|       0.0|
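MulticlassMetrics interprets each pair in its input RDD as (prediction, label). Overall accuracy is the fraction of pairs in which the two values match, so it does not depend on the order, but per-class precision does: swapping the order turns precision into recall. The plain-Python sketch below illustrates both quantities on made-up pairs; the helpers accuracy and class_precision are hypothetical and are not the Spark implementation.

```python
def accuracy(pairs):
    """Fraction of (prediction, label) pairs in which prediction equals label."""
    return sum(pred == label for pred, label in pairs) / len(pairs)

def class_precision(pairs, cls):
    """Among rows predicted as `cls`, the fraction whose true label is also `cls`."""
    labels_of_predicted = [label for pred, label in pairs if pred == cls]
    if not labels_of_predicted:
        return 0.0
    return sum(label == cls for label in labels_of_predicted) / len(labels_of_predicted)

pairs = [(0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (2.0, 1.0)]  # (prediction, label)
swapped = [(label, pred) for pred, label in pairs]        # (label, prediction)

print(accuracy(pairs), accuracy(swapped))                          # 0.5 0.5
print(class_precision(pairs, 0.0), class_precision(swapped, 0.0))  # 0.5 1.0
```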

In [None]:
# Read vgsales.csv into a DataFrame called data using Spark's read command

# You may want to count the rows and show a few of them to understand what the data is about

# Print out the schema

# Show the game name and platform of each game; choose how many rows you would like to show

# You could compute some descriptive statistics for NA_Sales and EU_Sales

In [None]:
# Now group the data by Platform, count the rows in each group and show the first 10 groups in descending order of count

# Now use pyspark.sql.types to change the type of the "Year", "NA_Sales", "EU_Sales", "JP_Sales" and "Global_Sales" columns to double

# e.g. data = data.withColumn("Year", data["Year"].cast(DoubleType()))

In [None]:
from pyspark.ml.feature import VectorAssembler
inputcols = ["Global_Sales", "NA_Sales", "EU_Sales"]
# Combine the numeric input columns into a single vector column named "predictors"
assembler = VectorAssembler(inputCols=inputcols,
                            outputCol="predictors")
predictors = assembler.transform(data)
predictors.columns

In [None]:
model_data = predictors.select("predictors", "JP_Sales")
model_data.show(5,truncate=False)

In [None]:
# Create train_data and test_data by splitting model_data randomly 80%/20%
# Create a LinearRegression model (from pyspark.ml.regression import LinearRegression)
# using "predictors" as the features column and JP_Sales as the label column
# Fit the model on train_data (call it lrModel) and evaluate it on test_data (call the result pred)
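With its defaults regParam=0.0 and fitIntercept=True, Spark's LinearRegression fits an ordinary least-squares model with an intercept. As an illustration of what that fit computes, the NumPy sketch below uses synthetic data (not vgsales.csv) standing in for the three predictors and JP_Sales, makes a random 80%/20% split and solves the least-squares problem. It is not the Spark solution to the exercise; names such as true_coef are purely illustrative.

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic stand-in for (predictors, JP_Sales): 100 rows, 3 predictor columns
X = rng.normal(size=(100, 3))
true_coef = np.array([0.5, -0.2, 0.1])
y = X @ true_coef + 0.3 + rng.normal(scale=0.01, size=100)

# Random 80%/20% split (Spark's randomSplit([0.8, 0.2]) gives only approximately 80/20)
idx = rng.permutation(len(y))
train_idx, test_idx = idx[:80], idx[80:]

# Ordinary least squares with a column of ones for the intercept
A_train = np.column_stack([np.ones(len(train_idx)), X[train_idx]])
params, *_ = np.linalg.lstsq(A_train, y[train_idx], rcond=None)
intercept, coefficients = params[0], params[1:]

# Predict on the held-out 20%, as lrModel.evaluate(test_data) does in Spark
y_pred = intercept + X[test_idx] @ coefficients
print(intercept, coefficients)
```

The recovered intercept and coefficients should be close to 0.3 and true_coef, which plays the role of lrModel.intercept and lrModel.coefficients.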

In [None]:
lrModel.coefficients

In [None]:
pred.predictions.show(20)


In [None]:
# Use RMSE, MSE, MAE and R^2 as metrics. Create a RegressionEvaluator named evaluator
# (from pyspark.ml.evaluation import RegressionEvaluator) with labelCol="JP_Sales"

In [None]:
rmse = evaluator.evaluate(pred.predictions)  # RMSE is RegressionEvaluator's default metric
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})

In [None]:
rmse

In [None]:
mse

In [None]:
mae

In [None]:
r2
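The four metrics above follow the standard definitions: MSE is the mean squared residual, RMSE its square root, MAE the mean absolute residual, and R^2 is one minus the ratio of the residual sum of squares to the total sum of squares (for a model fitted with an intercept). The NumPy sketch below computes them on a tiny made-up example, which is handy for sanity-checking the numbers RegressionEvaluator reports; the helper name regression_metrics is hypothetical.

```python
import numpy as np

def regression_metrics(y_true, y_pred):
    """MSE, RMSE, MAE and R^2 computed from the usual textbook formulas."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    residuals = y_true - y_pred
    mse = np.mean(residuals ** 2)                   # mean squared error
    rmse = np.sqrt(mse)                             # root mean squared error
    mae = np.mean(np.abs(residuals))                # mean absolute error
    ss_res = np.sum(residuals ** 2)                 # residual sum of squares
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot                      # coefficient of determination
    return {"mse": mse, "rmse": rmse, "mae": mae, "r2": r2}

m = regression_metrics([1.0, 2.0, 3.0, 4.0], [1.0, 2.0, 4.0, 2.0])
print(m)
```

Here the residuals are 0, 0, -1 and 2, so MSE = 5/4 = 1.25, MAE = 3/4 = 0.75, and since the residual and total sums of squares are both 5, R^2 = 0.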