From 80d0cb616a696701b631b3596ab09b94049306d2 Mon Sep 17 00:00:00 2001 From: aleksei Date: Thu, 2 Nov 2017 00:08:32 +0300 Subject: [PATCH 1/8] Workd Embeddigns based on RocksDB --- build.sbt | 3 +- .../johnsnowlabs/ml/crf/DatasetEncoder.scala | 8 +- .../johnsnowlabs/ml/crf/DatasetReader.scala | 31 +++-- .../annotators/ner/crf/FeatureGenerator.scala | 63 ++++++---- ...CrfBasedNer.scala => NerCrfApproach.scala} | 75 ++++++++++-- ...fBasedNerModel.scala => NerCrfModel.scala} | 34 +++--- .../com/johnsnowlabs/nlp/datasets/CoNLL.scala | 103 ++++++++++++++++ .../AnnotatorWithWordEmbeddings.scala | 92 ++++++++++++++ .../embeddings/ModelWithWordEmbeddings.scala | 101 ++++++++++++++++ .../nlp/embeddings/WordEmbeddings.scala | 102 ++++++++++++++++ .../nlp/embeddings/WordEmbeddingsFormat.scala | 9 ++ .../com/johnsnowlabs/nlp/util/LruMap.scala | 38 ++++++ .../nlp/util/SparkNlpConfigKeys.scala | 10 ++ .../resources/ner-corpus/test_ner_dataset.txt | 8 ++ .../ml/crf/CoNLL2003CrfTest.scala | 112 +++++++++++++----- .../ml/crf/CoNLL2003PipelineTest.scala | 90 ++------------ .../johnsnowlabs/ml/crf/TestDatasets.scala | 4 +- .../johnsnowlabs/nlp/AnnotatorBuilder.scala | 11 +- .../com/johnsnowlabs/nlp/DataBuilder.scala | 5 +- .../com/johnsnowlabs/nlp/SparkAccessor.scala | 2 +- ...ineSpec.scala => NerCrfApproachSpec.scala} | 36 ++++-- 21 files changed, 742 insertions(+), 195 deletions(-) rename src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/{CrfBasedNer.scala => NerCrfApproach.scala} (55%) rename src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/{CrfBasedNerModel.scala => NerCrfModel.scala} (77%) create mode 100644 src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/util/LruMap.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/util/SparkNlpConfigKeys.scala create mode 100644 src/test/resources/ner-corpus/test_ner_dataset.txt rename src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/{CrfBasedNerPipelineSpec.scala => NerCrfApproachSpec.scala} (63%) diff --git a/build.sbt b/build.sbt index c7ca7691084ad5..2bf9bbaa2b76d6 100644 --- a/build.sbt +++ b/build.sbt @@ -55,7 +55,8 @@ lazy val testDependencies = Seq( ) lazy val utilDependencies = Seq( - "com.typesafe" % "config" % "1.3.0" + "com.typesafe" % "config" % "1.3.0", + "org.rocksdb" % "rocksdbjni" % "5.8.0" ) lazy val root = (project in file(".")) diff --git a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala index 4a460c6e205cda..3b3b367abfb9d1 100644 --- a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala +++ b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetEncoder.scala @@ -71,7 +71,7 @@ class DatasetEncoder(val startLabel: String = "@#Start") { def getFeatures(prevLabel: String = startLabel, label: String, binaryAttrs: Seq[String], - numAttrs: Seq[(String, Float)]): (Int, SparseArray) = { + numAttrs: Seq[Float]): (Int, SparseArray) = { val labelId = getLabel(label) val binFeature = binaryAttrs.map{attr => @@ -80,8 +80,8 @@ class DatasetEncoder(val startLabel: String = "@#Start") { (attrId, 1f) } - val numFeatures = 
numAttrs.map{case(attr, value) => { - val attrId = getAttr(attr, true) + val numFeatures = numAttrs.zipWithIndex.map{case(value, idx) => { + val attrId = getAttr("num" + idx, true) addAttrFeature(labelId, attrId, value) (attrId, value) }} @@ -128,4 +128,4 @@ class DatasetEncoder(val startLabel: String = "@#Start") { result } } -} +} \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala index f29016e6be2019..09e3c5a4fefab4 100644 --- a/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala +++ b/src/main/scala/com/johnsnowlabs/ml/crf/DatasetReader.scala @@ -1,14 +1,17 @@ package com.johnsnowlabs.ml.crf import java.io.FileInputStream + import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream + +import scala.collection.TraversableOnce import scala.collection.mutable.ArrayBuffer import scala.io.Source case class TextSentenceLabels(labels: Seq[String]) case class TextSentenceAttrs(words: Seq[WordAttrs]) -case class WordAttrs(attrs: Seq[(String, String)]) +case class WordAttrs(strAttrs: Seq[(String, String)], numAttrs: Array[Float] = Array.empty) object DatasetReader { @@ -23,7 +26,7 @@ object DatasetReader { } } - private def readWithLabels(file: String, skipLines: Int = 0): Iterator[(TextSentenceLabels, TextSentenceAttrs)] = { + private def readWithLabels(file: String, skipLines: Int = 0): TraversableOnce[(TextSentenceLabels, TextSentenceAttrs)] = { val lines = getSource(file) .getLines() .drop(skipLines) @@ -66,22 +69,22 @@ object DatasetReader { } } - def encodeDataset(source: Iterator[(TextSentenceLabels, TextSentenceAttrs)]): CrfDataset = { + def encodeDataset(source: TraversableOnce[(TextSentenceLabels, TextSentenceAttrs)]): CrfDataset = { val metadata = new DatasetEncoder() val instances = source.map{case (textLabels, textSentence) => var prevLabel = metadata.startLabel val (labels, features) = textLabels.labels.zip(textSentence.words) .map{case (label, word) => - val attrs = word.attrs.map(a => a._1 + "=" + a._2) - val (labelId, features) = metadata.getFeatures(prevLabel, label, attrs, Seq.empty) + val attrs = word.strAttrs.map(a => a._1 + "=" + a._2) + val (labelId, features) = metadata.getFeatures(prevLabel, label, attrs, word.numAttrs) prevLabel = label (labelId, features) }.unzip (InstanceLabels(labels), Instance(features)) - }.toList + }.toArray CrfDataset(instances, metadata.getMetadata) } @@ -93,12 +96,20 @@ object DatasetReader { def encodeSentence(sentence: TextSentenceAttrs, metadata: DatasetMetadata): Instance = { val items = sentence.words.map{word => - val attrIds = word.attrs.flatMap { case (name, value) => + val strAttrs = word.strAttrs.flatMap { case (name, value) => val key = name + "=" + value metadata.attr2Id.get(key) + }.map((_, 1f)) + + val numAttrs = word.numAttrs.zipWithIndex.flatMap {case(value, idx) => + val key = "num" + idx + val attr = metadata.attr2Id.get(key) + attr.map(attrName => (attrName, value)) } - val attrValues = attrIds.sortBy(id => id).distinct.map(id => (id, 1f)).toArray + val id2value = strAttrs ++ numAttrs + + val attrValues = id2value.sortBy(id => id._1).distinct.toArray new SparseArray(attrValues) } @@ -111,7 +122,7 @@ object DatasetReader { encodeDataset(textDataset) } - def readAndEncode(file: String, skipLines: Int, metadata: DatasetMetadata): Iterator[(InstanceLabels, Instance)] = { + def readAndEncode(file: String, skipLines: Int, metadata: DatasetMetadata): TraversableOnce[(InstanceLabels, Instance)] = { 
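[editor note, not part of the patch] The encoder change above replaces named numeric attributes with positional ones, so embedding values are keyed as "num0", "num1", ... next to the usual "name=value" binary attributes. A minimal sketch of how one token flows through the new getFeatures signature, assuming only the DatasetEncoder and WordAttrs definitions introduced in this patch:

    import com.johnsnowlabs.ml.crf.{DatasetEncoder, WordAttrs}

    object NumericAttrsSketch extends App {
      val encoder = new DatasetEncoder()

      // one token: two binary (string) features plus a 3-dimensional "embedding"
      val word = WordAttrs(
        strAttrs = Seq("w" -> "john", "shape" -> "Xxxx"),
        numAttrs = Array(0.1f, -0.2f, 0.3f)
      )

      // binary attrs are keyed "name=value"; numeric attrs are keyed positionally as "num0", "num1", ...
      val (labelId, sparseFeatures) = encoder.getFeatures(
        prevLabel   = encoder.startLabel,
        label       = "PER",
        binaryAttrs = word.strAttrs.map { case (k, v) => k + "=" + v },
        numAttrs    = word.numAttrs
      )

      println(s"encoded label id: $labelId")
    }
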
val textDataset = readWithLabels(file, skipLines) textDataset.map{case (sourceLabels, sourceInstance) => @@ -121,5 +132,3 @@ object DatasetReader { } } } - - diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala index bcc21ed82a2848..df35a1072543c1 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/FeatureGenerator.scala @@ -2,19 +2,33 @@ package com.johnsnowlabs.nlp.annotators.ner.crf import com.johnsnowlabs.ml.crf._ import com.johnsnowlabs.nlp.annotators.common.TaggedSentence +import com.johnsnowlabs.nlp.embeddings.WordEmbeddings + import scala.collection.mutable + /** * Generates features for CrfBasedNer */ -case class FeatureGenerator(dictFeatures: DictionaryFeatures) { +case class FeatureGenerator(dictFeatures: DictionaryFeatures, + embeddings: Option[WordEmbeddings] = None) { + + val emptyEmbedding = if (embeddings.isEmpty) Array.empty[Float] else Array.fill[Float](embeddings.get.nDims)(0f) + + def getEmbeddings(token: String): Array[Float] = { + if (embeddings.isEmpty) { + emptyEmbedding + } else { + embeddings.get.getEmbeddings(token) + } + } val shapeEncoding = Map( - '.' -> '.', ',' -> '.', - ':' -> ':', ';' -> ':', '?' -> ':', '!' -> ':', - '-' -> '-', '+' -> '-', '*' -> '-', '/' -> '-', '=' -> '-', '|' -> '-', '_' -> '-', '%' -> '-', - '(' -> '(', '{' -> '(', '[' -> '(', '<' -> '(', - ')' -> ')', '}' -> ')', ']' -> ')', '>' -> ')' + '.' -> '.', ',' -> '.', + ':' -> ':', ';' -> ':', '?' -> ':', '!' -> ':', + '-' -> '-', '+' -> '-', '*' -> '-', '/' -> '-', '=' -> '-', '|' -> '-', '_' -> '-', '%' -> '-', + '(' -> '(', '{' -> '(', '[' -> '(', '<' -> '(', + ')' -> ')', '}' -> ')', ']' -> ')', '>' -> ')' ) def getShape(token: String) = { @@ -149,14 +163,14 @@ case class FeatureGenerator(dictFeatures: DictionaryFeatures) { def getSuffix(token: String, size: Int, default: String = "") = { if (token.length >= size) - token.substring(token.length - size) + token.substring(token.length - size).toLowerCase else default } def getPrefix(token: String, size: Int, default: String = "") = { if (token.length >= size) - token.substring(0, size) + token.substring(0, size).toLowerCase else default } @@ -219,7 +233,7 @@ case class FeatureGenerator(dictFeatures: DictionaryFeatures) { val f = fillFeatures(word) f("pos") = tag f - } + } val words = wordFeatures.length @@ -229,12 +243,12 @@ case class FeatureGenerator(dictFeatures: DictionaryFeatures) { val pairAttrs = (-window until window) .filter(j => isInRange(i + j, words) && isInRange(i + j + 1, words)) .flatMap(j => - pairs.map{name => - val feature = getName(name, j, j + 1) - val value1 = wordFeatures(i + j).getOrElse(name, "") - val value2 = wordFeatures(i + j + 1).getOrElse(name, "") - (feature, value1 + "|" + value2) - } + pairs.map{name => + val feature = getName(name, j, j + 1) + val value1 = wordFeatures(i + j).getOrElse(name, "") + val value2 = wordFeatures(i + j + 1).getOrElse(name, "") + (feature, value1 + "|" + value2) + } ).toArray val unoAttrs = (-window to window) @@ -253,20 +267,23 @@ case class FeatureGenerator(dictFeatures: DictionaryFeatures) { else if (i == words - 1) Array(("_EOS_", "")) else Array.empty[(String, String)] - WordAttrs(pairAttrs ++ unoAttrs ++ dictAttrs ++ addition) + val binAttrs = pairAttrs ++ unoAttrs ++ dictAttrs ++ addition + + val numAttrs = getEmbeddings(taggedSentence.words(i)) + + 
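[editor note, not part of the patch] A hedged usage sketch of the embedding-aware FeatureGenerator added above. The RocksDB index path and dimensionality are assumptions (such an index would be produced by WordEmbeddingsIndexer.indexGlove from this patch); unknown words fall back to the zero vector built in emptyEmbedding:

    import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator}
    import com.johnsnowlabs.nlp.embeddings.WordEmbeddings

    object EmbeddingFeaturesSketch extends App {
      // hypothetical RocksDB index, e.g. produced earlier by WordEmbeddingsIndexer.indexGlove(...)
      val embeddings = Some(WordEmbeddings("embeddings.100d.db", nDims = 100))

      val generator = FeatureGenerator(DictionaryFeatures.read(Seq.empty[String]), embeddings)

      // every token now contributes nDims numeric attributes on top of the
      // binary window / shape / dictionary features assembled in generate()
      val vector = generator.getEmbeddings("Airbus")
      println(s"embedding dimensions: ${vector.length}")   // 100; unknown words map to the zero vector
    }
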
WordAttrs(binAttrs, numAttrs) } TextSentenceAttrs(attrs) } - def generateDataset(sentences: Iterator[(TextSentenceLabels, TaggedSentence)], - dictFeatures: DictionaryFeatures): CrfDataset = { + def generateDataset(sentences: TraversableOnce[(TextSentenceLabels, TaggedSentence)]): CrfDataset = { val textDataset = sentences .filter(p => p._2.words.length > 0) .map{case (labels, sentence) => { - val textSentence = generate(sentence) - (labels, textSentence) - }} + val textSentence = generate(sentence) + (labels, textSentence) + }} DatasetReader.encodeDataset(textDataset) } @@ -276,6 +293,4 @@ case class FeatureGenerator(dictFeatures: DictionaryFeatures) { DatasetReader.encodeSentence(attrSentence, metadata) } -} - - +} \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala similarity index 55% rename from src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNer.scala rename to src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala index 05021d5ea86184..c7a5a7e9c44159 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala @@ -1,17 +1,25 @@ package com.johnsnowlabs.nlp.annotators.ner.crf -import com.johnsnowlabs.ml.crf.{CrfParams, LinearChainCrf, Verbose} -import com.johnsnowlabs.nlp.AnnotatorApproach +import com.johnsnowlabs.ml.crf.{CrfParams, LinearChainCrf, TextSentenceLabels, Verbose} +import com.johnsnowlabs.nlp.{AnnotatorApproach, AnnotatorType, DocumentAssembler} import com.johnsnowlabs.nlp.AnnotatorType.{DOCUMENT, NAMED_ENTITY, POS, TOKEN} +import com.johnsnowlabs.nlp.annotators.RegexTokenizer +import com.johnsnowlabs.nlp.annotators.common.Annotated.PosTaggedSentence import com.johnsnowlabs.nlp.annotators.common.NerTagged +import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach +import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel +import com.johnsnowlabs.nlp.datasets.CoNLL +import com.johnsnowlabs.nlp.embeddings.AnnotatorWithWordEmbeddings +import org.apache.spark.ml.Pipeline import org.apache.spark.ml.param.{DoubleParam, IntParam, Param, StringArrayParam} import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} -import org.apache.spark.sql.Dataset +import org.apache.spark.sql.{DataFrame, Dataset} /* Algorithm for training Named Entity Recognition Model. */ -class CrfBasedNer(override val uid: String) extends AnnotatorApproach[CrfBasedNerModel]{ +class NerCrfApproach(override val uid: String) extends AnnotatorApproach[NerCrfModel] + with AnnotatorWithWordEmbeddings { def this() = this(Identifiable.randomUID("NER")) override val description = "CRF based Named Entity Recognition Tagger" @@ -33,6 +41,9 @@ class CrfBasedNer(override val uid: String) extends AnnotatorApproach[CrfBasedNe val verbose = new IntParam(this, "verbose", "Level of verbosity during training") val randomSeed = new IntParam(this, "randomSeed", "Random seed") + val datasetPath = new Param[String](this, "datasetPath", "Path to dataset. 
" + + "If path is empty will use dataset passed to train as usual Spark Pipeline stage") + def setLabelColumn(column: String) = set(labelColumn, column) def setEntities(tags: Array[String]) = set(entities, tags) @@ -49,6 +60,8 @@ class CrfBasedNer(override val uid: String) extends AnnotatorApproach[CrfBasedNe def setVerbose(verbose: Verbose.Level) = set(this.verbose, verbose.id) def setRandomSeed(seed: Int) = set(randomSeed, seed) + def setDatsetPath(path: String) = set(datasetPath, path) + setDefault( minEpochs -> 0, maxEpochs -> 1000, @@ -58,15 +71,55 @@ class CrfBasedNer(override val uid: String) extends AnnotatorApproach[CrfBasedNe verbose -> Verbose.Silent.id ) - override def train(dataset: Dataset[_]): CrfBasedNerModel = { - val rows = dataset.toDF() + private def getTrainDataframe(dataset: Dataset[_]): DataFrame = { + + if (!isDefined(datasetPath)) + return dataset.toDF() + + val documentAssembler = new DocumentAssembler() + .setInputCol("text") + .setOutputCol("document") + + val sentenceDetector = new SentenceDetectorModel() + .setCustomBoundChars(Array("\n\n")) + .setInputCols(Array("document")) + .setOutputCol("sentence") + + val tokenizer = new RegexTokenizer() + .setInputCols(Array("document")) + .setOutputCol("token") + + val posTagger = new PerceptronApproach() + .setCorpusPath("/anc-pos-corpus/") + .setNIterations(10) + .setInputCols("token", "document") + .setOutputCol("pos") + + val pipeline = new Pipeline().setStages( + Array( + documentAssembler, + sentenceDetector, + tokenizer, + posTagger) + ) + + val reader = CoNLL(3, AnnotatorType.NAMED_ENTITY) + val dataframe = reader.readDataset($(datasetPath), dataset.sparkSession).toDF + pipeline.fit(dataframe).transform(dataframe) + } + + + override def train(dataset: Dataset[_]): NerCrfModel = { + + val rows = getTrainDataframe(dataset) - val trainDataset = NerTagged.collectTrainingInstances(rows, getInputCols, $(labelColumn)) + val trainDataset: Array[(TextSentenceLabels, PosTaggedSentence)] = NerTagged.collectTrainingInstances(rows, getInputCols, $(labelColumn)) val dictPaths = get(dicts).getOrElse(Array.empty[String]) val dictFeatures = DictionaryFeatures.read(dictPaths.toSeq) - val crfDataset = FeatureGenerator(dictFeatures).generateDataset(trainDataset.toIterator, dictFeatures) + val crfDataset = FeatureGenerator(dictFeatures, embeddings) + .generateDataset(trainDataset) val params = CrfParams( minEpochs = getOrDefault(minEpochs), @@ -83,7 +136,7 @@ class CrfBasedNer(override val uid: String) extends AnnotatorApproach[CrfBasedNe val crf = new LinearChainCrf(params) val crfModel = crf.trainSGD(crfDataset) - var model = new CrfBasedNerModel() + var model = new NerCrfModel() .setModel(crfModel) .setDictionaryFeatures(dictFeatures) @@ -93,8 +146,8 @@ class CrfBasedNer(override val uid: String) extends AnnotatorApproach[CrfBasedNe if (isDefined(minW)) model = model.shrink($(minW).toFloat) - model + fillModelEmbeddings(model) } } -object CrfBasedNer extends DefaultParamsReadable[CrfBasedNer] \ No newline at end of file +object NerCrfApproach extends DefaultParamsReadable[NerCrfApproach] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNerModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala similarity index 77% rename from src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNerModel.scala rename to src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index 240d10331edf92..04ec15c0529eba 100644 --- 
a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNerModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -4,6 +4,7 @@ import com.johnsnowlabs.ml.crf.{LinearChainCrfModel, SerializedLinearChainCrfMod import com.johnsnowlabs.nlp.AnnotatorType._ import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} +import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} import org.apache.hadoop.fs.Path import org.apache.spark.ml.param.StringArrayParam @@ -14,8 +15,8 @@ import org.apache.spark.sql.{Encoders, Row} /* Named Entity Recognition model */ -class CrfBasedNerModel (override val uid: String) - extends AnnotatorModel[CrfBasedNerModel] { +class NerCrfModel(override val uid: String) + extends AnnotatorModel[NerCrfModel] with ModelWithWordEmbeddings{ def this() = this(Identifiable.randomUID("NER")) @@ -23,7 +24,7 @@ class CrfBasedNerModel (override val uid: String) var model: Option[LinearChainCrfModel] = None var dictionaryFeatures = DictionaryFeatures(Seq.empty) - def setModel(crf: LinearChainCrfModel): CrfBasedNerModel = { + def setModel(crf: LinearChainCrfModel): NerCrfModel = { model = Some(crf) this } @@ -33,10 +34,10 @@ class CrfBasedNerModel (override val uid: String) this } - def setEntities(toExtract: Array[String]): CrfBasedNerModel = set(entities, toExtract) + def setEntities(toExtract: Array[String]): NerCrfModel = set(entities, toExtract) /** - Predicts Named Entities in input sentences + Predicts Named Entities in input sentences * @param sentences POS tagged sentences. * @return sentences with recognized Named Entities */ @@ -45,8 +46,9 @@ class CrfBasedNerModel (override val uid: String) val crf = model.get + val fg = FeatureGenerator(dictionaryFeatures, embeddings) sentences.map{sentence => - val instance = FeatureGenerator(dictionaryFeatures).generate(sentence, crf.metadata) + val instance = fg.generate(sentence, crf.metadata) val labelIds = crf.predict(instance) val words = sentence.indexedTaggedWords .zip(labelIds.labels) @@ -71,7 +73,7 @@ class CrfBasedNerModel (override val uid: String) NerTagged.pack(taggedSentences) } - def shrink(minW: Float): CrfBasedNerModel = { + def shrink(minW: Float): NerCrfModel = { model = model.map(m => m.shrink(minW)) this } @@ -80,16 +82,16 @@ class CrfBasedNerModel (override val uid: String) override val annotatorType: AnnotatorType = NAMED_ENTITY - override def write: MLWriter = new CrfBasedNerModel.CrfBasedNerModelWriter(this, super.write) + override def write: MLWriter = new NerCrfModel.NerCrfModelWriter(this, super.write) } -object CrfBasedNerModel extends DefaultParamsReadable[CrfBasedNerModel] { +object NerCrfModel extends DefaultParamsReadable[NerCrfModel] { implicit val crfEncoder = Encoders.kryo[SerializedLinearChainCrfModel] - override def read: MLReader[CrfBasedNerModel] = new CrfBasedNerModelReader(super.read) + override def read: MLReader[NerCrfModel] = new NerCrfModelReader(super.read) - class CrfBasedNerModelReader(baseReader: MLReader[CrfBasedNerModel]) extends MLReader[CrfBasedNerModel] { - override def load(path: String): CrfBasedNerModel = { + class NerCrfModelReader(baseReader: MLReader[NerCrfModel]) extends MLReader[NerCrfModel] { + override def load(path: String): NerCrfModel = { val instance = baseReader.load(path) val dataPath = new Path(path, "data").toString @@ 
-116,10 +118,13 @@ object CrfBasedNerModel extends DefaultParamsReadable[CrfBasedNerModel] { instance .setModel(crfModel.deserialize) .setDictionaryFeatures(dictFeatures) + + instance.deserializeEmbeddings(path) + instance } } - class CrfBasedNerModelWriter(model: CrfBasedNerModel, baseWriter: MLWriter) extends MLWriter { + class NerCrfModelWriter(model: NerCrfModel, baseWriter: MLWriter) extends MLWriter { override protected def saveImpl(path: String): Unit = { require(model.model.isDefined, "Crf Model must be defined before serialization") @@ -136,7 +141,8 @@ object CrfBasedNerModel extends DefaultParamsReadable[CrfBasedNerModel] { val dictPath = new Path(path, "dict").toString val dictLines = model.dictionaryFeatures.dict.toSeq.map(p => p._1 + ":" + p._2) Seq(dictLines).toDS.write.mode("overwrite").parquet(dictPath) + + model.serializeEmbeddings(path) } } } - diff --git a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala new file mode 100644 index 00000000000000..58b7df1f69e770 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala @@ -0,0 +1,103 @@ +package com.johnsnowlabs.nlp.datasets + + +import com.johnsnowlabs.nlp.{Annotation, AnnotatorType} +import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} +import org.apache.spark.sql.{Dataset, SparkSession} + +import scala.collection.mutable.ArrayBuffer +import scala.io.Source + +case class CoNLL(targetColumn: Int = 3, annotatorType: String) { + require(annotatorType == AnnotatorType.NAMED_ENTITY || annotatorType == AnnotatorType.POS) + + /* + Reads Dataset in CoNLL format and pack it into docs + */ + def readDocs(file: String): Seq[(String, Seq[TaggedSentence])] = { + val lines = Source.fromFile(file).getLines().toSeq + + readLines(lines) + } + + def readLines(lines: Seq[String]): Seq[(String, Seq[TaggedSentence])] = { + val doc = new StringBuilder() + val tokens = new ArrayBuffer[IndexedTaggedWord]() + val labels = new ArrayBuffer[TaggedSentence]() + + def addSentence(): Unit = { + if (tokens.nonEmpty) { + labels.append(TaggedSentence(tokens.toArray)) + tokens.clear() + } + } + + val docs = lines + .flatMap{line => + val items = line.split(" ") + if (items.nonEmpty && items(0) == "-DOCSTART-") { + addSentence() + + val result = (doc.toString, labels.toList) + doc.clear() + labels.clear() + + if (result._1.nonEmpty) + Some(result._1, result._2) + else + None + } else if (items.length <= 1) { + if (doc.nonEmpty && doc.last != '\n') { + doc.append("\n\n") + addSentence() + } + None + } else + { + if (doc.nonEmpty) + doc.append(" ") + + val begin = doc.length + doc.append(items(0)) + val end = doc.length - 1 + val tag = items(targetColumn) + tokens.append(IndexedTaggedWord(items(0), tag, begin, end)) + None + } + } + + addSentence() + + val last = if (doc.nonEmpty) Seq((doc.toString, labels.toList)) else Seq.empty + + docs ++ last + } + + def pack(sentences: Seq[TaggedSentence]): Seq[Annotation] = { + if (annotatorType == AnnotatorType.NAMED_ENTITY) + NerTagged.pack(sentences) + else + PosTagged.pack(sentences) + } + + def readDataset(file: String, + spark: SparkSession, + textColumn: String = "text", + labelColumn: String = "label"): Dataset[_] = { + + import spark.implicits._ + + readDocs(file).map(p => (p._1, pack(p._2))).toDF(textColumn, labelColumn) + } + + def readDatasetFromLines(lines: Seq[String], + spark: SparkSession, + textColumn: String = "text", + labelColumn: String = "label"): Dataset[_] = { 
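[editor note, not part of the patch] For reference, a small sketch of how the new CoNLL reader turns a CoNLL-formatted file into the (text, label) DataFrame that NerCrfApproach.getTrainDataframe feeds into its internal pipeline. The local Spark session and the test corpus path added by this patch are the only assumptions:

    import com.johnsnowlabs.nlp.AnnotatorType
    import com.johnsnowlabs.nlp.datasets.CoNLL
    import org.apache.spark.sql.SparkSession

    object CoNLLReadSketch extends App {
      val spark = SparkSession.builder()
        .appName("conll-read-sketch")
        .master("local[*]")
        .getOrCreate()

      // column 3 of the corpus holds the NER tag (column 1 would give the POS tag)
      val nerReader = CoNLL(targetColumn = 3, annotatorType = AnnotatorType.NAMED_ENTITY)
      val training = nerReader.readDataset("src/test/resources/ner-corpus/test_ner_dataset.txt", spark)

      training.printSchema()            // a text column plus an array of NAMED_ENTITY annotations
      training.show(truncate = false)
    }
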
+ + import spark.implicits._ + + val seq = readLines(lines).map(p => (p._1, pack(p._2))) + seq.toDF(textColumn, labelColumn) + } +} \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala new file mode 100644 index 00000000000000..9be8f6011a90e5 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala @@ -0,0 +1,92 @@ +package com.johnsnowlabs.nlp.embeddings + +import java.nio.file.Files +import java.util.UUID + +import com.johnsnowlabs.nlp.util.SparkNlpConfigKeys +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.ml.Estimator +import org.apache.spark.ml.param.{IntParam, Param} +import org.apache.spark.sql.SparkSession + + +trait AnnotatorWithWordEmbeddings extends AutoCloseable { this: Estimator[_] => + val sourceEmbeddingsPath = new Param[String](this, "sourceEmbeddingsPath", "Word embeddings file") + val embeddingsFormat = new IntParam(this, "embeddingsFormat", "Word vectors file format") + val embeddingsNDims = new IntParam(this, "embeddingsNDims", "Number of dimensions for word vectors") + + val embeddingsFolder = new Param[String](this, "embeddingsFolder", + "Folder to store Embeddings Index") + + private val defaultFolder = spark.sparkContext.getConf + .getOption(SparkNlpConfigKeys.embeddingsFolder).getOrElse("embeddings/") + + setDefault(this.embeddingsFolder -> defaultFolder) + + + def setEmbeddingsSource(path: String, nDims: Int, format: WordEmbeddingsFormat.Format) = { + set(this.sourceEmbeddingsPath, path) + set(this.embeddingsFormat, format.id) + set(this.embeddingsNDims, nDims) + } + + def setEmbeddingsFolder(path: String) = set(this.embeddingsFolder, path) + + def fillModelEmbeddings[T <: ModelWithWordEmbeddings](model: T): T = { + if (!isDefined(sourceEmbeddingsPath)) { + return model + } + + val file = "/" + new Path(localPath).getName + val path = Path.mergePaths(new Path($(embeddingsFolder)), new Path(file)) + hdfs.copyFromLocalFile(new Path(localPath), path) + + model.setDims($(embeddingsNDims)) + + model.setIndexPath(path.toUri.toString) + + model + } + + lazy val embeddings: Option[WordEmbeddings] = { + get(sourceEmbeddingsPath).map(_ => WordEmbeddings(localPath, $(embeddingsNDims))) + } + + private lazy val localPath: String = { + val path = Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + "_idx") + .toAbsolutePath.toString + + if ($(embeddingsFormat) == WordEmbeddingsFormat.SparkNlp.id) { + hdfs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(path)) + } else { + indexEmbeddings(path) + } + + path + } + + private lazy val spark: SparkSession = { + SparkSession + .builder() + .getOrCreate() + } + + private lazy val hdfs: FileSystem = { + FileSystem.get(spark.sparkContext.hadoopConfiguration) + } + + private def indexEmbeddings(localFile: String): Unit = { + if ($(embeddingsFormat) == WordEmbeddingsFormat.GloVe.id) { + val lines = spark.sparkContext.textFile($(sourceEmbeddingsPath)).toLocalIterator + WordEmbeddingsIndexer.indexGlove(lines, localFile) + } + else { + require(false, s"Unsupported word embeddings format ${$(embeddingsFormat)}") + } + } + + override def close(): Unit = { + if (embeddings.nonEmpty) + embeddings.get.close() + } +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala new file mode 100644 
index 00000000000000..fbbd5d3cea9dab --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala @@ -0,0 +1,101 @@ +package com.johnsnowlabs.nlp.embeddings + +import java.io.File +import java.nio.file.Files +import java.util.UUID + +import com.johnsnowlabs.nlp.util.SparkNlpConfigKeys +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.spark.ml.Model +import org.apache.spark.ml.param.{IntParam, Param} +import org.apache.spark.sql.SparkSession + + +/** + * Trait for models that want to use Word Embeddings + * + * Corresponding Approach have to implement AnnotatorWithWordEmbeddings + */ +trait ModelWithWordEmbeddings extends AutoCloseable { + this: Model[_] => + + val nDims = new IntParam(this, "nDims", "Number of embedding dimensions") + val indexPath = new Param[String](this, "indexPath", "File that stores Index") + + def setDims(nDims: Int) = set(this.nDims, nDims) + def setIndexPath(path: String) = set(this.indexPath, path) + + private lazy val spark = { + SparkSession.builder().getOrCreate() + } + + private lazy val hdfs = { + FileSystem.get(spark.sparkContext.hadoopConfiguration) + } + + private lazy val embeddingsFile: String = { + val localFile = if (!new File($(indexPath)).exists()) { + val localPath = Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + "embedddings_idx") + .toAbsolutePath.toString + + hdfs.copyToLocalFile(new Path($(indexPath)), new Path(localPath)) + localPath + } else { + $(indexPath) + } + + val crcFiles = new File(localFile).listFiles().filter(f => f.getName.endsWith(".crc")) + for (file <- crcFiles) { + file.delete() + } + + localFile + } + + lazy val embeddings: Option[WordEmbeddings] = { + get(indexPath).map { path => + WordEmbeddings(embeddingsFile, $(nDims)) + } + } + + override def close(): Unit = { + if (embeddings.nonEmpty) + embeddings.get.close() + } + + def deserializeEmbeddings(path: String): Unit = { + if (isDefined(indexPath)) { + val embeddingsFolder = spark.conf.getOption(SparkNlpConfigKeys.embeddingsFolder) + if (embeddingsFolder.isDefined) { + val dst = new Path(embeddingsFolder.get) + val file = getEmbeddingsSerializedPath(path).getName + + val indexFile = new Path(dst.toString, file) + setIndexPath(indexFile.toString) + } + + try { + // ToDo make files comparision + if (!hdfs.exists(new Path($(indexPath)))) + FileUtil.copy(hdfs, getEmbeddingsSerializedPath(path), hdfs, new Path($(indexPath)), false, spark.sparkContext.hadoopConfiguration) + } + catch { + case e: Exception => + throw new Exception(s"Set spark option ${SparkNlpConfigKeys.embeddingsFolder} to store embeddings", e) + } + } + } + + def serializeEmbeddings(path: String): Unit = { + if (isDefined(indexPath)) { + val dst = getEmbeddingsSerializedPath(path) + if (hdfs.exists(dst)) { + hdfs.delete(dst, true) + } + + hdfs.copyFromLocalFile(new Path(embeddingsFile), dst) + } + } + + def getEmbeddingsSerializedPath(path: String) = Path.mergePaths(new Path(path), new Path("/embeddings")) +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala new file mode 100644 index 00000000000000..174738e5dd90d7 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala @@ -0,0 +1,102 @@ +package com.johnsnowlabs.nlp.embeddings + +import java.io.{Closeable, File} +import java.nio.ByteBuffer + +import com.johnsnowlabs.nlp.util.LruMap +import org.fusesource.leveldbjni.JniDBFactory.bytes +import 
org.rocksdb._ + +import scala.io.Source + + +object WordEmbeddingsIndexer { + + private[embeddings] def toBytes(embeddings: Array[Float]): Array[Byte] = { + val buffer = ByteBuffer.allocate(embeddings.length * 4) + for (value <- embeddings) { + buffer.putFloat(value) + } + buffer.array() + } + + private[embeddings] def fromBytes(source: Array[Byte]): Array[Float] = { + val wrapper = ByteBuffer.wrap(source) + val result = Array.fill[Float](source.length / 4)(0f) + + for (i <- 0 until result.length) { + result(i) = wrapper.getFloat(i * 4) + } + result + } + + def indexGlove(source: Iterator[String], dbFile: String): Unit = { + val options = new Options() + options.setCreateIfMissing(true) + options.setWriteBufferSize(20 * 1 << 20) + + RocksDB.loadLibrary() + val writeOptions = new WriteOptions() + + val db = RocksDB.open(options, dbFile) + var batch = new WriteBatch() + try { + var batchSize = 0 + for (line <- source) { + val items = line.split(" ") + val word = items(0) + val embeddings = items.drop(1).map(i => i.toFloat) + batch.put(bytes(word), toBytes(embeddings)) + + batchSize += 1 + if (batchSize % 1000 == 0) { + db.write(writeOptions, batch) + batch.close() + batch = new WriteBatch() + batchSize == 0 + } + } + + db.write(writeOptions, batch) + batch.close() + } finally { + db.close() + } + } + + def indexGlove(source: String, dbFile: String): Unit = { + val lines = Source.fromFile(source).getLines() + indexGlove(lines, dbFile) + } +} + +case class WordEmbeddings(dbFile: String, + nDims: Int, + cacheSizeMB: Int = 100, + lruCacheSize: Int = 100000) extends Closeable{ + val options = new Options() + options.setRowCache(new LRUCache(cacheSizeMB * 1 << 20)) + RocksDB.loadLibrary() + + val db = RocksDB.openReadOnly(options, dbFile) + + val zeroArray = Array.fill[Float](nDims)(0f) + + val lru = new LruMap[String, Array[Float]](lruCacheSize) + + private def getEmbeddingsFromDb(word: String): Array[Float] = { + val result = db.get(bytes(word.toLowerCase.trim)) + if (result == null) + zeroArray + else + WordEmbeddingsIndexer.fromBytes(result) + } + + def getEmbeddings(word: String): Array[Float] = { + lru.getOrElseUpdate(word, getEmbeddingsFromDb(word)) + } + + override def close(): Unit = { + db.close() + } +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala new file mode 100644 index 00000000000000..c6f8885c005d3c --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala @@ -0,0 +1,9 @@ +package com.johnsnowlabs.nlp.embeddings + +object WordEmbeddingsFormat extends Enumeration { + type Format = Value + + val SparkNlp = Value(1) + + val GloVe = Value(2) +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/util/LruMap.scala b/src/main/scala/com/johnsnowlabs/nlp/util/LruMap.scala new file mode 100644 index 00000000000000..8e530b4b440c55 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/util/LruMap.scala @@ -0,0 +1,38 @@ +package com.johnsnowlabs.nlp.util + +import scala.collection.mutable + + +class LruMap[TKey, TValue](maxCacheSize: Int) { + val cache = mutable.Map[TKey, TValue]() + val lru = mutable.PriorityQueue[KeyPriority]()(KeyPriorityOrdering) + + var counter = 0 + + private def deleteOne(): Unit = { + val oldest = lru.dequeue().key + cache.remove(oldest) + } + + def getOrElseUpdate(key: TKey, valueCreator: => TValue): TValue = { + val oldValue = cache.get(key) + if (oldValue.isDefined) { + oldValue.get + } else { + if (cache.size >= 
maxCacheSize) + deleteOne() + + val value = valueCreator + cache(key) = value + counter += 1 + lru.enqueue(KeyPriority(key, counter)) + value + } + } + + case class KeyPriority(key: TKey, priority: Int) + + object KeyPriorityOrdering extends Ordering[KeyPriority] { + override def compare(x: KeyPriority, y: KeyPriority): Int = x.priority.compareTo(y.priority) + } +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/util/SparkNlpConfigKeys.scala b/src/main/scala/com/johnsnowlabs/nlp/util/SparkNlpConfigKeys.scala new file mode 100644 index 00000000000000..aaeb53fc2655a1 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/util/SparkNlpConfigKeys.scala @@ -0,0 +1,10 @@ +package com.johnsnowlabs.nlp.util + +/** + * Additional configure options that used by spark.nlp + */ +object SparkNlpConfigKeys { + + /** Folder to store word embeddings */ + val embeddingsFolder = "sparknlp.embeddings.folder" +} diff --git a/src/test/resources/ner-corpus/test_ner_dataset.txt b/src/test/resources/ner-corpus/test_ner_dataset.txt new file mode 100644 index 00000000000000..2f77cc28f6013b --- /dev/null +++ b/src/test/resources/ner-corpus/test_ner_dataset.txt @@ -0,0 +1,8 @@ +-DOCSTART- POS O NER +John NNP I-NP PER +Smith NNP I-NP PER +works VBZ I-VP O +at IN I-PP O +Airbus NNP I-NP ORG +Germany NNP B-NP LOC +. . O O \ No newline at end of file diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala index 2084416b224a5b..fd13d5b424a61f 100644 --- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala +++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala @@ -1,88 +1,140 @@ package com.johnsnowlabs.ml.crf -import org.apache.spark.ml.regression.LinearRegression +import java.io.File + +import com.johnsnowlabs.nlp.annotators.common.TaggedSentence +import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator} +import com.johnsnowlabs.nlp.AnnotatorType +import com.johnsnowlabs.nlp.datasets.CoNLL +import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsIndexer} + +import scala.collection.mutable + /* Before running: 1. Download CoNLLL2003 datasets 2. Set trainFile, testFileA, testFileB to corresponding paths + 3. 
(Optional) If you wish to use word embeddings then download GLove Word embeddings and unzip it Then script could be run */ object CoNLL2003CrfTest extends App { val folder = "./" - val trainFile = folder + "eng.train.crfsuite" - val testFileA = folder + "eng.testa.crfsuite" - val testFileB = folder + "eng.testb.crfsuite" + val trainFile = folder + "eng.train" + val testFileA = folder + "eng.testa" + val testFileB = folder + "eng.testb" + + val embeddingsDims = 100 + val embeddingsFile = folder + s"glove.6B.${embeddingsDims}d.txt" + val wordEmbeddingsDb = folder + s"embeddings.${embeddingsDims}d.db" + + var wordEmbeddings: Option[WordEmbeddings] = None + + val time = System.nanoTime() + if (new File(embeddingsFile).exists() && !new File(wordEmbeddingsDb).exists()) { + WordEmbeddingsIndexer.indexGlove(embeddingsFile, wordEmbeddingsDb) + } + + if (new File(wordEmbeddingsDb).exists()) { + wordEmbeddings = Some(WordEmbeddings(wordEmbeddingsDb, embeddingsDims)) + } + + val nerReader = CoNLL(3, AnnotatorType.NAMED_ENTITY) + val posReader = CoNLL(1, AnnotatorType.POS) + val fg = FeatureGenerator( + DictionaryFeatures.read(Seq("src/main/resources/ner-corpus/dict.txt")), + wordEmbeddings + ) + def readDataset(file: String): Seq[(TextSentenceLabels, TaggedSentence)] = { + val labels = nerReader.readDocs(file).flatMap(_._2) + .map(sentence => TextSentenceLabels(sentence.tags)) - def trainModel(file: String, linesToSkip: Int): LinearChainCrfModel = { + val posTaggedSentences = posReader.readDocs(file).flatMap(_._2) + labels.zip(posTaggedSentences) + } + + def trainModel(file: String): LinearChainCrfModel = { System.out.println("Dataset Reading") val time = System.nanoTime() - val dataset = DatasetReader.readAndEncode(trainFile, linesToSkip) + val lines = readDataset(file) + val dataset = fg.generateDataset(lines) System.out.println(s"Done, ${(System.nanoTime() - time)/1e9}\n") System.out.println("Start fitting") val params = CrfParams( - minEpochs = 100, + maxEpochs = 10, l2 = 1f, verbose = Verbose.Epochs, randomSeed = Some(0), - c0 = 2250000 + c0 = 1250000 ) val crf = new LinearChainCrf(params) crf.trainSGD(dataset) } - def testDataset(file: String, linesToSkip: Int, model: LinearChainCrfModel, metadata: DatasetMetadata): Unit = { + def testDataset(file: String, model: LinearChainCrfModel): Unit = { // prec = predicted * correct / predicted // rec = predicted * correct / correct val started = System.nanoTime() - val labels = metadata.label2Id.size - val predictedCorrect = Array.fill(labels)(0) - val predicted = Array.fill(labels)(0) - val correct = Array.fill(labels)(0) + val predictedCorrect = mutable.Map[String, Int]() + val predicted = mutable.Map[String, Int]() + val correct = mutable.Map[String, Int]() + + val testInstances = readDataset(file) + + for ((labels, sentence) <- testInstances) { + val instance = fg.generate(sentence, model.metadata) - val testInstances = DatasetReader.readAndEncode(file, linesToSkip, metadata) - for ((labels, instance) <- testInstances) { val predictedLabels = model.predict(instance) - for ((lCorrect, lPredicted) <- labels.labels.zip(predictedLabels.labels) - if lCorrect >= 0) { + .labels + .map(l => model.metadata.labels(l)) - correct(lCorrect) += 1 - predicted(lPredicted) += 1 + for ((lCorrect, lPredicted) <- labels.labels.zip(predictedLabels)) { + correct(lCorrect) = correct.getOrElseUpdate(lCorrect, 0) + 1 + predicted(lPredicted) = predicted.getOrElse(lPredicted, 0) + 1 if (lCorrect == lPredicted) - predictedCorrect(lPredicted) += 1 + predictedCorrect(lPredicted) 
= predictedCorrect.getOrElseUpdate(lPredicted, 0) + 1 } } System.out.println(s"time: ${(System.nanoTime() - started)/1e9}") System.out.println("label\tprec\trec\tf1") - for (i <- 1 until labels) { - val label = metadata.label2Id.filter(p => p._2 == i).keys.head - val rec = predictedCorrect(i).toFloat / correct(i) - val prec = predictedCorrect(i).toFloat / predicted(i) + val totalCorrect = correct.filterKeys(label => label != "O").values.sum + val totalPredicted = correct.filterKeys(label => label != "O").values.sum + val totalPredictedCorrect = predictedCorrect.filterKeys(label => label != "O").values.sum + + val rec = totalPredictedCorrect.toFloat / totalCorrect + val prec = totalPredictedCorrect.toFloat / totalPredicted + val f1 = 2 * prec * rec / (prec + rec) + + System.out.println(s"Total\t$prec\t$rec\t$f1") + + val labels = (predicted.keys ++ correct.keys).toList.distinct + + for (label <- labels) { + val rec = predictedCorrect.getOrElse(label, 0).toFloat / correct.getOrElse(label, 0) + val prec = predictedCorrect.getOrElse(label, 0).toFloat / predicted.getOrElse(label, 0) val f1 = 2 * prec * rec / (prec + rec) System.out.println(s"$label\t$prec\t$rec\t$f1") } } - val model = trainModel(trainFile, 2) + val model = trainModel(trainFile) System.out.println("\n\nQuality on train data") - testDataset(trainFile, 2, model, model.metadata) + testDataset(trainFile, model) System.out.println("\n\nQuality on test A data") - testDataset(testFileA, 2, model, model.metadata) + testDataset(testFileA, model) System.out.println("\n\nQuality on test B data") - testDataset(testFileB, 2, model, model.metadata) - + testDataset(testFileB, model) } - diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala index 59715c77608e42..e79f3885a1ff20 100644 --- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala +++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala @@ -4,82 +4,17 @@ import com.johnsnowlabs.nlp._ import com.johnsnowlabs.nlp.annotators.RegexTokenizer import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} import com.johnsnowlabs.nlp.annotators.common.{NerTagged, PosTagged, TaggedSentence} -import com.johnsnowlabs.nlp.annotators.ner.crf.{CrfBasedNer} +import com.johnsnowlabs.nlp.annotators.ner.crf.NerCrfApproach import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel +import com.johnsnowlabs.nlp.datasets.CoNLL +import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage} -import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.DataFrame import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.io.Source -class CoNLL(val targetColumn: Int = 3, val spark: SparkSession = SparkAccessor.spark) { - import spark.implicits._ - - /* - Reads Dataset in CoNLL format and pack it into docs - */ - def readDocs(file: String): Seq[(String, Seq[Annotation])] = { - val lines = Source.fromFile(file).getLines().toSeq - - readLines(lines) - } - - def readLines(lines: Seq[String]): Seq[(String, Seq[Annotation])] = { - val doc = new StringBuilder() - val labels = new ArrayBuffer[Annotation]() - - val docs = lines - .flatMap{line => - val items = line.split(" ") - if (items.nonEmpty && items(0) == "-DOCSTART-") { - val result = 
(doc.toString, labels.toList) - doc.clear() - labels.clear() - - if (result._1.nonEmpty) - Some(result) - else - None - } else if (items.length <= 1) { - if (doc.nonEmpty && doc.last != '\n') - doc.append("\n\n") - None - } else - { - if (doc.nonEmpty) - doc.append(" ") - - val begin = doc.length - doc.append(items(0)) - val end = doc.length - 1 - val ner = items(targetColumn) - labels.append(new Annotation(AnnotatorType.NAMED_ENTITY, begin, end, ner, Map("tag" -> ner))) - None - } - } - - val last = if (doc.nonEmpty) Seq((doc.toString, labels.toList)) else Seq.empty - - docs ++ last - } - - def readDataset(file: String, - textColumn: String = "text", - labelColumn: String = "label"): Dataset[_] = { - readDocs(file).toDF(textColumn, labelColumn) - } - - def readDatasetFromLines(lines: Seq[String], - textColumn: String = "text", - labelColumn: String = "label"): Dataset[_] = { - val seq = readLines(lines) - seq.toDF(textColumn, labelColumn) - } -} - object CoNLL2003PipelineTest extends App { val folder = "./" @@ -87,8 +22,8 @@ object CoNLL2003PipelineTest extends App { val testFileA = folder + "eng.testa" val testFileB = folder + "eng.testb" - val nerReader = new CoNLL() - val posReader = new CoNLL(targetColumn = 1) + val nerReader = CoNLL(annotatorType = AnnotatorType.NAMED_ENTITY) + val posReader = CoNLL(targetColumn = 1, annotatorType = AnnotatorType.POS) def getPosStages(): Array[_ <: PipelineStage] = { val documentAssembler = new DocumentAssembler() @@ -118,13 +53,14 @@ object CoNLL2003PipelineTest extends App { def getNerStages(): Array[_ <: PipelineStage] = { - val nerTagger = new CrfBasedNer() + val nerTagger = new NerCrfApproach() .setInputCols("sentence", "token", "pos") .setLabelColumn("label") .setC0(1250000) .setRandomSeed(100) - .setDicts(Seq("src/main/resources/ner-corpus/dict.txt")) + .setMaxEpochs(14) .setOutputCol("ner") + .setEmbeddingsSource("glove.6B.100d.txt", 100, WordEmbeddingsFormat.GloVe) getPosStages() :+ nerTagger } @@ -132,7 +68,7 @@ object CoNLL2003PipelineTest extends App { def trainPosModel(file: String): PipelineModel = { System.out.println("Dataset Reading") val time = System.nanoTime() - val dataset = posReader.readDataset(file) + val dataset = posReader.readDataset(file, SparkAccessor.spark) System.out.println(s"Done, ${(System.nanoTime() - time)/1e9}\n") System.out.println("Start fitting") @@ -148,7 +84,7 @@ object CoNLL2003PipelineTest extends App { def trainNerModel(file: String): PipelineModel = { System.out.println("Dataset Reading") val time = System.nanoTime() - val dataset = nerReader.readDataset(file) + val dataset = nerReader.readDataset(file, SparkAccessor.spark) System.out.println(s"Done, ${(System.nanoTime() - time)/1e9}\n") System.out.println("Start fitting") @@ -199,7 +135,7 @@ object CoNLL2003PipelineTest extends App { val predicted = mutable.Map[String, Int]() val correct = mutable.Map[String, Int]() - val dataset = reader.readDataset(file) + val dataset = reader.readDataset(file, SparkAccessor.spark) val transformed = model.transform(dataset) val sentences = collect(transformed) @@ -276,4 +212,4 @@ object CoNLL2003PipelineTest extends App { val nerModel = measureNer() nerModel.write.overwrite().save("ner_model") -} +} \ No newline at end of file diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/TestDatasets.scala b/src/test/scala/com/johnsnowlabs/ml/crf/TestDatasets.scala index d066005e099066..9737e7f97c137e 100644 --- a/src/test/scala/com/johnsnowlabs/ml/crf/TestDatasets.scala +++ 
b/src/test/scala/com/johnsnowlabs/ml/crf/TestDatasets.scala @@ -19,10 +19,10 @@ object TestDatasets { def small = { val metadata = new DatasetEncoder() val (label1, word1) = metadata.getFeatures(metadata.startLabel, "label1", - Seq("one"), Seq("num1" -> 1f, "num2" -> 2f)) + Seq("one"), Seq(1f, 2f)) val (label2, word2) = metadata.getFeatures("label1", "label2", - Seq("two"), Seq("num1" -> 2f, "num2" -> 3f)) + Seq("two"), Seq(2f, 3f)) val instance = new Instance(Seq(word1, word2)) val labels = new InstanceLabels(Seq(1, 2)) diff --git a/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala b/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala index 38d1410f78a424..d162dfc836cdca 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala @@ -1,7 +1,7 @@ package com.johnsnowlabs.nlp import com.johnsnowlabs.nlp.annotators._ -import com.johnsnowlabs.nlp.annotators.ner.crf.{CrfBasedNer, CrfBasedNerModel} +import com.johnsnowlabs.nlp.annotators.ner.crf.{NerCrfApproach, NerCrfModel} import com.johnsnowlabs.nlp.annotators.ner.regex.NERRegexApproach import com.johnsnowlabs.nlp.annotators.parser.dep.DependencyParser import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach @@ -143,20 +143,21 @@ object AnnotatorBuilder extends FlatSpec { this: Suite => .transform(df) } - def withCrfBasedNerTagger(dataset: Dataset[Row]): Dataset[Row] = { + def withNerCrfTagger(dataset: Dataset[Row]): Dataset[Row] = { val df = withFullPOSTagger(withTokenizer(dataset)) - getCrfBasedNerModel(dataset).transform(df) + getNerCrfModel(dataset).transform(df) } - def getCrfBasedNerModel(dataset: Dataset[Row]): CrfBasedNerModel = { + def getNerCrfModel(dataset: Dataset[Row]): NerCrfModel = { val df = withFullPOSTagger(withTokenizer(dataset)) - new CrfBasedNer() + new NerCrfApproach() .setInputCols("sentence", "token", "pos") .setLabelColumn("label") .setMinEpochs(1) .setMaxEpochs(3) + .setDatsetPath("src/test/resources/ner-corpus/test_ner_dataset.txt") .setC0(34) .setL2(3.0) .setOutputCol("ner") diff --git a/src/test/scala/com/johnsnowlabs/nlp/DataBuilder.scala b/src/test/scala/com/johnsnowlabs/nlp/DataBuilder.scala index 5bafbc1d29dd0a..d6600d82620781 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/DataBuilder.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/DataBuilder.scala @@ -1,6 +1,6 @@ package com.johnsnowlabs.nlp -import com.johnsnowlabs.ml.crf.CoNLL +import com.johnsnowlabs.nlp.datasets.CoNLL import org.apache.spark.sql.{Dataset, Row} import org.scalatest._ @@ -23,7 +23,8 @@ object DataBuilder extends FlatSpec with BeforeAndAfterAll { this: Suite => def buildNerDataset(datasetContent: String): Dataset[Row] = { val lines = datasetContent.split("\n") - val data = new CoNLL(1, SparkAccessor.spark).readDatasetFromLines(lines).toDF + val data = CoNLL(1, AnnotatorType.NAMED_ENTITY) + .readDatasetFromLines(lines, SparkAccessor.spark).toDF AnnotatorBuilder.withDocumentAssembler(data) } } diff --git a/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala b/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala index 9e400a2b3f347b..faa0ebc8d81c5d 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/SparkAccessor.scala @@ -8,7 +8,7 @@ object SparkAccessor { .builder() .appName("test") .master("local[*]") - .config("spark.driver.memory","4G") + .config("spark.driver.memory","8G") .config("spark.kryoserializer.buffer.max","200M") .getOrCreate() } \ No newline at end of 
file diff --git a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNerPipelineSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala similarity index 63% rename from src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNerPipelineSpec.scala rename to src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala index 0f1e3c2418e02c..8cfe7e79d359ba 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/CrfBasedNerPipelineSpec.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala @@ -4,31 +4,31 @@ import com.johnsnowlabs.nlp._ import org.scalatest.FlatSpec -class CrfBasedNerPipelineSpec extends FlatSpec { +class NerCrfApproachSpec extends FlatSpec { val nerSentence = DataBuilder.buildNerDataset(ContentProvider.nerCorpus) - val nerModel = AnnotatorBuilder.getCrfBasedNerModel(nerSentence) + val nerModel = AnnotatorBuilder.getNerCrfModel(nerSentence) // Dataset ready for NER tagger val nerInputDataset = AnnotatorBuilder.withFullPOSTagger(AnnotatorBuilder.withTokenizer(nerSentence)) - "CrfBasedNerModel" should "be serializable and deserializable correctly" in { + "NerCrfApproach" should "be serializable and deserializable correctly" in { nerModel.write.overwrite.save("./test_crf_pipeline") - val loadedNer = CrfBasedNerModel.read.load("./test_crf_pipeline") + val loadedNer = NerCrfModel.read.load("./test_crf_pipeline") assert(nerModel.model.get.serialize == loadedNer.model.get.serialize) assert(nerModel.dictionaryFeatures == loadedNer.dictionaryFeatures) } - "CrfBasedNer" should "have correct set of labels" in { + "NerCrfApproach" should "have correct set of labels" in { assert(nerModel.model.isDefined) val metadata = nerModel.model.get.metadata assert(metadata.labels.toSeq == Seq("@#Start", "PER", "O", "ORG", "LOC")) } - "CrfBasedNer" should "correctly store annotations" in { + "NerCrfApproach" should "correctly store annotations" in { val tagged = nerModel.transform(nerInputDataset) val annotations = Annotation.collect(tagged, "ner").flatten.toSeq val labels = Annotation.collect(tagged, "label").flatten.toSeq @@ -38,22 +38,32 @@ class CrfBasedNerPipelineSpec extends FlatSpec { assert(annotation.begin == label.begin) assert(annotation.end == label.end) assert(annotation.annotatorType == AnnotatorType.NAMED_ENTITY) - assert(annotation.metadata("tag") == label.metadata("tag")) + assert(annotation.result == label.result) assert(annotation.metadata.contains("word")) } } - "CrfBasedNer" should "correctly tag sentences" in { + + "NerCrfApproach" should "correctly tag sentences" in { val tagged = nerModel.transform(nerInputDataset) val annotations = Annotation.collect(tagged, "ner").flatten - val tags = annotations.map(a => a.metadata("tag")).toSeq + val tags = annotations.map(a => a.result).toSeq + assert(tags == Seq("PER", "PER", "O", "O", "ORG", "LOC", "O")) + } + + + "NerCrfModel" should "correctly train using dataset from file" in { + val tagged = AnnotatorBuilder.withNerCrfTagger(nerInputDataset) + val annotations = Annotation.collect(tagged, "ner").flatten + + val tags = annotations.map(a => a.result).toSeq assert(tags == Seq("PER", "PER", "O", "O", "ORG", "LOC", "O")) } - "CrfBasedNerModel" should "correctly handle entities param" in { - val restrictedModel = new CrfBasedNerModel() + "NerCrfModel" should "correctly handle entities param" in { + val restrictedModel = new NerCrfModel() .setEntities(Array("PER", "LOC")) .setModel(nerModel.model.get) 
.setOutputCol(nerModel.getOutputCol) @@ -61,9 +71,9 @@ class CrfBasedNerPipelineSpec extends FlatSpec { val tagged = restrictedModel.transform(nerInputDataset) val annotations = Annotation.collect(tagged, "ner").flatten - val tags = annotations.map(a => a.metadata("tag")).toSeq + val tags = annotations.map(a => a.result).toSeq assert(tags == Seq("PER", "PER", "LOC")) } -} +} \ No newline at end of file From 4b078212fe0e62594a15f321eb65e2c29e9bec83 Mon Sep 17 00:00:00 2001 From: aleksei Date: Tue, 14 Nov 2017 16:57:12 +0300 Subject: [PATCH 2/8] Java convenient API to work with CoNLL dataset --- build.sbt | 4 +- .../ner/crf/DictionaryFeatures.scala | 13 +++- .../com/johnsnowlabs/nlp/datasets/CoNLL.scala | 7 ++- .../nlp/datasets/CoNLL2003NerReader.scala | 62 +++++++++++++++++++ .../nlp/embeddings/WordEmbeddings.scala | 7 +-- src/test/resources/log4j.properties | 3 +- .../ml/crf/CoNLL2003CrfTest.scala | 57 ++++------------- 7 files changed, 101 insertions(+), 52 deletions(-) create mode 100644 src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala diff --git a/build.sbt b/build.sbt index 2bf9bbaa2b76d6..22536a26c9bcbc 100644 --- a/build.sbt +++ b/build.sbt @@ -56,7 +56,9 @@ lazy val testDependencies = Seq( lazy val utilDependencies = Seq( "com.typesafe" % "config" % "1.3.0", - "org.rocksdb" % "rocksdbjni" % "5.8.0" + "org.rocksdb" % "rocksdbjni" % "5.8.0", + "org.slf4j" % "slf4j-api" % "1.7.25", + "org.apache.commons" % "commons-compress" % "1.15" ) lazy val root = (project in file(".")) diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala index 21d5a770a9df0f..c386e6ab17dcce 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/DictionaryFeatures.scala @@ -1,5 +1,10 @@ package com.johnsnowlabs.nlp.annotators.ner.crf +import java.io.File + +import com.johnsnowlabs.nlp.util.io.ResourceHelper +import com.johnsnowlabs.nlp.util.io.ResourceHelper.SourceStream + import scala.io.Source case class DictionaryFeatures(dict: Map[String, String]) @@ -31,7 +36,13 @@ object DictionaryFeatures { } private def read(path: String): Iterator[(String, String)] = { - Source.fromFile(path).getLines().map{ + val source = if (new File(path).exists) { + Source.fromFile(path) + } else { + SourceStream(path).content + } + + source.getLines().map{ line => val items = line.split(":") require(items.size == 2) diff --git a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala index 58b7df1f69e770..32a50a899ad325 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL.scala @@ -1,8 +1,13 @@ package com.johnsnowlabs.nlp.datasets +import java.io.File + +import com.johnsnowlabs.ml.crf.{CrfDataset, DatasetMetadata, InstanceLabels, TextSentenceLabels} import com.johnsnowlabs.nlp.{Annotation, AnnotatorType} import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} +import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator} +import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsIndexer} import org.apache.spark.sql.{Dataset, SparkSession} import scala.collection.mutable.ArrayBuffer @@ -100,4 +105,4 @@ case class CoNLL(targetColumn: Int = 3, annotatorType: String) { val seq = 
readLines(lines).map(p => (p._1, pack(p._2))) seq.toDF(textColumn, labelColumn) } -} \ No newline at end of file +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala new file mode 100644 index 00000000000000..5a5ffd63532e1d --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala @@ -0,0 +1,62 @@ +package com.johnsnowlabs.nlp.datasets + +import java.io.File + +import com.johnsnowlabs.ml.crf.{CrfDataset, DatasetMetadata, InstanceLabels, TextSentenceLabels} +import com.johnsnowlabs.nlp.AnnotatorType +import com.johnsnowlabs.nlp.annotators.common.TaggedSentence +import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator} +import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsIndexer} + +/** + * Helper class for to work with CoNLL 2003 dataset for NER task + * Class is made for easy use from Java + */ +class CoNLL2003NerReader(wordEmbeddingsFile: String, wordEmbeddingsNDims: Int, dictionaryFile: String) { + private val nerReader = CoNLL(3, AnnotatorType.NAMED_ENTITY) + private val posReader = CoNLL(1, AnnotatorType.POS) + + private var wordEmbeddings: Option[WordEmbeddings] = None + + if (wordEmbeddingsFile != null) { + require(new File(wordEmbeddingsFile).exists()) + + val fileDb = wordEmbeddingsFile + ".db" + if (!new File(fileDb).exists()) { + WordEmbeddingsIndexer.indexGlove(wordEmbeddingsFile, fileDb) + } + + if (new File(fileDb).exists()) { + wordEmbeddings = Some(WordEmbeddings(fileDb, wordEmbeddingsNDims)) + } + } + + private val dicts = if (dictionaryFile == null) Seq.empty[String] else Seq(dictionaryFile) + + private val fg = FeatureGenerator( + DictionaryFeatures.read(dicts), + wordEmbeddings + ) + + private def readDataset(file: String): Seq[(TextSentenceLabels, TaggedSentence)] = { + val labels = nerReader.readDocs(file).flatMap(_._2) + .map(sentence => TextSentenceLabels(sentence.tags)) + + val posTaggedSentences = posReader.readDocs(file).flatMap(_._2) + labels.zip(posTaggedSentences) + } + + def readNerDataset(file: String, metadata: Option[DatasetMetadata] = None): CrfDataset = { + val lines = readDataset(file) + if (metadata.isEmpty) + fg.generateDataset(lines) + else { + val labeledInstances = lines.map { line => + val instance = fg.generate(line._2, metadata.get) + val labels = InstanceLabels(line._1.labels.map(l => metadata.get.label2Id.getOrElse(l, -1))) + (labels, instance) + } + CrfDataset(labeledInstances, metadata.get) + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala index 174738e5dd90d7..7fbbb0772ead89 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala @@ -1,10 +1,9 @@ package com.johnsnowlabs.nlp.embeddings -import java.io.{Closeable, File} +import java.io.Closeable import java.nio.ByteBuffer import com.johnsnowlabs.nlp.util.LruMap -import org.fusesource.leveldbjni.JniDBFactory.bytes import org.rocksdb._ import scala.io.Source @@ -46,7 +45,7 @@ object WordEmbeddingsIndexer { val items = line.split(" ") val word = items(0) val embeddings = items.drop(1).map(i => i.toFloat) - batch.put(bytes(word), toBytes(embeddings)) + batch.put(word.getBytes, toBytes(embeddings)) batchSize += 1 if (batchSize % 1000 == 0) { @@ -85,7 +84,7 @@ case class 
WordEmbeddings(dbFile: String, val lru = new LruMap[String, Array[Float]](lruCacheSize) private def getEmbeddingsFromDb(word: String): Array[Float] = { - val result = db.get(bytes(word.toLowerCase.trim)) + val result = db.get(word.toLowerCase.trim.getBytes()) if (result == null) zeroArray else diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index b979ea5aa6742b..82a6454d35b980 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -6,4 +6,5 @@ log4j.appender.STDOUT.layout.ConversionPattern=[%5p] %m%n log4j.logger.RuleFactory=ERROR log4j.logger.PerceptronTraining=ERROR log4j.logger.PragmaticScorer=ERROR -log4j.logger.NorvigApproach=ERROR \ No newline at end of file +log4j.logger.NorvigApproach=ERROR +log4j.logger.CRF=INFO \ No newline at end of file diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala index fd13d5b424a61f..760aed41f95353 100644 --- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala +++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala @@ -1,12 +1,6 @@ package com.johnsnowlabs.ml.crf -import java.io.File - -import com.johnsnowlabs.nlp.annotators.common.TaggedSentence -import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator} -import com.johnsnowlabs.nlp.AnnotatorType -import com.johnsnowlabs.nlp.datasets.CoNLL -import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsIndexer} +import com.johnsnowlabs.nlp.datasets.{CoNLL, CoNLL2003NerReader} import scala.collection.mutable @@ -28,49 +22,23 @@ object CoNLL2003CrfTest extends App { val embeddingsDims = 100 val embeddingsFile = folder + s"glove.6B.${embeddingsDims}d.txt" - val wordEmbeddingsDb = folder + s"embeddings.${embeddingsDims}d.db" - - var wordEmbeddings: Option[WordEmbeddings] = None - val time = System.nanoTime() - if (new File(embeddingsFile).exists() && !new File(wordEmbeddingsDb).exists()) { - WordEmbeddingsIndexer.indexGlove(embeddingsFile, wordEmbeddingsDb) - } - - if (new File(wordEmbeddingsDb).exists()) { - wordEmbeddings = Some(WordEmbeddings(wordEmbeddingsDb, embeddingsDims)) - } - - val nerReader = CoNLL(3, AnnotatorType.NAMED_ENTITY) - val posReader = CoNLL(1, AnnotatorType.POS) - val fg = FeatureGenerator( - DictionaryFeatures.read(Seq("src/main/resources/ner-corpus/dict.txt")), - wordEmbeddings - ) - - def readDataset(file: String): Seq[(TextSentenceLabels, TaggedSentence)] = { - val labels = nerReader.readDocs(file).flatMap(_._2) - .map(sentence => TextSentenceLabels(sentence.tags)) - - val posTaggedSentences = posReader.readDocs(file).flatMap(_._2) - labels.zip(posTaggedSentences) - } + val reader = new CoNLL2003NerReader(embeddingsFile, embeddingsDims, "/ner-corpus/dict.txt") def trainModel(file: String): LinearChainCrfModel = { System.out.println("Dataset Reading") val time = System.nanoTime() - val lines = readDataset(file) - val dataset = fg.generateDataset(lines) + val dataset = reader.readNerDataset(file) System.out.println(s"Done, ${(System.nanoTime() - time)/1e9}\n") System.out.println("Start fitting") val params = CrfParams( - maxEpochs = 10, + maxEpochs = 25, l2 = 1f, verbose = Verbose.Epochs, randomSeed = Some(0), - c0 = 1250000 + c0 = 2250000 ) val crf = new LinearChainCrf(params) crf.trainSGD(dataset) @@ -82,19 +50,20 @@ object CoNLL2003CrfTest extends App { val started = System.nanoTime() val predictedCorrect = mutable.Map[String, Int]() - val predicted = 
mutable.Map[String, Int]() + val predicted = mutable. Map[String, Int]() val correct = mutable.Map[String, Int]() - val testInstances = readDataset(file) + val testDataset = reader.readNerDataset(file, Some(model.metadata)) - for ((labels, sentence) <- testInstances) { - val instance = fg.generate(sentence, model.metadata) + for ((labels, sentence) <- testDataset.instances) { - val predictedLabels = model.predict(instance) + val predictedLabels = model.predict(sentence) .labels .map(l => model.metadata.labels(l)) + val correctLabels = + labels.labels.map{l => if (l >= 0) model.metadata.labels(l) else "unknown"} - for ((lCorrect, lPredicted) <- labels.labels.zip(predictedLabels)) { + for ((lCorrect, lPredicted) <- correctLabels.zip(predictedLabels)) { correct(lCorrect) = correct.getOrElseUpdate(lCorrect, 0) + 1 predicted(lPredicted) = predicted.getOrElse(lPredicted, 0) + 1 @@ -107,7 +76,7 @@ object CoNLL2003CrfTest extends App { System.out.println("label\tprec\trec\tf1") val totalCorrect = correct.filterKeys(label => label != "O").values.sum - val totalPredicted = correct.filterKeys(label => label != "O").values.sum + val totalPredicted = predicted.filterKeys(label => label != "O").values.sum val totalPredictedCorrect = predictedCorrect.filterKeys(label => label != "O").values.sum val rec = totalPredictedCorrect.toFloat / totalCorrect From 4f65b891e1f2d79739b47ca15eb2b8617abac627 Mon Sep 17 00:00:00 2001 From: aleksei Date: Wed, 22 Nov 2017 19:16:29 +0300 Subject: [PATCH 3/8] Binary format support --- .../nlp/datasets/CoNLL2003NerReader.scala | 21 ++- .../AnnotatorWithWordEmbeddings.scala | 20 ++- .../nlp/embeddings/RocksDbIndexer.scala | 42 +++++ .../nlp/embeddings/WordEmbeddings.scala | 66 +------- .../nlp/embeddings/WordEmbeddingsFormat.scala | 4 +- .../embeddings/WordEmbeddingsIndexer.scala | 153 ++++++++++++++++++ .../ml/crf/CoNLL2003CrfTest.scala | 10 +- .../ml/crf/CoNLL2003PipelineTest.scala | 2 +- 8 files changed, 240 insertions(+), 78 deletions(-) create mode 100644 src/main/scala/com/johnsnowlabs/nlp/embeddings/RocksDbIndexer.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsIndexer.scala diff --git a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala index 5a5ffd63532e1d..dd3b7797ae8288 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/datasets/CoNLL2003NerReader.scala @@ -6,13 +6,17 @@ import com.johnsnowlabs.ml.crf.{CrfDataset, DatasetMetadata, InstanceLabels, Tex import com.johnsnowlabs.nlp.AnnotatorType import com.johnsnowlabs.nlp.annotators.common.TaggedSentence import com.johnsnowlabs.nlp.annotators.ner.crf.{DictionaryFeatures, FeatureGenerator} -import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsIndexer} +import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsFormat, WordEmbeddingsIndexer} /** * Helper class for to work with CoNLL 2003 dataset for NER task * Class is made for easy use from Java */ -class CoNLL2003NerReader(wordEmbeddingsFile: String, wordEmbeddingsNDims: Int, dictionaryFile: String) { +class CoNLL2003NerReader(wordEmbeddingsFile: String, + wordEmbeddingsNDims: Int, + embeddingsFormat: WordEmbeddingsFormat.Format, + dictionaryFile: String) { + private val nerReader = CoNLL(3, AnnotatorType.NAMED_ENTITY) private val posReader = CoNLL(1, AnnotatorType.POS) @@ -21,9 +25,18 @@ class CoNLL2003NerReader(wordEmbeddingsFile: String, 
wordEmbeddingsNDims: Int, d if (wordEmbeddingsFile != null) { require(new File(wordEmbeddingsFile).exists()) - val fileDb = wordEmbeddingsFile + ".db" + var fileDb = wordEmbeddingsFile + ".db" + if (!new File(fileDb).exists()) { - WordEmbeddingsIndexer.indexGlove(wordEmbeddingsFile, fileDb) + embeddingsFormat match { + case WordEmbeddingsFormat.Text => + WordEmbeddingsIndexer.indexText(wordEmbeddingsFile, fileDb) + case WordEmbeddingsFormat.Binary => + WordEmbeddingsIndexer.indexBinary(wordEmbeddingsFile, fileDb) + case WordEmbeddingsFormat.SparkNlp => + fileDb = wordEmbeddingsFile + } + } if (new File(fileDb).exists()) { diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala index 9be8f6011a90e5..755c3558821734 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala @@ -1,10 +1,12 @@ package com.johnsnowlabs.nlp.embeddings +import java.io.DataInputStream import java.nio.file.Files import java.util.UUID import com.johnsnowlabs.nlp.util.SparkNlpConfigKeys import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.input.PortableDataStream import org.apache.spark.ml.Estimator import org.apache.spark.ml.param.{IntParam, Param} import org.apache.spark.sql.SparkSession @@ -76,12 +78,22 @@ trait AnnotatorWithWordEmbeddings extends AutoCloseable { this: Estimator[_] => } private def indexEmbeddings(localFile: String): Unit = { - if ($(embeddingsFormat) == WordEmbeddingsFormat.GloVe.id) { + val formatId = $(embeddingsFormat) + if (formatId == WordEmbeddingsFormat.Text.id) { val lines = spark.sparkContext.textFile($(sourceEmbeddingsPath)).toLocalIterator - WordEmbeddingsIndexer.indexGlove(lines, localFile) + WordEmbeddingsIndexer.indexText(lines, localFile) + } else if (formatId == WordEmbeddingsFormat.Binary.id) { + val streamSource = spark.sparkContext.binaryFiles($(sourceEmbeddingsPath)).toLocalIterator.toList.head._2 + val stream = streamSource.open() + try { + WordEmbeddingsIndexer.indexBinary(stream, localFile) + } + finally { + stream.close() + } } - else { - require(false, s"Unsupported word embeddings format ${$(embeddingsFormat)}") + else if (formatId == WordEmbeddingsFormat.SparkNlp.id) { + hdfs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(localFile)) } } diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/RocksDbIndexer.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/RocksDbIndexer.scala new file mode 100644 index 00000000000000..3f3a41c2fb78a4 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/RocksDbIndexer.scala @@ -0,0 +1,42 @@ +package com.johnsnowlabs.nlp.embeddings + +import java.io.Closeable +import org.rocksdb.{Options, RocksDB, WriteBatch, WriteOptions} + + +private [embeddings] case class RocksDbIndexer(dbFile: String, autoFlashAfter: Option[Integer] = None) extends Closeable{ + val options = new Options() + options.setCreateIfMissing(true) + options.setWriteBufferSize(20 * 1 << 20) + + RocksDB.loadLibrary() + val writeOptions = new WriteOptions() + + val db = RocksDB.open(options, dbFile) + var batch = new WriteBatch() + var batchSize = 0 + + def flush() = { + db.write(writeOptions, batch) + batch.close() + batch = new WriteBatch() + batchSize = 0 + } + + def add(word: String, vector: Array[Float]) = { + batch.put(word.getBytes, WordEmbeddingsIndexer.toBytes(vector)) + 
batchSize += 1 + + if (autoFlashAfter.isDefined) { + if (batchSize >= autoFlashAfter.get) + flush() + } + } + + override def close(): Unit = { + if (batchSize > 0) + flush() + + db.close + } +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala index 7fbbb0772ead89..4a869b3c659ddc 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddings.scala @@ -1,73 +1,9 @@ package com.johnsnowlabs.nlp.embeddings -import java.io.Closeable -import java.nio.ByteBuffer - +import java.io._ import com.johnsnowlabs.nlp.util.LruMap import org.rocksdb._ -import scala.io.Source - - -object WordEmbeddingsIndexer { - - private[embeddings] def toBytes(embeddings: Array[Float]): Array[Byte] = { - val buffer = ByteBuffer.allocate(embeddings.length * 4) - for (value <- embeddings) { - buffer.putFloat(value) - } - buffer.array() - } - - private[embeddings] def fromBytes(source: Array[Byte]): Array[Float] = { - val wrapper = ByteBuffer.wrap(source) - val result = Array.fill[Float](source.length / 4)(0f) - - for (i <- 0 until result.length) { - result(i) = wrapper.getFloat(i * 4) - } - result - } - - def indexGlove(source: Iterator[String], dbFile: String): Unit = { - val options = new Options() - options.setCreateIfMissing(true) - options.setWriteBufferSize(20 * 1 << 20) - - RocksDB.loadLibrary() - val writeOptions = new WriteOptions() - - val db = RocksDB.open(options, dbFile) - var batch = new WriteBatch() - try { - var batchSize = 0 - for (line <- source) { - val items = line.split(" ") - val word = items(0) - val embeddings = items.drop(1).map(i => i.toFloat) - batch.put(word.getBytes, toBytes(embeddings)) - - batchSize += 1 - if (batchSize % 1000 == 0) { - db.write(writeOptions, batch) - batch.close() - batch = new WriteBatch() - batchSize == 0 - } - } - - db.write(writeOptions, batch) - batch.close() - } finally { - db.close() - } - } - - def indexGlove(source: String, dbFile: String): Unit = { - val lines = Source.fromFile(source).getLines() - indexGlove(lines, dbFile) - } -} case class WordEmbeddings(dbFile: String, nDims: Int, diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala index c6f8885c005d3c..abb2994ec391c8 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsFormat.scala @@ -4,6 +4,6 @@ object WordEmbeddingsFormat extends Enumeration { type Format = Value val SparkNlp = Value(1) - - val GloVe = Value(2) + val Text = Value(2) + val Binary = Value(3) } diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsIndexer.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsIndexer.scala new file mode 100644 index 00000000000000..a01580f0c69e45 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/WordEmbeddingsIndexer.scala @@ -0,0 +1,153 @@ +package com.johnsnowlabs.nlp.embeddings + +import java.io._ +import java.nio.ByteBuffer +import org.slf4j.LoggerFactory +import scala.io.Source + + +object WordEmbeddingsIndexer { + + private[embeddings] def toBytes(embeddings: Array[Float]): Array[Byte] = { + val buffer = ByteBuffer.allocate(embeddings.length * 4) + for (value <- embeddings) { + buffer.putFloat(value) + } + buffer.array() + } + + private[embeddings] def 
fromBytes(source: Array[Byte]): Array[Float] = { + val wrapper = ByteBuffer.wrap(source) + val result = Array.fill[Float](source.length / 4)(0f) + + for (i <- 0 until result.length) { + result(i) = wrapper.getFloat(i * 4) + } + result + } + + /** + * Indexes Word embeddings in CSV Format + */ + def indexText(source: Iterator[String], dbFile: String): Unit = { + TextIndexer.index(source, dbFile) + } + + /** + * Indexes Word embeddings in CSV Text File + */ + def indexText(source: String, dbFile: String): Unit ={ + TextIndexer.index(source, dbFile) + } + + + def indexBinary(source: DataInputStream, dbFile: String): Unit = { + BinaryIndexer.index(source, dbFile) + } + + /** + * Indexes Binary formatted file + */ + def indexBinary(source: String, dbFile: String): Unit = { + BinaryIndexer.index(source, dbFile) + } +} + + + + +private[embeddings] object TextIndexer { + + def index(source: Iterator[String], dbFile: String): Unit = { + val indexer = RocksDbIndexer(dbFile, Some(1000)) + + try { + for (line <- source) { + val items = line.split(" ") + val word = items(0) + val embeddings = items.drop(1).map(i => i.toFloat) + indexer.add(word, embeddings) + } + } finally { + indexer.close() + } + } + + def index(source: String, dbFile: String): Unit = { + val lines = Source.fromFile(source).getLines() + index(lines, dbFile) + } +} + + +private[embeddings] object BinaryIndexer { + + private val logger = LoggerFactory.getLogger("WordEmbeddings") + + def index(source: DataInputStream, dbFile: String): Unit = { + val indexer = RocksDbIndexer(dbFile, Some(1000)) + + try { + // File Header + val numWords = Integer.parseInt(readString(source)) + val vecSize = Integer.parseInt(readString(source)) + + // File Body + for (i <- 0 until numWords) { + val word = readString(source) + + // Unit Vector + val vector = readFloatVector(source, vecSize) + indexer.add(word, vector) + } + + logger.info(s"Loaded $numWords words, vector size $vecSize") + } finally { + indexer.close() + } + } + + def index(source: String, dbFile: String): Unit = { + + val ds = new DataInputStream(new BufferedInputStream(new FileInputStream(source), 1 << 15)) + + try { + index(ds, dbFile) + } finally { + ds.close() + } + } + + /** + * Read a string from the binary model (System default should be UTF-8): + */ + private def readString(ds: DataInputStream): String = { + val byteBuffer = new ByteArrayOutputStream() + + var isEnd = false + while (!isEnd) { + val byteValue = ds.readByte() + if ((byteValue != 32) && (byteValue != 10)) { + byteBuffer.write(byteValue) + } else if (byteBuffer.size() > 0) { + isEnd = true + } + } + + val word = byteBuffer.toString() + byteBuffer.close() + word + } + + /** + * Read a Vector - Array of Floats from the binary model: + */ + private def readFloatVector(ds: DataInputStream, vectorSize: Int): Array[Float] = { + // Read Bytes + val vectorBuffer = Array.fill[Byte](4 * vectorSize)(0) + ds.read(vectorBuffer) + + // Convert Bytes to Floats + WordEmbeddingsIndexer.fromBytes(vectorBuffer) + } +} \ No newline at end of file diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala index 760aed41f95353..b5ac6db45af194 100644 --- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala +++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003CrfTest.scala @@ -1,6 +1,7 @@ package com.johnsnowlabs.ml.crf import com.johnsnowlabs.nlp.datasets.{CoNLL, CoNLL2003NerReader} +import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat import 
scala.collection.mutable @@ -23,7 +24,12 @@ object CoNLL2003CrfTest extends App { val embeddingsDims = 100 val embeddingsFile = folder + s"glove.6B.${embeddingsDims}d.txt" - val reader = new CoNLL2003NerReader(embeddingsFile, embeddingsDims, "/ner-corpus/dict.txt") + val reader = new CoNLL2003NerReader( + embeddingsFile, + embeddingsDims, + WordEmbeddingsFormat.Text, + "/ner-corpus/dict.txt" + ) def trainModel(file: String): LinearChainCrfModel = { System.out.println("Dataset Reading") @@ -34,7 +40,7 @@ object CoNLL2003CrfTest extends App { System.out.println("Start fitting") val params = CrfParams( - maxEpochs = 25, + maxEpochs = 10, l2 = 1f, verbose = Verbose.Epochs, randomSeed = Some(0), diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala index e79f3885a1ff20..b7bf1c33395835 100644 --- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala +++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala @@ -60,7 +60,7 @@ object CoNLL2003PipelineTest extends App { .setRandomSeed(100) .setMaxEpochs(14) .setOutputCol("ner") - .setEmbeddingsSource("glove.6B.100d.txt", 100, WordEmbeddingsFormat.GloVe) + .setEmbeddingsSource("glove.6B.100d.txt", 100, WordEmbeddingsFormat.Text) getPosStages() :+ nerTagger } From 55f02776d2920e5e6a29e8c35a0f5c1871f074e4 Mon Sep 17 00:00:00 2001 From: aleksei Date: Thu, 21 Dec 2017 17:36:03 +0300 Subject: [PATCH 4/8] Using Spark addFile method instead of manual files copying --- python/sparknlp/annotator.py | 22 ++++- .../johnsnowlabs/nlp/AnnotatorApproach.scala | 12 ++- .../annotators/ner/crf/NerCrfApproach.scala | 7 +- .../nlp/annotators/ner/crf/NerCrfModel.scala | 7 +- .../AnnotatorWithWordEmbeddings.scala | 80 ++++++++---------- .../embeddings/ModelWithWordEmbeddings.scala | 82 ++++--------------- 6 files changed, 88 insertions(+), 122 deletions(-) diff --git a/python/sparknlp/annotator.py b/python/sparknlp/annotator.py index e47bc38f2d7b1c..97b75ffa07b1aa 100755 --- a/python/sparknlp/annotator.py +++ b/python/sparknlp/annotator.py @@ -48,6 +48,26 @@ def setOutputCol(self, value): return self._set(outputCol=value) +class AnnotatorWithEmbeddings(Params): + sourceEmbeddingsPath = Param(Params._dummy(), + "sourceEmbeddingsPath", + "Word embeddings file", + typeConverter=TypeConverters.toString) + embeddingsFormat = Param(Params._dummy(), + "embeddingsFormat", + "Word vectors file format", + typeConverter=TypeConverters.toInt) + embeddingsNDims = Param(Params._dummy(), + "embeddingsNDims", + "Number of dimensions for word vectors", + typeConverter=TypeConverters.toInt) + + def setEmbeddingsSource(self, path, nDims, format): + self._set(sourceEmbeddingsPath=path) + self._set(embeddingsFormat=format) + return self._set(embeddingsNDims=nDims) + + class AnnotatorTransformer(JavaModel, JavaMLReadable, JavaMLWritable, AnnotatorProperties): column_type = "array>>" @@ -428,7 +448,7 @@ class NorvigSweetingModel(JavaModel, JavaMLWritable, JavaMLReadable, AnnotatorPr -class NerCrfApproach(JavaEstimator, JavaMLWritable, JavaMLReadable, AnnotatorProperties): +class NerCrfApproach(JavaEstimator, JavaMLWritable, JavaMLReadable, AnnotatorProperties, AnnotatorWithEmbeddings): labelColumn = Param(Params._dummy(), "labelColumn", "Column with label per each token", diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala index c1df9007d55434..0d83d6f6370da4 100644 --- 
a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala @@ -2,7 +2,7 @@ package com.johnsnowlabs.nlp import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.sql.Dataset +import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.types.{ArrayType, MetadataBuilder, StructField, StructType} import org.apache.spark.ml.util.DefaultParamsWritable @@ -24,8 +24,15 @@ abstract class AnnotatorApproach[M <: Model[M]] def train(dataset: Dataset[_]): M + def beforeTraining(spark: SparkSession): Unit = {} + + def onTrained(model: M, spark: SparkSession): Unit = {} + override final def fit(dataset: Dataset[_]): M = { - copyValues(train(dataset).setParent(this)) + beforeTraining(dataset.sparkSession) + val model = copyValues(train(dataset).setParent(this)) + onTrained(model, dataset.sparkSession) + model } override final def copy(extra: ParamMap): Estimator[M] = defaultCopy(extra) @@ -50,5 +57,4 @@ abstract class AnnotatorApproach[M <: Model[M]] StructField(getOutputCol, ArrayType(Annotation.dataType), nullable = false, metadataBuilder.build) StructType(outputFields) } - } diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala index 220538ffbffac2..8e71bfbb413052 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala @@ -18,8 +18,9 @@ import org.apache.spark.sql.{DataFrame, Dataset} /* Algorithm for training Named Entity Recognition Model. */ -class NerCrfApproach(override val uid: String) extends AnnotatorApproach[NerCrfModel] - with AnnotatorWithWordEmbeddings { +class NerCrfApproach(override val uid: String) + extends AnnotatorWithWordEmbeddings[NerCrfModel] { + def this() = this(Identifiable.randomUID("NER")) override val description = "CRF based Named Entity Recognition Tagger" @@ -146,7 +147,7 @@ class NerCrfApproach(override val uid: String) extends AnnotatorApproach[NerCrfM if (isDefined(minW)) model = model.shrink($(minW).toFloat) - fillModelEmbeddings(model) + model } } diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index 04ec15c0529eba..8a9c179326f385 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -15,8 +15,7 @@ import org.apache.spark.sql.{Encoders, Row} /* Named Entity Recognition model */ -class NerCrfModel(override val uid: String) - extends AnnotatorModel[NerCrfModel] with ModelWithWordEmbeddings{ +class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerCrfModel]{ def this() = this(Identifiable.randomUID("NER")) @@ -119,7 +118,7 @@ object NerCrfModel extends DefaultParamsReadable[NerCrfModel] { .setModel(crfModel.deserialize) .setDictionaryFeatures(dictFeatures) - instance.deserializeEmbeddings(path) + instance.deserializeEmbeddings(path, sparkSession.sparkContext) instance } } @@ -142,7 +141,7 @@ object NerCrfModel extends DefaultParamsReadable[NerCrfModel] { val dictLines = model.dictionaryFeatures.dict.toSeq.map(p => p._1 + ":" + p._2) Seq(dictLines).toDS.write.mode("overwrite").parquet(dictPath) - model.serializeEmbeddings(path) + model.serializeEmbeddings(path, 
sparkSession.sparkContext)
     }
   }
 }
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala
index 4d0be6ea96f336..fce34dc4633b43 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala
@@ -1,28 +1,32 @@
 package com.johnsnowlabs.nlp.embeddings
 
+import java.io.File
 import java.nio.file.Files
 import java.util.UUID
 
-import com.johnsnowlabs.nlp.util.SparkNlpConfigKeys
+import com.johnsnowlabs.nlp.AnnotatorApproach
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.ml.Estimator
+import org.apache.spark.SparkContext
 import org.apache.spark.ml.param.{IntParam, Param}
 import org.apache.spark.sql.SparkSession
 
 
-trait AnnotatorWithWordEmbeddings extends AutoCloseable { this: Estimator[_] =>
+/**
+  * Base class for annotators that use Word Embeddings.
+  * This implementation is based on RocksDB so it has a compact RAM usage
+  *
+  * 1. User configures Word Embeddings by the method 'setEmbeddingsSource'.
+  * 2. During training Word Embeddings are indexed as a RocksDB index file.
+  * 3. Then this index file is spread across the cluster.
+  * 4. Every model 'ModelWithWordEmbeddings' uses local RocksDB as Word Embeddings lookup.
+  */
+abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]]
+  extends AnnotatorApproach[M] with AutoCloseable {
+
   val sourceEmbeddingsPath = new Param[String](this, "sourceEmbeddingsPath", "Word embeddings file")
   val embeddingsFormat = new IntParam(this, "embeddingsFormat", "Word vectors file format")
   val embeddingsNDims = new IntParam(this, "embeddingsNDims", "Number of dimensions for word vectors")
 
-  val embeddingsFolder = new Param[String](this, "embeddingsFolder",
-    "Folder to store Embeddings Index")
-
-  private val defaultFolder = spark.sparkContext.getConf
-    .getOption(SparkNlpConfigKeys.embeddingsFolder).getOrElse("embeddings/")
-
-  setDefault(this.embeddingsFolder -> defaultFolder)
-
   def setEmbeddingsSource(path: String, nDims: Int, format: WordEmbeddingsFormat.Format) = {
     set(this.sourceEmbeddingsPath, path)
@@ -30,22 +34,20 @@ trait AnnotatorWithWordEmbeddings extends AutoCloseable { this: Estimator[_] =>
     set(this.embeddingsNDims, nDims)
   }
 
-  def setEmbeddingsFolder(path: String) = set(this.embeddingsFolder, path)
-
-  def fillModelEmbeddings[T <: ModelWithWordEmbeddings](model: T): T = {
-    if (!isDefined(sourceEmbeddingsPath)) {
-      return model
+  override def beforeTraining(spark: SparkSession): Unit = {
+    if (isDefined(sourceEmbeddingsPath)) {
+      indexEmbeddings(localPath, spark.sparkContext)
+      spark.sparkContext.addFile(localPath, true)
     }
+  }
 
-    val file = "/" + new Path(localPath).getName
-    val path = Path.mergePaths(new Path($(embeddingsFolder)), new Path(file))
-    hdfs.copyFromLocalFile(new Path(localPath), path)
-
-    model.setDims($(embeddingsNDims))
-
-    model.setIndexPath(path.toUri.toString)
+  override def onTrained(model: M, spark: SparkSession): Unit = {
+    if (isDefined(sourceEmbeddingsPath)) {
+      model.setDims($(embeddingsNDims))
 
-    model
+      val fileName = new File(localPath).getName
+      model.setIndexPath(fileName)
+    }
   }
 
   lazy val embeddings: Option[WordEmbeddings] = {
@@ -53,35 +55,18 @@ trait AnnotatorWithWordEmbeddings extends AutoCloseable { this: Estimator[_] =>
   }
 
   private lazy val localPath: String = {
-    val path = Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + 
"_idx") + Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + "_idx") .toAbsolutePath.toString - - if ($(embeddingsFormat) == WordEmbeddingsFormat.SparkNlp.id) { - hdfs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(path)) - } else { - indexEmbeddings(path) - } - - path - } - - private lazy val spark: SparkSession = { - SparkSession - .builder() - .getOrCreate() - } - - private lazy val hdfs: FileSystem = { - FileSystem.get(spark.sparkContext.hadoopConfiguration) } - private def indexEmbeddings(localFile: String): Unit = { + private def indexEmbeddings(localFile: String, spark: SparkContext): Unit = { val formatId = $(embeddingsFormat) + if (formatId == WordEmbeddingsFormat.Text.id) { - val lines = spark.sparkContext.textFile($(sourceEmbeddingsPath)).toLocalIterator + val lines = spark.textFile($(sourceEmbeddingsPath)).toLocalIterator WordEmbeddingsIndexer.indexText(lines, localFile) } else if (formatId == WordEmbeddingsFormat.Binary.id) { - val streamSource = spark.sparkContext.binaryFiles($(sourceEmbeddingsPath)).toLocalIterator.toList.head._2 + val streamSource = spark.binaryFiles($(sourceEmbeddingsPath)).toLocalIterator.toList.head._2 val stream = streamSource.open() try { WordEmbeddingsIndexer.indexBinary(stream, localFile) @@ -91,7 +76,8 @@ trait AnnotatorWithWordEmbeddings extends AutoCloseable { this: Estimator[_] => } } else if (formatId == WordEmbeddingsFormat.SparkNlp.id) { - hdfs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(localFile)) + val hdfs = FileSystem.get(spark.hadoopConfiguration) + hdfs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(localFile)) } } diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala index fbbd5d3cea9dab..0fd1a4d0be1d16 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala @@ -1,23 +1,19 @@ package com.johnsnowlabs.nlp.embeddings -import java.io.File -import java.nio.file.Files -import java.util.UUID - -import com.johnsnowlabs.nlp.util.SparkNlpConfigKeys -import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} -import org.apache.spark.ml.Model +import com.johnsnowlabs.nlp.AnnotatorModel +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.{SparkContext, SparkFiles} import org.apache.spark.ml.param.{IntParam, Param} -import org.apache.spark.sql.SparkSession /** - * Trait for models that want to use Word Embeddings + * Base class for models that uses Word Embeddings. 
+ * This implementation is based on RocksDB so it has a compact RAM usage * * Corresponding Approach have to implement AnnotatorWithWordEmbeddings */ -trait ModelWithWordEmbeddings extends AutoCloseable { - this: Model[_] => +abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] + extends AnnotatorModel[M] with AutoCloseable { val nDims = new IntParam(this, "nDims", "Number of embedding dimensions") val indexPath = new Param[String](this, "indexPath", "File that stores Index") @@ -25,36 +21,9 @@ trait ModelWithWordEmbeddings extends AutoCloseable { def setDims(nDims: Int) = set(this.nDims, nDims) def setIndexPath(path: String) = set(this.indexPath, path) - private lazy val spark = { - SparkSession.builder().getOrCreate() - } - - private lazy val hdfs = { - FileSystem.get(spark.sparkContext.hadoopConfiguration) - } - - private lazy val embeddingsFile: String = { - val localFile = if (!new File($(indexPath)).exists()) { - val localPath = Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + "embedddings_idx") - .toAbsolutePath.toString - - hdfs.copyToLocalFile(new Path($(indexPath)), new Path(localPath)) - localPath - } else { - $(indexPath) - } - - val crcFiles = new File(localFile).listFiles().filter(f => f.getName.endsWith(".crc")) - for (file <- crcFiles) { - file.delete() - } - - localFile - } - lazy val embeddings: Option[WordEmbeddings] = { get(indexPath).map { path => - WordEmbeddings(embeddingsFile, $(nDims)) + WordEmbeddings(SparkFiles.get(path), $(nDims)) } } @@ -63,37 +32,22 @@ trait ModelWithWordEmbeddings extends AutoCloseable { embeddings.get.close() } - def deserializeEmbeddings(path: String): Unit = { - if (isDefined(indexPath)) { - val embeddingsFolder = spark.conf.getOption(SparkNlpConfigKeys.embeddingsFolder) - if (embeddingsFolder.isDefined) { - val dst = new Path(embeddingsFolder.get) - val file = getEmbeddingsSerializedPath(path).getName - - val indexFile = new Path(dst.toString, file) - setIndexPath(indexFile.toString) - } + def deserializeEmbeddings(path: String, spark: SparkContext): Unit = { + val src = getEmbeddingsSerializedPath(path).toString - try { - // ToDo make files comparision - if (!hdfs.exists(new Path($(indexPath)))) - FileUtil.copy(hdfs, getEmbeddingsSerializedPath(path), hdfs, new Path($(indexPath)), false, spark.sparkContext.hadoopConfiguration) - } - catch { - case e: Exception => - throw new Exception(s"Set spark option ${SparkNlpConfigKeys.embeddingsFolder} to store embeddings", e) - } + if (new java.io.File(src).exists()) { + spark.addFile(src) + set(indexPath, src) } } - def serializeEmbeddings(path: String): Unit = { + def serializeEmbeddings(path: String, spark: SparkContext): Unit = { if (isDefined(indexPath)) { - val dst = getEmbeddingsSerializedPath(path) - if (hdfs.exists(dst)) { - hdfs.delete(dst, true) - } + val index = new Path(SparkFiles.get($(indexPath))) + val fs = FileSystem.get(spark.hadoopConfiguration) - hdfs.copyFromLocalFile(new Path(embeddingsFile), dst) + val dst = getEmbeddingsSerializedPath(path) + fs.copyFromLocalFile(index, dst) } } From f85017a8c215d457c052f51a121b820034fdcb9e Mon Sep 17 00:00:00 2001 From: aleksei Date: Fri, 22 Dec 2017 12:53:37 +0300 Subject: [PATCH 5/8] Spark don't copy local folder --- .../AnnotatorWithWordEmbeddings.scala | 28 +++++++++++++++++-- .../embeddings/ModelWithWordEmbeddings.scala | 21 +++++++++----- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala 
b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala index fce34dc4633b43..5ffca7feabf8b1 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala @@ -5,6 +5,7 @@ import java.nio.file.Files import java.util.UUID import com.johnsnowlabs.nlp.AnnotatorApproach +import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsClusterHelper.getClusterFileName import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkContext import org.apache.spark.ml.param.{IntParam, Param} @@ -37,7 +38,7 @@ abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] override def beforeTraining(spark: SparkSession): Unit = { if (isDefined(sourceEmbeddingsPath)) { indexEmbeddings(localPath, spark.sparkContext) - spark.sparkContext.addFile(localPath, true) + WordEmbeddingsClusterHelper.copyIndexToCluster(localPath, spark.sparkContext) } } @@ -45,7 +46,7 @@ abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] if (isDefined(sourceEmbeddingsPath)) { model.setDims($(embeddingsNDims)) - val fileName = new File(localPath).getName + val fileName = WordEmbeddingsClusterHelper.getClusterFileName(localPath).toString model.setIndexPath(fileName) } } @@ -86,3 +87,26 @@ abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] embeddings.get.close() } } + +object WordEmbeddingsClusterHelper { + + def getClusterFileName(localFile: String): Path = { + val name = new File(localFile).getName + Path.mergePaths(new Path("embeddings/"), new Path(name)) + } + + def copyIndexToCluster(localFolder: String, spark: SparkContext): String = { + val fs = FileSystem.get(spark.hadoopConfiguration) + + val src = new Path(localFolder) + val dst = getClusterFileName(localFolder) + + fs.copyFromLocalFile(false, true, src, dst) + fs.deleteOnExit(dst) + + spark.addFile(dst.toString, true) + + dst.toString + } +} + diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala index 0fd1a4d0be1d16..707ebfcd231ad1 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala @@ -1,7 +1,12 @@ package com.johnsnowlabs.nlp.embeddings +import java.io.File +import java.nio.file.{CopyOption, Files, Paths, StandardCopyOption} +import java.util.concurrent.Executor + import com.johnsnowlabs.nlp.AnnotatorModel import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.ivy.util.FileUtil import org.apache.spark.{SparkContext, SparkFiles} import org.apache.spark.ml.param.{IntParam, Param} @@ -21,10 +26,13 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] def setDims(nDims: Int) = set(this.nDims, nDims) def setIndexPath(path: String) = set(this.indexPath, path) - lazy val embeddings: Option[WordEmbeddings] = { - get(indexPath).map { path => - WordEmbeddings(SparkFiles.get(path), $(nDims)) - } + lazy val embeddings: Option[WordEmbeddings] = get(indexPath).map { path => + // Have to copy file because RockDB changes it and Spark rises Exception + val src = SparkFiles.get(path) + val workPath = src + "_work" + FileUtil.deepCopy(new File(src), new File(workPath), null, true) + + WordEmbeddings(workPath, $(nDims)) } override def close(): Unit = { @@ -36,8 +44,7 @@ abstract class ModelWithWordEmbeddings[M <: 
ModelWithWordEmbeddings[M]] val src = getEmbeddingsSerializedPath(path).toString if (new java.io.File(src).exists()) { - spark.addFile(src) - set(indexPath, src) + WordEmbeddingsClusterHelper.copyIndexToCluster(src, spark) } } @@ -47,7 +54,7 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] val fs = FileSystem.get(spark.hadoopConfiguration) val dst = getEmbeddingsSerializedPath(path) - fs.copyFromLocalFile(index, dst) + fs.copyFromLocalFile(false, true, index, dst) } } From d97dfb4201d45e23d47c2cd8e9358cf905baa7cc Mon Sep 17 00:00:00 2001 From: aleksei Date: Fri, 22 Dec 2017 15:49:50 +0300 Subject: [PATCH 6/8] 1. Deserialization fix 2. More straightforward way to index embeddings --- .../AnnotatorWithWordEmbeddings.scala | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala index 5ffca7feabf8b1..5536d84548307e 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala @@ -5,9 +5,9 @@ import java.nio.file.Files import java.util.UUID import com.johnsnowlabs.nlp.AnnotatorApproach -import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsClusterHelper.getClusterFileName import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkContext +import org.apache.spark.input.PortableDataStream import org.apache.spark.ml.param.{IntParam, Param} import org.apache.spark.sql.SparkSession @@ -56,25 +56,22 @@ abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] } private lazy val localPath: String = { - Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + "_idx") - .toAbsolutePath.toString + WordEmbeddingsClusterHelper.createLocalPath } private def indexEmbeddings(localFile: String, spark: SparkContext): Unit = { val formatId = $(embeddingsFormat) + val fs = FileSystem.get(spark.hadoopConfiguration) + if (formatId == WordEmbeddingsFormat.Text.id) { - val lines = spark.textFile($(sourceEmbeddingsPath)).toLocalIterator - WordEmbeddingsIndexer.indexText(lines, localFile) + val tmpFile = Files.createTempFile("embeddings", ".bin").toAbsolutePath.toString() + fs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(tmpFile)) + WordEmbeddingsIndexer.indexText(tmpFile, localFile) } else if (formatId == WordEmbeddingsFormat.Binary.id) { - val streamSource = spark.binaryFiles($(sourceEmbeddingsPath)).toLocalIterator.toList.head._2 - val stream = streamSource.open() - try { - WordEmbeddingsIndexer.indexBinary(stream, localFile) - } - finally { - stream.close() - } + val tmpFile = Files.createTempFile("embeddings", ".bin").toAbsolutePath.toString() + fs.copyToLocalFile(new Path($(sourceEmbeddingsPath)), new Path(tmpFile)) + WordEmbeddingsIndexer.indexBinary(tmpFile, localFile) } else if (formatId == WordEmbeddingsFormat.SparkNlp.id) { val hdfs = FileSystem.get(spark.hadoopConfiguration) @@ -90,16 +87,21 @@ abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] object WordEmbeddingsClusterHelper { + def createLocalPath(): String = { + Files.createTempDirectory(UUID.randomUUID().toString.takeRight(12) + "_idx") + .toAbsolutePath.toString + } + def getClusterFileName(localFile: String): Path = { val name = new File(localFile).getName - Path.mergePaths(new Path("embeddings/"), new Path(name)) + 
Path.mergePaths(new Path("/embeddings"), new Path(name)) } def copyIndexToCluster(localFolder: String, spark: SparkContext): String = { val fs = FileSystem.get(spark.hadoopConfiguration) val src = new Path(localFolder) - val dst = getClusterFileName(localFolder) + val dst = Path.mergePaths(fs.getHomeDirectory, getClusterFileName(localFolder)) fs.copyFromLocalFile(false, true, src, dst) fs.deleteOnExit(dst) From 0140c3080a65c543bec4a1d35f5ae0cc77b26d16 Mon Sep 17 00:00:00 2001 From: aleksei Date: Fri, 22 Dec 2017 17:31:12 +0300 Subject: [PATCH 7/8] 1. Embeddings serialization\deserialization 2. Better typed AnnotatorWithWordEmbeddings 3. Added Embeddings to tests --- .../annotators/ner/crf/NerCrfApproach.scala | 2 +- .../AnnotatorWithWordEmbeddings.scala | 7 ++-- .../embeddings/ModelWithWordEmbeddings.scala | 38 +++++++++++++++---- .../resources/ner-corpus/test_embeddings.txt | 2 + .../johnsnowlabs/nlp/AnnotatorBuilder.scala | 2 + 5 files changed, 38 insertions(+), 13 deletions(-) create mode 100644 src/test/resources/ner-corpus/test_embeddings.txt diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala index 8e71bfbb413052..8a86b9a6e99f38 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala @@ -19,7 +19,7 @@ import org.apache.spark.sql.{DataFrame, Dataset} Algorithm for training Named Entity Recognition Model. */ class NerCrfApproach(override val uid: String) - extends AnnotatorWithWordEmbeddings[NerCrfModel] { + extends AnnotatorWithWordEmbeddings[NerCrfApproach, NerCrfModel] { def this() = this(Identifiable.randomUID("NER")) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala index 5536d84548307e..25588fc59911b8 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala @@ -7,7 +7,6 @@ import java.util.UUID import com.johnsnowlabs.nlp.AnnotatorApproach import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkContext -import org.apache.spark.input.PortableDataStream import org.apache.spark.ml.param.{IntParam, Param} import org.apache.spark.sql.SparkSession @@ -21,7 +20,7 @@ import org.apache.spark.sql.SparkSession * 3. Than this index file is spread across the cluster. * 4. Every model 'ModelWithWordEmbeddings' uses local RocksDB as Word Embeddings lookup. 
*/ -abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] +abstract class AnnotatorWithWordEmbeddings[A <: AnnotatorWithWordEmbeddings[A, M], M <: ModelWithWordEmbeddings[M]] extends AnnotatorApproach[M] with AutoCloseable { val sourceEmbeddingsPath = new Param[String](this, "sourceEmbeddingsPath", "Word embeddings file") @@ -29,10 +28,10 @@ abstract class AnnotatorWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] val embeddingsNDims = new IntParam(this, "embeddingsNDims", "Number of dimensions for word vectors") - def setEmbeddingsSource(path: String, nDims: Int, format: WordEmbeddingsFormat.Format) = { + def setEmbeddingsSource(path: String, nDims: Int, format: WordEmbeddingsFormat.Format): A = { set(this.sourceEmbeddingsPath, path) set(this.embeddingsFormat, format.id) - set(this.embeddingsNDims, nDims) + set(this.embeddingsNDims, nDims).asInstanceOf[A] } override def beforeTraining(spark: SparkSession): Unit = { diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala index 707ebfcd231ad1..8fe438d8a5d93f 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala @@ -1,8 +1,7 @@ package com.johnsnowlabs.nlp.embeddings import java.io.File -import java.nio.file.{CopyOption, Files, Paths, StandardCopyOption} -import java.util.concurrent.Executor +import java.nio.file.{Files, Paths} import com.johnsnowlabs.nlp.AnnotatorModel import org.apache.hadoop.fs.{FileSystem, Path} @@ -23,14 +22,15 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] val nDims = new IntParam(this, "nDims", "Number of embedding dimensions") val indexPath = new Param[String](this, "indexPath", "File that stores Index") - def setDims(nDims: Int) = set(this.nDims, nDims) - def setIndexPath(path: String) = set(this.indexPath, path) + def setDims(nDims: Int) = set(this.nDims, nDims).asInstanceOf[M] + def setIndexPath(path: String) = set(this.indexPath, path).asInstanceOf[M] lazy val embeddings: Option[WordEmbeddings] = get(indexPath).map { path => // Have to copy file because RockDB changes it and Spark rises Exception val src = SparkFiles.get(path) val workPath = src + "_work" - FileUtil.deepCopy(new File(src), new File(workPath), null, true) + if (!new File(workPath).exists()) + FileUtil.deepCopy(new File(src), new File(workPath), null, false) WordEmbeddings(workPath, $(nDims)) } @@ -40,11 +40,33 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] embeddings.get.close() } + def moveFolderFiles(folderSrc: String, folderDst: String): Unit = { + for (file <- new File(folderSrc).list()) { + Files.move(Paths.get(folderSrc, file), Paths.get(folderDst, file)) + } + + Files.delete(Paths.get(folderSrc)) + } + def deserializeEmbeddings(path: String, spark: SparkContext): Unit = { - val src = getEmbeddingsSerializedPath(path).toString + val fs = FileSystem.get(spark.hadoopConfiguration) + + val src = getEmbeddingsSerializedPath(path) + + // 1. Copy to local file + val localPath = WordEmbeddingsClusterHelper.createLocalPath + if (fs.exists(src)) { + fs.copyToLocalFile(src, new Path(localPath)) + + // 2. Move files from localPath/embeddings to localPath + moveFolderFiles(localPath + "/embeddings", localPath) + + // 2. 
Copy local file to cluster + WordEmbeddingsClusterHelper.copyIndexToCluster(localPath, spark) - if (new java.io.File(src).exists()) { - WordEmbeddingsClusterHelper.copyIndexToCluster(src, spark) + // 3. Set correct path + val fileName = WordEmbeddingsClusterHelper.getClusterFileName(localPath).toString + setIndexPath(fileName) } } diff --git a/src/test/resources/ner-corpus/test_embeddings.txt b/src/test/resources/ner-corpus/test_embeddings.txt new file mode 100644 index 00000000000000..844cecf36dac47 --- /dev/null +++ b/src/test/resources/ner-corpus/test_embeddings.txt @@ -0,0 +1,2 @@ +hello 0.1 0.6 0.0 +world 0 1 0 \ No newline at end of file diff --git a/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala b/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala index 75b2c83ef08598..1a97d12e292bce 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala @@ -8,6 +8,7 @@ import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel import com.johnsnowlabs.nlp.annotators.sda.pragmatic.SentimentDetectorModel import com.johnsnowlabs.nlp.annotators.sda.vivekn.ViveknSentimentApproach import com.johnsnowlabs.nlp.annotators.spell.norvig.NorvigSweetingApproach +import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat import org.apache.spark.sql.{Dataset, Row} import org.scalatest._ @@ -152,6 +153,7 @@ object AnnotatorBuilder extends FlatSpec { this: Suite => .setMinEpochs(1) .setMaxEpochs(3) .setDatsetPath("src/test/resources/ner-corpus/test_ner_dataset.txt") + .setEmbeddingsSource("src/test/resources/ner-corpus/test_embeddings.txt", 3, WordEmbeddingsFormat.Text) .setC0(34) .setL2(3.0) .setOutputCol("ner") From 6f30421ead7daed5ee2d1da0de500c4263dcfe89 Mon Sep 17 00:00:00 2001 From: aleksei Date: Fri, 22 Dec 2017 19:54:59 +0300 Subject: [PATCH 8/8] Allow to train pipeline several times --- .../AnnotatorWithWordEmbeddings.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala index 25588fc59911b8..6b69f24aa62ea3 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala @@ -36,27 +36,28 @@ abstract class AnnotatorWithWordEmbeddings[A <: AnnotatorWithWordEmbeddings[A, M override def beforeTraining(spark: SparkSession): Unit = { if (isDefined(sourceEmbeddingsPath)) { - indexEmbeddings(localPath, spark.sparkContext) - WordEmbeddingsClusterHelper.copyIndexToCluster(localPath, spark.sparkContext) + // 1. Create tmp file for index + localPath = Some(WordEmbeddingsClusterHelper.createLocalPath()) + // 2. Index Word Embeddings + indexEmbeddings(localPath.get, spark.sparkContext) + // 3. Copy WordEmbeddings to cluster + WordEmbeddingsClusterHelper.copyIndexToCluster(localPath.get, spark.sparkContext) + // 4. 
Create Embeddings for use during training
+      embeddings = Some(WordEmbeddings(localPath.get, $(embeddingsNDims)))
     }
   }
 
   override def onTrained(model: M, spark: SparkSession): Unit = {
     if (isDefined(sourceEmbeddingsPath)) {
-      model.setDims($(embeddingsNDims))
+      val fileName = WordEmbeddingsClusterHelper.getClusterFileName(localPath.get).toString
 
-      val fileName = WordEmbeddingsClusterHelper.getClusterFileName(localPath).toString
+      model.setDims($(embeddingsNDims))
       model.setIndexPath(fileName)
     }
   }
 
-  lazy val embeddings: Option[WordEmbeddings] = {
-    get(sourceEmbeddingsPath).map(_ => WordEmbeddings(localPath, $(embeddingsNDims)))
-  }
-
-  private lazy val localPath: String = {
-    WordEmbeddingsClusterHelper.createLocalPath
-  }
+  var embeddings: Option[WordEmbeddings] = None
+  private var localPath: Option[String] = None
 
   private def indexEmbeddings(localFile: String, spark: SparkContext): Unit = {
     val formatId = $(embeddingsFormat)