In [1]:
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def process_text(text):
    if isinstance(text, list):
        text = ' '.join(text)
    text = text.lower()
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)
    text = re.sub(r'\b(a|an|the)\b', '', text)
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)
    text = re.sub(r"[^\w\s':]", ' ', text)
    text = re.sub(r'\s+,', ',', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def clean_text(text):
    if isinstance(text, tuple):
        text = text[0]  # タプルの最初の要素を使用
    text = re.sub(r"[\(\)\"\',]", '', text)
    return text

class VQADataset(torch.utils.data.Dataset):
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform
        self.image_dir = image_dir
        self.df = pd.read_json(df_path, convert_axes=False)
        self.answer = answer

        self.question2idx = {}
        self.answer2idx = {}
        self.idx2question = {}
        self.idx2answer = {}

        for question in self.df["question"].values:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}

        if self.answer:
            for answers in self.df["answers"].values:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}

    def update_dict(self, dataset):
        self.question2idx = dataset.question2idx
        self.answer2idx = dataset.answer2idx
        self.idx2question = dataset.idx2question
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        image = Image.open(f"{self.image_dir}/{self.df['image'].iloc[idx]}")
        image = self.transform(image)
        question_text = self.df["question"].iloc[idx]
        question_text = clean_text(str(question_text))
        question = np.zeros(len(self.idx2question) + 1)
        question_words = process_text(question_text).split(" ")
        for word in question_words:
            try:
                question[self.question2idx[word]] = 1
            except KeyError:
                question[-1] = 1
        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"].iloc[idx]]
            if len(answers) > 0:
                mode_answer_idx = mode(answers)
                if 0 <= mode_answer_idx < len(self.df["answers"].iloc[idx]):
                    answer_text = process_text(self.df["answers"].iloc[idx][mode_answer_idx]["answer"])
                    answer_text = clean_text(str(answer_text))
                    return image, question_text, answer_text
                else:
                    return image, question_text, "unanswerable"
            else:
                return image, question_text, "unanswerable"
        else:
            return image, question_text

    def __len__(self):
        return len(self.df)

def main():
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    dataset_path = "./data"
    train_dataset = VQADataset(df_path=f"{dataset_path}/train.json", image_dir=f"{dataset_path}/train", transform=transform, answer=True)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1, shuffle=False)
    q_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        llm_int8_skip_modules=['out_proj', 'kv_proj', 'lm_head'],
    )

    model = AutoModel.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True, quantization_config=q_config)
    tokenizer = AutoTokenizer.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True)
    model.eval()

    submission = []
    counter = 0
    total = len(train_dataset)
    correct = 0
    for image, question_text, answers in train_loader:
        answer = clean_text(answers)
        if answer == "unanswerable":
            continue
        image = image.squeeze(0).permute(1, 2, 0).cpu().numpy()
        image = Image.fromarray((image * 255).astype(np.uint8))
        question = f"""Please answer the question about the image.
        # Question
        {clean_text(question_text)}
        
        The answer to the above question should be indicated by setting answer_confidence to yes or no, followed by a line break, as in the following example. If you just can't figure it out, please indicate the answer as unanswerable as in Example 4.
        # Example 1
        "answer_confidence":"no","answer":"cherry"
        # Example 2
        "answer_confidence":"yes","answer":"blue"
        # Example 3
        "answer_confidence":"yes","answer":"The broken display"
        # Example 4
        "answer_confidence":"yes","answer":"unanswerable"
"""
        msgs = [{'role': 'user', 'content': question}]
        
        response = model.chat(
            image=image,
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=True,
            temperature=0.5,
        )
        counter += 1
        progress = counter * 100 / total
        res = re.sub(r'(<box>.*</box>)', '', response)
        res = res.replace('<ref>', '')
        res = res.replace('</ref>', '')
        res = res.replace('<box>', '')
        text = res.replace('</box>', '')
        # 正規表現を使用して "answer" の後の内容を抽出
        match = re.search(r'"answer":\s*"([^"]*)"', text)
        if match:
            predict = match.group(1)
        else:
            print(f'No answer found\n Raw Res = {text}')
            
            predict = "unanswerable"
        predict =process_text(predict)
        print(f"Res:{predict}\n A:{answer}")
        submission.append(predict)
        if answer == predict:
            correct +=1
        print(f"# Progress{progress},Accuracy{correct/counter}")

    submission = np.array(submission)
    np.save("./submission.npy", submission)

