# ML Tuning

https://spark.apache.org/docs/2.2.0/ml-tuning.html
    ML tuning:
        find a good model (model selection)
        find good parameters for that model (hyperparameter tuning)

https://mapr.com/blog/predicting-loan-credit-risk-using-apache-spark-machine-learning-random-forests/
    another tuning example

        

In [3]:
# Bootstrap cell: findspark.init() runs before any pyspark import below so
# that pyspark can be imported in this kernel.
# NOTE(review): assumes a local Spark installation findspark can discover.
import findspark

findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Single shared session handle; every later cell refers to `spark`.
spark = (
    SparkSession.builder
    .getOrCreate()
)


In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Labeled training documents as (id, text, label): label is 1.0 exactly for
# the documents that contain the word "spark", 0.0 for the rest.
training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
        (8, "e spark program", 1.0),
        (9, "a e c l", 0.0),
        (10, "spark compile", 1.0),
        (11, "hadoop software", 0.0),
    ],
    ["id", "text", "label"],
)

# Three-stage pipeline: text -> words (tokenizer) -> hashed term-frequency
# vector (hashingTF) -> logistic regression (lr).
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# The search space spans two different pipeline stages:
# 3 values of hashingTF.numFeatures x 2 values of lr.regParam = 6 settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01])
    .build()
)

# The whole Pipeline is the Estimator, so CrossValidator tunes the parameters
# of all its stages jointly. It needs an Estimator, the candidate ParamMaps and
# an Evaluator. Only 2 folds to keep the demo quick; use 3+ in practice.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=2,
)

# Cross-validate every setting and keep the best one.
cvModel = crossval.fit(training)

# Unlabeled documents to score.
test = spark.createDataFrame(
    [
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "mapreduce spark"),
        (7, "apache hadoop"),
    ],
    ["id", "text"],
)

# cvModel applies the whole pipeline fitted with the best parameter setting
# (tokenizer + hashingTF + lr), not just the logistic-regression stage.
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)

# spark.stop()  -- left commented out: later cells still use `spark`


Row(id=4, text='spark i j k', probability=DenseVector([0.1136, 0.8864]), prediction=1.0)
Row(id=5, text='l m n', probability=DenseVector([0.9964, 0.0036]), prediction=0.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.3073, 0.6927]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.9587, 0.0413]), prediction=0.0)


In [33]:
# https://spark.apache.org/docs/2.2.0/ml-features.html#tokenizer
"""
Tokenization is the process of taking text (such as a sentence) and
breaking it into individual terms (usually words).
A simple Tokenizer class provides this functionality.
The example below shows how to split sentences into sequences of words.

RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching.
By default, the parameter "pattern" (regex, default: "\\s+") is used as the delimiter to split the input text.
Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes
the tokens themselves rather than the gaps between them; all matching occurrences are then returned
as the tokenization result.
"""

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")], ["id", "sentence"])

# Tokenizer lowercases and splits on whitespace only, so the comma-separated
# sentence (id=2) stays a single token.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
display(tokenizer)  # Jupyter's display(); shows the stage uid (not available in plain Python)

# RegexTokenizer with the default gaps=True treats `pattern` as the delimiter:
# r"\W" splits on every non-word character, commas included.
# Equivalent alternative: pattern=r"\w+", gaps=False (pattern matches the tokens).
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words",
                                pattern=r"\W")

# Number of tokens in a row's "words" array.
countTokens = udf(lambda words: len(words), IntegerType())

# transform() does not join anything: it returns a new DataFrame holding all
# input columns plus the stage's output column ("words").
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.show()

# Keep two columns and append a "tokens" count column computed by the UDF.
(tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words")))
    .show(truncate=False))

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
(regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words")))
    .show(truncate=False))

Tokenizer_43bc8deb11b72f601119

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+---------

