In [1]:
def strToInt(str: String): Int = {
    Seq("zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine").indexOf(str)
}

In [2]:
strToInt("four")

4

In [3]:
Seq("one", "two", "three").map(strToInt)

List(1, 2, 3)

In [4]:
Seq("one", "two", "three").map(str => strToInt(str) + 10)

List(11, 12, 13)

# Apache Spark
### See the [programming guide](http://spark.apache.org/docs/latest/programming-guide.html)

In [5]:
val numbers = sc.textFile("numbers.txt") // a file with a number string on each line

In [6]:
numbers.take(1).head

two

In [7]:
numbers.collect.foreach(println)

two
seven
one
five


In [8]:
numbers.map(strToInt).collect.foreach(println)

2
7
1
5


In [9]:
numbers.map(strToInt).reduce(_ + _)

15

In [10]:
numbers.map(strToInt).filter(_ < 5).reduce(_ + _)

3

# ArchiveSpark

### Read more on [GitHub](https://github.com/helgeho/ArchiveSpark)

In [11]:
import de.l3s.archivespark.ArchiveSpark
import de.l3s.archivespark.enrich.functions._
import de.l3s.archivespark.nativescala.implicits._
import de.l3s.archivespark.implicits._

In [12]:
val collection = "ArchiveIt-Collection-2950"

In [13]:
val cdxPath = s"/data/hackathon/$collection/cdx/*.cdx.gz"
val warcPath = s"/data/hackathon/$collection/warc"

In [14]:
val rdd = ArchiveSpark.hdfs(cdxPath, warcPath)(sc)

In [15]:
rdd.take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111222021216",
    "digest":"ZPP7YDXYWOVKO3RILANIHUPL3REXJDZE",
    "originalUrl":"http://184.107.185.46/newsbeast/index.php?w=500&h=300&b=http://www.newsbeast.gr/files/1/2011/06/15/sintagma_1_5.jpg",
    "surtUrl":"46,185,107,184)/newsbeast/index.php?b=http://www.newsbeast.gr/files/1/2011/06/15/sintagma_1_5.jpg&h=300&w=500",
    "mime":"text/html",
    "meta":"-",
    "status":404
  }
}

In [16]:
rdd.filter(r => r.status == 200).take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111222011042",
    "digest":"M5X4A2NFIZRSBQIALYJQXV5AV3DVPIK5",
    "originalUrl":"http://zamg.ac.at/pict/aktuell/20110321_fuku_Cs-137-glob.gif",
    "surtUrl":"at,ac,zamg)/pict/aktuell/20110321_fuku_cs-137-glob.gif",
    "mime":"image/gif",
    "meta":"-",
    "status":200
  }
}

In [17]:
rdd.filter(r => r.status == 200 && r.mime == "text/html").take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111222043804",
    "digest":"YVOEIYJ45I7QNNFBQTCPKIQAQJIE4B46",
    "originalUrl":"http://english.cntv.cn/program/newsupdate/20110504/109544.shtml",
    "surtUrl":"cn,cntv,english)/program/newsupdate/20110504/109544.shtml",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}

In [18]:
val onlineHtml = rdd.filter(r => r.status == 200 && r.mime == "text/html")

In [19]:
onlineHtml.count

13382715

In [20]:
val sample = onlineHtml.sample(false, .001, 12345).cache

In [21]:
sample.count

13485

### Dealing with revisions (here: selecting the latest capture of each URLs)

In [22]:
sample.map(r => (r.surtUrl, 1)).reduceByKey(_ + _).sortBy{case (url, count) => -count}.take(10)

Array((com,twitter)/occupymilwaukee,3), (org,occupywallst)/forum/raising-awareness-for-support-of-the-movement,3), (com,twitter)/occupychi,3), (com,salon)/2011/12/31/progressives_and_the_ron_paul_fallacies/singleton/salon.com,3), (org,occupywallst)/forum/matt-taibbi-of-rolling-stone-famously-described-go,3), (com,twitter)/an0nyc,3), (org,occupyclaremont)/?month=jan&page_id=9&yr=2008,2), (com,twitter)/rhiannonkwy?_twitter_noscript=1,2), (com,twitter)/occupylubbock?_twitter_noscript=1,2), (com,twitter)/occupyarkansas,2))

