
This Spark application loads data from a Kafka queue into a DSE Graph instance, by way of Spark Streaming and DSE GraphFrames APIs.


This project leverages sbt for Scala package management and compilation. Make sure sbt is installed on your machine; on a Debian-based system, for example:

echo "deb /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp:// --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
sudo apt-get update
sudo apt-get install sbt

Next, clone the kafka_to_sparkstreaming_to_graph project onto the cluster that is running DSE Spark, and build it with sbt:

git clone
cd <PATH TO kafka_to_sparkstreaming_to_graph>
sbt package

In addition, both DSE Spark and DSE Graph must already be running on your cluster.


In this Spark application, we will be dealing with a subset of the Amazon graph data set, downloaded to your DSE Spark cluster. The data comes in JSON format, with the following document schemas:

  • Item metadata:

{'asin': '0700026444', 'description': 'Silverpot Tea, Pure Darjeeling, is an exquisite tea enjoyed at leisure, when you want to relax or celebrate.   Its mellow yet layered taste will constantly surprise and delight.  This subtle and refined tea is of incomparable taste and flavor.  \n\nPackaged in an exotic handmade pinewood chestlet, this magnificent tea was rolled from tender leaves grown and hand plucked in the legendary mist covered fields of Darjeeling.\n\nFor authentic experience of this luxury tea, it is presented as 100 gms  loose leaf, and is a great way to show your appreciation, celebrate an occasion or send your best wishes.\n\nSilverpot is different - it embraces the pioneering spirit of the Ghose family, personifies the highest standards of excellence, and is proof that this is only possible from the love and care of a family tea company located at the country of origin, with a tea tradition of five generations.The Silverpot philosophy is, first and foremost, about a passionatecommitment to quality and a series of uncompromising choices. These, taken together, create a taste anda style that is as legendary as it is unique. Silverpot Tea balances richness, freshness andfinesse, and is nurtured from the leaf to the cup with painstaking care and attention to detail.HERITAGE.Silverpot is born of a cherished heritage dating back to 1879, when tea was still a monopoly of the colonial rulers, and the Ghose family established Indias first native owned tea plantation. The legacy of Debes Chandra Ghose and his family continues till this day. Each expression of Silverpot draws on the companys long history, rich traditions and legendary dedication to quality teas.', 'title': 'Pure Darjeeling Tea: Loose Leaf', 'imUrl': '', 'salesRank': {'Grocery & Gourmet Food': 620307}, 'categories': [['Grocery & Gourmet Food']]}
  • Item review data:

{"reviewerID": "A3I0AV0UJX5OH0", "asin": "1403796890", "reviewerName": "Alicia b", "helpful": [0, 0], "reviewText": "I ordered spongbob slippers and I got John Cena not too happy ... my son was looking forward to them being spongebob!! ..  there very thin :(((( ps if I wanted john cena I would have ordered that ... zero stars", "overall": 1.0, "summary": "grrrrrrr", "unixReviewTime": 1385942400, "reviewTime": "12 2, 2013"}

Because every record is a JSON document with named fields, the DataFrames API is a natural choice for storing and manipulating the data in Spark.
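As a rough illustration of the two record shapes, here is a minimal sketch in plain Python, independent of the project's Scala code. The review sample above is strict JSON, while the metadata sample uses single-quoted strings, which a strict JSON parser rejects. The helper name `parse_record` is hypothetical, and the two input strings are shortened copies of the samples. Spark's JSON data source accepts single-quoted strings by default (its `allowSingleQuotes` option), so the distinction matters mainly when inspecting records outside Spark.

```python
import ast
import json


def parse_record(line: str) -> dict:
    """Parse one line of the data set into a dict.

    Tries strict JSON first and falls back to a Python-literal parse
    for single-quoted records such as the metadata sample.
    """
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return ast.literal_eval(line)


# Shortened copies of the two sample records (not the full documents).
meta_line = (
    "{'asin': '0700026444', 'title': 'Pure Darjeeling Tea: Loose Leaf', "
    "'salesRank': {'Grocery & Gourmet Food': 620307}, "
    "'categories': [['Grocery & Gourmet Food']]}"
)
review_line = (
    '{"reviewerID": "A3I0AV0UJX5OH0", "asin": "1403796890", '
    '"helpful": [0, 0], "overall": 1.0, "summary": "grrrrrrr", '
    '"unixReviewTime": 1385942400}'
)

meta = parse_record(meta_line)
review = parse_record(review_line)
print(meta["asin"], meta["categories"][0][0])
print(review["reviewerID"], review["overall"])
```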

Provided in the project’s resources/scripts subdirectory is a bash script for downloading and unzipping the Grocery and Gourmet Foods subset of the Amazon data. Go ahead and run the following:

~/kafka_to_sparkstreaming_to_graph$ export APP_ROOT=$HOME/kafka_to_sparkstreaming_to_graph/
~/kafka_to_sparkstreaming_to_graph$ bash resources/scripts/
--2018-07-02 01:28:46--
Resolving (
Connecting to (||:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 54490289 (52M) [application/x-gzip]
Saving to: ‘/home/automaton/kafka_to_sparkstreaming_to_graph//amazon_data/meta_Grocery_and_Gourmet_Food.json.gz’

100%[=================================================================================>] 54,490,289  74.9MB/s   in 0.7s

2018-07-02 01:28:46 (74.9 MB/s) - ‘/home/automaton/kafka_to_sparkstreaming_to_graph//amazon_data/meta_Grocery_and_Gourmet_Food.json.gz’ saved [54490289/54490289]

--2018-07-02 01:28:46--
Resolving (
Connecting to (||:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 231127100 (220M) [application/x-gzip]
Saving to: ‘/home/automaton/kafka_to_sparkstreaming_to_graph//amazon_data/reviews_Grocery_and_Gourmet_Food.json.gz’

100%[=================================================================================>] 231,127,100 43.0MB/s   in 4.6s

2018-07-02 01:28:51 (48.1 MB/s) - ‘/home/automaton/kafka_to_sparkstreaming_to_graph//amazon_data/reviews_Grocery_and_Gourmet_Food.json.gz’ saved [231127100/231127100]

You should now see a subdirectory called amazon_data, containing two unzipped JSON files:

~/kafka_to_sparkstreaming_to_graph$ ls
amazon_data  build.sbt  project  resources  src  target
~/kafka_to_sparkstreaming_to_graph$ cd amazon_data/
~/kafka_to_sparkstreaming_to_graph/amazon_data$ ls
meta_Grocery_and_Gourmet_Food.json  reviews_Grocery_and_Gourmet_Food.json


Provided in the project’s resources/schema subdirectory is a sample DSE Graph schema script, AmazonDSEMaterializedCustomID.groovy, which creates the schema for a DSE Graph instance called graph_stress. Note that the script drops any existing graph of that name before recreating it. Let’s create the schema by piping the script into the Gremlin console:

~/kafka_to_sparkstreaming_to_graph$ cat resources/schema/AmazonDSEMaterializedCustomID.groovy | dse gremlin-console

         (o o)
plugin activated: tinkerpop.tinkergraph
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
gremlin> if(system.graph("graph_stress").exists()) {
......1>   system.graph("graph_stress").drop()
......2> }
gremlin> system.graph("graph_stress").create()
gremlin> :remote config alias g graph_stress.g
gremlin> schema.config().option('graph.schema_mode').set(com.datastax.bdp.graph.api.model.Schema.Mode.Production);
gremlin> schema.propertyKey('summary').Text().single().create()
gremlin> schema.propertyKey('rating').Double().single().create()
gremlin> schema.propertyKey('description').Text().single().create()
gremlin> schema.propertyKey('title').Text().single().create()
gremlin> schema.propertyKey('imUrl').Text().single().create()
gremlin> schema.propertyKey('name').Text().single().create()
gremlin> schema.propertyKey('price').Double().single().create()
gremlin> schema.propertyKey('rank').Int().single().create()
gremlin> schema.propertyKey('id').Text().single().create()
gremlin> schema.propertyKey('helpful').Double().single().create()
gremlin> schema.propertyKey('brand').Text().single().create()
gremlin> schema.propertyKey('reviewText').Text().single().create()
gremlin> schema.propertyKey('timestamp').Timestamp().single().create()
gremlin> schema.propertyKey('timestampAsText').Text().single().create()
gremlin> schema.propertyKey('answer').Text().single().create()
gremlin> schema.propertyKey('answerType').Text().single().create()
gremlin> schema.propertyKey('question').Text().single().create()
gremlin> schema.propertyKey('questionType').Text().single().create()
gremlin> schema.vertexLabel('Item').partitionKey("id").properties('price', 'title', 'imUrl', 'description', 'brand').create()
gremlin> schema.vertexLabel('Category').partitionKey('id').create()
gremlin> schema.vertexLabel('Customer').partitionKey('id').properties('name').create()
gremlin> schema.vertexLabel('Question').partitionKey('id').properties('timestampAsText', 'timestamp', 'answerType', 'answer', 'question', 'questionType').create()
gremlin> schema.edgeLabel('viewed_with').connection('Item', 'Item').create()
gremlin> schema.edgeLabel('also_bought').connection('Item', 'Item').create()
gremlin> schema.edgeLabel('reviewed').properties('summary', 'reviewText', 'timestampAsText', 'timestamp', 'helpful', 'rating').connection('Customer', 'Item').create()
gremlin> schema.edgeLabel('purchased_with').connection('Item', 'Item').create()
gremlin> schema.edgeLabel('belongs_in_category').connection('Item', 'Category').create()
gremlin> schema.edgeLabel('has_salesRank').properties('rank').connection('Item', 'Category').create()
gremlin> schema.edgeLabel('bought_after_viewing').connection('Item', 'Item').create()
gremlin> schema.vertexLabel('Item').index('byprice').materialized().by('price').add()
gremlin> schema.vertexLabel('Customer').index('byrating').outE("reviewed").by('rating').add()
gremlin> schema.vertexLabel('Item').index('byreviewText').outE("reviewed").by('reviewText').add()
gremlin> schema.vertexLabel('Item').index('bytimestamp').inE('reviewed').by('timestamp').add()
gremlin> schema.vertexLabel("Item").index("customer_review_by_rating").inE("reviewed").by("rating").add()
gremlin> schema.vertexLabel('Question').index('byAnswerType').materialized().by('answerType').add()
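To make the link between the review records and the schema above more concrete, here is a minimal Python sketch that maps one review document onto the properties of a `reviewed` edge. The field-to-property pairing (for example `overall` to `rating`, `unixReviewTime` to `timestamp`) and the treatment of `helpful` as a ratio are assumptions inferred from the names alone; the application itself may map them differently. The function name `review_to_edge` and the keys `customer_id` and `item_id` are hypothetical.

```python
from datetime import datetime, timezone


def review_to_edge(review: dict) -> dict:
    """Map one review record to a hypothetical 'reviewed' edge.

    The pairing of JSON fields with schema property keys is an
    assumption based on their names, not taken from the application.
    """
    helpful_votes, total_votes = review["helpful"]
    return {
        "label": "reviewed",
        "customer_id": review["reviewerID"],  # Customer vertex, partition key 'id'
        "item_id": review["asin"],            # Item vertex, partition key 'id'
        "summary": review["summary"],
        "reviewText": review["reviewText"],
        "rating": float(review["overall"]),
        # Assumption: 'helpful' (a Double in the schema) is the fraction of
        # helpful votes; 0.0 when the review has received no votes.
        "helpful": helpful_votes / total_votes if total_votes else 0.0,
        "timestamp": datetime.fromtimestamp(review["unixReviewTime"], tz=timezone.utc),
        "timestampAsText": review["reviewTime"],
    }


# Shortened copy of the sample review record.
sample_review = {
    "reviewerID": "A3I0AV0UJX5OH0",
    "asin": "1403796890",
    "helpful": [0, 0],
    "reviewText": "... zero stars",
    "overall": 1.0,
    "summary": "grrrrrrr",
    "unixReviewTime": 1385942400,
    "reviewTime": "12 2, 2013",
}
edge = review_to_edge(sample_review)
print(edge["customer_id"], "-reviewed->", edge["item_id"], edge["timestamp"].date())
```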


This Spark application will be reading data that is streamed into appropriate queues (topics) in Kafka. The following steps will get you started with this initial setup:

  • From the project’s root directory unpack the Kafka project that is provided:

~/kafka_to_sparkstreaming_to_graph$ tar -zxvf resources/kafka_2.11-

You should now see a subdirectory kafka_2.11-, which contains a pre-built Kafka distribution (the 2.11 in the name is the Scala version it was built for, not the Kafka version).

~/kafka_to_sparkstreaming_to_graph$ ls
amazon_data  build.sbt  kafka_2.11-  project  resources  src  target
  • Launch the ZooKeeper server as a background process:

~/kafka_to_sparkstreaming_to_graph$ kafka_2.11- kafka_2.11- &

Note that we are specifying an already provided configuration file kafka_2.11- here.

When listing all running Java processes, we should now see QuorumPeerMain, the ZooKeeper server process:

~/kafka_to_sparkstreaming_to_graph$ jps
14417 QuorumPeerMain
14703 Jps
  • Launch the Kafka server as a background process:

~/kafka_to_sparkstreaming_to_graph$ kafka_2.11- kafka_2.11- &

Again, we are using a provided configuration file kafka_2.11- kafka_2.11- for this server.

When listing all running Java processes, we should now also see Kafka:

~/kafka_to_sparkstreaming_to_graph$ jps
14417 QuorumPeerMain
15396 Jps
15071 Kafka
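The manual check above can also be scripted. The following is a minimal Python sketch, not part of the project, that extracts process names from `jps` output and reports which of the two expected processes are missing. The function name `running_java_processes` is hypothetical, and the sample text is copied from the listing above.

```python
def running_java_processes(jps_output: str) -> set:
    """Return the process names found in the output of `jps`."""
    names = set()
    for line in jps_output.splitlines():
        parts = line.split(maxsplit=1)
        # A line is "<pid> <name>"; skip lines that carry no name.
        if len(parts) == 2:
            names.add(parts[1])
    return names


# Sample output copied from the listing above.
sample_output = """14417 QuorumPeerMain
15396 Jps
15071 Kafka"""

required = {"QuorumPeerMain", "Kafka"}
missing = required - running_java_processes(sample_output)
print("missing:", sorted(missing))
```

To check a live machine, the sample string could be replaced with `subprocess.run(["jps"], capture_output=True, text=True).stdout`.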


The main abstraction that Kafka uses for reading and storing streamed input is a topic. As mentioned previously, our downloaded Amazon data set consists of two JSON files:

  • meta_Grocery_and_Gourmet_Food.json : metadata associated with Amazon items

  • reviews_Grocery_and_Gourmet_Food.json : data corresponding to customer reviews of Amazon items

Let’s go ahead and make a topic for each of these files:

~/kafka_to_sparkstreaming_to_graph$ kafka_2.11- --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic metadata

~/kafka_to_sparkstreaming_to_graph$ kafka_2.11- --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic reviews

Let us verify that the topics were in fact successfully created:

~/kafka_to_sparkstreaming_to_graph$ kafka_2.11- --describe --zookeeper localhost:2181
[2018-07-02 00:19:39,818] INFO Accepted socket connection from / (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2018-07-02 00:19:39,821] INFO Client attempting to establish new session at / (org.apache.zookeeper.server.ZooKeeperServer)
[2018-07-02 00:19:39,822] INFO Established session 0x16456fc6dc80003 with negotiated timeout 30000 for client / (org.apache.zookeeper.server.ZooKeeperServer)
Topic:metadata	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: metadata	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:reviews	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: reviews	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
[2018-07-02 00:19:40,075] INFO Processed session termination for sessionid: 0x16456fc6dc80003 (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-07-02 00:19:40,077] INFO Closed socket connection for client / which had sessionid 0x16456fc6dc80003 (org.apache.zookeeper.server.NIOServerCnxn)
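Because the describe output is interleaved with ZooKeeper log lines, it can help to pick out only the topic summary lines. Below is a minimal Python sketch, assuming the tab-separated `Topic:... PartitionCount:... ReplicationFactor:...` layout shown above; other Kafka versions may print a different layout. The function name `parse_topic_summaries` is hypothetical.

```python
import re

# Matches summary lines such as "Topic:metadata\tPartitionCount:1\tReplicationFactor:1".
SUMMARY_LINE = re.compile(
    r"^Topic:\s*(\S+)\s+PartitionCount:\s*(\d+)\s+ReplicationFactor:\s*(\d+)"
)


def parse_topic_summaries(describe_output: str) -> dict:
    """Map each topic name to its partition count and replication factor."""
    topics = {}
    for line in describe_output.splitlines():
        match = SUMMARY_LINE.match(line)
        if match:
            topics[match.group(1)] = {
                "partitions": int(match.group(2)),
                "replication_factor": int(match.group(3)),
            }
    return topics


# Sample lines copied from the output above (one log line kept as noise).
sample_output = "\n".join([
    "[2018-07-02 00:19:39,818] INFO Accepted socket connection from /",
    "Topic:metadata\tPartitionCount:1\tReplicationFactor:1\tConfigs:",
    "\tTopic: metadata\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0",
    "Topic:reviews\tPartitionCount:1\tReplicationFactor:1\tConfigs:",
    "\tTopic: reviews\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0",
])

topics = parse_topic_summaries(sample_output)
print(sorted(topics))
```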

Next, we redirect the data from our JSON files into the respective topics that were just created:

  • Item metadata:

~/kafka_to_sparkstreaming_to_graph$ kafka_2.11- --broker-list localhost:9092 --topic metadata < amazon_data/meta_Grocery_and_Gourmet_Food.json
  • Item review data:

~/kafka_to_sparkstreaming_to_graph$ kafka_2.11- --broker-list localhost:9092 --topic reviews < amazon_data/reviews_Grocery_and_Gourmet_Food.json
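To know how many messages to expect in each topic, one option is to count the records in each file beforehand. The Python sketch below assumes that every non-empty line of a file is one record and that each line is published as one message; neither assumption is confirmed by the project itself. The function name `count_records` is hypothetical, and the demonstration runs on a small temporary file rather than the real data set.

```python
import os
import tempfile


def count_records(path: str) -> int:
    """Count the non-empty lines (assumed to be records) in a file."""
    with open(path, encoding="utf-8") as handle:
        return sum(1 for line in handle if line.strip())


# Demonstration on a temporary file with two records and a blank line.
with tempfile.NamedTemporaryFile(
    "w", suffix=".json", delete=False, encoding="utf-8"
) as tmp:
    tmp.write('{"asin": "A"}\n{"asin": "B"}\n\n')
    demo_path = tmp.name

record_count = count_records(demo_path)
os.remove(demo_path)
print(record_count)
```

For the real data, the same function could be called with amazon_data/meta_Grocery_and_Gourmet_Food.json and amazon_data/reviews_Grocery_and_Gourmet_Food.json.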


Now that the Kafka topics and the DSE Graph instance are set up, we return to the Spark application itself, which loads the data into the DSE Graph instance graph_stress. The following command submits it:

~/kafka_to_sparkstreaming_to_graph$ dse spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1  target/scala-2.11/kafka_to_spark_streaming_to_graph_2.11-0.1.jar graph_stress metadata reviews

Note that the ordered arguments we pass in are graph_stress, metadata, and reviews; these correspond to the name of the DSE Graph instance to load data into via DSE GraphFrames API, the Kafka topic containing Amazon metadata, and the Kafka topic containing Amazon review data, respectively.


Once the Spark application has processed all records in both Kafka topics and finished, we can verify that loading was successful through the DSE Spark shell:

