Reference (adapted from the Elephas Spark ML pipeline example): https://github.com/maxpumperla/elephas/blob/master/examples/Spark_ML_Pipeline.ipynb

## INSTALL PYSPARK

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

In [None]:
# Confirm the SparkContext is live (shows master and appName).
print(sc)

<SparkContext master=local[*] appName=YourTest>


In [None]:
# elephas provides ElephasEstimator, which is imported in the "Build Model and ElephasEstimator" section.
# Uncomment to install it when the runtime does not already have it.
# NOTE(review): pin an elephas version compatible with the TensorFlow/Keras used here — TODO confirm which.
# ! pip install elephas

In [4]:
import keras
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, Embedding, LSTM, SpatialDropout1D
from keras.callbacks import ModelCheckpoint
import os
from sklearn.metrics import roc_auc_score
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import re
from keras.preprocessing.text import Tokenizer
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint 
from keras import layers, regularizers

Using TensorFlow backend.


In [5]:
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors
import numpy as np
import random

# SQL entry point bound to the SparkContext; used below to turn RDDs into DataFrames.
sql_context = SQLContext(sparkContext=sc)

## BUILD TOKENIZER

In [6]:
# Load the cleaned corpus, split each document into tokens and fit a word-index tokenizer.
datapath = "./"
csv_file = "data_cleaned_shuffled.csv"
data = pd.read_csv(os.path.join(datapath, csv_file))
# str(NaN) would produce the literal token "nan" for rows with missing text;
# treat missing text as an empty document instead.
data["text_clean"] = data['text_cleaned_string'].fillna("").astype(str).str.split()
max_fatures = 10000  # vocabulary size (name kept as-is: later cells reference it)
tokenizer = Tokenizer(num_words=max_fatures)
tokenizer.fit_on_texts(data['text_clean'].values)

In [12]:
# Map tokens to ids and pad/truncate every sequence to length 32; one-hot the labels.
token_ids = tokenizer.texts_to_sequences(data['text_clean'].values)
X = pad_sequences(token_ids, maxlen=32)

Y = pd.get_dummies(data['category']).values
# Hold out 5% of rows for testing; the fixed seed makes the split reproducible.
X_train_total, X_test, Y_train_total, Y_test = train_test_split(
    X,
    Y,
    test_size=0.05,
    random_state=42,
)

In [15]:
# create train and test dataset

df_train = pd.DataFrame(X_train_total)
df_train["label"] = np.argmax(Y_train_total, axis = 1)
df_train.to_csv("df_train.csv", index = False)


df_test = pd.DataFrame(X_test)
df_test["label"] = np.argmax(Y_test, axis = 1)
df_test.to_csv("df_test.csv", index = False)

len(df_train), len(df_test)

(57109, 3006)

## CREATE RDD

In [16]:
datapath = "./"
csv_file = "df_train.csv"
data = sc.textFile(datapath + csv_file)
header = data.first()

def str_to_int(x):
    return [int(i) for i in x]

data_rdd = data.filter(lambda x:x!= header).map(lambda line: line.split(',')).\
    map(lambda x: str_to_int(x)).\
    map(lambda x: (Vectors.dense(np.asarray(x[:-1]).astype(np.float32)), x[-1]))


train_df = sql_context.createDataFrame(data_rdd, ['features', 'category'])

In [17]:
datapath = "./"
csv_file = "df_test.csv"
data = sc.textFile(datapath + csv_file)
header = data.first()

def str_to_int(x):
    return [int(i) for i in x]

data_rdd = data.filter(lambda x:x!= header).map(lambda line: line.split(',')).\
    map(lambda x: str_to_int(x)).\
    map(lambda x: (Vectors.dense(np.asarray(x[:-1]).astype(np.float32)), x[-1]))


test_df = sql_context.createDataFrame(data_rdd, ['features', 'category'])

In [18]:
# Peek at the first rows without truncating the long feature vectors.
test_df.show(n=5, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|features                                                                                                                                                    |category|
+------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,9.0,32.0]                          |2       |
|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,219.0,3.0,1607.0,1131.0,208.0,1514.0,793.0,168.0,529.0,63.0]       |1       |
|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,910.0,470.0,367.0,679.0,23.0,1583.0,2.0,25.0]              |2 

In [19]:
# Make sure 'features' is a vector column: print the schema, then fail fast
# (instead of only printing) if the type is anything else.
test_df.printSchema()
assert dict(test_df.dtypes)["features"] == "vector", test_df.dtypes

root
 |-- features: vector (nullable = true)
 |-- category: long (nullable = true)



In [None]:
# Optional Spark ML preprocessing: label indexing + feature scaling.
# Not used by the model below:
# - labels are already integer class ids;
# - features are token ids fed to an Embedding layer, so scaling them would break the lookup.
# Disabled by default so "Run All" does not launch two extra Spark fits.
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.util import MLUtils

RUN_OPTIONAL_PREPROCESSING = False

if RUN_OPTIONAL_PREPROCESSING:
    # ========== convert the label to index
    string_indexer = StringIndexer(inputCol="category", outputCol="index_category")
    fitted_indexer = string_indexer.fit(train_df)
    indexed_df = fitted_indexer.transform(train_df)

    # =========== scaling =============
    # The features column is built with pyspark.mllib.linalg.Vectors, whereas
    # pyspark.ml's StandardScaler expects pyspark.ml vectors, so convert first.
    # NOTE(review): this path has not been executed in this notebook — confirm before relying on it.
    indexed_ml_df = MLUtils.convertVectorColumnsToML(indexed_df, "features")
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    fitted_scaler = scaler.fit(indexed_ml_df)
    scaled_df = fitted_scaler.transform(indexed_ml_df)

