This notebook requires FEL4ArchiveSpark: https://github.com/helgeho/FEL4ArchiveSpark

In [1]:
import de.l3s.archivespark._
import de.l3s.archivespark.implicits._
import de.l3s.archivespark.specific.warc._
import de.l3s.archivespark.specific.warc.specs._
import de.l3s.archivespark.specific.warc.implicits._
import de.l3s.archivespark.enrich._
import de.l3s.archivespark.enrich.functions._
import de.l3s.archivespark.enrich.dataloads._
import de.l3s.archivespark.enrichfunctions.fel._
import de.l3s.archivespark2triples._
import org.apache.hadoop.io.compress.GzipCodec

In [2]:
sc.setCheckpointDir("spark_checkpoint")

# Load the FEL model file

In [3]:
val modelFile = "english-nov15.hash"
sc.addFile("hdfs:///user/holzmann/" + modelFile)

# Initialize the web archive collection

In [4]:
val collection = "ArchiveIt-Collection-2950"
val cdxPath = s"/data/archiveit/$collection/cdx/*.cdx.gz"
val warcPath = s"/data/archiveit/$collection/warc"

In [5]:
val records = ArchiveSpark.load(sc, WarcCdxHdfsSpec(cdxPath, warcPath)).filter(_.compressedSize < 1024 * 100) // 100 kb

In [6]:
records.peekJson

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111222021216",
    "digest":"ZPP7YDXYWOVKO3RILANIHUPL3REXJDZE",
    "originalUrl":"http://184.107.185.46/newsbeast/index.php?w=500&h=300&b=http://www.newsbeast.gr/files/1/2011/06/15/sintagma_1_5.jpg",
    "surtUrl":"46,185,107,184)/newsbeast/index.php?b=http://www.newsbeast.gr/files/1/2011/06/15/sintagma_1_5.jpg&h=300&w=500",
    "mime":"text/html",
    "compressedSize":464,
    "meta":"-",
    "status":404
  }
}

# Select successful responses of type HTML and detect duplicates

In [7]:
val responses = records.filter(r => r.status == 200 && r.mime == "text/html")

In [8]:
val earliestDigests = responses.map(r => ((r.surtUrl, r.digest), r)).reduceByKey{(r1, r2) =>
    if (r1.time < r2.time) r1 else r2
}

In [9]:
val duplicates = records.map(r => ((r.surtUrl, r.digest), r)).join(earliestDigests).map{case (_, records) => records}.filter{case (r1, r2) => r1.time != r2.time}

# Generate *ArchivedDocument* triples representing distinct webpages

In [10]:
val versions = earliestDigests.map{case (_, r) => r}.union(duplicates.map{case (r1, r2) => r1})

In [11]:
versions.count

10427385

In [12]:
val documentTriples = ArchiveSpark2Triples.generateDocs(versions)

In [13]:
documentTriples.count

3036326

In [14]:
println(documentTriples.peek)


<http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest> rdf:type owa:ArchivedDocument ;
    owa:numOfCaptures    "6"^^xsd:integer ;
    owa:firstCapture     "2011-12-03T05:56:19"^^xsd:dateTime ;
    owa:lastCapture      "2012-01-03T03:36:09"^^xsd:dateTime ;
    dc:hasVersion        <https://web.archive.org/web/20111203055619/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111210061229/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111217055735/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111220030152/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https://web.archive.org/web/20111227031921/http://occupyalabama.org/forum/showthread.php?t=161&goto=nextnewest>,
                         <https:

# Create "*sameAs* triples" from duplicates

In [15]:
val sameAsTriples = ArchiveSpark2Triples.generateSameAsVersions(duplicates)

In [16]:
println(sameAsTriples.peek)


<https://web.archive.org/web/20120105062548/http://www.livestream.com/forum/showpost.php?p=18958&postcount=9> rdf:type owa:VersionedDocument ;
    dc:date              "2012-01-05T06:25:48"^^xsd:dateTime ;
    owl:sameAs           <https://web.archive.org/web/20111229064632/http://www.livestream.com/forum/showpost.php?p=18958&postcount=9> .



# Generate *VersionedDocument* triples with title and entities

In [17]:
val repartitioned = earliestDigests.map{case (_, r) => r}.repartition(5000)

In [18]:
val title = HtmlText.of(Html.first("title"))
val responsesWithTitles = repartitioned.enrich(title)

In [19]:
val fel = FELwithTimeOut(scoreThreshold = -5, modelFile = modelFile).on(HtmlText)
val responsesWithEntities = responsesWithTitles.enrich(fel)

In [20]:
responsesWithEntities.peekJson

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120107181926",
    "digest":"4HJLCNZLAJH33NLYYUFRHZMTHBKPKGSC",
    "originalUrl":"https://www.facebook.com/family/Fdez-D%C3%ADaz/aZc/7",
    "surtUrl":"com,facebook)/family/fdez-d%c3%adaz/azc/7",
    "mime":"text/html",
    "compressedSize":6365,
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "text":"Fdez Díaz | Facebook"
        },
        "body":{
          "text":{
            "entities":[
              {
                "span":"facebook",
                "endOffset":717,
                "score":-0.35284731226502275,
                "annotation":"Facebook",
                "startOffset":709
              },
              {
                "span":"fac...

In [21]:
val versionTriples = ArchiveSpark2Triples.generateVersionsMapped(responsesWithEntities) {(record, uid, doc) =>
    val recordTitle = record.value(title).getOrElse("")
    val recordEntities = record.value(fel).getOrElse(Seq.empty)
    doc.appendTriples("dc:title", s"""\"recordTitle\"""").appendChildren("schema:mentions", {
        recordEntities.zipWithIndex.map{case (entity, i) => TripleDoc(
            s"_:e$uid-$i",
            "oae:Entity",
            Seq(
                "oae:confidence" -> Seq(s""""${entity.score}"^^xsd:double"""),
                "oae:detectedAs" -> Seq(s"""\"${entity.span}\""""),
                "oae:position" -> Seq(s""""${entity.startOffset}"^^xsd:integer"""),
                "oae:hasMatchedURI" -> Seq(s"<http://dbpedia.org/resource/${entity.annotation}>")
            )
        )}
    })
}

In [22]:
println(versionTriples.peek)


<https://web.archive.org/web/20120510090242/https://www.facebook.com/permalink.php?story_fbid=331396450236173&id=300935383297289> rdf:type owa:VersionedDocument ;
    dc:date              "2012-05-10T09:02:42"^^xsd:dateTime ;
    dc:format            "text/html" ;
    dc:title             "recordTitle" ;
    schema:mentions      _:e0-0,
                         _:e0-1,
                         _:e0-2,
                         _:e0-3,
                         _:e0-4,
                         _:e0-5,
                         _:e0-6,
                         _:e0-7,
                         _:e0-8,
                         _:e0-9,
                         _:e0-10,
                         _:e0-11,
                         _:e0-12,
                         _:e0-13,
                         _:e0-14,
                         _:e0-15,
                         _:e0-16,
                         _:e0-17,
                         _:e0-18,
                         _:e0-19,
                       

# Sort and store with headers

In [23]:
val headers = TripleHeader.append("oae" -> "http://www.ics.forth.gr/isl/oae/core#")

In [24]:
val triples = ArchiveSpark2Triples.toStringsSorted(headers, documentTriples, sameAsTriples, versionTriples)

Unauthorized system.exit detected!


In [25]:
triples.saveAsTextFile(s"$collection-Triples1.gz", classOf[GzipCodec])