In [1]:
import re
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, TimeDistributed, Activation, RepeatVector, Bidirectional, Dropout, LSTM, GRU, Embedding
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import sparse_categorical_crossentropy
from tensorflow.keras.layers import Bidirectional

In [9]:
# Load the raw corpus: `data` holds one string per line of the file.
# The corpus contains Polish text, so decode it explicitly as UTF-8 instead of
# relying on the platform's default encoding.
with open('/content/kod.txt', 'r', encoding='utf-8') as file:
    data = file.read().split('\n')

In [13]:
# The corpus alternates line by line: even-indexed lines go to the model
# inputs (x), odd-indexed lines go to the targets (y).
x = data[0::2]
y = data[1::2]

In [4]:
class Tokenizer():
  """Word-level tokenizer with two independent, growing vocabularies.

  ``dictionaryX`` maps the words of the task descriptions to integer ids and
  ``dictionaryY`` does the same for the corresponding code.  In both
  vocabularies id 0 is reserved for the empty string.
  """

  def __init__(self):
    # Two separate dictionaries: one for the task text, one for its code.
    self.dictionaryX={'':0}
    self.dictionaryY={'':0}

  def tokenize(self, sentence:str, isInput:bool=True, allowNew:bool=True)->list:
    """Convert a sentence into a list of integer token ids.

    The sentence is split on single spaces, so consecutive spaces produce
    empty-string words, which map to id 0.

    Args:
      sentence: text to tokenize.
      isInput: True to use ``dictionaryX`` (task text), False to use
        ``dictionaryY`` (code).
      allowNew: when True (the default, original behaviour) unknown words
        are added to the selected vocabulary with the next free id.  When
        False the vocabulary is left untouched and unknown words map to 0,
        which keeps ids within the range seen so far (useful at inference
        time, after a model has been sized to the vocabulary).

    Returns:
      List of integer ids, one per space-separated word.
    """
    # Pick the dictionary depending on whether text or code is processed.
    tokenDict = self.dictionaryX if isInput else self.dictionaryY
    # Next free id; only consumed when a new word is actually added.
    nextId = max(tokenDict.values()) + 1
    tokenized = []
    for word in sentence.split(" "):
      if word not in tokenDict:
        if not allowNew:
          tokenized.append(0)
          continue
        # Unknown word: register it first, then emit its id below.
        tokenDict[word] = nextId
        nextId += 1
      tokenized.append(tokenDict[word])
    return tokenized

  def detokenize(self, logits:np.ndarray, isOutput:bool=True)->list:
    """Map integer token ids back to words.

    Args:
      logits: iterable of integer token ids (despite the name, these are ids
        such as the result of an argmax, not raw scores).
      isOutput: True to decode with ``dictionaryY`` (code), False to decode
        with ``dictionaryX`` (task text).

    Returns:
      List of words, one per id.

    Raises:
      KeyError: if an id is not present in the selected vocabulary.
    """
    # Invert the dictionary that matches the kind of sequence being decoded.
    source = self.dictionaryY if isOutput else self.dictionaryX
    idToWord = {v: k for k, v in source.items()}
    return [idToWord[token] for token in logits]

In [5]:
def loss_fn(model, x, y, y_len, max_sequence, training=True):
    """Sparse categorical cross-entropy of one mini-batch, summed per sequence.

    Args:
      model: callable returning per-position logits; invoked as
        ``model(x, training=training)``.
      x: mini-batch of model inputs.
      y: mini-batch of integer target ids.
      y_len: unused; kept so existing callers keep working.
      max_sequence: unused; kept so existing callers keep working.
      training: forwarded to the model.  Defaults to True because this
        function is evaluated inside the gradient tape of the training loop,
        where layers such as Dropout must run in training mode.

    Returns:
      Scalar tensor: the per-position loss summed along axis 1 and then
      averaged over the remaining elements.

    Note:
      No masking is applied, so every position of ``y`` (zero-padded ones
      included) contributes to the loss.
    """
    logits = model(x, training=training)
    per_position = tf.keras.losses.sparse_categorical_crossentropy(
        y_true=y, y_pred=logits, from_logits=True
    )
    per_sequence = tf.reduce_sum(per_position, axis=1)
    return tf.reduce_mean(per_sequence)

