# Introduction
**Goal**: This notebook gives a more thorough example of how to write code in a notebook. Along the way, we will build a basic scaffolding that our future notebooks can reuse.

[Hacker News](https://news.ycombinator.com/) is a site full of technical knowledge, often considered a one-stop shop for trending news. Beyond the headlines, there may be value in looking at what surrounds a trending item; for example, useful links often turn up in the comments. This notebook explores that material and exposes it in a form that outside users can consume. Specifically, we will look at the following questions:

* What are the links in the comments?
* What words are mentioned the most in the comments?


## Add Dependencies
To get started, we need to add some additional libraries. They are added both to this kernel and to our Spark cluster.
* __jsoup__ - Used to extract links and plain text from HTML snippets.
* __hackernews4s__ - Used to get the top items from Hacker News, the comments on these items, and information about the users.

In [None]:
%adddeps org.jsoup jsoup 1.9.2 --transitive
%adddeps com.github.seratch hackernews4s_2.10 0.6.0 --transitive

# Getting Hacker News Articles

We need to get a reference to the SQL context created by Toree.

In [None]:
val sqlC = sqlContext

We can now import all of the classes we need to create our application.

In [None]:
import hackernews4s.v0._
import sqlC.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.jsoup.Jsoup
import org.jsoup.nodes.Document
import org.jsoup.nodes.Element
import scala.collection.JavaConversions._
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD

In [None]:
case class Comment(story: Long, itemId: Long, text: String)

In [None]:
// A function that walks an item's comment tree recursively and returns every comment
// in it as a flat Seq[Comment], each tagged with the id of the originating story
val getComments: (Item) => Seq[Comment] = (story: Item) => {
    def _getComments:  (Item) => Seq[Comment] = (item: Item) => {
        val commentIds = item.commentIds
        if(commentIds.size == 0){
            Seq(Comment(story.id.id, item.id.id, item.text))
        } else {
            val comments: Seq[Comment] = commentIds.flatMap((itemId: ItemId) => { 
                _getComments(HackerNews.getItem(itemId).get)
            })
            if("Story".equals(item.itemType.toString)){
                comments
            } else {
                Comment(story.id.id, item.id.id, item.text) +: comments
            }
            
        }   
    }
    
    _getComments(story)
}

val getItemText: (Comment) => String = (comment: Comment) => {
    Jsoup.parse(comment.text).text()
}
val getItemLinks: (Comment) => Seq[String] = (comment: Comment) => {
    val aTags: List[Element] = Jsoup.parse(comment.text).select("a").toList
    aTags.map((link: Element) => {
        link.attr("href")
    })
}
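
The recursion in `getComments` can be hard to follow because every step calls the Hacker News API. The sketch below shows the same depth-first flattening on an in-memory tree, so it runs without network access. `Node`, `FlatComment`, and `flatten` are hypothetical names introduced for illustration only; they are not part of hackernews4s.

```scala
// Hypothetical in-memory stand-in for a Hacker News item (not the hackernews4s Item class).
case class Node(id: Long, isStory: Boolean, text: String, children: Seq[Node])
case class FlatComment(story: Long, itemId: Long, text: String)

// Depth-first walk: emit the current node (unless it is the story itself),
// followed by the flattened comments of each child, in order.
def flatten(story: Node): Seq[FlatComment] = {
  def walk(node: Node): Seq[FlatComment] = {
    val below = node.children.flatMap(walk)
    if (node.isStory) below
    else FlatComment(story.id, node.id, node.text) +: below
  }
  walk(story)
}

// A story with two top-level comments, the first of which has one reply.
val demoStory = Node(1L, isStory = true, "", Seq(
  Node(2L, isStory = false, "first", Seq(
    Node(3L, isStory = false, "reply", Seq.empty))),
  Node(4L, isStory = false, "second", Seq.empty)))

val flat = flatten(demoStory)
// flat.map(_.itemId) is Seq(2, 3, 4), and every entry carries story id 1
```

One difference from the cell above: in this sketch a story without comments yields an empty sequence, whereas `getComments` returns the story itself as a single `Comment` in that case.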

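This function takes a story id as an argument and returns a Spark RDD of all the comments on that story. Having the comments in an RDD lets the later processing steps run on the Spark cluster. Note that the fetching itself runs in a single task, because the input RDD holds only one story.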

In [None]:
def getStoryComments(storyId: Int) = {
    val story = Seq(HackerNews.getItem(ItemId(storyId)).get)
    sc.parallelize(story).flatMap((item: Item) => {
        getComments(item)
    })
}   

This function will take in a comments RDD and will return a new Spark RDD with all of the links for the comments.

In [None]:
def getCommentLinks(comments: RDD[Comment]) = {
    comments.flatMap((comment:Comment) => {
        getItemLinks(comment)
    })
}
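
To see what `getItemText` and `getItemLinks` extract, it can help to run jsoup on a single snippet. This is a minimal sketch that assumes the jsoup dependency added earlier is loaded; the HTML string is made up for illustration and stands in for the HTML comment text returned by the Hacker News API.

```scala
import org.jsoup.Jsoup
import scala.collection.JavaConverters._

// Made-up comment body for illustration.
val html = """<p>See <a href="https://example.com/a">this post</a> and <a href="https://example.com/b">that one</a>.</p>"""

val doc = Jsoup.parse(html)

// text() drops the markup and keeps only the readable text.
val plainText = doc.text()

// select("a") matches every anchor element; attr("href") reads its target.
val demoLinks = doc.select("a").asScala.map(_.attr("href")).toList

// plainText is "See this post and that one."
// demoLinks is List("https://example.com/a", "https://example.com/b")
```

The sketch uses the explicit `asScala` conversion from `JavaConverters` instead of the implicit `JavaConversions` imported above. Both produce the same result here; the explicit form just makes the Java-to-Scala conversion visible.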

**`tokenizer`** and **`remover`** are objects from the Apache Spark ML API. They will be used to tokenize the comments and filter out words we do not want to count.

**NOTE:** This is an example of a core Spark API being exposed through Toree. A third-party library, such as Apache SystemML, could be plugged in at this point instead.

In [None]:
val tokenizer = new Tokenizer().setInputCol("_1").setOutputCol("words")
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filteredWords")

This function takes a comments RDD and returns a new Spark RDD of `(word, count)` pairs, where the count is the number of times the word appears across all comments after stop words are removed.

In [None]:
def getCommentWordCounts(comments: RDD[Comment]) = {
    val textDF = comments.map((comment:Comment) => {
        getItemText(comment)
    }).toDF
    val tokenizedComments = tokenizer.transform(textDF)
    val filteredWordCountsDF = remover.transform(tokenizedComments)
    val terms = filteredWordCountsDF.flatMap((row: Row) =>{
        row.getSeq[String](2)
    })
    val wordCounts = terms.map((word: String) => {
        (word, 1)
    }).reduceByKey(_+_)
    wordCounts
}
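
The pipeline above (tokenize, remove stop words, count) can be tried out on plain Scala collections before running it on the cluster. The following is a rough local analogue, not the Spark implementation: the stop-word set is a tiny hypothetical one, while `StopWordsRemover` ships a much longer default list, and `groupBy` stands in for `reduceByKey`.

```scala
// Tiny illustrative stop-word set (an assumption, not Spark's default list).
val stopWords = Set("the", "a", "is", "and", "of")

val sampleComments = Seq("Spark is fast", "The Spark API and the Scala API")

// Lowercase and split on whitespace, which is roughly what Tokenizer does.
val tokens = sampleComments.flatMap(_.toLowerCase.split("\\s+")).filter(_.nonEmpty)

// Drop stop words, as StopWordsRemover does.
val kept = tokens.filterNot(stopWords.contains)

// Group equal words and count each group: the local analogue of
// mapping to (word, 1) and calling reduceByKey(_ + _).
val localCounts: Map[String, Int] =
  kept.groupBy(identity).map { case (word, hits) => (word, hits.size) }

// Most frequent first; ties are broken alphabetically so the order is deterministic.
val top = localCounts.toSeq.sortBy { case (word, n) => (-n, word) }.take(3)
// top is Seq(("api", 2), ("spark", 2), ("fast", 1))
```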

In [None]:
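// Collect everything we want to know about a story: its comments, the 50 most
// frequent words in them, and the links they contain.
def getStoryInfo(storyId: Int) = {
    // Cache the RDD: it feeds three actions below, and without caching each
    // action would fetch the comments from Hacker News again.
    val commentsRDD = getStoryComments(storyId).cache()
    val comments = commentsRDD.collect()
    val links = getCommentLinks(commentsRDD).collect()
    val counts = getCommentWordCounts(commentsRDD).sortBy((wordCount: (String, Int)) => {
        wordCount._2
    }, ascending=false).take(50)
    (comments, counts, links)
}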

## Testing
We can test our functions by calling them and inspecting the output. `getStoryInfo` returns a tuple of (comments, word counts, links), so we look at each element in turn.

In [None]:
val storyInfo = getStoryInfo(12476597)

In [None]:
storyInfo._1

In [None]:
storyInfo._2

In [None]:
storyInfo._3