In [23]:
val url = "org,occupywallst)/forum/matt-taibbi-of-rolling-stone-famously-described-go"
sample.filter(r => r.surtUrl == url).toJsonStrings.collect.foreach(println)

                                                                                {
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120301043214",
    "digest":"QMDB46M4TOZZVCAPUXPSV5M5XQK5RYWY",
    "originalUrl":"http://occupywallst.org/forum/matt-taibbi-of-rolling-stone-famously-described-go/",
    "surtUrl":"org,occupywallst)/forum/matt-taibbi-of-rolling-stone-famously-described-go",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}
{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111231034641",
    "digest":"W4DGQOVS6BV62ORB5UHIILTPFDGFFUFT",
    "originalUrl":"http://occupywallst.org/forum/matt-taibbi-of-rolling-stone-famously-described-go/",
    "surtUrl":"org,occupywallst)/forum/matt-taibbi-of-rolling-stone-famously-described-go",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}
{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120226053324",
    "digest":"HIQRYCYJ4QP4KHGF6U75ZHMNTY7BRESF",
    "originalUrl":"http://occupywallst.

In [24]:
val latest = sample.map(r => (r.surtUrl, r)).reduceByKey((a, b) => Seq(a, b).maxBy(r => r.time.getMillis)).map{case (url, r) => r}.cache

In [25]:
latest.count

13285

### Enriching records

#### See [GitHub](https://github.com/helgeho/ArchiveSpark/tree/master/src/main/scala/de/l3s/archivespark/enrich/functions) for an overview of available enrich functions

#### See [GitHub](https://github.com/helgeho/ArchiveSpark/blob/master/src/main/scala/de/l3s/archivespark/implicits/classes/EnrichableRDD.scala) for the set of specialized archive RDD functions, such as rdd.enrich(fun)

In [26]:
val payload = latest.enrich(Payload)

In [27]:
println(payload.take(1).head.toJsonString)

                                                                                {
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "recordHeader":{
    "WARC-Target-URI":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "WARC-Date":"2012-01-07T07:02:24Z",
    "WARC-IP-Address":"69.171.229.16",
    "WARC-Type":"response",
    "Content-Length":"770",
    "WARC-Payload-Digest":"sha1:R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "Content-Type":"application/http; msgtype=response",
    "absolute-offset":"0",
    "WARC-Record-ID":"<urn:uuid:6db32307-e7b5-4540-ae6c-95e8f4ca41e0>",
    "reader-identifier":

In [28]:
payload.map(r => r.get[Array[Byte]]("payload").get.length).take(1).head

420

In [29]:
payload.map(r => r.value(Payload).get.length).take(1).head

420

In [30]:
println(payload.mapEnrich(Payload, "length") {p => p.length}.take(1).head.toJsonString)

{
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "recordHeader":{
    "WARC-Target-URI":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "WARC-Date":"2012-01-07T07:02:24Z",
    "WARC-IP-Address":"69.171.229.16",
    "WARC-Type":"response",
    "Content-Length":"770",
    "WARC-Payload-Digest":"sha1:R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "Content-Type":"application/http; msgtype=response",
    "absolute-offset":"0",
    "WARC-Record-ID":"<urn:uuid:6db32307-e7b5-4540-ae6c-95e8f4ca41e0>",
    "reader-identifier":"ARCHIVEIT-2950-DAILY-DWOFTZ-20120107070220-00022-crawling200.us.archive.org-6682.

In [31]:
payload.mapEnrich(Payload, "length") {p => p.length}.mapValues[Int]("payload.length").take(1).head

420

In [32]:
val strings = payload.enrich(StringContent)

In [33]:
println(strings.take(1).head.toJsonString)

{
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "recordHeader":{
    "WARC-Target-URI":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "WARC-Date":"2012-01-07T07:02:24Z",
    "WARC-IP-Address":"69.171.229.16",
    "WARC-Type":"response",
    "Content-Length":"770",
    "WARC-Payload-Digest":"sha1:R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "Content-Type":"application/http; msgtype=response",
    "absolute-offset":"0",
    "WARC-Record-ID":"<urn:uuid:6db32307-e7b5-4540-ae6c-95e8f4ca41e0>",
    "reader-identifier":"ARCHIVEIT-2950-DAILY-DWOFTZ-20120107070220-00022-crawling200.us.archive.org-6682.

### Working with HTML tags (here: titles)

In [34]:
val Title = Html.first("title")

In [35]:
println(strings.enrich(Title).filterExists(Title).take(1).head.toJsonString)

                                                                                {
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "recordHeader":{
    "WARC-Target-URI":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "WARC-Date":"2012-03-10T10:48:55Z",
    "WARC-IP-Address":"199.27.135.227",
    "WARC-Type":"response",
    "Content-Length":"55707",
    "WARC-Payload-Digest":"sha1:TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "Content-Type":"application/http; msgtype=response",
    "absolute-offset":"0",
    "WARC-Record-ID":"<urn:uuid:626988a0-aa89-48b5-aebe-d43378a45c22>",
    "reader-identifier":"ARCHIVEIT-2950-MONTHLY-TMVOKO-20120310104813-00102-crawling207.us.archive.org-6682.warc.gz"
  },
  "htt

In [36]:
latest.take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}

In [37]:
latest.enrich(Title).filterExists(Title).take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":"<title>Libyan Revolution | Alexander Higgins Blog</title>"
      }
    }
  }
}

In [38]:
val titles = latest.enrich(Title).filterExists(Title)

In [39]:
println(titles.take(1).head.toJsonString)

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":"<title>Libyan Revolution | Alexander Higgins Blog</title>"
      }
    }
  }
}