class Pipeline():
  """Tokenizes a parallel corpus and builds the sequence-to-sequence model.

  Attributes set by ``__init__``:
    tokenizer: the ``Tokenizer`` holding both vocabularies.
    x, y: zero-padded (post) integer id matrices of inputs and targets.
    model: the Keras model returned by ``create_model``.
    sequnce_length: float32 array, 1.0 where ``y`` is non-zero and 0.0
      elsewhere (despite the name this is a mask, not a length).
  """

  def __init__(self,x:list, y:list):
    """Tokenize and pad ``x`` / ``y``, then build, compile and summarize the model.

    Args:
      x: list of input sentences (task descriptions).
      y: list of target sentences (code), parallel to ``x``.
    """
    self.tokenizer=Tokenizer()
    # Convert the data to token ids, then pad so every row has equal length.
    self.x=tf.keras.preprocessing.sequence.pad_sequences(
      [self.tokenizer.tokenize(s, True) for s in x],  padding="post"
    )
    self.y=tf.keras.preprocessing.sequence.pad_sequences(
      [self.tokenizer.tokenize(s, False) for s in y], padding="post"
    )
    # The model is sized from the vocabularies and padded shapes built above.
    self.model=self.create_model()
    self.sequnce_length = (self.y != 0).astype(np.float32)
    # No optimizer/loss is passed to compile().
    self.model.compile()
    self.model.summary()

  def create_model(self)->tf.keras.Model:
    """Build the encoder / repeat / decoder network.

    Returns:
      A ``Sequential`` model whose last layer emits ``output_vocab_size``
      values per output position (no activation, i.e. logits).
    """
    input_vocab_size=len(self.tokenizer.dictionaryX) # number of unique tokens in dictionary X
    output_vocab_size=len(self.tokenizer.dictionaryY) # number of unique tokens in dictionary Y
    embedded_vector_size=1000 # length of the embedding vector
    input_length=self.x.shape[1] # maximum input length, padding included
    output_length=self.y.shape[1] # maximum output length, padding included
    model = Sequential([
    # Randomly initialized embedding table that is kept frozen (trainable=False).
    Embedding(input_dim=input_vocab_size, output_dim=embedded_vector_size,
              mask_zero=False, trainable=False, input_length=input_length,
              embeddings_initializer=tf.keras.initializers.random_normal()),
    # Encoder: collapses the input sequence into a single vector.
    Bidirectional(LSTM(input_length, return_sequences=False)),
    # Feed that vector to the decoder once per output position.
    RepeatVector(output_length),
    LSTM(64, return_sequences=True),
    LSTM(128, return_sequences=True),
    Dropout(0.2),
    TimeDistributed(Dense(512)),
    Dropout(0.2),
    TimeDistributed(Dense(units=output_vocab_size))
    ])
    return model

  def predict(self,sentence:str):
    """Translate one sentence and return the decoded output as a string.

    The sentence, its padded id row and the predicted ids are printed along
    the way.

    Args:
      sentence: input text, words separated by single spaces.

    Returns:
      The predicted output words joined with single spaces.
    """
    print(sentence)
    # Look words up read-only instead of calling tokenizer.tokenize(): that
    # call would add unseen words to the vocabulary and hand out ids that lie
    # outside the Embedding table, whose input_dim was fixed when the model
    # was created.  Unknown words map to 0.
    vocabulary=self.tokenizer.dictionaryX
    token_ids=[vocabulary.get(word, 0) for word in sentence.split(" ")]
    padded=tf.keras.preprocessing.sequence.pad_sequences(
      [token_ids], maxlen=self.x.shape[1] , padding="post"
    )
    print(padded)
    scores=self.model.predict(padded)
    # Highest-scoring vocabulary id at every output position of the only row.
    prediction=list(np.argmax(scores[0],axis=1))
    print(prediction)
    return ' '.join(self.tokenizer.detokenize(prediction))

