# Analyzing term / entity distributions in a dataset

In [1]:
import org.archive.archivespark._
import org.archive.archivespark.implicits._
import org.archive.archivespark.enrich.functions._
import org.archive.archivespark.utils.IOUtil
import org.archive.archivespark.specific.warc._
import org.archive.archivespark.specific.warc.enrichfunctions._
import org.archive.archivespark.specific.warc.implicits._
import org.archive.archivespark.specific.warc.specs._
import org.apache.hadoop.io.compress.GzipCodec

## Loading the dataset

In this example, the Web archive dataset is loaded from local WARC/CDX files using `WarcCdxHdfsSpec`. Any other [Data Specification (DataSpec)](https://github.com/helgeho/ArchiveSpark/blob/master/docs/DataSpecs.md) can be used here as well to load your records from different local or remote sources.

In [2]:
val path = "/data/archiveit/ArchiveIt-Collection-2950"
val cdxPath = path + "/cdx/*.cdx.gz"
val warcPath = path + "/warc"

In [3]:
val records = ArchiveSpark.load(WarcCdxHdfsSpec(cdxPath, warcPath))

### Filtering records

Terms and entities are extracted from the text of webpages. We therefore filter out videos, images, stylesheets and any other files except webpages ([mime type](https://en.wikipedia.org/wiki/Media_type) *text/html*). We also keep only webpages that were available when they were crawled ([status code](https://en.wikipedia.org/wiki/List_of_HTTP_status_codes) 200). In addition, we filter URLs to keep only pages under *occupylowell.org*, which reduces the size of the dataset for this example.

*Note that this filtering is based on metadata only. Up to this point, ArchiveSpark has not even touched the actual Web archive records, and this is the core efficiency feature of ArchiveSpark.*

In [4]:
val pages = records.filter(r => r.mime == "text/html" && r.status == 200 && r.surtUrl.startsWith("org,occupylowell)"))

The first record in the remaining dataset is indeed of type *text/html* and was *online* (status 200) at the time of crawl:

In [5]:
pages.peekJson

{
  "record" : {
    "surtUrl" : "org,occupylowell)/",
    "timestamp" : "20120107063734",
    "originalUrl" : "http://occupylowell.org/",
    "mime" : "text/html",
    "status" : 200,
    "digest" : "NG6JBJ5VEZRU6FRYULHYBRHVNDFCPFR7",
    "redirectUrl" : "-",
    "meta" : "-",
    "compressedSize" : 8020
  }
}
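
The metadata-only filter can be sketched in plain Scala, outside of Spark, to make the predicate explicit. The `CdxMeta` case class and the sample values below are hypothetical stand-ins for the CDX fields `surtUrl`, `mime` and `status`. They are not ArchiveSpark types.

```scala
// Hypothetical stand-in for the CDX metadata fields used by the filter in In [4].
final case class CdxMeta(surtUrl: String, mime: String, status: Int)

object MetadataFilterSketch {
  // Same predicate as In [4]: HTML pages, crawled successfully, under occupylowell.org.
  // The ")" in the SURT prefix ends the host part, so other hosts sharing the
  // prefix "org,occupylowell" (e.g. "org,occupylowellx)") do not match.
  def keep(r: CdxMeta): Boolean =
    r.mime == "text/html" && r.status == 200 && r.surtUrl.startsWith("org,occupylowell)")

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      CdxMeta("org,occupylowell)/", "text/html", 200),
      CdxMeta("org,occupylowell)/logo.png", "image/png", 200),
      CdxMeta("org,occupylowell)/missing", "text/html", 404),
      CdxMeta("org,occupyboston)/", "text/html", 200)
    )
    sample.filter(keep).foreach(r => println(r.surtUrl))
  }
}
```

The predicate only reads CDX fields, so no WARC payload has to be opened to evaluate it.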

### URL deduplication

We want to use the number of URLs mentioning a term as the term's frequency, so we first need to make sure that every URL is included only once in the dataset. To do this, we keep only the earliest snapshot of each URL. The result should be cached so that it does not need to be recomputed every time a record is accessed:

In [6]:
val earliest = pages.distinctValue(_.surtUrl) {(a, b) => if (a.time < b.time) a else b}.cache

In [7]:
earliest.count

16956
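
Conceptually, `distinctValue` with the reducer above keeps one record per key, and the given function chooses between records pairwise. The following plain-Scala sketch shows the same selection. It illustrates the idea and is not ArchiveSpark's implementation. It assumes a hypothetical `Snapshot` type that holds a 14-digit CDX `timestamp` string in place of ArchiveSpark's `time` field.

```scala
// Hypothetical snapshot type: a SURT URL and a 14-digit CDX timestamp (yyyyMMddHHmmss).
final case class Snapshot(surtUrl: String, timestamp: String)

object EarliestSketch {
  // Fixed-width timestamps compare lexicographically in chronological order.
  private def earlier(a: Snapshot, b: Snapshot): Snapshot =
    if (a.timestamp < b.timestamp) a else b

  // Keep exactly one snapshot per URL (the earliest), sorted by URL for a stable result.
  def earliestPerUrl(snapshots: Seq[Snapshot]): Seq[Snapshot] =
    snapshots.groupBy(_.surtUrl).values.map(_.reduce(earlier)).toSeq.sortBy(_.surtUrl)

  def main(args: Array[String]): Unit = {
    val snaps = Seq(
      Snapshot("org,occupylowell)/", "20120301000000"),
      Snapshot("org,occupylowell)/", "20120107063734"),
      Snapshot("org,occupylowell)/about", "20120115120000")
    )
    earliestPerUrl(snaps).foreach(println)
  }
}
```

The reducer always returns the earlier of its two arguments. As a result, the order in which Spark combines pairs across partitions does not change which timestamp wins.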

## Counting terms

To extract the terms of a webpage, keep in mind that a webpage consists of HTML code. The `StringContent` Enrich Function would therefore enrich our dataset with the raw HTML code. To parse the HTML and keep only the text, we provide the `HtmlText` Enrich Function. It can also extract the text of a single tag, for example `HtmlText.of(Html.first("title"))` gets the title text of a page. By default, though, `HtmlText` extracts the entire text of the page.

For more details on the [Enrich Functions](https://github.com/helgeho/ArchiveSpark/blob/master/docs/EnrichFuncs.md) provided and their use, please [read the docs](https://github.com/helgeho/ArchiveSpark/blob/master/docs/README.md).

In [8]:
earliest.enrich(HtmlText).peekJson

{
  "record" : {
    "surtUrl" : "org,occupylowell)/forums/member.php?action=profile&uid=1246",
    "timestamp" : "20120131023424",
    "originalUrl" : "http://occupylowell.org/forums/member.php?action=profile&uid=1246",
    "mime" : "text/html",
    "status" : 200,
    "digest" : "UPZFUTUZNFESFJJ5R6LENO3LYEIKS354",
    "redirectUrl" : "-",
    "meta" : "-",
    "compressedSize" : 3680
  },
  "payload" : {
    "string" : {
      "html" : {
        "body" : {
          "text" : "Search Member List Calendar Hello There, Guest! Login Register Occupy Lowell Profile of Perbalierinip Perbalierinip (Account not Activated) Registration Date: 01-29-2012 Date of Birth: 01-29-1986 (26 years old) Local Time: 01-31-2012 at 03:34 AM Status: Offline Perbalierinip's Forum Info Joined: 01...

### Turn text into terms

As a very simple normalization, we convert the text to lowercase before splitting it into distinct terms:

In [9]:
val Terms = LowerCase.of(HtmlText).mapMulti("terms") { text: String => text.split("\\W+").distinct }

In [10]:
earliest.enrich(Terms).peekJson

{
  "record" : {
    "surtUrl" : "org,occupylowell)/forums/member.php?action=profile&uid=1246",
    "timestamp" : "20120131023424",
    "originalUrl" : "http://occupylowell.org/forums/member.php?action=profile&uid=1246",
    "mime" : "text/html",
    "status" : 200,
    "digest" : "UPZFUTUZNFESFJJ5R6LENO3LYEIKS354",
    "redirectUrl" : "-",
    "meta" : "-",
    "compressedSize" : 3680
  },
  "payload" : {
    "string" : {
      "html" : {
        "body" : {
          "text" : {
            "lowercase" : {
              "terms" : [ "search", "member", "list", "calendar", "hello", "there", "guest", "login", "register", "occupy", "lowell", "profile", "of", "perbalierinip", "account", "not", "activated", "registration", "date", "01", "29", "2012", "birth", "1986", "26", "yea...

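The `Terms` Enrich Function chains three steps on the extracted text: it lowercases the text, splits it on runs of non-word characters (`\\W+`), and removes duplicates within the page. The following plain-Scala sketch covers just this string logic and uses a hypothetical input string:

```scala
object TermsSketch {
  // Same normalization as the Terms Enrich Function in In [9]:
  // lowercase, split on runs of non-word characters, then dedupe per page.
  def terms(text: String): Seq[String] =
    text.toLowerCase.split("\\W+").toSeq.distinct

  def main(args: Array[String]): Unit = {
    println(terms("Hello There, Guest! Login Register. Hello again"))
  }
}
```

There are two side effects of `split("\\W+")`. First, a text that starts with a non-word character yields an empty first term, which the length filter applied later removes. Second, in Java regular expressions `\W` is ASCII-based by default, so accented letters act as separators (for example, "café" becomes "caf").
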
### Compute term frequencies (number of records / URLs)

We can now use `.flatMapValues` to get a plain list of the terms included in the dataset. To get rid of short stopwords such as articles, we keep only terms with a minimum length of 4 characters.

For more details on available [ArchiveSpark operations](https://github.com/helgeho/ArchiveSpark/blob/master/docs/Operations.md), please [read the docs](https://github.com/helgeho/ArchiveSpark/blob/master/docs/README.md).

In [11]:
val terms = earliest.flatMapValues(Terms).filter(_.length >= 4)

In [12]:
terms.take(10).foreach(println)

search
member
list
calendar
hello
there
guest
login
register
occupy
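
`.flatMapValues` flattens the per-record term lists into one collection of terms, and `.filter` drops the short ones. On plain Scala collections, the equivalent is a `flatten` plus a length filter. The per-page term lists below are hypothetical:

```scala
object MinLengthSketch {
  // Minimum term length used in the notebook to drop short stopwords such as "of" or "the".
  val MinLength = 4

  // Flatten per-page term lists and keep only terms with at least MinLength characters.
  def longTerms(pages: Seq[Seq[String]]): Seq[String] =
    pages.flatten.filter(_.length >= MinLength)

  def main(args: Array[String]): Unit = {
    val pages = Seq(
      Seq("search", "member", "list", "of", "the"),
      Seq("", "occupy", "lowell", "a")
    )
    println(longTerms(pages))
  }
}
```

This length heuristic is coarse. It also removes short content words, and it keeps longer stopwords such as "there", which appears in the output above.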


We have already made sure that every URL is included only once in the dataset and that each term occurs only once per record, so we can simply count the terms using Spark's `.countByValue`. Finally, we sort the terms by count in descending order (by sorting on the negated count) and save them as [CSV (comma-separated values)](https://en.wikipedia.org/wiki/Comma-separated_values):

In [13]:
val counts = terms.countByValue.toSeq.sortBy{case (term, count) => -count}
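
Every URL appears once and every term at most once per page. Counting term occurrences therefore yields the number of pages that contain each term. The same counting and descending sort can be sketched on local collections as follows, using hypothetical input:

```scala
object DocFrequencySketch {
  // Count how many pages contain each term. Terms must already be distinct per page.
  def docFrequencies(termsPerPage: Seq[Seq[String]]): Seq[(String, Long)] =
    termsPerPage.flatten
      .groupBy(identity)
      .map { case (term, occurrences) => (term, occurrences.size.toLong) }
      .toSeq
      // Descending by count (negated key); ties broken alphabetically for a stable order.
      .sortBy { case (term, count) => (-count, term) }

  def main(args: Array[String]): Unit = {
    val pages = Seq(
      Seq("occupy", "lowell", "search"),
      Seq("occupy", "lowell"),
      Seq("occupy", "calendar")
    )
    docFrequencies(pages).foreach(println)
  }
}
```

The alphabetical tie-break is an addition of this sketch. The notebook sorts by count only, so terms with equal counts (such as *lowell* and *occupy*) may appear in either order.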

In [14]:
IOUtil.text("term_counts.csv", printTop = 10) { out =>
    counts.map{case (term, count) => term + "," + count}.foreach(out.println)
}

lowell,16851
occupy,16851
list,16730
search,16726
group,16514
powered,16509
theme,16481
2012,16463
time,16456
there,16445


The `printTop = 10` parameter prints the first 10 lines here for inspection. The `term_counts.csv` file, which is created in the same folder as this notebook, contains all terms. You can load this CSV file into a plotting tool of your choice to plot the term distribution, for instance as a histogram.
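
Outside of ArchiveSpark, a comparable CSV export can be sketched with `java.io.PrintWriter`. This is not how `IOUtil.text` is implemented. The `writeCounts` and `csvField` helpers are hypothetical. Terms produced by splitting on `\\W+` never contain commas, but entity names may contain commas or quotes, so the helper quotes fields in the RFC 4180 style when needed:

```scala
import java.io.{File, PrintWriter}

object CsvExportSketch {
  // Quote a field if it contains a comma, quote or line break (RFC 4180 style).
  def csvField(s: String): String =
    if (s.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + s.replace("\"", "\"\"") + "\""
    else s

  // Write "value,count" lines to `file` and return the first `printTop` lines for inspection.
  def writeCounts(file: File, counts: Seq[(String, Long)], printTop: Int): Seq[String] = {
    val lines = counts.map { case (value, count) => csvField(value) + "," + count }
    val out = new PrintWriter(file, "UTF-8")
    try lines.foreach(line => out.println(line))
    finally out.close()
    lines.take(printTop)
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("term_counts", ".csv")
    val top = writeCounts(file, Seq("lowell" -> 3L, "occupy, lowell" -> 1L), printTop = 10)
    top.foreach(println)
  }
}
```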

## Counting entities

Similar to the term frequencies shown above, we can also count the occurrences of [named entities](https://en.wikipedia.org/wiki/Named_entity) in the dataset.

Core ArchiveSpark provides `Entities`, an [Enrich Function](https://github.com/helgeho/ArchiveSpark/blob/master/docs/EnrichFuncs.md) to extract named entities. It uses the Named Entity Extractor of [Stanford's CoreNLP](https://stanfordnlp.github.io/CoreNLP/). To use it, you need to add [`edu.stanford.nlp:stanford-corenlp:3.4.1`](http://central.maven.org/maven2/edu/stanford/nlp/stanford-corenlp/3.4.1/) together with the corresponding models to your classpath.

An Enrich Function for more accurate Entity Linking is available as [FEL4ArchiveSpark](https://github.com/helgeho/FEL4ArchiveSpark). It uses [Yahoo's Fast Entity Linker](https://github.com/yahoo/FEL) (FEL) with ArchiveSpark.

For more details on the [Enrich Functions](https://github.com/helgeho/ArchiveSpark/blob/master/docs/EnrichFuncs.md) and their use, please [read the docs](https://github.com/helgeho/ArchiveSpark/blob/master/docs/README.md).

In [15]:
earliest.enrich(Entities).peekJson

{
  "record" : {
    "surtUrl" : "org,occupylowell)/forums/member.php?action=profile&uid=16684",
    "timestamp" : "20120515045601",
    "originalUrl" : "http://occupylowell.org/forums/member.php?action=profile&uid=16684",
    "mime" : "text/html",
    "status" : 200,
    "digest" : "UQ36HQHC7C6W4R2OJJFSAUUHHX4ZWFPM",
    "redirectUrl" : "-",
    "meta" : "-",
    "compressedSize" : 3705
  },
  "payload" : {
    "string" : {
      "html" : {
        "body" : {
          "text" : {
            "entities" : {
              "persons" : [ "Justin", "S." ],
              "organizations" : [ "Yahoo" ],
              "locations" : [ ],
              "dates" : [ "Today", "11:31" ]
            }
          }
        }
      }
    }
  }
}

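To count organizations, we derive a new Enrich Function from `Entities`. It lowercases the extracted organization names and keeps only distinct values per record, analogous to the terms above:

In [16]: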
val Organizations = Entities.mapMulti("organizations", "distinct") { values: Seq[String] => values.map(_.toLowerCase).distinct }

In [17]:
earliest.enrich(Organizations).peekJson

{
  "record" : {
    "surtUrl" : "org,occupylowell)/forums/member.php?action=profile&uid=16684",
    "timestamp" : "20120515045601",
    "originalUrl" : "http://occupylowell.org/forums/member.php?action=profile&uid=16684",
    "mime" : "text/html",
    "status" : 200,
    "digest" : "UQ36HQHC7C6W4R2OJJFSAUUHHX4ZWFPM",
    "redirectUrl" : "-",
    "meta" : "-",
    "compressedSize" : 3705
  },
  "payload" : {
    "string" : {
      "html" : {
        "body" : {
          "text" : {
            "entities" : {
              "organizations" : {
                "distinct" : [ "yahoo" ]
              }
            }
          }
        }
      }
    }
  }
}
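
The `Organizations` function lowercases the organization names found on a page and removes duplicates. As with terms, each organization is then counted at most once per page. The following sketch shows only the per-page normalization, applied to a hypothetical list of extracted names:

```scala
object OrganizationsSketch {
  // Lowercase and dedupe entity names per page, mirroring the function in In [16].
  def normalize(names: Seq[String]): Seq[String] =
    names.map(_.toLowerCase).distinct

  def main(args: Array[String]): Unit = {
    println(normalize(Seq("Yahoo", "Occupy Lowell", "YAHOO", "occupy lowell")))
  }
}
```

`toLowerCase` without an argument uses the JVM's default locale. `toLowerCase(java.util.Locale.ROOT)` would make the result independent of it. For example, under a Turkish locale, "I" is lowercased to a dotless "ı".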

In [18]:
val organizations = earliest.flatMapValues(Organizations)

In [19]:
organizations.take(10).foreach(println)

yahoo
lowell
board
message
occupy
perbalierinip
yahoo
lowell
board
message


*Please note:*

*Named Entity Extraction is an expensive operation. Depending on the size of the dataset, the following instruction may run for hours or even days.*

In [20]:
val counts = organizations.countByValue.toSeq.sortBy{case (term, count) => -count}

In [21]:
IOUtil.text("organization_counts.csv", printTop = 10) { out =>
    counts.map{case (term, count) => term + "," + count}.foreach(out.println)
}

lowell,11837
board,11812
occupy,11525
message,11424
yahoo,3940
new,94
boston,86
msn,77
messenger,75
direct,74


## Caveats

`.countByValue` fetches the counts of all distinct values to the local driver. This may lead to memory issues if there are too many distinct values. Instead, the same counts can be computed with the distributed `.reduceByKey` operation. A subsequent filter ensures that only values with high counts (here, more than 100) are fetched, which avoids memory overruns. With this approach, the sorting can also be performed in a distributed fashion:

In [22]:
val termCounts = terms.map(term => (term, 1L)).reduceByKey(_ + _).filter{case (term, count) => count > 100}

In [23]:
val fetchedTermCounts = termCounts.sortBy{case (term, count) => -count}.collect

In [24]:
fetchedTermCounts.take(10).foreach(println)

(occupy,16850)
(lowell,16850)
(list,16729)
(search,16725)
(group,16513)
(powered,16508)
(theme,16480)
(2012,16462)
(time,16455)
(there,16444)
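
The two approaches differ in where the counts are combined. `reduceByKey` first combines counts within each partition and then merges the partial results across the cluster. The filter runs before `.collect`, so only the remaining pairs reach the driver. The following local simulation illustrates this pattern. It is not Spark's implementation. It assumes a hypothetical split of terms into partitions and uses a `minExclusive` parameter in place of `count > 100`:

```scala
object ReduceByKeySketch {
  // Per-partition ("map-side") combine: term -> count within one partition.
  private def combine(partition: Seq[String]): Map[String, Long] =
    partition.foldLeft(Map.empty[String, Long]) { (acc, term) =>
      acc.updated(term, acc.getOrElse(term, 0L) + 1L)
    }

  // Merge two partial count maps by summing the counts per term.
  private def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (term, count)) =>
      acc.updated(term, acc.getOrElse(term, 0L) + count)
    }

  // Count, keep only terms with count > minExclusive, then sort by descending count.
  def topCounts(partitions: Seq[Seq[String]], minExclusive: Long): Seq[(String, Long)] =
    partitions.map(combine)
      .foldLeft(Map.empty[String, Long])(merge)
      .filter { case (_, count) => count > minExclusive }
      .toSeq
      .sortBy { case (term, count) => (-count, term) }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq("occupy", "lowell", "search"),
      Seq("occupy", "lowell", "calendar"),
      Seq("occupy", "theme")
    )
    topCounts(partitions, minExclusive = 1).foreach(println)
  }
}
```

The threshold trades completeness for driver memory. Terms with 100 or fewer occurrences are never fetched, so they will be missing from any distribution plotted from `fetchedTermCounts`.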
