In [1]:
import os
import sys
import urllib
import tarfile


In [2]:
corpus_url = "http://www.cs.cornell.edu/people/pabo/movie-review-data/review_polarity.tar.gz"

corpus_root = os.path.join(os.getcwd(), "review_polarity", "txt_sentoken")

In [4]:
# download corpus as a zip and then unzip
# downloads and unzips in the same directory
# by default set to current dir
def download_and_unzip():
    file_name = corpus_url.split("/")[-1]
    download_path = os.path.join(os.getcwd(), file_name)
    # where the zip will get extracted
    extracted_path = os.path.join(os.getcwd(), "review_polarity")

    if os.path.exists(extracted_path):
        print("Already downloaded and extracted!")
    else:
        # ============================================ download
        print("Downloading, sit tight!")

        def _progress(count, block_size, total_size):
            sys.stdout.write(
                f"\r>> Downloading {file_name} {float(count * block_size) / float(total_size) * 100.0}%")
            sys.stdout.flush()

        file_path, _ = urllib.request.urlretrieve(
            corpus_url, download_path, _progress)
        print()
        print(
            f"Successfully downloaded {file_name} {os.stat(file_path).st_size} bytes")

        # ======================================= unzip
        print()
        print("Unzipping ...")
        # create dir at extracted_path
        os.mkdir(extracted_path)
        tarfile.open(file_path, "r:gz").extractall(extracted_path)

        # =========================================== clean up
        # delete the downloaded zip file
        print("Deleting downloaded zip file")
        os.remove(file_path)


In [5]:
download_and_unzip()

Already downloaded and extracted!


In [20]:
import nltk
nltk.download()

showing info https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml


True

In [17]:
from dataclasses import dataclass
from typing import List


@dataclass
class Review:
    tokens: List[str]
    label: int # 1 if pos else 0
    
    def __str__(self) -> str:
        return str(self.__dict__)

In [18]:
from tqdm.auto import tqdm
from nltk.tokenize import word_tokenize as tokenize

# just read all files for a category


def load_data_from_path(path) -> List[Review]:
    file_list = os.listdir(path)
    data: List[Review] = []

    for _, fname in tqdm(enumerate(file_list)):
        fpath = os.path.join(path, fname)

        # read text from the file
        f = open(fpath, mode="r")
        lines = f.read()
        # close
        f.close()
        
        # tokenize
        tokens = tokenize(lines)
        
        # create review object
        review = Review(tokens, 1 if "pos" in path else 0)
        
        # add to the data list
        data.append(review)

    return data


In [21]:
pos = load_data_from_path(
    "./review_polarity/txt_sentoken/pos")
neg = load_data_from_path("./review_polarity/txt_sentoken/neg")

# combine into a single list
all_data = pos + neg

0it [00:00, ?it/s]

0it [00:00, ?it/s]

In [10]:
import gensim.downloader
import numpy as np

glove_vectors = gensim.downloader.load('glove-twitter-200')

shape = glove_vectors["good"].shape
unk = np.zeros(shape=shape)

glove_vectors.add_vector("<UNK>", unk)
glove_vectors.add_vector("<PAD>", np.ones(shape=shape) * -1.0)



1193515

In [41]:
# find the max sequence length
max_seq_len = max([len(d.tokens) for d in all_data])
max_seq_len

2753

In [46]:
@dataclass
class EncodedData:
    tokens: np.ndarray
    label: int
    
    def __str__(self) -> str:
        return str(self.__dict__)


# encode with indexes from the word vectors
# also pad
def encode_pad_text(data: List[Review]=all_data, pad_len=max_seq_len) -> List[EncodedData]:
    encoded: List[EncodedData] = []
   
    # encode
    for _, review in tqdm(enumerate(data)):
        tokens = review.tokens
        indexes = list()
       
        for tok in tokens:
            try:
                idx = glove_vectors.key_to_index[tok]
            except:
                idx = glove_vectors.key_to_index["<UNK>"]

            indexes.append(idx)
    
        # pad
        indexes = np.array(indexes)
        padded = np.ones(shape=(pad_len, ), dtype=np.int32) * \
            glove_vectors.key_to_index["<PAD>"]
        # insert indexes to padded
        padded[:indexes.shape[0]] = indexes
        
        # create an encoded data object
        enc = EncodedData(padded, review.label)
        encoded.append(enc)
        
   
    return encoded


encoded_data = encode_pad_text()


0it [00:00, ?it/s]

In [47]:
encoded_data[0]

EncodedData(tokens=array([   6866,  111052,     133, ..., 1193515, 1193515, 1193515],
      dtype=int32), label=1)

In [50]:
# create train test split
from sklearn.model_selection import train_test_split

train_data, test_data = train_test_split(encoded_data, test_size=0.2, random_state=42)

In [54]:
print(len(train_data))
print(len(test_data))

assert len(train_data) + len(test_data) == len(encoded_data)

1600
400


In [None]:
from typing import Any
import jax
import jax.numpy as jnp
import flax
import flax.linen as nn


class Classifier(nn.Module):
    kernel_sizes = [3, 4, 5]
    n_filters = 100
    embedding_dim = 200
    out_dim = 2
    
    def setup(self):
        self.conv_layer = [
            nn.Conv(features=self.n_filters, kernel_size=ksize) for ksize in self.kernel_sizes
        ]
        
        self.embedding = nn.Embed(features=200, 
                                  embedding=glove_vectors, 
                                  num_embeddings=len(glove_vectors.index_to_key))
        
        self.dense = nn.Dense(features=self.out_dim)
        
    def conv_and_pool(self, x, conv):
        out = nn.relu(conv(x))