In [19]:
# Tokenize and pad the corpus, build the model and print its summary.
pipeline = Pipeline(x=x, y=y)

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (None, 39, 1000)          97000     
                                                                 
 bidirectional_1 (Bidirectio  (None, 78)               324480    
 nal)                                                            
                                                                 
 repeat_vector_1 (RepeatVect  (None, 60, 78)           0         
 or)                                                             
                                                                 
 lstm_4 (LSTM)               (None, 60, 64)            36608     
                                                                 
 lstm_5 (LSTM)               (None, 60, 128)           98816     
                                                                 
 dropout_2 (Dropout)         (None, 60, 128)          

In [20]:
# Float masks: 1.0 where the padded id matrices hold a non-zero id.
X_mask = (pipeline.x != 0).astype(np.float32)
Y_mask = (pipeline.y != 0).astype(np.float32)
# Number of space-separated words in every raw sentence.
X_len = np.array([len(sentence.split(" ")) for sentence in x], dtype=np.float32)
Y_len = np.array([len(sentence.split(" ")) for sentence in y], dtype=np.float32)
# Each dataset element is (input ids, target ids, target length).
# The shuffle buffer spans the whole dataset: a buffer smaller than the
# dataset only shuffles locally, so batches would stay close to file order.
train_ds = (
    tf.data.Dataset.from_tensor_slices((pipeline.x, pipeline.y, Y_len))
    .shuffle(buffer_size=max(len(pipeline.x), 1))
    .batch(batch_size=10)
)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

In [21]:
# Custom training loop: 500 passes over `train_ds`; the mean mini-batch loss
# of every epoch is stored in `tr_loss_hist` and printed every 100 epochs.
tr_loss_hist = []
for e in range(500):
  avg_tr_loss, tr_step = 0, 0
  # NOTE: the third dataset component is Y_len, despite the `x_mb_len` name.
  for x_mb, y_mb, x_mb_len in train_ds:
    with tf.GradientTape() as tape:
      tr_loss = loss_fn(
        pipeline.model, x_mb, y_mb, x_mb_len,
        max_sequence=pipeline.y.shape[1],
      )
    trainable_vars = pipeline.model.trainable_variables
    grads = tape.gradient(tr_loss, trainable_vars)
    optimizer.apply_gradients(grads_and_vars=zip(grads, trainable_vars))
    avg_tr_loss = avg_tr_loss + tr_loss
    tr_step = tr_step + 1
  # Turn the accumulated sum into the epoch mean.
  avg_tr_loss = avg_tr_loss / tr_step
  tr_loss_hist.append(avg_tr_loss)

  if (e + 1) % 100 != 0:
    continue
  print('Epoch: {:3}, tr_loss: {:.3f}'.format(e+1, avg_tr_loss))

Epoch: 100, tr_loss: 84.110
Epoch: 200, tr_loss: 46.651
Epoch: 300, tr_loss: 38.187
Epoch: 400, tr_loss: 2.566
Epoch: 500, tr_loss: 43.163


In [23]:
# Sanity check: run the model on one training example and decode the result.
num = 0
sentence = pipeline.x[num]
# Add a leading batch axis, predict, then take the best id per position.
y_pred = np.argmax(pipeline.model.predict(sentence[np.newaxis, ...]), axis=-1)
print(x[num])
print(y_pred)
decoded_words = pipeline.tokenizer.detokenize(list(y_pred[0]))
print(' '.join(decoded_words))

wykorzystując funkcję printf napisz program wyświetlający na ekranie napis " Moj pierwszy program "
[[ 1  2  3  4  5  6  3  7  8  9 10  7  4 11 12 13 11 14  0  0  0  0  0  0
   0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
   0  0  0  0  0  0  0  0  0  0  0  0]]
int main ( ) { printf ( " Moj pierwszy program " ) ; return 0 ; }                                          
