# Data Science with Spark and Hadoop

![A Hadoop stack](images/Hadoop_ecosystem.png) 

1. Scala/Spark syntax and patterns
1. The barrier to using Spark can be low
1. All researchers can benefit from big data processing

# Analyzing Engineering Faculty Member Bios

In [9]:
%%html
<iframe width="100%" height="550" 
src="http://engineering.vanderbilt.edu/people/">
</iframe>

## Transform XML to JSON

```html
<td>
    <h4>
        <a href="/bio/michael-alles">Michael Alles</a>
    </h4>
    <br>
    <strong>Intellectual Neighborhoods:</strong> 
    Risk and Reliability,Nano Science and Technology
</td>
```

```javascript
{
    "name":"Michael Alles",
    "focus":"",
    "nhood":" Risk and Reliability,Nano Science and Technology"
}
```

```scala
package faculty
/**
  * Created by joshuaarnold on 8/8/16.
  */

import common._
import java.util.regex.Pattern

import net.liftweb.json.JsonDSL._
import net.liftweb.json._

import scala.collection.breakOut
import scala.reflect.ClassTag
import scala.xml._

object FacultyParsingSuite {

  /**
    * Parses the engineering faculty listing page into one JSON object per
    * faculty member, hands the lines to `outputWriter` for
    * "tmp/engineeringFaculty.json" and prints them to stdout.
    *
    * @param fpath path of the saved listing HTML; defaults to the location
    *              that was previously hard-coded, so existing no-argument
    *              calls behave as before
    */
  def parseEngineeringFaculty(
      fpath: String = "/Users/joshuaarnold/Documents/MyApps/faculty-stats/resources" +
        "/html/engineeringFaculty/engineeringFaculty.html"): Unit = {

    /** One faculty record: display name, research focus and neighborhoods. */
    class Faculty(val name: String, val focus: String, val nhood: String) {

      /** Renders the record as a compact, single-line JSON object. */
      def getJson(): String = {
        val json =
          ("name" -> name) ~
            ("focus" -> focus) ~
            ("nhood" -> nhood)

        compactRender(json)
      }

    }


    object Faculty {

      /**
        * Collects the text of the outermost <a> and <strong> elements found
        * at or below `n`, in document order. Any other node (including <td>
        * and text nodes) contributes only through its children.
        */
      private def labels(n: Node): List[String] = n match {
        case <a>{_*}</a> => List(n.text)
        case <strong>{_*}</strong> => List(n.text)
        case _ => n.child.toList.flatMap(labels)
      }

      /**
        * Builds a Faculty from one listing cell.
        *
        * The link/bold labels of the cell are used as keys; the text that
        * follows each key becomes its value. The first key is the name.
        *
        * @param node the <td> cell describing one faculty member
        * @return the parsed record; fields that are absent are empty strings
        */
      def apply(node: Node): Faculty = {
        val keys = labels(node)

        // Every key is quoted (matched literally) and followed by a capture
        // group, so group i captures the text that follows key i.
        val keysRE = keys.map(k => Pattern.quote(k) + "(.*)").mkString.r

        val nodeText = node.text
        val fields = (for {
          m <- keysRE.findAllIn(nodeText).matchData
          g <- 1 to keys.length
        } yield m.group(g)).toList

        val dict: Map[String, String] = keys.zip(fields).toMap

        // A cell without any <a>/<strong> yields no keys; fall back to an
        // empty name instead of failing on keys(0).
        new Faculty(
          keys.headOption.getOrElse(""),
          dict.getOrElse("Research Focus:", ""),
          dict.getOrElse("Intellectual Neighborhoods:", ""))
      }
    }


    val elem = customParser.xmlToHtml.loadFile(fpath)

    val tables = (elem \\ "table").filter(t => (t \ "@id").text == "peoplelisting")

    // Only cells that contain an <h4> heading are treated as faculty entries.
    val meat = (tables \\ "tr" \\ "td").filter(td => (td \ "h4").nonEmpty)

    val jsonLines = meat.map(td => Faculty(td)).map(_.getJson())

    outputWriter("tmp/engineeringFaculty.json", jsonLines)(x => x.concat("\n"))

    jsonLines.foreach(println)
  }




  /**
    * Parses a saved faculty-registry page and passes one JSON string per
    * non-empty row of the "ctl00_ContentPlaceHolder1_dlFaculty" table to
    * `outputWriter` under "tmp/allFaculty.json".
    *
    * @param fpath path of the HTML file to parse
    * @return whatever `outputWriter` returns
    */
  def parseAllFaculty(fpath: String) = {


    /**
      * Parses the table element from a Degrees or Titles table
      *
      * @param tbody element containing sequence of <tr>
      * @return Map[String, List[String]] of lower-cased column header -> values
      */
    def parseTableBody(tbody: NodeSeq): Map[String, List[String]] = {

      val trs = tbody \\ "tr"

      // Data rows in row-major order; rows without any <td> are skipped.
      val values: List[List[String]] =
        trs.toList
          .filter(tr => (tr \\ "td").nonEmpty)
          .map(tr => (tr \\ "td").toList.map(_.text))

      val cols: List[String] = (trs \\ "th").toList.map(_.text)

      // Prepending the header row and transposing gives one list per column
      // whose head is the header. Note: `transpose` requires every row to
      // have the same length as the header row.
      val columns = (cols :: values).transpose

      columns.foldLeft(Map.empty[String, List[String]]) {
        case (acc, header :: cells) => acc + (header.toLowerCase() -> cells)
        case (acc, Nil) => acc
      }
    }

    /**
      * Extracts the fields contributed by a single node: the name/year spans
      * (identified by their id attribute), a nested table, or - for anything
      * else - whatever its children contribute.
      */
    def extractNode(n: Node): Map[String, List[String]] = n match {
      // Only a <span> with exactly one child matches here.
      case <span>{ s }</span> =>
        (n \ "@id").text match {
          case "ctl00_ContentPlaceHolder1_dlFaculty_ctl00_lblName" =>
            Map("name" -> List(s.text))
          case "ctl00_ContentPlaceHolder1_dlFaculty_ctl00_Label2" =>
            Map("year" -> List(s.text))
          case _ =>
            Map.empty
        }
      case <table>{ _* }</table> => parseTableBody(n)
      case _ => extractAll(n.child)
    }

    /** Merges the fields of all nodes in order; later keys overwrite earlier ones. */
    def extractAll(nodes: Seq[Node]): Map[String, List[String]] =
      nodes.foldLeft(Map.empty[String, List[String]])((acc, n) => acc ++ extractNode(n))

    /**
      * Renders one outer-table row as a JSON object string. Keys with exactly
      * one value are rendered as plain strings, all others as arrays.
      *
      * @param outerTable the row (or node sequence) to extract fields from
      * @return compact JSON; "{}" when nothing was extracted
      */
    def parseOuterTable(outerTable: NodeSeq): String = {
      // Extract once and split by arity (the original extracted twice and
      // also built an unused intermediate map).
      val (single, multi) = extractAll(outerTable).partition(_._2.length == 1)

      val j1 = parse(compactRender(single.map { case (k, vs) => (k, vs.head) }))
      val j2 = parse(compactRender(multi))

      compactRender(j1 merge j2)
    }

    // Local TagSoup-backed loader; named so that it does not shadow the
    // `customParser` object of the enclosing suite.
    val tagSoupLoader =
      XML.withSAXParser(new org.ccil.cowan.tagsoup.jaxp.SAXFactoryImpl()
        .newSAXParser())

    val elem = tagSoupLoader.loadFile(fpath)

    val tables = (elem \\ "table").filter(t =>
      (t \ "@id").text == "ctl00_ContentPlaceHolder1_dlFaculty")

    val rows = tables \\ "tr"

    // Rows that yield no fields render as "{}" and are dropped.
    val jsonLines = rows.map(parseOuterTable).filterNot(_ == "{}")

    outputWriter("tmp/allFaculty.json", jsonLines)(x => x.concat("\n"))
  }


  /**
    * Holds an XML loader that reads files through the TagSoup SAX parser
    * (`org.ccil.cowan.tagsoup.jaxp.SAXFactoryImpl`).
    */
  object customParser {
    private val tagSoupFactory = new org.ccil.cowan.tagsoup.jaxp.SAXFactoryImpl()

    /** Loader built once from a single TagSoup SAX parser instance. */
    val xmlToHtml = XML.withSAXParser(tagSoupFactory.newSAXParser())
  }


  /**
    * Entry point: runs `parseAllFaculty` on one saved registry page.
    *
    * @param args optional; when present, the first argument is used as the
    *             path of the HTML file instead of the built-in default
    */
  def main(args: Array[String]): Unit = {
    // parseEngineeringFaculty()

    // Default location used when no path is supplied on the command line.
    val defaultPath = "/Users/joshuaarnold/Documents/MyApps/faculty-stats/resources" +
      "/html/allFaculty/A_names.html"

    val fpath = args.headOption.getOrElse(defaultPath)

    parseAllFaculty(fpath)
  }


}
```