In [68]:
# https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.HashingTF
"""
Maps a sequence of terms to their term frequencies using the hashing trick,
i.e. each term is hashed to a column index of a fixed-size vector.

Currently we use Austin Appleby's MurmurHash 3 algorithm (MurmurHash3_x86_32) to calculate
the hash code value for the term object. Since a simple modulo is used to transform the hash
function to a column index, it is advisable to use a power of two as the numFeatures parameter;
otherwise the features will not be mapped evenly to the columns.
"""

from pyspark.ml.feature import HashingTF

# Each row is a one-element tuple wrapping a list of words. The trailing comma
# in (["a", ...],) is what makes it a tuple; without it the row would just be a
# parenthesized list.
df = spark.createDataFrame([(["a", "b", "c", "d"],), (["a", "b", "c"],), (["b", "c", "d"],)], ["words"])
df.show()

# numFeatures=8 is a power of two, as recommended above. With so few buckets,
# distinct words can hash to the same index: ["a", "b", "c"] yields only two
# indices, one with count 2.0.
hashingTF = HashingTF(numFeatures=8, inputCol="words", outputCol="features123")

# Build the transformed DataFrame once and reuse it instead of calling
# hashingTF.transform(df) again for every inspection below.
hashed = hashingTF.transform(df)
hashed.show()

print(hashed.take(3))
print(hashed.head())
hashed.head().features123  # last expression of the cell: Jupyter displays the SparseVector


+------------+
|       words|
+------------+
|[a, b, c, d]|
|   [a, b, c]|
|   [b, c, d]|
+------------+

