Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Figure out how to launch Stratio deep-spark #11

Closed
alexander-myltsev opened this issue Jan 25, 2016 · 1 comment
Closed

Figure out how to launch Stratio deep-spark #11

alexander-myltsev opened this issue Jan 25, 2016 · 1 comment

Comments

@alexander-myltsev
Copy link
Contributor

No description provided.

@alexander-myltsev
Copy link
Contributor Author

Progress: Stratio/deep-spark#24

Code:

import com.stratio.deep.aerospike.config.AerospikeDeepJobConfig
import com.stratio.deep.aerospike.config.AerospikeConfigFactory
//import com.stratio.deep.core.entity.MessageTestEntity
//Commons
import com.stratio.deep.commons.annotations._
import com.stratio.deep.commons.config._
import com.stratio.deep.commons.entity._

//Spark
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

import com.stratio.deep.core.context.DeepSparkContext

import scala.collection.JavaConversions._
// Deep-spark context built on top of the existing SparkContext `sc`.
// NOTE(review): `sc` is not defined in this snippet; presumably it is the one spark-shell provides.
val deepContext: DeepSparkContext = new DeepSparkContext(sc)

import com.stratio.deep.commons.annotations._
import com.stratio.deep.commons.config._
import com.stratio.deep.commons.entity._

/**
 * Bean-style entity holding an id, a number and a list of message strings.
 *
 * State lives in mutable private fields exposed through Java-style
 * getters/setters, and a no-argument constructor is provided.
 * NOTE(review): presumably the Deep framework relies on the annotated fields,
 * the accessors and the no-arg constructor to populate instances; the field
 * names and their declaration order are therefore left untouched.
 *
 * @param id      initial value of the `id` field (flagged as cluster and partition key)
 * @param number  initial numeric value
 * @param message initial list of message strings
 */
@DeepEntity
class MessageTestEntity(id: String, number: Long, message: List[String]) extends IDeepType {

    @DeepField(fieldName = "id", isPartOfClusterKey = true, isPartOfPartitionKey = true)
    private var _id: String = id

    @DeepField
    private var _message: List[String] = message

    @DeepField
    private var _number: Long = number

    /** Creates an entity with an empty id, a zero number and no messages. */
    def this() = this("", 0L, Nil)

    /** Current id. */
    def getId(): String = _id

    /** Replaces the id. */
    def setId(id: String): Unit = { _id = id }

    /** Current list of messages. */
    def getMessage(): List[String] = _message

    /** Replaces the list of messages. */
    def setMessage(message: List[String]): Unit = { _message = message }

    /** Current number. */
    def getNumber(): Long = _number

    /** Replaces the number. */
    def setNumber(number: Long): Unit = { _number = number }
}

/**
 * Bean-style entity pairing a word with the number of times it was seen.
 *
 * State lives in the mutable private fields `_word` and `_count`, exposed
 * through Java-style getters/setters.
 * NOTE(review): presumably the Deep framework uses the annotated fields, the
 * accessors and the no-arg constructor to read/write instances — confirm.
 *
 * @param word  initial word (its field is flagged as cluster and partition key)
 * @param count initial occurrence count
 */
@DeepEntity
class WordCount(word: String, count: Long) extends IDeepType {

    @DeepField(isPartOfClusterKey = true, isPartOfPartitionKey = true)
    private var _word: String = word

    @DeepField
    private var _count: Long = count

    /** Creates an entry with an empty word and a zero count. */
    def this() = this("", 0L)

    /** Creates an entry with an empty word and the given count. */
    def this(count: Long) = this("", count)

    /** Current word. */
    def getWord(): String = _word

    /** Replaces the word. */
    def setWord(word: String): Unit = {
        _word = word
    }

    /** Current count. */
    def getCount(): Long = _count

    /** Replaces the count. */
    def setCount(count: Long): Unit = {
        _count = count
    }

    /**
     * Renders the entity as `WordCount{word='...', count=...}`.
     *
     * Reads the mutable fields rather than the constructor parameters so the
     * output reflects values changed through `setWord`/`setCount`. It also
     * stops the compiler from keeping the constructor parameters as extra
     * hidden fields.
     */
    override def toString(): String =
        s"WordCount{word='${_word}', count=${_count}}"
}



// Aerospike connection settings shared by the input and the output job configs.
val aerospikeHost = "130.211.182.43"
val aerospikePort = 3000
val aerospikeNamespace = "test"

// Source: MessageTestEntity records stored in the "input" set.
val inputConfigEntity: AerospikeDeepJobConfig[MessageTestEntity] =
  AerospikeConfigFactory.createAerospike(classOf[MessageTestEntity])
    .host(aerospikeHost)
    .port(aerospikePort)
    .namespace(aerospikeNamespace)
    .set("input")
    .initialize
val inputRDDEntity: RDD[MessageTestEntity] = deepContext.createJavaRDD(inputConfigEntity)

// Split every message of every entity on single spaces to obtain the individual words.
val words: RDD[String] = inputRDDEntity flatMap { e: MessageTestEntity =>
  e.getMessage.flatMap(_.split(" "))
}

// Classic word count: pair each word with 1, then sum the counts per word.
val wordCount: RDD[(String, Long)] = words map { s: String => (s, 1L) }
val wordCountReduced = wordCount reduceByKey { (a, b) => a + b }
val outputRDD = wordCountReduced map { e: (String, Long) => new WordCount(e._1, e._2) }

// Sink: write the counts to a separate "output" set. The original wrote them back
// into "input", mixing WordCount records into the set the job reads its messages from.
// NOTE(review): confirm "output" is the intended destination set.
val outputConfigEntity: AerospikeDeepJobConfig[WordCount] =
  AerospikeConfigFactory.createAerospike(classOf[WordCount])
    .host(aerospikeHost)
    .port(aerospikePort)
    .namespace(aerospikeNamespace)
    .set("output")
    .initialize
DeepSparkContext.saveRDD(outputRDD, outputConfigEntity)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant