Skip to content
This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Implement radio ingestion #8

Merged
merged 15 commits into from Jun 7, 2017
Merged

Implement radio ingestion #8

merged 15 commits into from Jun 7, 2017

Conversation

c-w
Copy link
Contributor

@c-w c-w commented Jun 6, 2017

This PR implements Audio Streaming Integration by integrating the SpeechToText-WebSockets-Java with our Spark platform.

The integration works as follows:

  • We create a transcriber object via the speech-to-text library.
  • We open a standard URL connection to the radio stream and connect it to the transcriber.
  • The transcriber takes care of reading the audio stream, converting it to text, etc.
  • Whenever a transcription is complete, the transcriber calls our callback. In this callback, we store the transcribed audio in our Spark receiver which triggers the downstream Spark Streaming pipeline.

Pipeline in action:

image

Note that the speech-to-text library is still fairly brittle and will need some improvements to be fully production-ready. I recommend to start playing with the integration, collecting some issues and then fixing them in bulk.

The feature service seems to be broken/changed recently, so to test the core of this PR, I also needed to make our feature service client more robust to failures. These changes are also included in this PR: we now ignore bad responses from the feature service and we also ignore when we're unable to connect to the feature service. Even if the feature service is down, the rest of the pipeline can still work (just the locations-from-text inference piece won't work) so I assert that it's good to make our pipeline robust to feature service outages.

Copy link
Contributor

@kevinhartman kevinhartman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

// sentiment detection
val text = analyzedRadio.originalItem.text
val language = analyzedRadio.analysis.language.getOrElse("")
val inferredSentiment = sentimentDetection.detectSentiment(text, language).map(List(_)).getOrElse(List())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toList on Option[String] should do what you want :)

Copy link
Contributor Author

@c-w c-w Jun 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, Scala strikes again -- that's not what I'd call more readable/understandable. getOrElse seems more explicit to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh yeah but I think the major offense is just that you can enumerate an Option. Since we're already doing that, toList doesn't really push the boundaries too much further IMO :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we use getOrElse in many places and isDefined / get in others. We should in bulk move all of this to whichever approach you think is better going forward (e.g. match / case).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

private var audioStream: InputStream = _
private var transcriber: Transcriber = _

private lazy val onTranscription = new Func[String] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a comment for another repo, but why is there a custom class for Func coming from speechtotext.utils?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

speechtotext.utils.Func is part of the Java speech-to-text websocket package. Java doesn't have a native function interface so I had to make one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@c-w c-w Jun 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shows that most of my Java-days were pre-8. I'll update the STT library and push a new version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 23d6409.

}
}

case class RadioTranscription(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move this out to a dto folder. I can imagine clients that may want to work with RadioTranscriptions but not RadioStreamFactory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 33347bb

}
}

class RadioInputDStream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move this class and its dependencies into a separate package for cleaner separation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 33347bb

} catch {
case ex: ParseException =>
logError(s"Unable to parse feature service response: $response", ex)
List()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a client of this code, I'd like to distinguish between the error case and having no features found. Perhaps this interface would be a good candidate for a return type of Try[T]?

Here's a pretty good article describing functional error handling with Try in Scala.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in c110efa.

fetchResponse(fetch)
}

private def fetchResponse(url: String): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like another good candidate for Try[String].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in c110efa.

}

private def parseResponse(response: String): Iterable[FeatureServiceFeature] = {
private def unpack(responseBody: Try[String], endpointName: String): Iterable[FeatureServiceFeature] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error is still swallowed here. I would expect that clients of the public methods of this class should be able to distinguish between empty results and an internal error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now is not the time to change the public API of this class. We currently have many cases where we're not handling dependency failure so we need to audit this anyways in the future.

@c-w
Copy link
Contributor Author

c-w commented Jun 6, 2017

Addressed all actionable comments from Kevin, now waiting for sign off from @erikschlegel.

Copy link
Contributor

@erikschlegel erikschlegel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks great @c-w . Only recommendation was to include one other radio stream to prove out that use-case.

Great work!

@@ -278,6 +314,19 @@ object DemoFortis {
)
)
),
"radio" -> List(
ConnectorConfig(
"Radio",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide an example with streaming from more than one source, just to prove out that we can support a list of audio sources.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The radio stream is designed to process a single radio feed. For ingesting multiple radio feeds, we should use multiple radio streams. I'm syncing up with @kevinhartman to see how we'd represent that in the current stream-factory paradigm since your ask is really a question for the stream-factories rather than the stream implementations.

@c-w
Copy link
Contributor Author

c-w commented Jun 7, 2017

Closing this off. Let's revisit the questions around multiple copies of the same stream configured with different parameters as a separate thread.

@c-w c-w merged commit e6bffa3 into master Jun 7, 2017
@c-w c-w removed the in progress label Jun 7, 2017
@c-w c-w deleted the radio-ingestion branch June 7, 2017 13:10
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants