Add UDFTransformer and fix minor PipelineStage bugs

eedeleon authored and elibarzilay committed Nov 20, 2017
1 parent 73663a7 commit c793ff89282fd48c51a4e774149a39006cc12754
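
This commit introduces UDFTransformer, a PipelineStage that wraps an ordinary pyspark UDF so it can be composed with other stages. A minimal usage sketch, based on the constructor exposed by the Python wrapper added in this commit (the UDF and column names here are illustrative placeholders, not part of the commit):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType
    from mmlspark import UDFTransformer

    # Wrap a plain pyspark UDF as a reusable pipeline stage
    # (column names below are placeholders).
    textLenUDF = udf(lambda s: float(len(s)), DoubleType())
    textLenStage = UDFTransformer(inputCol="text", outputCol="textLen", udf=textLenUDF)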
@@ -45,14 +45,14 @@
"import mmlspark\n",
"from pyspark.sql.types import IntegerType, StringType, StructType, StructField\n",
"\n",
"dataFile = \"BookReviewsFromAmazon10K.tsv\"\n",
"dataFilePath = \"BookReviewsFromAmazon10K.tsv\"\n",
"textSchema = StructType([StructField(\"rating\", IntegerType(), False),\n",
" StructField(\"text\", StringType(), False)])\n",
"import os, urllib\n",
"if not os.path.isfile(dataFile):\n",
" urllib.request.urlretrieve(\"https://mmlspark.azureedge.net/datasets/\"+dataFile, dataFile)\n",
"raw_data = spark.createDataFrame(pd.read_csv(dataFile, sep=\"\\t\", header=None), textSchema)\n",
"raw_data.show(5)"
"if not os.path.isfile(dataFilePath):\n",
" urllib.request.urlretrieve(\"https://mmlspark.azureedge.net/datasets/\" + dataFilePath, dataFilePath)\n",
"rawData = spark.createDataFrame(pd.read_csv(dataFilePath, sep=\"\\t\", header=None), textSchema)\n",
"rawData.show(5)"
]
},
{
@@ -76,14 +76,14 @@
"source": [
"from pyspark.sql.functions import udf\n",
"from pyspark.sql.types import LongType, FloatType, DoubleType\n",
"def word_count(s):\n",
"def wordCount(s):\n",
" return len(s.split())\n",
"def word_length(s):\n",
"def wordLength(s):\n",
" import numpy as np\n",
" ss = [len(w) for w in s.split()]\n",
" return round(float(np.mean(ss)), 2)\n",
"word_length_udf = udf(word_length, DoubleType())\n",
"word_count_udf = udf(word_count, IntegerType())"
"wordLengthUDF = udf(wordLength, DoubleType())\n",
"wordCountUDF = udf(wordCount, IntegerType())"
]
},
{
@@ -92,11 +92,23 @@
"metadata": {},
"outputs": [],
"source": [
"data = raw_data \\\n",
" .select(\"rating\", \"text\",\n",
" word_count_udf(\"text\").alias(\"wordCount\"),\n",
" word_length_udf(\"text\").alias(\"wordLength\")) \\\n",
" .withColumn(\"label\", raw_data[\"rating\"] > 3).drop(\"rating\")"
"from mmlspark import UDFTransformer\n",
"wordLength = \"wordLength\"\n",
"wordCount = \"wordCount\"\n",
"wordLengthTransformer = UDFTransformer(inputCol=\"text\", outputCol=wordLength, udf=wordLengthUDF)\n",
"wordCountTransformer = UDFTransformer(inputCol=\"text\", outputCol=wordCount, udf=wordCountUDF)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.ml import Pipeline\n",
"data = Pipeline(stages=[wordLengthTransformer, wordCountTransformer]) \\\n",
" .fit(rawData).transform(rawData) \\\n",
" .withColumn(\"label\", rawData[\"rating\"] > 3).drop(\"rating\")"
]
},
{
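
Because UDFTransformer is itself a Transformer, a single stage can also be applied directly, without building a Pipeline. A short sketch reusing the names defined in the cells above:

    # Sketch: apply one UDFTransformer stage on its own.
    withLength = wordLengthTransformer.transform(rawData)
    withLength.select("text", "wordLength").show(5)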
@@ -152,9 +164,9 @@
"featurizedData = hashingScheme.transform(tokenizedData)\n",
"\n",
"# Merge text and numeric features in one feature column\n",
"feature_columns_array = [\"TextFeatures\", \"wordCount\", \"wordLength\"]\n",
"featureColumnsArray = [\"TextFeatures\", \"wordCount\", \"wordLength\"]\n",
"assembler = VectorAssembler(\n",
" inputCols = feature_columns_array,\n",
" inputCols = featureColumnsArray,\n",
" outputCol=\"features\")\n",
"assembledData = assembler.transform(featurizedData)\n",
"\n",
@@ -190,16 +202,16 @@
"for learner in logisticRegressions:\n",
" model = learner.fit(train)\n",
" models.append(model)\n",
" scored_data = model.transform(test)\n",
" metrics.append(evaluator.evaluate(scored_data))\n",
"best_metric = max(metrics)\n",
"best_model = models[metrics.index(best_metric)]\n",
" scoredData = model.transform(test)\n",
" metrics.append(evaluator.evaluate(scoredData))\n",
"bestMetric = max(metrics)\n",
"bestModel = models[metrics.index(bestMetric)]\n",
"\n",
"# Save model\n",
"best_model.write().overwrite().save(\"SparkMLExperiment.mmls\")\n",
"bestModel.write().overwrite().save(\"SparkMLExperiment.mmls\")\n",
"# Get AUC on the validation dataset\n",
"scored_val = best_model.transform(validation)\n",
"print(evaluator.evaluate(scored_val))"
"scoredVal = bestModel.transform(validation)\n",
"print(evaluator.evaluate(scoredVal))"
]
},
{
@@ -268,8 +268,8 @@ class PySparkTransformerWrapperTest(entryPoint: Transformer,
case "ComputePerInstanceStatistics" => computeStatisticsString(entryPointName)
case "IndexToValue" => indexToValueString(entryPointName)
case "ValueIndexerModel" => valueIndexerModelString(entryPointName)
case "_CNTKModel" | "FastVectorAssembler" | "MultiNGram" | "ImageFeaturizer" | "_ImageFeaturizer"
| "_ImageTransformer" | "UnrollImage" | "HashTransform" | "Timer"
case "_CNTKModel" | "_UDFTransformer" | "FastVectorAssembler" | "MultiNGram" | "ImageFeaturizer"
| "_ImageFeaturizer" | "_ImageTransformer" | "UnrollImage" | "HashTransform" | "Timer"
| "StopWordsRemoverTransform" | "ImageSetAugmenter"
=> ""
case _ =>
@@ -0,0 +1,17 @@
+// Copyright (C) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in project root for information.
+
+package org.apache.spark.ml.param
+
+import org.apache.spark.sql.expressions.UserDefinedFunction
+
+/** Param for UserDefinedFunction. Needed as Spark has explicit params for many different
+  * types but not UserDefinedFunction.
+  */
+class UDFParam(parent: Params, name: String, doc: String, isValid: UserDefinedFunction => Boolean)
+  extends ComplexParam[UserDefinedFunction](parent, name, doc, isValid) {
+
+  def this(parent: Params, name: String, doc: String) =
+    this(parent, name, doc, ParamValidators.alwaysTrue)
+}
@@ -0,0 +1,17 @@
+// Copyright (C) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in project root for information.
+
+package org.apache.spark.ml.param
+
+import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+
+/** Param for UserDefinedPythonFunction. Needed as Spark has explicit params for many different
+  * types but not UserDefinedPythonFunction.
+  */
+class UDPyFParam(parent: Params, name: String, doc: String, isValid: UserDefinedPythonFunction => Boolean)
+  extends ComplexParam[UserDefinedPythonFunction](parent, name, doc, isValid) {
+
+  def this(parent: Params, name: String, doc: String) =
+    this(parent, name, doc, ParamValidators.alwaysTrue)
+}
@@ -142,17 +142,19 @@ abstract class TestBase extends FunSuite with BeforeAndAfterEachTestData with Be
   def makeBasicDF(): DataFrame = {
     val df = Seq(
-      (0, "guitars", "drums"),
-      (1, "piano", "trumpet"),
-      (2, "bass", "cymbals")).toDF("numbers", "words", "more")
+      (0, 0.toDouble, "guitars", "drums", 1.toLong, true),
+      (1, 1.toDouble, "piano", "trumpet", 2.toLong, false),
+      (2, 2.toDouble, "bass", "cymbals", 3.toLong, true))
+      .toDF("numbers", "doubles", "words", "more", "longs", "booleans")
     df
   }
 
   def makeBasicNullableDF(): DataFrame = {
     val df = Seq(
-      (0, 2.5, "guitars", "drums"),
-      (1, Double.NaN, "piano", "trumpet"),
-      (2, 8.9, "bass", null)).toDF("indices", "numbers", "words", "more")
+      (0, 2.5, "guitars", Some("drums"), Some(2.toLong), None),
+      (1, Double.NaN, "piano", Some("trumpet"), Some(1.toLong), Some(true)),
+      (2, 8.9, "bass", None, None, Some(false)))
+      .toDF("numbers", "doubles", "words", "more", "longs", "booleans")
     df
   }
@@ -163,7 +163,8 @@ class FuzzingTest extends TestBase {
   }
 
   test("Verify all pipeline stage values match their param names") {
-    val exemptions: Set[String] = Set()
+    val exemptions: Set[String] = Set[String](
+      "com.microsoft.ml.spark.UDFTransformer")  // needs to hide setters from model
     pipelineStages.foreach { pipelineStage =>
       if (!exemptions(pipelineStage.getClass.getName)) {
         val paramFields =
@@ -0,0 +1,137 @@
+# Copyright (C) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See LICENSE in the project root for information.
+
+import sys
+
+if sys.version >= '3':
+    basestring = str
+
+from pyspark.ml.param.shared import *
+from mmlspark._UDFTransformer import _UDFTransformer
+from pyspark import keyword_only
+from pyspark.ml.util import JavaMLReadable, JavaMLWritable
+from pyspark.ml.wrapper import JavaTransformer, JavaEstimator, JavaModel
+from pyspark.ml.common import inherit_doc
+from pyspark.sql.functions import UserDefinedFunction
+from mmlspark.Utils import *
+
+@inherit_doc
+class UDFTransformer(ComplexParamsMixin, JavaMLReadable, JavaMLWritable, JavaTransformer):
+    """
+    Args:
+        inputCol (str): The name of the input column (default: )
+        inputCols (list): The names of the input columns (default: )
+        outputCol (str): The name of the output column
+        udf (object): User Defined Python Function to be applied to the DF input col
+        udfScala (object): User Defined Function to be applied to the DF input col
+    """
+
+    @keyword_only
+    def __init__(self, inputCol=None, inputCols=None, outputCol=None, udf=None):
+        super(UDFTransformer, self).__init__()
+        self._java_obj = self._new_java_obj("com.microsoft.ml.spark.UDFTransformer")
+        self.inputCol = Param(self, "inputCol", "inputCol: The name of the input column (default: )")
+        self.inputCols = Param(self, "inputCols", "inputCols: The names of the input columns (default: )")
+        self.outputCol = Param(self, "outputCol", "outputCol: The name of the output column")
+        self.udf = Param(self, "udf", "udf: User Defined Python Function to be applied to the DF input col")
+        self._udf = udf
+        if udf is not None:
+            self.setUDF(udf)
+        if inputCol is not None:
+            self.setInputCol(inputCol)
+        if inputCols is not None:
+            self.setInputCols(inputCols)
+        if outputCol is not None:
+            self.setOutputCol(outputCol)
+
+    def setInputCol(self, value):
+        """
+        Args:
+            inputCol (str): The name of the input column (default: )
+        """
+        self._set(inputCol=value)
+        return self
+
+    def getInputCol(self):
+        """
+        Returns:
+            str: The name of the input column (default: )
+        """
+        return self.getOrDefault(self.inputCol)
+
+    def setInputCols(self, value):
+        """
+        Args:
+            inputCols (list): The names of the input columns (default: )
+        """
+        self._set(inputCols=value)
+        return self
+
+    def getInputCols(self):
+        """
+        Returns:
+            list: The names of the input columns (default: )
+        """
+        return self.getOrDefault(self.inputCols)
+
+    def setOutputCol(self, value):
+        """
+        Args:
+            outputCol (str): The name of the output column
+        """
+        self._set(outputCol=value)
+        return self
+
+    def getOutputCol(self):
+        """
+        Returns:
+            str: The name of the output column
+        """
+        return self.getOrDefault(self.outputCol)
+
+    def setUDF(self, udf):
+        name = getattr(udf, "_name", getattr(udf, "__name__", None))
+        name = name if name else udf.__class__.__name__
+        userDefinedFunction = UserDefinedFunction(udf.func, returnType=udf.returnType, name=name)
+        self._java_obj = self._java_obj.setUDF(userDefinedFunction._judf)
+        self._udf = udf
+        return self
+
+    def getUDF(self):
+        return self._udf
+
+    @classmethod
+    def read(cls):
+        """ Returns an MLReader instance for this class. """
+        return JavaMMLReader(cls)
+
+    @staticmethod
+    def getJavaPackage():
+        """ Returns package name String. """
+        return "com.microsoft.ml.spark.UDFTransformer"
+
+    @staticmethod
+    def _from_java(java_stage):
+        module_name = _UDFTransformer.__module__
+        module_name = module_name.rsplit(".", 1)[0] + ".UDFTransformer"
+        return from_java(java_stage, module_name)
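
Note that setUDF reads udf.func and udf.returnType, so the udf argument must be a pyspark UserDefinedFunction (the object returned by pyspark.sql.functions.udf), not a bare Python callable. A usage sketch, assuming an active SparkSession named spark:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    from mmlspark import UDFTransformer

    # The wrapped function must come from pyspark.sql.functions.udf,
    # since setUDF unwraps its .func and .returnType attributes.
    wordCountUDF = udf(lambda s: len(s.split()), IntegerType())
    df = spark.createDataFrame([("hello world",)], ["text"])
    UDFTransformer(inputCol="text", outputCol="wordCount", udf=wordCountUDF).transform(df).show()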