<a href="https://colab.research.google.com/github/bryanbayup/petpoint/blob/main/PetpointWithGeneralConver.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!git clone https://github.com/bryanbayup/petpoint

fatal: destination path 'petpoint' already exists and is not an empty directory.


In [3]:
import json
import pandas as pd
import glob

In [4]:
# List untuk menyimpan pasangan percakapan
conversation_pairs = []

In [5]:
# Fungsi untuk mengekstrak pasangan percakapan dari intents
def extract_conversation_pairs(intents):
    pairs = []
    for intent in intents:
        for utterance in intent['utterances']:
            for answer in intent['answers']:
                pairs.append((utterance, answer))
    return pairs

In [None]:
# Load semua JSON files dari corpus/id/
intents = []
for file in glob.glob('/content/petpoint/corpus/id/*.json'):
    with open(file, 'r', encoding='utf-8') as f:
        data = json.load(f)
        intents.extend(data)

In [7]:
# Load domain-specific data
with open('/content/petpoint/kucing_anjing/kucing_anjing.json', 'r', encoding='utf-8') as f:
    data = json.load(f)
    intents.extend(data)

In [8]:
# Ekstrak pasangan percakapan
conversation_pairs.extend(extract_conversation_pairs(intents))

In [9]:
# Membuat DataFrame
df = pd.DataFrame(conversation_pairs, columns=['input', 'response'])

In [12]:
# Load normalization dictionary
normalization_dict = {}
with open('/content/petpoint/normalization/normalization.txt', 'r', encoding='utf-8') as f:
    for line in f:
        # Check if the line contains the delimiter before splitting
        if '\t' in line:
            slang, normal = line.strip().split('\t')
            normalization_dict[slang] = normal
        else:
            print(f"Warning: Skipping line '{line.strip()}', no tab delimiter found.")



In [13]:
# Function to normalize text
def normalize_text(text):
    words = text.split()
    normalized_words = [normalization_dict.get(word, word) for word in words]
    return ' '.join(normalized_words)

In [14]:
# Load stopwords
with open('/content/petpoint/normalization/stopword.txt', 'r', encoding='utf-8') as f:
    stopwords = set(f.read().splitlines())

In [15]:
# Function to remove stopwords
def remove_stopwords(text):
    words = text.split()
    filtered_words = [word for word in words if word not in stopwords]
    return ' '.join(filtered_words)

In [16]:
import stanza

# Initialize the Indonesian pipeline
nlp = stanza.Pipeline('id')

def tokenize_and_lemmatize(text):
    doc = nlp(text)
    lemmas = [word.lemma for sentence in doc.sentences for word in sentence.words]
    return ' '.join(lemmas)

INFO:stanza:Checking for updates to resources.json in case models have been updated.  Note: this behavior can be turned off with download_method=None or download_method=DownloadMethod.REUSE_RESOURCES


Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.9.0.json:   0%|   …

INFO:stanza:Downloaded file to /root/stanza_resources/resources.json
INFO:stanza:Loading these models for language: id (Indonesian):
| Processor    | Package      |
-------------------------------
| tokenize     | gsd          |
| mwt          | gsd          |
| pos          | gsd_charlm   |
| lemma        | gsd_nocharlm |
| constituency | icon_charlm  |
| depparse     | gsd_charlm   |

INFO:stanza:Using device: cpu
INFO:stanza:Loading: tokenize
  checkpoint = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: mwt
  checkpoint = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: pos
  checkpoint = torch.load(filename, lambda storage, loc: storage)
  data = torch.load(self.filename, lambda storage, loc: storage)
  state = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: lemma
  checkpoint = torch.load(filename, lambda storage, loc: storage)