if __name__ == "__main__":
    main()


  from .autonotebook import tqdm as notebook_tqdm
`low_cpu_mem_usage` was None, now set to True since model is quantized.
Loading checkpoint shards: 100%|██████████| 7/7 [00:02<00:00,  2.54it/s]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


No answer found
 Raw Res = The item in the image is a package of beef chuck steak. This determination is based on the visible label that includes the name "beef chuck steak" and the typical packaging style for such meat products. The label also provides nutritional information, which is common for packaged food items. The answer to what the item is, with high confidence, is "beef chuck steak."
Res:unanswerable
 A:beef chuck steak
# Progress0.005031952900920848,Accuracy0.0
Res:unanswerable
 A:candle
# Progress0.010063905801841695,Accuracy0.0
No answer found
 Raw Res = The color of the item in the image is not clearly discernible due to the lack of focus and the grainy texture. The image does not provide enough visual information to accurately determine the color. Therefore, based on the available evidence, the answer to what color the item is would be "unanswerable."
Res:unanswerable
 A:grey
# Progress0.015095858702762543,Accuracy0.0
Res:lotion
 A:no
# Progress0.02012781160368339,Accura

KeyboardInterrupt: 

In [1]:
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def process_text(text):
    if isinstance(text, list):
        text = ' '.join(text)
    text = text.lower()
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)
    text = re.sub(r'\b(a|an|the)\b', '', text)
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)
    text = re.sub(r"[^\w\s':]", ' ', text)
    text = re.sub(r'\s+,', ',', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def clean_text(text):
    if isinstance(text, tuple):
        text = text[0]  # タプルの最初の要素を使用
    text = re.sub(r"[\(\)\"\',]", '', text)
    return text

class VQADataset(torch.utils.data.Dataset):
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform
        self.image_dir = image_dir
        self.df = pd.read_json(df_path, convert_axes=False)
        self.answer = answer

        self.question2idx = {}
        self.answer2idx = {}
        self.idx2question = {}
        self.idx2answer = {}

        for question in self.df["question"].values:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}

        if self.answer:
            for answers in self.df["answers"].values:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}

    def update_dict(self, dataset):
        self.question2idx = dataset.question2idx
        self.answer2idx = dataset.answer2idx
        self.idx2question = dataset.idx2question
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        image = Image.open(f"{self.image_dir}/{self.df['image'].iloc[idx]}")
        image = self.transform(image)
        question_text = self.df["question"].iloc[idx]
        question_text = clean_text(str(question_text))
        question = np.zeros(len(self.idx2question) + 1)
        question_words = process_text(question_text).split(" ")
        for word in question_words:
            try:
                question[self.question2idx[word]] = 1
            except KeyError:
                question[-1] = 1
        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"].iloc[idx]]
            if len(answers) > 0:
                mode_answer_idx = mode(answers)
                if 0 <= mode_answer_idx < len(self.df["answers"].iloc[idx]):
                    answer_text = process_text(self.df["answers"].iloc[idx][mode_answer_idx]["answer"])
                    answer_text = clean_text(str(answer_text))
                    return image, question_text, answer_text
                else:
                    return image, question_text, "unanswerable"
            else:
                return image, question_text, "unanswerable"
        else:
            return image, question_text,"Noanswer"

    def __len__(self):
        return len(self.df)

