Skip to content

Commit

Permalink
Merge pull request #1 from shiv4nsh/master
Browse files Browse the repository at this point in the history
Added the Code and the README file
  • Loading branch information
nraboy committed Aug 1, 2016
2 parents 6b06ff6 + 4b3e899 commit cee3e68
Show file tree
Hide file tree
Showing 18 changed files with 1,214 additions and 1 deletion.
50 changes: 50 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
### Example user template template
### Example user template

# IntelliJ project files
.idea
*.iml
out
gen### Scala template
*.class
*.log

# sbt specific
.cache
.history
.lib/
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/

# Scala-IDE specific
.scala_dependencies
.worksheet
### Java template
*.class

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.ear

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
### SBT template
# Simple Build Tool
# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control

target/
lib_managed/
src_managed/
project/boot/
.history
.cache

# Created by .ignore support plugin (hsz.mobi)
13 changes: 13 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Copyright 2014 Typesafe, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
251 changes: 251 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
---
tags: [couchbase,spark,akka-http]
projects: [spark-couchbase-akka-http-starter-kit]
---
:toc:
:icons: font
:source-highlighter: prettify
:project_id: gs-accessing-data-couchbase
This guide walks you through the process of using http://spark.apache.org/[Apache Spark] and http://doc.akka.io/docs/akka/2.4.8/scala/http/[Akka-http] to build an application that stores data in and retrieves it from http://developer.couchbase.com/[Couchbase], a document-based database.

== What you'll build

You will build a REST api that will run on your development machine. You will access it through your browser at an address like :

http://localhost:8080/

With this REST Api, you’ll be able to perform the basic CRUD (Create,Read,Update,Delete) operations.

* **Create Route:**
http://localhost:8080/insert/name/shivansh/email/shiv4nsh@gmail.com

* **UpdateViaKeyValuePair Route:**
http://localhost:8080/updateViaKV/name/shivansh/email/shivansh@knoldus.com/id/user::267316c9-75a2-49d2-ae25-791d82b3d5fa

* **Retrieve Via Document id:**
http://localhost:8080/getViaKV/id/user::267316c9-75a2-49d2-ae25-791d82b3d5fa

* **Retrieve via View:**
http://localhost:8080/getViaView/name/shivansh

* **Retrieve via N1QL:**
http://localhost:8080/getViaN1Ql/name/shivansh

* **Delete via Document id:**
http://localhost:8080/delete/id/user::267316c9-75a2-49d2-ae25-791d82b3d5fa

In this demonstration, you’ll be interacting with information about people: their name and their email. The Couchbase database will contain one bucket named "userBucket" that contains people. Each person document will be represented in JSON:

You will be using http://spark.apache.org/[Apache Spark] , http://doc.akka.io/docs/akka/2.4.8/scala/http/[Akka-http] and http://developer.couchbase.com/documentation/server/current/connectors/spark-1.0/spark-intro.html[Couchbase Spark Connector]for this project.

== What you'll need

* 15-30 minutes.
* link:https://www.jetbrains.com/help/idea/2016.1/creating-and-running-your-scala-application.html[A Scala code editor]
* link:http://www.scala-sbt.org/0.13/docs/Setup.html[Sbt]
* link:https://www.lightbend.com/activator/download[Lightbend Activator](follow the instructions to install activator and run your first activator application, if you have this activator thtn you do not need the sbt and the scala code editor as it is already provided with it.)
* link:http://www.couchbase.com/nosql-databases/downloads[Couchbase Server 4.5+] (follow the instructions to install Couchbase and create a bucket - this guide assumes you will have it running locally)

== Setup

=== Install and launch Couchbase
With your project set up, you can install and launch Couchbase.

For whatever operating system you are using, instructions and downloads can be found at http://developer.couchbase.com/documentation/server/current/install/install-intro.html[How to install Couchbase?].

After you install Couchbase, launch it. You should see a webpage opening in your default browser allowing you to setup Couchbase

=== Setting up build.sbt

The following code will rely on Apache Spark , spark-couchbase connector and akka-http so make sure you add the correct dependency.Your build.sbt should look like this:
[source,scala]
name := "spark-couchbase-akka-http-starter-kit"
version := "1.0"
scalaVersion := "2.11.7"
organization := "com.knoldus"
val akkaV = "2.4.5"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "1.6.1",
"com.typesafe.akka" %% "akka-http-core" % akkaV,
"com.typesafe.akka" %% "akka-http-experimental" % akkaV,
"com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
"org.scalatest" %% "scalatest" % "2.2.6" % "test",
"com.couchbase.client" %% "spark-connector" % "1.1.0"
)
assembleArtifact in packageScala := false // We don't need the Scala library, Spark already includes it
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
fork in run := true

For making the assembly we have to specify the Merge strategy and we do not ned the scala dependency because spark already includes it.



== Code

After following the steps above, we're ready to code CRUD operations in Couchbase

We will list all the important steps with the headings hence you can see it below.


== Creating a separate DatabaseFactory file.

In this we will add all the functions related to the Couchbase interactions and will initialize the spark context with couchbase configurations.