## A Word2Vec Example

In [2]:
// Alias the SQLContext (presumably supplied by the notebook kernel) so its
// implicits can be imported through a stable identifier.
val sqlc = sqlContext
import sqlc.implicits._
// Read the JSON file named like the output of parseEngineeringFaculty.
val facultyDF = sqlc.read.json("tmp/engineeringFaculty.json")
// Expose the DataFrame to SQL queries under the table name "faculty".
facultyDF.registerTempTable("faculty")

In [3]:
%%SQL
SELECT * FROM faculty

+--------------------+-------------------+--------------------+
|               focus|               name|               nhood|
+--------------------+-------------------+--------------------+
| Risk management,...|      Mark Abkowitz| Risk and Reliabi...|
| Nonlinear struct...|      Douglas Adams| Risk and Reliabi...|
| Human-System Int...|     Julie A. Adams| Cyber Physical S...|
|         Development|     Nicholas Adams|                    |
|                    |      Michael Alles| Risk and Reliabi...|
| Magnetic resonan...|      Adam Anderson| Biomedical Imagi...|
| Drop dynamics, a...|    A. V. Anilkumar| Energy and Natur...|
|                    |     Theodore Bapty|                    |
| Solar energy con...|      Rizia Bardhan| Regenerative Med...|
| Welding and weld...|Robert Joel Barnett| Energy and Natur...|
+--------------------+-------------------+--------------------+
only showing top 10 rows



