In [86]:
// Run this cell to make sure that the Spark session, the Spark context, and Scala are working.
// You can install the Scala/Spark kernel by following these directions: https://www.datacamp.com/community/tutorials/beginners-guide-to-scala

In [None]:
/** Smoke-test program: prints "Hello World." between START/END banners on standard output.
  *
  * NOTE(review): nothing in this notebook calls `hello.main`, so running this cell only defines
  * the object. Call `hello.main(Array.empty)` to actually see the output.
  */
object hello {

  /** Entry point. `args` is ignored. */
  def main(args: Array[String]): Unit = {
    print("\n\n>>>>> START OF PROGRAM <<<<<\n\n")

    println("Hello World.")

    print("\n\n>>>>> END OF PROGRAM <<<<<\n\n")
  }
}

In [None]:
// Run this in a shell to download the data: wget https://raw.githubusercontent.com/fenago/data/master/ErnestoSparkBook.txt

In [None]:
// Step 1: Load text file

In [None]:
// Load the book as an RDD with one element per line of the text file.
val lines = sc.textFile(path = "./ErnestoSparkBook.txt")
// Preview: the first 10 lines, joined with newlines so they render one per row.
lines.take(num = 10).mkString("\n")


In [None]:
// Step 2: Inspect the number of partitions the dataset is split into (partitions are units of parallel work, not worker machines)

In [None]:
// getNumPartitions is Spark's shorthand for partitions.length.
val numPartitions = lines.getNumPartitions
println(s"Number of partitions (workers) storing the dataset = $numPartitions")

In [None]:
// Step 3: Split each line of the dataset into words, using a single space as the separator

In [None]:
// Tokenize on the ' ' character and flatten all lines into a single RDD of words.
// Consecutive spaces (and empty lines) produce empty-string tokens.
val words = lines.flatMap(_.split(' '))
// Preview the first 10 tokens, one per row.
words.take(num = 10).mkString("\n")

In [None]:
// Step 4: Filter the list of words to exclude common stop words

In [None]:
// Stop words to drop, compared against the lower-cased token.
// A Set gives constant-time membership checks. The original list also contained "a" twice.
// "" removes the empty tokens that split(' ') yields for consecutive spaces or empty lines.
val stopWords = Set("", "a", "*", "and", "is", "of", "the")
// Locale.ROOT keeps lower-casing independent of the JVM default locale
// (e.g. under a Turkish locale "I" would not lower-case to "i").
val filteredWords =
  words.filter(word => !stopWords.contains(word.toLowerCase(java.util.Locale.ROOT)))
filteredWords.take(10).mkString("\n") // display the first 10 filtered words


In [None]:
// Step 5: Cache the filtered dataset in memory to speed up future actions.

In [None]:
// cache() only marks the RDD for in-memory storage; nothing is stored until an action runs.
// Chaining count() triggers that action, which materializes the cache, and shows the word total.
filteredWords.cache().count()

In [None]:
// Step 6: Transform the filtered words into (word, 1) tuples for the word count

In [None]:
// Pair every word with an initial count of 1, ready to be summed per word.
val word1Tuples = filteredWords.map(word => word -> 1)
// Preview the first 10 (word,1) pairs.
word1Tuples.take(num = 10).mkString("\n")

In [None]:
// Step 7: Aggregate the (word,1) tuples into (word,count) tuples

In [None]:
// Sum the 1s for each distinct word, giving one (word, count) pair per word.
val wordCountTuples = word1Tuples.reduceByKey(_ + _)
// Preview 10 of the aggregated pairs (the order is not meaningful).
wordCountTuples.take(num = 10).mkString("\n")


In [None]:
// Step 8: Display the top 10 (word,count) tuples by count

In [None]:
val sortedWordCountTuples = wordCountTuples.top(10)(Ordering.by(tuple => tuple._2)).mkString("\n") // top 10 (word,count) tuples

In [None]:
// Step 9: Create a table from the (word,count) tuples

In [None]:
// Named fields for each (word, count) pair; they become the column names `word` and `count`
// when these rows are converted to a DataFrame.
case class WordCount(word: String, count: Int)

// Destructure each pair and wrap it in a WordCount row.
val wordCountRows = wordCountTuples.map { case (word, count) => WordCount(word, count) }

In [None]:
wordCountRows.toDF.createOrReplaceTempView("word_count") // convert RDDs to DataFrames and register a temp table for querying


In [None]:
// Step 10: Use SQL to display the words with count >= 2

In [None]:
// Query the "word_count" view for words that occur at least twice, most frequent first.
// `count` is back-quoted so Spark SQL reads it as the column name, not the aggregate function.
val wc = spark.sql(
  "SELECT word, `count` FROM word_count WHERE `count` >= 2 ORDER BY `count` DESC")
// show() returns Unit, so keep the DataFrame in `wc` and display it as a separate statement.
wc.show(truncate = true)
// To list every word instead, drop the filter:
//spark.sql("select word, count from word_count").show(true)