<a href="https://colab.research.google.com/github/Ufifus/bio_inform_tasks/blob/main/W2Vec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install Bio
!pip install sentence_splitter

In [165]:
from Bio import Entrez, Medline
from sentence_splitter import SentenceSplitter, split_text_into_sentences
import io, re, string, tqdm
import tensorflow as tf
from nltk.corpus import stopwords as sw
import numpy as np
from tensorflow.keras import layers

In [28]:
class Embedding_task:
    def __init__(self, path_to_text, text, stopwords=None):
        self.stopwords = stopwords
        self.path_to_text = path_to_text
        self.text = text

    def clear_str(self, string):
        replace_simbols = ['"', "'", "/", "\\", "[", "]", "1", "2", "3", "4", "5", "6", "7", "8", "9", "0",]
        for replace_simbol in replace_simbols:
            string = string.replace(replace_simbol, ' ')
            if self.stopwords is None:
                pass
            else:
                for stopword in self.stopwords:
                    string = string.replace(stopword, ' ')
            return string

    @classmethod
    def get_stopwords(cls):
        stopwords = sw.words('english')
        return cls(stopwords=stopwords)

    @classmethod
    def create_file(cls, path_to_text):
        text = io.open(path_to_text, 'w', encoding='utf-8')
        return cls(path_to_text=path_to_text, text=text)

    def record_text(self, rules, search_strings):
        Entrez.email = rules
        i = 0
        splitter = SentenceSplitter(language='en')

        for search_str in search_strings:
            handle = Entrez.esearch(db='pubmed', sort='relevance', term=search_str, retmax='10000')
            record = Entrez.read(handle)
            rec_count = record['Count']
            print(search_str, ' - ', rec_count)

            idlist = record['IdList']
            handle = Entrez.efetch(db="pubmed", id=idlist, rettype="medline", retmode="text")
            records = Medline.parse(handle)

            for record in records:
                try:
                    title = self.clear_str(str(record['TI']))
                except:
                    title = ' . '

                try:
                    abstract = self.clear_str(str(record['AB']))
                except:
                    abstract = ' . '

                ap = title + abstract
                lines = splitter.split(ap)
                for line in lines:
                    self.text.write(line + '\n')
                i += 1
                if i % 500 == 0:
                    print(i, ap)

        print(i, ap)
        self.text.close()
        return self

    def get_attributes(self):
      length_max = 0
      with io.open(self.path_to_text, 'r', encoding='utf-8') as txt:
          lines = txt.read().splitlines()
          for line in lines:
              if length_max < len(line.split()):
                  length_max = len(line.split())
      attributes = {
          'num_lines': len(lines),
          'length_max_line': length_max
      }
      return print('num_lines:', len(lines), 'length_max_line:', length_max)