def main():
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    dataset_path = "./data"
    train_dataset = VQADataset(df_path=f"{dataset_path}/train.json", image_dir=f"{dataset_path}/train", transform=transform, answer=True)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1, shuffle=False)
    test_dataset = VQADataset(df_path=f"{dataset_path}/valid.json", image_dir=f"{dataset_path}/valid", transform=transform, answer=False)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)
    q_config = BitsAndBytesConfig(
        load_in_8bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        llm_int8_skip_modules=['out_proj', 'kv_proj', 'lm_head'],
    )

    model = AutoModel.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True, quantization_config=q_config)
    tokenizer = AutoTokenizer.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True)
    model.eval()

    submission = []
    counter = 0
    total = len(train_dataset)
    correct = 0
    for image, question_text, answers in test_loader:
        #answer = clean_text(answers)
        # if answer == "unanswerable":
        #     continue
        image = image.squeeze(0).permute(1, 2, 0).cpu().numpy()
        image = Image.fromarray((image * 255).astype(np.uint8))
        question = f"""Please answer the question about the image.
        # Question
        {clean_text(question_text)}
        
        The answer to the above question should be indicated by setting answer_confidence to yes or no, followed by a line break, as in the following example. If you just can't figure it out, please indicate the answer as unanswerable as in Example 4.
   # Example 1
        "answer_confidence":"no","answer":"cherry"
        # Example 2
        "answer_confidence":"yes","answer":"blue"
        # Example 3
        "answer_confidence":"yes","answer":"The broken display"
        # Example 4
        "answer_confidence":"yes","answer":"unanswerable"     
"""
        msgs = [{'role': 'user', 'content': question}]
        
        response = model.chat(
            image=image,
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=True,
            temperature=0.5,
        )
        counter += 1
        progress = counter * 100 / total
        res = re.sub(r'(<box>.*</box>)', '', response)
        res = res.replace('<ref>', '')
        res = res.replace('</ref>', '')
        res = res.replace('<box>', '')
        text = res.replace('</box>', '')
        # 正規表現を使用して "answer" の後の内容を抽出
        match = re.search(r'"answer":\s*"([^"]*)"', text)
        if match:
            predict = match.group(1)
        else:
            print(f'No answer found\n Raw Res = {text}')
            
            predict = "unanswerable"
        predict =process_text(predict)
        print(f"Qustion :{clean_text(question_text)}Res:{predict}\n ")
        # print(f"Res:{predict}\n A:{answer}")
        submission.append(predict)
        # if answer == predict:
        #     correct +=1
        print(f"# Progress{progress},Accuracy{correct/counter}")
        print(f"# Progress{progress}")

    submission = np.array(submission)
    np.save("./submission.npy", submission)

if __name__ == "__main__":
    main()


  from .autonotebook import tqdm as notebook_tqdm
`low_cpu_mem_usage` was None, now set to True since model is quantized.
Loading checkpoint shards: 100%|██████████| 7/7 [00:02<00:00,  2.59it/s]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


No answer found
 Raw Res = The question appears to be related to the state of a mirror, possibly in a context of a technical or troubleshooting scenario. However, based on the image provided, it is not possible to determine whether the mirrors were cleared or not. The image does not show any mirrors or their states; instead, it shows a computer screen with an open window. Therefore, the answer to the question is:

"answer_confidence": "unanswerable"

This is because the image does not provide sufficient information to ascertain the condition of any mirrors, especially in relation to clearing them.
Qustion :Was I able to clear either of the mirrors of the OK button?Res:unanswerable
 
# Progress0.005031952900920848,Accuracy0.0
# Progress0.005031952900920848
No answer found
 Raw Res = The image provided does not contain any visible page number. It is a close-up photograph of a document with text, and the angle and focus do not allow for the identification of such details as page numbers. 

KeyboardInterrupt: 

