## WARC for Spark

This notebook will help you get started on developing code to analyze WARC files with Spark, using Spark Notebook.

Once the code seems ready, you can use _Download as scala_ from the File menu, and continue to develop it into a standalone application to be submitted using `spark-submit`; see specifically the [course instructions on self-contained apps](http://rubigdata.github.io/course/background/sbt.html).

### Preparations: external dependencies

We will use external libraries to parse WARC files (provided by SurfSara) and to parse noisy HTML (Jsoup, a Java HTML parser in the spirit of Beautiful Soup).

The three cells below set up the environment.

_An alternative approach is to download the `jar` files yourself and use the `:cp` directive to add those to the classpath._

In [ ]:
:remote-repo com.github.sara-nl % default % https://jitpack.io % maven

In [ ]:
:dp "com.github.sara-nl" % "warcutils" % "-SNAPSHOT"

In [ ]:
:dp "org.jsoup" % "jsoup" % "1.9.2"
    "org.jwat"          % "jwat-common"    % "1.0.0"
    "org.jwat"          % "jwat-warc"      % "1.0.0"
    "org.jwat"          % "jwat-gzip"      % "1.0.0"

In [ ]:
import nl.surfsara.warcutils.WarcInputFormat
import org.jwat.warc.{WarcConstants, WarcRecord}

import org.apache.hadoop.io.LongWritable

import org.apache.commons.lang.StringUtils

When using Spark, objects may need to be shipped between different nodes in the cluster, which involves their _serialization_.

If you get errors that classes are not serializable, add them to the list of classes registered through the `.set` commands below.
This resets the notebook, so previously defined variables will no longer exist.

In [ ]:
// Adapt the SparkConf to use Kryo and register the classes used, through the reset parameter lastChanges (a function)
import org.apache.spark.SparkConf
reset( lastChanges= _.
      set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" ).
      set( "spark.kryo.classesToRegister", 
          "org.apache.hadoop.io.LongWritable," +
          "org.jwat.warc.WarcRecord," +
          "org.jwat.warc.WarcHeader" )
      )

In [ ]:
// Checking that configuration was successfully adapted
sc.getConf.toDebugString
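When Spark complains that something is not serializable, it can help to test candidate objects locally first. The sketch below is a hypothetical helper (`SerializationCheck.isJavaSerializable` is not part of Spark) that simply tries plain Java serialization. As far as we know, Spark serializes task closures with Java serialization even when Kryo is configured as `spark.serializer`, so a `Task not serializable` error usually points at something captured by a closure that would fail this check.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical debugging helper, not part of Spark
object SerializationCheck {
  /** Returns true if `obj` (and everything it references) can be written
    * with plain Java serialization, false on NotSerializableException. */
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

A `false` result for an object you use inside a `map` or `filter` is a strong hint to either make it serializable or create it inside the closure instead.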

### Use WARC contents

Let us load a WARC file and carry out a few analyses.

Assign to `warcfile` the path of an example WARC file to work with; I used `wget` to create a WARC file from the course website (see e.g. [the final assignment](http://rubigdata.github.io/course/assignments/P-commoncrawl.html)).

In [ ]:
val warcfile = "/data/bigdata/course.warc.gz"

Now initialize an RDD from `warcfile` using the `WarcInputFormat` parser provided by `nl.surfsara.warcutils`:

In [ ]:
val warcf = sc.newAPIHadoopFile(
              warcfile,
              classOf[WarcInputFormat],               // InputFormat
              classOf[LongWritable],                  // Key
              classOf[WarcRecord]                     // Value
    )

**Note:**
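My initial approach was to cache the constructed RDD; unfortunately, doing this interacts _somehow_ (I do not yet understand exactly why) with the inner workings of the `WarcRecord` class, resulting in `java.io.IOException: Stream closed` errors when operating on the payloads of these WarcRecords.
Possibly related: the Spark API documentation of `newAPIHadoopFile` warns that Hadoop's RecordReader re-uses the same Writable object for each record, and recommends copying records with a `map` before caching them.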

I resorted to defining `warc` as a cached version of `warcf` after an identity transform, and `warcc` as a transformation of `warcf` that extracts the contents before caching.
In your own code, though, try to filter the records as much as possible before accessing their contents (so do not build on the `warcc` result if you do not use all records).
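The advice to filter before touching the contents can be illustrated without Spark. In this sketch, `FakeRecord` and `FilterFirst` are hypothetical stand-ins (not part of warcutils or JWAT): the two header-only filters run first, so the costly `extract` step, which plays the role of reading a payload, runs only for records that survive them. A chain of `filter` and `map` calls on an RDD is evaluated in the same order for each record.

```scala
// Hypothetical stand-in for a WARC record: only the fields this sketch needs
final case class FakeRecord(warcType: String, contentType: String, body: String)

object FilterFirst {
  // Counts how often the (expensive) content extraction has run
  var extractions = 0

  // Stand-in for getContent: real code would read the payload stream here
  def extract(r: FakeRecord): String = {
    extractions += 1
    r.body
  }

  // Cheap header-only filters first, payload access last
  def htmlBodies(records: Seq[FakeRecord]): Seq[String] =
    records
      .filter(_.warcType == "response")
      .filter(_.contentType.startsWith("text/html"))
      .map(extract)
}
```

With three records of which only one is an HTML response, `extract` runs exactly once.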

#### Using header info only

In [ ]:
val warc = warcf.map{wr => wr}.cache()

In [ ]:
val nRecords = warc.count()   // number of WARC records of all types, not only HTML

In [ ]:
// WarcRecords header type info
warc.map{ wr => wr._2.header }.
map{ h => (h.warcTypeIdx, h.warcTypeStr) }.take(10)
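A natural follow-up is to count how many records of each WARC-Type the file contains. The sketch below shows the counting logic on a plain Scala collection (`TypeCounts` is a hypothetical helper and the sample type strings in the test are made up); on the cached RDD, `countByValue()` computes the same kind of `Map` and returns it to the driver.

```scala
object TypeCounts {
  // Local equivalent of
  //   warc.map{ wr => wr._2.header.warcTypeStr }.countByValue()
  // which runs on the cluster and returns a Map to the driver
  def countByType(types: Seq[String]): Map[String, Long] =
    types.groupBy(identity).map { case (t, occurrences) => t -> occurrences.size.toLong }
}
```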


In [ ]:
// Get responses with their size
warc.map{ wr => wr._2.header }.
     filter{ _.warcTypeIdx == 2 /* response */ }.
     map{ h => (h.warcTargetUriStr, h.contentLength, h.contentType.toString) }.collect

In [ ]:
// WarcRecords with responses that gave a 404:
warc.map{ wr => wr._2 }.
     filter{ _.header.warcTypeIdx == 2 /* response */ }.
     // getHttpHeader() returns null for records without an HTTP header
     filter{ wr => Option(wr.getHttpHeader()).exists(_.statusCode == 404) }.
     map{ wr => wr.header.warcTargetUriStr }. collect() 

In [ ]:
// WarcRecords corresponding to HTML responses:
warc.map{ wr => wr._2 }.
     filter{ _.header.warcTypeIdx == 2 /* response */ }.
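     // Guard against a missing HTTP header or a missing Content-Type
     filter{ wr => Option(wr.getHttpHeader()).flatMap(h => Option(h.contentType)).exists(_.startsWith("text/html")) }.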
     map{ wr => (wr.header.warcTargetUriStr, wr.getHttpHeader().contentType) }. collect()
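The `startsWith("text/html")` check above is case-sensitive, while media types are not. A hypothetical helper such as `ContentTypes.isHtml` below ignores case and surrounding whitespace, and treats a missing Content-Type as non-HTML; it is a sketch, not a JWAT function.

```scala
import java.util.Locale

object ContentTypes {
  // True if a possibly null Content-Type value denotes HTML; comparison ignores
  // case and surrounding whitespace, so "Text/HTML; charset=UTF-8" also matches
  def isHtml(contentType: String): Boolean =
    Option(contentType).exists(_.trim.toLowerCase(Locale.ROOT).startsWith("text/html"))
}

// Possible use in the filters above (sketch; `rec` is a WarcRecord):
//   filter{ rec => val h = rec.getHttpHeader(); h != null && ContentTypes.isHtml(h.contentType) }
```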

#### Using contents

Define a utility function to get access to the Payload, i.e., the actual contents of the WarcRecords.

In [ ]:
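import java.io.ByteArrayOutputStream

// Read the complete payload of a WarcRecord into a String.
// Assumes the payload is UTF-8 encoded.
def getContent(record: WarcRecord): String = {
  val payload = record.getPayload
  if (payload == null) return ""   // record without a payload: no content

  //val cStream = payload.getInputStreamComplete()
  val cStream = payload.getInputStream()
  val content = new ByteArrayOutputStream()

  // Fixed-size, non-empty buffer: read() on a zero-length buffer returns 0,
  // never -1, so sizing it to a content length of 0 would loop forever
  val buf = new Array[Byte](8192)
  try {
    var nRead = cStream.read(buf)
    while (nRead != -1) {
      content.write(buf, 0, nRead)
      nRead = cStream.read(buf)
    }
  } finally {
    cStream.close()
  }

  content.toString("UTF-8")
}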
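`getContent` decodes every payload as UTF-8, which garbles pages served in another encoding. Below is a sketch of a more careful choice, assuming the charset is taken from the HTTP `Content-Type` header (e.g. `text/html; charset=ISO-8859-1`); charsets declared only in an HTML `<meta>` tag are not detected. `Charsets.fromContentType` is a hypothetical helper.

```scala
import java.nio.charset.{Charset, StandardCharsets}
import java.util.Locale
import scala.util.Try

object Charsets {
  // Pick the charset named in a Content-Type value such as
  // "text/html; charset=ISO-8859-1"; fall back to UTF-8 when the value is null,
  // has no charset parameter, or names a charset this JVM does not support
  def fromContentType(contentType: String): Charset = {
    val declared: Option[String] =
      Option(contentType).toSeq
        .flatMap(_.split(";").toSeq.drop(1))   // parameters after the media type
        .map(_.trim)
        .collectFirst {
          case p if p.toLowerCase(Locale.ROOT).startsWith("charset=") =>
            p.substring("charset=".length).trim.stripPrefix("\"").stripSuffix("\"")
        }
    declared
      .flatMap(name => Try(Charset.forName(name)).toOption)
      .getOrElse(StandardCharsets.UTF_8)
  }
}
```

Inside `getContent`, the final `content.toString("UTF-8")` could then become `content.toString(Charsets.fromContentType(record.getHttpHeader().contentType).name)`, after checking that `getHttpHeader()` is not null.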

In [ ]:
// Take only a prefix of each page to avoid messing up the rendering of results in the notebook (proper handling would be needed)
val warcc = warcf.
  filter{ _._2.header.warcTypeIdx == 2 /* response */ }.
  // Guard against a missing HTTP header or a missing Content-Type
  filter{ wr => Option(wr._2.getHttpHeader()).flatMap(h => Option(h.contentType)).exists(_.startsWith("text/html")) }.
  map{wr => (wr._2.header.warcTargetUriStr, StringUtils.substring(getContent(wr._2), 0, 256))}.cache()

In [ ]:
warcc.take(10)

### Example: Use Jsoup to convert HTML to Text

[Jsoup](https://jsoup.org/) is a widely used Java library for parsing and cleaning HTML, and for extracting its text.

**Note:**
_Libraries can be included "automagically" using the special `:dp` directive, as done at the top of this notebook. Alternatively, copy the right `jar` into the Docker image's filesystem yourself and use the `:cp` directive, as mentioned above._

In [ ]:
import java.io.IOException
import org.jsoup.Jsoup

// Convert an HTML document to plain text, replacing line breaks by spaces
def HTML2Txt(content: String): String = {
  try {
    Jsoup.parse(content).text().replaceAll("[\\r\\n]+", " ")
  }
  catch {
    case e: Exception => throw new IOException("Caught exception processing input row ", e)
  }
}

In [ ]:
val warcc = warcf.
  filter{ _._2.header.warcTypeIdx == 2 /* response */ }.
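  // Guard against a missing HTTP header or a missing Content-Type
  filter{ wr => Option(wr._2.getHttpHeader()).flatMap(h => Option(h.contentType)).exists(_.startsWith("text/html")) }.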
  map{wr => ( wr._2.header.warcTargetUriStr, HTML2Txt(getContent(wr._2)) )}.cache()

In [ ]:
warcc.map{ tt => (tt._1, StringUtils.substring(tt._2, 0, 128))}.take(10)
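A typical next analysis on the extracted text is counting word frequencies. The sketch below, with a hypothetical `WordFreq.topWords`, does this for a single string; the trailing comment shows how the same steps might map onto `warcc` with `flatMap` and `reduceByKey`. Its tokenizer (`\\W+`) only treats ASCII letters, digits and `_` as word characters, so accented words get split.

```scala
import java.util.Locale

object WordFreq {
  // The n most frequent lower-cased words in `text` with their counts,
  // ties broken alphabetically
  def topWords(text: String, n: Int): Seq[(String, Int)] =
    text.toLowerCase(Locale.ROOT)
      .split("\\W+")              // split on runs of non-word characters
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (w, ws) => w -> ws.length }
      .toSeq
      .sortBy { case (w, c) => (-c, w) }
      .take(n)
}

// Roughly the same on the cluster, for the (url, text) pairs in warcc:
//   warcc.flatMap{ case (_, text) => text.toLowerCase.split("\\W+").filter(_.nonEmpty) }.
//         map{ w => (w, 1) }.reduceByKey(_ + _).
//         sortBy(_._2, ascending = false).take(10)
```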

### Final words

Now it is time to continue to develop your own project.

Do not worry about a _"required"_ level of success; it does not have to be a publishable study!
It is perfectly fine if you only manage to build a rather simple standalone program that runs on the cluster
but not on the complete crawl, or that uses only header information.

**Even simple tasks are challenging when carried out on large data!**

Do not be too ambitious, and make progress step by step.

The examples presented in this notebook are meant to be helpful, but they are by no means complete and have not been tested thoroughly on actual data.
You may encounter weird problems, complex enough that no immediate answer exists on StackExchange.

I hope the course provided enough background on Spark to help you spot the likely cause of such a problem;
however, if you spend more than, say, two to three hours analyzing
and debugging a challenge, I recommend giving up and modifying your objective: consider a different (simpler) project and only scale up later on
(provided there is still time left).

_If you cannot solve a problem, definitely do ask for help by creating a new issue on the Forum; maybe one of us knows the answer already!_

