In [None]:
# Text Processing and LDA

# Change Log
### v.1
- initial build

### v.2 
- added code for Spark

### v.3 
- added code to load data from web

### v.4
- switched from `ntlk` to `pattern` due to issues of `nltk` with Spark

### v.5
- switched from `BeautifulSoup` to `lxml` for performance
- mapping from industries to topics
- saves results to AWS

In [None]:
# not necessary on AWS
! pip install pattern

In [1]:
import numpy as np
import scipy.sparse as sps
from bs4 import BeautifulSoup
import os
#import nltk
#from nltk.tokenize import word_tokenize
from pattern.en import tag
import collections
import gensim
import requests, zipfile, StringIO
from lxml import etree 
import json

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [None]:
# when using (not necessary on AWS)
#nltk.download()
#nltk.download('punkt')
#nltk.download('maxent_treebank_pos_tagger')
#nltk.download('averaged_perceptron_tagger')

In [None]:
# Spark setup for vagrant (not necessary on AWS)

import findspark
findspark.init()
print findspark.find()
# Depending on your setup you might have to change this line of code
#findspark makes sure I dont need the below on homebrew.
#os.environ['SPARK_HOME']="/usr/local/Cellar/apache-spark/1.5.1/libexec/"
#the below actually broke my spark, so I removed it. 
#Depending on how you started the notebook, you might need it.
#os.environ['PYSPARK_SUBMIT_ARGS']="--master local pyspark --executor-memory 4g"

import pyspark
conf = (pyspark.SparkConf()
    .setMaster('local')
    .setAppName('pyspark')
    .set('spark.executor.memory', '4g')
    )
sc = pyspark.SparkContext(conf=conf)

In [None]:
%%time
# load data from web (given url to file with urls of zips)
# e.g.'https://s3.amazonaws.com/cs109project/2004.txt')
# for each zip: loads all xmls files into dictionary with key=filename

urls = 'https://s3.amazonaws.com/cs109project/2002-2004.txt'
rs = requests.get(urls)
data = {}
for url in rs.content.split('\r\n'):
    #print url
    r = requests.get(url)
    if zipfile.is_zipfile(StringIO.StringIO(r.content)):
        z = zipfile.ZipFile(StringIO.StringIO(r.content))
        xmls = [member for member in z.namelist() if os.path.splitext(member.lower())[1]=='.xml']
        newdata = {os.path.basename(xml): z.open(xml).read() for xml in xmls}
        data.update(newdata)
    else:
        print url+' contains no zip file.'

In [37]:
%%time
# load data from web (given url to zip file)
# e.g. e.g.'https://s3.amazonaws.com/cs109project/Unpacked+Data.zip'
# loads all xmls files from given zip into dictionary with key=filename

url = 'https://s3.amazonaws.com/cs109project/Unpacked+Data.zip'
r = requests.get(url)
if zipfile.is_zipfile(StringIO.StringIO(r.content)):
    z = zipfile.ZipFile(StringIO.StringIO(r.content))
    xmls = [member for member in z.namelist()[:100] if os.path.splitext(member.lower())[1]=='.xml']
    data = {os.path.basename(xml): z.open(xml).read() for xml in xmls}
else:
    print 'URL does not link to zip file. Please provide valid URL.'

CPU times: user 2.66 s, sys: 2.18 s, total: 4.85 s
Wall time: 5.01 s


In [None]:
# load data from disk
# loads data with structure year\weeks\xmls into dictionary with key=filename
source = os.getcwd()
path = os.path.join(source,'2014')
data = {}
for week in os.listdir(path):
    week_path = os.path.join(path, week)
    for patent in os.listdir(week_path):
        patent_path = os.path.join(week_path, patent)
        if os.path.isfile(patent_path):
            with open(patent_path, 'r') as my_file:
                data[patent] = my_file.read()

In [51]:
len(data)

500642

In [10]:
def get_abstract_soup(xml, xml_format='pre2005'):
    if xml_format=='pre2005':
        abstract = BeautifulSoup(xml, 'lxml').find('sdoab').get_text().strip()
    else:
        abstract = BeautifulSoup(xml, 'lxml').find('abstract').get_text()
    return abstract


