Skip to content

Commit

Permalink
#234 Fix worker refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 20, 2020
1 parent 2a17ec1 commit 5c682c8
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions app/org/thp/cortex/services/WorkerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@ package org.thp.cortex.services
import java.net.URL
import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Codec
import scala.util.{Failure, Success, Try}

import play.api.libs.json.{JsArray, JsObject, JsString, Json}
import play.api.libs.ws.WSClient
import play.api.{Configuration, Logger}

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import javax.inject.{Inject, Provider, Singleton}
import org.elastic4play._
import org.elastic4play.controllers.{Fields, StringInputValue}
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._
import org.scalactic.Accumulation._
import org.scalactic._
import org.thp.cortex.models._
import play.api.libs.json.{JsArray, JsObject, JsString, Json}
import play.api.libs.ws.WSClient
import play.api.{Configuration, Logger}

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Codec
import scala.util.{Failure, Success, Try}
import org.elastic4play._
import org.elastic4play.controllers.{Fields, StringInputValue}
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._

@Singleton
class WorkerSrv @Inject()(
Expand Down Expand Up @@ -131,18 +133,19 @@ class WorkerSrv @Inject()(
scan(
analyzersURLs.map(_ WorkerType.analyzer) ++
respondersURLs.map(_ WorkerType.responder)
)
userSrv.inInitAuthContext { implicit authContext
find(any, Some("all"), Nil)._1.runForeach { worker
workerMap.get(worker.workerDefinitionId()) match {
case Some(wd) update(worker, Fields.empty.set("dataTypeList", Json.toJson(wd.dataTypeList)))
case None update(worker, Fields.empty.set("dataTypeList", JsArray.empty))
).onComplete { _
userSrv.inInitAuthContext { implicit authContext
find(any, Some("all"), Nil)._1.runForeach { worker
workerMap.get(worker.workerDefinitionId()) match {
case Some(wd) update(worker, Fields.empty.set("dataTypeList", Json.toJson(wd.dataTypeList)))
case None update(worker, Fields.empty.set("dataTypeList", JsArray.empty))
}
}
}
}
}

def scan(workerUrls: Seq[(String, WorkerType.Type)]): Unit = {
def scan(workerUrls: Seq[(String, WorkerType.Type)]): Future[Unit] = {
def readUrl(url: URL, workerType: WorkerType.Type): Future[Seq[WorkerDefinition]] =
url.getProtocol match {
case "file" Future.successful(readFile(Paths.get(url.toURI), workerType))
Expand Down Expand Up @@ -206,7 +209,7 @@ class WorkerSrv @Inject()(
}
}
}
.foreach { worker
.map { worker
val wmap = worker.flatten.map(w w.id w).toMap
workerMapLock.synchronized(workerMap = wmap)
logger.info(s"New worker list:\n\n\t${workerMap.values.map(a s"${a.name} ${a.version}").mkString("\n\t")}\n")
Expand Down

0 comments on commit 5c682c8

Please sign in to comment.