From 05a804c9bdbd8af39e3e0be15e12c52e92ec9f8f Mon Sep 17 00:00:00 2001 From: Keith Alcock Date: Thu, 14 Mar 2024 13:30:13 -0700 Subject: [PATCH 1/5] Call Apps Apps --- .../{Step2InputEidos1.scala => Step2InputEidos1App.scala} | 2 +- .../{Step2InputEidos2.scala => Step2InputEidos2App.scala} | 2 +- .../mysql/{Step2InputEidos2.scala => Step2InputEidos2App.scala} | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename src/main/scala/org/clulab/habitus/apps/elasticsearch/{Step2InputEidos1.scala => Step2InputEidos1App.scala} (99%) rename src/main/scala/org/clulab/habitus/apps/elasticsearch/{Step2InputEidos2.scala => Step2InputEidos2App.scala} (99%) rename src/main/scala/org/clulab/habitus/apps/mysql/{Step2InputEidos2.scala => Step2InputEidos2App.scala} (99%) diff --git a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1.scala b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1App.scala similarity index 99% rename from src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1.scala rename to src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1App.scala index e9dbe1df..b57c4641 100644 --- a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1.scala +++ b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1App.scala @@ -13,7 +13,7 @@ import org.json4s.jackson.JsonMethods import java.io.File import scala.util.Using -object Step2InputEidos1 extends App with Logging { +object Step2InputEidos1App extends App with Logging { implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats val contextWindow = 3 val baseDirectory = "../corpora/uganda-local" diff --git a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2.scala b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2App.scala similarity index 99% rename from src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2.scala rename to src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2App.scala index 7365a3a7..40c8cfcc 100644 --- a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2.scala +++ b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2App.scala @@ -19,7 +19,7 @@ import java.io.File import java.net.URL import scala.util.{Try, Using} -object Step2InputEidos2 extends App with Logging { +object Step2InputEidos2App extends App with Logging { case class LocalTsvRecord( sentenceIndex: Int, diff --git a/src/main/scala/org/clulab/habitus/apps/mysql/Step2InputEidos2.scala b/src/main/scala/org/clulab/habitus/apps/mysql/Step2InputEidos2App.scala similarity index 99% rename from src/main/scala/org/clulab/habitus/apps/mysql/Step2InputEidos2.scala rename to src/main/scala/org/clulab/habitus/apps/mysql/Step2InputEidos2App.scala index 81e9cea5..5735c9ee 100644 --- a/src/main/scala/org/clulab/habitus/apps/mysql/Step2InputEidos2.scala +++ b/src/main/scala/org/clulab/habitus/apps/mysql/Step2InputEidos2App.scala @@ -19,7 +19,7 @@ import java.sql.{Connection, Date, DriverManager, Statement, Timestamp} import java.time.LocalDateTime import scala.util.{Try, Using} -object Step2InputEidos2 extends App with Logging { +object Step2InputEidos2App extends App with Logging { case class LocalTsvRecord( sentenceIndex: Int, From 50067ad00b276326e3604d6547eba01a8aa506c1 Mon Sep 17 00:00:00 2001 From: Keith Alcock Date: Thu, 14 Mar 2024 13:34:22 -0700 Subject: [PATCH 2/5] Also split datasets because of memory constraints --- 
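SplitDatasetsApp below writes batches of roughly count lines each, then keeps copying lines past the batch boundary until the url in the first tab-separated column changes, so all sentences of one document land in the same output file. A minimal, self-contained sketch of its output-naming scheme (mirroring FileNameFormatter in the diff; the object name here is hypothetical):

    object FileNameFormatterSketch extends App {
      // "ghana-elasticsearch.tsv" with index 2 becomes "ghana-elasticsearch-2.tsv"
      def format(baseFileName: String, index: Int): String = {
        val dot = baseFileName.lastIndexOf('.')
        s"${baseFileName.take(dot)}-$index${baseFileName.drop(dot)}"
      }

      println(format("../corpora/ghana-elasticsearch/ghana-elasticsearch.tsv", 2))
    }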
.../apps/grid/CombineDatasetsApp.scala | 11 ++-- .../habitus/apps/grid/SplitDatasetsApp.scala | 61 +++++++++++++++++++ 2 files changed, 67 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/org/clulab/habitus/apps/grid/SplitDatasetsApp.scala diff --git a/src/main/scala/org/clulab/habitus/apps/grid/CombineDatasetsApp.scala b/src/main/scala/org/clulab/habitus/apps/grid/CombineDatasetsApp.scala index 7555e305..825f2a53 100644 --- a/src/main/scala/org/clulab/habitus/apps/grid/CombineDatasetsApp.scala +++ b/src/main/scala/org/clulab/habitus/apps/grid/CombineDatasetsApp.scala @@ -7,12 +7,13 @@ import scala.util.Using object CombineDatasetsApp extends App { val inputFileNames = Seq( - "../corpora/grid/uganda.tsv", - "../corpora/grid/uganda-mining2.tsv", - "../corpora/grid/uganda-pdfs2.tsv", - "../corpora/grid/uganda-pdfs-karamoja2.tsv" + "../corpora/ghana-elasticsearch/ghana-elasticsearch-0.tsv", + "../corpora/ghana-elasticsearch/ghana-elasticsearch-1.tsv", + "../corpora/ghana-elasticsearch/ghana-elasticsearch-2.tsv", + "../corpora/ghana-elasticsearch/ghana-elasticsearch-3.tsv", + "../corpora/ghana-elasticsearch/ghana-elasticsearch-4.tsv" ) - val ouputFileName = "../corpora/grid/uganda-all.tsv" + val ouputFileName = "../corpora/ghana-elasticsearch/combined.tsv" def copyLines(inputFileName: String, printWriter: PrintWriter, keepHeader: Boolean): Unit = { Using.resource(Sourcer.sourceFromFilename(inputFileName)) { source => diff --git a/src/main/scala/org/clulab/habitus/apps/grid/SplitDatasetsApp.scala b/src/main/scala/org/clulab/habitus/apps/grid/SplitDatasetsApp.scala new file mode 100644 index 00000000..708198ee --- /dev/null +++ b/src/main/scala/org/clulab/habitus/apps/grid/SplitDatasetsApp.scala @@ -0,0 +1,61 @@ +package org.clulab.habitus.apps.grid + +import org.clulab.utils.{FileUtils, Sourcer, StringUtils} + +import scala.util.Using + +object SplitDatasetsApp extends App { + + class FileNameFormatter(baseFileName: String) { + val left = StringUtils.beforeLast(baseFileName, '.') + val right = StringUtils.afterLast(baseFileName, '.') + + def format(index: Int): String = { + s"$left-$index.$right" + } + } + + val inputFileName = args.lift(0).getOrElse("../corpora/ghana-elasticsearch/ghana-elasticsearch.tsv") + val count = args.lift(1).getOrElse("300000").toInt + val outputFileNameFormatter = new FileNameFormatter(inputFileName) + + Using.resource(Sourcer.sourceFromFilename(inputFileName)) { source => + val lines = source.getLines + val header = lines.next + var bufferedLine: Option[String] = None + var index = 0 + + while (lines.hasNext) { + val batchOfLines = lines.take(count).toArray + val outputFileName = outputFileNameFormatter.format(index) + + index += 1 + Using.resource(FileUtils.printWriterFromFile(outputFileName)) { printWriter => + printWriter.println(header) + bufferedLine.foreach(printWriter.println) + bufferedLine = None + batchOfLines.foreach(printWriter.println) + + if (lines.hasNext) { + val prevLine = lines.next + printWriter.println(prevLine) + val prevUrl = StringUtils.beforeFirst(prevLine, '\t') + + while (lines.hasNext && { + val nextLine = lines.next + val nextUrl = StringUtils.beforeFirst(nextLine, '\t') + + if (prevUrl == nextUrl) { + printWriter.println(nextLine) + true + } + else { + bufferedLine = Some(nextLine) + false + } + }) {} + } + } + } + } +} From b66d55aafd7e960761ccc4f23355aab6af4f7811 Mon Sep 17 00:00:00 2001 From: Keith Alcock Date: Mon, 18 Mar 2024 18:41:14 -0700 Subject: [PATCH 3/5] Add the vectors --- belief_pipeline/tpi_main.py | 6 +- 
belief_pipeline/vector_main.py | 5 +- .../Step2InputEidos1GhanaApp.scala | 145 ++++++++++++++++++ .../apps/elasticsearch/VerifyGhanaApp.scala | 84 ++++++++++ 4 files changed, 235 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala create mode 100644 src/main/scala/org/clulab/habitus/apps/elasticsearch/VerifyGhanaApp.scala diff --git a/belief_pipeline/tpi_main.py b/belief_pipeline/tpi_main.py index 8f7eddeb..25183bfa 100644 --- a/belief_pipeline/tpi_main.py +++ b/belief_pipeline/tpi_main.py @@ -20,15 +20,15 @@ def get_in_and_out() -> Tuple[str, str]: belief_model_name: str = "maxaalexeeva/belief-classifier_mturk_unmarked-trigger_bert-base-cased_2023-4-26-0-34" sentiment_model_name: str = "hriaz/finetuned_beliefs_sentiment_classifier_experiment1" locations_file_name: str = "./belief_pipeline/GH.tsv" - input_file_name: str = "../corpora/ghana-regulations/ghana-regulations.tsv" - output_file_name: str = "../corpora/ghana-regulations/ghana-regulations-2.tsv" + input_file_name: str = "../corpora/ghana-elasticsearch/ghana-elasticsearch-4.tsv" + output_file_name: str = "../corpora/ghana-elasticsearch/ghana-elasticsearch-4a.tsv" # input_file_name, output_file_name = get_in_and_out() pipeline = Pipeline( TpiInputStage(input_file_name), [ TpiResolutionStage(), TpiBeliefStage(belief_model_name), - # TpiSentimentStage(sentiment_model_name), + TpiSentimentStage(sentiment_model_name), TpiLocationStage(locations_file_name) ], PandasOutputStage(output_file_name) diff --git a/belief_pipeline/vector_main.py b/belief_pipeline/vector_main.py index 0ea70765..45a97dca 100644 --- a/belief_pipeline/vector_main.py +++ b/belief_pipeline/vector_main.py @@ -15,8 +15,9 @@ def get_in_and_out() -> Tuple[str, str]: if __name__ == "__main__": vector_model_name: str = "all-MiniLM-L6-v2" - input_file_name: str = "../corpora/uganda-local/uganda-2.tsv" - output_file_name: str = "../corpora/uganda-local/uganda-2-vectors.tsv" + input_file_name: str = "../corpora/ghana-elasticsearch/ghana-elasticsearch-4a.tsv" + output_file_name: str = "../corpora/ghana-elasticsearch/ghana-elasticsearch-4b.tsv" + # input_file_name, output_file_name = get_in_and_out() pipeline = Pipeline( VectorInputStage(input_file_name), diff --git a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala new file mode 100644 index 00000000..4c4cf543 --- /dev/null +++ b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala @@ -0,0 +1,145 @@ +package org.clulab.habitus.apps.elasticsearch + +import ai.lum.common.FileUtils._ +import org.clulab.habitus.apps.elasticsearch.VerifyGhanaApp.datasetFilename +import org.clulab.habitus.apps.utils.{AttributeCounts, JsonRecord} +import org.clulab.processors.{Document, Sentence} +import org.clulab.utils.{Sourcer, StringUtils} +import org.clulab.wm.eidos.document.AnnotatedDocument +import org.clulab.wm.eidos.serialization.jsonld.JLDDeserializer +import org.clulab.wm.eidoscommon.utils.{FileEditor, FileUtils, Logging, TsvReader, TsvWriter} +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods + +import java.io.File +import scala.util.Using + +object Step2InputEidos1GhanaApp extends App with Logging { + implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats + val contextWindow = 3 + val datasetFilename = "../corpora/ghana-elasticsearch/dataset55k.tsv" + val baseDirectory = 
"/home/kwa/data/Corpora/habitus-project/corpora/multimix" + val outputFileName = "../corpora/ghana-elasticsearch/ghana-elasticsearch.tsv" + val deserializer = new JLDDeserializer() + + def getDatasetUrls(): Set[String] = { + // TODO: Also get terms from here instead of from directory names. + val datasetUrls = Using.resource(Sourcer.sourceFromFilename(datasetFilename)) { source => + val tsvReader = new TsvReader() + val datasetUrls = source.getLines.drop(1).map { line => + val Array(url) = tsvReader.readln(line, 1) + + url + }.toSet + + datasetUrls + } + + datasetUrls + } + + def jsonFileToJsonld(jsonFile: File): File = + FileEditor(jsonFile).setExt("jsonld").get + + def jsonFileToRecord(jsonFile: File): JsonRecord = { + val json = FileUtils.getTextFromFile(jsonFile) + val jValue = JsonMethods.parse(json) + val url = (jValue \ "url").extract[String] + val titleOpt = (jValue \ "title").extractOpt[String] + val datelineOpt = (jValue \ "dateline").extractOpt[String] + val bylineOpt = (jValue \ "byline").extractOpt[String] + val text = (jValue \ "text").extract[String] + + // Don't use them all in order to save space. + JsonRecord(url, None, None, None, "") + } + + def jsonldFileToAnnotatedDocument(jsonldFile: File): AnnotatedDocument = { + val json = FileUtils.getTextFromFile(jsonldFile) + val corpus = deserializer.deserialize(json) + val annotatedDocument = corpus.head + + annotatedDocument + } + + def rawTextToCleanText(rawText: String): String = rawText + .trim + .replaceAll("\r\n", " ") + .replaceAll("\n", " ") + .replaceAll("\r", " ") + .replaceAll("\t", " ") + .replaceAll("\u2028", " ") // unicode line separator + .replaceAll("\u2029", " ") // unicode paragraph separator + .map { letter => + if (letter.toInt < 32) ' ' + else letter + } + .trim + + def getSentenceText(document: Document, sentence: Sentence): String = { + val rawText = document.text.get.slice(sentence.startOffsets.head, sentence.endOffsets.last) + val cleanText = rawTextToCleanText(rawText) + + cleanText + } + + def attributeCountsToTsvWriter(attributeCounts: AttributeCounts, tsvWriter: TsvWriter): Unit = { + tsvWriter.print( + attributeCounts.increaseCount.toString, attributeCounts.decreaseCount.toString, + attributeCounts.posChangeCount.toString, attributeCounts.negChangeCount.toString, + "" + ) + } + + val datasetUrls: Set[String] = getDatasetUrls + val jsonFilesAndUrls: Seq[(File, String)] = { + val allJsonFiles = new File(baseDirectory).listFilesByWildcard("*.json", recursive = true).toVector + val jsonFilesWithJsonld = allJsonFiles.filter { jsonFile => + jsonFileToJsonld(jsonFile).exists + } + val jsonFilesAndUrls: Seq[(File, String)] = jsonFilesWithJsonld.map { jsonFile => + val record = jsonFileToRecord(jsonFile) + + (jsonFile, record.url) + } + val headJsonFilesAndUrls = jsonFilesAndUrls.groupBy(_._2).map(_._2.head).toSeq + + headJsonFilesAndUrls + } + + Using.resource(FileUtils.printWriterFromFile(outputFileName)) { printWriter => + val tsvWriter = new TsvWriter(printWriter) + + tsvWriter.println("url", "sentenceIndex", "sentence", "context", "prevSentence") + datasetUrls.zipWithIndex.foreach { case (url, index) => + val jsonFile = jsonFilesAndUrls.find(_._2 == url).get._1 + + println(s"$index ${jsonFile.getPath}") + try { + val jsonldFile = jsonFileToJsonld(jsonFile) + val annotatedDocument = jsonldFileToAnnotatedDocument(jsonldFile) + val document = annotatedDocument.document + val sentences = document.sentences + + sentences.zipWithIndex.foreach { case (sentence, sentenceIndex) => + val cleanText = 
getSentenceText(document, sentence) + val context = sentences + .slice(sentenceIndex - contextWindow, sentenceIndex + contextWindow + 1) + .map(getSentenceText(document, _)) + .mkString(" ") + val prevSentenceText = sentences + .lift(sentenceIndex - 1) + .map(getSentenceText(document, _)) + .getOrElse("") + + tsvWriter.println(url, sentenceIndex.toString, cleanText, context, prevSentenceText) + } + } + catch + { + case throwable: Throwable => + logger.error(s"Exception for file $jsonFile", throwable) + } + } + } +} diff --git a/src/main/scala/org/clulab/habitus/apps/elasticsearch/VerifyGhanaApp.scala b/src/main/scala/org/clulab/habitus/apps/elasticsearch/VerifyGhanaApp.scala new file mode 100644 index 00000000..0864d846 --- /dev/null +++ b/src/main/scala/org/clulab/habitus/apps/elasticsearch/VerifyGhanaApp.scala @@ -0,0 +1,84 @@ +package org.clulab.habitus.apps.elasticsearch + +import ai.lum.common.FileUtils._ +import org.clulab.habitus.apps.utils.JsonRecord +import org.clulab.utils.Sourcer +import org.clulab.wm.eidoscommon.utils.{FileEditor, FileUtils, TsvReader} +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods + +import java.io.File +import scala.util.Using + +object VerifyGhanaApp extends App { + implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats + + val datasetFilename = args.lift(0).getOrElse("../corpora/ghana-multimix/dataset55k.tsv") + val jsonDirname = args.lift(1).getOrElse("/home/kwa/data/Corpora/habitus-project/corpora/multimix") + + def jsonFileToJsonld(jsonFile: File): File = + FileEditor(jsonFile).setExt("jsonld").get + + def jsonFileToRecord(jsonFile: File): JsonRecord = { + val json = FileUtils.getTextFromFile(jsonFile) + val jValue = JsonMethods.parse(json) + val url = (jValue \ "url").extract[String] + val titleOpt = (jValue \ "title").extractOpt[String] + val datelineOpt = (jValue \ "dateline").extractOpt[String] + val bylineOpt = (jValue \ "byline").extractOpt[String] + val text = (jValue \ "text").extract[String] + + // Don't use them all in order to save space. + JsonRecord(url, None, None, None, "") + } + + def getDatasetUrls(): Set[String] = { + // TODO: Also get terms from here instead of from directory names. 
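+    // Only the url column of each data row is read; toSet collapses any duplicate urls.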
+ val datasetUrls = Using.resource(Sourcer.sourceFromFilename(datasetFilename)) { source => + val tsvReader = new TsvReader() + val datasetUrls = source.getLines.drop(1).map { line => + val Array(url) = tsvReader.readln(line, 1) + + url + }.toSet + + datasetUrls + } + + datasetUrls + } + + def getJsonUrls(): Set[String] = { + val jsonUrls = { + val loneJsonFiles = new File(jsonDirname).listFilesByWildcard("*.json", recursive = true).toVector + val pairedJsonFiles = loneJsonFiles.filter { jsonFile => + jsonFileToJsonld(jsonFile).exists + } + val jsonUrls = pairedJsonFiles.map { jsonFile => + val record = jsonFileToRecord(jsonFile) + + record.url + } + + val set = jsonUrls.toSet + val vectorLength = jsonUrls.length + val setLength = set.size + + set + } + + jsonUrls + } + + val datasetUrls = getDatasetUrls() + val jsonUrls = getJsonUrls() + + val missingJson = datasetUrls -- jsonUrls + println("Missing jsons:") + missingJson.foreach(println) + + val missingDataset = jsonUrls -- datasetUrls + println() + println("Missing dataset:") + missingDataset.foreach(println) +} From 8c4a4c4edcd2eed0ff24402cca878483a992253f Mon Sep 17 00:00:00 2001 From: Keith Alcock Date: Mon, 18 Mar 2024 18:41:53 -0700 Subject: [PATCH 4/5] Fix typos --- .../habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala | 3 +-- .../habitus/apps/elasticsearch/Step2InputEidos2App.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala index 4c4cf543..854bddc2 100644 --- a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala +++ b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos1GhanaApp.scala @@ -1,10 +1,9 @@ package org.clulab.habitus.apps.elasticsearch import ai.lum.common.FileUtils._ -import org.clulab.habitus.apps.elasticsearch.VerifyGhanaApp.datasetFilename import org.clulab.habitus.apps.utils.{AttributeCounts, JsonRecord} import org.clulab.processors.{Document, Sentence} -import org.clulab.utils.{Sourcer, StringUtils} +import org.clulab.utils.Sourcer import org.clulab.wm.eidos.document.AnnotatedDocument import org.clulab.wm.eidos.serialization.jsonld.JLDDeserializer import org.clulab.wm.eidoscommon.utils.{FileEditor, FileUtils, Logging, TsvReader, TsvWriter} diff --git a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2App.scala b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2App.scala index 40c8cfcc..9ef3f516 100644 --- a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2App.scala +++ b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2App.scala @@ -263,7 +263,7 @@ object Step2InputEidos2App extends App with Logging { val contextLocations = parseLocations(contextLocationsString) val vector = normalize(parseVector(vectorString)) - (url, sentenceIndex) -> new LocalTsvRecord(sentenceIndex, sentence, belief, sentimentScoreOpt, sentenceLocations, contextLocations, vector) + (url, sentenceIndex) -> LocalTsvRecord(sentenceIndex, sentence, belief, sentimentScoreOpt, sentenceLocations, contextLocations, vector) }.toMap } val restClient = Elasticsearch.mkRestClient(url, credentialsFilename) From da4922ddb3bf19ac27f09a70959df180a4b46710 Mon Sep 17 00:00:00 2001 From: Keith Alcock Date: Tue, 19 Mar 2024 08:05:26 -0700 Subject: [PATCH 5/5] Read Ghana --- .../Step2InputEidos2GhanaApp.scala | 354 ++++++++++++++++++ 1 
file changed, 354 insertions(+) create mode 100644 src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2GhanaApp.scala diff --git a/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2GhanaApp.scala b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2GhanaApp.scala new file mode 100644 index 00000000..fb2c8130 --- /dev/null +++ b/src/main/scala/org/clulab/habitus/apps/elasticsearch/Step2InputEidos2GhanaApp.scala @@ -0,0 +1,354 @@ +package org.clulab.habitus.apps.elasticsearch + +import ai.lum.common.FileUtils._ +import org.clulab.habitus.apps.utils.{AttributeCounts, DateString, JsonRecord} +import org.clulab.habitus.elasticsearch.ElasticsearchIndexClient +import org.clulab.habitus.elasticsearch.data.{CausalRelation, CauseOrEffect, DatasetRecord, LatLon, Location} +import org.clulab.habitus.elasticsearch.utils.Elasticsearch +import org.clulab.odin.{EventMention, Mention} +import org.clulab.processors.{Document, Sentence} +import org.clulab.utils.Sourcer +import org.clulab.wm.eidos.attachments.{Decrease, Increase, NegChange, Negation, PosChange} +import org.clulab.wm.eidos.document.AnnotatedDocument +import org.clulab.wm.eidos.serialization.jsonld.{JLDDeserializer, JLDRelationCausation} +import org.clulab.wm.eidoscommon.utils.{FileEditor, FileUtils, Logging, TsvReader, TsvWriter} +import org.json4s.DefaultFormats +import org.json4s.jackson.JsonMethods + +import java.io.File +import java.net.URL +import scala.util.{Try, Using} + +object Step2InputEidos2GhanaApp extends App with Logging { + + case class LocalTsvRecord( + url: String, + sentenceIndex: Int, + sentence: String, + belief: Boolean, + sentimentScoreOpt: Option[Float], + sentenceLocations: Array[Location], + contextLocations: Array[Location], + vector: Array[Float] + ) + + implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats + val contextWindow = 3 + val datasetFilename = "../corpora/ghana-elasticsearch/dataset55k.tsv" + val baseDirectory = "/home/kwa/data/Corpora/habitus-project/corpora/multimix" + val inputFilename = "../corpora/ghana-elasticsearch/ghana-elasticsearch-4b.tsv" + val credentialsFilename = "../credentials/elasticsearch-credentials.properties" + val deserializer = new JLDDeserializer() + val url = new URL("http://localhost:9200") + // val url = new URL("https://elasticsearch.keithalcock.com") + // val indexName = "habitus4" + val indexName = "dataset55k4" + val datasetName = "dataset55k.tsv" + val regionName = "ghana" + val alreadyNormalized = true + + def getDatasetUrlsToTerms(): Map[String, Array[String]] = { + // TODO: Also get terms from here instead of from directory names. 
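+    // Each data row yields url -> its space-separated terms; if a url repeats, toMap keeps only the last row's terms.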
+ val datasetUrlsToTerms = Using.resource(Sourcer.sourceFromFilename(datasetFilename)) { source => + val tsvReader = new TsvReader() + val datasetUrlsToTerms = source.getLines.drop(1).map { line => + val Array(url, termsString) = tsvReader.readln(line, 2) + val terms = termsString.split(' ') + + url -> terms + }.toMap + + datasetUrlsToTerms + } + + datasetUrlsToTerms + } + + def jsonFileToJsonld(jsonFile: File): File = + FileEditor(jsonFile).setExt("jsonld").get + + def jsonFileToRecord(jsonFile: File): JsonRecord = { + val json = FileUtils.getTextFromFile(jsonFile) + val jValue = JsonMethods.parse(json) + val url = (jValue \ "url").extract[String] + val titleOpt = (jValue \ "title").extractOpt[String] + val datelineOpt = (jValue \ "dateline").extractOpt[String] + val bylineOpt = (jValue \ "byline").extractOpt[String] + val text = (jValue \ "text").extract[String] + + // Don't use them all in order to save space. + JsonRecord(url, titleOpt, datelineOpt, bylineOpt, "") + } + + def jsonldFileToAnnotatedDocument(jsonldFile: File): AnnotatedDocument = { + val json = FileUtils.getTextFromFile(jsonldFile) + val corpus = deserializer.deserialize(json) + val annotatedDocument = corpus.head + + annotatedDocument + } + + def rawTextToCleanText(rawText: String): String = rawText + .trim + .replaceAll("\r\n", " ") + .replaceAll("\n", " ") + .replaceAll("\r", " ") + .replaceAll("\t", " ") + .replaceAll("\u2028", " ") // unicode line separator + .replaceAll("\u2029", " ") // unicode paragraph separator + .map { letter => + if (letter.toInt < 32) ' ' + else letter + } + .trim + + def getSentenceText(document: Document, sentence: Sentence): String = { + val rawText = document.text.get.slice(sentence.startOffsets.head, sentence.endOffsets.last) + val cleanText = rawTextToCleanText(rawText) + + cleanText + } + + def attributeCountsToTsvWriter(attributeCounts: AttributeCounts, tsvWriter: TsvWriter): Unit = { + tsvWriter.print( + attributeCounts.increaseCount.toString, attributeCounts.decreaseCount.toString, + attributeCounts.posChangeCount.toString, attributeCounts.negChangeCount.toString, + "" + ) + } + + def mentionToAttributeCounts(mention: Mention): AttributeCounts = { + val increaseCount = mention.attachments.count(_.isInstanceOf[Increase]) + val decreaseCount = mention.attachments.count(_.isInstanceOf[Decrease]) + val posCount = mention.attachments.count(_.isInstanceOf[PosChange]) + val negCount = mention.attachments.count(_.isInstanceOf[NegChange]) + val negationCount = mention.attachments.count(_.isInstanceOf[Negation]) + + AttributeCounts(increaseCount, decreaseCount, posCount, negCount, negationCount) + } + + def newCauseOrEffect(text: String, attributeCounts: AttributeCounts): CauseOrEffect = { + new CauseOrEffect(text, + attributeCounts.increaseCount, attributeCounts.decreaseCount, + attributeCounts.posChangeCount, attributeCounts.negChangeCount + ) + } + + def parseLocations(locationString: String): Array[Location] = { + if (locationString.isEmpty) Array.empty + else { + val locations = locationString.split(')').map { commaAndNameAndLatLon => + val trimmedCommaAndNameAndLatLon = commaAndNameAndLatLon.trim + val toDrop = if (trimmedCommaAndNameAndLatLon.startsWith(",")) 1 else 0 + val nameAndLatLon = trimmedCommaAndNameAndLatLon.drop(toDrop) + val Array(name, latLon) = nameAndLatLon.split('(').map(_.trim) + val Array(latString, lonString) = latLon.split(",").map(_.trim) + // Sometimes we have NaN for these. We know it's a place, but not where. 
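+        // Try(...).toOption turns an unparseable coordinate into None, so the Location keeps its name without a LatLon.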
+ val latOpt = Try(latString.toFloat).toOption + val lonOpt = Try(lonString.toFloat).toOption + val latLonOpt = latOpt.flatMap { lat => + lonOpt.map { lon => + LatLon(lat, lon) + } + } + + Location(name,latLonOpt) + } + + locations + } + } + + def parseVector(vectorString: String): Array[Float] = { + val values = vectorString.split(", ") + val floats = values.map(_.toFloat) + + floats + } + + def normalize(floats: Array[Float]): Array[Float] = { + if (alreadyNormalized) floats + else { + val sumSquare = floats.foldLeft(0f) { case (sum, float) => sum + float * float } + val divisor = math.sqrt(sumSquare) + val normalized = floats.map { float => (float / divisor).toFloat } + + normalized + } + } + + def getCausalRelations(causalMentionGroup: Seq[Mention]): Array[CausalRelation] = { + val causalRelations = causalMentionGroup.zipWithIndex.map { case (causalMention, causalIndex) => + val causalAttributeCounts = mentionToAttributeCounts(causalMention) + assert(causalAttributeCounts.increaseCount == 0) + assert(causalAttributeCounts.decreaseCount == 0) + assert(causalAttributeCounts.posChangeCount == 0) + assert(causalAttributeCounts.negChangeCount == 0) + + val causeMentions = causalMention.arguments("cause") + assert(causeMentions.length == 1) + val effectMentions = causalMention.arguments("effect") + assert(effectMentions.length == 1) + + val causeMention = causeMentions.head + val effectMention = effectMentions.head + + val causeText = causeMention.text + val cleanCauseText = rawTextToCleanText(causeText) + val effectText = effectMention.text + val cleanEffectText = rawTextToCleanText(effectText) + + val causeAttributeCounts = mentionToAttributeCounts(causeMention) + assert(causeAttributeCounts.negatedCount == 0) + val effectAttributeCounts = mentionToAttributeCounts(effectMention) + assert(effectAttributeCounts.negatedCount == 0) + + val cause = newCauseOrEffect(cleanCauseText, causeAttributeCounts) + val effect = newCauseOrEffect(cleanEffectText, effectAttributeCounts) + val causalRelation = CausalRelation( + causalIndex, + causalAttributeCounts.negatedCount, + cause, + effect + ) + + causalRelation + } + + causalRelations.toArray + } + + def getLocationsAndDistance(sentenceIndex: Int, range: Range, urlSentenceIndexToTsvRecordMap: Map[(String, Int), + LocalTsvRecord], url: String): (Array[Location], Option[Int]) = { + val tsvRecord = urlSentenceIndexToTsvRecordMap(url, sentenceIndex) + + if (tsvRecord.sentenceLocations.nonEmpty) + (tsvRecord.sentenceLocations, Some(0)) + else { + val locationsIndexOpt = range.find { sentenceIndex => + urlSentenceIndexToTsvRecordMap(url, sentenceIndex).sentenceLocations.nonEmpty + } + val locations = locationsIndexOpt.map { sentenceIndex => + urlSentenceIndexToTsvRecordMap(url, sentenceIndex).sentenceLocations + }.getOrElse(Array.empty) + val distanceOpt = locationsIndexOpt.map { index => math.abs(sentenceIndex - index) } + + (locations, distanceOpt) + } + } + + val datasetUrlsToTerms: Map[String, Array[String]] = getDatasetUrlsToTerms() + val jsonUrlsToFileAndTermsAndJsonRecord: Map[String, (File, JsonRecord, Array[String])] = { + val allJsonFiles = new File(baseDirectory).listFilesByWildcard("*.json", recursive = true).toVector + val jsonFilesWithJsonld = allJsonFiles.filter { jsonFile => + jsonFileToJsonld(jsonFile).exists + } + val jsonFilesUrlsAndTerms: Map[String, (File, JsonRecord, Array[String])] = jsonFilesWithJsonld.flatMap { jsonFile => + val jsonRecord = jsonFileToRecord(jsonFile) + val termsOpt = datasetUrlsToTerms.get(jsonRecord.url) + + if 
(termsOpt.isEmpty) + println(jsonRecord.url) + // There may be some files found that didn't make it into the dataset. + termsOpt.map { terms => + jsonRecord.url -> (jsonFile, jsonRecord, terms) + } + }.toMap + + jsonFilesUrlsAndTerms + } + val localTsvRecords: Seq[LocalTsvRecord] = Using.resource(Sourcer.sourceFromFilename(inputFilename)) { source => + val lines = source.getLines.drop(1) + val tsvReader = new TsvReader() + + lines.map { line => + val Array(url, sentenceIndexString, sentence, beliefString, sentimentScore, sentenceLocationsString, contextLocationsString, vectorString) = tsvReader.readln(line, 8) + val sentenceIndex = sentenceIndexString.toInt + val belief = beliefString == "True" + val sentimentScoreOpt = if (sentimentScore.isEmpty) None else Some(sentimentScore.toFloat) + val sentenceLocations = parseLocations(sentenceLocationsString) + val contextLocations = parseLocations(contextLocationsString) + val vector = normalize(parseVector(vectorString)) + + LocalTsvRecord(url, sentenceIndex, sentence, belief, sentimentScoreOpt, sentenceLocations, contextLocations, vector) + }.toVector + } + val urlSentenceIndexToTsvRecordMap: Map[(String, Int), LocalTsvRecord] = localTsvRecords.map { localTsvRecord => + (localTsvRecord.url, localTsvRecord.sentenceIndex) -> localTsvRecord + }.toMap + val restClient = Elasticsearch.mkRestClient(url, credentialsFilename) + + Using.resource(restClient) { restClient => + val elasticsearchIndexClient = ElasticsearchIndexClient(restClient, indexName) + val firstLocalTsvRecordsAndIndex = localTsvRecords.zipWithIndex.filter(_._1.sentenceIndex == 0) + + firstLocalTsvRecordsAndIndex.foreach { case (localTsvRecord, index) => + val (jsonFile, jsonRecord, terms) = jsonUrlsToFileAndTermsAndJsonRecord(localTsvRecord.url) + println(s"$index ${jsonFile.getPath}") + try { + val url = jsonRecord.url + val jsonldFile = jsonFileToJsonld(jsonFile) + val annotatedDocument = jsonldFileToAnnotatedDocument(jsonldFile) + val document = annotatedDocument.document + val allMentions = annotatedDocument.allOdinMentions.toVector + val causalMentions = allMentions.filter { mention => + mention.isInstanceOf[EventMention] && mention.label == JLDRelationCausation.taxonomy + } + val causalMentionGroups: Map[Int, Seq[Mention]] = causalMentions.groupBy(_.sentence) + val sentences = document.sentences + val dateOpt = jsonRecord.datelineOpt.map(DateString(_).canonicalize) + + sentences.zipWithIndex.foreach { case (sentence, sentenceIndex) => + val causal = causalMentionGroups.contains(sentenceIndex) + val cleanText = getSentenceText(document, sentence) + val tsvRecord = localTsvRecord + val contextBefore = sentences + .slice(sentenceIndex - contextWindow, sentenceIndex) + .map(getSentenceText(document, _)) + .mkString(" ") + val contextAfter = sentences + .slice(sentenceIndex + 1, sentenceIndex + contextWindow + 1) + .map(getSentenceText(document, _)) + .mkString(" ") + val (prevLocations, prevDistanceOpt) = getLocationsAndDistance(sentenceIndex, + Range(0, sentenceIndex).reverse, urlSentenceIndexToTsvRecordMap, url) + val (nextLocations, nextDistanceOpt) = getLocationsAndDistance(sentenceIndex, + Range(sentenceIndex + 1, document.sentences.length), urlSentenceIndexToTsvRecordMap, url) + val causalRelations = + if (causal) getCausalRelations(causalMentionGroups(sentenceIndex).sorted) + else Array.empty[CausalRelation] + val datasetRecord: DatasetRecord = DatasetRecord( + datasetName, + regionName, + url, + jsonRecord.titleOpt, + terms, + jsonRecord.datelineOpt, + dateOpt, + 
jsonRecord.bylineOpt, + sentenceIndex, + cleanText, + causalRelations, + tsvRecord.belief, + tsvRecord.sentimentScoreOpt, + tsvRecord.sentenceLocations, + contextBefore, + contextAfter, + tsvRecord.contextLocations, + prevLocations, + prevDistanceOpt, + nextLocations, + nextDistanceOpt, + tsvRecord.vector + ) + + elasticsearchIndexClient.index(datasetRecord) + } + } + catch { + case throwable: Throwable => + logger.error(s"Exception for file $jsonFile", throwable) + } + } + } +}
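For reference, the sentenceLocations and contextLocations columns consumed by parseLocations above are serialized as comma-separated entries of the form Name (lat, lon). A self-contained sketch of the same parsing steps, with illustrative place names and with Location and LatLon re-declared locally so it runs on its own:

    import scala.util.Try

    object ParseLocationsSketch extends App {
      case class LatLon(lat: Float, lon: Float)
      case class Location(name: String, latLonOpt: Option[LatLon])

      // Same steps as the patch: split entries on ')', strip a leading comma,
      // then split "Name (lat, lon)" on '(' and the coordinates on ','.
      def parseLocations(locationString: String): Array[Location] =
        if (locationString.isEmpty) Array.empty
        else locationString.split(')').map { entry =>
          val trimmed = entry.trim
          val nameAndLatLon = trimmed.drop(if (trimmed.startsWith(",")) 1 else 0)
          val Array(name, latLon) = nameAndLatLon.split('(').map(_.trim)
          val Array(latString, lonString) = latLon.split(',').map(_.trim)
          val latLonOpt = for {
            lat <- Try(latString.toFloat).toOption
            lon <- Try(lonString.toFloat).toOption
          } yield LatLon(lat, lon)

          Location(name, latLonOpt)
        }

      // An unparseable coordinate (e.g. a lowercase "nan" written by Python)
      // keeps the place name but drops the LatLon.
      parseLocations("Accra (5.6037, -0.187), Tamale (nan, nan)").foreach(println)
    }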