In [5]:
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def process_text(text):
    if isinstance(text, list):
        text = ' '.join(text)
    text = text.lower()
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)
    text = re.sub(r'\b(a|an|the)\b', '', text)
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)
    text = re.sub(r"[^\w\s':]", ' ', text)
    text = re.sub(r'\s+,', ',', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def clean_text(text):
    if isinstance(text, tuple):
        text = text[0]  # タプルの最初の要素を使用
    text = re.sub(r"[\(\)\"\',]", '', text)
    return text

class VQADataset(torch.utils.data.Dataset):
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform
        self.image_dir = image_dir
        self.df = pd.read_json(df_path, convert_axes=False)
        self.answer = answer

        self.question2idx = {}
        self.answer2idx = {}
        self.idx2question = {}
        self.idx2answer = {}

        for question in self.df["question"].values:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}

        if self.answer:
            for answers in self.df["answers"].values:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}

    def update_dict(self, dataset):
        self.question2idx = dataset.question2idx
        self.answer2idx = dataset.answer2idx
        self.idx2question = dataset.idx2question
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        image = Image.open(f"{self.image_dir}/{self.df['image'].iloc[idx]}")
        image = self.transform(image)
        question_text = self.df["question"].iloc[idx]
        question_text = clean_text(str(question_text))
        question = np.zeros(len(self.idx2question) + 1)
        question_words = process_text(question_text).split(" ")
        for word in question_words:
            try:
                question[self.question2idx[word]] = 1
            except KeyError:
                question[-1] = 1
        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"].iloc[idx]]
            if len(answers) > 0:
                mode_answer_idx = mode(answers)
                if 0 <= mode_answer_idx < len(self.df["answers"].iloc[idx]):
                    answer_text = process_text(self.df["answers"].iloc[idx][mode_answer_idx]["answer"])
                    answer_text = clean_text(str(answer_text))
                    return image, question_text, answer_text
                else:
                    return image, question_text, "unanswerable"
            else:
                return image, question_text, "unanswerable"
        else:
            return image, question_text,"Noanswer"

    def __len__(self):
        return len(self.df)

def main():
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    dataset_path = "./data"
    train_dataset = VQADataset(df_path=f"{dataset_path}/train.json", image_dir=f"{dataset_path}/train", transform=transform, answer=True)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1, shuffle=False)
    test_dataset = VQADataset(df_path=f"{dataset_path}/valid.json", image_dir=f"{dataset_path}/valid", transform=transform, answer=False)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)
    print(f"Total number of samples in test_loader: {len(test_loader.dataset)}")

    q_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        llm_int8_skip_modules=['out_proj', 'kv_proj', 'lm_head'],
    )

    model = AutoModel.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True, quantization_config=q_config)
    tokenizer = AutoTokenizer.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True)
    model.eval()

    submission = []
    counter = 0
    total = len(train_dataset)
    for image, question_text, answers in test_loader:
        #answer = clean_text(answers)
        # if answer == "unanswerable":
        #     continue
        image = image.squeeze(0).permute(1, 2, 0).cpu().numpy()
        image = Image.fromarray((image * 255).astype(np.uint8))
        question = f"""Please answer the question about the image.
        # Question
        {clean_text(question_text)}
        
        Please provide a concise response to the question in the answer field, following the format and examples below. If you truly don't know the answer, please use "unanswerable" as shown in Example 4.

        ## Format
        "answer":your_answer"
        ### Example 1
        "answer":"cherry"
        ### Example 2
        "answer":"no"
        ### Example 3
        "answer":"The broken display"
        ### Example 4
        "answer":"unanswerable"

"""
        msgs = [{'role': 'user', 'content': question}]
        
        response = model.chat(
            image=image,
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=True,
            temperature=0.5,
        )
        counter += 1
        progress = (counter * 100) / total
        print(counter)
        #print(f"This is the raw response\n{response}")
        res = re.sub(r'(<box>.*</box>)', '', response)
        res = res.replace('<ref>', '')
        res = res.replace('</ref>', '')
        res = res.replace('<box>', '')
        text = res.replace('</box>', '')
        # 正規表現を使用して "answer" の後の内容を抽出
        match = re.search(r'"answer":\s*"([^"]*)"', text)
        if match:
            predict = match.group(1)
        else:
            # print(f'No answer found\n Raw Res = {text}')
            
            predict = "unanswerable"
        predict =process_text(predict)
        # print(f"Qustion :{clean_text(question_text)}\nRes:{predict}")
        # print(f"Res:{predict}\n A:{answer}")
        submission.append(predict)
        # if answer == predict:
        #     correct +=1
        # print(f"# Progress{progress},Accuracy{correct/counter}")
        if counter % 2 == 1:
            print(f"# Progress{progress}")

    submission = np.array(submission)
    np.save("./submission.npy", submission)

if __name__ == "__main__":
    main()

`low_cpu_mem_usage` was None, now set to True since model is quantized.


Total number of samples in test_loader: 4969


