In [1]:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val spark = SparkSession.builder.master("local[*]").appName("SparkSQL").getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

import java.net.URL

spark = org.apache.spark.sql.SparkSession@54404505


In [2]:
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "WARC/1.0")
val dataset = sc.newAPIHadoopFile(
    "../data/WAT/",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text], conf
)

val data = dataset
    .map(x => x._2.toString)
    .filter(_.nonEmpty)
    .map(_.split("\r\n\r\n"))
    .map(arr => (arr(0), arr(1)))
    .filter(_._2.startsWith("{"))

// println(data.count())
// data.take(5).foreach(println)

conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
dataset = data/WAT/ NewHadoopRDD[0] at newAPIHadoopFile at <console>:48
data = MapPartitionsRDD[5] at filter at <console>:60


MapPartitionsRDD[5] at filter at <console>:60

In [3]:
val uriMetaDataPairs = data
    .map(pair => {
        val (header, metadata) = pair
    
        val keyValuePairs = header
            .split("\r\n")
            .filter(_ != "")
            .map(line => {
                val splitList = line.split(": ")
                ((splitList(0), splitList(1)))
            })

        val targetURL = keyValuePairs
            .filter(_._1 == "WARC-Target-URI")(0)
            ._2
        
        val protocolRegex = """^https?:\/\/.*"""
        
        if (!targetURL.matches(protocolRegex)) {
            ("", metadata)
        } else {
            val domain = try {
                new URL(targetURL).getHost()
            } catch {
                case e: Exception => ""
            }
        
            (domain, metadata)
        }
          
    })
    .filter(_._1.nonEmpty)

// uriMetaDataPairs.take(1).foreach(println)

uriMetaDataPairs = MapPartitionsRDD[7] at filter at <console>:81


MapPartitionsRDD[7] at filter at <console>:81

In [4]:
val uriLinkPairs = uriMetaDataPairs
    .map(pair => {
        val (uri, unparsedMetaData) = pair

        val linkArrayStartIndex = unparsedMetaData.indexOf("\"Links\":") match {
            case x if (x > 0) => (x + ("\"Links\":[").length)
            case _ => 0
        }

        if (linkArrayStartIndex == 0) {
            (uri, Array[String]())
        } else {
             val unparsedLinkArray = unparsedMetaData
                .substring(linkArrayStartIndex)
                .split("]")(0)

            val links = unparsedLinkArray
                .split(",")
                .filter(_.contains("url"))
                .map(jsonObject => {
                    val startIndex = jsonObject.indexOf("\"url\":") + ("\"url\":").length + 1
                    val stopIndex = jsonObject.length - 2
                    
                    if (startIndex < stopIndex) {
                        jsonObject.substring(startIndex, stopIndex)
                    } else {
                        ""
                    }                    
                })
                .filter(_ != "")

            (uri, links)
        }
    })
    .filter(_._2.nonEmpty)

// uriLinkPairs.take(1).foreach(x => {
//     println("[URL: " + x._1 + "]")
//     x._2.foreach(println)
// })

uriLinkPairs = MapPartitionsRDD[9] at filter at <console>:86


MapPartitionsRDD[9] at filter at <console>:86

In [5]:
val uriStaticLinkPairs = uriLinkPairs
    .map(pair => {
        val (uri, links) = pair
        
        val protocolRegex = """^https?:\/\/.*"""
        
        val staticLinks = links
            .filter(_.matches(protocolRegex))
            .map(_.replaceAll("""^https?:\/\/""", """http://"""))
            .map(url => try {
                new URL(url).getHost()
            } catch {
                case e: Exception => ""
            })
            .filter(!_.isEmpty)
        
        (uri, staticLinks)
    })
    .reduceByKey((x, y) => x ++ y)

// uriStaticLinkPairs.take(1).foreach(x => {
//     println("[URL: " + x._1 + "]")
//     x._2.foreach(println)
// })

uriStaticLinkPairs = ShuffledRDD[11] at reduceByKey at <console>:72


ShuffledRDD[11] at reduceByKey at <console>:72

In [6]:
val uriLinkPairsWithIndex = uriStaticLinkPairs
    .zipWithIndex
    .map(x => (x._2.toLong, x._1._1, x._1._2))
    .cache()

// uriLinkPairsWithIndex.take(1).foreach(println)

uriLinkPairsWithIndex = MapPartitionsRDD[13] at map at <console>:58


MapPartitionsRDD[13] at map at <console>:58

In [7]:
val linksList = uriLinkPairsWithIndex.flatMap(pair => {
    val (index, uri, links) = pair
    
    links.map(link => (index, uri, link))
})

val uriList = uriLinkPairsWithIndex.map(x => (x._1, x._2))

// linksList.take(3).foreach(println)
// uriList.take(3).foreach(println)

linksList = MapPartitionsRDD[14] at flatMap at <console>:58
uriList = MapPartitionsRDD[15] at map at <console>:64


MapPartitionsRDD[15] at map at <console>:64

In [8]:
val linksDF = linksList.toDF("indexFrom", "from", "to").filter("from != to").cache()
val uriDF = uriList.toDF("index", "uri").cache()

linksDF.rdd.take(4).foreach(println)

linksDF.printSchema
linksDF.show()

