In [1]:
import findspark
findspark.init()
import pyspark
# Create the SparkContext with default settings and wrap it in a SQLContext;
# the cells below use `sc` for RDD work and `sqlContext` for DataFrames.
sc = pyspark.SparkContext()
sqlContext = pyspark.sql.SQLContext(sc)

In [2]:
import re
import csv
import random
import ujson as json
from itertools import izip
from operator import add, itemgetter
from collections import Counter, defaultdict
from urlparse import urljoin

In [3]:
from boto.s3.connection import S3Connection
from boto.s3.connection import Key
from datetime import datetime
from time import time
# URI scheme prefix prepended to the corpus and output bucket paths below.
s3baseuri = "s3n://"

def zip_sum(*x):
  """Return the element-wise sums of the given sequences as a list.

  The result is as long as the shortest argument; with no arguments it is [].
  """
  return list(map(sum, izip(*x)))
def trim_link_protocol(s):
    """Return `s` without everything up to and including its first '://'.

    Strings that contain no '://' are returned unchanged.
    """
    _, separator, remainder = s.partition('://')
    return remainder if separator else s
def get_timestamp():
  """Return the current local time as a 'YYYYmmddHHMMSS' string."""
  return datetime.now().strftime('%Y%m%d%H%M%S')
def write_file_to_s3(localfile, s3_bucket, s3_filename):
  """Upload `localfile` to `s3_bucket` as `s3_filename` unless it is already listed.

  The upload is skipped when listing the bucket with `s3_filename` as the
  argument returns anything at all.

  The connection is opened with the module-level names `key` and `secret`.
  NOTE(review): neither name is assigned in the visible part of this
  notebook - confirm where the credentials are defined.
  """
  bucket = S3Connection(key, secret).get_bucket(s3_bucket)
  if list(bucket.list(s3_filename)):
    # something is already listed under this name; leave it untouched
    return
  target = Key(bucket)
  target.key = s3_filename
  target.set_contents_from_filename(localfile)

In [4]:
def get_mention_aligned_links(doc):
  """Yield (link, anchor) slices for mentions that are exactly a markdown link anchor.

  A mention qualifies when the character right before it is '[' and the two
  characters right after it are '](' , i.e. it is the anchor of `[anchor](target)`.

  doc: mapping with 'full_text' (a string) and 'mentions' (an iterable of
       mappings holding integer 'start' and 'stop' offsets into full_text).
  Yields: (slice over the link target, slice over the mention). A leading
          'http://' or 'https://' is excluded from the target slice. Links
          without a closing ')' are skipped.
  """
  text = doc['full_text']
  for m in doc['mentions']:
    mention_start, mention_stop = m['start'], m['stop']
    # The opening '[' must precede the mention, so a mention at offset 0 can
    # never match. Indexing text[mention_start-1] directly would wrap around to
    # the last character for start == 0 and raise IndexError for offsets beyond
    # the text, so reject those here and compare with slices below.
    if mention_start < 1 or mention_stop <= mention_start:
      continue
    link_start = mention_stop + 2
    # naively detect whether this mention sits inside a markdown link anchor
    if text[mention_start-1:mention_start] != '[' or text[mention_stop:link_start] != '](':
      continue
    link_stop = text.find(')', link_start)
    if link_stop == -1:
      # unterminated link target
      continue

    target = text[link_start:link_stop]
    if target.startswith('http://'):
      link_start += 7
    elif target.startswith('https://'):
      link_start += 8

    yield slice(link_start, link_stop), slice(mention_start, mention_stop)

