In [129]:
import numpy as np
import pandas as pd
import os
import binascii
from dask.distributed import Client
from dask import delayed
import dask.bag as db
from functools import partial
from scipy import misc
import dask.dataframe as dd
from collections import defaultdict
from matplotlib import pyplot as plt
%matplotlib inline

# Get all abstract files

In [2]:
abstract_files = []
for root,_,files in os.walk("abstracts"):
    flist = [os.path.join(root,f) for f in files if f.endswith(".txt")]
    abstract_files.extend(flist)

# Get abstracts

Let's store data in **(key, value)** pairs where _key_ is a filename, and _value_ is whatever we want to save.

This way an abstract data is always connected correctly to its original file.

In [87]:
def get_abstract(fname):
    abstract = []
    with open(fname, 'rt', encoding='latin1') as fin:
        # skip file until 'Abstract :' line
        for line in fin:
            if "Abstract    :" in line:
                break

        # gather every non-empty line after
        for raw_line in fin:
            line = raw_line.strip(' \n\r')  # remove newline characters and whitespaces
            if len(line) > 0:
                abstract.append(line)
    
    # valid abstracts are at least 2 lines long
    # found by analyzing the dataset
    if len(abstract) < 2:  
        return None  # no valid abstract
    else:
        abstract = " ".join(abstract)
        abstract = " ".join(abstract.split())  # remove duplicate whitespaces
    
    return (fname, abstract)

# Getting abstracts in a sequential loop

In [None]:
%%time
abstract_texts = []

for f in abstract_files:
    abstract = get_abstract(f)
    if abstract is not None:
        abstract_texts.append(abstract)

# Get abstracts in parallel with Dask, limit max number of parallel jobs

In [125]:
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:50079  Dashboard: http://127.0.0.1:8787,Cluster  Workers: 4  Cores: 4  Memory: 17.06 GB


In [6]:
%%time
# run in parallel with maximum 1000 partitions, to reduce the total number of parallel items
# also filter out 'None' items

abstract_texts = db.from_sequence(abstract_files, npartitions=1000)\
                   .map(get_abstract)\
                   .filter(lambda x: x is not None)\
                   .compute()

Wall time: 22.4 s


# Compute shingles

In [8]:
# homework
def get_shingles(abstract, k=5):
    fname, text = abstract
    L = len(text)
    shingles = set()  # we use a set to automatically eliminate duplicates
    for i in range(L-k+1):
        shingle = text[i:i+k]
        crc = binascii.crc32(shingle.encode('utf-8')) #& 0xffffffff  # hash the shingle to a 32-bit integer
        shingles.add(crc)
    return fname, shingles

In [9]:
def drop_to_disk(a):
    fname, shingles = a
    fshingles = os.path.join('data', "shingles_{}.h5".format(fname.replace('\\','_').replace('/', '_')))
    dd.from_array(np.array(list(shingles))).to_hdf(fshingles, key="/data")

In [12]:
%%time
# use shingles of length 5 (k=5)

s = db.from_sequence(abstract_texts, npartitions=1000)\
    .map(get_shingles, k=5)\
    .map(drop_to_disk)\
    .compute()

Wall time: 26min 36s


# Compute signatures with Minhash

In [37]:
# homework
# fast implementation of Minhash algorithm
# computes all random hash functions for a shingle at once, using vector operations
# also finds element-wise minimum of two vectors efficiently
def minhash_fast(shingles_file, A, B, nextPrime, maxShingleID, nsig):
    shingles = pd.read_hdf(shingles_file, key='/data')
    signature = np.ones((nsig,)) * (maxShingleID + 1)

    for ShingleID in shingles:
        hashCodes = ((A*ShingleID + B) % nextPrime) % maxShingleID
        np.minimum(signature, hashCodes, out=signature)

    return (shingles_file.replace('data\\shingles_', ''), signature)

def write_signatures_to_disk(s):
    fname, signature = s
    fsignature = os.path.join('signatures', "signature_{}.h5".format(fname.replace('\\','_').replace('/', '_')))
    dd.from_array(np.array(list(signature))).to_hdf(fsignature, key="/data")

In [24]:
# use a fast Minhash algorithm
# use 3 bands and 30 rows to get near-duplicates
bands = 10
rows = 30
nsig = bands*rows  # number of elements in signature, or the number of different random hash functions
maxShingleID = 2**32-1  # record the maximum shingle ID that we assigned
nextPrime = 4294967311  # next prime number after maxShingleID
A = np.random.randint(0, nextPrime, size=(nsig,), dtype=np.int64)
B = np.random.randint(0, nextPrime, size=(nsig,), dtype=np.int64)

In [31]:
%%time
shingles_files = []
for root,_,files in os.walk("data"):
    flist = [os.path.join(root,f) for f in files if f.endswith(".h5")]
    shingles_files.extend(flist)

Wall time: 4.16 s


In [41]:
%%time
#Compute signatures
sigs = db.from_sequence(shingles_files, npartitions=1000)\
    .map(minhash_fast, A, B, nextPrime, maxShingleID, nsig)\
    .map(write_signatures_to_disk)\
    .compute()

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x000001DFB547F8D0> exception was never retrieved: Traceback (most recent call last):
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\tornado\gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\distributed\client.py", line 1226, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x000001DFB91DB6D8> exception was never retrieved: Traceback (most recent call last):
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\tornado\gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\distributed\client.py", line 1226, in wait
    raise AllExit()
