In [16]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
// Local Spark session using all cores; later cells obtain this same session through getOrCreate.
val spark = SparkSession.builder
    .appName("RedditStream")
    .master("local[*]")
    .getOrCreate()
import spark.implicits._

spark = org.apache.spark.sql.SparkSession@36716eda


lastException: Throwable = null


In [17]:
/** Access point for the notebook's shared Spark session. */
object Spark {
    /** Returns the existing SparkSession, or builds a new one named "RedditStream" if none exists.
      *
      * No master URL is set here, so building a fresh session relies on one being configured
      * elsewhere (e.g. the `local[*]` session created at the top of the notebook).
      */
    def getSession(): org.apache.spark.sql.SparkSession =
        org.apache.spark.sql.SparkSession
            .builder
            .appName("RedditStream")
            .getOrCreate()
}

defined object Spark


In [18]:
/** One Reddit comment record. The streaming reader derives its JSON schema from this class,
  * so field names must match the keys in the input files.
  *
  * NOTE(review): Spark encodes the BigInt fields as decimal(38,0); Long is likely sufficient — confirm.
  */
case class Reddit(
    author: String,
    body: String,
    author_flair_text: String,
    gilded: BigInt,
    score: BigInt,
    link_id: String,
    retrieved_on: Long,
    author_flair_css_class: String,
    subreddit: String,
    edited: Boolean,
    ups: BigInt,
    controversiality: BigInt,
    created_utc: java.sql.Timestamp,
    parent_id: String,
    subreddit_id: String,
    id: String,
    distinguished: String
)


/** Streams Reddit comment JSON files, projects a subset of columns, and writes them to a sink.
  *
  * @param pathToFiles    directory whose `*.json` files are read, one file per micro-batch
  * @param processingTime micro-batch trigger interval, in seconds
  */
class RedditStreamProcessor(var pathToFiles: String, var processingTime: Int) {
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.{Dataset, Row}
    import scala.concurrent.duration._
    val sparkSession = Spark.getSession()
    import sparkSession.implicits._
    val schema = Encoders.product[Reddit].schema
    import org.apache.spark.sql.streaming.{OutputMode, Trigger, StreamingQuery}
    import java.io._
    import java.nio.ByteBuffer
    import java.nio.charset.{CharacterCodingException, CodingErrorAction, StandardCharsets}
    import java.nio.file.Files
    import scala.util.matching.Regex

    /** Projected streaming view; null until startStream() has been called. */
    var datasetReddit: Dataset[Row] = null
    /** Working directory; relative output paths are resolved against it. */
    val systemPath: String = System.getProperty("user.dir")

    // One UTF-8 sequence as it looks after being mis-decoded as Latin-1: a lead byte followed by
    // exactly as many continuation bytes as it announces (a greedy `+` would over-consume).
    private val mojibakePattern: Regex =
        "[\\xc2-\\xdf][\\x80-\\xbf]|[\\xe0-\\xef][\\x80-\\xbf]{2}|[\\xf0-\\xf4][\\x80-\\xbf]{3}".r

    /** Prefixes `path` with the working directory; this exact string is also handed to Spark. */
    private def absolutePath(path: String): String = systemPath + File.separatorChar + path

    /** Deletes `path` (relative to the working directory) if present, then recreates it empty.
      *
      * @throws Exception if the directory cannot be deleted or recreated
      */
    def resetDirectory(path: String): Unit = {
        val dir = new File(absolutePath(path))
        if (dir.exists) deleteRecursively(dir)
        if (!dir.mkdirs())
            throw new Exception(s"Unable to create folder ${dir.getAbsolutePath}")
    }

    /** Deletes `file` and, for a real (non-symlinked) directory, everything beneath it.
      *
      * @throws Exception if any entry cannot be deleted
      */
    def deleteRecursively(file: File): Unit = {
        // Never descend through a symlink: that would wipe the link target's contents.
        if (file.isDirectory && !Files.isSymbolicLink(file.toPath))
            // listFiles returns null on I/O error; the delete below then reports the failure.
            Option(file.listFiles).foreach(_.foreach(deleteRecursively))
        if (file.exists && !file.delete)
            throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
    }

    /** Repairs UTF-8 text that was mis-decoded as ISO-8859-1 ("Ã©" -> "é").
      *
      * Only sequences that re-decode as valid UTF-8 are replaced; anything else is left as-is.
      * Returns null for null input.
      */
    def fixEncoding(text: String): String =
        if (text == null) null
        else mojibakePattern.replaceAllIn(text, m => {
            val garbled = m.matched
            val repaired = decodeUtf8Strict(garbled.getBytes(StandardCharsets.ISO_8859_1))
                .getOrElse(garbled)
            // replaceAllIn treats '$' and '\' specially; quote so the text is inserted literally.
            Regex.quoteReplacement(repaired)
        })

