# Distributed LDA with gensim

This is a work-in-progress attempt to distribute gensim's LDA. Gensim supports distributed training through the [Python Remote Objects](https://pyro4.readthedocs.io/en/stable/) (Pyro4) library.

Install dependencies.

In [1]:
!pip3 install gensim[distributed]



In [2]:
import os
import time
from collections import defaultdict

import cdsw
from gensim import corpora, models



Launch a Pyro name server and fetch the IP address.

In [3]:
name_server = cdsw.launch_workers(
  n=1,
  cpu=1,
  memory=2,
  kernel="python3",
  code="!export PYRO_SERIALIZERS_ACCEPTED=pickle; export PYRO_SERIALIZER=pickle; python3 -m Pyro4.naming -n 0.0.0.0; while true; do sleep 10; done"
)

The worker takes a few seconds to be assigned an IP address, so we pause before fetching it.

In [4]:
time.sleep(10)

Fetch the IP address of the name server.

In [5]:
name_server_ip = [
    worker["ip_address"] for worker in cdsw.list_workers()
    if worker["id"] == name_server[0]["id"]
][0]

The IP should not be 'unknown' (if it is, re-run the above cell to fetch again).

In [6]:
name_server_ip

'100.100.149.250'
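
A fixed `time.sleep(10)` followed by a manual re-run can be replaced by polling until the address is no longer 'unknown'. The following is a minimal sketch, not part of the original notebook: `wait_for_ip` and its `fetch_ip` argument are hypothetical names, and the lookup is replaced by a stand-in so the example runs without `cdsw`.

```python
import time


def wait_for_ip(fetch_ip, timeout=60.0, interval=2.0, sleep=time.sleep):
    """Call fetch_ip() until it returns something other than 'unknown'.

    fetch_ip is a hypothetical zero-argument callable supplied by the caller.
    Raises TimeoutError if no address appears within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        ip = fetch_ip()
        if ip and ip != "unknown":
            return ip
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no IP address after {timeout} seconds")
        sleep(interval)


# Stand-in for the cdsw lookup so the sketch runs anywhere:
# it reports 'unknown' twice before an address appears.
_answers = iter(["unknown", "unknown", "100.100.149.250"])
demo_ip = wait_for_ip(lambda: next(_answers), sleep=lambda seconds: None)
print(demo_ip)
```

In the notebook itself, `fetch_ip` could wrap the list comprehension over `cdsw.list_workers()` from the cell above.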

Launch three CDSW workers, each running one gensim `lda_worker`.

In [7]:
workers = cdsw.launch_workers(
  n=3,
  cpu=1,
  memory=2,
  kernel="python3",
  code=f"!export PYRO_SERIALIZERS_ACCEPTED=pickle; export PYRO_SERIALIZER=pickle; python3 -m gensim.models.lda_worker --host {name_server_ip} --verbose"
)

Launch a final CDSW worker to host the gensim dispatcher.

In [8]:
dispatcher = cdsw.launch_workers(
  n=1,
  cpu=1,
  memory=2,
  kernel="python3",
  code=f"!export PYRO_SERIALIZERS_ACCEPTED=pickle; export PYRO_SERIALIZER=pickle; python3 -m gensim.models.lda_dispatcher --host {name_server_ip}; while true; do sleep 10; done"
)
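
The three launch commands repeat the same serializer exports, and two of them end with the same keep-alive loop. One way to avoid copy-paste drift is to build the command string in a small helper. This is only a sketch: `worker_command`, `PYRO_EXPORTS` and `KEEP_ALIVE` are hypothetical names, not part of `cdsw` or gensim.

```python
PYRO_EXPORTS = "export PYRO_SERIALIZERS_ACCEPTED=pickle; export PYRO_SERIALIZER=pickle"
KEEP_ALIVE = "while true; do sleep 10; done"


def worker_command(command, keep_alive=False):
    """Build the shell line passed as `code` to cdsw.launch_workers."""
    parts = [PYRO_EXPORTS, command]
    if keep_alive:
        # Keep the session alive after the main command exits.
        parts.append(KEEP_ALIVE)
    # The leading "!" mirrors the commands used in the cells above.
    return "!" + "; ".join(parts)


ip = "100.100.149.250"  # example value taken from the notebook output
name_server_code = worker_command("python3 -m Pyro4.naming -n 0.0.0.0", keep_alive=True)
lda_worker_code = worker_command(
    f"python3 -m gensim.models.lda_worker --host {ip} --verbose"
)
print(name_server_code)
print(lda_worker_code)
```

The resulting strings are the same as the hand-written ones, so they can be passed unchanged as the `code` argument.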

These five workers (one name server, three LDA workers and one dispatcher) are all we need.

In [9]:
len(cdsw.list_workers())

5

This is a minimal example of LDA, hacked together from the gensim tutorials. It gives the LDA algorithm something (very simple) to do. We would not distribute LDA for such a tiny dataset.

In [10]:
# from the gensim tutorials:

documents = [
    "Human machine interface for lab abc computer applications",
    "A survey of user opinion of computer system response time",
    "The EPS user interface management system",
    "System and human system engineering testing of EPS",
    "Relation of user perceived response time to error measurement",
    "The generation of random binary unordered trees",
    "The intersection graph of paths in trees",
    "Graph minors IV Widths of trees and well quasi ordering",
    "Graph minors A survey",
]

# remove common words and tokenize
stoplist = set('for a of the and to in'.split())
texts = [
    [word for word in document.lower().split() if word not in stoplist]
    for document in documents
]

# remove words that appear only once
frequency = defaultdict(int)
for text in texts:
    for token in text:
        frequency[token] += 1

texts = [
    [token for token in text if frequency[token] > 1]
    for text in texts
]

id2word = corpora.Dictionary(texts)
corpus = [id2word.doc2bow(text) for text in texts]
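
To make the shape of `corpus` concrete: each document becomes a list of `(token_id, count)` pairs. The sketch below rebuilds that shape in plain Python for two of the filtered documents. It is an illustration only, not gensim's implementation; `build_vocabulary` and `to_bow` are hypothetical helpers, and the ids gensim assigns may differ.

```python
from collections import Counter


def build_vocabulary(texts):
    """Map each distinct token to an integer id, assigned in sorted order."""
    tokens = sorted({token for text in texts for token in text})
    return {token: idx for idx, token in enumerate(tokens)}


def to_bow(text, vocabulary):
    """Return sorted (token_id, count) pairs for one tokenized document."""
    counts = Counter(vocabulary[token] for token in text if token in vocabulary)
    return sorted(counts.items())


# The first and fourth documents after stop-word and frequency filtering.
sample_texts = [
    ["human", "interface", "computer"],
    ["system", "human", "system", "eps"],
]
vocab = build_vocabulary(sample_texts)
bows = [to_bow(text, vocab) for text in sample_texts]
print(vocab)
print(bows)
```

Note how the repeated token "system" in the second document shows up as a count of 2 rather than as two entries.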

We set the same environment variables on this host. I've tried running an `lda_worker` on this node from the terminal, which changes nothing, and making this node the `Pyro4.naming` name server gives a serialization error.

In [11]:
os.environ["PYRO_HOST"] = name_server_ip
os.environ["PYRO_SERIALIZERS_ACCEPTED"] = "pickle"
os.environ["PYRO_SERIALIZER"] = "pickle"

Fit an LDA model. This will be almost instantaneous, because the task is so small. If the next cell errors, it is likely because the worker nodes are still being scheduled. Wait for them to finish scheduling (you can view the status of workers in the CDSW Sessions pane), then run the cell again.

In [12]:
lda = models.LdaModel(
    corpus=corpus,
    id2word=id2word,
    num_topics=3,
    distributed=True,
    ns_conf={'host': name_server_ip}
)
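
Re-running the cell by hand while the workers are scheduled can be automated with a small retry loop. This is a sketch under assumptions: `retry` is a hypothetical helper, and because the exact exception raised when workers are missing is not known here, it catches any `Exception`. A stand-in function replaces the model fit so the example runs without a cluster.

```python
import time


def retry(action, attempts=5, delay=10.0, sleep=time.sleep):
    """Call action() until it succeeds or the attempts run out."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as error:  # broad on purpose: failure type is unknown
            last_error = error
            print(f"attempt {attempt} failed: {error}")
            if attempt < attempts:
                sleep(delay)
    raise last_error


# Stand-in for the model fit: fails twice, then succeeds.
calls = {"count": 0}


def flaky_fit():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("workers not ready")
    return "fitted"


result = retry(flaky_fit, attempts=5, delay=0, sleep=lambda seconds: None)
print(result, calls["count"])
```

In the notebook, `action` could be a lambda that wraps the `models.LdaModel(...)` call from the cell above.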

Check that the model actually did something by viewing the top topics. It's a tiny corpus, so don't expect these to make sense.

In [13]:
lda.top_topics(corpus)

[([(0.20227258, 'system'),
   (0.10964208, 'user'),
   (0.10845309, 'computer'),
   (0.10782733, 'eps'),
   (0.107211985, 'interface'),
   (0.106523745, 'human'),
   (0.06484261, 'response'),
   (0.0647507, 'survey'),
   (0.06440762, 'time'),
   (0.023797294, 'trees'),
   (0.020941954, 'graph'),
   (0.019329047, 'minors')],
  -14.640053358132091),
 ([(0.16038772, 'user'),
   (0.15631276, 'time'),
   (0.1553815, 'response'),
   (0.14309631, 'trees'),
   (0.05046085, 'graph'),
   (0.050012518, 'human'),
   (0.049263958, 'system'),
   (0.04765852, 'minors'),
   (0.047450796, 'interface'),
   (0.046915285, 'eps'),
   (0.046590485, 'survey'),
   (0.046469327, 'computer')],
  -14.683057233234472),
 ([(0.26073998, 'graph'),
   (0.19288148, 'trees'),
   (0.1826831, 'minors'),
   (0.10565308, 'survey'),
   (0.034172226, 'user'),
   (0.03386162, 'system'),
   (0.032380056, 'interface'),
   (0.03181678, 'human'),
   (0.031690598, 'eps'),
   (0.031655017, 'time'),
   (0.031543408, 'response'),
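
Each entry in this output is a pair: a list of `(probability, word)` tuples and a coherence score. The full listing is long, so a compact summary can be easier to scan. The sketch below uses a hypothetical helper, `summarize_topics`, applied to the first three words of the first two topics shown above.

```python
def summarize_topics(top_topics, num_words=3):
    """Format each (terms, coherence) pair as one short line."""
    lines = []
    for index, (terms, coherence) in enumerate(top_topics):
        words = ", ".join(word for _prob, word in terms[:num_words])
        lines.append(f"topic {index}: {words} (coherence {coherence:.2f})")
    return lines


# Values copied from the output above, truncated to three words per topic.
sample = [
    ([(0.20227258, "system"), (0.10964208, "user"), (0.10845309, "computer")],
     -14.640053358132091),
    ([(0.16038772, "user"), (0.15631276, "time"), (0.1553815, "response")],
     -14.683057233234472),
]
summary = summarize_topics(sample)
for line in summary:
    print(line)
```

In the notebook, the same helper could be called on the result of `lda.top_topics(corpus)`.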
   

Clean up after ourselves.

In [14]:
cdsw.stop_workers(*[worker["id"] for worker in name_server + dispatcher + workers])

[<Response [204]>,
 <Response [204]>,
 <Response [204]>,
 <Response [204]>,
 <Response [204]>]