+------------+--------------------+
|       words|         features123|
+------------+--------------------+
|[a, b, c, d]|(8,[1,2,6],[1.0,2...|
|   [a, b, c]| (8,[1,2],[1.0,2.0])|
|   [b, c, d]|(8,[1,2,6],[1.0,1...|
+------------+--------------------+

[Row(words=['a', 'b', 'c', 'd'], features123=SparseVector(8, {1: 1.0, 2: 2.0, 6: 1.0})), Row(words=['a', 'b', 'c'], features123=SparseVector(8, {1: 1.0, 2: 2.0})), Row(words=['b', 'c', 'd'], features123=SparseVector(8, {1: 1.0, 2: 1.0, 6: 1.0}))]
Row(words=['a', 'b', 'c', 'd'], features123=SparseVector(8, {1: 1.0, 2: 2.0, 6: 1.0}))


SparseVector(8, {1: 1.0, 2: 2.0, 6: 1.0})

https://www.cs.umd.edu/Outreach/hsContest99/questions/node3.html

# Sparse Vectors
A vector is a one-dimensional array of elements. The natural C++ implementation of a vector is as a one-dimensional array. However, in many applications, the elements of a vector have mostly zero values. Such a vector is said to be sparse. It is inefficient to use a one-dimensional array to store a sparse vector. It is also inefficient to add elements whose values are zero in forming sums of sparse vectors. Consequently, we should choose a different representation.

One possibility is to represent the elements of a sparse vector as a linked list of nodes, each of which contains an integer index, a numerical value, and a pointer to the next node. Generally, the entries of the list will correspond to the non-zero elements of the vector in order, with each entry containing the index and value for that entry. (This restriction may be violated if a zero value is explicitly assigned to an element.)

Your goal is to write a program to add pairs of sparse vectors, creating new sparse vectors. The results of addition should not include any elements whose values are zero. You should then print the resulting vectors with elements in ascending order of index (from smallest index to largest).

Input Format
The input will be several pairs of sparse vectors, with each vector on a separate line. Each sparse vector will consist of a number of index-value pairs, where the first number in each pair is an integer representing the index (location), and the second number is a floating-point number representing the actual value. You may assume all index locations are non-negative. Elements will be entered in ascending order of index. The list of vectors is terminated by a line containing only -1.

Output Format
The output will be sparse vectors representing the sum of each pair of input vectors, each on a separate line. Each vector element should appear as an index-value pair, separated by a comma and a space and enclosed in square brackets, e.g. [3, 6.7]. Elements within a vector should be separated by commas. The vector elements must be printed in ascending order of index. Vectors with no elements should appear as the string "empty vector".

Example
Input:

3 1.0 2500 6.3 5000 10.0 60000 5.7 
1 7.5 3 5.7 2500 -6.3 

10 0.0 
15000 6.7 

100 -1.0
100 1.0

-1

Output:

[1, 7.5], [3, 6.7], [5000, 10], [60000, 5.7]

[15000, 6.7]

empty vector

In [60]:
# http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.linalg.SparseVector
# https://en.wikipedia.org/wiki/Sparse_matrix
"""
A simple sparse vector class for passing data to MLlib. Users may alternatively pass SciPy's {scipy.sparse} data types.

In numerical analysis and computer science, a sparse matrix or sparse array is a matrix in which most of the elements
are zero. By contrast, if most of the elements are nonzero, then the matrix is considered dense. The number of
zero-valued elements divided by the total number of elements (e.g., m × n for an m × n matrix) is called the sparsity
of the matrix (which is equal to 1 minus the density of the matrix).
"""
from pyspark.ml.linalg import SparseVector
import array

# SparseVector(size, indices, values): as a dense vector, a = [0.0, 3.0, 0.0, 4.0].
a = SparseVector(4, [1, 3], [3.0, 4.0])
print(a.dot(a))  # 25.0 = 0*0 + 3*3 + 0*0 + 4*4

# https://docs.python.org/3/library/array.html
# https://stackoverflow.com/questions/176011/python-list-vs-array-when-to-use
arr = array.array('d', [1., 2., 3., 4.])  # typecode 'd': C double
print(a.dot(arr))  # 22.0 = 0*1 + 3*2 + 0*3 + 4*4

# As a dense vector, b = [0.0, 0.0, 1.0, 0.0]; its only non-zero index (2) is zero in a.
b = SparseVector(4, [2], [1.0])
a.dot(b)  # 0.0 = 0*0 + 3*0 + 0*1 + 4*0



25.0
22.0


0.0

In [27]:
# http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.udf
# Creates a Column expression representing a user defined function (UDF).

# Import udf here too, so this cell no longer depends on an earlier cell having run.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Two ways to define a UDF: wrap a callable with udf(f, returnType), or use
# @udf / @udf(returnType=...) as a decorator (both appear in this cell).
# Either kind can be used with or without .alias() to name the output column.
# None-safe, matching to_upper/add_one: a null input yields null instead of
# raising TypeError inside the executor.
slen = udf(lambda s: len(s) if s is not None else None, IntegerType())
# Decorator form of the same UDF:
# @udf(returnType=IntegerType())
# def slen(s):
#     return len(s) if s is not None else None
@udf
def to_upper(s):
    """Return ``s`` upper-cased; a null (None) input gives None.

    No returnType is passed to @udf; the resulting column appears as
    ``string`` in df_new's schema.
    """
    return None if s is None else s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    """Return ``x + 1`` as an IntegerType value; a null (None) input gives None."""
    if x is None:
        return None
    return x + 1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.show()
df.printSchema()

# slen gets an explicit header through .alias(); to_upper and add_one keep the
# header Spark derives from the function and column names, e.g. "to_upper(name)".
projection = [slen("name").alias("slen(name)"), to_upper("name"), add_one("age")]
df_new = df.select(*projection)
# Variant without the alias: df.select(slen("name"), to_upper("name"), add_one("age"))
df_new.printSchema()
df_new.show()

# Each selected column gets a header named after its UDF expression, and its
# values are computed by applying the UDF to the input column.


+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|John Doe| 21|
+---+--------+---+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

root
 |-- slen(name): integer (nullable = true)
 |-- to_upper(name): string (nullable = true)
 |-- add_one(age): integer (nullable = true)

+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+