def get_links(doc):
  """Yield (link, anchor) slices for every markdown-style [anchor](target) in doc['full_text'].

  An opening bracket preceded by a backslash does not start a link. An
  optional http:// or https:// prefix is left out of the link slice.
  Links are skipped when the anchor text starts with 'www' or 'http', or when
  the target contains 'secure.adnxs.com' or 'digg.com'.
  """
  pattern = '(?<!\\\\)\[(([^]]|(\\\\]))+)(]\(\s*(http[s]?://)?)([^)]+)\s*\)'
  for match in re.finditer(pattern, doc['full_text']):
    groups = match.groups()
    # anchor text, the '](' opener including any protocol, and the bare target
    anchor, opener, uri = groups[0], groups[3], groups[5]
    if not uri or anchor.startswith(('www', 'http')):
      continue
    # todo: add check for anchor diversity to filter this kind of thing (digg.com)
    if 'secure.adnxs.com' in uri or 'digg.com' in uri:
      continue
    # offsets are rebuilt from the match start plus the lengths of each piece
    mention_start = match.start() + 1
    mention_stop = mention_start + len(anchor)
    link_start = mention_stop + len(opener)
    link_stop = link_start + len(uri)
    yield slice(link_start, link_stop), slice(mention_start, mention_stop)

import base64
import urlparse
def resolve_hardcoded_redirects(l):
  """Unwrap a few known redirect links, then strip the protocol from the result.

  l: link target without a leading protocol.
  - 'www.prweb.net...' links: whatever follows the
    'www.prweb.net/Redirect.aspx?id=' prefix is base64-decoded.
  - 'cts.businesswire.com/ct/' and 'ctt.marketwire.com/' links: the first
    value of the 'url' query parameter is used.
  Resolution is best effort: if decoding fails, the link is used as given.
  Returns the (possibly resolved) link passed through trim_link_protocol.
  """
  try:
    if l.startswith('www.prweb.net'):
      l = base64.b64decode(l[len('www.prweb.net/Redirect.aspx?id='):])
    elif l.startswith('cts.businesswire.com/ct/') or l.startswith('ctt.marketwire.com/'):
      # parse_qs expects a bare query string. Feeding it the whole link glues
      # the host and path onto the first parameter name, so a leading 'url='
      # parameter was never found. Parse the part after '?' first and fall
      # back to the whole link to keep the previous behaviour otherwise.
      params = urlparse.parse_qs(l.partition('?')[2])
      if 'url' not in params:
        params = urlparse.parse_qs(l)
      l = params['url'][0]
  except Exception:
    # best effort: keep the unresolved link, but let KeyboardInterrupt and
    # SystemExit propagate instead of swallowing them
    pass
  return trim_link_protocol(l)

# Lower-cased anchor texts that get_link_labels (currently disabled below)
# refuses to emit as positive examples.
anchor_filters = {
  'facebook',
  'twitter',
  'zacks investment research',
  'reuters',
  'linkedin',
  'marketbeat',
}

if False:
  # Disabled: the `if False` guard means this helper is never defined.
  def get_link_labels(doc):
    """Yield (label, uri) pairs: 1.0 for mention-aligned links, 0.0 for other links.

    Mention-aligned links are skipped when the uri contains 'search' or the
    anchor text (lower-cased, stripped) is in anchor_filters. Links from
    get_links are labelled 0.0 unless their span was already seen as
    mention-aligned.
    """
    text = doc['full_text']
    aligned_spans = set()
    for link, anchor in get_mention_aligned_links(doc):
      aligned_spans.add((link.start, link.stop))
      uri = text[link]
      if 'search' in uri:
        continue
      if text[anchor].lower().strip() in anchor_filters:
        continue
      yield (1.0, uri)
    for link, _ in get_links(doc):
      if (link.start, link.stop) in aligned_spans:
        continue
      yield (0.0, text[link])