In [40]:
titles.enrich(HtmlText).take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":"<title>Libyan Revolution | Alexander Higgins Blog</title>",
        "body":{
          "text":"Alexander Higgins Blog The Latest Buzz, Analysis, and News Without the Snooze! Home Headlines Authors About Subscribe, Friend or Follow Advertise Economy Environment Headlines Health Member Submitted Middle East Projects Society Technology The Alexander Higgins Show Uncategorized US Videos Web ...

In [41]:
titles.enrich(HtmlText.of(Title)).take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "_":"<title>Libyan Revolution | Alexander Higgins Blog</title>",
          "text":"Libyan Revolution | Alexander Higgins Blog"
        }
      }
    }
  }
}

In [42]:
val TitleText = HtmlText.of(Html.first("title"))

In [43]:
titles.mapValues(TitleText).take(10).foreach(println)

Libyan Revolution | Alexander Higgins Blog
Changes related to Turning of the Tide (20-5-1996) /Video index - Talk2000.NL
Toxicology | Alexander Higgins Blog
#OCCUPY | OCCUPY PIX - Part 50
Soundboard
Alicia MarMar - Google Profile
Demands | Forum | NYC General Assembly # Occupy Wall Street
Sign up | Tumblr
Forum Post: they are coming home | OccupyWallSt.org
Google Groups


In [44]:
titles.mapEnrich(TitleText, "tokens") {t => t.split(" ")}.take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "_":"<title>Libyan Revolution | Alexander Higgins Blog</title>",
          "text":{
            "tokens":[
              "Libyan",
              "Revolution",
              "|",
              "Alexander",
              "Higgins",
              "Blog"
            ]
          }
        }
      }
    }
  }
}