## Build Model and ElephasEstimator

In [20]:
# Simple LSTM classifier: Embedding -> Dropout -> LSTM -> softmax over the classes.

embed_dim = 128
lstm_out = 196
import tensorflow as tf
# NOTE(review): this rebinds `keras` from the standalone package imported earlier to tf.keras.
from tensorflow import keras
from keras.callbacks import ModelCheckpoint
from keras import layers, regularizers

# Derive shapes from the data instead of repeating magic numbers:
# - the sequence length must equal the pad_sequences maxlen;
# - the softmax width must equal the number of one-hot label columns.
seq_len = X_train_total.shape[1]
n_classes = Y_train_total.shape[1]

model = tf.keras.Sequential()
model.add(tf.keras.layers.Embedding(max_fatures, embed_dim, input_length=seq_len))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.LSTM(lstm_out, dropout=0.5, recurrent_dropout=0.2))
model.add(tf.keras.layers.Dense(n_classes, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# NOTE(review): this callback is never passed to any fit call (training is done by
# ElephasEstimator), so it currently has no effect.
save_best_model = tf.keras.callbacks.ModelCheckpoint('best_model_total.h5', monitor='val_accuracy', mode='max', save_best_only=True, verbose=1)
# summary() prints the table itself and returns None; print(model.summary()) also printed "None".
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, 32, 128)           1280000   
_________________________________________________________________
dropout (Dropout)            (None, 32, 128)           0         
_________________________________________________________________
lstm (LSTM)                  (None, 196)               254800    
_________________________________________________________________
dense (Dense)                (None, 3)                 591       
Total params: 1,535,391
Trainable params: 1,535,391
Non-trainable params: 0
_________________________________________________________________
None


In [21]:
from elephas.ml_model import ElephasEstimator
from tensorflow.keras import optimizers


# `learning_rate` is the TF2 argument name. `lr` is a deprecated alias that newer
# Keras releases may ignore (with a warning) or reject, silently losing the 0.01.
adam = optimizers.Adam(learning_rate=0.01)
opt_conf = optimizers.serialize(adam)

# Initialize SparkML Estimator and set all relevant properties
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")             # These two come directly from pyspark,
estimator.setLabelCol("category")                # the rest are elephas-specific setters
# NOTE(review): model.to_yaml() has been removed in newer TensorFlow releases and newer
# elephas versions take model.to_json() here — confirm against the installed versions.
estimator.set_keras_model_config(model.to_yaml())       # Provide serialized Keras model
estimator.set_categorical_labels(True)
# Keep the class count in sync with the model's softmax width.
estimator.set_nb_classes(model.output_shape[-1])
estimator.set_num_workers(4)
estimator.set_epochs(20)
estimator.set_batch_size(512)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

ElephasEstimator_fb9f7bad3c2d

In [22]:
from pyspark.ml import Pipeline

# One-stage pipeline: the Elephas estimator is the only step.
stages = [estimator]
pipeline = Pipeline(stages=stages)

In [23]:
# Train the Keras model through Spark; expect this cell to run for a long time.
fitted_pipeline = pipeline.fit(dataset=train_df)

>>> Fit model
>>> Synchronous training complete.


In [25]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import VectorAssembler

# Score the held-out split (pass train_df instead to score the training data).
# The "prediction" column holds one probability per class.
prediction = fitted_pipeline.transform(test_df)
pnl = prediction.select(["category", "prediction"])
pnl.show(n=10, truncate=False)

+--------+-----------------------------------------------------------------+
|category|prediction                                                       |
+--------+-----------------------------------------------------------------+
|2       |[0.04426686838269234, 0.16747823357582092, 0.7882549166679382]   |
|1       |[0.9730930328369141, 0.019218582659959793, 0.007688305340707302] |
|2       |[0.09128804504871368, 0.6165599226951599, 0.2921519875526428]    |
|0       |[0.999300479888916, 2.9267198988236487E-4, 4.0680859819985926E-4]|
|0       |[0.984204113483429, 0.004681208170950413, 0.011114677414298058]  |
|0       |[0.3636516034603119, 0.49921563267707825, 0.13713277876377106]   |
|1       |[0.6880679726600647, 0.2330729365348816, 0.07885913550853729]    |
|0       |[0.9989369511604309, 3.6318288766779006E-4, 6.998337339609861E-4]|
|0       |[0.7662630677223206, 0.13533107936382294, 0.09840583801269531]   |
|1       |[0.04812038689851761, 0.8488356471061707, 0.10304394364356995]   |

In [26]:
# MulticlassMetrics expects (prediction, label) pairs. The original tuple was reversed,
# which would transpose per-class metrics and the confusion matrix.
# argmax turns the per-class probability vector into a class id.
prediction_and_label = pnl.rdd.map(
    lambda row: (float(np.argmax(row.prediction)), float(row.category))
)
metrics = MulticlassMetrics(prediction_and_label)
# precision() without a label is a deprecated alias for overall accuracy (removed in
# later Spark releases); read the accuracy property directly.
print(metrics.accuracy)

0.6689953426480373


In [28]:
# Persist the trained model. The fitted pipeline's first stage is the trained
# ElephasTransformer; that stage is saved, not the PipelineModel wrapper.

stages = fitted_pipeline.stages
fitted_model = stages[0]
print(fitted_model)

# NOTE(review): the target name says "pipeline" but only the transformer stage is written;
# check the installed elephas version for the on-disk format.
fitted_model.save("fitted_pipeline")

ElephasTransformer_6fc2eefdd213
