Added evaluation metrics
Ling-Ling Zhang authored and committed on Jun 6, 2015
1 parent c59095c commit 6502909
Showing 6 changed files with 89 additions and 33 deletions.
1 change: 1 addition & 0 deletions data/data.prop
@@ -1,2 +1,3 @@
 1.useSplitWords=true
 1.splitWordsIgnoreRegexp=\\s+
+1.splitWordsTokenizerRegexp=[\\p{L}][\\p{L}0-9]*|(?:\\$ ?)?[0-9]+(?:\\.[0-9]{2})?%?|\\s+|[\\x80-\\uFFFD]|.
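Note: the added splitWordsTokenizerRegexp keeps whole words, numbers with an optional dollar prefix, two-decimal cents, and percent sign, runs of whitespace, high Unicode characters, and a single-character fallback. A minimal standalone sketch of what it tokenizes, using java.util.regex directly; the object name and sample sentence are illustrative, not part of the commit, and the character-range alternative is rewritten with Java-regex hex escapes since it lives in Scala source rather than a properties file:

import java.util.regex.Pattern

object TokenizerRegexpDemo {
  // Same pattern as in data/data.prop, single-escaped for Scala source.
  val pattern: Pattern = Pattern.compile(
    """[\p{L}][\p{L}0-9]*|(?:\$ ?)?[0-9]+(?:\.[0-9]{2})?%?|\s+|[\x{80}-\x{FFFD}]|.""")

  def main(args: Array[String]): Unit = {
    val m = pattern.matcher("Felt dizzy after taking 2 pills, $4.99 each!")
    while (m.find()) {
      val tok = m.group()
      if (tok.trim.nonEmpty) println(s"token: $tok") // skip whitespace matches
    }
    // Prints: Felt, dizzy, after, taking, 2, pills, ",", $4.99, each, "!"
  }
}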
10 changes: 0 additions & 10 deletions engine.json
@@ -7,16 +7,6 @@
       "appName": "CoreNLP"
     }
   },
-  "serving": {
-    "params": {
-      "filepath": "data/sample_disabled_items.txt"
-    }
-  },
-  "preparator": {
-    "params": {
-      "filepath": "data/sample_not_train_data.txt"
-    }
-  },
   "algorithms": [
     {
       "name": "als",
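With the serving and preparator blocks removed, those components presumably run with empty params, which lines up with the DataSource change below: events now come from the event store via PEventStore rather than from the local sample files these params pointed at.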
53 changes: 50 additions & 3 deletions src/main/scala/DataSource.scala
@@ -2,10 +2,8 @@ package org.template.classification
 
 import io.prediction.controller.PDataSource
 import io.prediction.controller.EmptyEvaluationInfo
-import io.prediction.controller.EmptyActualResult
 import io.prediction.controller.Params
 import io.prediction.data.storage.Event
-import io.prediction.data.storage.Storage
 import io.prediction.data.store.PEventStore
 
 import org.apache.spark.SparkContext
@@ -18,7 +16,7 @@ case class DataSourceParams(val appName: String) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
+      EmptyEvaluationInfo, Query, ActualResult] {
 
   @transient lazy val logger = Logger[this.type]
 
@@ -61,6 +59,55 @@ class DataSource(val dsp: DataSourceParams)
 
     new TrainingData(labeledPoints)
   }
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("question"),
+      eventNames = Some(List("twitter"))
+    )(sc).cache()
+
+    val labeledPoints: RDD[TextClass] = eventsRDD
+      .filter { event => event.event == "twitter" }
+      .map { event =>
+        try {
+          TextClass(
+            text_type = event.entityId,
+            text = event.properties.get[String]("text"),
+            gender = event.properties.getOpt[String]("gender"),
+            dizzy = event.properties.getOpt[String]("dizziness"),
+            convul = event.properties.getOpt[String]("convulsions"),
+            heart = event.properties.getOpt[String]("heart_palpitation"),
+            breath = event.properties.getOpt[String]("shortness_of_breath"),
+            head = event.properties.getOpt[String]("headaches"),
+            effect = event.properties.getOpt[String]("effect_decreased"),
+            allergy = event.properties.getOpt[String]("allergies_worse"),
+            bad = event.properties.getOpt[String]("bad_interaction"),
+            nausea = event.properties.getOpt[String]("nausea"),
+            insomnia = event.properties.getOpt[String]("insomnia")
+          )
+        } catch {
+          case e: Exception =>
+            logger.error(s"Cannot convert ${event} to TextClass." +
+              s" Exception: ${e}.")
+            throw e
+        }
+      }.cache()
+
+    // A single fold that evaluates on the same labeled points the model is
+    // trained on.
+    (0 until 1).map { idx =>
+      val random = idx // unused
+      (
+        new TrainingData(labeledPoints),
+        new EmptyEvaluationInfo(),
+        labeledPoints.map { p =>
+          (new Query(p.text, p.gender, p.dizzy, p.convul, p.heart, p.breath,
+            p.head, p.effect, p.allergy, p.bad, p.nausea, p.insomnia),
+            new ActualResult(p.text_type))
+        }
+      )
+    }
+  }
 }
 
 case class TextClass(
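The readEval hook above is what PredictionIO's evaluation workflow calls to obtain (training data, evaluation info, test set) folds. No metric class appears among the six changed files, so the actual metric may live elsewhere; as a sketch of how these Query/PredictedResult/ActualResult types would pair with a PredictionIO metric, something like the following would fit, with the Accuracy and AccuracyEvaluation names being illustrative rather than taken from this commit:

package org.template.classification

import io.prediction.controller.AverageMetric
import io.prediction.controller.EmptyEvaluationInfo
import io.prediction.controller.Evaluation

// Fraction of queries whose predicted label matches the actual label.
case class Accuracy()
  extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
  def calculate(query: Query, predicted: PredictedResult,
    actual: ActualResult): Double =
    if (predicted.queryResults == actual.queryResults) 1.0 else 0.0
}

// Ties the engine and the metric together so the evaluator can find them.
object AccuracyEvaluation extends Evaluation {
  engineMetric = (ClassificationEngine(), new Accuracy())
}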
34 changes: 19 additions & 15 deletions src/main/scala/Engine.scala
@@ -1,28 +1,32 @@
 package org.template.classification
 
-import io.prediction.controller.IEngineFactory
+import io.prediction.controller.EngineFactory
 import io.prediction.controller.Engine
 
-case class Query(
+class Query(
   val text: String,
-  val gender: Option[Set[String]],
-  val dizziness: Option[Set[String]],
-  val convulsions: Option[Set[String]],
-  val heart_palpitation: Option[Set[String]],
-  val shortness_of_breath: Option[Set[String]],
-  val headaches: Option[Set[String]],
-  val effect_decreased: Option[Set[String]],
-  val allergies_worse: Option[Set[String]],
-  val bad_interaction: Option[Set[String]],
-  val nausea: Option[Set[String]],
-  val insomnia: Option[Set[String]]
+  val gender: Option[String],
+  val dizziness: Option[String],
+  val convulsions: Option[String],
+  val heart_palpitation: Option[String],
+  val shortness_of_breath: Option[String],
+  val headaches: Option[String],
+  val effect_decreased: Option[String],
+  val allergies_worse: Option[String],
+  val bad_interaction: Option[String],
+  val nausea: Option[String],
+  val insomnia: Option[String]
 ) extends Serializable
 
-case class PredictedResult(
+class PredictedResult(
   val queryResults: String
 ) extends Serializable
 
-object ClassificationEngine extends IEngineFactory {
+class ActualResult(
+  val queryResults: String
+) extends Serializable
+
+object ClassificationEngine extends EngineFactory {
   def apply() = {
     new Engine(
       classOf[DataSource],
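Since the symptom fields changed from Option[Set[String]] to Option[String], a query now carries at most one value per field, and absent fields map to None. A hypothetical construction, with all values made up for illustration:

val q = new Query(
  text = "felt dizzy and could not sleep after the second dose",
  gender = Some("female"),
  dizziness = Some("yes"),
  convulsions = None,
  heart_palpitation = None,
  shortness_of_breath = None,
  headaches = None,
  effect_decreased = None,
  allergies_worse = None,
  bad_interaction = None,
  nausea = None,
  insomnia = Some("yes")
)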
7 changes: 4 additions & 3 deletions src/main/scala/Model.scala
@@ -13,8 +13,8 @@ import org.apache.spark.SparkContext
 import edu.stanford.nlp.classify.Classifier
 import edu.stanford.nlp.classify.ColumnDataClassifier
 
-class Model(
-  val cl: Classifier[String, String])
+class Model(val cl: Classifier[String, String])
+  extends Serializable
 {
   def save(id: String, params: AlgorithmParams,
     sc: SparkContext): Boolean = {
@@ -28,7 +28,8 @@ class Model(
 }
 
 object Model
-  extends IPersistentModelLoader[AlgorithmParams, Model] {
+  extends IPersistentModelLoader[AlgorithmParams, Model]
+  with Serializable {
   def apply(id: String, params: AlgorithmParams,
     sc: Option[SparkContext]) = {
     new Model(
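A note on the Serializable mix-ins added here and in NLPAlgorithm.scala below: during batch prediction Spark ships the model to executors inside task closures, so the Model and anything those closures reach presumably has to be serializable to avoid a NotSerializableException at evaluation time.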
17 changes: 15 additions & 2 deletions src/main/scala/NLPAlgorithm.scala
@@ -31,10 +31,12 @@ case class AlgorithmParams(
   val lambda: Double) extends Params
 
 class NLPAlgorithm(val ap: AlgorithmParams)
-  extends PAlgorithm[PreparedData, Model, Query, PredictedResult] {
+  extends PAlgorithm[PreparedData, Model, Query, PredictedResult]
+  with Serializable {
 
   @transient lazy val logger = Logger[this.type]
   val cdc = new ColumnDataClassifier("data/data.prop")
+  var cl: Classifier[String, String] = null
 
   def train(sc: SparkContext, data: PreparedData): Model = {
     // MLLib ALS cannot handle empty training data.
@@ -63,6 +65,7 @@ class NLPAlgorithm(val ap: AlgorithmParams)
     */
 
     val classifier = cdc.makeClassifier(cdc.readTrainingExamples("corenlpData"))
+    cl = classifier
     new Model(cl = classifier)
   }
 
@@ -101,6 +104,16 @@
 
     val d = cdc.makeDatumFromLine(line)
 
-    new PredictedResult(query.text + " ==> " + cl.classOf(d))
+    new PredictedResult(cl.classOf(d))
   }
 
+  override
+  def batchPredict(model: Model, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    // Build the Stanford datum with a fresh ColumnDataClassifier per query,
+    // presumably to keep the non-serializable `cdc` field out of the Spark
+    // closure; only the (serializable) model is captured.
+    queries.mapValues { query =>
+      val datum = new ColumnDataClassifier("data/data.prop")
+        .makeDatumFromLine("\t" + query.text + "\t")
+      new PredictedResult(model.cl.classOf(datum))
+    }
+  }
+
+  // val mapFun = (query: Query) => new PredictedResult(
+  //   cl.classOf(cdc.makeDatumFromLine("\t" + query.text + "\t")))
+
 }
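With readEval supplying the folds and batchPredict the per-query predictions, the evaluation would presumably be kicked off with pio build followed by pio eval, pointing the latter at an Evaluation object such as the AccuracyEvaluation sketch above.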
