MLeap Transformer schema is wrong #618

Closed · Ben-Epstein opened this issue Dec 20, 2019 · 11 comments
@Ben-Epstein

After creating a PySpark model and serializing it to a bundle, I read the MLeap transformer back in and make a prediction, but the prediction is wrong.

Upon investigation, I found that the inputSchema of the model has been reordered, so the features are in the wrong order. If you simply print the PipelineModel, it shows the features in the correct order, but calling inputSchema returns them in a different order.
@abaveja313
**Code to reproduce:**

Model

from sklearn.datasets import load_breast_cancer
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline

# Contexts (implicit in the original notebook environment)
sc = SparkContext.getOrCreate(SparkConf())
sqlContext = SQLContext(sc)

data = load_breast_cancer()
X, y = data['data'], data['target']
cols = [str(i) for i in data['feature_names']] + ['label']
sample = Row(*cols)
dataframe = []
for X_sample, y_sample in zip(X, y):
    X_data = [float(i) for i in X_sample]
    label = float(y_sample)
    sample_data = X_data + [label]
    dataframe.append(sample(*sample_data))
df = sqlContext.createDataFrame(dataframe)
features = df.columns
features.remove('label')
assembler = VectorAssembler(inputCols=features, outputCol='features')
model = RandomForestClassifier()
pipeline = Pipeline(stages=[assembler, model])
train, test = df.randomSplit([0.7, 0.3])
fittedPipeline = pipeline.fit(train)
predictions = fittedPipeline.transform(test)
print(predictions.select('prediction').limit(10).collect())

serialization and movement to HDFS:
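(The serializeToBundle call itself isn't included in the issue; presumably the bundle at /tmp/mleap-rftest.zip was produced with the mleap-pyspark package roughly as in this sketch, which is an assumption rather than the issue's exact code.)

import mleap.pyspark  # importing registers serializeToBundle on Spark transformers
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Write the fitted pipeline to a local bundle zip; the transformed DataFrame
# is passed so MLeap can capture the pipeline's schema.
fittedPipeline.serializeToBundle("jar:file:/tmp/mleap-rftest.zip", predictions)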

%%bash 
hdfs dfs -copyFromLocal -f /tmp/mleap-rftest.zip /tmp/mleap-rftest.zip

Reading in the model

import java.net.URI

import ml.bundle.hdfs.HadoopBundleFileSystem
import ml.combust.mleap.runtime.MleapContext
import ml.combust.mleap.runtime.frame.Transformer
import ml.combust.mleap.runtime.MleapSupport._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object HDFSRetriever {
  val config = new Configuration()
  // Create the hadoop file system
  val fs: FileSystem = FileSystem.get(config)
  // Create the hadoop bundle file system
  val bundleFs = new HadoopBundleFileSystem(fs)
  // Create an implicit custom mleap context for saving/loading
  implicit val customMleapContext: MleapContext = MleapContext.defaultContext.copy(
    registry = MleapContext.defaultContext.bundleRegistry.registerFileSystem(bundleFs)
  )

  /**
   * Load a given model from HDFS using
   * the configuration specified in the
   * MLeapContext
   *
   * @param path hdfs path to load from
   */
  def loadBundleFromHDFS(path: String): Transformer = {
    new URI(path).loadMleapBundle().get.root
  }
}
val model = HDFSRetriever.loadBundleFromHDFS("hdfs:///tmp/mleap-rftest.zip");
print(model)
out>> Pipeline(PipelineModel_515b263296a1,NodeShape(Map(),Map()),
PipelineModel(List(VectorAssembler(VectorAssembler_b62fd25850d3,NodeShape(Map(
input0 -> Socket(input0,mean radius), 
input1 -> Socket(input1,mean texture), 
input2 -> Socket(input2,mean perimeter), 
input3 -> Socket(input3,mean area),
input4 -> Socket(input4,mean smoothness), 
input5 -> Socket(input5,mean compactness), 
input6 -> Socket(input6,mean concavity), 
input7 -> Socket(input7,mean concave points), 
input8 -> Socket(input8,mean symmetry),
input9 -> Socket(input9,mean fractal dimension), 
input10 -> Socket(input10,radius error), 
input11 -> Socket(input11,texture error), 
input12 -> Socket(input12,perimeter error), 
input13 -> Socket(input13,area error), 
input14 -> Socket(input14,smoothness error),
input15 -> Socket(input15,compactness error), 
input16 -> Socket(input16,concavity error), 
input17 -> Socket(input17,concave points error), 
input18 -> Socket(input18,symmetry error), 
input19 -> Socket(input19,fractal dimension error), 
input20 -> Socket(input20,worst radius), 
input21 -> Socket(input21,worst texture), 
input22 -> Socket(input22,worst perimeter), 
input23 -> Socket(input23,worst area), 
input24 -> Socket(input24,worst smoothness), 
input25 -> Socket(input25,worst compactness),
input26 -> Socket(input26,worst concavity), 
input27 -> Socket(input27,worst concave points), 
input28 -> Socket(input28,worst symmetry), 
input29 -> Socket(input29,worst fractal dimension)),Map(output -> Socket(output,features))),VectorAssemblerModel(List(ScalarShape(true)

printing the inputSchema:

model.inputSchema.fields.zipWithIndex.foreach { case (field, idx) =>
  println(s"$idx $field")
}
out>> 0 StructField(mean texture,ScalarType(double,true))
1 StructField(concavity error,ScalarType(double,true))
2 StructField(mean compactness,ScalarType(double,true))
3 StructField(mean radius,ScalarType(double,true))
4 StructField(texture error,ScalarType(double,true))
5 StructField(mean smoothness,ScalarType(double,true))
6 StructField(concave points error,ScalarType(double,true))
7 StructField(worst concavity,ScalarType(double,true))
8 StructField(mean concavity,ScalarType(double,true))
9 StructField(compactness error,ScalarType(double,true))
10 StructField(mean area,ScalarType(double,true))
11 StructField(worst fractal dimension,ScalarType(double,true))
12 StructField(worst concave points,ScalarType(double,true))
13 StructField(worst perimeter,ScalarType(double,true))
14 StructField(area error,ScalarType(double,true))
15 StructField(worst compactness,ScalarType(double,true))
16 StructField(worst texture,ScalarType(double,true))
17 StructField(mean concave points,ScalarType(double,true))
18 StructField(mean symmetry,ScalarType(double,true))
19 StructField(worst area,ScalarType(double,true))
20 StructField(symmetry error,ScalarType(double,true))
21 StructField(fractal dimension error,ScalarType(double,true))
22 StructField(worst radius,ScalarType(double,true))
23 StructField(worst smoothness,ScalarType(double,true))
24 StructField(mean fractal dimension,ScalarType(double,true))
25 StructField(radius error,ScalarType(double,true))
26 StructField(smoothness error,ScalarType(double,true))
27 StructField(mean perimeter,ScalarType(double,true))
28 StructField(worst symmetry,ScalarType(double,true))
29 StructField(perimeter error,ScalarType(double,true))

As you can see, the inputSchema is in the wrong order, causing all predictions to be wrong. I've reproduced the same behavior with LogisticRegression models as well. I'm stuck here: without being able to rely on the generated schema, I have to specify it by hand each time, which makes the code non-reproducible.

Is there something I'm doing wrong here or missing? Help would be greatly appreciated!

@ancasarb (Member)

@Ben-Epstein is it possible to send me some sample data so that I can try this out?

@ancasarb (Member)

Or, if you could send the serialized zip, that would work just as well.

@ancasarb (Member) commented Jan 20, 2020

I've actually followed the MLeap demo for PySpark https://github.com/ancasarb/mleap-demo/blob/master/notebooks/PySpark%20-%20AirBnb.ipynb, serialized the pipeline, and re-loaded it in Scala as below:

import java.io.File

// Typical imports for this snippet (not shown in the original comment).
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.serialization.FrameReader
import ml.combust.mleap.runtime.transformer.Pipeline
import resource._

// `file` is the path to the serialized bundle zip.
val pipeline = (for (bf <- managed(BundleFile(file))) yield {
  bf.loadMleapBundle().get.root
}).tried.get.asInstanceOf[Pipeline]

println(pipeline)

pipeline.inputSchema.fields.zipWithIndex.foreach { case (field, idx) =>
  println(s"$idx $field")
}

val frame = FrameReader().read(new File(getClass.getClassLoader.getResource("frame.airbnb.json").getFile)).get
val result = pipeline.transform(frame)

println(result.get)

Using the frame.airbnb.json file, for example:

{
  "schema": {
    "fields": [{
      "name": "state",
      "type": "string"
    }, {
      "name": "bathrooms",
      "type": "double"
    }, {
      "name": "square_feet",
      "type": "double"
    }, {
      "name": "bedrooms",
      "type": "double"
    }, {
      "name": "security_deposit",
      "type": "double"
    }, {
      "name": "cleaning_fee",
      "type": "double"
    }, {
      "name": "extra_people",
      "type": "double"
    }, {
      "name": "number_of_reviews",
      "type": "double"
    }, {
      "name": "review_scores_rating",
      "type": "double"
    }, {
      "name": "room_type",
      "type": "string"
    }, {
      "name": "host_is_superhost",
      "type": "string"
    }, {
      "name": "cancellation_policy",
      "type": "string"
    }, {
      "name": "instant_bookable",
      "type": "string"
    }]
  },
  "rows": [["NY", 2.0, 1250.0, 3.0, 50.0, 30.0, 2.0, 56.0, 90.0, "Entire home/apt", "1.0", "strict", "1.0"]]
}

and everything worked fine. Am I missing anything?

How are you doing the prediction?

@Ben-Epstein (Author) commented Mar 16, 2020

> @Ben-Epstein is it possible to send me some sample data so that I can try this out?

@ancasarb the dataset is in the code above; it's the load_breast_cancer() dataset from sklearn.

Let me know if you need any help!

@Ben-Epstein (Author)

Is there any update on this?

@ancasarb (Member)

@Ben-Epstein Looking at the code for the pipeline schema here, it does seem that we use a hashmap to collect the schema for the pipeline, so the order can come out different. However, this shouldn't be an issue at scoring time. In the serialized vector assembler, I can see:

{
  "name": "VectorAssembler_48d5abe041b949877a36",
  "shape": {
    "inputs": [{
      "port": "input0",
      "name": "mean radius"
    }, {
      "port": "input1",
      "name": "mean texture"
    }, {
      "port": "input2",
      "name": "mean perimeter"
    }, {
      "port": "input3",
      "name": "mean area"
    }, {
      "port": "input4",
      "name": "mean smoothness"
    }, {
      "port": "input5",
      "name": "mean compactness"
    }, {
      "port": "input6",
      "name": "mean concavity"
    }, {
      "port": "input7",
      "name": "mean concave points"
    }, {
      "port": "input8",
      "name": "mean symmetry"
    }, {
      "port": "input9",
      "name": "mean fractal dimension"
    }, {
      "port": "input10",
      "name": "radius error"
    }, {
      "port": "input11",
      "name": "texture error"
    }, {
      "port": "input12",
      "name": "perimeter error"
    }, {
      "port": "input13",
      "name": "area error"
    }, {
      "port": "input14",
      "name": "smoothness error"
    }, {
      "port": "input15",
      "name": "compactness error"
    }, {
      "port": "input16",
      "name": "concavity error"
    }, {
      "port": "input17",
      "name": "concave points error"
    }, {
      "port": "input18",
      "name": "symmetry error"
    }, {
      "port": "input19",
      "name": "fractal dimension error"
    }, {
      "port": "input20",
      "name": "worst radius"
    }, {
      "port": "input21",
      "name": "worst texture"
    }, {
      "port": "input22",
      "name": "worst perimeter"
    }, {
      "port": "input23",
      "name": "worst area"
    }, {
      "port": "input24",
      "name": "worst smoothness"
    }, {
      "port": "input25",
      "name": "worst compactness"
    }, {
      "port": "input26",
      "name": "worst concavity"
    }, {
      "port": "input27",
      "name": "worst concave points"
    }, {
      "port": "input28",
      "name": "worst symmetry"
    }, {
      "port": "input29",
      "name": "worst fractal dimension"
    }],
    "outputs": [{
      "port": "output",
      "name": "features"
    }]
  }
}

so the order is correct.

If you then create a leap frame

{
  "schema": {
    "fields": [{
      "name": "mean texture",
      "type": "double"
    }, {
      "name": "concavity error",
      "type": "double"
    }, 
   .....
]
  },
  "rows": [[23.95, 0.05051, ....]]
}

then the model scores fine, even if the order we provide in the JSON is different from the one in the schema or from the ordering of the columns in the training data. As long as the "fields" and "rows" in the JSON follow the same ordering (any ordering), the scoring will be correct.
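To illustrate the same point programmatically, here is a minimal Scala sketch (not from the original thread) that builds a DefaultLeapFrame for the `model` transformer loaded earlier; the two feature values are invented for illustration, and a real frame for this model would need all 30 breast-cancer columns:

import ml.combust.mleap.core.types.{ScalarType, StructField, StructType}
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

// Hypothetical feature values keyed by column name (values invented for illustration;
// every one of the model's 30 input columns must be present for transform to succeed).
val featureValues: Map[String, Double] = Map(
  "mean radius"  -> 17.99,
  "mean texture" -> 10.38
  // ...remaining 28 columns...
)

// Build the schema in whatever order the map yields its keys; each Row just lists its
// values in the same order as the schema's fields. MLeap matches columns by name, not position.
val fields = featureValues.keys.toSeq.map(name => StructField(name, ScalarType.Double))
val schema = StructType(fields).get
val row    = Row(fields.map(f => featureValues(f.name)): _*)

val frame  = DefaultLeapFrame(schema, Seq(row))
val scored = model.transform(frame).get
println(scored.select("prediction").get.dataset)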

Hope this helps, let me know if you have further questions.

@ancasarb (Member)

Closing this in preparation for the 0.16.0 release; please reopen if you still have questions. Hopefully the clarification above makes sense.

@Ben-Epstein (Author)

@ancasarb thank you for checking the code; however, I'm still getting incorrect predictions with the code above. Did you run it and confirm that the predictions from the MLeap transformer match the original Spark model? All of the necessary code is above.

@ancasarb reopened this May 17, 2020
@ancasarb (Member)

@Ben-Epstein Yes, I ran it without issues. Could you please share the leap frame that you use for scoring?

@bhrigs commented Aug 24, 2020

Worked fine for me with the latest version, ml.combust.mleap:mleap-spark_2.11:0.16.0.
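For reference, that version corresponds to an sbt dependency along these lines (assuming a Scala 2.11 build):

// sbt coordinate for the artifact mentioned above
libraryDependencies += "ml.combust.mleap" %% "mleap-spark" % "0.16.0"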

@ancasarb (Member)

Closing this; please re-open if you're still struggling with it.