In [45]:
titles.enrich(Entities.of(TitleText)).take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "_":"<title>Libyan Revolution | Alexander Higgins Blog</title>",
          "text":{
            "entities":{
              "persons":[
                "Alexander",
                "Higgins",
                "Blog"
              ],
              "organizations":[
                
              ],
              "locations":[
                
              ],
              "dates...

In [46]:
val TitleEntities = Entities.of(TitleText)
val entities = titles.enrich(TitleEntities)
entities.take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120310104855",
    "digest":"TTTWXNBO4GKSXQTUXPWTOSVQ6BR4JQWU",
    "originalUrl":"http://blog.alexanderhiggins.com/tags/libyan-revolution/",
    "surtUrl":"com,alexanderhiggins,blog)/tags/libyan-revolution",
    "mime":"text/html",
    "meta":"I",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "_":"<title>Libyan Revolution | Alexander Higgins Blog</title>",
          "text":{
            "entities":{
              "persons":[
                "Alexander",
                "Higgins",
                "Blog"
              ],
              "organizations":[
                
              ],
              "locations":[
                
              ],
              "dates...

In [47]:
entities.saveAsJson("entities.gz")

Name: org.apache.hadoop.mapred.FileAlreadyExistsException
Message: Output directory hdfs://nn-ia.s3s.altiscale.com:8020/user/holzmann/entities.gz already exists
StackTrace: org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1179)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1156)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060)
org.apache.spark.rd

In [48]:
titles.mapValues[Seq[String]](Entities.of(TitleText), "persons").flatMap(persons => persons).take(20).foreach(println)

                                                                                Alexander
Higgins
Blog
Alexander
Higgins
Blog
Alicia
MarMar
Alexander
Higgins
Blog
Digg
Carol
Stephanie
Carranza
Shin
Bet
Prescott
Yuri
Madarame


### Working with links

In [49]:
latest.take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}

In [50]:
latest.enrich(Html.all("a", "links")).take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "links":[
          "<a href=\"http://www.occupyunconference.net/\">http://www.occupyunconference.net/</a>"
        ]
      }
    }
  }
}

In [51]:
val Links = HtmlAttribute("href").ofEach(Html.all("a", "links"))

In [52]:
latest.enrich(Links).take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "links":[
          {
            "attributes":{
              "href":"http://www.occupyunconference.net/"
            }
          }
        ]
      }
    }
  }
}

In [53]:
import java.net.URL
import scala.util.Try

In [54]:
val hosts = latest.mapEnrich(Links, "host") {url => Try{new URL(url).getHost}.getOrElse("")}

In [55]:
hosts.take(1).head.toJsonString

{
  "record":{
    "redirectUrl":"http://www.occupyunconference.net/",
    "timestamp":"20120107070224",
    "digest":"R6VHAQHS5EXFBMDGVORLPYH5HLNI7IWI",
    "originalUrl":"https://www.facebook.com/l.php?u=http%3A%2F%2Fwww.occupyunconference.net&h=HAQEF5ZeO",
    "surtUrl":"com,facebook)/l.php?h=haqef5zeo&u=http://www.occupyunconference.net",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "payload":{
    "string":{
      "html":{
        "links":[
          {
            "attributes":{
              "href":{
                "host":"www.occupyunconference.net"
              }
            }
          }
        ]
      }
    }
  }
}

In [56]:
hosts.mapValues[Seq[String]]("payload.string.html.links*.attributes.href.host").take(1).head

List(www.occupyunconference.net)

In [57]:
val recordHosts = hosts.mapValues[Seq[String]]("payload.string.html.links*.attributes.href.host")

In [58]:
val distinctHosts = recordHosts.flatMap(hosts => hosts.filter(_.nonEmpty).distinct.map(host => (host, 1)))

In [59]:
distinctHosts.take(10)

Array((www.occupyunconference.net,1), (twitter.com,1), (blog.alexanderhiggins.com,1), (feedburner.google.com,1), (feeds2.feedburner.com,1), (www.alexa.com,1), (wordpress.org,1), (shtf411.com,1), (censoredrickreuben.blogspot.com,1), (t.co,1))

In [60]:
val hostUrlCounts = distinctHosts.reduceByKey(_ + _).sortBy{case (host, count) => -count}

In [61]:
hostUrlCounts.take(10).foreach(println)

                                                                                (www.facebook.com,5775)
(twitter.com,5252)
(www.youtube.com,2798)
(www.nycga.net,2327)
(www.google.com,2317)
(occupywallst.org,1918)
(wordpress.org,1848)
(www.reddit.com,1793)
(occupytogether.org,1756)
(howtooccupy.org,1592)


In [62]:
val hostDist = hostUrlCounts.map{case (host, count) => (count / 10 * 10, 1)}.reduceByKey(_ + _)

In [63]:
import org.apache.hadoop.io.compress.GzipCodec
hostDist.saveAsTextFile("hostDist.gz", classOf[GzipCodec])