Loading checkpoint shards: 100%|██████████| 7/7 [00:03<00:00,  2.19it/s]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


1
# Progress0.005031952900920848
2
3
# Progress0.015095858702762543


KeyboardInterrupt: 

In [1]:
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def process_text(text):
    text = text.lower()
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)
    text = re.sub(r'\b(a|an|the)\b', '', text)
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)
    text = re.sub(r"[^\w\s':]", ' ', text)
    text = re.sub(r'\s+,', ',', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

class VQADataset(torch.utils.data.Dataset):
    def __init__(self, df_path, image_dir, transform=None, answer=True):
        self.transform = transform
        self.image_dir = image_dir
        self.df = pd.read_json(df_path, convert_axes=False)
        self.answer = answer

        self.question2idx = {}
        self.answer2idx = {}
        self.idx2question = {}
        self.idx2answer = {}

        for question in self.df["question"]:
            question = process_text(question)
            words = question.split(" ")
            for word in words:
                if word not in self.question2idx:
                    self.question2idx[word] = len(self.question2idx)
        self.idx2question = {v: k for k, v in self.question2idx.items()}

        if self.answer:
            for answers in self.df["answers"]:
                for answer in answers:
                    word = answer["answer"]
                    word = process_text(word)
                    if word not in self.answer2idx:
                        self.answer2idx[word] = len(self.answer2idx)
            self.idx2answer = {v: k for k, v in self.answer2idx.items()}

    def update_dict(self, dataset):
        self.question2idx = dataset.question2idx
        self.answer2idx = dataset.answer2idx
        self.idx2question = dataset.idx2question
        self.idx2answer = dataset.idx2answer

    def __getitem__(self, idx):
        image = Image.open(f"{self.image_dir}/{self.df['image'][str(idx)]}")
        image = self.transform(image)
        question_text = self.df["question"][str(idx)]
        question = np.zeros(len(self.idx2question) + 1)
        question_words = process_text(question_text).split(" ")
        for word in question_words:
            try:
                question[self.question2idx[word]] = 1
            except KeyError:
                question[-1] = 1

        if self.answer:
            answers = [self.answer2idx[process_text(answer["answer"])] for answer in self.df["answers"][str(idx)]]
            mode_answer_idx = mode(answers)
            return image, question_text, str(process_text(self.df["answers"][mode_answer_idx])), int(mode_answer_idx)
        else:
            return image, question_text

    def __len__(self):
        return len(self.df)

def main():
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    dataset_path = "./data"
    # test_dataset = VQADataset(df_path=f"{dataset_path}/valid.json", image_dir=f"{dataset_path}/valid", transform=transform, answer=False)
    # train_dataset = VQADataset(df_path=f"{dataset_path}/train.json", image_dir=f"{dataset_path}/train", transform=transform, answer=True)

    test_dataset = VQADataset(df_path=f"{dataset_path}/valid.json", image_dir=f"{dataset_path}/sharped_valid", transform=transform, answer=False)
    train_dataset = VQADataset(df_path=f"{dataset_path}/train.json", image_dir=f"{dataset_path}/train", transform=transform, answer=True)


    train_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)
    q_config = BitsAndBytesConfig(
        load_in_8bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        llm_int8_skip_modules=['out_proj', 'kv_proj', 'lm_head'],
    )

    model = AutoModel.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True, quantization_config=q_config)
    tokenizer = AutoTokenizer.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True)
    model.eval()

    submission = []
    counter = 0
    total = len(test_dataset)
    accuracy = 0
    for image, question_text ,answers in test_loader:
        image = image.squeeze(0).permute(1, 2, 0).cpu().numpy()
        image = Image.fromarray((image * 255).astype(np.uint8))
        question = f"""Please answer the question about the image.
        # Question
        {str(question_text)}
        
        The answer to the above question should be indicated by setting answer_confidence to yes or no, followed by a line break, as in the following example. If the answer is not possible, please indicate the answer as "ununswerable" as in Example 3.
        # Example 1
        "answer_confidence":"no","answer":"cherry"
        # Example 2
        "answer_confidence":"yes","answer":"blue"
        # Example 3
        "answer_confidence":"yes","answer":"unanswerable"
"""
        msgs = [{'role': 'user', 'content': question}]
        
        response = model.chat(
            image=image,
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=True,
            temperature=0.5,
        )
        counter += 1
        progress = counter * 100 / total
        res = re.sub(r'(<box>.*</box>)', '', response)
        res = res.replace('<ref>', '')
        res = res.replace('</ref>', '')
        res = res.replace('<box>', '')
        text = res.replace('</box>', '')
        print(f"Q :{question_text}\r\n A:{text}")
        # 正規表現を使用して "answer" の後の内容を抽出
        match = re.search(r'"answer":\s*"([^"]*)"', text)
        if match:
            answer = match.group(1)
        else:
            print('No answer found')
            answer = "unanswerable"
        print(f'Extracted answer: {answer}')
        submission.append(answer)
        print(f"# Progress{progress}")

    submission = np.array(submission)
    np.save("./submission.npy", submission)

