## Spark Project
#### Exploring the Ling-Spam email dataset

**Language**: Scala

**Authors**: Lynda

To run a paragraph in a Zeppelin notebook you can either click the `play` button (blue triangle) on the right-hand side or simply press `Shift + Enter`.

In the following paragraphs we are going to execute Spark code, run shell commands to download and move files, run SQL queries, etc. Each paragraph starts with `%` followed by an interpreter name, e.g. `%spark` for the Spark interpreter. The interpreter name indicates what will be executed: code, Markdown, HTML, etc. This allows you to perform data ingestion, munging, wrangling, visualization, analysis, processing and more, all in one place!

Throughout this notebook we will use the following interpreters:

- `%spark` - Spark interpreter to run Spark code written in Scala
- `%md` - Markdown for displaying formatted text, links, and images

To learn more about Zeppelin interpreters check out this [link](https://zeppelin.apache.org/).

In [3]:
%spark

// Evaluate `spark.version` so the notebook echoes it (presumably the version string of the
// Spark session supplied by the interpreter -- confirm against the Zeppelin setup).
spark.version

In [4]:
%spark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

In [5]:
// Sanity check on a single spam message: wholeTextFiles yields (file name, file text) pairs,
// and collect brings them to the driver so the notebook can display them.
sc.wholeTextFiles("/tmp/ling-spam/spam/spmsgc99.txt").collect

In [6]:
%spark

/** Estimates, for every word found in the files matched by `filesDir`, the fraction of
  * those files that contain the word at least once.
  *
  * @param sc       Spark context used to read the files
  * @param filesDir path (possibly a glob) passed to `wholeTextFiles`
  * @return a pair made of
  *         - an RDD of (word, number of files containing the word / number of files), and
  *         - the number of files that were read
  */
def probaWordDir(sc:SparkContext)(filesDir:String)
  :(RDD[(String, Double)], Long) = {
    // One record per input file: (name of the file, full text of the file).
    val documents = sc.wholeTextFiles(filesDir)

    // Total number of files; denominator of every ratio computed below.
    val fileCount = documents.count()

    // Tokens that carry no information and are ignored.
    val nonInformativeWords = List(".", ":", ",", " ", "/","\\", "-", "'", "(", ")", "@")

    // For each file, build the list of its unique informative words:
    //   1) trim the text to remove leading/trailing whitespace,
    //   2) split it on runs of whitespace ("\\s+"),
    //   3) de-duplicate the tokens,
    //   4) drop the non-informative tokens.
    val wordsByFile: RDD[(String, List[String])] = documents.map { case (fileName, text) =>
      val uniqueTokens = text.trim().split("\\s+").distinct.toList
      (fileName, uniqueTokens.filterNot(token => nonInformativeWords.contains(token)))
    }

    // Number of files in which each word appears: every (file, word) occurrence
    // contributes a 1, and the ones are summed per word.
    val filesPerWord: RDD[(String, Int)] =
      wordsByFile
        .flatMap { case (_, words) => words.map(word => (word, 1)) }
        .reduceByKey((left, right) => left + right)

    // Ratio = files containing the word / total files.
    // The count is converted to Double first so the division is not an integer division.
    val probaWord: RDD[(String, Double)] =
      filesPerWord.map { case (word, containingFiles) => (word, containingFiles.toDouble / fileCount) }

    // Pair returned to the caller.
    (probaWord, fileCount)
}


In [7]:
%spark
// Single-message experiment: compare the tokens obtained with and without trimming the
// text before splitting it on whitespace.
val rdd = sc.wholeTextFiles("/tmp/ling-spam/spam/spmsgc99.txt")
val nonInformativeWords = List(".", ":", ",", " ", "/","\\", "-", "'", "(", ")", "@")
// Trimmed variant: leading/trailing whitespace is removed before the split.
val rddsplit = rdd.map { case (fileName, text) => (fileName, text.trim().split("\\s+").distinct) }
// Untrimmed variant: a text that starts with whitespace yields a leading empty token.
val rddsplit2 = rdd.map { case (fileName, text) => (fileName, text.split("\\s+").distinct) }
rddsplit.collect



In [8]:
%spark
// Display the tokens produced by the untrimmed split (rddsplit2 is defined in an earlier
// paragraph) so they can be compared with the trimmed result.
rddsplit2.collect

In [9]:
%spark


/** Computes, for every word of `probaW`, one term of the mutual-information sum:
  *
  *   p(o,c) * log2( p(o,c) / (p(o) * p(c)) )
  *
  * where p(o,c) comes from `probaWC` (or `default` when the word is missing there),
  * p(o) comes from `probaW` and p(c) is `probaC`.
  *
  * A term whose p(o,c) or p(o) * p(c) is not strictly positive is defined as 0.0
  * (the usual convention 0 * log 0 = 0). Without this guard the expression evaluates
  * to NaN or Infinity, and such values would dominate any later ranking.
  *
  * @param probaWC probability that the word occurs (or not) in an email of the given class
  * @param probaW  probability that the word occurs (or not), whatever the class
  * @param probaC  probability that an email belongs to the given class, p(class)
  * @param default value used for p(o,c) when the word has no entry in `probaWC`
  * @return an RDD of (word, mutual-information term), one entry per word of `probaW`
  */
def computeMutualInformationFactor(
    probaWC:RDD[(String, Double)],// probability the word occurs (or not) in an email of a given class(spam or ham)
    probaW:RDD[(String, Double)], // probability the word occurs (whatever the class)
    probaC: Double, // probability that an email belongs to the given class, p(class)
    default: Double // default value when a probability is missing
):RDD[(String, Double)] = {
    // math.log is the natural logarithm; log2(x) = log(x) / log(2).
    val naturalLogOfTwo = math.log(2.0)

    // (word, (p(o), Some(p(o,c)))) or (word, (p(o), None)) when the word is unknown for the class.
    // A left outer join keeps every word of probaW.
    val joined: RDD[(String, (Double, Option[Double]))] = probaW.leftOuterJoin(probaWC)

    joined.map { case (word, (pOccurs, pOccursAndClassOpt)) =>
      // Replace a missing class probability by the default value.
      val pOccursAndClass = pOccursAndClassOpt.getOrElse(default)
      val independentProduct = pOccurs * probaC
      val term =
        if (pOccursAndClass <= 0.0 || independentProduct <= 0.0) 0.0 // 0 * log 0 := 0
        else pOccursAndClass * math.log(pOccursAndClass / independentProduct) / naturalLogOfTwo
      (word, term)
    }
}


In [10]:
%spark

// Per-class statistics: for each class directory, probaWordDir returns the per-word ratio
// (files containing the word / files in the directory) together with the number of files.
val (pPresentGivenSpam, nSpam) = probaWordDir(sc)("/tmp/ling-spam/spam/*")
val (pPresentGivenHam, nHam) = probaWordDir(sc)("/tmp/ling-spam/ham/*")

// Print every (word, ratio) pair computed for the spam directory.
for (wordRatio <- pPresentGivenSpam.collect()) println(wordRatio)



In [11]:
%spark


In [12]:
%spark

// main part of the notebook
/** Ranks the words of the corpus by their mutual information with the ham/spam class,
  * prints the 20 best (word, MI) pairs and saves the 20 words under /tmp/topWords.txt.
  *
  * @param args args(0) must be the corpus root directory (e.g. /tmp/ling-spam) containing
  *             the `ham` and `spam` sub-directories; when it is missing a message is
  *             printed and nothing else is done
  */
def main(args: Array[String]): Unit = {
  if (args.isEmpty) {
    // Without the directory there is nothing to process; stop here instead of failing on args(0).
    println("No directory path provided")
  } else {
    val baseDir = args(0)

    // P(occurs) over the whole corpus, and P(occurs | class) for each class.
    // probaWordDir divides by the number of files it read, hence the per-class results
    // are conditional on the class.
    val (probaW, nbFiles) = probaWordDir(sc)(baseDir + "/*/*.txt")
    val (probaWordHam, nbFilesHam) = probaWordDir(sc)(baseDir + "/ham/*.txt")
    val (probaWordSpam, nbFilesSpam) = probaWordDir(sc)(baseDir + "/spam/*.txt")

    // P(class)
    val probaHam = nbFilesHam.toDouble / nbFiles.toDouble
    val probaSpam = nbFilesSpam.toDouble / nbFiles.toDouble

    // Default P(occurs, class) for a word never seen in a class.
    // Written as a literal: `10^(-5)` is a bitwise XOR on Ints (it evaluates to -15).
    val defaultProba = 1e-5

    // P(not occurs). Words present in every file are dropped: all their "absent" terms are 0.
    val probaWAbsent: RDD[(String, Double)] =
      probaW.map { case (word, p) => (word, 1.0 - p) }.filter { case (_, p) => p > 0.0 }

    // P(occurs, class) = P(occurs | class) * P(class)
    def jointPresent(probaWordClass: RDD[(String, Double)], probaClass: Double): RDD[(String, Double)] =
      probaWordClass.map { case (word, pGivenClass) => (word, pGivenClass * probaClass) }

    // P(not occurs, class) = (1 - P(occurs | class)) * P(class), for every word of the corpus.
    // A word never seen in the class has P(occurs | class) = 0. Zero results are dropped so
    // that no 0 * log(0) is ever evaluated downstream.
    def jointAbsent(probaWordClass: RDD[(String, Double)], probaClass: Double): RDD[(String, Double)] =
      probaW
        .leftOuterJoin(probaWordClass)
        .map { case (word, (_, pGivenClass)) => (word, (1.0 - pGivenClass.getOrElse(0.0)) * probaClass) }
        .filter { case (_, p) => p > 0.0 }

    // One mutual-information term per (occurs, class) combination.
    val MITrueHam = computeMutualInformationFactor(jointPresent(probaWordHam, probaHam), probaW, probaHam, defaultProba)
    val MITrueSpam = computeMutualInformationFactor(jointPresent(probaWordSpam, probaSpam), probaW, probaSpam, defaultProba)
    val MIFalseHam = computeMutualInformationFactor(jointAbsent(probaWordHam, probaHam), probaWAbsent, probaHam, defaultProba)
    val MIFalseSpam = computeMutualInformationFactor(jointAbsent(probaWordSpam, probaSpam), probaWAbsent, probaSpam, defaultProba)

    // MI(word) = sum of its four terms.
    val MI: RDD[(String, Double)] =
      MITrueHam.union(MITrueSpam).union(MIFalseHam).union(MIFalseSpam).reduceByKey(_ + _)

    // The 20 words with the highest mutual information, printed as (word, MI) pairs.
    val topWords: Array[(String, Double)] = MI.top(20)(Ordering.by[(String, Double), Double](_._2))
    topWords.foreach(println)

    // Save the top words (keys only) as a single part file.
    // NOTE: saveAsTextFile fails if the output path already exists; remove it before re-running.
    val path: String = "/tmp/topWords.txt"
    sc.parallelize(topWords).keys.coalesce(1, shuffle = true).saveAsTextFile(path)
  }
}



In [13]:

%spark
// Run the pipeline on the Ling-Spam corpus; main reads the corpus root directory from args(0).
main(Array("/tmp/ling-spam"))

In [14]:
%spark
// Read back, as (file name, file text) pairs, what main saved under /tmp/topWords.txt.
sc.wholeTextFiles("/tmp/topWords.txt").collect 

In [15]:
%spark