uriDF.printSchema
uriDF.show()

[0,inct.cnpq.br,www.cnpq.br]
[0,inct.cnpq.br,twitter.com]
[0,inct.cnpq.br,www.youtube.com]
[0,inct.cnpq.br,soundcloud.com]
root
 |-- indexFrom: long (nullable = false)
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)

+---------+-------------------+--------------------+
|indexFrom|               from|                  to|
+---------+-------------------+--------------------+
|        0|       inct.cnpq.br|         www.cnpq.br|
|        0|       inct.cnpq.br|         twitter.com|
|        0|       inct.cnpq.br|     www.youtube.com|
|        0|       inct.cnpq.br|      soundcloud.com|
|        0|       inct.cnpq.br|      www.mct.gov.br|
|        0|       inct.cnpq.br|             cnpq.br|
|        1|         nlmz-nn.ru|      maps.google.co|
|        2|jurnalul.antena3.ro|m.jurnalul.antena3.r|
|        2|jurnalul.antena3.ro|content.paydemic.com|
|        2|jurnalul.antena3.ro|    www.facebook.com|
|        2|jurnalul.antena3.ro|   www.libertatea.ro|
|        2|jurnalul

linksDF = [indexFrom: bigint, from: string ... 1 more field]
uriDF = [index: bigint, uri: string]


[index: bigint, uri: string]

In [9]:
val edgeDF = linksDF
    .join(uriDF, linksDF("to") === uriDF("uri"))
    .withColumnRenamed("index", "indexTo")

edgeDF.show()

// println(linksDF.rdd.count)
// println(uriDF.rdd.count)
// println(edgeDF.rdd.count)

+---------+-------------------+-----------------+-------+-----------------+
|indexFrom|               from|               to|indexTo|              uri|
+---------+-------------------+-----------------+-------+-----------------+
|        0|       inct.cnpq.br|  www.youtube.com|  24546|  www.youtube.com|
|        0|       inct.cnpq.br|   soundcloud.com|  10036|   soundcloud.com|
|        2|jurnalul.antena3.ro|www.romaniatv.net|   7128|www.romaniatv.net|
|        2|jurnalul.antena3.ro|www.romaniatv.net|   7128|www.romaniatv.net|
|        2|jurnalul.antena3.ro|www.romaniatv.net|   7128|www.romaniatv.net|
|        2|jurnalul.antena3.ro|www.romaniatv.net|   7128|www.romaniatv.net|
|        3|          auto-m.ru|  www.youtube.com|  24546|  www.youtube.com|
|        5| www.nigerialog.com|  play.google.com|  13354|  play.google.com|
|        5| www.nigerialog.com|  play.google.com|  13354|  play.google.com|
|        8|   cityofdestin.com|  www.youtube.com|  24546|  www.youtube.com|
|       10| 

edgeDF = [indexFrom: bigint, from: string ... 3 more fields]


[indexFrom: bigint, from: string ... 3 more fields]

In [10]:
val edges = edgeDF
    .select("indexFrom", "indexTo")
    .rdd
    .map(arr => Edge(
        arr(0).asInstanceOf[Long],
        arr(1).asInstanceOf[Long],
        1
    ))

val linkGraph = Graph(uriList, edges).cache

edges = MapPartitionsRDD[61] at map at <console>:69
linkGraph = org.apache.spark.graphx.impl.GraphImpl@57bf4d00


org.apache.spark.graphx.impl.GraphImpl@57bf4d00

In [11]:
val rankGraph = linkGraph.pageRank(0.0001).cache
val ranks = rankGraph.vertices.sortBy(- _._2)

rankGraph = org.apache.spark.graphx.impl.GraphImpl@6c3d7eb5
ranks = MapPartitionsRDD[1249] at sortBy at <console>:70


MapPartitionsRDD[1249] at sortBy at <console>:70

In [12]:
val ranksDF = ranks.toDF("index", "pageRank")

val rankedUriDF = uriDF.join(ranksDF, "index")

rankedUriDF.show()

+-----+--------------------+------------------+
|index|                 uri|          pageRank|
+-----+--------------------+------------------+
|24546|     www.youtube.com| 2568.376673307842|
| 6618|     plus.google.com| 2135.493481922949|
| 2381|         rusvesna.su|1815.7192176280064|
|17629|      www.google.com| 978.9277475485047|
| 9963|       wordpress.org| 617.5385857331374|
|15360|         telegram.me| 545.8948647527142|
|13354|     play.google.com|438.36751676310905|
|29496|   www.pinterest.com|363.71906764222774|
| 1317| accounts.google.com|362.26723605623135|
|30347|    drive.google.com| 349.1910284562913|
|29393| login.wordpress.org|300.34110106829013|
|15519|      www.amazon.com|182.26740169033596|
|12120|       wordpress.com|174.61697653822318|
|16909|    itunes.apple.com| 163.5412266540724|
| 2273|   support.apple.com|146.56107179300747|
|13807|translate.wordpre...|131.77745057508758|
| 7802|      www.flickr.com|111.10234561682434|
|29031|    en.wikipedia.org|  77.9185399

ranksDF = [index: bigint, pageRank: double]
rankedUriDF = [index: bigint, uri: string ... 1 more field]


[index: bigint, uri: string ... 1 more field]