distributed.client.AllExit


Wall time: 1h 33min 33s


# Compute candidate duplicates with Locality Sensitive Hashing

In [164]:
def LSH(sfiles, bands, rows, Ab, Bb, nextPrime, maxShingleID, near_duplicate_count=3):
    numItems = len(sfiles)
    candidates = set()
    for nb in range(bands):
        hashTable = {}
        for ni in range(numItems):
            sgn = np.array(pd.read_hdf(sfiles[ni], key='/data'))
            item = sgn[nb*rows:(nb+1)*rows]
            hash_ = (np.dot(Ab[nb,:], item) + Bb[nb]) % nextPrime % maxShingleID
            if hash_ not in hashTable:
                hashTable[hash_] = [ni]
            else:
                hashTable[hash_].append(ni)
        for _, items in hashTable.items():
            if len(items) >= near_duplicate_count:
                candidates.add(tuple(np.sort(items)))


    return candidates

In [165]:
%%time

signature_files = []
for root,_,files in os.walk("signatures"):
    flist = [os.path.join(root,f) for f in files if f.endswith(".h5")]
    signature_files.extend(flist)

A2 = np.random.randint(0, nextPrime/2, size=(bands, rows), dtype=np.int64)  # now we need a vector of A parameters for each band
B2 = np.random.randint(0, nextPrime/2, size=(bands, ), dtype=np.int64)
near_duplicate_count = 5

file_number = 20000

candidates = delayed(LSH, pure=True)(sfiles=signature_files[:file_number], 
                                     bands=bands, rows=rows, Ab=A2, Bb=B2, 
                                     nextPrime=nextPrime, maxShingleID=maxShingleID, 
                                     near_duplicate_count=near_duplicate_count)\
                   .compute()


tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x000001536674F710> exception was never retrieved: Traceback (most recent call last):
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\tornado\gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\distributed\client.py", line 1226, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x000001536670B240> exception was never retrieved: Traceback (most recent call last):
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\tornado\gen.py", line 1069, in run
    yielded = self.gen.send(value)
  File "C:\Users\aalizadeh\AppData\Local\Continuum\Anaconda3\lib\site-packages\distributed\client.py", line 1226, in wait
    raise AllExit()
distributed.client.AllExit
tornado.application - ERROR - Future <tornad

Wall time: 1h 26min 27s


In [166]:

def normalize_filename(fname):
    #To get the actual filename from the signature filename
    return fname.replace('.h5', '').replace('/', '\\').replace('signatures\\','').replace('signature_', '')\
    .replace('abstracts_', 'abstracts\\').replace('_a', '\\a')

for i in candidates:
    flist = [normalize_filename(signature_files[:file_number][f]) for f in i]
    print(', '.join(flist))

abstracts\awards_1990\awd_1990_00\a9000356.txt, abstracts\awards_1990\awd_1990_00\a9000527.txt, abstracts\awards_1990\awd_1990_00\a9000962.txt, abstracts\awards_1990\awd_1990_01\a9001029.txt, abstracts\awards_1990\awd_1990_01\a9001030.txt, abstracts\awards_1990\awd_1990_01\a9001217.txt, abstracts\awards_1990\awd_1990_01\a9001218.txt, abstracts\awards_1990\awd_1990_01\a9001582.txt, abstracts\awards_1990\awd_1990_02\a9002509.txt, abstracts\awards_1990\awd_1990_04\a9004079.txt, abstracts\awards_1990\awd_1990_04\a9004221.txt, abstracts\awards_1990\awd_1990_05\a9005994.txt, abstracts\awards_1990\awd_1990_06\a9006086.txt, abstracts\awards_1990\awd_1990_06\a9006126.txt, abstracts\awards_1990\awd_1990_06\a9006435.txt, abstracts\awards_1990\awd_1990_06\a9006605.txt, abstracts\awards_1990\awd_1990_06\a9006856.txt, abstracts\awards_1990\awd_1990_07\a9007001.txt, abstracts\awards_1990\awd_1990_07\a9007037.txt, abstracts\awards_1990\awd_1990_07\a9007050.txt, abstracts\awards_1990\awd_1990_07\a90071

In [175]:
num_examples = 4
for c in candidates:
    if len(c) >= 10:
        print("------------\nCandidate Abstracts: total of {} abstracts".format(len(c)))
        for f in c:
            f_content = get_abstract(normalize_filename(signature_files[:file_number][f]))
            print("Content of the file {}:\n{}\n".format(f_content[0], f_content[1]))
        num_examples -= 1
        if num_examples < 1:
            break

------------
Candidate Abstracts: total of 23 abstracts
Content of the file abstracts\awards_1990\awd_1990_00\a9000356.txt:
This award will support study of the Japanese language by an American scientist or engineer by providing a stipend, tuition, or other course-related expenses. The objectives of the program are to enable active U.S. scientists and engineers to acquire sufficient familiarity with the language and customs of Japan to allow them to function independently there; or to acquire sufficient expertise in reading Japanese to be able to understand technical abstracts in their field of interest. There is a critical need for more effective technical communication between American and Japanese researchers. This award is intended to help remove language and cultural barriers to U.S.-Japanese cooperative research and to facilitate science and technology exchange.

Content of the file abstracts\awards_1990\awd_1990_00\a9000527.txt:
This award will support study of the Japanese lang