from __future__ import absolute_import, division, print_function, unicode_literals
import os
import shutil
from pyspark.sql import types as SparkTypes
from cosrlib.url import URL
from cosrlib.spark import createDataFrame, sql, SparkPlugin
from cosrlib import re, py2_long
from urlserver.id_generator import _fast_make_domain_id

_RE_STRIP_FRAGMENT = re.compile(r"#.*")
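# Illustrative example (assuming standard `re` semantics): this strips the fragment
# part of a link, e.g. _RE_STRIP_FRAGMENT.sub("", "/page#section") == "/page"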


class WebGraphPlugin(SparkPlugin):
    """ Base class for WebGraph plugins """

    include_external = True
    include_internal = True

    def hook_spark_pipeline_init(self, sc, sqlc, schema, indexer):

        if self.include_external:
            schema.append(
                SparkTypes.StructField("external_links", SparkTypes.ArrayType(SparkTypes.StructType([
                    SparkTypes.StructField("href", SparkTypes.StringType(), nullable=False),
                    SparkTypes.StructField("text", SparkTypes.StringType(), nullable=True)
                ])), nullable=True)
            )

        if self.include_internal:
            schema.append(
                SparkTypes.StructField("internal_links", SparkTypes.ArrayType(SparkTypes.StructType([
                    SparkTypes.StructField("path", SparkTypes.StringType(), nullable=False),
                    SparkTypes.StructField("text", SparkTypes.StringType(), nullable=True)
                ])), nullable=True)
            )
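
    # Illustrative note (not from the original source): with both flags enabled, each
    # indexed document row gains two nullable array columns, roughly:
    #   external_links = [Row(href="http://example.org/a", text="Example")]
    #   internal_links = [Row(path="/about", text="About us")]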

    def hook_document_post_index(self, document, metadata):
        """ Collect all unique, normalized external and internal links """

        if self.include_external:
            seen = set()
            for link in document.get_external_hyperlinks(exclude_nofollow=self.exclude_nofollow):
                key = (link["href"].normalized, link["text"])
                if key not in seen:
                    seen.add(key)
                    metadata.setdefault("external_links", []).append(key)

        if self.include_internal:
            seen = set()
            metadata["internal_links"] = []
            for link in document.get_internal_hyperlinks():  # exclude_nofollow=self.exclude_nofollow
                key = (_RE_STRIP_FRAGMENT.sub("", link["path"]), link["text"])
                if key not in seen:
                    seen.add(key)
                    metadata["internal_links"].append(key)
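
    # Illustration with a hypothetical page: a document linking twice to the same
    # normalized href with the same anchor text yields a single entry, e.g.
    #   metadata["external_links"] == [("http://b.com/x", "anchor")]
    # Internal links are deduplicated the same way, after stripping "#fragment" parts.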

    def init(self):
        self.exclude_nofollow = (self.args.get("include_nofollow") != "1")

        # Remove any previous output directories so Spark can recreate them
        if self.args.get("output"):
            if os.path.isdir(os.path.join(self.args["output"], "edges")):
                shutil.rmtree(os.path.join(self.args["output"], "edges"))
            if os.path.isdir(os.path.join(self.args["output"], "vertices")):
                shutil.rmtree(os.path.join(self.args["output"], "vertices"))


class DomainToDomain(WebGraphPlugin):
    """ Saves a graph of domain=>domain links in text format """

    include_internal = False

    def hook_spark_pipeline_action(self, sc, sqlc, df, indexer):

        # Get all unique (host1 => host2) pairs
        domain_pairs = sql(sqlc, """
            SELECT parse_url(url, "HOST") as d1, parse_url(CONCAT("http://", link), "HOST") as d2
            FROM (
                SELECT url, EXPLODE(external_links.href) as link FROM df
            ) as pairs
        """, {"df": df}).distinct()

        # Format as space-separated text lines
        lines = sql(sqlc, """
            SELECT CONCAT(d1, " ", d2) as r
            FROM pairs
        """, {"pairs": domain_pairs})

        self.save_dataframe(lines, "text")

        return True
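
    # Example output rows (illustrative domains): one "<source_host> <target_host>"
    # pair per line, e.g.
    #   example.com wikipedia.org
    #   example.com github.com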


class DomainToDomainParquet(WebGraphPlugin):
    """ Saves a graph of domain=>domain links in Apache Parquet format """

    include_internal = False

    def hook_spark_pipeline_action(self, sc, sqlc, df, indexer):
        self.save_vertex_graph(sqlc, df)
        self.save_edge_graph(sqlc, df)
        return True

    def save_vertex_graph(self, sqlc, df):
        """ Transforms a document metadata DataFrame into a Parquet dump of the vertices of the webgraph """

        vertex_graph_schema = SparkTypes.StructType([
            SparkTypes.StructField("id", SparkTypes.LongType(), nullable=False),
            SparkTypes.StructField("domain", SparkTypes.StringType(), nullable=False)
        ])

        # TODO ?!
        if self.args.get("shuffle_partitions"):
            sqlc.setConf("spark.sql.shuffle.partitions", self.args["shuffle_partitions"])

        # Collect all unique domains from the page URLs & the destinations of all external links
        d1_df = sql(sqlc, """
            SELECT parse_url(url, "HOST") as domain from df
        """, {"df": df}).distinct()

        d2_df = sql(sqlc, """
            SELECT parse_url(CONCAT("http://", link), "HOST") as domain
            FROM (
                SELECT EXPLODE(external_links.href) as link FROM df
            ) as pairs
        """, {"df": df})

        all_domains_df = d1_df.unionAll(d2_df).distinct()

        def iter_domain(record):
            """ Transforms Row(domain="www.example.com") into tuple([int64 ID], "example.com") """
            domain = record["domain"]
            if not domain or not domain.strip():
                return []
            name = URL("http://" + domain).normalized_domain
            try:
                _id = _fast_make_domain_id(name)
            except Exception:  # pylint: disable=broad-except
                return []
            return [(py2_long(_id), str(name))]

        rdd_domains = all_domains_df.rdd.flatMap(iter_domain)
        vertex_df = createDataFrame(sqlc, rdd_domains, vertex_graph_schema).distinct()

        coalesce = int(self.args.get("coalesce_vertices") or self.args.get("coalesce", 1) or 0)
        if coalesce > 0:
            vertex_df = vertex_df.coalesce(coalesce)

        vertex_df.write.parquet(os.path.join(self.args["output"], "vertices"))
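
    # Illustrative result (hypothetical ID): the "vertices" Parquet files contain one
    # row per unique normalized domain, e.g. Row(id=123456789, domain="example.com").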

    def save_edge_graph(self, sqlc, df):
        """ Transforms a document metadata DataFrame into a Parquet dump of the edges of the webgraph """

        edge_graph_schema = SparkTypes.StructType([
            SparkTypes.StructField("src", SparkTypes.LongType(), nullable=False),
            SparkTypes.StructField("dst", SparkTypes.LongType(), nullable=False),

            # Sum of weights must be 1
            # This field will automatically be added by the SQL query
            # SparkTypes.StructField("weight", SparkTypes.FloatType(), nullable=True)
        ])

        # TODO?!
        if self.args.get("shuffle_partitions"):
            sqlc.setConf("spark.sql.shuffle.partitions", self.args["shuffle_partitions"])

        # Get all unique (host1 => host2) pairs
        new_df = sql(sqlc, """
            SELECT parse_url(url, "HOST") as d1, parse_url(CONCAT("http://", link), "HOST") as d2
            FROM (
                SELECT url, EXPLODE(external_links.href) as link FROM df
            ) as pairs
        """, {"df": df}).distinct()

        def iter_links_domain(record):
            """ Transforms Row(d1="x.com", d2="y.com") into tuple([int64 ID], [int64 ID]) """
            d1 = record["d1"]
            d2 = record["d2"]
            if not d1 or not d2:
                return []
            try:
                from_domain = _fast_make_domain_id(d1)
                to_domain = _fast_make_domain_id(d2)
            except Exception:  # pylint: disable=broad-except
                return []

            # Skip self-links (edges from a domain to itself)
            if from_domain == to_domain:
                return []

            return [(py2_long(from_domain), py2_long(to_domain))]

        rdd_couples = new_df.rdd.flatMap(iter_links_domain)
        edge_df = createDataFrame(sqlc, rdd_couples, edge_graph_schema).distinct()

        # After collecting all the unique (from_id, to_id) pairs, we add the weight of every edge.
        # The current algorithm is naive: edge weight is split equally between all the links, with
        # the sum of all weights for a source domain always = 1.
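        # Worked example (hypothetical IDs): if domain 42 has 4 distinct outgoing
        # edges, each edge (42, dst) gets weight 1/4 = 0.25, so the weights for
        # src=42 sum to 1.0.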
        weights_df = sql(sqlc, """
            SELECT src id, cast(1 / count(*) as float) weight
            FROM edges
            GROUP BY src
        """, {"edges": edge_df})

        weighted_edge_df = sql(sqlc, """
            SELECT cast(src as long) src, cast(dst as long) dst, cast(weights.weight as float) weight
            FROM edges
            JOIN weights on edges.src = weights.id
        """, {"edges": edge_df, "weights": weights_df})

        coalesce = int(self.args.get("coalesce_edges") or self.args.get("coalesce", 1) or 0)
        if coalesce > 0:
            weighted_edge_df = weighted_edge_df.coalesce(coalesce)

        weighted_edge_df.write.parquet(os.path.join(self.args["output"], "edges"))
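

# Illustrative sanity check (not part of the plugin; assumes a SparkSession named
# `spark` and an output directory "out/"): per-source edge weights should sum to ~1.0.
#
#   edges = spark.read.parquet("out/edges")
#   edges.groupBy("src").sum("weight").show()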