INFO:stanza:Loading: constituency
  checkpoint = torch.load(filename, lambda storag

In [17]:
!pip install stanza
import stanza
stanza.download('id')



Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.9.0.json:   0%|   …

INFO:stanza:Downloaded file to /root/stanza_resources/resources.json
INFO:stanza:Downloading default packages for language: id (Indonesian) ...
INFO:stanza:File exists: /root/stanza_resources/id/default.zip
INFO:stanza:Finished downloading models and saved to /root/stanza_resources


In [18]:
# Function for preprocessing
def preprocess_text(text):
    # Normalisasi
    text = normalize_text(text.lower())
    # Hapus stopwords
    text = remove_stopwords(text)
    # Tokenisasi dan Lematisasi
    text = tokenize_and_lemmatize(text)
    return text

# Apply preprocessing
df['input'] = df['input'].apply(preprocess_text)
df['response'] = df['response'].apply(preprocess_text)

In [19]:
!pip install torch torchvision



In [20]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [21]:
from collections import Counter
import itertools

def build_vocab(sentences, max_vocab_size=5000):
    word_counts = Counter(itertools.chain(*[s.split() for s in sentences]))
    most_common = word_counts.most_common(max_vocab_size)
    idx2word = ['<PAD>', '<SOS>', '<EOS>', '<UNK>'] + [word for word, _ in most_common]
    word2idx = {word: idx for idx, word in enumerate(idx2word)}
    return word2idx, idx2word

# Bangun vocabulary untuk input dan output
input_word2idx, input_idx2word = build_vocab(df['input'])
output_word2idx, output_idx2word = build_vocab(df['response'])

In [22]:
def sentence_to_indices(sentence, word2idx):
    indices = [word2idx.get(word, word2idx['<UNK>']) for word in sentence.split()]
    return indices

df['input_indices'] = df['input'].apply(lambda x: sentence_to_indices(x, input_word2idx))
df['response_indices'] = df['response'].apply(lambda x: [output_word2idx['<SOS>']] + sentence_to_indices(x, output_word2idx) + [output_word2idx['<EOS>']])

In [23]:
from torch.utils.data import Dataset, DataLoader

class ChatDataset(Dataset):
    def __init__(self, inputs, outputs):
        self.inputs = inputs
        self.outputs = outputs

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return torch.tensor(self.inputs[idx], dtype=torch.long), torch.tensor(self.outputs[idx], dtype=torch.long)

dataset = ChatDataset(df['input_indices'].tolist(), df['response_indices'].tolist())
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=lambda x: x)

In [24]:
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_size, embed_size, hidden_size, num_layers=1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        # x: [batch_size, seq_length]
        embedding = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedding)
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_size, embed_size, hidden_size, num_layers=1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        # x: [batch_size]
        x = x.unsqueeze(1)  # [batch_size, 1]
        embedding = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedding, (hidden, cell))
        predictions = self.fc(outputs.squeeze(1))
        return predictions, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, output_word2idx):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.output_word2idx = output_word2idx

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        batch_size = source.size(0)
        target_len = target.size(1)
        target_vocab_size = len(self.output_word2idx)

        outputs = torch.zeros(batch_size, target_len, target_vocab_size).to(device)
        hidden, cell = self.encoder(source)

        x = target[:, 0]

        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, hidden, cell)
            outputs[:, t] = output
            best_guess = output.argmax(1)
            x = target[:, t] if torch.rand(1).item() < teacher_forcing_ratio else best_guess
        return outputs

In [32]:
# Hyperparameters

import torch.optim as optim
input_size_encoder = len(input_word2idx)
input_size_decoder = len(output_word2idx)
output_size = len(output_word2idx)
embed_size = 256
hidden_size = 512
num_layers = 1
learning_rate = 0.001
num_epochs = 100

encoder_net = Encoder(input_size_encoder, embed_size, hidden_size, num_layers).to(device)
decoder_net = Decoder(output_size, embed_size, hidden_size, num_layers).to(device)

model = Seq2Seq(encoder_net, decoder_net, output_word2idx).to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss(ignore_index=output_word2idx['<PAD>'])

In [None]:
import random

