In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

import numpy as np
import random
import json
import nltk
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from nltk.stem.porter import PorterStemmer
stemmer = PorterStemmer()
import nltk
nltk.download('punkt')

auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)
downloaded = drive.CreateFile({'id': "File ID here"})

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
def tokenize(sentence):
  return nltk.word_tokenize(sentence)

def stem(word):
  return stemmer.stem(word.lower())

In [None]:
def bag_of_words(tokenized_setence, words):
  sentence_words = [stem(word) for word in tokenized_setence]
  bag = np.zeros(len(words), dtype=np.float32)
  for idx, w in enumerate(words):
    if w in sentence_words:
      bag[idx] = 1
  return bag

In [None]:
import requests
from pathlib import Path

if Path("LLM.json").is_file():
  print("Already")
else:
  print("Downloading")
  request = requests.get("https://raw.githubusercontent.com/bellayu17/LLM/main/LLM.json")
  with open("LLM.json", "wb") as f:
    f.write(request.content)

Already


In [None]:
with open('LLM.json', 'r') as f:
  intents = json.load(f)

In [None]:
all_words=[]
tags=[]
xy=[]
for intent in intents["intents"]:
  tag = intent['tag']
  tags.append(tag)

  for pattern in intent['patterns']:
    w = tokenize(pattern)
    all_words.extend(w)
    xy.append((w,tag))

In [None]:
# Update the keywords. Let the model only take keywords and make response.
ignore_words = ['?','.',',','-']
all_words = [stem(w) for w in all_words if w not in ignore_words]
all_words = sorted(set(all_words))
tags = sorted(set(tags))

In [None]:
X_train=[]
y_train=[]

for (pattern_sentence, tag) in xy:
  bag = bag_of_words(pattern_sentence, all_words)
  X_train.append(bag)
  label = tags.index(tag)
  y_train.append(label)

X_train = np.array(X_train)
y_train = np.array(y_train)

# Build model

In [None]:
class NeuralNet(nn.Module):
  def __init__(self,
               input_size,
               hidden_size,
               num_classes):
    super(NeuralNet, self).__init__()
    self.l1 = nn.Linear(input_size, hidden_size)
    self.l2 = nn.Linear(hidden_size, hidden_size)
    self.l3 = nn.Linear(hidden_size, num_classes)
    self.relu = nn.ReLU()

  def forward(self, X):
    X = self.l1(X)
    X = self.relu(X)
    X = self.l2(X)
    X = self.relu(X)
    X = self.l3(X)
    return X

In [None]:
class ChatDataset(Dataset):
  def __init__(self):
    self.n_samples = len(X_train)
    self.X_data = X_train
    self.y_data = y_train

  def __getitem__(self, index):
    return self.X_data[index], self.y_data[index]

  def __len__(self):
    return self.n_samples

In [None]:
# Setup hyperparameters for specifying the model
NUM_EPOCHS = 1000
BATCH_SIZE = 8
LEARNING_RATE = 0.1
INPUT_SIZE = len(X_train[0])
HIDDEN_SIZE = 8
OUTPUT_SIZE = len(tags)

In [None]:
dataset = ChatDataset()
train_loader = DataLoader(dataset=dataset,
                          batch_size=BATCH_SIZE,
                          shuffle=True,
                          num_workers=0)

device = "cuda" if torch.cuda.is_available() else "cpu"

model = NeuralNet(input_size=INPUT_SIZE,
                  hidden_size=HIDDEN_SIZE,
                  num_classes=OUTPUT_SIZE).to(device)

In [None]:
# Setup loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(),
                            lr=LEARNING_RATE)

In [None]:
# Build training and testing loop
for epoch in range(NUM_EPOCHS):
  for (words, labels) in train_loader:
    words = words.to(device)
    labels = labels.to(dtype=torch.long).to(device)

    # Do the Forward Pass
    labels_pred = model(words)

    # Calculate the loss
    loss = loss_fn(labels_pred, labels)

    # Optimizer zero grad
    optimizer.zero_grad()

    # Loss backward
    loss.backward()

    # Optimizer step
    optimizer.step()

  if epoch % 100 == 0:
    print(f"Epoch: {epoch} | Train loss: {loss:.5f}")

Epoch: 0 | Train loss: 1.78206
Epoch: 100 | Train loss: 0.22446
Epoch: 200 | Train loss: 0.04590
Epoch: 300 | Train loss: 0.01136
Epoch: 400 | Train loss: 0.00322
Epoch: 500 | Train loss: 0.00430
Epoch: 600 | Train loss: 0.00296
Epoch: 700 | Train loss: 0.00137
Epoch: 800 | Train loss: 0.00123
Epoch: 900 | Train loss: 0.00037


# Chatbot

In [None]:
bot_name = "BabyBlues"
print("Let's Chat!")
while True:
  sentence = input("You: ")
  if sentence == "quit":
    break

  sentence = tokenize(sentence)
  X = bag_of_words(sentence, all_words)
  X = X.reshape(1, X.shape[0])
  X = torch.from_numpy(X).to(device)

  labels_pred = model(X)
  __, predicted = torch.max(labels_pred, dim=1)

  tag = tags[predicted.item()]

  probs = torch.softmax(labels_pred, dim=1)
  prob = probs[0][predicted.item()]
  if prob.item() > 0.75:
    for intent in intents['intents']:
      if tag == intent['tag']:
        print(f"{bot_name}: {random.choice(intent['responses'])}")
  else:
        print(f"{bot_name}: Could you elaborate more?")

Let's Chat!


KeyboardInterrupt: ignored