In [14]:
import collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD

import org.bson.BSONObject
import org.bson.types.BasicBSONList
import java.io._
import com.mongodb.hadoop.{
  MongoInputFormat, MongoOutputFormat,
  BSONFileInputFormat, BSONFileOutputFormat}

import org.jsoup.Jsoup
import org.jsoup.HttpStatusException

In [2]:
// Hadoop configuration handed to the MongoDB connector; the only setting is
// the input URI the connector should read from.
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/twitter.tweets")

// Pair RDD produced by MongoInputFormat: keys are typed as plain Objects and
// values as BSONObjects (one per stored document).
val documents: RDD[(Object, BSONObject)] =
  sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject])

In [3]:
// Counts how often each link domain (e.g. "bit.ly") appears across all
// documents and returns the (domain, count) pairs sorted by descending count.
val topURLs = {
  // Every element of each document's entities.urls list.
  val urlEntries = documents
    .map { case (_, doc) => doc.get("entities") }
    .map { case entities: BSONObject => entities.get("urls") }
    .flatMap { case urls: BasicBSONList => urls.toMap.map(_._2).toList }

  // The expanded_url field of each entry; non-BSON entries are skipped.
  val expandedUrls = urlEntries.flatMap {
    case entry: BSONObject => Some(entry.get("expanded_url"))
    case _ => None
  }

  // Keep only the "name.tld" part of http:// links (an optional leading
  // "www." is ignored); anything that does not match the pattern is dropped.
  val domains = expandedUrls.flatMap { url =>
    val domainPattern = """http://(?:www\.)?(\w+\.\w+)/.*""".r
    url match {
      case domainPattern(domain) => Some(domain)
      case _ => None
    }
  }

  domains.countByValue().toList.sortBy { case (_, uses) => -uses }
}

In [4]:
// Print the 20 most frequent domains. foreach (rather than map) is used
// because println is called purely for its side effect; map would also build
// and echo a useless List of Unit values.
topURLs.take(20).foreach(println)

(bit.ly,295)
(softnetsearch.com,199)
(buff.ly,130)
(ow.ly,125)
(wp.me,46)
(goo.gl,25)
(firstround.com,22)
(snip.ly,21)
(activevoice.us,21)
(dlvr.it,19)
(openculture.com,17)
(bluehillresearch.com,17)
(datasciencecentral.com,14)
(onforb.es,11)
(dataversity.net,11)
(datanami.com,11)
(paper.li,10)
(boozallen.com,9)
(owl.li,9)
(tweetedtimes.com,8)


List((), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), ())

In [6]:
// Export the 15 most frequent domains to topURLs.csv as "domain,count" rows
// below a header line.
val file = new File("topURLs.csv")
val writer = new BufferedWriter(new FileWriter(file))
try {
  writer.write("Base URL,Use Count\n")
  for (pair <- topURLs.take(15)) writer.write(s"${pair._1},${pair._2}\n")
} finally {
  // Always flush and release the file handle, even if a write fails.
  writer.close()
}

In [21]:
// Same domain ranking as the plain URL count, except that bit.ly and buff.ly
// short links are first resolved to their long form by scraping the
// shortener's "+" info page, so the real target domains are counted.
//
// Every branch of the expansion step yields an Option[String]: expanded URLs
// must reach the domain regex as plain strings, because a value still wrapped
// in Some(...) is not a character sequence and would never match it (and so
// would silently vanish from the counts).
val topExpanded = documents.map{case (a, b) => b}.
  map(_ get("entities")).
  map{case e: BSONObject => e.get("urls")}.
  flatMap{case l: BasicBSONList => l.toMap.map(_._2).toList}.
  flatMap{case b: BSONObject => Some(b.get("expanded_url"))
          case _ => None}.
  flatMap(x => {
      val bit = """http://bit\.ly/.*""".r
      val buf = """http://buff\.ly/.*""".r

      // Fetches "<short url>+" and reads the href of the anchor with id
      // bitmark_long_url. Returns "" when the page has no such anchor.
      def lengthen(shortUrl: String): String = {
        val doc = Jsoup.connect(shortUrl + "+").get()
        doc.select("a#bitmark_long_url").attr("href")
      }

      // Resolves a short link, backing off for 10 seconds on HTTP 403.
      // Gives up (None) on any other HTTP error status, or once the retry
      // budget is spent, so one stubborn link cannot stall the job forever.
      def retryLengthen(shortUrl: String, retriesLeft: Int = 5): Option[String] = {
        val attempt: Either[Int, String] =
          try Right(lengthen(shortUrl))
          catch { case e: HttpStatusException => Left(e.getStatusCode()) }
        attempt match {
          case Right(longUrl) => Some(longUrl)
          case Left(403) if retriesLeft > 0 =>
            Thread.sleep(10000) // crawl responsibly!
            retryLengthen(shortUrl, retriesLeft - 1)
          case Left(_) => None
        }
      }

      x match {
          case url: String => url match {
              case bit() => retryLengthen(url)
              case buf() => retryLengthen(url) // more of a hail mary
              case _ => Some(url)
          }
          case _ => None // null or non-string expanded_url: nothing to count
      }}).
   flatMap{
      x => {
          val re = """http://(?:www\.)?(\w+\.\w+)/.*""".r
          x match {
              case re(a) => Some(a)
              case _ => None
          }
      }
  }.
  countByValue().
  toList.
  sortBy(- _._2)

In [26]:
// Export the 15 most frequent domains after link expansion to
// topURLsExpanded.csv, skipping any domain that still contains ".ly".
val file = new File("topURLsExpanded.csv")
val writer = new BufferedWriter(new FileWriter(file))
try {
  writer.write("Base URL,Use Count\n")
  val rows = topExpanded.filter(!_._1.contains(".ly")).take(15)
  for (pair <- rows) writer.write(s"${pair._1},${pair._2}\n")
} finally {
  // Always flush and release the file handle, even if a write fails.
  writer.close()
}

In [7]:
// Get a sample of five bit.ly URLs, looking at every entry of each
// document's entities.urls list.
documents.map{case (a, b) => b}.
  map(_ get("entities")).
  map{case e: BSONObject => e.get("urls")}.
  flatMap{case l: BasicBSONList => l.toMap.map(_._2).toList}.
  flatMap{case b: BSONObject => Some(b.get("expanded_url"))
          case _ => None}.
  // The dot in "bit.ly" is escaped so it only matches a literal dot; values
  // that are null or not strings are skipped instead of raising a MatchError.
  flatMap{case s: String => """http://bit\.ly/.*""".r findFirstIn s
          case _ => None}.
  take(5)

Array(http://bit.ly/1PNSyUE, http://bit.ly/1RyUCeo, http://bit.ly/1SjkWi8, http://bit.ly/1XhvNIU, http://bit.ly/1RkWvvv)

In [8]:
// Get a sample of five buff.ly URLs, looking only at the first entry
// (key "0") of each document's entities.urls list.
documents.map{case (a, b) => b}.
  map(_ get("entities")).
  map{case e: BSONObject => e.get("urls")}.
  map{case l: BasicBSONList => l.get("0")}.
  flatMap{case b: BSONObject => Some(b.get("expanded_url"))
          case _ => None}.
  // The dot in "buff.ly" is escaped so it only matches a literal dot; values
  // that are null or not strings are skipped instead of raising a MatchError.
  flatMap{case s: String => """http://buff\.ly/.*""".r findFirstIn s
          case _ => None}.
  take(5)

Array(http://buff.ly/1Poikha, http://buff.ly/1Poikha, http://buff.ly/1Poikha, http://buff.ly/1Poikha, http://buff.ly/1UNUNtq)