## Linking Spark from Python, Scala, and Java


### Python
Run Python applications with `bin/spark-submit`; in the script itself:
import pyspark 



## Calculating Pi

In [None]:
#: src/main/scala/SparkPi.scala 
/** Import the spark and math packages */
import scala.math.random
import org.apache.spark._
/** Computes an approximation to pi by Monte Carlo sampling.
  *
  * Random points are drawn in the square [-1, 1) x [-1, 1); the fraction that
  * lands inside the unit circle approximates pi / 4.
  *
  * Usage: SparkPi [slices]
  *   slices - number of partitions to spread the sampling over (default 2,
  *            must be positive); about 100000 samples are drawn per slice.
  */
object SparkPi {
  def main(args: Array[String]): Unit = {
    // Parse arguments before starting Spark so bad input fails fast.
    val slices = if (args.length > 0) args(0).toInt else 2
    require(slices > 0, s"slices must be positive, got $slices")

    /** Create the SparkConf object and the SparkContext */
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    try {
      // Cap at Int.MaxValue so the Int-based range below cannot overflow.
      val n = math.min(100000L * slices, Int.MaxValue).toInt
      // `1 until n` yields n - 1 points, so n - 1 is the sample count.
      val count = spark.parallelize(1 until n, slices).map { _ =>
        val x = random * 2 - 1
        val y = random * 2 - 1
        if (x * x + y * y < 1) 1 else 0
      }.reduce(_ + _)
      /** Printing the value of Pi */
      println("Pi is roughly " + 4.0 * count / (n - 1))
    } finally {
      // Stop the SparkContext even if the job fails, releasing its resources.
      spark.stop()
    }
  }
}


In [None]:
bash-4.1# cat sparkpi.sbt 
// Build definition for the SparkPi example. `sbt package` turns these
// settings into target/scala-2.10/sparkpi-project_2.10-1.0.jar.
name := "SparkPi Project"

version := "1.0"

scalaVersion := "2.10.4"

// %% appends the Scala binary version (_2.10) to the artifact name.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1"
)


Run command (submits the jar that `sbt package` builds from the definition above):

In [None]:
# Submit the packaged SparkPi jar to a local master using 4 threads.
# "local[4]" is quoted so the shell cannot treat the brackets as a glob pattern,
# and $SPARK_HOME is quoted in case the path contains spaces.
"$SPARK_HOME/bin/spark-submit" \
  --class "SparkPi" \
  --master "local[4]" \
  target/scala-2.10/sparkpi-project_2.10-1.0.jar

## Spark application: weather data → the hottest dates with some precipitation

In [None]:
import sqlContext.implicits._

/** One parsed row of nycweather.csv: date, temperature, precipitation. */
case class Weather(date: String, temp: Int, precipitation: Double)

// Parse each comma-separated line into a Weather record and expose it as a DataFrame.
// NOTE(review): assumes the file has no header row and exactly these three columns — confirm.
val weather = sc.textFile("input/tmp/labdata/sparkdata/nycweather.csv").map(_.split(",")).map(w => Weather(w(0), w(1).trim.toInt, w(2).trim.toDouble)).toDF()

weather.registerTempTable("weather")

// Days with some precipitation, hottest first.
val hottest_with_precip = sqlContext.sql("SELECT * FROM weather WHERE precipitation > 0.0 ORDER BY temp DESC")

// take(10) keeps the SQL ordering, so these really are the ten hottest rows.
// The previous `.map(...).top(10)` re-sorted the formatted tuples by their
// "Date: ..." string, which discarded ORDER BY temp DESC.
hottest_with_precip.take(10).map(x => ("Date: " + x(0), "Temp : " + x(1), "Precip: " + x(2))).foreach(println)

## Spark application with MLlib

Cluster taxi drop-off points into three clusters to find the best places to get a taxi.

In [None]:
import org.apache.spark.mllib.clustering.KMeans

import org.apache.spark.mllib.linalg.Vectors

// Raw trip records. Fields 3 and 4 are used below as coordinates; presumably
// latitude and longitude, since the bounds match New York City — confirm.
val taxiFile = sc.textFile("input/tmp/labdata/sparkdata/nyctaxisub.csv")

taxiFile.count()

// Keep lines mentioning 2013 whose fields 3 and 4 are both present. The length
// check drops short/malformed lines instead of failing the job on an index error.
val taxiData = taxiFile.filter { line =>
  line.contains("2013") && {
    val fields = line.split(",")
    fields.length > 4 && fields(3) != "" && fields(4) != ""
  }
}

taxiData.count()

// Restrict to the bounding box 40.70 < field3 < 40.86, -74.02 < field4 < -73.93.
// Each line is split once rather than once per comparison; field 4 is only
// parsed when field 3 is inside its range, as in the original filter chain.
val taxiFence = taxiData.filter { line =>
  val fields = line.split(",")
  val lat = fields(3).toDouble
  lat > 40.70 && lat < 40.86 && {
    val lon = fields(4).toDouble
    lon > -74.02 && lon < -73.93
  }
}

taxiFence.count()

// Two-dimensional feature vectors built from fields 3 and 4. Cached because
// KMeans makes several passes over its input and computeCost makes another.
val taxi = taxiFence.map { line =>
  Vectors.dense(line.split(',').slice(3, 5).map(_.toDouble))
}.cache()

val iterationCount = 10
val clusterCount = 3
// Argument order is KMeans.train(data, k, maxIterations).
val model = KMeans.train(taxi, clusterCount, iterationCount)
val clusterCenters = model.clusterCenters.map(_.toArray)
// Sum of squared distances from each point to its nearest cluster center.
val cost = model.computeCost(taxi)
println("Within-cluster sum of squared errors: " + cost)
// Print each center as a (field 3, field 4) pair; the explicit tuple avoids
// relying on deprecated argument auto-tupling in println(a, b).
clusterCenters.foreach(center => println((center(0), center(1))))

## Spark Streaming


In [None]:
import org.apache.log4j.Logger
import org.apache.log4j.Level
// Quiet Spark's and Akka's console logging so the stream output is readable,
// but keep ERROR so job failures are still reported instead of silently hidden.
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import scala.util.Try


// Reuse the shell's SparkContext and process the stream in 1-second batches.
val ssc = new StreamingContext(sc, Seconds(1))

// Text lines received from a socket server on localhost:7777.
val lines = ssc.socketTextStream("localhost", 7777)

// Per batch: sum the integer in field 7, grouped by the value of field 15.
// Socket input is untrusted, so lines that are too short or whose field 7 is
// not an integer are skipped rather than failing the whole batch.
val pass = lines.map(_.split(",")).
  flatMap { fields =>
    if (fields.length > 15) Try(fields(7).trim.toInt).toOption.map(count => (fields(15), count)).toList
    else Nil
  }.
  reduceByKey(_ + _)

pass.print()

ssc.start()
// Blocks until the streaming context is stopped.
ssc.awaitTermination()