In [152]:
class Vecrorization:
    def __init__(self, path_to_text, vocab_size, max_len_seq,
                 batch_size, train_data=None, vocab=None, vec_layer=None):
      self.vocab = vocab
      self.vocab_size = vocab_size
      self.max_len_seq = max_len_seq
      self.path_to_text = path_to_text
      self.batch_size = batch_size
      self.train_data = train_data

    def prepocessing(self, input_text):
      lowercase = tf.strings.lower(input_text)
      return tf.strings.regex_replace(lowercase, '[%s]' % re.escape(string.punctuation), '')

    def create_vocab(self):
      vec_layer = tf.keras.layers.TextVectorization(
          standardize=self.prepocessing,
          max_tokens=self.vocab_size,
          output_mode='int',
          output_sequence_length=self.max_len_seq
      )
      vec_layer.adapt(self.text_ds().batch(self.batch_size))
      return vec_layer

    def text_ds(self):
      return tf.data.TextLineDataset(self.path_to_text)\
                                      .filter(lambda x: tf.cast(tf.strings.length(x), bool))

    def get_vocab(self):
      self.vocab = self.create_vocab().get_vocabulary()
      return self.vocab

    def vectorize_text(self):
      text_vector_ds = self.text_ds().batch(self.batch_size).prefetch(tf.data.AUTOTUNE)\
                                                        .map(self.create_vocab()).unbatch()
      sequences = list(text_vector_ds.as_numpy_iterator())
      return sequences

    def generate_training_data(self, window_size, num_ns, seed):
      targets, contexts, labels = [], [], []
      sampling_table = tf.keras.preprocessing.sequence.make_sampling_table(self.vocab_size)
      for sequence in tqdm.tqdm(self.vectorize_text()):
            positive_skip_grams, _ = tf.keras.preprocessing.sequence.skipgrams(
                sequence,
                vocabulary_size=self.vocab_size,
                sampling_table=sampling_table,
                window_size=window_size,
                negative_samples=0)

            for target_word, context_word in positive_skip_grams:
                context_class = tf.expand_dims(
                    tf.constant([context_word], dtype="int64"), 1)
                negative_sampling_candidates, _, _ = tf.random.log_uniform_candidate_sampler(
                    true_classes=context_class,
                    num_true=1,
                    num_sampled=num_ns,
                    unique=True,
                    range_max=self.vocab_size,
                    seed=seed,
                    name="negative_sampling")

                negative_sampling_candidates = tf.expand_dims(negative_sampling_candidates, 1)

                context = tf.concat([context_class, negative_sampling_candidates], 0)
                label = tf.constant([1] + [0] * num_ns, dtype="int64")

                targets.append(target_word)
                contexts.append(context)
                labels.append(label)

      self.train_data = {
          'targets': np.array(targets),
          'contexts': np.array(contexts)[:, :, 0],
          'labels': np.array(labels)
        }
      return self

    def create_dataset(self, batch_size, buffer_size):
        dataset = tf.data.Dataset.from_tensor_slices(((self.train_data['targets'],
                                                      self.train_data['contexts']),
                                                        self.train_data['labels']))
        dataset = dataset.shuffle(buffer_size).batch(batch_size, drop_remainder=True)
        print(dataset)
        dataset = dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
        return dataset

In [166]:
class Word2Vec(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim):
    super(Word2Vec, self).__init__()
    self.target_embedding = layers.Embedding(vocab_size,
                                             embedding_dim,
                                             input_length=1,
                                             name='w2v_embedding')
    self.context_embedding = layers.Embedding(vocab_size,
                                              embedding_dim,
                                              input_length=4)
    
  def call(self, pair):
    target, context = pair
    if len(target.shape) == 2:
      target = tf.squeeze(target, axis=1)
    word_emb = self.target_embedding(target)
    context_emb = self.context_embedding(context)
    dots = tf.einsum('be,bce->bc', word_emb, context_emb)
    return dots

In [None]:
search_strings = ["nafld and 2020/01/01:2021/10/26[dp]"]
path_to_text = 'w2v_text.txt'

text = Embedding_task.create_file(path_to_text)
text.record_text("e.p@d_health.pro", search_strings)
text.get_attributes()

e = Vecrorization('w2v_text.txt', 10000, 50, 1024)
e.generate_training_data(5, 3, 42)
dataset = e.create_dataset(2048, 15000)

In [167]:
embedding_dim = 300
vocab_size = e.vocab_size
word2vec = Word2Vec(vocab_size, embedding_dim)
word2vec.compile(optimizer='adam',
                 loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
                 metrics=['accuracy'])

In [168]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="logs")
word2vec.fit(dataset, epochs=20, callbacks=[tensorboard_callback])

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7fe118029290>

In [1]:
%tensorboard --logdir logs

UsageError: Line magic function `%tensorboard` not found.


In [172]:
weights = word2vec.get_layer('w2v_embedding').get_weights()[0]


out_v = io.open('vectors.tsv', 'w', encoding='utf-8')
out_m = io.open('metadata.tsv', 'w', encoding='utf-8')

for index, word in enumerate(vocab):
  if index == 0:
    continue  # skip 0, it's padding.
  vec = weights[index]
  out_v.write('\t'.join([str(x) for x in vec]) + "\n")
  out_m.write(word + "\n")
out_v.close()
out_m.close()

try:
  from google.colab import files
  files.download('vectors.tsv')
  files.download('metadata.tsv')
except Exception:
  pass

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [175]:
print(vocab[-20::])

['nmidb', 'nmethyltransferase', 'nma', 'nitrite', 'ni', 'nglycosylation', 'nglycopeptides', 'neuromuscular', 'neurocognitive', 'nephrolithiasis', 'ndb', 'nchcc', 'nb', 'nagala', 'musclederived', 'multisystemic', 'multidrug', 'mrtfa', 'mrjps', 'mrilsn']
