Skip to content

Commit

Permalink
Added materials for Nov 15 London meetup
Browse files Browse the repository at this point in the history
  • Loading branch information
woobe committed Dec 18, 2017
1 parent f9d82f5 commit 47406b8
Show file tree
Hide file tree
Showing 3 changed files with 409 additions and 0 deletions.
381 changes: 381 additions & 0 deletions 2017_11_15_Big_Data_London/HamOrSpamMojo.ipynb
@@ -0,0 +1,381 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Imports\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import *\n",
"from pyspark.ml import Pipeline\n",
"from pyspark.ml.feature import HashingTF, RegexTokenizer, StopWordsRemover, IDF\n",
"# Import H2OContext explicitly: it is the only top-level pysparkling name used in\n",
"# this notebook, and the former wildcard import polluted the namespace\n",
"from pysparkling import H2OContext\n",
"from pysparkling.ml import ColumnPruner, H2ODeepLearning\n",
"import os\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - hive</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://10.129.54.69:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v2.2.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>local</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>PySparkShell</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
],
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x10e2a0910>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Check Spark is ready: the bare `spark` expression renders the SparkSession's rich HTML repr\n",
"spark"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connecting to H2O server at http://10.129.54.69:54321... successful.\n"
]
},
{
"data": {
"text/html": [
"<div style=\"overflow:auto\"><table style=\"width:50%\"><tr><td>H2O cluster uptime:</td>\n",
"<td>06 secs</td></tr>\n",
"<tr><td>H2O cluster version:</td>\n",
"<td>3.14.0.7</td></tr>\n",
"<tr><td>H2O cluster version age:</td>\n",
"<td>26 days </td></tr>\n",
"<tr><td>H2O cluster name:</td>\n",
"<td>sparkling-water-kuba_local-1510775811046</td></tr>\n",
"<tr><td>H2O cluster total nodes:</td>\n",
"<td>1</td></tr>\n",
"<tr><td>H2O cluster free memory:</td>\n",
"<td>768 Mb</td></tr>\n",
"<tr><td>H2O cluster total cores:</td>\n",
"<td>8</td></tr>\n",
"<tr><td>H2O cluster allowed cores:</td>\n",
"<td>8</td></tr>\n",
"<tr><td>H2O cluster status:</td>\n",
"<td>accepting new members, healthy</td></tr>\n",
"<tr><td>H2O connection url:</td>\n",
"<td>http://10.129.54.69:54321</td></tr>\n",
"<tr><td>H2O connection proxy:</td>\n",
"<td>None</td></tr>\n",
"<tr><td>H2O internal security:</td>\n",
"<td>False</td></tr>\n",
"<tr><td>H2O API Extensions:</td>\n",
"<td>XGBoost, Algos, AutoML, Core V3, Core V4</td></tr>\n",
"<tr><td>Python version:</td>\n",
"<td>2.7.13 final</td></tr></table></div>"
],
"text/plain": [
"-------------------------- ----------------------------------------\n",
"H2O cluster uptime: 06 secs\n",
"H2O cluster version: 3.14.0.7\n",
"H2O cluster version age: 26 days\n",
"H2O cluster name: sparkling-water-kuba_local-1510775811046\n",
"H2O cluster total nodes: 1\n",
"H2O cluster free memory: 768 Mb\n",
"H2O cluster total cores: 8\n",
"H2O cluster allowed cores: 8\n",
"H2O cluster status: accepting new members, healthy\n",
"H2O connection url: http://10.129.54.69:54321\n",
"H2O connection proxy:\n",
"H2O internal security: False\n",
"H2O API Extensions: XGBoost, Algos, AutoML, Core V3, Core V4\n",
"Python version: 2.7.13 final\n",
"-------------------------- ----------------------------------------"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Sparkling Water Context:\n",
" * H2O name: sparkling-water-kuba_local-1510775811046\n",
" * cluster size: 1\n",
" * list of used nodes:\n",
" (executorId, host, port)\n",
" ------------------------\n",
" (driver,10.129.54.69,54321)\n",
" ------------------------\n",
"\n",
" Open H2O Flow in browser: http://10.129.54.69:54321 (CMD + click in Mac OSX)\n",
"\n",
" \n"
]
}
],
"source": [
"# Initialize H2O Context\n",
"# getOrCreate starts (or attaches to) an H2O cluster inside this Spark application\n",
"hc = H2OContext.getOrCreate(spark)\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Helper returning the path to a file inside the shared smalldata directory\n",
"def _locate(file_name):\n",
"    \"\"\"Build the relative path to `file_name` under examples/smalldata.\"\"\"\n",
"    data_dir = \"../../../examples/smalldata/\"\n",
"    return data_dir + file_name"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Loads the raw SMS data, drops rows with a blank label, and builds a Spark DataFrame\n",
"def load():\n",
"    lines = spark.sparkContext.textFile(_locate(\"smsData.txt\"))\n",
"    pairs = lines.map(lambda line: line.split(\"\\t\", 1))\n",
"    labelled = pairs.filter(lambda pair: pair[0].strip())\n",
"    return spark.createDataFrame(labelled, [\"label\", \"text\"])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define the pipeline stages"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Tokenize the messages: with gaps=False the pattern matches the tokens themselves,\n",
"## so only purely alphabetic runs of at least 3 characters survive\n",
"tokenizer = RegexTokenizer(\n",
"    inputCol=\"text\", outputCol=\"words\",\n",
"    pattern=\"[a-zA-Z]+\", gaps=False, minTokenLength=3)\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Remove ignored words\n",
"## NOTE(review): the tokenizer above already enforces minTokenLength=3, so the short\n",
"## stopwords here (and the \"\" entry) should never match -- confirm whether the list\n",
"## is intentional or a leftover from a different tokenizer configuration\n",
"stopWordsRemover = StopWordsRemover(inputCol=tokenizer.getOutputCol(),\n",
" outputCol=\"filtered\",\n",
" stopWords=[\"the\", \"a\", \"\", \"in\", \"on\", \"at\", \"as\", \"not\", \"for\"],\n",
" caseSensitive=False)\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Hash each filtered word list into a fixed 1024-dimensional term-frequency vector\n",
"hashingTF = HashingTF(numFeatures=1024,\n",
"    inputCol=stopWordsRemover.getOutputCol(),\n",
"    outputCol=\"wordToIndex\")\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Create inverse document frequencies model\n",
"## minDocFreq=4: terms appearing in fewer than 4 documents are ignored by the IDF weighting\n",
"idf = IDF(inputCol=hashingTF.getOutputCol(),\n",
" outputCol=\"tf_idf\",\n",
" minDocFreq=4)\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Create H2ODeepLearning model: 2 hidden layers of 200 units, L1 regularization only\n",
"## NOTE(review): in this Sparkling Water API version predictionCol appears to name the\n",
"## response (label) column used for training -- confirm against the pysparkling.ml docs\n",
"dl = H2ODeepLearning(epochs=10,\n",
" l1=0.001,\n",
" l2=0.0,\n",
" hidden=[200, 200],\n",
" featuresCols=[idf.getOutputCol()],\n",
" predictionCol=\"label\")\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Remove all helper columns produced by the feature stages so they do not\n",
"## appear in the pipeline's final output\n",
"colPruner = ColumnPruner(columns=[\n",
" idf.getOutputCol(),\n",
" hashingTF.getOutputCol(),\n",
" stopWordsRemover.getOutputCol(),\n",
" tokenizer.getOutputCol()])\n"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"## Chain all stages, in order, into a single Spark ML pipeline\n",
"pipeline = Pipeline(\n",
"    stages=[tokenizer, stopWordsRemover, hashingTF, idf, dl, colPruner])\n"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"## Train the pipeline model\n",
"## fit() runs every stage in order on the loaded data and returns a fitted PipelineModel\n",
"data = load()\n",
"model = pipeline.fit(data)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"##\n",
"## Spam detector: make a prediction for a single unlabeled message\n",
"##\n",
"def isSpam(smsText, model, h2oContext, hamThreshold = 0.5):\n",
"    \"\"\"Return True when the pipeline model scores smsText above hamThreshold.\n",
"\n",
"    NOTE(review): h2oContext is accepted but never used in the body; it is kept\n",
"    only for signature compatibility with existing callers.\n",
"    \"\"\"\n",
"    one_row = spark.createDataFrame([(smsText,)], [\"text\"]) # one-element tuple -> one-row frame\n",
"    scored = model.transform(one_row)\n",
"    return scored.first()[\"spam\"] > hamThreshold\n"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"False\n",
"True\n"
]
}
],
"source": [
"# Score one ordinary message and one promotional message with the trained pipeline\n",
"print(isSpam(\"Michal, h2oworld party tonight in MV?\", model, hc))\n",
"\n",
"print(isSpam(\"We tried to contact you re your reply to our offer of a Video Handset? 750 anytime any networks mins? UNLIMITED TEXT?\", model, hc))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Binary file not shown.

0 comments on commit 47406b8

Please sign in to comment.