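This notebook requires ArchiveSpark together with FEL4ArchiveSpark (https://github.com/helgeho/FEL4ArchiveSpark) and ArchiveSpark2Triples (imported below as `de.l3s.archivespark2triples`).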

In [1]:
import de.l3s.archivespark._
import de.l3s.archivespark.implicits._
import de.l3s.archivespark.specific.warc._
import de.l3s.archivespark.specific.warc.specs._
import de.l3s.archivespark.specific.warc.implicits._
import de.l3s.archivespark.enrich._
import de.l3s.archivespark.enrich.functions._
import de.l3s.archivespark.enrich.dataloads._
import de.l3s.archivespark.enrichfunctions.fel._
import de.l3s.archivespark2triples._
import org.apache.hadoop.io.compress.GzipCodec

In [2]:
// Register "spark_checkpoint" (a relative path) as the directory Spark uses for RDD checkpoint data.
sc.setCheckpointDir("spark_checkpoint")

# Load the FEL model file

In [3]:
// File name of the FEL model; the same name is later passed to the FEL enrich function.
val modelFile = "english-nov15.hash"
// Register the model file from HDFS with the Spark context so the job can access it.
sc.addFile(s"hdfs:///user/holzmann/$modelFile")

# Load the web archive collection (filter duplicates and very big records)

In [4]:
// Identifier of the collection; also reused for the name of the output written at the end.
val collection = "ArchiveIt-Collection-2950"
// CDX index files (glob) and WARC directory, both located below the collection's root path.
val (cdxPath, warcPath) = {
    val collectionRoot = s"/data/archiveit/$collection"
    (s"$collectionRoot/cdx/*.cdx.gz", s"$collectionRoot/warc")
}

In [5]:
// Load the collection through its CDX index with the corresponding WARC files on HDFS.
val raw = {
    val spec = WarcCdxHdfsSpec(cdxPath, warcPath)
    ArchiveSpark.load(sc, spec)
}

In [6]:
// Drop duplicate records and records whose compressed size is 100 KiB or more, then cache the result.
// NOTE(review): distinctValue presumably de-duplicates on the value of `_.get` and resolves
// collisions with the given function (here: keep the first argument) — confirm against ArchiveSpark.
val records = {
    val maxCompressedBytes = 1024 * 100 // 100 KiB
    val deduplicated = raw.distinctValue(_.get)((kept, _) => kept)
    deduplicated.filter(_.compressedSize < maxCompressedBytes).cache()
}

In [7]:
// Inspect a sample record as JSON to see which CDX fields are available.
records.peekJson

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120515050246",
    "digest":"QMKCJGWCZI2GOTC6W7TQ5YTMY2YRMWWV",
    "originalUrl":"http://www.occupyslc.org/login/return_url/64-L2dyb3VwLzIz",
    "surtUrl":"org,occupyslc)/login/return_url/64-l2dyb3vwlziz",
    "mime":"text/html",
    "compressedSize":4499,
    "meta":"-",
    "status":200
  }
}

# Select successful responses of type HTML and detect duplicates

In [8]:
// Keep only successful captures (HTTP status 200) whose MIME type is text/html.
val responses = records.filter(_.status == 200).filter(_.mime == "text/html")

In [9]:
// For every (SURT URL, digest) pair keep the capture with the earliest time.
// The result stays keyed by that pair so it can be joined against later.
val earliestDigests = {
    val keyedResponses = responses.keyBy(r => (r.surtUrl, r.digest))
    keyedResponses.reduceByKey((left, right) => if (left.time < right.time) left else right)
}

In [10]:
// Number of distinct (surtUrl, digest) keys, i.e. one earliest capture per URL and content digest.
earliestDigests.count

7750123

In [11]:
// Pair every record with the earliest capture that has the same (SURT URL, digest) and keep
// the pairs whose capture time differs from that earliest capture, i.e. later re-captures
// of identical content. Each element is (capture, earliest capture).
val duplicates = {
    val keyedRecords = records.keyBy(r => (r.surtUrl, r.digest))
    val pairedWithEarliest = keyedRecords.join(earliestDigests).values
    pairedWithEarliest.filter{case (capture, earliest) => capture.time != earliest.time}
}

In [12]:
// Number of captures that repeat the content of an earlier capture of the same URL.
duplicates.count

1344450

# Generate *ArchivedDocument* triples representing distinct webpages

In [13]:
// All versions: the earliest capture per (SURT URL, digest) plus the duplicate captures
// (the first element of each duplicate pair).
val versions = earliestDigests.values.union(duplicates.keys)

In [14]:
// Total number of versions (earliest captures plus duplicates).
versions.count

9094573

In [15]:
// Generate document-level descriptions from all versions. Judging by the sample output below,
// each one is an owa:ArchivedDocument listing capture count, first/last capture and its versions.
val documentTriples = ArchiveSpark2Triples.generateDocs(versions)

In [16]:
// Number of generated document descriptions.
documentTriples.count

3036326

In [17]:
// Print a sample element of documentTriples for visual inspection.
println(documentTriples.peek)


<http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest> rdf:type owa:ArchivedDocument ;
    owa:numOfCaptures    "6"^^xsd:integer ;
    owa:firstCapture     "2011-12-03T05:56:19"^^xsd:dateTime ;
    owa:lastCapture      "2012-01-03T03:36:09"^^xsd:dateTime ;
    dc:hasVersion        <https://web.archive.org/web/20111203055619/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111210061229/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111217055735/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111220030152/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111227031921/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https:

# Create "*sameAs* triples" from duplicates

In [18]:
// Generate version descriptions for the duplicate pairs. Judging by the sample output below,
// each duplicate capture is linked via owl:sameAs to the capture it duplicates.
val sameAsTriples = ArchiveSpark2Triples.generateSameAsVersions(duplicates)

In [19]:
// Print a sample element of sameAsTriples for visual inspection.
println(sameAsTriples.peek)


<https://web.archive.org/web/20120105062548/http://www.livestream.com/forum/showpost.php?p=18958&postcount=9> rdf:type owa:VersionedDocument ;
    dc:date              "2012-01-05T06:25:48"^^xsd:dateTime ;
    owl:sameAs           <https://web.archive.org/web/20111229064632/http://www.livestream.com/forum/showpost.php?p=18958&postcount=9> .



# Generate *VersionedDocument* triples with title and entities

In [20]:
// Drop the (SURT URL, digest) keys and spread the earliest captures over 5000 partitions
// before the enrichment steps that follow.
val repartitioned = earliestDigests.values.repartition(5000)

In [21]:
// Enrich function for the text of the first <title> element of the HTML payload.
// It is reused later to read the title value back from each enriched record.
val title = HtmlText.of(Html.first("title"))
// Attach the title text to every record.
val responsesWithTitles = repartitioned.enrich(title)

In [22]:
// FEL entity-linking enrich function applied on the HTML text, using the model file named above.
// NOTE(review): scoreThreshold = -5 presumably discards annotations scoring below -5 — confirm
// against FEL4ArchiveSpark.
val fel = FELwithTimeOut(scoreThreshold = -5, modelFile = modelFile).on(HtmlText)
// Attach the detected entities to the records that already carry their titles.
val responsesWithEntities = responsesWithTitles.enrich(fel)

In [23]:
// Inspect a sample enriched record as JSON (title text and entity annotations).
responsesWithEntities.peekJson

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120510075035",
    "digest":"BZBZGSQPJ5VJJJBNXN5TYD42F2JMHWLX",
    "originalUrl":"http://www.occupyottawa.org/comment/708",
    "surtUrl":"org,occupyottawa)/comment/708",
    "mime":"text/html",
    "compressedSize":10658,
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "text":"Media Minutes from November 29th | OCCUPY OTTAWA"
        },
        "body":{
          "text":{
            "entities":[
              {
                "span":"calendar",
                "endOffset":94,
                "score":-1.7552143094338724,
                "annotation":"Calendar",
                "startOffset":86
              },
              {
                "span":"so...

In [24]:
// Generate one version description per enriched record, extended with the page title
// (dc:title) and one blank-node entity description per FEL annotation (schema:mentions).
val versionTriples = ArchiveSpark2Triples.generateVersionsMapped(responsesWithEntities) {(record, uid, doc) =>
    // Renders text as a double-quoted Turtle string literal. Titles and entity spans come
    // from crawled HTML, so quotes, backslashes and line breaks must be escaped; a single
    // unescaped occurrence would otherwise make the generated file unparsable.
    def turtleString(text: String): String = {
        val escaped = text
            .replace("\\", "\\\\") // must run first so the escapes added below are not doubled
            .replace("\"", "\\\"")
            .replace("\n", "\\n")
            .replace("\r", "\\r")
            .replace("\t", "\\t")
        "\"" + escaped + "\""
    }

    // Records without a title or without entities fall back to an empty title / no mentions.
    val recordTitle = record.value(title).getOrElse("")
    val recordEntities = record.value(fel).getOrElse(Seq.empty)
    doc.appendTriples("dc:title", turtleString(recordTitle)).appendChildren("schema:mentions", {
        // Blank-node label combines the record's uid with the entity index within the record.
        recordEntities.zipWithIndex.map{case (entity, i) => TripleDoc(
            s"_:e$uid-$i",
            "oae:Entity",
            Seq(
                "oae:confidence" -> Seq(s""""${entity.score}"^^xsd:double"""),
                "oae:detectedAs" -> Seq(turtleString(entity.span)),
                "oae:position" -> Seq(s""""${entity.startOffset}"^^xsd:integer"""),
                // NOTE(review): the annotation is inserted into the IRI as-is; characters that
                // are not allowed in a Turtle IRI (e.g. '<', '>', '"', space) are not encoded here.
                "oae:hasMatchedURI" -> Seq(s"<http://dbpedia.org/resource/${entity.annotation}>")
            )
        )}
    })
}

In [25]:
// Print a sample element of versionTriples for visual inspection.
println(versionTriples.peek)


<https://web.archive.org/web/20111229171039/http://www.livestream.com/forum/search.php?searchthreadid=6480> rdf:type owa:VersionedDocument ;
    dc:date              "2011-12-29T17:10:39"^^xsd:dateTime ;
    dc:format            "text/html" ;
    dc:title             "Livestream Forum - Search Thread" ;
    schema:mentions      _:e0-0,
                         _:e0-1,
                         _:e0-2,
                         _:e0-3,
                         _:e0-4,
                         _:e0-5,
                         _:e0-6,
                         _:e0-7,
                         _:e0-8,
                         _:e0-9,
                         _:e0-10,
                         _:e0-11,
                         _:e0-12,
                         _:e0-13,
                         _:e0-14,
                         _:e0-15,
                         _:e0-16,
                         _:e0-17,
                         _:e0-18,
                         _:e0-19,
                        

# Sort and store with headers

In [26]:
// Add the "oae" namespace prefix, which the entity descriptions use, to the triple headers.
val headers = TripleHeader.append("oae" -> "http://www.ics.forth.gr/isl/oae/core#")

In [27]:
// Combine the headers with the document, sameAs and version descriptions into output strings.
// NOTE(review): the sort order is defined by toStringsSorted — confirm against ArchiveSpark2Triples.
val triples = ArchiveSpark2Triples.toStringsSorted(headers, documentTriples, sameAsTriples, versionTriples)

In [28]:
// Write the triples as Gzip-compressed text. Spark creates a directory with this name that
// holds one compressed part file per partition, despite the ".gz" suffix.
triples.saveAsTextFile(s"$collection-Triples1.gz", classOf[GzipCodec])