This repository has been archived by the owner on Mar 7, 2018. It is now read-only.

Publish AnalyzedItems to Cassandra #28

Merged

@jcjimenez merged 19 commits into master from publish-to-cassandra on Jun 23, 2017
Conversation

@c-w (Contributor) commented on Jun 23, 2017:

For now, using a test keyspace and table on the cluster that Erik set up:

CREATE KEYSPACE fortistest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}  AND durable_writes = true;

CREATE TABLE fortistest.events (
    created_at timestamp,
    pipeline text,
    PRIMARY KEY (created_at, pipeline)
);

There will be some follow-up work required to adapt the CassandraSchema class once our events schema is finalized.

Resolves #14
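For reference, a minimal sketch of how a stream could be written to this test table with the spark-cassandra-connector; the Event case class and the publish helper are illustrative assumptions, not code from this PR:

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.dstream.DStream

// hypothetical row type mirroring the fortistest.events columns above
case class Event(created_at: java.util.Date, pipeline: String)

def publish(events: DStream[Event]): Unit = {
  // writes every micro-batch of the stream to the fortistest.events table
  events.saveToCassandra("fortistest", "events", SomeColumns("created_at", "pipeline"))
}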

@Smarker (Contributor) left a comment:

Everything else looks good.


object Utils {
  def mean(items: List[Double]): Double = {
    items.sum / items.length
Contributor:

What if the length of items is 0? Wouldn't this cause an error?

@c-w (Author) replied on Jun 23, 2017:

Improved error handling in 37c1983.
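For the record, a guard along these lines would cover the empty-list case; this is a sketch, not necessarily how 37c1983 handles it:

object Utils {
  def mean(items: List[Double]): Double = {
    // items.sum / items.length on an empty list yields 0.0 / 0, i.e. NaN,
    // so fail fast with a descriptive error instead
    require(items.nonEmpty, "cannot compute the mean of an empty list")
    items.sum / items.length
  }
}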

def rescale(items: List[Double], min_new: Double, max_new: Double): List[Double] = {
  val min_old = items.min
  val max_old = items.max
  val coef = (max_new - min_new) / (max_old - min_old)
Contributor:

If max_old == min_old, the denominator would be 0. Maybe put a check for this?

Contributor:

+1

@c-w (Author):

Added error handling in 37c1983.
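One possible check, sketched below; mapping the degenerate range to the midpoint of the new range is an assumption, not necessarily what 37c1983 does:

def rescale(items: List[Double], min_new: Double, max_new: Double): List[Double] = {
  val min_old = items.min
  val max_old = items.max
  if (max_old == min_old) {
    // all items are equal, so the scaling factor is undefined;
    // map every item to the midpoint of the new range
    items.map(_ => (min_new + max_new) / 2)
  } else {
    val coef = (max_new - min_new) / (max_old - min_old)
    items.map(item => min_new + (item - min_old) * coef)
  }
}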

@@ -0,0 +1,6 @@
package com.microsoft.partnercatalyst.fortis.spark.transforms.gender

object GenderDetector {
Contributor:

May I suggest adding extends Enumeration here?

@c-w (Author):

Sweet. Done in 8162fe3.
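For illustration, the suggested change presumably looks something like this; the particular values are assumptions, not taken from 8162fe3:

package com.microsoft.partnercatalyst.fortis.spark.transforms.gender

object GenderDetector extends Enumeration {
  type Gender = Value
  // illustrative values; the real set lives in 8162fe3
  val Male, Female, Unknown = Value
}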

@jcjimenez (Contributor) left a comment:

LGTM with the division-by-zero check.

@jcjimenez merged commit 257576d into master on Jun 23, 2017.
@c-w deleted the publish-to-cassandra branch on June 23, 2017 at 20:15.
@erikschlegel (Contributor) left a comment:

Looks great, nice work. I left some comments around offloading the uuid() call to Cassandra.

Starting next week, I'll add the pieces to aggregate the results by place, tile, and topic and make the saveToCassandra call for each.

@@ -19,6 +20,7 @@ object TadawebPipeline extends Pipeline {
     import transformContext._

     stream.map(tada => AnalyzedItem(
+      id = randomUUID(),
Contributor:

This should be generated via the uuid() function in Cassandra.

@c-w (Author):

The advantage of creating the id early is that we have a way to track every event through the pipeline (e.g. useful when logging). Is this benefit worth explicitly creating the UUID?
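A trimmed-down sketch of the client-side approach being defended here; the Event case class and the log line are hypothetical stand-ins for the real AnalyzedItem flow:

import java.util.UUID
import java.util.UUID.randomUUID

// hypothetical, simplified stand-in for AnalyzedItem
case class Event(id: UUID, pipeline: String)

val event = Event(id = randomUUID(), pipeline = "tadaweb")
// because the id exists before the Cassandra write, every pipeline
// stage can reference the same id in its logs
println(s"processing event ${event.id} from pipeline ${event.pipeline}")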

@@ -17,6 +18,7 @@ object RadioPipeline extends Pipeline {

   private def convertToSchema(stream: DStream[RadioTranscription], transformContext: TransformContext): DStream[AnalyzedItem] = {
     stream.map(transcription => AnalyzedItem(
+      id = randomUUID(),
Contributor:

This should be generated via the uuid() function in Cassandra.

@@ -18,6 +19,7 @@ object InstagramPipeline extends Pipeline {
       // do computer vision analysis
       val analysis = imageAnalyzer.analyze(instagram.images.standard_resolution.url)
       AnalyzedItem(
+        id = randomUUID(),
Contributor:

This should be generated via the uuid() function in Cassandra.

 import org.scalatest.{BeforeAndAfter, FlatSpec}

 import scala.collection.mutable

-class StreamProviderSpec extends FlatSpec with BeforeAndAfter {
+class SparkSpec extends FlatSpec with BeforeAndAfter {
Contributor:

I would ideally like to keep the StreamProviderSpec separate since the StreamProvider package is an isolated component that could be moved out to a library at any time (and it'd be nice to take the test spec with it).

@c-w (Author):

As per the comment on 629e5d3, we can only have a single SparkContext running per JVM, so I merged all the tests for now (they were previously split, but it's non-trivial to get that to work). If this becomes an issue, we can spend the time to figure out how to split the tests.
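A sketch of the single-suite pattern that constraint forces; the configuration values and the sample test are assumptions, not code from this PR:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FlatSpec}

class SparkSpec extends FlatSpec with BeforeAndAfter {
  private var sc: SparkContext = _

  before {
    // only one SparkContext may be active per JVM, so every Spark-backed
    // test has to live in (or share state with) this single suite
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SparkSpec"))
  }

  after {
    sc.stop()
  }

  "mean" should "average a list of doubles" in {
    assert(sc.parallelize(List(1.0, 2.0, 3.0)).mean() === 2.0)
  }
}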

@c-w (Author):

I briefly looked into using the spark-testing-base package, but they don't have anything built-in for Spark Streaming. I started working on a streaming extension but got some odd errors, so I preferred to just push this for now. I'll look more into this when I get a free minute.