def get_abstract_lxml(xml, xml_format='pre2005'):
    if xml_format=='pre2005':
        xml = xml.replace('&','')
        abstract = etree.XML(xml).xpath('//SDOAB//PDAT')[0].text
    else:
        abstract = etree.XML(xml).xpath('//abstract')[0][0].text
        
    if not abstract: #lxml less robust than soup (e.g. lxml returns None for US08623623-20140107.XML)
        abstract = get_abstract_soup(xml, xml_format)
        
    return abstract


def get_supplement_lxml(xml, xml_format='pre2005'):
    if xml_format=='pre2005':
        xml = xml.replace('&','')
        city_tag = '//CITY//PDAT'
        state_tag = '//STATE//PDAT'
        assignee_tag = '//ONM//PDAT'
    else:
        city_tag = '//city'
        state_tag = '//state'
        assignee_tag = '//assignee//orgname'        
        
    # first inventor geography 
    try:
        inv_city = etree.XML(xml).xpath(city_tag)[0].text
    except:
        inv_city = None

    try:
        inv_state = etree.XML(xml).xpath(state_tag)[0].text
    except:
        inv_state = None

    if inv_state:
        try:
            if xml_format=='pre2005':
                inv_ctry = etree.XML(xml).xpath(state_tag)[0].getnext().getchildren()[0].text
            else:
                inv_ctry = etree.XML(xml).xpath(state_tag)[0].getnext().text
        except:
            inv_ctry = None
    else:
        try:
            if xml_format=='pre2005':
                inv_ctry = etree.XML(xml).xpath('//CITY')[0].getnext().getchildren()[0].text
            else:
                inv_ctry = etree.XML(xml).xpath(city_tag)[0].getnext().text            
        except:
            inv_ctry = None

    # last org name        
    try:
        assignee = etree.XML(xml).xpath(assignee_tag)[0].text
    except:
        assignee = None
    return [inv_city, inv_state, inv_ctry, assignee]

In [11]:
get_supplement_lxml(data[data.keys()[1]])#, xml_format='post')

['Wyomissing Hills', 'PA', None, 'Lucent Technologies Inc.']

In [12]:
get_abstract_lxml(data[data.keys()[1]])#, xml_format='post')

'A self-aligned clock recovery circuit for synchronizing a local clock with an input data signal includes a sampling type phase detector for generating an output signal based on the phase difference between the local clock and the data signal timing. The phase detector obtains samples of consecutive data symbols at sampling times corresponding to transitions of the local clock, and obtains a data crossover sample at a sampling instant in between those of the consecutive data symbol samples. A phase shifter is employed to phase shift the local clock by an amount corresponding to a time varying modulation signal so as to obtain each data crossover sample at a variable sampling instant relative to the associated consecutive symbol samples. Logic circuitry determines whether the local clock appears to be early or late based on a comparison of the logic levels of the symbol samples and the associated data crossover sample, and provides a corresponding output signal through a filter to the l

In [13]:
get_abstract_soup(data[data.keys()[1]])#, xml_format='post')

u'A self-aligned clock recovery circuit for synchronizing a local clock with an input data signal includes a sampling type phase detector for generating an output signal based on the phase difference between the local clock and the data signal timing. The phase detector obtains samples of consecutive data symbols at sampling times corresponding to transitions of the local clock, and obtains a data crossover sample at a sampling instant in between those of the consecutive data symbol samples. A phase shifter is employed to phase shift the local clock by an amount corresponding to a time varying modulation signal so as to obtain each data crossover sample at a variable sampling instant relative to the associated consecutive symbol samples. Logic circuitry determines whether the local clock appears to be early or late based on a comparison of the logic levels of the symbol samples and the associated data crossover sample, and provides a corresponding output signal through a filter to the 

In [14]:
def get_nouns(text):  
    # using pattern
    tagged = tag(text.lower(), tokenize=True)
    
    # using nltk (if statements only necessary on AWS)
    #if '/tmp' not in nltk.data.path:
    #    nltk.data.path.append('/tmp')
    #if 'tokenizers' not in os.listdir('/tmp'):
    #    nltk.download('punkt', '/tmp')
    #    nltk.download('maxent_treebank_pos_tagger', '/tmp')
    #    nltk.download('averaged_perceptron_tagger', '/tmp')
    #tokenized = word_tokenize(text.lower())
    #tagged = nltk.pos_tag(tokenized)
    
    nouns = [a for (a, b) in tagged if b == 'NN']
    return nouns