## Select and transform input data using SQL queries

In [4]:
val focusDF = sqlc.sql("SELECT focus FROM faculty WHERE LENGTH(focus)>0")

// Tokenize each focus string: punctuation is replaced by spaces, the result
// is split on whitespace runs and empty tokens are dropped. Each token array
// is wrapped in a Tuple1 (Tuple1[Array[String]]) before calling toDF.
val focusSplitDF = focusDF.map { row =>
  val words = row(0).toString
    .replaceAll("""[\p{Punct}]""", " ")
    .split("\\s+")
    .filter(_.nonEmpty)
  Tuple1(words)
}.toDF("focus")

focusSplitDF.show()

+--------------------+
|               focus|
+--------------------+
|[Risk, management...|
|[Nonlinear, struc...|
|[Human, System, I...|
|       [Development]|
|[Magnetic, resona...|
|[Drop, dynamics, ...|
|[Solar, energy, c...|
|[Welding, and, we...|
|[Dynamic, systems...|
|[Multiscale, beha...|
|[Bioinstrumentati...|
|[Microfluidics, m...|
|[Tech, based, ent...|
|[Technology, stra...|
|[Computer, aided,...|
|[Modeling, and, a...|
|[Radiation, effec...|
|[Virtual, Environ...|
|[nanoscience, gra...|
|[Information, pro...|
+--------------------+
only showing top 20 rows



## Use Word2Vec to Find Synonyms in Research Focus

In [14]:
import org.apache.spark.ml.feature.{Word2Vec, StopWordsRemover}

// Remove stop words from the tokenized "focus" column into "filtered"
val remover = new StopWordsRemover().setInputCol("focus").setOutputCol("filtered")
val dataSet = remover.transform(focusSplitDF)

// Learn a mapping from words to Vectors
// (100-dimensional vectors; minCount 0 keeps every word, however rare)
val word2Vec = (new Word2Vec()
  .setInputCol("filtered")
  .setOutputCol("result")
  .setVectorSize(100)
  .setMinCount(0))
val model = word2Vec.fit(dataSet)
// Ask the fitted model for the 50 words closest to "data"
val resultDF = model.findSynonyms("data", 50)

resultDF.show()

+---------------+--------------------+
|           word|          similarity|
+---------------+--------------------+
|      Hydrology|0.009062745917056093|
|      signaling|0.007882261001329424|
|       Modeling|0.007774564754125474|
|     underwater|0.007724221027430538|
|       spectral|0.007616483427429222|
|        antigen|0.007540289727149277|
|    distributed|0.007514163031044968|
|        welding|0.007366753454299188|
|         Secure|0.007362351015667...|
|      appraisal|0.007210608123060892|
|        display|0.007104453107582285|
|  environmental| 0.00675192222738749|
|           path|0.006719895624075037|
|      batteries|0.006464656359015513|
|        cardiac|0.006458926007323657|
|electrochemical|0.006457020772064...|
|       Computer|0.006418360492989414|
|          Raman|0.006353728381891622|
|       learning|0.006349404061198687|
|       recovery|0.006339612316653455|
+---------------+--------------------+
only showing top 20 rows



## The MapReduce Framework

![mapreduce](images/MapReduce_Work_Structure.png)

# Analyze all faculty

In [4]:
%%html
<iframe width="100%" height="550" 
src="http://virg.vanderbilt.edu/webtools/registry/FacDetail.aspx?fname=&lname=A&school=0&dept=0">
</iframe>