### 1. Py.SP: Import Libraries

In [1]:
# Import Library
# Python
import random
import os
import numpy as np
import pandas as pd # for data manipulation
import matplotlib.pyplot as plt # for graph

# SPARK
import pyspark
import findspark # to find location where spark installed
findspark.init()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

### 2. Py.SP: Make Session

In [2]:
# SPARK: Make Session
# getOrCreate() reuses an already-running context, so executing this cell a
# second time in the same kernel no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [3]:
# Animal class to analyse; used below to build the data paths and to
# filter the feature table.
target_animal = "monkey"

### 3. Py.SP: Make LBP Features From Images

In [5]:
# SPARK: load every image file in the target animal's directory into a
# DataFrame through the "image" data source, then print its schema.
img_dir = "D:\\Data\\AnimalsOnTheWeb\\{}".format(target_animal)
imgs = (
    spark.read
    .format("image")
    .load(img_dir)
)
imgs.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [6]:
# Python: Make features from the images and save them to a CSV file

In [7]:
#!python lbp.py

In [40]:
# SPARK: Read Feature CSV file and make DataFrame
import pyspark.sql.types as typ

# (column name, Spark type) pairs describing Res_LBP.csv:
# four metadata columns followed by the ten LBP feature columns LBP0..LBP9.
labels = [
    ('ind', typ.IntegerType()),    # index
    ('Animal', typ.StringType()),  # Class of animals
    ('File', typ.StringType()),    # filename
    ('ID', typ.StringType()),      # picture ID
] + [('LBP%d' % i, typ.FloatType()) for i in range(10)]  # LBP features

# Define Schema
# NOTE: the fields are declared with nullable=False, but the schema printed
# below still reports nullable = true for every column.
schema = typ.StructType([
    typ.StructField(col_name, col_type, False) for col_name, col_type in labels
])

# CSV read (single pass, with the explicit schema; the earlier schema-less
# read of the same file was discarded immediately and has been removed)
res_lbp = spark.read.csv('Res_LBP.csv', header=True, schema=schema)
# Select Target Animal
target_lbp = res_lbp.where(res_lbp.Animal.isin(target_animal))
target_lbp.printSchema()

root
 |-- ind: integer (nullable = true)
 |-- Animal: string (nullable = true)
 |-- File: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- LBP0: float (nullable = true)
 |-- LBP1: float (nullable = true)
 |-- LBP2: float (nullable = true)
 |-- LBP3: float (nullable = true)
 |-- LBP4: float (nullable = true)
 |-- LBP5: float (nullable = true)
 |-- LBP6: float (nullable = true)
 |-- LBP7: float (nullable = true)
 |-- LBP8: float (nullable = true)
 |-- LBP9: float (nullable = true)



In [41]:
# Peek at the filtered feature table: head() with no argument returns the
# first row as a single Row object.
target_lbp.head() # show the 1st row

Row(ind=12500, Animal='monkey', File='pic1000_8472370_B_store.jpg', ID='pic1000', LBP0=0.0281333327293396, LBP1=0.03457777947187424, LBP2=0.011955556459724903, LBP3=0.047066666185855865, LBP4=0.09573332965373993, LBP5=0.169155552983284, LBP6=0.04497777670621872, LBP7=0.07631111145019531, LBP8=0.3548888862133026, LBP9=0.1371999979019165)

In [42]:
# Print the filtered feature table as a text table (long values are truncated).
target_lbp.show() # show the first 20 rows

+-----+------+--------------------+-------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----------+
|  ind|Animal|                File|     ID|       LBP0|       LBP1|       LBP2|       LBP3|       LBP4|       LBP5|       LBP6|       LBP7|       LBP8|      LBP9|
+-----+------+--------------------+-------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----------+
|12500|monkey|pic1000_8472370_B...|pic1000|0.028133333| 0.03457778|0.011955556|0.047066666| 0.09573333| 0.16915555|0.044977777| 0.07631111|  0.3548889|    0.1372|
|12501|monkey|pic1001_8472385_B...|pic1001|0.013377778|     0.0216|0.017111111|0.053644445|0.116177775| 0.14786667|0.025066666|0.033822224| 0.49857777|0.07275555|
|12502|monkey|pic1002_8472390_B...|pic1002|     0.0272|0.042933334|0.015333333|     0.0416| 0.08764444|0.120355554|0.032444444|0.060222223| 0.44226667|      0.13|
|12503|monkey|pic1003_

### 4. Py.SP: Make Ground Truth

In [14]:
# Python: Read .mat file
import scipy.io as sio # Library for .mat files
import re # Library for Regular Expression
file_path = 'D:\\Data\\AnimalsOnTheWeb\\' + target_animal + '\\'
file = 'animaldata_'+ target_animal + '.mat'
# Read from .mat files
data_read = sio.loadmat(os.path.join(file_path,file))

# truth table (1 or 0): first row of the 'gt' array
truth_tbl = list(data_read['gt'][0])

# get picture ID and save it to 'name' column
# Each entry of 'imgnames' is itself an array; its first element is the name.
truth_nameread = list(data_read['imgnames'][0])
truth_name = [t[0] for t in truth_nameread]
truth_lists = pd.DataFrame({'name': truth_name,'truth': truth_tbl})
truth_lists['name'] = truth_lists['name'].astype('str')
# Raw string: '\d' inside a normal string literal is an invalid escape
# sequence and triggers a DeprecationWarning/SyntaxWarning on newer Pythons.
re_picid = re.compile(r'pic\d+')
# The first 'pic<digits>' match in each name is the picture ID
# (raises IndexError if a name contains no such token).
truth_lists['ID'] = [re_picid.findall(img_name)[0] for img_name in truth_lists['name']]
truth_lists.head()

Unnamed: 0,name,truth,ID
0,monkey/pic1_monkey_white2.gif,0,pic1
1,monkey/pic2_logotrans200.gif,0,pic2
2,monkey/pic3_DesignSurvey.gif,0,pic3
3,monkey/pic4_Popup.gif,0,pic4
4,monkey/pic5_Results.gif,0,pic5


In [17]:
# SPARK: convert pandas DF to Spark DF
# No schema is passed, so the column types are inferred from the pandas frame;
# printSchema() shows what was inferred.
df_truth = spark.createDataFrame(truth_lists)
df_truth.printSchema()

root
 |-- name: string (nullable = true)
 |-- truth: long (nullable = true)
 |-- ID: string (nullable = true)



In [18]:
# Add 'truth_int': an integer-typed copy of the 'truth' column.
# The original 'truth' column is kept alongside it.
truth_as_int = df_truth['truth'].cast(typ.IntegerType())
df_truth = df_truth.withColumn('truth_int', truth_as_int)
df_truth.printSchema()

root
 |-- name: string (nullable = true)
 |-- truth: long (nullable = true)
 |-- ID: string (nullable = true)
 |-- truth_int: integer (nullable = true)



In [19]:
# Show the first 5 rows of the ground-truth DataFrame
df_truth.show(5)

+--------------------+-----+----+---------+
|                name|truth|  ID|truth_int|
+--------------------+-----+----+---------+
|monkey/pic1_monke...|    0|pic1|        0|
|monkey/pic2_logot...|    0|pic2|        0|
|monkey/pic3_Desig...|    0|pic3|        0|
|monkey/pic4_Popup...|    0|pic4|        0|
|monkey/pic5_Resul...|    0|pic5|        0|
+--------------------+-----+----+---------+
only showing top 5 rows



### 5. SP: Join Features and Ground Truth DataFrames

In [21]:
# Attach the LBP features to the ground-truth labels by matching picture ID.
# This is an inner join: only IDs present in both frames are kept.
df_ml = df_truth.join(target_lbp, on='ID', how='inner')
df_ml.head(5)