# for each noun in list of nouns: get its number and number of occurences in list
def tocorpus(nouns,vocabulary):
    count = collections.defaultdict(int) # to count number of occurences of a noun
    for noun in nouns:
        count[vocabulary[noun]] +=1  # for new nouns: creates new key, sets value to 1. for existing keys: increases value by 1
    return count.items()

In [22]:
%%time
# code does not require Spark

data_parsed = {k: get_abstract_lxml(v) for (k,v) in data.iteritems()}
data_nouns = {k: get_nouns(v) for (k,v) in data_parsed.iteritems()}

# loops through all list of nouns and gets all nouns and creates set of them (set ->each noun only once)
flat = {item for sublist in data_nouns.values() for item in sublist}

vocab = dict(zip(flat,range(len(flat))))    #word-to-number dict 
id2word = dict(zip(range(len(flat)),flat))  #number-to-word dict

corpus = {k: tocorpus(v,vocab) for (k,v) in data_nouns.iteritems()}
corpval = corpus.values()

CPU times: user 4.42 s, sys: 36 ms, total: 4.46 s
Wall time: 4.47 s


In [11]:
# on AWS/vagrant: check inodes
! df -i

Filesystem        Inodes  IUsed     IFree IUse% Mounted on
/dev/xvda1        655360 178231    477129   28% /
devtmpfs        31485576    669  31484907    1% /dev
tmpfs           31487830      1  31487829    1% /dev/shm
/dev/xvdb      314572800   9190 314563610    1% /mnt
/dev/xvdc      314572800     75 314572725    1% /mnt1


In [12]:
# on AWS/vagrant: check disk space
! df -h

Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      9.8G  8.1G  1.7G  84% /
devtmpfs        121G   68K  121G   1% /dev
tmpfs           121G     0  121G   0% /dev/shm
/dev/xvdb       300G  228M  300G   1% /mnt
/dev/xvdc       300G  341M  300G   1% /mnt1


In [None]:
# on AWS/vagrant: check memory usage
! free -m

In [53]:
# set number of partitions for rdd throughout notebook
part = 16384#8192

In [None]:
%%time
# code requires Spark

data_rdd = sc.parallelize(data.iteritems(),part)
print data_rdd.getNumPartitions()

data_nouns = (data_rdd.mapValues(lambda v: get_abstract_lxml(v))
               .mapValues(lambda v: get_nouns(v))
).cache()

# associates all distinct words with a number
vocabtups = (data_nouns.flatMap(lambda (k,v): v)
             .distinct()
             .zipWithIndex()
)

vocab = vocabtups.collectAsMap()                            #word-to-number dict
id2word = vocabtups.map(lambda (x,y): (y,x)).collectAsMap() #number-to-word dict

corpus = data_nouns.mapValues(lambda v: tocorpus(v, vocab))
corpval = corpus.values().collect()

In [17]:
print len(data),len(corpval)
print len(vocab)

19627 19627
19324


In [18]:
%%time
num_topics=10
lda2 = gensim.models.ldamodel.LdaModel(corpval, id2word=id2word, num_topics=num_topics, passes=1)



CPU times: user 34.9 s, sys: 0 ns, total: 34.9 s
Wall time: 34.8 s


In [19]:
lda2.print_topics()

