In [1]:
import argparse
import gzip, os, csv
import numpy as np
import random
import time
import networkx as nx

In [2]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext

In [3]:
if False: 
    sc.stop()

config = SparkConf()
config.setMaster("local[10]")
config.set("spark.executor.memory", "10g")
config.set('spark.driver.memory', '10g')
config.set("spark.memory.offHeap.enabled",True)
config.set("spark.memory.offHeap.size","10g") 
sc = SparkContext(conf=config)
print (sc)

<SparkContext master=local[10] appName=pyspark-shell>


In [4]:
def addTriple(net, source, target, edge):
    if source in net:
        if  target in net[source]:
            net[source][target].add(edge)
        else:
            net[source][target]= set([edge])
    else:
        net[source]={}
        net[source][target] =set([edge])
            
def getLinks(net, source):
    if source not in net:
        return {}
    return net[source]

def randomWalkUniform(triples, startNode, max_depth=5):
    next_node =startNode
    path = 'n'+str(startNode)+'->'
    for i in range(max_depth):
        neighs = getLinks(triples,next_node)
        #print (neighs)
        if len(neighs) == 0: break
        weights = []
        queue = []
        for neigh in neighs:
            for edge in neighs[neigh]:
                queue.append((edge,neigh))
                weights.append(1.0)
        edge, next_node = random.choice(queue)
        path = path+ 'e'+str(edge)+'->'
        path = path+ 'n'+str(next_node)+'->'

    return path

In [8]:
def preprocess(folders, filename):
    entity2id = {}
    relation2id = {}
    triples = {}

    ent_counter = 0
    rel_counter = 0
    for dirname in folders:
        for fname in os.listdir(dirname):
            if not filename in fname: continue
            print (fname)
            if fname.endswith('.gz'):
                kgfile = gzip.open(os.path.join(dirname, fname), mode='rt')
            else:
                kgfile = open(os.path.join(dirname, fname), mode='rt')
                
            for line in csv.reader(kgfile, delimiter=' ', quotechar='"'):
                h = line[0]
                r = line[1]
                t = line[2]

                if h in entity2id:
                    hid = entity2id[h]
                else:
                    entity2id[h] = ent_counter
                    ent_counter+=1
                    hid = entity2id[h]

                if t in entity2id:
                    tid = entity2id[t]
                else:
                    entity2id[t] = ent_counter
                    ent_counter+=1
                    tid = entity2id[t]

                if r in relation2id:
                    rid = relation2id[r]
                else:
                    relation2id[r] = rel_counter
                    rel_counter+=1
                    rid = relation2id[r]
                addTriple(triples, hid, tid, rid)
            print ('Relation:',rel_counter, ' Entity:',ent_counter)
    return entity2id,relation2id,triples

In [9]:
folders = ['../data/rdf/']
fileext = '.nq'
entity2id, relation2id, triples = preprocess(folders, fileext)

foodb.nq
Relation: 12  Entity: 92908


In [10]:
num_triples=0
for source in triples:
    for  target in triples[source]:
        num_triples+=len(triples[source][target])
print ('Number of triples',num_triples)

Number of triples 367941


In [11]:
def randomNWalkUniform(triples, n, walks, path_depth):
    path=[]
    for k in range(walks):
        walk = randomWalkUniform(triples, n, path_depth)
        path.append(walk)
    path = list(set(path))
    return path

In [12]:
walks = 5
path_depth = 10
paths = randomNWalkUniform(triples, 100, walks, path_depth)
print('\n'.join(paths))

n100->e5->n100->e5->n100->e7->n92899->
n100->e5->n100->e5->n100->e0->n3->
n100->e7->n92899->
n100->e5->n100->e7->n92899->
n100->e6->n84313->


In [14]:
entities = list(entity2id.values())
b_triples = sc.broadcast(triples)


In [15]:
folder = './walks/'
if not os.path.isdir(folder):
    os.mkdir(folder)
walks = 200
maxDepth = 3
for path_depth in range(1,maxDepth):
    filename = folder+'randwalks_n%d_depth%d_pagerank_uniform.txt'%(walks, path_depth)
    print (filename)
    start_time =time.time()
    rdd = sc.parallelize(entities).flatMap(lambda n: randomNWalkUniform(b_triples.value, n, walks, path_depth))
    rdd.saveAsTextFile(filename)
    elapsed_time = time.time() - start_time
    print ('Time elapsed to generate features:',time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))

./walks/randwalks_n200_depth1_pagerank_uniform.txt
Time elapsed to generate features: 00:00:17
./walks/randwalks_n200_depth2_pagerank_uniform.txt
Time elapsed to generate features: 00:00:21


In [16]:
import gensim
class MySentences(object):
    def __init__(self, dirname, filename):
        self.dirname = dirname
        self.filename = filename

    def __iter__(self):
        print ('Processing ',self.filename)
        for subfname in os.listdir(self.dirname):
            if not self.filename in subfname: continue
            fpath = os.path.join(self.dirname, subfname)
            for fname in os.listdir(fpath):
                if not 'part' in fname: continue
                if '.crc' in fname: continue
                try:
                    for line in open(os.path.join(fpath, fname), mode='r'):
                        line = line.rstrip('\n')
                        words = line.split("->")
                        yield words
                except Exception:
                    print("Failed reading file:")
                    print(fname)

In [17]:
datafilename = './walks/' 
pattern = 'uniform'
sentences = MySentences(datafilename, filename=pattern) # a memory-friendly iterator
model = gensim.models.Word2Vec(size=200, workers=5, window=5, sg=1, negative=15, iter=5)

In [18]:
model.build_vocab(sentences)
corpus_count = model.corpus_count
model.train(sentences, total_examples=corpus_count, epochs =5)

Processing  uniform
Processing  uniform
Processing  uniform
Processing  uniform
Processing  uniform
Processing  uniform


(23319263, 42366440)

In [None]:
model.wv.save('vectors/w2v_vectors.kv')

In [19]:
id2entity = { value:key for key,value in entity2id.items()} 

In [60]:
def extractFeatureVector(model, drugs, id2entity, output): 
        
    fw=open(output,'w')
    fw.write(str(len(drugs))+" 200\n")

    for id_ in sorted(drugs):
        nid = 'n'+str(id_)
        if  (nid) not in  model.wv:
            #print (nid)
            continue
        vec = model.wv[nid]
        vec = " ".join(map(str,vec))
        #print (id2entity[id_])
        fw.write( id2entity[id_]+' '+str(vec)+'\n')
    fw.close()

In [64]:
foods = []
for e in entity2id.keys():
    if e.startswith('<https://foodb.ca/foods/'):
        if 'n'+str(entity2id[e]) in model.wv:
            foods.append(entity2id[e])

In [65]:
foods[:10]

[2, 7, 9, 11, 14, 17, 19, 21, 23, 25]

In [66]:
extractFeatureVector(model, foods, id2entity, '../data/embedding/rdf2vec_vectors.txt')
    