if __name__ == "__main__":
    main()


  from .autonotebook import tqdm as notebook_tqdm
`low_cpu_mem_usage` was None, now set to True since model is quantized.
Loading checkpoint shards: 100%|██████████| 7/7 [00:02<00:00,  2.75it/s]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Q :('Was I able to clear either of the mirrors of the OK button?',)
 A:The question appears to be related to the process of clearing mirrors, which is not depicted in the image provided. Since the image does not show any mirrors or actions being performed on them, it is not possible to determine if the OK button was cleared from either mirror based on this image. Therefore, the appropriate response to the question would be:

"answer_confidence":"no","answer":"unanswerable"

This indicates that the answer is not possible to ascertain from the given image.
Extracted answer: unanswerable
0.02012477359629704
Q :('What page number is this above? Thank you.',)
 A:The question asks for the page number above the text in the image. However, the image provided does not show a page number or any context that would allow us to determine the page number. Therefore, based on the information available, the answer to the question is:

"answer_confidence": "no", "answer": "unanswerable"

This indicates

In [6]:
from notify import send_email
send_email("Notify from Ubuntu in Hlab","The prediction has just finished")

In [None]:
import zipfile
import os
def create_zip(zip_file_name,path):
    with zipfile.ZipFile(zip_file_name,'w',zipfile.ZIP_DEFLATED)as zipf:
        if os.path.isfile(path):
            zipf.write(path,os.path.basename(path))
        else:
            print("file not exist")

create_zip("submission.zip","./submission.npy")
send_email("Notify from Ubuntu in Hlab","The prediction and compress has just accomplished",attachment_path="./submission.zip")

In [3]:
import re
import random
import time
from statistics import mode