def get_anchor_target_pairs(doc):
  """Yield (anchor text, resolved target, is_mention) for every link in the document.

  Links whose anchor is exactly a document mention come first and are flagged
  True. The remaining links from get_links follow, skipping spans already
  emitted; those are flagged True only when the anchor looks like a handle
  (starts with '@' and contains no space).
  """
  text = doc['full_text']
  seen_spans = set()
  for link, anchor in get_mention_aligned_links(doc):
    seen_spans.add((link.start, link.stop))
    yield (text[anchor], resolve_hardcoded_redirects(text[link]), True)
  for link, anchor in get_links(doc):
    if (link.start, link.stop) in seen_spans:
      continue
    label = text[anchor]
    # twitter NER = solved
    looks_like_handle = label.startswith('@') and ' ' not in label
    yield (label, resolve_hardcoded_redirects(text[link]), looks_like_handle)

URI Classification

In [5]:
def normalize_uri(uri):
  """Collapse a URI into a lower-cased template that hides its last path segment.

  Steps: lower-case; drop the protocol, a leading '//' and a leading 'www.';
  turn a '?' that follows letters and every '#' into '/'; drop a trailing
  segment that starts with 'index' or 'default'; then replace the last
  remaining path segment with '<eid>', or append '<nil>' when only the host
  is left.

  Examples: 'twitter.com/person' -> 'twitter.com/<eid>',
            'foo.com/index.html' -> 'foo.com/<nil>'.
  """
  uri = uri.lower()

  # trim uri protocol first, so that a 'www.' hidden behind 'http://' is
  # stripped just like one at the very start of a protocol-less uri
  idx = uri.find('://')
  if idx != -1:
    uri = uri[idx+3:]
  if uri.startswith('//'):
    uri = uri[2:]
  if uri.startswith('www.'):
    uri = uri[4:]

  # convert 'blah.com/users.php?id=bob' into 'blah.com/users.php/id=bob'
  uri = re.sub(r'([a-z]+)\?', r"\1/", uri)

  # convert 'blah.com/users#bob' into 'blah.com/users/bob'
  uri = uri.replace('#', '/')

  parts = uri.rstrip('/').split('/')
  suffix = parts[-1]
  # Only a path segment may be dropped. Without the parentheses the `or`
  # branch also fired for a bare host such as 'defaultsite.com', leaving an
  # empty list and a result of just '<nil>'.
  if len(parts) > 1 and (suffix.startswith('index') or suffix.startswith('default')):
    parts = parts[:-1]
  if len(parts) > 1:
    parts[-1] = '<eid>'
  else:
    parts.append('<nil>')
  return '/'.join(parts)

#normalize_uri('vanityfair.com/index.aspx?rofl')

In [6]:
def get_uri_domain(uri):
  """Return the part of `uri` before its first '/' (the whole string if there is none)."""
  host, _, _ = uri.partition('/')
  return host

def get_uri_features(uri):
  """Return string features describing the shape of a '/'-separated URI.

  Every digit is first replaced by 'N' and the first segment (the domain) by
  the placeholder '<domain>'. The result is, in order:
    1. adjacent segment pairs joined with '/',
    2. the same pairs prefixed with '<domain value>:',
    3. the individual segments,
    4. 'SD:<subdomain>' when the domain has three or more dot-separated labels
       (everything but its last two labels).
  """
  segments = re.sub('[0-9]', 'N', uri).split('/')
  domain = segments[0]
  segments[0] = "<domain>"

  bigrams = ['/'.join(pair) for pair in zip(segments, segments[1:])]
  domain_bigrams = [domain + ':' + bigram for bigram in bigrams]
  features = bigrams + domain_bigrams + segments

  labels = domain.split('.')
  if len(labels) >= 3:
    features.append('SD:' + '.'.join(labels[:-2]))
  return features

from pyspark.ml.classification import NaiveBayes, LogisticRegression
from pyspark.ml.feature import HashingTF, StringIndexer, CountVectorizer

def balance_dataset(dataset, minor = 1.0, major = 0.0, seed = None):
  """Down-sample the majority class so both classes have roughly equal size.

  dataset: DataFrame with a `label` column.
  minor:   label value of the smaller class.
  major:   label value of the larger class, which is sampled down.
  seed:    optional seed for the sample, for reproducible runs.
  Returns the (sampled) `major` rows followed by all `minor` rows.

  The majority rows are sampled without replacement with fraction
  minor_count / major_count. When there are no majority rows, or the class
  given as `minor` is not actually smaller, no sampling is done. The ratio
  would otherwise divide by zero or exceed 1.0.
  """
  majority = dataset.filter(dataset.label == major)
  minority = dataset.filter(dataset.label == minor)
  major_count = majority.count()
  minor_count = minority.count()
  if major_count > 0 and minor_count < major_count:
    sample_args = dict(withReplacement=False, fraction=minor_count/float(major_count))
    if seed is not None:
      sample_args['seed'] = seed
    majority = majority.sample(**sample_args)
  return majority.unionAll(minority)

def stats_at_p(r, p):
  """Score one prediction row at threshold `p`.

  r: row exposing r['label'] and r['probability'], where r['probability'][1]
     is compared against the threshold.
  Returns (p, (tp, fp, fn)); each count is 1.0 or 0.0 for this single row.
  """
  tp = fp = fn = 0.0
  label = r['label']
  if label == 1.0:
    score = r['probability'][1]
    if score >= p:
      tp = 1.0
    if score < p:
      fn = 1.0
  elif label == 0.0:
    if r['probability'][1] >= p:
      fp = 1.0
  return p, (tp, fp, fn)

def _precision_recall_f1(counts):
  """Turn summed (tp, fp, fn) counts into (precision, recall, F1)."""
  tp, fp, fn = counts
  precision = float(tp) / (tp + fp)
  recall = float(tp) / (tp + fn)
  total = precision + recall
  # With no true positives both precision and recall are 0; report F1 as 0.0
  # rather than dividing by zero.
  f1 = 2 * (precision * recall / total) if total > 0 else 0.0
  return precision, recall, f1

def evaluate(dataset, ps = None):
  """Compute precision, recall and F1 of scored rows at each threshold in `ps`.

  dataset: scored rows accepted by stats_at_p, exposing flatMap.
  ps:      iterable of probability thresholds; defaults to [0.5].
  Returns a list of (threshold, (precision, recall, f1)). Thresholds at which
  there are no predicted positives or no actual positives are left out.
  """
  if ps is None:
    ps = [0.5]
  stats_by_p = dataset\
    .flatMap(lambda r: [stats_at_p(r, p) for p in ps])\
    .reduceByKey(lambda a, b: [x+y for x,y in zip(a, b)])\
    .filter(lambda kv: (kv[1][0]+kv[1][1]) > 0 and (kv[1][0]+kv[1][2]) > 0)\
    .mapValues(_precision_recall_f1)\
    .collect()
  return stats_by_p
    
# Logistic regression estimator reading its input vectors from the
# "hashed_features" column; every other setting is left at its default.
classifier = LogisticRegression(featuresCol="hashed_features")

In [7]:
# Set to True to re-extract the train/test link corpus from the raw articles
# and overwrite the saved copy. The saved copy is loaded either way.
REBUILD_CORPUS = False

# Input article corpus (read as text, one JSON document per line) and the
# location under which the 'train' and 'test' splits are saved.
raw_corpus_path = s3baseuri + 'abbrevi8-rnd/kb/live/20160301/articles'
link_corpus_path = s3baseuri + 'abbrevi8-rnd/web/links/seed/'

In [8]:
# Parse each line of the raw corpus as JSON and emit one
# (anchor text, link target, is_mention) tuple per link found in the document.
anchor_target_pairs = sc\
  .textFile(raw_corpus_path)\
  .map(json.loads)\
  .flatMap(get_anchor_target_pairs)

