In [1]:
# Install packages for the steganography script that the next cell writes to roBERTa.py.
# NOTE(review): that script also imports nltk and torch, which are not installed here;
# presumably they are preinstalled in this runtime - confirm.
# NOTE(review): the visible script imports neither sentencepiece nor wordpiece - confirm they are needed.
!pip install transformers
!pip install sentencepiece
!pip install wordpiece


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting transformers
  Downloading transformers-4.21.1-py3-none-any.whl (4.7 MB)
[K     |████████████████████████████████| 4.7 MB 7.2 MB/s 
Collecting huggingface-hub<1.0,>=0.1.0
  Downloading huggingface_hub-0.8.1-py3-none-any.whl (101 kB)
[K     |████████████████████████████████| 101 kB 4.0 MB/s 
[?25hCollecting pyyaml>=5.1
  Downloading PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (596 kB)
[K     |████████████████████████████████| 596 kB 35.9 MB/s 
Collecting tokenizers!=0.11.3,<0.13,>=0.11.1
  Downloading tokenizers-0.12.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (6.6 MB)
[K     |████████████████████████████████| 6.6 MB 20.5 MB/s 
Installing collected packages: pyyaml, tokenizers, huggingface-hub, transformers
  Attempting uninstall: pyyaml
    Found existing installation: PyYAML 3.13
    Uninstall

In [2]:
%%writefile roBERTa.py

from typing import List, Tuple, Union
from io import StringIO
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')
import torch
from torch import Tensor
import torch.nn.functional as F
import numpy as np
import torch
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers import RobertaTokenizer, RobertaForMaskedLM
import heapq
from heapq import heappop, heappush

class MaskedStego:
    """Hide a bit string in a cover text by substituting words with masked-LM predictions.

    Selected tokens of the cover text are replaced by the mask token, the masked
    language model proposes replacements for every masked position, and the next
    bits of the message choose which proposal is written back into the text.
    """

    def __init__(self, name = 'roberta-base'):
        """Load the tokenizer, the masked language model and the English stopword list.

        Args:
            name: Identifier passed to ``from_pretrained`` for both the tokenizer
                and the model.
        """
        self._tokenizer: PreTrainedTokenizer = RobertaTokenizer.from_pretrained(name)
        self._model = RobertaForMaskedLM.from_pretrained(name)
        self._STOPWORDS: List[str] = stopwords.words('english')

    def __call__(self, cover_text, message, mask_interval = 4, score_threshold = 0.01):
        """Embed ``message`` into ``cover_text``.

        Args:
            cover_text: Text whose words are substituted.
            message: Bits to hide, as a string made only of '0' and '1'.
            mask_interval: Every ``mask_interval``-th substitutable token is masked.
            score_threshold: Minimum softmax probability a prediction needs to be
                considered as a replacement.

        Returns:
            A dict with two keys: ``'stego_text by roBERTa'`` (the decoded text after
            substitution) and ``'encoded_message'`` (the bits consumed so far,
            including any '0' padding appended once the message ran out).

        Raises:
            ValueError: If ``message`` is not a string of '0'/'1' characters.
        """
        # Explicit check instead of `assert`: asserts disappear under `python -O`,
        # and a None message would otherwise fail with an unrelated TypeError.
        if not isinstance(message, str) or not set(message) <= set('01'):
            raise ValueError("message must be a string containing only '0' and '1'")

        # Token ids of the cover text (first and only row of the batch).
        encoded_ids = self._tokenizer([cover_text], return_tensors='pt').input_ids[0]

        # Work on a copy so `encoded_ids` still holds the original token at every
        # masked position.
        masked_ids = self._mask(encoded_ids.clone(), mask_interval)
        sorted_score, indices = self._predict(masked_ids)

        with StringIO(message) as message_io:
            for i_token, token in enumerate(masked_ids):
                if token != self._tokenizer.mask_token_id:
                    continue
                # Predictions for this position whose probability reaches the threshold
                # and which pass the substitutability filter.
                candidates = self._pick_candidates_threshold(
                    indices[i_token], sorted_score[i_token], score_threshold)
                print(candidates)
                print(self._tokenizer.convert_ids_to_tokens(candidates))

                if not candidates:
                    # No admissible replacement: keep the original token and consume
                    # no bits, exactly like the single-candidate case. Previously this
                    # aborted the whole encode inside _block_encode_single.
                    continue

                # The next message bits pick one candidate by index.
                replace_token_id = self._block_encode_single(candidates, message_io).item()
                print('replace', replace_token_id, self._tokenizer.convert_ids_to_tokens([replace_token_id]))
                # Write the chosen token over the original one; decode() below turns
                # the ids back into text.
                encoded_ids[i_token] = replace_token_id

            # Everything up to the current stream position has been embedded.
            encoded_message: str = message_io.getvalue()[:message_io.tell()]

        stego_text = self._tokenizer.decode(encoded_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)
        return { 'stego_text by roBERTa': stego_text, 'encoded_message': encoded_message }

    def _mask(self, input_ids: Union[Tensor, List[List[int]]], mask_interval: int) -> Tensor:
        """Overwrite every ``mask_interval``-th substitutable token id with the mask id.

        ``input_ids`` is modified in place and also returned. Tokens rejected by
        ``_substitutable_single`` are neither masked nor counted.
        """
        length = len(input_ids)
        tokens: List[str] = self._tokenizer.convert_ids_to_tokens(input_ids)

        # Start the counter at half an interval so the masked positions are offset
        # from the beginning of the text.
        mask_count = mask_interval // 2
        for i, token in enumerate(tokens):
            # Skip a token whose successor is marked as a '##' continuation piece.
            # NOTE(review): '##' is the WordPiece marker, while the default
            # roberta-base tokenizer emits 'Ġ'-prefixed tokens (e.g. 'Ġway'), so this
            # check presumably never fires here. Left unchanged because a decoder
            # must select exactly the same positions - confirm before changing.
            if i + 1 < length and tokens[i + 1].startswith('##'):
                continue
            if not self._substitutable_single(token):
                continue
            if mask_count % mask_interval == 0:
                input_ids[i] = self._tokenizer.mask_token_id
            mask_count += 1

        return input_ids

    def _predict(self, input_ids: Union[Tensor, List[List[int]]]) -> Tuple[Tensor, Tensor]:
        """Run the masked language model on one sequence.

        Returns:
            The result of sorting the per-position softmax scores in descending
            order along the vocabulary axis: ``(sorted_scores, token_ids)``.
        """
        self._model.eval()
        with torch.no_grad():
            # Add a batch dimension for the model and drop it again from the logits.
            output = self._model(input_ids.unsqueeze(0))['logits'][0]
            softmaxed_score = F.softmax(output, dim=1)  # [word_len, vocab_len]
            return softmaxed_score.sort(dim=1, descending=True)

    def _pick_candidates_threshold(self, ids: Tensor, scores: Tensor, threshold: float) -> List[Tensor]:
        """Return the ids whose score is at least ``threshold`` and whose token is substitutable.

        The order of ``ids`` is preserved; each element of the result is a
        zero-dimensional tensor taken from ``ids``.
        """
        filtered_ids: Tensor = ids[scores >= threshold]
        return [
            idx for idx in filtered_ids
            if self._substitutable_single(self._tokenizer.convert_ids_to_tokens(idx.item()))
        ]

    def _substitutable_single(self, token: str) -> bool:
        """Return True if ``token`` may be masked or offered as a replacement.

        A token is rejected when it starts with '##', when its lower-cased form is
        in the stopword list, or when it is not purely alphabetic.
        """
        # NOTE(review): tokens of the default tokenizer carry a leading 'Ġ' (e.g.
        # 'ĠBecause' in the sample run), and that marker is part of the string
        # compared with the stopword list - presumably it should be stripped first.
        # Not changed here: it would alter which positions carry bits, so encoder and
        # decoder have to be changed together.
        if token.startswith('##'): return False
        if token.lower() in self._STOPWORDS: return False
        if not token.isalpha(): return False
        return True

    @staticmethod
    def _block_encode_single(ids: List[Union[int, Tensor]], message: StringIO) -> Union[int, Tensor]:
        """Pick one element of ``ids`` using the next bits of ``message``.

        With ``n`` candidates, ``floor(log2(n))`` bits are read and interpreted as an
        index into ``ids``. A single candidate is returned without reading any bits.
        If the stream runs out, the missing bits are taken as '0' and appended to
        ``message`` so the stream records what was actually embedded.

        Raises:
            ValueError: If ``ids`` is empty.
        """
        if len(ids) == 0:
            # Guard explicitly: with asserts stripped, capacity would become -1 and
            # read(-1) would swallow the rest of the message.
            raise ValueError('ids must contain at least one candidate')
        if len(ids) == 1:
            return ids[0]
        capacity = len(ids).bit_length() - 1
        bits_str = message.read(capacity)
        print("part of bit sequence",bits_str)
        if len(bits_str) < capacity:
            padding: str = '0' * (capacity - len(bits_str))
            bits_str = bits_str + padding
            message.write(padding)
        index = int(bits_str, 2)  # binary string -> list index
        print("index",index)
        return ids[index]


#Huffman Encoding 
 
def isLeaf(root):
    """Return True when ``root`` has neither a left nor a right child."""
    if root.left is not None:
        return False
    return root.right is None
 
 
# A Tree node
class Node:
    """A vertex of the Huffman tree.

    Attributes are set from the constructor arguments: ``ch`` (the symbol, or
    whatever the caller passes for internal nodes), ``freq`` (the weight used
    for ordering) and the optional ``left`` / ``right`` children.
    """

    def __init__(self, ch, freq, left=None, right=None):
        self.ch, self.freq = ch, freq
        self.left, self.right = left, right

    def __lt__(self, other):
        """Order nodes by weight only, so heapq pops the lightest node first."""
        mine, theirs = self.freq, other.freq
        return mine < theirs
 
 
# Traverse the Huffman Tree and store Huffman Codes in a dictionary
def encode(root, s, huffman_code):
    """Walk the tree below ``root`` and record the code of every leaf.

    ``s`` is the bit string accumulated on the path from the top of the tree
    ('0' for a left edge, '1' for a right edge). For each leaf, ``huffman_code``
    gets ``leaf.ch -> s``; a leaf reached with an empty path (a tree made of a
    single node) is given the code '1'. Nothing is returned.
    """
    if root is None:
        return

    if isLeaf(root):
        huffman_code[root.ch] = s or '1'

    # Left subtree first, then right; None children return immediately.
    for bit, child in (('0', root.left), ('1', root.right)):
        encode(child, s + bit, huffman_code)
 
 

 
 
# Builds Huffman Tree and decodes the given input text
def buildHuffmanTree(text):
    """Huffman-encode ``text`` and return the encoding as a string of '0'/'1'.

    A Huffman tree is built from the symbol frequencies of ``text``, a code is
    derived for every symbol, and the codes of all symbols are concatenated in
    input order. Only the bit string is returned; the code table is discarded.

    Symbols are counted in order of first occurrence, so the same text always
    yields the same tree and bit string. Iterating ``set(text)`` instead would
    make the tie-breaking between equally frequent symbols depend on string hash
    randomization, i.e. potentially differ from one interpreter run to the next.

    Args:
        text: The message to encode.

    Returns:
        The encoded bit string, or None if ``text`` is empty.
    """
    # base case: empty input
    if len(text) == 0:
        return None

    # Count each symbol once, in a single pass and in first-occurrence order.
    freq = {}
    for c in text:
        freq[c] = freq.get(c, 0) + 1

    # Priority queue of live nodes; Node ordering compares frequencies.
    pq = [Node(k, v) for k, v in freq.items()]
    heapq.heapify(pq)

    # Merge the two lightest nodes until a single root remains.
    while len(pq) > 1:
        left = heappop(pq)
        right = heappop(pq)
        # The internal node weighs as much as both children together.
        heappush(pq, Node(None, left.freq + right.freq, left, right))

    root = pq[0]

    # Traverse the tree and collect the code of every symbol.
    huffmanCode = {}
    encode(root, '', huffmanCode)

    # Concatenate the codes of the symbols in input order.
    return ''.join(huffmanCode[c] for c in text)
    
 
#Final Testing
if __name__ == '__main__':

  # Read the cover text and the secret message from stdin.
  val = input("Enter your cover text: ")
  message= input("Enter secret message ")
  # Turn the secret message into a '0'/'1' string via Huffman coding.
  tobits=buildHuffmanTree(message)
  if tobits is None:
    # buildHuffmanTree returns None for an empty message; stop with a clear
    # message before loading the model instead of failing inside the encoder.
    raise SystemExit("Secret message is empty; nothing to encode.")
  masked_stego = MaskedStego()
  # Mask every 3rd substitutable token and accept predictions with probability >= 0.01.
  print(masked_stego(val, tobits, 3, 0.01))
  print("Secret message to bits: ",tobits)
 
  
   

Writing roBERTa.py


In [3]:
# Run the script written by the previous cell in a separate Python process;
# it prompts on stdin for the cover text and the secret message.
!python3 roBERTa.py 

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
Enter your cover text: The internet has revolutionized the way we shop. Because of the numerous advantages and benefits, more and more people these days prefer purchasing things online than the conventional way of going to stores.What are some reasons many people love online shopping, and why is it so great? Below are the top ten reasons for shopping online.
Enter secret message meet me at downtown at 9
Downloading vocab.json: 100% 878k/878k [00:00<00:00, 2.38MB/s]
Downloading merges.txt: 100% 446k/446k [00:00<00:00, 1.43MB/s]
Downloading config.json: 100% 481/481 [00:00<00:00, 277kB/s]
Downloading pytorch_model.bin: 100% 478M/478M [00:09<00:00, 53.3MB/s]
[tensor(7977)]
['Ġrevolution']
replace 7977 ['Ġrevolution']
[tensor(169)]
['Ġway']
replace 169 ['Ġway']
[tensor(3047), tensor(14812)]
['ĠBecause', 'ĠRegardless']
part of bit sequence 0
index 0
replace 3047 ['ĠBecause']
[tenso