In [1]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, RegexTokenizer, StopWordsRemover, IDF
from pysparkling import *
from pysparkling.ml import ColumnPruner, H2ODeepLearning
import os


In [2]:
# Check Spark is ready
spark

In [3]:
# Initialize H2O Context
hc = H2OContext.getOrCreate(spark)


Connecting to H2O server at http://172.16.225.190:54321... successful.


0,1
H2O cluster uptime:,06 secs
H2O cluster timezone:,Europe/Bratislava
H2O data parsing timezone:,UTC
H2O cluster version:,3.18.0.11
H2O cluster version age:,5 days
H2O cluster name:,sparkling-water-kuba_local-1527615165506
H2O cluster total nodes:,1
H2O cluster free memory:,846 Mb
H2O cluster total cores:,8
H2O cluster allowed cores:,8



Sparkling Water Context:
 * H2O name: sparkling-water-kuba_local-1527615165506
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (driver,172.16.225.190,54321)
  ------------------------

  Open H2O Flow in browser: http://172.16.225.190:54321 (CMD + click in Mac OSX)

    


In [4]:
# This is just helper function returning path to data-files
def _locate(file_name):
        return "sparkling-water/examples/smalldata/" + file_name

In [5]:
## This method loads the data, perform some basic filtering and create Spark's dataframe
def load():
    row_rdd = spark.sparkContext.textFile(_locate("smsData.txt")).map(lambda x: x.split("\t", 1)).filter(lambda r: r[0].strip())
    return spark.createDataFrame(row_rdd, ["label", "text"])


In [6]:
##
## Define the pipeline stages
##


In [7]:
## Tokenize the messages
tokenizer = RegexTokenizer(inputCol="text",
                           outputCol="words",
                           minTokenLength=3,
                           gaps=False,
                           pattern="[a-zA-Z]+")


In [8]:
## Remove ignored words
stopWordsRemover = StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                                    outputCol="filtered",
                                    stopWords=["the", "a", "", "in", "on", "at", "as", "not", "for"],
                                    caseSensitive=False)


In [9]:
## Hash the words
hashingTF = HashingTF(inputCol=stopWordsRemover.getOutputCol(),
                      outputCol="wordToIndex",
                      numFeatures=1 << 10)


In [10]:
## Create inverse document frequencies model
idf = IDF(inputCol=hashingTF.getOutputCol(),
          outputCol="tf_idf",
          minDocFreq=4)


In [11]:
## Create H2ODeepLearning model
dl = H2ODeepLearning(epochs=10,
                     l1=0.001,
                     l2=0.0,
                     hidden=[200, 200],
                     featuresCols=[idf.getOutputCol()],
                     predictionCol="label")


In [12]:
## Remove all helper columns
colPruner = ColumnPruner(columns=[
        idf.getOutputCol(),
        hashingTF.getOutputCol(),
        stopWordsRemover.getOutputCol(),
        tokenizer.getOutputCol()])


In [13]:
##  Create the pipeline by defining all the stages
pipeline = Pipeline(stages=[tokenizer, stopWordsRemover, hashingTF, idf, dl, colPruner])



In [14]:
## Train the pipeline model
data = load()
model = pipeline.fit(data)

In [15]:
##
## Make predictions on unlabeled data
## Spam detector
##
def isSpam(smsText, model, h2oContext, hamThreshold = 0.5):
    smsTextDF = spark.createDataFrame([(smsText,)], ["text"]) # create one element tuple
    prediction = model.transform(smsTextDF)
    return prediction.first()["prediction_output"]["p0"] > hamThreshold


In [16]:
print(isSpam("Hello, party tonight after our Bratislava meetup?", model, hc))

print(isSpam("We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?", model, hc))

True
False
