Commit 4a61654

Adding file stream example

hkropp committed May 10, 2015
1 parent 96e4460 commit 4a61654
Showing 3 changed files with 84 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Readme.md
@@ -1 +1,10 @@
# Spark Streaming Examples


### Links

* http://spark.apache.org/docs/latest/streaming-kafka-integration.html
* http://stackoverflow.com/questions/22338025/kafka-consumers-in-spark-streaming-parallel-consumption-in-worker-nodes
* http://stackoverflow.com/questions/22132968/run-spark-kafka-wordcount-java-example-without-run-example-script
* http://spark.apache.org/docs/latest/streaming-programming-guide.html
* https://issues.apache.org/jira/browse/SPARK-944
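
The Kafka-related links above describe the receiver-based integration that the `spark-streaming-kafka_2.10` dependency declared in pom.xml below provides. A minimal word-count sketch of that API; the ZooKeeper address, consumer group, and topic name are placeholders, not part of this commit:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaWordCountSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("KafkaWordCountSketch"), Seconds(2))
        // createStream yields (key, message) pairs; "test" -> 1 runs one receiver thread for the topic.
        val messages = KafkaUtils.createStream(ssc, "localhost:2181", "example-group", Map("test" -> 1))
        messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }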
12 changes: 12 additions & 0 deletions pom.xml
@@ -61,6 +61,18 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.core.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
63 changes: 63 additions & 0 deletions src/main/scala/simpleexample/SparkFileExample.scala
@@ -0,0 +1,63 @@
package simpleexample

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
Submitting:
spark-submit --master yarn-client \
--num-executors 2 \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--class simpleexample.SparkFileExample \
spark-streaming-simple-example-0.1-SNAPSHOT.jar /spark_log
*/
object SparkFileExample {

def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: SparkFileExample <log-dir>")
      System.exit(1)
    }

val sparkConf = new SparkConf().setAppName("SpoolDirSpark")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val hiveContext = new HiveContext(ssc.sparkContext)
import hiveContext.implicits._
import hiveContext.sql

val inputDirectory = args(0)

    // fileStream watches inputDirectory for new files; keys are byte offsets (LongWritable), values are line text.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory).map { case (x, y) => (x.toString, y.toString) }

lines.print()

    // ToDo: persist each batch into the partitioned Hive table `stocks`.
    // Note: hiveContext.sql can only run on the driver, so each batch is
    // collected there instead of issuing the INSERT inside foreachPartition.
    // lines.foreachRDD { rdd =>
    //   rdd.collect().foreach { case (_, line) =>
    //     val values = line.split(",")
    //     val date = values(0)
    //     val open = values(1)
    //     val high = values(2)
    //     val low = values(3)
    //     val close = values(4)
    //     val volume = values(5)
    //     val adj_close = values(6)
    //     val year = date.split("-")(0)
    //     sql(f"INSERT INTO TABLE stocks PARTITION (year = '$year') VALUES ('$date', $open, $high, $low, $close, $volume, $adj_close)")
    //   }
    // }
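
    // The INSERT above presupposes a year-partitioned Hive table. A minimal
    // DDL sketch (the column types are illustrative assumptions, not part of
    // this commit):
    //
    //   CREATE TABLE IF NOT EXISTS stocks (
    //     `date` STRING, open DOUBLE, high DOUBLE, low DOUBLE,
    //     close DOUBLE, volume BIGINT, adj_close DOUBLE)
    //   PARTITIONED BY (year STRING)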

ssc.start()
ssc.awaitTermination()

}
}
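
fileStream only picks up files that newly appear in the monitored directory, so a natural way to exercise this job is to write a file to a staging path and then rename it into /spark_log, since an HDFS rename is atomic. A minimal, hypothetical helper sketch using the Hadoop FileSystem API; the paths and the sample CSV row are assumptions for illustration:

    package simpleexample

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object SpoolFileSketch {
      def main(args: Array[String]): Unit = {
        val fs = FileSystem.get(new Configuration())
        val staging = new Path("/tmp/staging/stocks.csv") // hypothetical staging location
        val target = new Path("/spark_log/stocks.csv")    // directory watched by SparkFileExample

        // Write the file completely while it is still outside the watched directory.
        val out = fs.create(staging, true)
        out.write("2015-05-08,32.0,33.1,31.5,32.8,1200000,32.8\n".getBytes("UTF-8"))
        out.close()

        // The rename is atomic, so the stream only ever sees a complete file.
        fs.rename(staging, target)
        fs.close()
      }
    }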
