In [1]:
%Addjar https://github.com/ibm-cds-labs/spark.samples/raw/master/dist/streaming-twitter-assembly-1.4.jar

Using cached version of streaming-twitter-assembly-1.4.jar


In [2]:
import org.apache.spark.sql.SQLContext

In [3]:
// SQL entry point layered on the kernel-provided SparkContext `sc`; used below to read the
// tweet JSON and to register/query the "tweets" temp table.
val sqlContext: SQLContext = new SQLContext(sc)

In [4]:
import org.apache.spark.storage.StorageLevel

In [5]:
// Raw tweet archive loaded from Object Storage (Swift) through the generic JSON data source.
val tweets = sqlContext.read
  .format("json")
  .load("swift://ActorsFeb.spark/")

In [6]:
// One row per tweet body: the nested `tweets.message.body` array (selected as column "body")
// is exploded into the new column "bo".
// A record whose `body` array is null used to make the generator throw a NullPointerException
// and fail the whole job; such a record now simply contributes no rows.
val flattened = tweets.select("tweets.message.body").explode("body", "bo") {
  c: scala.collection.mutable.WrappedArray[String] =>
    val bodies: Seq[String] = Option(c).getOrElse(Seq.empty[String])
    bodies
}

In [7]:
// Every tweet body as a local array of Rows. Kept for interactive inspection only; it is lazy so
// the whole data set is no longer pulled onto the driver unless somebody actually uses it.
lazy val alltweets = flattened.select("bo").collect()

In [8]:
import com.ibm.cds.spark.samples._

In [9]:
// Wrap each tweet body in a StatusAdapter. Only the text is populated; the other fields get the
// same placeholder values ("" / 0.0) as before.
// The RDD is derived directly from `flattened` instead of collect()-ing everything to the driver
// and re-parallelizing it. That avoids a driver round trip and driver OutOfMemoryError risk.
// Repartitioning to sc.defaultParallelism keeps the same task-level concurrency that
// sc.parallelize used to give the downstream Tone Analyzer calls.
val tweetList_StatusAdapter = flattened
  .select("bo")
  .rdd
  .repartition(sc.defaultParallelism)
  .map { tweet =>
    new com.ibm.cds.spark.samples.StatusAdapter("", "", "", tweet.getString(0), 0.0, 0.0)
  }

In [10]:
import com.google.common.base.CharMatcher

In [11]:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.ibm.cds.spark.samples._
import org.apache.spark.sql.Row

In [12]:
// Schema of the rows registered as the "tweets" temp table. It has two parts:
//  * six fixed columns from `schemaString`. Each entry is "name" or "name:type"; int and double
//    map to IntegerType/DoubleType, and anything else (or no type) maps to StringType.
//  * one nullable DoubleType column per ToneAnalyzer.sentimentFactors entry, named after the
//    entry's first element.
// A plain `val` replaces the former `var ... = null` + `if (schemaTweets == null)` guard, whose
// condition was always true right after initialisation.
val schemaTweets: StructType = {
  val schemaString = "author date lang:double text lat:double long:double"

  val baseFields = schemaString.split(" ").map { fieldName =>
    val ar = fieldName.split(":")
    val dataType: DataType = ar.lift(1).getOrElse("string") match {
      case "int"    => IntegerType
      case "double" => DoubleType
      case _        => StringType
    }
    // split always yields at least one element, so ar(0) is safe
    StructField(ar(0), dataType, true)
  }

  val toneFields =
    ToneAnalyzer.sentimentFactors.map(f => StructField(f._1, DoubleType)).toArray[StructField]

  StructType(baseFields ++ toneFields)
}

In [13]:
import com.ibm.cds.spark.samples.config.DemoConfig
import org.http4s.client.blaze.PooledHttp1Client
import org.apache.spark.sql.Row

In [14]:
// Demo configuration. It reads the comma-separated "tweets.key" setting, then points the Watson
// Tone Analyzer at its service endpoint with the credentials copied from the section above.
// NOTE(review): the credentials are hard-coded placeholders in the notebook; consider loading them
// from a secure source instead of committing real values here.
val config1 = new DemoConfig
val keys = config1.getConfig("tweets.key").split(",")
Seq(
  "watson.tone.url"      -> "https://gateway.watsonplatform.net/tone-analyzer-experimental/api",
  "watson.tone.password" -> "xxxxxx",
  "watson.tone.username" -> "xxxxxxx"
).foreach { case (settingName, settingValue) => config1.setConfig(settingName, settingValue) }

In [15]:
 // Immutable snapshot of the demo configuration, broadcast to the executors. It is handed to
 // ToneAnalyzer.computeSentiment inside each task.
 val broadcastVar = {
   val frozenConfig = config1.toImmutableMap
   sc.broadcast(frozenConfig)
 }