In [9]:
# Rebuild the labelled URI corpus only on request; the saved copy is loaded below either way.
if REBUILD_CORPUS:
  # Read the inner pipeline (the `for split in ...` part) first:
  #   1. normalise every link target and pair it with its is_mention flag
  #   2. keep normalised URIs that were observed at least 10 times
  #   3. label a URI positive when mention links outnumber non-mention links
  #   4. group the URIs by domain and split the groups 90/10, so all URIs of a
  #      domain land on the same side of the split
  # Each split is then flattened back into (uri, label, features) rows.
  # NOTE: randomSplit is called without a seed, so every rebuild yields a different split.
  train, test = [
    split.flatMap(lambda (prefix, instances): instances)\
         .map(lambda (uri, is_mention): (uri, 1.0 if is_mention else 0.0, get_uri_features(uri)))\
         .repartition(128)\
         .cache()
    for split in
      anchor_target_pairs\
          .map(lambda (anchor, target, is_mention): (normalize_uri(target), is_mention))\
          .groupByKey()\
          .filter(lambda (k,vs): len(vs) >= 10)\
          .mapValues(Counter)\
          .mapValues(lambda cs: cs[True] > cs[False])\
          .map(lambda (uri, is_mention): (get_uri_domain(uri), (uri, is_mention)))\
          .groupByKey()\
          .randomSplit([0.9, 0.1])
  ]
  # Persist both splits, replacing whatever was saved before.
  sqlContext\
    .createDataFrame(train, ['uri','label','features'])\
    .write.mode('overwrite')\
    .save(link_corpus_path + 'train')
  sqlContext\
    .createDataFrame(test, ['uri','label','features'])\
    .write.mode('overwrite')\
    .save(link_corpus_path + 'test')

# Load the saved splits; this rebinds `train` and `test` when the rebuild above ran.
train = sqlContext.load(link_corpus_path + 'train')
test = sqlContext.load(link_corpus_path + 'test')
# `full` stacks both splits and is used to fit the final model.
full = train.unionAll(test)



In [14]:
# (positive, negative) row counts in the training split
train.filter(train['label']==1.0).count(), train.filter(train['label']==0.0).count()

(10429, 90243)

In [10]:
# Hash the string features into 500000-dimensional term-frequency vectors,
# stored in the "hashed_features" column that the classifier reads.
# NOTE: `train` and `test` are rebound to the transformed frames here.
hashing_tf = HashingTF(inputCol="features", outputCol="hashed_features", numFeatures=500000)
train = hashing_tf.transform(train)
test = hashing_tf.transform(test)

Dev Evaluation

In [11]:
# Fit the development model on the class-balanced training split only.
dev_model = classifier.fit(balance_dataset(train).repartition(64))

In [22]:
# Sweep the decision threshold from 0.025 to 0.975 in steps of 0.025 and collect
# (threshold, (precision, recall, F1)) for the train and held-out splits.
train_prs = evaluate(dev_model.transform(train), ps=[p/40. for p in xrange(1, 40)])
dev_prs = evaluate(dev_model.transform(test), ps=[p/40. for p in xrange(1, 40)])
#test_prs = evaluate(dev_model.transform(hashing_tf.transform(labeled_uris)), ps=[p/20. for p in xrange(1, 20)])

# 0.5 is one of the swept thresholds (20/40.). The lookups raise KeyError if
# evaluate() left that threshold out of its result.
print 'Evaluation @ Confidence >= 0.5'
print 'Train P/R=(%.2f, %.2f), F=%.3f' % dict(train_prs)[0.5]
print '  Dev P/R=(%.2f, %.2f), F=%.3f' % dict(dev_prs)[0.5]

Evaluation @ Confidence >= 0.5
Train P/R=(0.87, 0.97), F=0.919
  Dev P/R=(0.84, 0.52), F=0.641


In [67]:
# Pick the threshold with the highest F1 on the dev split, then report the
# train and dev metrics at that threshold.
c, (p_c, r_c, f_c) = sorted(dev_prs, key=lambda (c, (p,r,f)): f, reverse=True)[0]

