
Replace Cassandra sink with Kafka sink #54

Merged
merged 2 commits into master from kafka-sink on Jul 17, 2017

Conversation

@c-w (Member) commented Jul 15, 2017

We publish a simple flat/primitive structure to Kafka so that [1] the event is easy to process by any downstream processing (e.g. Spark Structured Streaming or DB ingestion) and [2] we decouple the Spark Streaming data representation from the data storage representation in Kafka.
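For illustration, a minimal sketch of the kind of flat, primitive-only record this describes (the field names below are assumptions, not the actual schema in this PR):

```scala
// Hypothetical flat event record for the Kafka sink. Every field is a
// primitive, so any downstream consumer (Spark Structured Streaming, a DB
// ingestion job, etc.) can deserialize it without the Spark-side domain classes.
case class FlatFortisEvent(
  eventId: String,
  eventTime: Long,   // epoch millis
  pipeline: String,  // e.g. "twitter"
  title: String,
  body: String,
  sentiment: Double
)
```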

@c-w c-w requested a review from kevinhartman Jul 15, 2017

@c-w c-w requested a review from erikschlegel Jul 15, 2017

@c-w c-w force-pushed the kafka-sink branch from 90deb7f to b0c5df6 Jul 15, 2017

@c-w c-w force-pushed the kafka-sink branch from b0c5df6 to 84d9502 Jul 15, 2017

@jcjimenez (Contributor) left a comment

LGTM with a minor question.

insertion_time: Long
)

case class Stream(

@jcjimenez (Contributor) commented Jul 15, 2017

Don't we still need this Stream class (and maybe TrustedSource below) to set up our pipeline? Or does a duplicate of this class exist outside the com.microsoft.partnercatalyst.fortis.spark.sinks.cassandra package? If not, I'd propose moving it somewhere like com.microsoft.partnercatalyst.fortis.spark.CassandraSchema.

@c-w (Author, Member) commented Jul 16, 2017

This wasn't referenced anywhere, and with the schema changes I'm not sure how useful it would still be. If @kevinhartman's work needs it, that will come out in the merge and the classes can be added back.

@kevinhartman (Contributor) commented Jul 17, 2017
@jcjimenez I'll add them elsewhere as you've proposed once they're needed. Thanks for pointing this out.

@c-w c-w merged commit 3f40046 into master Jul 17, 2017

2 checks passed

continuous-integration/travis-ci/pr: The Travis CI build passed
continuous-integration/travis-ci/push: The Travis CI build passed

@c-w c-w deleted the kafka-sink branch Jul 17, 2017

@c-w c-w removed the in progress label Jul 17, 2017

@kevinhartman (Contributor) left a comment

LGTM.

It makes sense to create separate Spark jobs to process the results of our ingestion/analysis, so that we (and others) can build downstream components such as ML predictors without coupling that logic to ingestion.

"type": "string"
}
},
"sentimens": {

@kevinhartman (Contributor) commented Jul 17, 2017

"sentiments"

@c-w (Author, Member) commented Jul 17, 2017

Thanks, see #56


A repository for all spark jobs running on fortis
A repository for Project Fortis' data ingestion Spark jobs.

@kevinhartman (Contributor) commented Jul 17, 2017

nit: Project Fortis's

@c-w (Author, Member) commented Jul 17, 2017

Doh, I always get this one wrong :( See #56

@erikschlegel (Contributor) commented Jul 17, 2017

@c-w A question related to this PR came up during today's standup; it can wait until tomorrow for an answer. What are the drawbacks of converting the fortisEvents to a DataFrame via toDF, as opposed to funneling all pipeline events to Kafka? We'll probably need to add ~2 new VMs to our cluster for Kafka. I fully agree with the direction towards DataFrames / Datasets, but there seem to be other options available to us: https://indatalabs.com/blog/data-engineering/convert-spark-rdd-to-dataframe-dataset. I feel it's worth a dialogue before we start walking down this road.

@c-w (Author, Member) commented Jul 18, 2017

@erikschlegel From my understanding, DataFrames and Structured Streaming are orthogonal concerns. The former is a way to represent data inside of Spark, the latter is a way to process data by event-time as opposed to batch-time; it's merely coincidental that they both expose a SQL API. There's a section in the Structured Streaming docs that talks a bit about the difference and about the currently available ways to create a structured stream (which for now is just Kafka or the file system).
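For reference, a minimal sketch of consuming such a Kafka topic as a structured stream (the broker address and topic name are placeholders, not values from this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("fortis-aggregations").getOrCreate()

// Kafka is one of the supported Structured Streaming sources; the broker
// and topic names below are illustrative.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "fortis-events")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
```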

@erikschlegel (Contributor) commented Jul 18, 2017

@c-w I appreciate you putting this together, but I'm still unclear why we need the dev-ops overhead of Kafka introduced to the pipeline. We can transform the RDD collection of fortisEvents to DataFrames via val df = sqlContext.createDataFrame(fortisEvents). I understand that Kafka offers us the ability to replay historic streams, which isn't a requirement at the moment. Converting our events to DataFrames will ensure our event batch windows are reliably processed. My preference is to convert the RDDs to DataFrames in ProjectFortis.scala. I'm open to hearing more about what other benefits Kafka opens up for us that warrant the cost and maintenance overhead.
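For context, a minimal self-contained sketch of the conversion suggested above (the FortisEvent fields and sample data are illustrative, not the project's real schema):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative event type; Spark infers the DataFrame schema from the case class.
case class FortisEvent(id: String, eventTime: Long, body: String)

val spark = SparkSession.builder.appName("fortis-ingestion").getOrCreate()
import spark.implicits._

// Hypothetical RDD of analyzed events; in the real pipeline this would be the
// per-batch RDD of fortisEvents coming out of Spark Streaming.
val fortisEvents = spark.sparkContext.parallelize(Seq(FortisEvent("1", 0L, "hello")))

val df = spark.createDataFrame(fortisEvents)  // equivalently: fortisEvents.toDF()
```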

@c-w (Author, Member) commented Jul 18, 2017

@erikschlegel We talked about this in detail last week. We need structured streaming to deal with stragglers when aggregating the streams into time windows. Consider the following situation:

Let's assume our window size is Tw = 5s and that:

- event E1 is generated at T0
- event E2 is generated at T0+1s
- event E3 is generated at T0+2s

We want E1, E2, and E3 to be aggregated in the same window since they all happened within a delta of Tw.

Now let's add some processing downstream of the events:

- E1 is simple, so we process it in 1s: done at T0+1s
- E2 is simple, so we process it in 1s: done at T0+2s
- E3 is complex, so we process it in 4s: done at T0+6s

Now we run the aggregation windows: we only pick up E1 and E2 and drop E3 because it's a straggler.

Spark DataFrames won't help us with this issue (sample reference 1, sample reference 2). The only way to solve the straggler problem in traditional Spark is to make the batch sizes big enough that you hope to capture all the variance in processing time within them, which is not a reliable approach and shouldn't be used if we want even semi-reliable aggregation results.

DataFrames and Structured Streaming are really unrelated; they just share the same API. DataFrames provide a SQL-like query interface over a set of Spark RDDs, while Structured Streaming is a way to aggregate data by event time and automatically re-run the aggregation when stragglers arrive.
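To sketch how Structured Streaming handles this (assuming events is a streaming DataFrame like the earlier Kafka sketch; the column name, watermark, and window size are illustrative): an event-time watermark keeps the 5-second windows open so that the straggler E3 is still folded into its original window when it arrives late:

```scala
import org.apache.spark.sql.functions.{col, window}

// Assumes `events` has an event-time (timestamp) column; the name eventTime
// is an assumption for this sketch.
val counts = events
  .withWatermark("eventTime", "10 seconds")        // tolerate stragglers up to 10s late
  .groupBy(window(col("eventTime"), "5 seconds"))  // Tw = 5s event-time windows
  .count()
```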

@erikschlegel (Contributor) commented Jul 18, 2017

Agreed on using Structured Streaming. We can use Structured Streaming through Event Hub (which is a managed Azure service) or by converting each RDD into a DataFrame. My earlier point was to better understand the benefits of Kafka as opposed to the other two approaches mentioned above.

@c-w (Author, Member) commented Jul 18, 2017

I didn't know that EventHub offers Structured Streaming; thanks for bringing that to my attention. Reading their docs, the functionality looks pretty beta-quality though, so I'm not sure how comfortable I'd be building on it, especially given that their list of future improvements contains some pretty basic functionality and there isn't even sample code:

[Screenshot: "future improvements" list from the EventHub Structured Streaming docs]

Given that Structured Streaming itself is pretty new, I'd simply go for the path of least resistance and use the Spark-recommended way to integrate with Structured Streaming, which is Kafka.

Additionally, Kafka is part of essentially every data pipeline out there, so if we can make it easier to stand up on Azure via the Fortis work, that'll be a win in general as it's certainly re-usable functionality :)