In [16]:
import scala.collection.mutable._

In [17]:
   // For every StatusAdapter, call the Watson Tone Analyzer and emit a pair:
   //   - a Row matching `schemaTweets`: the 6 fixed columns, then one percentage per sentiment factor;
   //   - the raw (sentiment, status) pair, kept for later inspection.
   //
   // Changes vs. the previous per-record `map`:
   //   * one PooledHttp1Client per partition (instead of one per tweet), shut down once the
   //     partition has been processed, so HTTP connection pools/threads are no longer leaked;
   //   * scores are rounded to two decimals and scaled to percent inside BigDecimal, so e.g. 0.29
   //     becomes 29.0 instead of the floating-point artifact 28.999999999999996;
   //   * no `var`s, and the score map no longer depends on `import scala.collection.mutable._`.
   val rowTweets = tweetList_StatusAdapter.mapPartitions { statuses =>
     if (statuses.isEmpty) {
       Iterator.empty
     } else {
       val client = PooledHttp1Client()
       try {
         statuses.map { status =>
           val sentiment = ToneAnalyzer.computeSentiment(client, status, broadcastVar)

           // tone id -> normalized score; a null sentiment (or null children) contributes nothing
           val scoreMap = scala.collection.mutable.HashMap.empty[String, Double]
           if (sentiment != null) {
             for (tone <- Option(sentiment.children).getOrElse(Seq())) {
               for (result <- Option(tone.children).getOrElse(Seq())) {
                 scoreMap.put(result.id, result.normalized_score)
               }
             }
           }

           val baseValues = Array[Any](
             "",          // author
             "",          // date
             0.0,         // lang
             status.text, // text
             0.0,         // lat
             0.0          // long
           )

           // f._1 is the column name (see schemaTweets), f._2 the tone id; missing tones score 0
           val toneValues = ToneAnalyzer.sentimentFactors.map { f =>
             (BigDecimal(scoreMap.getOrElse(f._2, 0.0))
               .setScale(2, BigDecimal.RoundingMode.HALF_UP) * 100).toDouble
           }

           // Return [Row, (sentiment, status)]
           (Row.fromSeq(baseValues ++ toneValues), (sentiment, status))
         }.toList.iterator // materialise the partition before the client is shut down below
       } finally {
         client.shutdown.run
       }
     }
   }

In [18]:
// Keep only the Row half of each (Row, (sentiment, status)) pair; that is what createDataFrame consumes.
val workingRDD = rowTweets.map { case (row, _) => row }

In [19]:
//workingRDD.persist(StorageLevel.DISK_ONLY)

In [20]:
    // Turn the Row RDD into a DataFrame using `schemaTweets`, expose it to SQL as the temp table
    // "tweets", and print its schema.
    // Only NonFatal errors are caught, so fatal JVM errors and cell interrupts still propagate.
    // A failure is reported explicitly because later cells that query "tweets" cannot work
    // without the table.
    try {
      val df = sqlContext.createDataFrame(workingRDD, schemaTweets)
      df.registerTempTable("tweets")
      df.printSchema()
    } catch {
      case scala.util.control.NonFatal(e) =>
        Console.err.println("Could not register the 'tweets' temp table: " + e)
        e.printStackTrace()
    }

In [21]:
// Every row and column of the "tweets" temp table, as a DataFrame.
val fullSet = sqlContext.table("tweets")

In [26]:
// Show the column names and types of `fullSet` as a tree.
println(fullSet.schema.treeString)

root
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- lang: double (nullable = true)
 |-- text: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- Cheerfulness: double (nullable = true)
 |-- Negative: double (nullable = true)
 |-- Anger: double (nullable = true)
 |-- Analytical: double (nullable = true)
 |-- Confident: double (nullable = true)
 |-- Tentative: double (nullable = true)
 |-- Openness: double (nullable = true)
 |-- Agreeableness: double (nullable = true)
 |-- Conscientiousness: double (nullable = true)



In [None]:
// Persist the scored tweets to Object Storage as Parquet, in a single partition (one part file).
// `write.parquet` replaces `saveAsParquetFile`, which has been deprecated since Spark 1.4. The save
// mode is still the default ErrorIfExists.
// repartition(1) (rather than coalesce(1)) is kept on purpose: the shuffle lets the upstream
// Tone Analyzer calls keep running in parallel before everything lands in one partition.
fullSet.repartition(1).write.parquet("swift://hivecontainer.spark/tweetsActorsLoad12K5.parquet")

In [23]:
// Report the Spark version of the running SparkContext (no trailing newline).
Console.print(sc.version)

1.6.0