In [8]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import numpy as np
from snorkel import SnorkelSession
import findspark
findspark.init()
import sys
sys.path.append('../utils')

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [9]:
# Initialize Spark Environment and Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

In [10]:
# Create (or reuse) the notebook-wide Spark session.
# The master is "local[*]"; the remaining options are passed to Spark as plain
# string configuration values.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Snorkel MEMEX Preprocessing")
    .config("spark.cores.max", "96")
    .config("spark.executor.memory", '20g')
    .config("spark.driver.memory", '50g')
    .getOrCreate()
)

In [11]:
# Fetch the SparkConf of the running context and list all of its (key, value)
# settings; useful for checking which options the session actually carries.
c = spark.sparkContext.getConf()
c.getAll()

[('spark.driver.memory', '50g'),
 ('spark.cores.max', '96'),
 ('spark.driver.host', '172.24.75.93'),
 ('spark.executor.memory', '20g'),
 ('spark.app.id', 'local-1539757444894'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.port', '44634'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'Snorkel MEMEX Preprocessing')]

In [None]:
# Defining parent data path
# dataset_utils is a project module (the '../utils' directory is appended to
# sys.path in the first cell).
from dataset_utils import retrieve_all_files
#pth='/lfs/local/0/jdunnmon/data/memex-data/escorts/2015/05/08' #/0000.jsonl.gz
root_pth = '/lfs/local/0/jdunnmon/data/memex-data'
json_path = 'escorts/2017'
pth = os.path.join(root_pth,json_path)
# presumably returns the paths of all input files found under `pth` -- TODO confirm
# against dataset_utils.retrieve_all_files
pth_lst = retrieve_all_files(pth)
# Load everything in pth_lst as JSON into a single Spark DataFrame (timed)
%time test_df = spark.read.json(pth_lst)

In [None]:
# Source fields to select from the raw records: four top-level fields followed
# by the `results` of each pre-computed extraction that is kept.
attr_list = [
    'doc_id',
    'type',
    'raw_content',
    'url',
    'extractions.phonenumber.results',
    'extractions.age.results',
    'extractions.rate.results',
    # 'extractions.location.results',  (disabled)
    'extractions.ethnicity.results',
    'extractions.email.results',
    'extractions.incall.results',
]

# Flat column names, one per entry of attr_list and in the same order
# (used later for a positional rename).
cols = [
    'doc_id',
    'type',
    'raw_content',
    'url',
    'extracted_phone',
    'extracted_age',
    'extracted_rate',
    'extracted_ethnicity',
    'extracted_email',
    'extracted_incall',
]

In [None]:
# Dropping samples with no relevant data: rows whose doc_id or raw_content is
# null are removed. test_df is rebound to the filtered frame.
test_df = test_df.na.drop(subset=['doc_id','raw_content'])

In [None]:
# Counting examples (number of rows currently in test_df)
test_df.count()

In [None]:
import sys, os
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')
sys.path.append('../utils')
spark.sparkContext.addPyFile('../utils/dataset_utils.py')
from dataset_utils import get_posting_html_fast, parse_url
from pyspark.sql.functions import udf
from functools import partial

# Transforming raw content column
# `term` is one regex made of these alternatives:
#   - "Location:" / "location:" followed by 1-200 characters and a closing tag
#   - a non-word character, "City:" / "city:", 1-200 characters and a closing tag
#   - age-like patterns: two digits directly or closely followed by "yo",
#     "old" or "young", or "age" followed within 10 characters by two digits
term = r'([Ll]ocation:[\w\W]{1,200}</.{0,20}>|\W[cC]ity:[\w\W]{1,200}</.{0,20}>|\d\dyo\W|\d\d.{0,10}\Wyo\W|\d\d.{0,10}\Wold\W|\d\d.{0,10}\Wyoung\W|\Wage\W.{0,10}\d\d)'
# Wrap the project helpers as Spark UDFs declared to return strings.
# functools.partial pre-binds `search_term`, so the UDF is called with the
# content column only. StringType comes from the pyspark.sql.types star import.
get_posting_html_fast_udf = udf(partial(get_posting_html_fast, search_term=term), StringType())
parse_url_udf = udf(parse_url, StringType())

# Testing on single row
#test_row = test_df.where(test_df.doc_id == 'F1729609E6729B799FB4CC9B5EA5EE0743D8DE10741622DDA4D768443DB64242')
#test_row = test_row.withColumn("raw_content_parsed", get_posting_html_fast_udf(test_row.raw_content))

In [None]:
# Attach the two derived columns in one chained expression:
#   raw_content_parsed <- get_posting_html_fast_udf(raw_content)
#   url_parsed         <- parse_url_udf(url)
test_df = (
    test_df
    .withColumn("raw_content_parsed", get_posting_html_fast_udf(test_df.raw_content))
    .withColumn("url_parsed", parse_url_udf(test_df.url))
)

In [None]:
# Keep only the attributes of interest plus the two parsed columns, rename the
# columns positionally to the flat names in `cols`, then display a sample.
# NOTE: select() is a lazy transformation, so the %time below does not time the
# actual data processing; that happens when show() runs.
%time test_df = test_df.select(attr_list+['raw_content_parsed', 'url_parsed'])
# NOTE(review): this cell is not re-runnable as written -- `cols` grows by two
# names on every execution, and after the rename test_df no longer carries the
# dotted source columns named in attr_list.
cols = cols +['raw_content_parsed', 'url_parsed']
test_df = test_df.toDF(*cols)
# show() prints only the first rows (20 by default), not the entire frame
test_df.show()

In [None]:
# Collapse every `extracted_*` column into a single comma-separated string
# (presumably these are array columns coming from `extractions.*.results` --
# TODO confirm with printSchema()).
# The loop variable is deliberately not named `col`: that would overwrite
# pyspark.sql.functions.col, which the star import at the top of the notebook
# brings into the namespace.
for col_name in cols:
    if 'extracted' in col_name:
        test_df = test_df.withColumn(col_name, concat_ws(',', col_name))

In [None]:
# Display the current list of flat column names
cols

In [None]:
# Print the column names and types of test_df
test_df.printSchema()

In [None]:
# Timing single example
#%time test_df_row = test_df.where(test_df.doc_id == 'F1729609E6729B799FB4CC9B5EA5EE0743D8DE10741622DDA4D768443DB64242').collect()

In [None]:
cols_to_write = ['doc_id','type', 'url','url_parsed','raw_content_parsed','extracted_phone','extracted_age',
        'extracted_rate','extracted_ethnicity',\
        'extracted_email','extracted_incall']
write_path = os.path.join(root_pth,'escorts_preproc/spark_test/2017')
%time test_df.select(cols_to_write).write.csv(write_path,header = 'true',sep='\t',mode='overwrite')
#.write.option("maxRecordsPerFile", 1000000)
#.coalesce(1)


In [None]:
#test_df.rdd.getNumPartitions()

In [None]:
# Printing schema
#test_df.printSchema()

In [119]:
from snorkel.models import Context, Candidate
from snorkel.contrib.models.text import RawText

# Make sure DB is cleared: delete all Context rows, then all Candidate rows,
# through the Snorkel session.
# NOTE(review): `session` is created by SnorkelSession() in a later cell, so on
# a fresh kernel this cell raises NameError unless that cell has been run first.
# NOTE(review): no session.commit() follows the deletes -- confirm whether they
# are meant to be persisted.
session.query(Context).delete()
session.query(Candidate).delete()



Running phone extractor

In [121]:
from dataset_utils import set_preprocessor
from snorkel.parser import CorpusParser
from parser_utils import SimpleTokenizer
from snorkel.models import Document, Sentence
from snorkel.parser import HTMLDocPreprocessor as pp

# Build a database name from the input path by removing its slashes.
# NOTE(review): `in_loc` is assigned in a different cell of this notebook and
# must exist before this cell runs.
dbname = "".join(in_loc.split('/'))
postgres_location = 'postgresql:///'
# kill_db.sh is a project script (presumably drops an existing database of that
# name -- TODO confirm); createdb then creates it.
os.system(f'../utils/kill_db.sh {dbname}')
os.system(f'createdb {dbname}')
# Fixed: the original referenced the undefined name `db_name` here (NameError).
# NOTE(review): verify that Snorkel still honours SNORKELDB when it is set after
# `snorkel` has already been imported.
os.environ['SNORKELDB'] = os.path.join(postgres_location, dbname)

session = SnorkelSession()

# Now we create the candidates with a simple loop
# distinct() runs before orderBy(): de-duplicating after sorting does not keep
# the sort order, so the sort has to be the last step.
# NOTE(review): attr_list holds the dotted source names; if test_df has already
# been renamed with toDF(*cols), this select needs the flat names instead.
candidate_docs = test_df.select(attr_list+['raw_content_parsed']).distinct().orderBy("doc_id")

# export SNORKELDB=postgresql:///${DBNAME}; cd /dfs/scratch1/jdunnmon/repos/extractors/src/shell; 
#sh ../utils/kill_db.sh ${DBNAME}; source activate snorkel; python create_dbs.py -f ${FOLDERS[$iter]}; bash -l

# Using Spark to add candidates
def spark_preprocessor(cd, verbose=False, max_doc_length=2000):
    """Turn one row into at most one ``(Document, text)`` pair.

    This is a generator: it yields nothing for rows that are filtered out and
    exactly one pair otherwise.

    Args:
        cd: Row-like object indexable by column name. The keys read are
            ``raw_content_parsed``, ``url_parsed``, ``doc_id``, ``extractions``
            and ``url``.
        verbose: When True, print a short message for every skipped row.
        max_doc_length: Rows whose text is longer than this many characters
            are skipped. A falsy value (``None`` or ``0``) disables the limit.

    Yields:
        ``(doc, doc_text)`` where ``doc`` is a ``Document`` carrying the
        extractions and URL in its ``meta`` dict, and ``doc_text`` is the parsed
        content and parsed URL joined by a space and stripped.
    """
    doc_text = (cd['raw_content_parsed'] + " " + cd['url_parsed']).strip()
    doc_name = cd['doc_id']
    extractions = cd['extractions']
    # Fixed: the original put an undefined name `url` into the metadata.
    url = cd['url']

    # Short documents are usually parsing errors...
    # Fixed: the original used `continue` here, which is a SyntaxError outside a
    # loop; returning ends the generator without yielding.
    if len(doc_text) < 10:
        if verbose:
            print('Short Doc!')
        return

    if max_doc_length and len(doc_text) > max_doc_length:
        if verbose:
            print('Long document')
        return

    stable_id = pp.get_stable_id(doc_name)

    # Yielding results, adding useful info to metadata
    doc = Document(
        name=doc_name, stable_id=stable_id,
        meta={'extractions': extractions,
              'url': url}
    )
    yield doc, doc_text
    
# Setting parser and applying corpus preprocessor
# SimpleTokenizer is a project class (parser_utils), configured here with the
# delimiter '<|>'.
parser=SimpleTokenizer(delim='<|>')
corpus_parser = CorpusParser(parser=parser)
# NOTE(review): `doc_preprocessor` and `parallelism` are not defined in any cell
# visible in this notebook, and spark_preprocessor is never called -- confirm
# what is meant to be passed here.
corpus_parser.apply(list(doc_preprocessor), parallelism=parallelism, verbose=False)

In [None]:
from dataset_utils import create_candidate_class
# Which extraction this run targets; passed to the project helper below
extraction_type = 'phone'

# Creating candidate class
# create_candidate_class (dataset_utils) returns a pair that is unpacked into
# the candidate class and its name.
candidate_class, candidate_class_name = create_candidate_class(extraction_type)

In [None]:
# NOTE(review): `candidate_labeled_tweets` and `Tweet` are not defined in any
# cell visible in this notebook; this section looks like it was carried over
# from a tweet-classification example -- confirm whether it is still needed.
# Select the id and body of each tweet, sort by id, and de-duplicate.
tweet_bodies = candidate_labeled_tweets \
    .select("tweet_id", "tweet_body") \
    .orderBy("tweet_id") \
    .distinct()

# Generate and store the tweet candidates to be classified
# Note: We split the tweets in two sets: one for which the crowd 
# labels are not available to Snorkel (test, 10%) and one for which we assume
# crowd labels are obtained (to be used for training, 90%)
total_tweets = tweet_bodies.count()
test_split = total_tweets*0.1
# collect() brings every row to the driver; rows with index <= test_split get
# split 1, all others split 0.
for i, t in enumerate(tweet_bodies.collect()):
    split = 1 if i <= test_split else 0
    raw_text = RawText(stable_id=t.tweet_id, name=t.tweet_id, text=t.tweet_body)
    tweet = Tweet(tweet=raw_text, split=split)
    session.add(tweet)
# One commit for the whole batch of added candidates
session.commit()


# json is needed for the output below; imported here because no earlier cell
# of the notebook imports it.
import json

# Getting gold label for each doc
# NOTE(review): `eval_cands`, `regex_matcher`, `postgres_db_name` and `config`
# must be defined before this cell runs; they are not set in the cells above.
print("Running regex extractor...")
doc_extractions = {}
# Fixed: the original loop bound its item to `_` but the body used `doc`, so it
# never looked at the document being iterated.
for ii, doc in enumerate(eval_cands):
    if ii % 1000 == 0:
        print(f'Extracting regexes from doc {ii} out of {len(eval_cands)}')
    doc_extractions[doc.name] = {'phone': regex_matcher(doc, mode='phonenumbers')}

# Setting filename
out_filename = "phone_extraction_"+postgres_db_name+".jsonl"
out_folder = os.path.join(config['output_dir'], 'phone')
out_path = os.path.join(out_folder, out_filename)

# exist_ok avoids the check-then-create race of a separate os.path.exists() test
os.makedirs(out_folder, exist_ok=True)

# Saving file to jsonl in extractions format: one JSON object per line with
# the keys 'phone' (as a list) and 'id' (the document name)
print(f"Saving output to {out_path}")
with open(out_path, 'w') as outfile:
    for k,v in doc_extractions.items():
        v['id'] = k
        v['phone'] = list(v['phone'])
        print(json.dumps(v), file=outfile)

## SANDBOX

In [None]:
#"export SNORKELDB=postgresql://jdunnmon:123@localhost:5432/${DBNAME}; export CUDA_VISIBLE_DEVICES=0; 
#cd /dfs/scratch1/jdunnmon/repos/extractors/src/shell; source activate snorkel; 
#echo 'Evaluating phone extractor for node ${NODES[NODE_INDEX]} with database ${DBNAME} ...';
#python evaluate_phone_extractor.py -f ${FOLDERS[$iter]}; bash -l" &

In [None]:
# Seeing if PySpark works

import findspark
findspark.init()
import pyspark
import random
# Dedicated SparkContext for this smoke test.
# NOTE(review): creating a new SparkContext is expected to fail while another
# one (e.g. the one behind `spark`) is still active in this kernel -- confirm
# this cell is only run on its own.
sc = pyspark.SparkContext(appName="Pi")
# Number of random points drawn for the estimate
num_samples = 100000000
def inside(p):
    """Draw one random point in the unit square and test it against the unit circle.

    ``p`` is the element handed in by ``filter`` and is not used. Returns True
    when the squared distance of the point from the origin is below 1.
    """
    x = random.random()
    y = random.random()
    squared_distance = x * x + y * y
    return squared_distance < 1
# Monte Carlo estimate of pi: the fraction of sampled points accepted by
# `inside`, multiplied by 4.
# try/finally makes sure the SparkContext is stopped even when the job fails or
# is interrupted, so no live context is left behind in the kernel.
try:
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()
    pi = 4 * count / num_samples
    print(pi)
finally:
    sc.stop()

In [None]:
import json
import sys, os
import argparse
import numpy as np
import findspark
findspark.init()
import pyspark
import random
import pandas as pd

#from multiprocessing import Pool

# Getting config
with open('/dfs/scratch1/jdunnmon/data/memex-data/config/config_spark.json') as fl:
    config = json.load(fl)

# Adding path for utils
sys.path.append('/dfs/scratch1/jdunnmon/repos/extractors/src/utils')

# Setting random seed (both the stdlib and the numpy generators)
seed = config['seed']
random.seed(seed)
np.random.seed(seed)

# If memex_raw_content is a content_field, uses term as a regex in raw data in addition to getting title and body
#term = r'\b[Ll]ocation:|\b[cC]ity:'
#term = r'([Ll]ocation:.{0,100}|[cC]ity:.{0,100}|\d\dyo\W|\d\d.{0,10}\Wyo\W|\d\d.{0,10}\Wold\W|\d\d.{0,10}\Wyoung\W|\Wage\W.{0,10}\d\d)'
term = r'([Ll]ocation:[\w\W]{1,200}</.{0,20}>|\W[cC]ity:[\w\W]{1,200}</.{0,20}>|\d\dyo\W|\d\d.{0,10}\Wyo\W|\d\d.{0,10}\Wold\W|\d\d.{0,10}\Wyoung\W|\Wage\W.{0,10}\d\d)'

# Setting up arguments (can also get from argparse)
args = {}
args['data_loc'] = '/dfs/scratch1/jdunnmon/data/memex-data/tsvs/output_all_slicetest'
# Getting raw_content column
print('Getting correct column...')
files = os.listdir(args['data_loc'])
# Fixed: the original probed files[0], but os.listdir() returns entries in
# arbitrary order and the directory also holds non-TSV entries (for example the
# 'parsed' output folder created below). Probe the first .tsv file instead.
tsv_files = sorted(name for name in files if name.endswith('tsv'))
if not tsv_files:
    raise FileNotFoundError(f"No .tsv files found in {args['data_loc']}")
# Only the header is needed to locate the column, so a few rows are enough
df = pd.read_csv(os.path.join(args['data_loc'], tsv_files[0]), sep='\t', nrows=10)
col = df.columns.get_loc("memex_raw_content")

# Getting path and setting up data: one (file path, regex, column index) tuple
# per .tsv file
path = args['data_loc']
file_list = os.listdir(path)
path_list = [os.path.join(path, file) for file in file_list]
file_data = [(tsv_path, term, col) for tsv_path in path_list if tsv_path.endswith('tsv')]

out_dir = path + '/parsed/'
os.makedirs(out_dir, exist_ok=True)

print('Processing in parallel')


sc = pyspark.SparkContext(appName="parse_html")
# try/finally so the context is always stopped, even when the job fails
try:
    # Ship the helper module to the workers before importing it
    sc.addPyFile('/dfs/scratch1/jdunnmon/repos/extractors/src/utils/dataset_utils.py')
    from dataset_utils import parse_html
    print('Distributing data...')
    distData = sc.parallelize(file_data)
    print('Parsing distributed data...')
    # foreach runs parse_html on every tuple for its side effects; nothing is returned
    distData.foreach(parse_html)
finally:
    sc.stop()
#threads = 30
#pool = Pool(threads) 
#results = pool.map(parse_html, file_data)
#pool.close()
#pool.join()


Getting correct column...
Processing in parallel
Distributing data...
Parsing distributed data...


In [5]:
# Stop the SparkContext bound to `sc` (manual cleanup, e.g. after a cell that
# created it did not finish)
sc.stop()

In [3]:
import json
import gzip
pth='/lfs/local/0/jdunnmon/data/memex-data/escorts/2015/05/08/0000.jsonl.gz'
# Peek at the first record only.
# The gzip stream is iterated line by line, so the loop stops after the first
# line. The original read and split the whole decompressed file for this.
with gzip.GzipFile(pth, 'r') as in_file:
    for ii, chunk in enumerate(in_file):
        # `chunk` is one line of bytes; json.loads accepts bytes directly
        data = json.loads(chunk)
        break

In [6]:
# List the top-level fields of the first record
data.keys()

dict_keys(['type', 'crawl_data', 'url', 'timestamp', 'extractions', 'raw_content', 'extracted_metadata', 'version', 'extracted_text', 'content_type', 'team', 'doc_id', 'crawler'])

In [7]:
# Output field name -> name of the field in the raw JSON record.
# Two entries map to an empty string (presumably "no source field" -- TODO
# confirm against the consumer of this dict).
field_dict = {
    'id': 'doc_id',
    'uuid': '',
    'memex_id': 'doc_id',
    'memex_doc_type': 'type',
    'memex_raw_content': 'raw_content',
    'memex_url': 'url',
    'url': '',
    'extractions': 'extractions',
}

In [8]:
# List the output field names defined by field_dict
field_dict.keys()

dict_keys(['id', 'uuid', 'memex_id', 'memex_doc_type', 'memex_raw_content', 'memex_url', 'url', 'extractions'])

In [1]:
import sys, os
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module='bs4')
sys.path.append('../utils')
from dataset_utils import parse_html_from_jsonl_gz

# Input: a single gzipped JSON-lines file
in_loc = '/lfs/local/0/jdunnmon/data/memex-data/escorts/2015/05/08/0000.jsonl.gz'
# Derive the output path from the input path: replace the 'escorts' path
# component with 'escorts_preproc' and swap the file name's extensions for
# '.tsv' (everything after the first dot of the file name is dropped).
in_parts = in_loc.split('/')
in_parts[in_parts.index('escorts')] = 'escorts_preproc'
in_parts[-1] = in_parts[-1].split('.')[0]+'.tsv'
out_loc = '/'.join(in_parts)
# Same regex as used for the Spark UDF earlier in the notebook
term = r'([Ll]ocation:[\w\W]{1,200}</.{0,20}>|\W[cC]ity:[\w\W]{1,200}</.{0,20}>|\d\dyo\W|\d\d.{0,10}\Wyo\W|\d\d.{0,10}\Wold\W|\d\d.{0,10}\Wyoung\W|\Wage\W.{0,10}\d\d)'
# Output field name -> raw JSON field name
field_dict = {'id':'doc_id', 'uuid':'', 'memex_id':'doc_id', 'memex_doc_type':'type', 'memex_raw_content':'raw_content', 'memex_url':'url', 'url':'', 'extractions':'extractions'}
content_field = 'memex_raw_content'
# The project helper takes all of its inputs as one tuple
file_data = (in_loc, out_loc, term, field_dict, content_field)
# NOTE(review): the recorded output of this cell is a list of
# "Error on line: N" messages -- confirm whether those records are skipped.
parse_html_from_jsonl_gz(file_data)

Error on line: 293
Error on line: 2759
Error on line: 4247
Error on line: 6240
Error on line: 8249
Error on line: 8801
Error on line: 10022
Error on line: 10517
Error on line: 10518
Error on line: 10954
Error on line: 11458
Error on line: 11705
Error on line: 13726
Error on line: 14000
Error on line: 14444
Error on line: 15012
Error on line: 15311
Error on line: 17569
Error on line: 18056
Error on line: 18318
Error on line: 18325
Error on line: 18366
Error on line: 18961
Error on line: 20161
Error on line: 20485
Error on line: 20843
Error on line: 21346
Error on line: 21535
Error on line: 22567
Error on line: 22639
Error on line: 22641
Error on line: 24061
Error on line: 25125
Error on line: 25774
Error on line: 25798
Error on line: 29836
Error on line: 31447
Error on line: 31460
Error on line: 31594
Error on line: 31840
Error on line: 32446
Error on line: 32659
Error on line: 35157
Error on line: 36897
Error on line: 38125
Error on line: 38569
Error on line: 39943
Error on line: 40149

## SANDBOX

In [None]:
class Matcher(object):
    """
    Applies a function f : c -> {True,False} to a generator of candidates,
    returning only candidates _c_ s.t. _f(c) == True_,
    where f can be compositionally defined.

    Positional arguments are child matchers; keyword arguments are options.
    The only option this base class defines is ``longest_match_only``
    (default True). Subclasses define further options by setting attributes of
    the same name in ``init()``.
    """
    def __init__(self, *children, **opts):
        self.children           = children
        self.opts               = opts
        self.longest_match_only = self.opts.get('longest_match_only', True)
        self.init()
        self._check_opts()

    def init(self):
        """Hook for subclasses to read their options; does nothing here."""
        pass

    def _check_opts(self):
        """
        Checks for unsupported opts, throws error if found
        NOTE: Must be called _after_ init()

        An option counts as supported when an instance attribute of the same
        name exists.
        """
        for opt in self.opts:
            if opt not in self.__dict__:
                raise Exception("Unsupported option: %s" % opt)

    def _f(self, c):
        """The internal (non-composed) version of filter function f"""
        return True

    def f(self, c):
        """
        The recursively composed version of filter function f
        By default, returns logical **conjunction** of operator and single child operator

        Raises Exception when the matcher has more than one child.
        """
        if len(self.children) == 0:
            return self._f(c)
        elif len(self.children) == 1:
            return self._f(c) and self.children[0].f(c)
        else:
            # Fixed: instances have no __name__ attribute, so the original
            # `self.__name__` raised AttributeError instead of this message.
            raise Exception("%s does not support more than one child Matcher" % type(self).__name__)

    def _is_subspan(self, c, span):
        """Tests if candidate c is subspan of span, where span is defined specific to candidate type"""
        return False

    def _get_span(self, c):
        """Gets a tuple that identifies a span for the specific candidate class that c belongs to"""
        return c

    def apply(self, candidates):
        """
        Apply the Matcher to a **generator** of candidates
        Optionally only takes the longest match (NOTE: assumes this is the *first* match)

        Yields each candidate accepted by ``f``. With ``longest_match_only``
        set, a candidate that is a subspan of an already yielded one is dropped.
        """
        seen_spans = set()
        for c in candidates:
            if not self.f(c):
                continue
            if self.longest_match_only:
                # Skip anything contained in a span that was already emitted
                if any(self._is_subspan(c, s) for s in seen_spans):
                    continue
                seen_spans.add(self._get_span(c))
            yield c


WORDS = 'words'

class NgramMatcher(Matcher):
    """Base Matcher for candidates that expose ``char_start`` and ``char_end``."""

    def _is_subspan(self, c, span):
        """Return True when c's character range lies within ``span``.

        ``span`` is a ``(start, end)`` pair as produced by ``_get_span``; both
        bounds are inclusive.
        """
        if c.char_start < span[0]:
            return False
        return c.char_end <= span[1]

    def _get_span(self, c):
        """Return the ``(char_start, char_end)`` pair that identifies c's span."""
        span = (c.char_start, c.char_end)
        return span

class Union(NgramMatcher):
    """Accepts a candidate as soon as one of the child matchers accepts it."""

    def f(self, c):
        """Return True if ``child.f(c) > 0`` for at least one child, else False."""
        return any(child.f(c) > 0 for child in self.children)