In [1]:
%scala
// Fail fast when the notebook is attached to an older cluster.
// NOTE(review): branch names are compared as strings (lexicographically), which is only
// reliable while both sides look like single-digit "major.minor" values.
if (org.apache.spark.BuildInfo.sparkBranch < "1.6") {
  sys.error("Attach this notebook to a cluster running Spark 1.6+")
}

In [2]:
# Load the raw dataset stored as a CSV file
# clickstreamRaw = sqlContext.read \
#   .format("com.databricks.spark.csv") \
#   .options(header="true", delimiter="\t", mode="PERMISSIVE", inferSchema="true") \
#   .load("dbfs:///databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed")
  
# # Convert the dataset to a more efficient format to speed up our analysis
# clickstreamRaw.write \
#   .mode("overwrite") \
#   .format("parquet") \
#   .save("/datasets/wiki-clickstream")

# # Load data as dataframe
# clicks = sqlContext.read.parquet("/datasets/wiki-clickstream")

# # Make clicks available as a SQL table.
# clicks.registerTempTable("clicks")

In [3]:
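# # Read-only AWS keys -- placeholders only; never commit real credentials (load them from a secret store or environment variables instead)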
# ACCESS_KEY = "AAAAAAAAAAAAAAAAAAAAAA"
# SECRET_KEY = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
# ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
# AWS_BUCKET_NAME = "db-wikipedia-readonly-use"
# MOUNT_NAME = "wikipedia-readonly/"

# dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)

In [4]:
# List the Parquet part-files of the clickstream dataset on the mounted bucket.
clickstream_files = dbutils.fs.ls("/mnt/wikipedia-readonly/clickstream_parquet/")
display(clickstream_files)

In [5]:
# NOTE(review): only a single part-file of the March 5, 2016 dump is loaded --
# presumably to keep the demo small; point at the whole directory for a full run.
WIKIPEDIA_PATH = "/mnt/wikipedia-readonly/en_wikipedia/march_5_2016_parquet/part-r-00000-81970a86-406e-440b-8308-5f4b74a77b60.gz.parquet"
CLICKSTREAM_PATH = "/mnt/wikipedia-readonly/clickstream_parquet/*.gz.parquet"

wikipedia_df = sqlContext.read.parquet(WIKIPEDIA_PATH)
clicks = sqlContext.read.parquet(CLICKSTREAM_PATH)
# print(...) with a single argument behaves the same on Python 2 and Python 3;
# the bare print statement is a SyntaxError on Python 3.
print("Wikidup Schema")
wikipedia_df.printSchema()
print("Clickstream Schema")
clicks.printSchema()

In [6]:
# Keep only id, title and the nested revision wikitext, exposing the latter as `text`.
titleTextMapDF = (wikipedia_df
                  .select("id", "title", "revision.text.#VALUE")
                  .withColumnRenamed("#VALUE", "text"))
display(titleTextMapDF)

In [7]:
def createTopicParquet(topics, depth=1, output_path="result.parquet", limit=40):
  """Walk the clickstream backwards from `topics` and save the edges as Parquet.

  Level 1 keeps the `limit` heaviest edges (by click count `n`) whose destination
  (`curr_title`) is one of `topics`.  Each further level repeats this with the
  source titles found by the previous level as the new destinations.  Rows with
  a NULL prev_id and edges touching Main_Page are excluded.  Reads the `clicks`
  temp table through `sql`.

  Args:
    topics: iterable of page titles as they appear in `curr_title`,
      e.g. ['Topic_1', 'Topic_2', ..., 'Topic_n'].
    depth: number of levels to expand (must be >= 1).
    output_path: Parquet destination; overwritten on every call.
    limit: maximum number of edges kept per level.

  Returns:
    DataFrame with columns src, dest, count -- the union of all levels.

  Raises:
    ValueError: if `depth` < 1 or `topics` is empty.
  """
  if depth < 1:
    raise ValueError("depth must be >= 1, got %r" % (depth,))
  topics = list(topics)
  if not topics:
    raise ValueError("topics must contain at least one page title")

  # All referrer -> page edges; each level below narrows this to its destinations.
  edges = sql("""SELECT
        prev_title AS src,
        curr_title AS dest,
        n AS count FROM clicks
      WHERE
        prev_id IS NOT NULL AND NOT (curr_title = 'Main_Page' OR prev_title = 'Main_Page')""")

  rval = None
  for _ in range(depth):
    if not topics:
      # The previous level found no referrers; an empty IN () would be invalid SQL.
      break
    # isin() passes titles as values, so titles containing quotes cannot break the
    # query the way string-built SQL literals did.
    level = (edges.filter(edges.dest.isin(topics))
                  .orderBy(edges["count"].desc())
                  .limit(limit))
    rval = level if rval is None else rval.unionAll(level)
    topics = [row.src for row in level.select("src").collect()]
  rval.write.parquet(output_path, mode="overwrite")
  return rval

In [8]:
def contain(s, v):
  """Return True if substring `s` occurs anywhere in `v`; False when `v` is None."""
  # The previous `v.find(s) > 0` missed matches starting at index 0; `in`
  # checks every position.
  return v is not None and s in v
def topics_with_term(term):
  """Return the (id, title) rows of every page whose wikitext contains `term`.

  The match is a literal substring match with LIKE's case sensitivity: the LIKE
  wildcards percent and underscore, and the backslash escape character, are
  escaped inside `term` so they match themselves instead of acting as patterns.
  Reads the module-level `titleTextMapDF` (columns id, title, text).

  Returns:
    DataFrame with columns id and title (not a collected list).
  """
  # Escape the escape character first so the escapes added below stay intact.
  escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
  pattern = "%" + escaped + "%"
  matches = titleTextMapDF.filter(titleTextMapDF.text.like(pattern))
  return matches.select(titleTextMapDF.id, titleTextMapDF.title)

In [9]:
# Page id left out of the popularity ranking.
# NOTE(review): presumably Main_Page (createTopicParquet excludes Main_Page by title) -- confirm.
EXCLUDED_PAGE_ID = 15580374

# Total clicks received by each destination page (column "sum(n)").
groupedCount = clicks.groupBy("curr_id").agg({"n": "sum"})
# Distinct (curr_id, curr_title) pairs with their click totals, most-clicked first.
# Joining on the column *name* avoids the ambiguous self-join that arises when both
# sides of the condition are columns derived from the same `clicks` frame, and it
# leaves a single curr_id column, so no drop() is needed.
clickclucks = (clicks.select("curr_id", "curr_title")
               .distinct()
               .where(clicks.curr_id != EXCLUDED_PAGE_ID)
               .join(groupedCount, "curr_id")
               .sort("sum(n)", ascending=False))
# Make clicks available as a SQL table.
clicks.registerTempTable("clicks")

In [10]:
# Pages mentioning the search term, ranked by total incoming clicks; keep the top 10.
all_topics = topics_with_term('Donald Trump')
clicked_topics = (all_topics
                  .join(clickclucks, clickclucks.curr_id == all_topics.id)
                  .sort('sum(n)', ascending=False)
                  .limit(10)
                  .select(clickclucks.curr_title))
# Plain Python list of the titles; used as the seed set for the graph below.
top_topics = [row.curr_title for row in clicked_topics.collect()]
display(clicked_topics)

In [11]:
# Expand two levels of referrers around the top topics (the edges are also
# written to result.parquet for the Scala visualisation cell).
display(createTopicParquet(top_topics, depth=2))

In [12]:
%scala
package d3
// This cell is compiled as a package (`package d3`) so that it can define top-level classes like Edge that other cells import

import org.apache.spark.sql._
import com.databricks.backend.daemon.driver.EnhancedRDDFunctions.displayHTML

// One clickstream edge as written by createTopicParquet: src = prev_title,
// dest = curr_title, count = n (number of clicks).
case class Edge(src: String, dest: String, count: Long)

// Input shape for the d3 force layout: Link.source / Link.target are indexes into
// Graph.nodes, and Link.value drives the drawn stroke width (sqrt(value)).
// Field names matter: the Graph is serialised to JSON and read by the page's script.
case class Node(name: String)
case class Link(source: Int, target: Int, value: Long)
case class Graph(nodes: Seq[Node], links: Seq[Link])

object graphs {
  // SQLContext of the running SparkContext; needed for the toDF implicit used in `force`.
  val sqlContext = SQLContext.getOrCreate(org.apache.spark.SparkContext.getOrCreate())
  import sqlContext.implicits._

  /**
   * Collects `clicks` to the driver and renders it as a d3 force-directed graph.
   *
   * Nodes are the distinct src/dest titles with underscores shown as spaces;
   * every edge becomes a link whose value is `count / 20 + 1`.
   *
   * @param clicks edges to draw; collected in full, so keep it small
   * @param height SVG height in pixels
   * @param width  SVG width in pixels
   */
  def force(clicks: Dataset[Edge], height: Int = 100, width: Int = 960): Unit = {
    val data = clicks.collect()
    def label(title: String): String = title.replaceAll("_", " ")
    // Distinct display names in first-seen order (all src titles, then all dest titles).
    val names = (data.map(t => label(t.src)) ++ data.map(t => label(t.dest))).distinct
    val nodes = names.toSeq.map(Node)
    // Name -> node index; replaces a linear indexWhere scan per link (O(links * nodes)).
    val indexOf = names.zipWithIndex.toMap
    val links = data.map { t =>
      Link(indexOf(label(t.src)), indexOf(label(t.dest)), t.count / 20 + 1)
    }
    showGraph(height, width, Seq(Graph(nodes, links)).toDF().toJSON.first())
  }

  /**
   * Displays a force directed graph using d3.
   *
   * @param graph JSON of the form
   *   {"nodes": [{"name": "..."}], "links": [{"source": 1, "target": 2, "value": 0}]};
   *   it is interpolated verbatim into the page's script, so it must be valid JSON
   */
  def showGraph(height: Int, width: Int, graph: String): Unit = {
    displayHTML(s"""
<!DOCTYPE html>
<html>
<head>
  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
  <title>Wikipedia Clickstream - Force-Directed Graph</title>
  <meta charset="utf-8">
<style>

.node_circle {
  stroke: #777;
  stroke-width: 1.3px;
}

.node_label {
  pointer-events: none;
}

.link {
  stroke: #777;
  stroke-opacity: .2;
}

.node_count {
  stroke: #777;
  stroke-width: 1.0px;
  fill: #999;
}

text.legend {
  font-family: Verdana;
  font-size: 13px;
  fill: #000;
}

.node text {
  font-family: "Helvetica Neue","Helvetica","Arial",sans-serif;
  font-size: 17px;
  font-weight: 200;
}

</style>
</head>

<body>
<script src="//d3js.org/d3.v3.min.js"></script>
<script>

var graph = $graph;

var width = $width,
    height = $height;

var color = d3.scale.category20();

var force = d3.layout.force()
    .charge(-700)
    .linkDistance(180)
    .size([width, height]);

var svg = d3.select("body").append("svg")
    .attr("width", width)
    .attr("height", height);

force
    .nodes(graph.nodes)
    .links(graph.links)
    .start();

var link = svg.selectAll(".link")
    .data(graph.links)
    .enter().append("line")
    .attr("class", "link")
    .style("stroke-width", function(d) { return Math.sqrt(d.value); });

var node = svg.selectAll(".node")
    .data(graph.nodes)
    .enter().append("g")
    .attr("class", "node")
    .call(force.drag);

node.append("circle")
    .attr("r", 10)
    .style("fill", function (d) {
    if (d.name.startsWith("other")) { return color(1); } else { return color(2); };
});

node.append("text")
      .attr("dx", 10)
      .attr("dy", ".35em")
      .text(function(d) { return d.name });

//Now we are giving the SVGs co-ordinates - the force layout is generating the co-ordinates which this code is using to update the attributes of the SVG elements
force.on("tick", function () {
    link.attr("x1", function (d) {
        return d.source.x;
    })
        .attr("y1", function (d) {
        return d.source.y;
    })
        .attr("x2", function (d) {
        return d.target.x;
    })
        .attr("y2", function (d) {
        return d.target.y;
    });
    d3.selectAll("circle").attr("cx", function (d) {
        return d.x;
    })
        .attr("cy", function (d) {
        return d.y;
    });
    d3.selectAll("text").attr("x", function (d) {
        return d.x;
    })
        .attr("y", function (d) {
        return d.y;
    });
});
</script>
</body>
</html>
""")
  }

  /** Renders usage help for `force` as HTML. */
  def help() = {
    displayHTML("""
<p>
Produces a force-directed graph given a collection of edges of the following form:<br/>
<tt><font color="#a71d5d">case class</font> <font color="#795da3">Edge</font>(<font color="#ed6a43">src</font>: <font color="#a71d5d">String</font>, <font color="#ed6a43">dest</font>: <font color="#a71d5d">String</font>, <font color="#ed6a43">count</font>: <font color="#a71d5d">Long</font>)</tt>
</p>
<p>Usage:<br/>
<tt>%scala</tt><br/>
<tt><font color="#a71d5d">import</font> <font color="#ed6a43">d3._</font></tt><br/>
<tt><font color="#795da3">graphs.force</font>(<br/>
&nbsp;&nbsp;<font color="#ed6a43">height</font> = <font color="#795da3">500</font>,<br/>
&nbsp;&nbsp;<font color="#ed6a43">width</font> = <font color="#795da3">500</font>,<br/>
&nbsp;&nbsp;<font color="#ed6a43">clicks</font> = <font color="#795da3">Dataset</font>[<font color="#795da3">Edge</font>])</tt>
</p>""")
  }
}

In [13]:
%scala
import d3._

// Edges saved by createTopicParquet, read back as a typed Dataset[Edge].
val edges = sqlContext.read.parquet("result.parquet").as[Edge]
graphs.force(clicks = edges, height = 1580, width = 1420)