[Row(ID='pic1', name='monkey/pic1_monkey_white2.gif', truth=0, truth_int=0, ind=13282, Animal='monkey', File='pic1_monkey_white2.gif', LBP0=0.007415606174618006, LBP1=0.018317654728889465, LBP2=0.011344769969582558, LBP3=0.024017708376049995, LBP4=0.08184836804866791, LBP5=0.1346430480480194, LBP6=0.025013834238052368, LBP7=0.044714998453855515, LBP8=0.5885445475578308, LBP9=0.0641394555568695),
 Row(ID='pic2', name='monkey/pic2_logotrans200.gif', truth=0, truth_int=0, ind=13393, Animal='monkey', File='pic2_logotrans200.gif', LBP0=0.0019064750522375107, LBP1=0.012661870568990707, LBP2=0.00949640292674303, LBP3=0.02928057685494423, LBP4=0.062266185879707336, LBP5=0.11478417366743088, LBP6=0.026978416368365288, LBP7=0.046726617962121964, LBP8=0.6552157998085022, LBP9=0.040683452039957047),
 Row(ID='pic3', name='monkey/pic3_DesignSurvey.gif', truth=0, truth_int=0, ind=13504, Animal='monkey', File='pic3_DesignSurvey.gif', LBP0=0.012463700957596302, LBP1=0.01831400953233242, LBP2=0.00411217

In [22]:
# Keep only the integer label and the ten LBP feature columns,
# in the order in which they appear in df_ml.
ml_columns = [
    'truth_int',
    'LBP0', 'LBP1', 'LBP2', 'LBP3', 'LBP4',
    'LBP5', 'LBP6', 'LBP7', 'LBP8', 'LBP9',
]
df_ml1 = df_ml.select([c for c in df_ml.columns if c in ml_columns])
df_ml1.show(5)

+---------+-----------+-----------+-----------+-----------+-----------+----------+-----------+-----------+----------+-----------+
|truth_int|       LBP0|       LBP1|       LBP2|       LBP3|       LBP4|      LBP5|       LBP6|       LBP7|      LBP8|       LBP9|
+---------+-----------+-----------+-----------+-----------+-----------+----------+-----------+-----------+----------+-----------+
|        0|0.007415606|0.018317655| 0.01134477|0.024017708| 0.08184837|0.13464305|0.025013834|   0.044715|0.58854455|0.064139456|
|        0|0.001906475|0.012661871|0.009496403|0.029280577|0.062266186|0.11478417|0.026978416|0.046726618| 0.6552158|0.040683452|
|        0|0.012463701| 0.01831401|0.004112173|0.007864002|0.007567247|0.16688217|0.011573436|0.026941095| 0.5995506| 0.14473154|
|        0|0.013060036|0.026299283|  0.0015681|0.010013441|0.014672939|0.15642922| 0.02157258|0.044758067|0.56801075| 0.14361559|
|        0|0.012581527|0.017340913|0.001498325|0.021108761|0.011171338|0.23818967|0.021505

### 6. SP: Machine Learning

##### 6.1. Feature Creator

In [24]:
# make Feature column
import pyspark.ml.feature as ft

# (name, type) pairs for the ten LBP feature columns LBP0..LBP9.
labels_feat = [('LBP%d' % i, typ.FloatType()) for i in range(10)]

# Assemble the ten LBP columns into a single vector column named 'features'.
featuresCreator = ft.VectorAssembler(
    inputCols=[feat_name for feat_name, _ in labels_feat],
    outputCol='features',
)

##### 6.2. Make Classification Model

In [25]:
# make model
import pyspark.ml.classification as cl

# Logistic-regression estimator that uses the integer 'truth_int' column as label.
logistic = cl.LogisticRegression(
    labelCol='truth_int',
    maxIter=10,
    regParam=0.01,
)

##### 6.3. Pipeline

In [26]:
# make pipeline
from pyspark.ml import Pipeline
# Two stages run in order: assemble the 'features' vector, then logistic regression.
pipeline = Pipeline(stages=[featuresCreator,logistic])

In [27]:
# Separate training and test data
# Split weights are 0.7 / 0.3; the seed is fixed (seed=100), presumably so the split is repeatable.
lbp_train, lbp_test = df_ml1.randomSplit([0.7,0.3],seed=100)
# Train model
model = pipeline.fit(lbp_train)
# Test
test_model = model.transform(lbp_test) # get results on test dataset
# Inspect one scored row: the transform adds the 'features', 'rawPrediction',
# 'probability' and 'prediction' columns.
test_model.take(1)

[Row(truth_int=0, LBP0=0.0, LBP1=0.0, LBP2=0.0, LBP3=0.0, LBP4=0.0, LBP5=0.0, LBP6=0.0, LBP7=0.0, LBP8=0.0, LBP9=0.0, features=SparseVector(10, {}), rawPrediction=DenseVector([2.4983, -2.4983]), probability=DenseVector([0.924, 0.076]), prediction=0.0)]

### 7. SP: Evaluation

In [28]:
# Evaluation
import pyspark.ml.evaluation as ev

# Score the test predictions: the 'probability' column is used as the score
# and 'truth_int' as the label.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='truth_int',
)
auc = evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderROC'})
print('Area under ROC curve: ' + str(auc))

Area under ROC curve: 0.8494523973784068


### 8. SP: K-means Clustering

In [29]:
# K-Means clustering

In [30]:
import pyspark.ml.clustering as clus
# K-means estimator with k=10 clusters, reading the 'features' vector column.
kmeans = clus.KMeans(k=10, featuresCol='features')

In [31]:
# make pipeline
# NOTE: this rebinds the name `pipeline`, which earlier held the
# logistic-regression pipeline; from here on `pipeline` is the
# feature-assembler + K-means pipeline.
pipeline = Pipeline(stages=[featuresCreator,kmeans])

In [38]:
# Get All class data
# Select the ten LBP feature columns from the full (all-animal) feature table.
# The column list is taken from res_lbp itself rather than from the joined
# frame df_ml, so this cell depends only on the frame it actually selects from.
lbp_columns = ['LBP0','LBP1','LBP2','LBP3','LBP4','LBP5','LBP6','LBP7','LBP8','LBP9']
df_km = res_lbp.select([c for c in res_lbp.columns if c in lbp_columns])
df_km.show(5)

+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----------+-----------+
|       LBP0|       LBP1|       LBP2|       LBP3|       LBP4|       LBP5|       LBP6|       LBP7|      LBP8|       LBP9|
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----------+-----------+
| 0.05400621| 0.09041149|0.024541926| 0.03751553|0.050667703|0.060962733|0.033190995| 0.09750777|0.19192547|  0.3592702|
|0.110507585|0.095817834|0.031439807|0.032135498| 0.03360715|0.071495466|0.038075615| 0.08607818|0.22409226|  0.2767506|
| 0.04896794|0.049928635| 0.02091568|0.035518225| 0.04894049|0.103672594|0.030769654|  0.0590964|0.45443565| 0.14775471|
|   0.012288|    0.02336|   0.011776|   0.029568|    0.04608|   0.121856|   0.031424|   0.062464|  0.469376|   0.191808|
|0.004452055|0.018949771|0.007191781|0.040810503| 0.07351598| 0.18710046|0.022488585|0.028253425|0.54423517|0.073002286|
+-----------+-----------+-------

In [33]:
# Separate training and test data
# Same 0.7 / 0.3 weights and seed=100 as the classification split, applied to df_km.
km_train, km_test = df_km.randomSplit([0.7,0.3],seed=100)

In [34]:
# Fit the current `pipeline` on the training split, then apply the fitted
# model to the test split.
model_km = pipeline.fit(km_train)
test_km = model_km.transform(km_test)

In [35]:
# Show Results
# Per predicted cluster: the row count and the mean of LBP0 and LBP1.
cluster_summary = {'*': 'count', 'LBP0': 'avg', 'LBP1': 'avg'}
test_km.groupBy('prediction').agg(cluster_summary).collect()

[Row(prediction=1, avg(LBP1)=0.0, avg(LBP0)=0.0, count(1)=92),
 Row(prediction=6, avg(LBP1)=0.08458137746277936, avg(LBP0)=0.0707300516842136, count(1)=826),
 Row(prediction=3, avg(LBP1)=0.04393933544139808, avg(LBP0)=0.03294809934934775, count(1)=483),
 Row(prediction=5, avg(LBP1)=0.00845794432736707, avg(LBP0)=0.005356709053097502, count(1)=316),
 Row(prediction=9, avg(LBP1)=0.06777245327148859, avg(LBP0)=0.05411297094145017, count(1)=205),
 Row(prediction=4, avg(LBP1)=0.09728791872609635, avg(LBP0)=0.09108109149634162, count(1)=882),
 Row(prediction=8, avg(LBP1)=0.05811804454805779, avg(LBP0)=0.04432795576887719, count(1)=580),
 Row(prediction=7, avg(LBP1)=0.06424826767611626, avg(LBP0)=0.048250739314236644, count(1)=389),
 Row(prediction=2, avg(LBP1)=0.10460108523554307, avg(LBP0)=0.10597452507736473, count(1)=329),
 Row(prediction=0, avg(LBP1)=0.02263585915924736, avg(LBP0)=0.015545153876867784, count(1)=431)]