Skip to content
This repository has been archived by the owner on Mar 11, 2023. It is now read-only.

Akka conflict in Flink environment #225

Open
wzorgdrager opened this issue Jun 11, 2018 · 4 comments
Open

Akka conflict in Flink environment #225

wzorgdrager opened this issue Jun 11, 2018 · 4 comments
Labels

Comments

@wzorgdrager
Copy link

wzorgdrager commented Jun 11, 2018

Hi,

I'm currently running a Twitter4s application in a Flink cluster.
And I'm running into the following error:

taskrunner_1  | Uncaught error from thread [twitter4s-rest-akka.actor.default-dispatcher-5]: loader constraint violation: when resolving method "akka.actor.ActorCell$$anonfun$3.<init>(Lakka/actor/ActorCell;)V" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) of the current class, akka/actor/ActorCell, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, akka/actor/ActorCell$$anonfun$3, have different Class objects for the type akka/actor/ActorCell used in the signature, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for for ActorSystem[twitter4s-rest]

taskrunner_1  | java.lang.LinkageError: loader constraint violation: when resolving method "akka.actor.ActorCell$$anonfun$3.<init>(Lakka/actor/ActorCell;)V" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) of the current class, akka/actor/ActorCell, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, akka/actor/ActorCell$$anonfun$3, have different Class objects for the type akka/actor/ActorCell used in the signature

taskrunner_1  | Exception in thread "twitter4s-streaming-shutdown-hook-1" java.lang.NoClassDefFoundError: akka/actor/CoordinatedShutdown$$anonfun$totalTimeout$1

Basically what my application does:

  1. Retrieve trending topics.
  2. Start statuses stream and filter on trending topics.
  3. Repeat this every 15 minutes.

Here some snippets of my code:

override def run(ctx: SourceFunction.SourceContext[TweetWrapper]): Unit = {
    var stream: TwitterStream = null

    while (isRunning) {
      //get stream
      val currentTrends = requestTrending()
      logger.info(s"Retrieved new trends: $currentTrends")

      //close the old stream
      if (stream != null) Await.result(stream.close(), 10 seconds)

      //start new stream
      stream = Await.result(startStream(currentTrends, ctx), 10 seconds)

      //time to sleep
      logger.info(s"Started new stream, now sleeping for $sleepTime minute(s).")
      Thread.sleep(sleepTimeMilli)
    }
  }

And the startStream method:

def startStream(trending: List[String], ctx: SourceFunction.SourceContext[TweetWrapper]): Future[TwitterStream] = {
    getStreamingClient.filterStatuses(tracks = trending) {
      case tweet: Tweet => ctx.collectWithTimestamp(TweetWrapper(tweet), tweet.created_at.getTime)
      case x => logger.info(x)
    }
  }

I think this error is caused by version conflicts related to Akka (Flink also uses Akka), but I have no idea how to solve it. The Twitter4s version I'm using: 5.5

@wzorgdrager
Copy link
Author

Update: what I tried now is to 'shade' akka in the twitter4s library into an alternative package with the following lines in my build:

assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("akka.**" -> "twitterAkka.@1")
    .inAll
)

The shading works fine, but I then run into the following runtime error:

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'twitterAkka'
	at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:156)
	at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:149)
	at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:188)
	at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:193)
	at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:250)
	at twitterAkka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:315)
	at twitterAkka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683)
	at twitterAkka.actor.ActorSystem$.apply(ActorSystem.scala:245)
	at twitterAkka.actor.ActorSystem$.apply(ActorSystem.scala:288)
	at twitterAkka.actor.ActorSystem$.apply(ActorSystem.scala:233)
	at com.danielasfregola.twitter4s.TwitterRestClient$.$lessinit$greater$default$3(TwitterRestClient.scala:31)
	at com.danielasfregola.twitter4s.TwitterRestClient$.apply(TwitterRestClient.scala:71)
	at org.codefeedr.plugin.twitter.stages.TwitterTrendingStatusSource.getRestClient(TwitterTrendingStatusInput.scala:80)
	at org.codefeedr.plugin.twitter.stages.TwitterTrendingStatusSource.requestTrending(TwitterTrendingStatusInput.scala:126)
	at org.codefeedr.plugin.twitter.stages.TwitterTrendingStatusSource.run(TwitterTrendingStatusInput.scala:91)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)

Hope you can help.

@DanielaSfregola
Copy link
Owner

My apologies for the late reply.

It seems that it is having problems creating the ActorSystem...? At least, this is my best guess. What happens if you use the function withActorSystem and you pass the actor system as a parameter?

Regards,
D.

@ljurukov
Copy link

So I am finding a similar issues for unrelated reasons. The reason as to why shading isn't working in this case is because of how Akka names its packages (ie akka.blah.blah). The relocation process ends up changing more than is intended however due to this.
Akka has reference.conf files that it uses to keep defaults. The structure of which, when referenced in text looks similar "akka.blah.blah". I suspect it changes this in the code thinking that it is a stringified class name, resulting in your case looking for a key twitterAkka.blah.blah
However the associated reference.conf files are not changed as well and have the original akka.blah.blah path.

@wzorgdrager
Copy link
Author

Yes, you're right on that. We resolved this back then by downgrading some versions of dependencies we were using. However, in the long run that isn't really a good solution.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

3 participants