[u'0.019*information + 0.017*method + 0.016*system + 0.015*invention + 0.015*network + 0.015*device + 0.014*interface + 0.013*user + 0.010*telephone + 0.009*tissue',
 u'0.041*portion + 0.025*end + 0.023*vehicle + 0.019*side + 0.016*device + 0.016*body + 0.015*wall + 0.015*container + 0.013*system + 0.013*panel',
 u'0.044*system + 0.021*unit + 0.019*method + 0.019*information + 0.015*control + 0.015*apparatus + 0.013*data + 0.012*computer + 0.011*communication + 0.011*ink',
 u'0.066*layer + 0.042*circuit + 0.031*substrate + 0.029*voltage + 0.025*semiconductor + 0.023*power + 0.016*electrode + 0.015*device + 0.015*gate + 0.014*film',
 u'0.031*device + 0.030*position + 0.022*frame + 0.017*beam + 0.017*assembly + 0.015*lens + 0.015*end + 0.015*direction + 0.014*axis + 0.013*plate',
 u'0.023*filter + 0.018*air + 0.016*surface + 0.016*water + 0.013*toner + 0.013*ring + 0.013*element + 0.012*vacuum + 0.010*roller + 0.010*pipe',
 u'0.043*material + 0.023*metal + 0.020*surface + 0.016*heat + 0.

In [20]:
# fetch naics nouns
naics_url = 'https://s3.amazonaws.com/cs109projectr/naics_nouns.json'
naics_r = requests.get(naics_url)
naics_nouns = json.loads(naics_r.content)

# take only manufacturing
industries =  [u'311',u'312',u'313',u'314',u'315',u'316',u'321',u'322',u'323',u'324',u'325',u'326',u'327',u'331',u'332',u'333',u'334',u'335',u'336',u'337',u'339',
              ]
naics_subset = []
for i in naics_nouns:
    if i['naics_code'] in industries:
        naics_subset.append(i)


In [21]:
# check that no invalid naics code entered
print len(industries), len(naics_subset)

26 26


In [22]:
# format naics data
'''
format_naics
input = list of dictionaries with naics definitions (json output of ind definitions ipython notebook)
output = naics_tuples, naics_codes
naics_tuples = list of tuples, tuple1 = (noun1, count1), tuple2 = ()...
naics_codes = list of naics codes
'''
def format_naics(input):
    naics_tuples = [i['noun_dict'].items() for i in input]
    naics_codes = [i['naics_code'] for i in input]
    return naics_tuples, naics_codes

In [23]:
naics_tuples, naics_codes = format_naics(naics_subset)
print len(naics_tuples), len(naics_codes)

26 26


In [24]:
# replace naics nouns with vocab id
# i.e. preps naics definitions with the same signature as corpval
print 'list of nouns in naics definitions that do not appear in patents'
naics_output = []
for i in naics_tuples:
    new_noun_list = []
    for j in i:
        try:
            new_noun_list.append((vocab[j[0]], j[1]))
        except:
            pass
            #print j[0]
    naics_output.append(new_noun_list)

# consider dropping industries with fewer than 10 nouns or whatever cutoff seems appropriate

list of nouns in naics definitions that do not appear in patents


In [25]:
# sparse matrix of industry-topic relation
val = []
col = []
row = []
for i,industry in enumerate(naics_output):
    for topic in lda2.get_document_topics(industry):
        row.append(i) # only changes after len(industry) numbers of loops
        col.append(topic[0]) # index of topic
        val.append(topic[1]) # probability that industry belong to this topic
# spark doc recommends np.array over list for efficiency (http://spark.apache.org/docs/latest/mllib-data-types.html)
indstr_tpc = sps.csc_matrix((np.array(val), (np.array(row), np.array(col))), shape=(len(industries), num_topics))

In [26]:
indstr_tpc.toarray() # sanity check: compare this to [lda2.get_document_topics(i) for i in naics_output]

array([[ 0.1232419 ,  0.02598243,  0.34301399,  0.18799013,  0.03144995,
         0.06063741,  0.07118193,  0.01069146,  0.04197328,  0.10383751],
       [ 0.08276814,  0.38395326,  0.04007173,  0.06566465,  0.04191848,
         0.03061305,  0.08926058,  0.04507391,  0.08268692,  0.13798929],
       [ 0.02378697,  0.09066628,  0.02678708,  0.02038845,  0.04258457,
         0.10494352,  0.45217201,  0.07185087,  0.        ,  0.16540534],
       [ 0.01729065,  0.12850362,  0.05333094,  0.0305271 ,  0.09097744,
         0.04485774,  0.44873224,  0.08416759,  0.        ,  0.10141961],
       [ 0.03078105,  0.07158176,  0.02249734,  0.        ,  0.        ,
         0.17695425,  0.35215268,  0.        ,  0.13922742,  0.20511032],
       [ 0.07686529,  0.0344051 ,  0.0631793 ,  0.1752406 ,  0.0279589 ,
         0.06712524,  0.12061129,  0.05991341,  0.0819431 ,  0.29275777],
       [ 0.        ,  0.0285028 ,  0.34632276,  0.        ,  0.02232747,
         0.09553749,  0.35977942,  0.10439474

## Code for single instance

In [27]:
'''
input
nouns: list of nouns with signature as corpval
lda: trained lda classifier
indstr_tpc_matrix: matrix of relation between industries and topics (dimesions: #indutries x # topics)

output
list of probabilities for each topic
'''
def tpc2indstr(nouns, lda, indstr_tpc_matrix):
    topics = lda.get_document_topics(nouns)
    #os.environ['HOME'] = '/tmp'
    # turn topic probabilities into sparse row vector
    val = []
    col = []
    for topic in topics:
        col.append(topic[0]) # index of topic
        val.append(topic[1]) # probability that industry belong to this topic
    tpc_vector = sps.csc_matrix((np.array(val), (np.zeros(len(val)), np.array(col))), shape=(1, num_topics))
    
    # combine probabilities with relation between topics and industries
    indstr = indstr_tpc_matrix * tpc_vector.transpose()
    
    # normalize such that sum = 1 -> interpretable as probability
    indstr_norm = indstr / indstr.sum()
    
    # convert sparse matrix into array into flattened list
    return indstr_norm.transpose().toarray().flatten().tolist()

In [28]:
# on single instance: grant permission
! sudo chmod -R ugo+rw /home

In [29]:
# WORKS ONLY ON SINGLE INSTANCE,NOT ON CLUSTER
# recall: corpval = corpus.values().collect()
instrs = corpus.mapValues(lambda v: tpc2indstr(v, lda2, indstr_tpc))
supp = data_rdd.mapValues(lambda v: get_supplement_lxml(v))

## Code for cluster

In [22]:
'''
input
nouns: list of nouns with signature as corpval
lda: trained lda classifier
indstr_tpc_matrix: matrix of relation between industries and topics (dimesions: #indutries x # topics)

output
list of probabilities for each topic
'''
def tpc2indstr_cluster(tpcs, indstr_tpc_matrix):
    # turn topic probabilities into sparse row vector
    val = []
    col = []
    for topic in tpcs:
        col.append(topic[0]) # index of topic
        val.append(topic[1]) # probability that industry belong to this topic
    tpc_vector = sps.csc_matrix((np.array(val), (np.zeros(len(val)), np.array(col))), shape=(1, num_topics))
    
    # combine probabilities with relation between topics and industries
    indstr = indstr_tpc_matrix * tpc_vector.transpose()
    
    # normalize such that sum = 1 -> interpretable as probability
    indstr_norm = indstr / indstr.sum()
    
    # convert sparse matrix into array into flattened list
    return indstr_norm.transpose().toarray().flatten().tolist()

In [30]:
%%time
# to avoid permission error when running get_document_topics on executor
tpcs = {k: lda2.get_document_topics(v) for (k,v) in corpus.collect()}
tpcs_rdd = sc.parallelize(tpcs.iteritems(), part)

CPU times: user 11.2 s, sys: 24 ms, total: 11.2 s
Wall time: 19.1 s


In [31]:
# recall: corpval = corpus.values().collect()
instrs = tpcs_rdd.mapValues(lambda v: tpc2indstr_cluster(v, indstr_tpc))
supp = data_rdd.mapValues(lambda v: get_supplement_lxml(v, xml_format='pre2005'))

## Same code for single instance and cluster

In [30]:
instrs.take(1)

[('US06353594-20020305.XML',
  [0.0909609107674464,
   0.0289102386472117,
   0.010148724715883515,
   0.014195538238002063,
   0.0060817315097496764,
   0.05475341408531665,
   0.03410002146605919,
   0.018112463022192497,
   0.021144688901333638,
   0.07588262096461587,
   0.03560342213243369,
   0.08623378335443734,
   0.08903768555858754,
   0.07642027416733223,
   0.04126517327215404,
   0.0620940713088097,
   0.026518177240714244,
   0.03501149381947394,
   0.019731012482289142,
   0.02195631881617554,
   0.06829588270496435,
   0.00659029170498877,
   0.0011494551266106208,
   0.03346179536308506,
   0.00756573119056009,
   0.03477507943957262])]

In [31]:
joined = supp.join(instrs)

In [32]:
%%time
joined.take(1)

CPU times: user 1.04 s, sys: 272 ms, total: 1.32 s
Wall time: 1min 58s


[('US06341454-20020129.XML',
  (['Montauk', 'NY', None, None],
   [0.023133051577601534,
    0.016631934939355518,
    0.04682354118291602,
    0.05067104242127523,
    0.03577181775435819,
    0.01946119564580897,
    0.04457612544452538,
    0.07805857604535972,
    0.05060315743431658,
    0.026108277011777332,
    0.024415389196234257,
    0.024272697117669283,
    0.021915368415286438,
    0.02853778572702099,
    0.0423785116665556,
    0.033931200898239026,
    0.05004234027336171,
    0.045677721097780545,
    0.05074159257613148,
    0.05515080886857469,
    0.029974158257698377,
    0.05026912378221485,
    0.0500737438498226,
    0.043805444947937054,
    0.015048984897513107,
    0.041926408970665525]))]

In [40]:
def in1list(patent,data):
    supp = data[0]
    indstrs = data[1]
    patent_id, date = patent[:-4].split('-')
    merged = [patent_id, date] + supp + indstrs
    return merged

In [41]:
joined_flat = joined.map(lambda (k,v): in1list(k,v))

In [42]:
joined_flat.take(1)

[['US06341454',
  '20020129',
  'Montauk',
  'NY',
  None,
  None,
  0.023133051577601534,
  0.016631934939355518,
  0.04682354118291602,
  0.05067104242127523,
  0.03577181775435819,
  0.01946119564580897,
  0.04457612544452538,
  0.07805857604535972,
  0.05060315743431658,
  0.026108277011777332,
  0.024415389196234257,
  0.024272697117669283,
  0.021915368415286438,
  0.02853778572702099,
  0.0423785116665556,
  0.033931200898239026,
  0.05004234027336171,
  0.045677721097780545,
  0.05074159257613148,
  0.05515080886857469,
  0.029974158257698377,
  0.05026912378221485,
  0.0500737438498226,
  0.043805444947937054,
  0.015048984897513107,
  0.041926408970665525]]

In [43]:
sqlContext = SQLContext(sc)

In [44]:
supplements = ['city', 'state', 'country', 'assignee']
colnames = ['patent_id', 'date'] + supplements + industries

fields = [StructField(field_name, StringType(), True) for field_name in colnames]
schema = StructType(fields)

In [45]:
df = sqlContext.createDataFrame(joined_flat, schema)

In [46]:
print df.count(), len(data)

19627 19627


In [47]:
df.take(3)

[Row(patent_id=u'US06341454', date=u'20020129', city=u'Montauk', state=u'NY', country=None, assignee=None, 311=u'0.023133051577601534', 312=u'0.016631934939355518', 313=u'0.04682354118291602', 314=u'0.05067104242127523', 315=u'0.03577181775435819', 316=u'0.01946119564580897', 321=u'0.04457612544452538', 322=u'0.07805857604535972', 323=u'0.05060315743431658', 324=u'0.026108277011777332', 325=u'0.024415389196234257', 326=u'0.024272697117669283', 327=u'0.021915368415286438', 331=u'0.02853778572702099', 332=u'0.0423785116665556', 333=u'0.033931200898239026', 334=u'0.05004234027336171', 335=u'0.045677721097780545', 336=u'0.05074159257613148', 337=u'0.05515080886857469', 339=u'0.029974158257698377', 511=u'0.05026912378221485', 512=u'0.0500737438498226', 515=u'0.043805444947937054', 516=u'0.015048984897513107', 517=u'0.041926408970665525'),
 Row(patent_id=u'US06350229', date=u'20020226', city=u'Bilthoven', state=u'MN', country=None, assignee=u'Medtronic, Inc.', 311=u'0.024644634320671217', 31

In [49]:
df.repartition(1).write.save('s3n://cs109project/df8', format='json')