Fix/elastic store update #5210

Open · wants to merge 20 commits into base: master
36 changes: 36 additions & 0 deletions .github/workflows/gradle-build.yml
@@ -0,0 +1,36 @@
# This workflow uses actions that are not certified by GitHub.
Member:
Is the goal here to run GitHub Actions instead of using Travis?
In any case, this change seems orthogonal to the rest of the PR.

Author:
I created that because I don't have a Travis or Jenkins environment to build this in, and my personal laptop has an M1 chip. Additionally, I wanted to be sure that I had fixed the issue before I submitted the PR, but I left it in because I felt it could be beneficial to others who might want to build the project but don't have much Jenkins, Travis, or Gradle experience.

The action only runs when triggered manually.

Contributor:
There are some unit tests for ES in tests/src/test/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStoreTests.scala that use Testcontainers, so I think you can use them to run the tests on your laptop?

Author:
It failed when trying to download one of the dependencies. I'll try again so I can get the dependency name.

Author:
Just a quick update: I tried to build the project locally again and got the following error:

Execution failed for task ':core:scheduler:generateProto'.
> Could not resolve all files for configuration ':core:scheduler:protobufToolsLocator_protoc'.
   > Could not find protoc-osx-aarch_64.exe (com.google.protobuf:protoc:3.4.0).
     Searched in the following locations:
         https://repo.maven.apache.org/maven2/com/google/protobuf/protoc/3.4.0/protoc-3.4.0-osx-aarch_64.exe

I must admit, this is my first time with Scala and Gradle, and while I think I might know what happened, I'm still trying to work out how to go about fixing it.

# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Java CI

on:
  workflow_dispatch:

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2

      - name: Login to github docker Registry
        run: docker login ghcr.io -u ${{ github.actor }} -p ${{ secrets.GITHUB_TOKEN }}

      - name: Set the image tag
        id: image_vars
        run: |
          TAG=latest
          echo ::set-output name=tag::$(echo $TAG | awk '{print tolower($0)}')

      - uses: actions/setup-java@v2
        with:
          distribution: adopt
          java-version: 11

      - name: Setup Gradle
        uses: gradle/gradle-build-action@v2

      - name: Execute Gradle build
        run: ./gradlew distDocker -PdockerRegistry=ghcr.io -PdockerImagePrefix=${{ github.repository }} -PdockerImageTag=${{steps.image_vars.outputs.tag}}
2 changes: 1 addition & 1 deletion common/scala/build.gradle
@@ -89,8 +89,8 @@ dependencies {
compile "io.reactivex:rxjava:1.3.8"
compile "io.reactivex:rxjava-reactive-streams:1.2.1"
compile "com.microsoft.azure:azure-cosmosdb:2.6.2"
compile "com.sksamuel.elastic4s:elastic4s-client-esjava_${gradle.scala.depVersion}:7.10.3"
Member:
Is this compatible with the Elasticsearch 6.x version?

Contributor:
I checked, and 6.x is not compatible with this PR, because 6.x requires a type parameter for the index and deleteByQuery methods while 7.x does not accept one.
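
To make the difference concrete, here is a minimal sketch of the two DSLs as they appear in this PR's diff (the index name, document body, and id below are illustrative):

```scala
import com.sksamuel.elastic4s.ElasticDsl._

val index   = "openwhisk-guest"       // illustrative index name
val payload = """{"name": "demo"}"""  // illustrative document body
val id      = "abc123"                // illustrative document id

// elastic4s 6.x required a mapping type on every request:
//   indexInto(index, "_doc").doc(payload).id(id)
//   deleteByQuery(index, "_doc", termQuery("_id", id))

// elastic4s 7.x drops the type parameter, as in this PR:
val indexOp  = indexInto(index).doc(payload).id(id)
val deleteOp = deleteByQuery(index, termQuery("_id", id))
```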

Member:
I am concerned about the license.
I didn't look into it deeply, but from a certain version of ES 7.x onward it is no longer offered under the Apache license.
I'm not sure we can upgrade the ES version.

Author (@dylanturn, Apr 6, 2022):
> I didn't look into it deeply, but from a certain version of ES 7.x onward it is no longer offered under the Apache license.

What wouldn't they support? Elastic changed the license model sometime after 7.10.x, so this is still entirely covered by Apache 2.0.

It's also worth mentioning that we don't have to use Elasticsearch. This update makes it possible to use OpenSearch, which is covered entirely by Apache 2.0.
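
As a hedged sketch (the endpoint below is illustrative), pointing the client at an OpenSearch 7.10-compatible cluster is just a matter of the connection string; the construction mirrors what this PR does in ElasticSearchActivationStore:

```scala
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}

// The elastic4s 7.10.x JavaClient speaks the wire protocol that OpenSearch
// forked at ES 7.10, so the client construction is unchanged.
val client = ElasticClient(JavaClient(ElasticProperties("https://opensearch.example.com:9200")))
```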

Member:
The new scheduler relies on multiple filtered aliases against one ES index.
I am unclear whether ES 7.x supports this.

Author:
> The new scheduler relies on multiple filtered aliases against one ES index. I am unclear whether ES 7.x supports this.

I'm not sure what to tell you; I've been using OpenWhisk with my ES 7.10 (OpenSearch) cluster for the last week without any issues...

For what it's worth, I'm working on updating the tests so they work correctly.

Member:
Are you using FPCScheduler too?

Author:
Good point, I don't think I am.

That said, I can't find anything in the Elasticsearch documentation or release notes that suggests "multiple filtered aliases against one ES index" would have been removed.

Was it something you read that gave you cause for concern?

Contributor:
@style95 I think the new scheduler doesn't rely on multiple filtered aliases, and doesn't rely on Elasticsearch at all.

We are using multiple filtered aliases to organize activations by namespace in our downstream; we may need to consider changing that.

Member:
Hm...
If we don't use multiple filtered aliases, it would create a huge number of indices for all namespaces, even those that rarely invoke any actions.
It is necessary to have shared indices for namespaces with few invocations.
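
For reference, filtered aliases are still supported by the 7.x `_aliases` API; a minimal sketch using the low-level REST client (index, alias, and field names are illustrative, not the scheme OpenWhisk actually uses):

```scala
import org.apache.http.HttpHost
import org.elasticsearch.client.{Request, RestClient}

// Create a per-namespace filtered alias over one shared index, so that
// low-traffic namespaces don't each need a physical index of their own.
val rest = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()
val req  = new Request("POST", "/_aliases")
req.setJsonEntity(
  """{
    |  "actions": [
    |    { "add": {
    |        "index":  "activations-shared",
    |        "alias":  "activations-guest",
    |        "filter": { "term": { "namespace": "guest" } }
    |    } }
    |  ]
    |}""".stripMargin)
rest.performRequest(req)
rest.close()
```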


compile "com.sksamuel.elastic4s:elastic4s-http_${gradle.scala.depVersion}:6.7.4"
//for mongo
compile "org.mongodb.scala:mongo-scala-driver_${gradle.scala.depVersion}:2.7.0"

@@ -19,17 +19,17 @@ package org.apache.openwhisk.core.database.elasticsearch

import java.time.Instant
import java.util.concurrent.TimeUnit

import scala.language.postfixOps
import akka.actor.ActorSystem
import akka.event.Logging.ErrorLevel
import akka.http.scaladsl.model._
import akka.stream.scaladsl.Flow
import com.sksamuel.elastic4s.http.search.SearchHit
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
import com.sksamuel.elastic4s.indexes.IndexRequest
import com.sksamuel.elastic4s.searches.queries.RangeQuery
import com.sksamuel.elastic4s.searches.queries.matches.MatchPhrase
import com.sksamuel.elastic4s.http.{JavaClient, NoOpRequestConfigCallback}
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.requests.indexes.IndexRequest
import com.sksamuel.elastic4s.requests.searches.SearchHit
import com.sksamuel.elastic4s.requests.searches.queries.RangeQuery
import com.sksamuel.elastic4s.requests.searches.queries.matches.MatchPhrase
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
@@ -60,18 +60,16 @@ class ElasticSearchActivationStore(
useBatching: Boolean = false)(implicit actorSystem: ActorSystem, override val logging: Logging)
extends ActivationStore {

import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.ElasticDsl._
import ElasticSearchActivationStore.{generateIndex, httpClientCallback}

private implicit val executionContextExecutor: ExecutionContextExecutor = actorSystem.dispatcher
private val javaClient = JavaClient(ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
NoOpRequestConfigCallback,
httpClientCallback)

private val client =
ElasticClient(
ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
NoOpRequestConfigCallback,
httpClientCallback)
private val client = ElasticClient(javaClient)

private val esType = "_doc"
private val maxOpenDbRequests = actorSystem.settings.config
.getInt("akka.http.host-connection-pool.max-connections") / 2
private val batcher: Batcher[IndexRequest, Either[ArtifactStoreException, DocInfo]] =
@@ -111,7 +109,7 @@ class ElasticSearchActivationStore(
"response" -> response))

val index = generateIndex(activation.namespace.namespace)
val op = indexInto(index, esType).doc(payload.toString).id(activation.docid.asString)
val op = indexInto(index).doc(payload.toString).id(activation.docid.asString)

// always use batching
val res = batcher.put(op).map {
@@ -185,7 +183,7 @@ class ElasticSearchActivationStore(
}
.map { res =>
if (res.status == StatusCodes.OK.intValue) {
if (res.result.hits.total == 0) {
if (res.result.hits.total.value == 0) {
transid.finished(this, start, s"[GET] 'activations', document: '$activationId'; not found.")
throw NoDocumentException("not found on 'get'")
} else {
@@ -227,7 +225,7 @@ class ElasticSearchActivationStore(

val res = client
.execute {
deleteByQuery(index, esType, termQuery("_id", activationId.asString))
deleteByQuery(index, termQuery("_id", activationId.asString))
}
.map { res =>
if (res.status == StatusCodes.OK.intValue) {
@@ -17,9 +17,10 @@
package org.apache.openwhisk.core.scheduler.queue

import akka.actor.ActorSystem
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
import com.sksamuel.elastic4s.searches.queries.Query
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.{JavaClient, NoOpRequestConfigCallback}
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.{ElasticDate, ElasticDateMath, Seconds}
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.ConfigKeys
@@ -28,7 +29,7 @@ import org.apache.openwhisk.spi.Spi
import pureconfig.loadConfigOrThrow
import spray.json.{JsArray, JsNumber, JsValue, RootJsonFormat, deserializationError, _}

import scala.concurrent.Future
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration.FiniteDuration
import scala.language.implicitConversions
import scala.util.{Failure, Try}
Expand All @@ -45,7 +46,7 @@ object ElasticSearchDurationChecker {
val FilterAggregationName = "filterAggregation"
val AverageAggregationName = "averageAggregation"

implicit val serde = new ElasticSearchDurationCheckResultFormat()
implicit val serde: ElasticSearchDurationCheckResultFormat = new ElasticSearchDurationCheckResultFormat()

def getFromDate(timeWindow: FiniteDuration): ElasticDateMath =
ElasticDate.now minus (timeWindow.toSeconds.toInt, Seconds)
Expand All @@ -58,7 +59,7 @@ class ElasticSearchDurationChecker(private val client: ElasticClient, val timeWi
import ElasticSearchDurationChecker._
import org.apache.openwhisk.core.database.elasticsearch.ElasticSearchActivationStore.generateIndex

implicit val ec = actorSystem.getDispatcher
implicit val ec: ExecutionContextExecutor = actorSystem.getDispatcher

override def checkAverageDuration(invocationNamespace: String, actionMetaData: WhiskActionMetaData)(
callback: DurationCheckResult => DurationCheckResult): Future[DurationCheckResult] = {
@@ -104,7 +105,7 @@ class ElasticSearchDurationChecker(private val client: ElasticClient, val timeWi
.map(callback(_))
.andThen {
case Failure(t) =>
logging.error(this, s"failed to check the average duration: ${t}")
logging.error(this, s"failed to check the average duration: $t")
}
}
}
@@ -116,11 +117,11 @@ object ElasticSearchDurationCheckerProvider extends DurationCheckerProvider {
implicit val as: ActorSystem = actorSystem
implicit val logging: Logging = log

val elasticClient =
ElasticClient(
ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
NoOpRequestConfigCallback,
httpClientCallback)
val javaClient = JavaClient(ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
NoOpRequestConfigCallback,
httpClientCallback)

val elasticClient = ElasticClient(javaClient)

new ElasticSearchDurationChecker(elasticClient, durationCheckerConfig.timeWindow)
}
@@ -187,7 +188,7 @@ class ElasticSearchDurationCheckResultFormat extends RootJsonFormat[DurationChec
"took": 0
}
*/
implicit def read(json: JsValue) = {
implicit def read(json: JsValue): DurationCheckResult = {
val jsObject = json.asJsObject

jsObject.getFields("aggregations", "took", "hits") match {
@@ -17,8 +17,9 @@

package org.apache.openwhisk.core.scheduler.queue.test

import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties, NoOpRequestConfigCallback}
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.{JavaClient, NoOpRequestConfigCallback}
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import common._
import common.rest.WskRestOperations
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
@@ -91,11 +92,11 @@ class ElasticSearchDurationCheckerTests
}
}

private val client =
ElasticClient(
ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
val javaClient = JavaClient(ElasticProperties(s"${elasticSearchConfig.protocol}://${elasticSearchConfig.hosts}"),
NoOpRequestConfigCallback,
httpClientCallback)

val client = ElasticClient(javaClient)

private val elasticSearchDurationChecker = new ElasticSearchDurationChecker(client, defaultDurationCheckWindow)

@@ -3,7 +3,7 @@ package org.apache.openwhisk.core.scheduler.queue.test
import akka.actor.ActorRef
import akka.actor.FSM.{CurrentState, StateTimeout, SubscribeTransitionCallBack, Transition}
import akka.testkit.{TestActor, TestFSMRef, TestProbe}
import com.sksamuel.elastic4s.http.{search => _}
import com.sksamuel.elastic4s.requests.{searches => _}
import org.apache.openwhisk.common.GracefulShutdown
import org.apache.openwhisk.core.connector.ContainerCreationError.{NonExecutableActionError, WhiskError}
import org.apache.openwhisk.core.connector.ContainerCreationMessage
@@ -15,7 +15,7 @@ import com.ibm.etcd.api._
import com.ibm.etcd.client.kv.KvClient.Watch
import com.ibm.etcd.client.kv.WatchUpdate
import com.ibm.etcd.client.{EtcdClient => Client}
import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.ElasticClient
import common.StreamLogging
import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.ack.ActiveAck
@@ -1,14 +1,13 @@
package org.apache.openwhisk.core.scheduler.queue.test

import java.time.Instant

import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.sksamuel.elastic4s.http
import com.sksamuel.elastic4s.http.ElasticDsl.{avgAgg, boolQuery, matchQuery, rangeQuery, search}
import com.sksamuel.elastic4s.http._
import com.sksamuel.elastic4s.http.search.{SearchHits, SearchResponse}
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.Executor
import com.sksamuel.elastic4s.ElasticDsl.{avgAgg, boolQuery, matchQuery, rangeQuery, search}
import com.sksamuel.elastic4s._
import com.sksamuel.elastic4s.requests.common.Shards
import com.sksamuel.elastic4s.requests.searches.{SearchHits, SearchRequest, SearchResponse, Total}
import common.StreamLogging
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
@@ -24,20 +23,9 @@ import org.apache.openwhisk.core.entity.{WhiskActivation, _}
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
import org.apache.openwhisk.core.scheduler.grpc.GetActivation
import org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationChecker.{getFromDate, AverageAggregationName}
import org.apache.openwhisk.core.scheduler.queue.ElasticSearchDurationChecker.{AverageAggregationName, getFromDate}
import org.apache.openwhisk.core.scheduler.queue._
import org.apache.openwhisk.core.service.{
AlreadyExist,
DeleteEvent,
Done,
InitialDataStorageResults,
PutEvent,
RegisterData,
RegisterInitialData,
UnregisterData,
UnwatchEndpoint,
WatchEndpoint
}
import org.apache.openwhisk.core.service.{AlreadyExist, DeleteEvent, Done, InitialDataStorageResults, PutEvent, RegisterData, RegisterInitialData, UnregisterData, UnwatchEndpoint, WatchEndpoint}
import org.scalamock.scalatest.MockFactory

import scala.concurrent.duration.DurationInt
@@ -217,17 +205,18 @@ class MemoryQueueTestsFixture

(mockEsClient
.execute[SearchRequest, SearchResponse, Future](_: SearchRequest)(
_: Executor[Future],
_: Functor[Future],
_: http.Executor[Future],
_: Handler[SearchRequest, SearchResponse],
_: Manifest[SearchResponse]))
.expects(searchRequest, *, *, *, *)
_: Manifest[SearchResponse],
_: CommonRequestOptions))
.expects(searchRequest, *, *, *, *, *)
.returns(
Future.successful(RequestSuccess(
200,
None,
Map.empty,
SearchResponse(1, false, false, Map.empty, Shards(0, 0, 0), None, Map.empty, SearchHits(0, 0, Array.empty)))))
SearchResponse(1, false, false, Map.empty, Shards(0, 0, 0), None, Map.empty, SearchHits(Total(0, "eq"), 0, Array.empty)))))
.once()
}

1 change: 0 additions & 1 deletion tools/jenkins/apache/dockerhub.groovy
@@ -51,6 +51,5 @@ node('ubuntu') {
withCredentials([string(credentialsId: 'openwhisk_slack_token', variable: 'OPENWHISK_SLACK_TOKEN')]) {
sh "curl -X POST --data-urlencode 'payload={\"channel\": \"#dev\", \"username\": \"whiskbot\", \"text\": \"OpenWhisk Docker Images build and posted to https://hub.docker.com/u/openwhisk by Jenkins job ${BUILD_URL}\", \"icon_emoji\": \":openwhisk:\"}' https://hooks.slack.com/services/${OPENWHISK_SLACK_TOKEN}"
}

}
}
2 changes: 1 addition & 1 deletion tools/macos/README.md
@@ -55,7 +55,7 @@ brew install gnu-tar
# install pip
sudo easy_install pip
# install script prerequisites
pip install docker==5.0.0 ansible==4.1.0 jinja2==3.0.1 couchdb==1.2 httplib2==0.19.1 requests==2.25.1 six=1.16.0
pip install docker==5.0.0 ansible==4.1.0 jinja2==3.0.1 couchdb==1.2 httplib2==0.19.1 requests==2.25.1 six==1.16.0
```

Make sure you correctly configure the environment variable $JAVA_HOME.