print 'Confidence @ Optimal Dev F1 >= %.3f' % c
print 'Train P/R=(%.2f, %.2f), F=%.3f' % dict(train_prs)[c]
print '  Dev P/R=(%.2f, %.2f), F=%.3f' % dict(dev_prs)[c]

Confidence @ Optimal Dev F1 >= 0.825
Train P/R=(0.89, 0.96), F=0.926
  Dev P/R=(0.88, 0.51), F=0.645


Full Model

In [19]:
# Fit the final model on the class-balanced union of the train and test splits.
model = classifier.fit(balance_dataset(hashing_tf.transform(full)).repartition(128))

In [20]:
# Spot-check the full model on a handful of hand-picked URIs: normalise each
# one, featurise it and collect (uri, probability[1]) pairs.
uris = [normalize_uri(u) for u in [
  'facebook.com/efoim',
  'twitter.com/person',
  'twitter.com/person/status/1231',
  'linkedin.com/company/zcbvx',
  'linkedin.com/in/zcbvx',
  'en.wikipedia.org/wiki/someone',
  'en.wikipedia.org/w/index.php?id=123',
  'www.nytimes.com/topic/person/sheldon-silver',
]]
model.transform(
  hashing_tf.transform(
    sqlContext.createDataFrame(
      [(u, get_uri_features(u)) for u in uris], 
      ['uri','features'])))\
  .map(lambda r: (r['uri'], r['probability'][1]))\
  .collect()

[(u'facebook.com/efoim', 0.46599315786468753),
 (u'twitter.com/person', 0.82871828691707539),
 (u'twitter.com/person/status/1231', 0.3947997711838524),
 (u'linkedin.com/company/zcbvx', 0.87861670564511407),
 (u'linkedin.com/in/zcbvx', 0.93873476766805786),
 (u'en.wikipedia.org/wiki/someone', 0.48006136230111524),
 (u'en.wikipedia.org/w/index.php?id=123', 0.67484407470849606),
 (u'www.nytimes.com/topic/person/sheldon-silver', 0.95935597056302369)]

In [30]:
# Bucket prefix and crawl directory; the cells below read the crawl's
# 'wat.paths.gz' listing from under these.
cc_base_path = 's3n://aws-publicdatasets/'
cc_crawl_root = 'common-crawl/crawl-data/CC-MAIN-2016-07'

In [34]:
def parse_wats(lines):
    """Group a stream of text lines into records and yield their parsed JSON payloads.

    A new record starts at every line equal to 'WARC/1.0'. A record is parsed
    only if it has at least 10 lines and its 10th line starts with
    '{"Envelope":{'; that line and all following lines of the record are
    joined with newlines and decoded as JSON. Other records are dropped.
    """
    def decode(buffered):
        # the payload is expected to begin on the 10th line of the record
        if len(buffered) < 10:
            return None
        if not buffered[9].startswith('{"Envelope":{'):
            return None
        return json.loads('\n'.join(buffered[9:]))

    pending = []
    for line in lines:
        if line != 'WARC/1.0':
            pending.append(line)
            continue
        # a record marker closes whatever was buffered so far
        parsed = decode(pending)
        if parsed:
            yield parsed
        pending = [line]
    # flush the final record, which has no marker after it
    parsed = decode(pending)
    if parsed:
        yield parsed
def extract_links(record):
    """Yield (page url, anchor text, absolute link url) for each link in a parsed record.

    record: nested mapping with the page url at
            Envelope > WARC-Header-Metadata > WARC-Target-URI and the links at
            Envelope > Payload-Metadata > HTTP-Response-Metadata >
            HTML-Metadata > Links.
    Records without a page url yield nothing. Links lacking a 'text' or 'url'
    key are skipped, as are links whose url cannot be joined onto the page
    url. Missing or null intermediate levels are treated as empty.
    """
    envelope = record.get('Envelope') or {}
    url = (envelope.get('WARC-Header-Metadata') or {}).get('WARC-Target-URI')
    if not url:
        return

    node = envelope
    for level in ('Payload-Metadata', 'HTTP-Response-Metadata', 'HTML-Metadata'):
        node = node.get(level) or {}
    links = node.get('Links') or []

    for link in links:
        if 'text' not in link or 'url' not in link:
            continue
        # Only the join is guarded. With the yield inside a bare `except:`,
        # closing the generator early made it swallow GeneratorExit and fail
        # with "generator ignored GeneratorExit".
        try:
            target = urljoin(url, link['url'])
        except Exception:
            continue
        yield (url, link['text'], target)