    /** Decodes `bytes` as UTF-8, returning None instead of substituting U+FFFD on invalid input. */
    private def decodeUtf8Strict(bytes: Array[Byte]): Option[String] =
        try {
            Some(StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(bytes))
                .toString)
        } catch {
            case _: CharacterCodingException => None
        }

    /** Defines the streaming source over `pathToFiles/*.json` and stores the projection in datasetReddit. */
    def startStream(): Unit = {
        // Use this processor's own session rather than a notebook-global `spark`.
        val reddit = sparkSession.readStream
            .schema(schema)
            .option("maxFilesPerTrigger", 1)
            .json(s"$pathToFiles/*.json")
            .as[Reddit]
        datasetReddit = reddit.select($"author", $"body", $"score", $"subreddit", $"edited", $"ups",
                                      $"controversiality", $"created_utc", $"parent_id",
                                      $"subreddit_id", $"id")
    }

    /** The projected dataset, failing fast if startStream() has not been called yet. */
    private def startedDataset: Dataset[Row] = {
        require(datasetReddit != null, "startStream() must be called before writeStream()")
        datasetReddit
    }

    /** Starts a streaming query over datasetReddit and returns it without blocking.
      *
      * "console" prints each micro-batch. "parquet" wipes and recreates `outputPath` (and its
      * checkpoint directory) under the working directory, then appends new rows there.
      * Callers may invoke `awaitTermination()` on the returned query.
      *
      * @throws IllegalArgumentException if startStream() has not been called
      * @throws Exception for an unsupported format, or parquet without an outputPath
      */
    def writeStream(format: String, queryName: String, outputPath: String = null): StreamingQuery = {
        val trigger = Trigger.ProcessingTime(processingTime.seconds)
        format match {
            case "console" =>
                startedDataset.writeStream
                    .format(format)
                    .option("truncate", false)
                    .trigger(trigger)
                    .outputMode(OutputMode.Update)
                    .queryName(queryName)
                    .start()
            case "parquet" if outputPath != null =>
                val source = startedDataset // validate before touching any directories
                val checkpointPath = outputPath + "checkpoint"
                // Without a trailing separator on outputPath, the checkpoint is a sibling directory
                // that resetDirectory(outputPath) would not clear. A stale checkpoint would resume
                // old offsets against a freshly emptied output directory.
                val checkpointDir = new File(absolutePath(checkpointPath))
                if (checkpointDir.exists) deleteRecursively(checkpointDir)
                resetDirectory(outputPath)
                source.writeStream
                    .format(format)
                    .trigger(trigger)
                    .outputMode(OutputMode.Append)
                    .option("path", absolutePath(outputPath))
                    .option("checkpointLocation", absolutePath(checkpointPath))
                    .queryName(queryName)
                    .start()
            case "parquet" =>
                throw new Exception("parquet output requires an outputPath")
            case _ =>
                throw new Exception(s"supported formats: console or parquet")
        }
    }

}

defined class Reddit
defined class RedditStreamProcessor


In [19]:
// Processor over the JSON files in the reddit_posts_2005 directory.
val stream = new RedditStreamProcessor(pathToFiles = "reddit_posts_2005", processingTime = 5)

stream = RedditStreamProcessor@2e730bac


$line79.$read$$iw$$iw$RedditStreamProcessor@2e730bac

In [20]:
// Define the source, then start the parquet sink. The sink first wipes <cwd>/posts_parquet/
// and then writes there.
stream.startStream()
val streamingQuery = stream.writeStream(
    format = "parquet",
    queryName = "reddit_posts",
    outputPath = "/posts_parquet/"
)

streamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@179d1a9a


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@179d1a9a

In [21]:
// Stop the parquet streaming query started in the previous cell.
streamingQuery.stop()

In [None]:
// Per-(author, subreddit) comment counts over the projected stream. `reddit` exists only as a
// local inside startStream(), so use the processor's public dataset (startStream() must have run).
val user = stream.datasetReddit.groupBy($"author", $"subreddit").agg(count($"author"))

In [None]:
// Per-author counts over 30-minute tumbling windows of created_utc, read from the processor's
// public dataset (`reddit` is not defined at top level).
// NOTE(review): no watermark is set, so state for old windows is never dropped and append output
// mode is unavailable. Consider .withWatermark("created_utc", ...) before grouping.
val user = stream.datasetReddit.groupBy(window($"created_utc", "30 minutes"), $"author").count()