for epoch in range(num_epochs):
    print(f'Epoch [{epoch+1}/{num_epochs}]')
    for batch_idx, batch in enumerate(dataloader):
        # Ambil batch data
        inputs, targets = zip(*batch)
        inputs = nn.utils.rnn.pad_sequence(inputs, batch_first=True, padding_value=input_word2idx['<PAD>']).to(device)
        targets = nn.utils.rnn.pad_sequence(targets, batch_first=True, padding_value=output_word2idx['<PAD>']).to(device)

        # Forward pass
        outputs = model(inputs, targets)

        # Reshape untuk menghitung loss
        outputs = outputs[:, 1:].reshape(-1, outputs.shape[2])
        targets = targets[:, 1:].reshape(-1)

        optimizer.zero_grad()
        loss = criterion(outputs, targets)
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

        optimizer.step()

        if batch_idx % 100 == 0:
            print(f'Batch {batch_idx}/{len(dataloader)}, Loss: {loss.item():.4f}')

Epoch [1/100]
Batch 0/25, Loss: 6.0538
Epoch [2/100]
Batch 0/25, Loss: 4.3893
Epoch [3/100]
Batch 0/25, Loss: 3.7374
Epoch [4/100]
Batch 0/25, Loss: 3.9797
Epoch [5/100]
Batch 0/25, Loss: 3.2109
Epoch [6/100]
Batch 0/25, Loss: 2.9435
Epoch [7/100]
Batch 0/25, Loss: 2.7337
Epoch [8/100]
Batch 0/25, Loss: 2.5754
Epoch [9/100]
Batch 0/25, Loss: 2.1392
Epoch [10/100]
Batch 0/25, Loss: 1.6517
Epoch [11/100]
Batch 0/25, Loss: 2.1491
Epoch [12/100]
Batch 0/25, Loss: 0.8187
Epoch [13/100]
Batch 0/25, Loss: 1.0773
Epoch [14/100]
Batch 0/25, Loss: 1.2805
Epoch [15/100]
Batch 0/25, Loss: 1.1590
Epoch [16/100]
Batch 0/25, Loss: 0.5523
Epoch [17/100]
Batch 0/25, Loss: 0.4831
Epoch [18/100]
Batch 0/25, Loss: 0.7559
Epoch [19/100]
Batch 0/25, Loss: 0.4969
Epoch [20/100]
Batch 0/25, Loss: 0.3064
Epoch [21/100]
Batch 0/25, Loss: 0.3873
Epoch [22/100]
Batch 0/25, Loss: 0.6354
Epoch [23/100]
Batch 0/25, Loss: 0.4686
Epoch [24/100]
Batch 0/25, Loss: 0.3012
Epoch [25/100]
Batch 0/25, Loss: 0.2571
Epoch [26

In [28]:
def translate_sentence(model, sentence, input_word2idx, output_idx2word, max_length=50):
    model.eval()
    with torch.no_grad():
        # Preprocessing
        sentence = preprocess_text(sentence)
        # Konversi ke indeks
        inputs = torch.tensor([sentence_to_indices(sentence, input_word2idx)], dtype=torch.long).to(device)

        hidden, cell = model.encoder(inputs)
        x = torch.tensor([output_word2idx['<SOS>']], dtype=torch.long).to(device)
        outputs = []
        for _ in range(max_length):
            output, hidden, cell = model.decoder(x, hidden, cell)
            best_guess = output.argmax(1)
            if best_guess.item() == output_word2idx['<EOS>']:
                break
            outputs.append(best_guess.item())
            x = best_guess
    translated_sentence = ' '.join([output_idx2word[idx] for idx in outputs])
    return translated_sentence

In [29]:
def adjust_response_by_sentiment(response, sentiment_score):
    if sentiment_score < 0:
        response = "Maaf mendengarnya. " + response
    elif sentiment_score > 0:
        response = "Senang mendengarnya! " + response
    return response

In [30]:
def extract_entities(text):
    doc = nlp(text)
    entities = [(ent.text, ent.type) for ent in doc.ents]
    return entities

In [31]:
def chatbot_response(user_input):
    # Analisis sentimen
    sentiment_score = analyze_sentiment(user_input)

    # Ekstrak entitas
    entities = extract_entities(user_input)

    # Dapatkan respon dari model
    response = translate_sentence(model, user_input, input_word2idx, output_idx2word)

    # Sesuaikan respon berdasarkan sentimen
    response = adjust_response_by_sentiment(response, sentiment_score)

    return response