=== Creating the Context
[source,scala]

//loading the Configuration file
val config = ConfigFactory.load("application.conf")
val couchbaseUrl = config.getString("couchbase.url")
val bucketName = config.getString("couchbase.bucketName")
val bucketPassword = config.getString("couchbase.bucketPassword")
// setting the spark configuration
val sparkConf: SparkConf = new SparkConf()
.setAppName("spark-akka-http-couchbase-starter-kit").setMaster("local")
.set("com.couchbase.nodes", couchbaseUrl).set(s"com.couchbase.bucket.$bucketName", bucketPassword)
//initializing the spark Context
val sc = new SparkContext(sparkConf)


=== Creating or Updating a document
This function will return true if the document is successfully saved and if not it will return false.
[source,scala]
----
def persistOrUpdate(documentId: String, jsonObject: JsonObject): Boolean = {
val jsonDocument = JsonDocument.create(documentId, jsonObject)
val savedData = sc.parallelize(Seq(jsonDocument))
Try(savedData.saveToCouchbase()).toOption.fold(false)(x => true)
}
----
Couchbase has an idea of document "revisions", which you can read about in the main documentation. If you examine the document after this update, you can see the revision number has increased by one.

=== Reading a document

There are three ways of retrieving the document.We will cover all three here.
[source,scala]
----
val NIQLQUERY = s"SELECT * FROM `$bucketName` WHERE name LIKE"
val VIEWNAME = "emailtoName"
val DDOCNAME = "userddoc"
//Reirieving document via N1ql query
def getViaN1Ql(name: String): Option[Array[String]] = {
val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(NIQLQUERY + s"'$name%'")).collect()).toOption
n1qlRDD.map(_.map(a => a.value.toString))
}
//Retrieving data via Couchbase View.
def getViaView(name: String): Option[Array[String]] = {
val viewRDDData = Try(sc.couchbaseView(ViewQuery.from(DDOCNAME, VIEWNAME).startKey(name)).collect()).toOption
viewRDDData.map(_.map(a => a.value.toString))
}
//Retrieving data via Couchbase Document Id (Key Value Pair)
def getViaKV(listOfDocumentIds: String): Option[Array[String]] = {
val idAsRDD = sc.parallelize(listOfDocumentIds.split(","))
Try(idAsRDD.couchbaseGet[JsonDocument]().map(_.content.toString).collect).toOption
}
----


=== Deleting a document

Deleting a document is a single, straightforward call.
[source,scala]
----
def getNIQLDeleteQuery(documentId: String) =s"""DELETE FROM $bucketName p USE KEYS "$documentId" RETURNING p"""
def deleteViaId(documentID: String): Option[Array[String]] = {
val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(getNIQLDeleteQuery(documentID))).collect()).toOption
n1qlRDD.map(_.map(a => a.value.toString))
}
----

=== Writing the Akka-http Routes
[source,scala]
implicit val system:ActorSystem
implicit val materializer:ActorMaterializer
val logger = Logging(system, getClass)
// Default Exception Handler
implicit def myExceptionHandler =
ExceptionHandler {
case e: ArithmeticException =>
extractUri { uri =>
complete(HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not persisted and something went wrong"))
}
}


Here is a sample code for writing a single route
[source,scala]
val sparkRoutes: Route = {
get {
path("insert" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
complete {
val documentId = "user::" + UUID.randomUUID().toString
try {
val jsonObject = JsonObject.create().put("name", name).put("email", email)
val isPersisted = persistOrUpdate(documentId, jsonObject)
isPersisted match {
case true => HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
case false => HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
}
} catch {
case ex: Throwable =>
logger.error(ex, ex.getMessage)
HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
}
}
}
}

You can find all the routes in the file SparkServices.scala located in src/main/scala/com/knoldus/couchbaseServices/routes/SparkServices.scala

=== Start the Akka-http server.

Read the application.conf file for the configurations of server port and start the server.

[source,scala]
----
class StartSparkServer(implicit val system: ActorSystem,
implicit val materializer: ActorMaterializer) extends SparkService {
def startServer(address: String, port: Int) = {
Http().bindAndHandle(sparkRoutes, address, port)
}
}
object StartApplication extends App {
StartApp
}
object StartApp {
implicit val system: ActorSystem = ActorSystem("Spark-Couchbase-Service")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val server = new StartSparkServer()
val config = server.config
val serverUrl = config.getString("http.interface")
val port = config.getInt("http.port")
server.startServer(serverUrl, port)
}
----

== Run

At this point, the code is ready to run. We haven't added any code to display contents of documents. You may wish to add logging statements and examine output on the console, or even step through the application with the debugger to see the results.

You can run the application using the following command inisde the code directory

sbt run

and if you are using the activator then use

activator run

== Summary
Congratulations! You set up a Couchbase server and wrote a simple Spark-couchbase-akka-http application that stores a document in Couchbase and provides a basic REST api.

1 change: 0 additions & 1 deletion README.md

This file was deleted.

Loading

0 comments on commit cee3e68

Please sign in to comment.