from PIL import Image
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
from transformers import AutoModel, AutoTokenizer, BitsAndBytesConfig

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def process_text(text):
    if isinstance(text, list):
        text = ' '.join(text)
    text = text.lower()
    num_word_to_digit = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10'
    }
    for word, digit in num_word_to_digit.items():
        text = text.replace(word, digit)
    text = re.sub(r'(?<!\d)\.(?!\d)', '', text)
    text = re.sub(r'\b(a|an|the)\b', '', text)
    contractions = {
        "dont": "don't", "isnt": "isn't", "arent": "aren't", "wont": "won't",
        "cant": "can't", "wouldnt": "wouldn't", "couldnt": "couldn't"
    }
    for contraction, correct in contractions.items():
        text = text.replace(contraction, correct)
    text = re.sub(r"[^\w\s':]", ' ', text)
    text = re.sub(r'\s+,', ',', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def clean_text(text):
    if isinstance(text, tuple):
        text = text[0]  # タプルの最初の要素を使用
    text = re.sub(r"[\(\)\"\',]", '', text)
    return text

class VQADataset(torch.utils.data.Dataset):
    def __init__(self, csv_path, image_dir, transform=None, answer=True):
        self.transform = transform
        self.image_dir = image_dir
        self.df = pd.read_csv(csv_path)
        self.answer = answer

    def __getitem__(self, idx):
        image_path = f"{self.image_dir}/{self.df['image'].iloc[idx]}"
        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        question_text = self.df["question"].iloc[idx]
        if self.answer:
            answer_text = self.df["answer"].iloc[idx]
            return image, question_text, answer_text
        else:
            return image, question_text, "Noanswer"

    def __len__(self):
        return len(self.df)
    

def main():
    set_seed(42)
    device = "cuda" if torch.cuda.is_available() else "cpu"

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])
    train_dataset = VQADataset(csv_path='./data/extracted_train.csv', image_dir='./data/processed_train', transform=transform, answer=True)
    test_dataset = VQADataset(csv_path='./data/processed_valid.csv', image_dir='./data/sharped_valid', transform=transform, answer=False)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1, shuffle=False)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)
    print(f"Total number of samples in test_loader: {len(test_loader.dataset)}")

    q_config = BitsAndBytesConfig(
        load_in_8bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
        llm_int8_skip_modules=['out_proj', 'kv_proj', 'lm_head'],
    )

    model = AutoModel.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True, quantization_config=q_config)
    tokenizer = AutoTokenizer.from_pretrained("./models/mini_cpm_raw", trust_remote_code=True)
    model.eval()

    submission = []
    counter = 0
    total = len(test_loader.dataset)
    for image, question_text, answers in test_loader:
        #answer = clean_text(answers)
        # if answer == "unanswerable":
        #     continue
        image = image.squeeze(0).permute(1, 2, 0).cpu().numpy()
        image = Image.fromarray((image * 255).astype(np.uint8))
        question = f"""Please answer the question about the image.
        # Question
        {question_text}
        
        The answer to the above question should be shown as formatted with answer_confidence set to yes or no, followed by an answer, as in the following example. If the question cannot be answered, set "unswerable" as in Example 3.
        # Format
        "answer_confidence":"yes/no","answer":"your_answer"

        ## Example 1
        "answer_confidence":"no","answer":"cherry"
        ## Example 2
        "answer_confidence":"yes","answer":"blue"
        ## Example 3
        "answer_confidence":"yes","answer":"unanswerable"
        ## Example 4
        "answer_confidence":"yes","answer":"Brown"
"""
        msgs = [{'role': 'user', 'content': question}]
        
        response = model.chat(
            image=image,
            msgs=msgs,
            tokenizer=tokenizer,
            sampling=True,
            temperature=0.5,
        )
        counter += 1
        progress = (counter / total)*100
        #print(counter)
        #print(f"This is the raw response\n{response}")
        res = re.sub(r'(<box>.*</box>)', '', response)
        res = res.replace('<ref>', '')
        res = res.replace('</ref>', '')
        res = res.replace('<box>', '')
        text = res.replace('</box>', '')
        # 正規表現を使用して "answer" の後の内容を抽出
        match = re.search(r'"answer":\s*"([^"]*)"', text)
        if match:
            predict = match.group(1)
        else:
            # print(f'No answer found\n Raw Res = {text}')
            
            predict = "unanswerable"
        predict =process_text(predict)
        # print(f"Qustion :{clean_text(question_text)}\nRes:{predict}")
        # print(f"Res:{predict}\n A:{answer}")
        submission.append(predict)
        # if answer == predict:
        #     correct +=1
        # print(f"# Progress{progress},Accuracy{correct/counter}")
        if counter % 20 == 1:
            print(f"# Progress{progress} % has finished")

    submission = np.array(submission)
    np.save("./submission_sharped8bit.npy", submission)

main()
import zipfile
import os
def create_zip(zip_file_name,path):
    with zipfile.ZipFile(zip_file_name,'w',zipfile.ZIP_DEFLATED)as zipf:
        if os.path.isfile(path):
            zipf.write(path,os.path.basename(path))
        else:
            print("file not exist")

create_zip("submission.zip","./submission_sharped8bit.npy")
send_email("Notify from Ubuntu in Hlab","The prediction and compress has just accomplished",attachment_path="./submission.zip")

`low_cpu_mem_usage` was None, now set to True since model is quantized.


Total number of samples in test_loader: 4969


Loading checkpoint shards: 100%|██████████| 7/7 [00:02<00:00,  2.54it/s]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


# Progress0.02012477359629704 % has finished