In [47]:
# Draw 32 entries at random (without replacement) from the crawl's WAT path
# listing, prefix each with the bucket and join them into the single
# comma-separated string that is passed to sc.textFile below.
cc_wat_paths = ','.join(sc\
    .textFile(cc_base_path + cc_crawl_root + '/wat.paths.gz')\
    .map(lambda path: cc_base_path + path)\
    .takeSample(False, 32))

In [52]:
# Build a stoplist of the 50 most frequent lower-cased anchor texts among the
# http(s) links found in the sampled WAT files.
anchor_stoplist = sc\
    .textFile(cc_wat_paths)\
    .mapPartitions(parse_wats)\
    .flatMap(extract_links)\
    .filter(lambda (s, a, t): t.startswith('http://') or t.startswith('https://'))\
    .map(lambda (s, a, t): (a.lower(), 1))\
    .reduceByKey(add)\
    .sortBy(lambda (k,v): v, ascending=False)\
    .map(lambda (k, v): k)\
    .take(50)
# a set, for the membership test applied to every link later on
anchor_stoplist = set(anchor_stoplist)

In [108]:
cc_base_path

's3n://aws-publicdatasets/'

In [111]:
# Re-sample the WAT path listing for the full run, this time with a sampling
# fraction of 0.1 and without replacement. This rebinds cc_wat_paths.
# NOTE(review): the prefix is hard-coded as 's3://aws-publicdatasets/' here,
# while cc_base_path (used to read the listing) is 's3n://aws-publicdatasets/'
# - confirm which scheme is intended.
cc_wat_paths = ','.join(sc\
    .textFile(cc_base_path + cc_crawl_root + '/wat.paths.gz')\
    .map(lambda path: 's3://aws-publicdatasets/' + path)\
    .sample(False, 0.1)\
    .collect())

In [None]:
# Extract the http(s) links from the sampled WAT files, drop those whose
# lower-cased anchor text is in the stoplist, and attach the URI features of
# each normalised target as (source, anchor, target, features) rows.
cc_links = sqlContext.createDataFrame(
    sc\
        .textFile(cc_wat_paths)\
        .mapPartitions(parse_wats, preservesPartitioning=True)\
        .flatMap(extract_links)\
        .filter(lambda (s, a, t): t.startswith('http://') or t.startswith('https://'))\
        .filter(lambda (s, a, t): a.lower() not in anchor_stoplist)\
        .repartition(4096)\
        .map(lambda (s, a, t): (s, a, t, get_uri_features(normalize_uri(t))))
    , ['source', 'anchor', 'target', 'features'])
# Hash the features and score every link with the full model.
predicted_links = model.transform(hashing_tf.transform(cc_links))

In [None]:
# Keep the links whose probability[1] reaches 0.825 (the dev-optimal
# confidence printed earlier in this notebook) and save them as gzip-compressed
# text, one JSON array of [source, anchor, target, probability] per line.
# The final call is not followed by a line-continuation backslash: a dangling
# one at the end of the cell leaves the statement unterminated.
predicted_links\
    .map(lambda r: (r['source'], r['anchor'], r['target'], r['probability'][1]))\
    .filter(lambda scored: scored[3] >= 0.825)\
    .map(json.dumps)\
    .saveAsTextFile(s3baseuri + 'abbrevi8-rnd/web/links/cc0.1/', 'org.apache.hadoop.io.compress.GzipCodec')

In [None]:
1