<a href="https://colab.research.google.com/github/ddsntc1/Chatbot_FutFut/blob/main/making_dataset%26modify_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Datasets

* /entities: Entity key-value extracted for each image

* /img: Image files of the SROIE dataset

* .txt: BIO tagging information for each bounding box in conll format

* _image.txt: Coordinate information for each bounding box in each image

* _box.txt: Normalised coordinates for each bounding box

In [None]:
# Upgrade pip, then install the Python packages used by the cells below.
!pip install --upgrade pip
!pip install transformers datasets seqeval
!git lfs install
# Download the data and code archives, unpack both, and delete the archives.
!wget https://aistages-api-public-prod.s3.amazonaws.com/app/Competitions/000230/data/data.tar.gz
!wget https://aistages-api-public-prod.s3.amazonaws.com/app/Competitions/000230/data/code.tar.gz
!tar -xvzf data.tar.gz
!tar -xvzf code.tar.gz
!rm code.tar.gz data.tar.gz
# Move everything extracted into code/ next to the data files under data/.
!mv code/* data/

In [None]:
# Data sanity check.
# The three annotation files of a split were seen to differ when loaded
# separately, so this cell is kept to verify that their first columns (the
# word of every bounding box) stay aligned line by line.
import os


def _first_column(path):
    """Return the first tab-separated field of every non-blank line in *path*."""
    with open(path, "r", encoding="utf-8") as handle:
        return [line.strip().split('\t')[0] for line in handle if line.strip()]


target = ['train', 'test', 'op_test']
for targ in target:
    # Files to compare for this split.
    image_path = f"data/{targ}_image.txt"
    test_path = f"data/{targ}.txt"
    # Must be the *_box.txt file: comparing {targ}.txt against itself can
    # never report a mismatch.
    box_path = f"data/{targ}_box.txt"

    # A split may not ship all three files; report it instead of crashing.
    missing = [path for path in (image_path, test_path, box_path)
               if not os.path.exists(path)]
    if missing:
        print(f"Skipping {targ}: missing file(s) {missing}")
        continue

    # Blank lines are ignored; only the first column is kept.
    im = _first_column(image_path)
    tst = _first_column(test_path)
    box = _first_column(box_path)

    # zip() below stops at the shortest file, so surface length differences.
    if not (len(im) == len(tst) == len(box)):
        print(f"Line count differs at {targ}: image={len(im)}, txt={len(tst)}, box={len(box)}")

    # Report every line on which the three files disagree.
    for i, (im_word, tst_word, box_word) in enumerate(zip(im, tst, box), start=1):
        if not (im_word == tst_word == box_word):
            print(f"Mismatch at line {i}: '{im_word}' != '{tst_word}' != '{box_word}' at {targ} ")

## train, test dataset 형식 만들기

In [None]:
import json
from glob import glob

def _empty_record():
    """Return a fresh per-image record; key order defines the JSON key order."""
    return {
        "words": [],
        "bboxes": [],
        "norm_bboxes": [],
        "labels": [],
        "image_path": None,
        "max_x": None,
        "max_y": None,
        "file_name": None,
    }


def _collect_records(txt_path, box_path, image_path, image_folder):
    """Merge the three line-aligned annotation files into one record per image.

    The files are read in lockstep. A blank line in any of them closes the
    current image; every other line contributes one word with its BIO tag,
    its normalised box and its pixel box.
    """
    with open(txt_path, "r", encoding="utf-8") as txt_file, \
            open(box_path, "r", encoding="utf-8") as box_file, \
            open(image_path, "r", encoding="utf-8") as image_file:
        rows = list(zip(txt_file.readlines(),
                        box_file.readlines(),
                        image_file.readlines()))

    records = []
    record = _empty_record()
    for txt_line, box_line, image_line in rows:
        txt_row, box_row, image_row = txt_line.strip(), box_line.strip(), image_line.strip()

        # A blank line marks the end of the current image.
        if not (txt_row and box_row and image_row):
            if record["file_name"]:
                records.append(record)
            record = _empty_record()
            continue

        # {split}.txt: word and BIO tag separated by a tab.
        word, bio_tag = txt_row.split('\t')

        # {split}_box.txt: the last four fields are the normalised box.
        norm_box = [float(value) for value in box_row.split()[-4:]]

        # {split}_image.txt: ..., x0, y0, x1, y1, width, height, image name.
        image_fields = image_row.split()
        file_name = image_fields[-1]
        width, height = int(image_fields[-3]), int(image_fields[-2])
        pixel_box = [float(value) for value in image_fields[-7:-3]]

        # Image-level values are taken from the first line of the image.
        if record["file_name"] is None:
            record["image_path"] = f"{image_folder}{file_name}.jpg"
            record["max_x"] = width
            record["max_y"] = height
            record["file_name"] = file_name

        record["words"].append(word)
        record["norm_bboxes"].append(norm_box)
        record["bboxes"].append(pixel_box)
        record["labels"].append(bio_tag)

    # The last image is not followed by a blank line when the file ends.
    if record["file_name"]:
        records.append(record)
    return records


try2 = ['train', 'test']
for targ in try2:
    dataset = _collect_records(
        f"data/{targ}.txt",
        f"data/{targ}_box.txt",
        f"data/{targ}_image.txt",
        f"data/{targ}/img/",  # folder that holds the image files
    )

    # Persist the split as JSON.
    with open(f"{targ}_dataset.json", "w", encoding="utf-8") as json_file:
        json.dump(dataset, json_file, ensure_ascii=False, indent=4)

    print(f"{targ}_dataset.json 생성 완료")

train_dataset.json 생성 완료
test_dataset.json 생성 완료


In [None]:
from datasets import Dataset, DatasetDict, Features, Sequence, Value, Image, ClassLabel
import json

# Paths of the JSON files written by the previous cell.
train_json_path = "train_dataset.json"
test_json_path = "test_dataset.json"
label2id = {"S-COMPANY": 0, "S-DATE": 1, "S-ADDRESS": 2, "S-TOTAL": 3, "O": 4}


def _load_records(json_path):
    """Load the list of per-image records stored in *json_path*."""
    with open(json_path, "r", encoding="utf-8") as f:
        return json.load(f)


def _as_columns(records):
    """Turn per-image records into the column-oriented dict used by Dataset.from_dict."""
    return {
        "image": [record["image_path"] for record in records],  # paths only; Image() loads them
        "label": [record["labels"] for record in records],
        "words": [record["words"] for record in records],
        "bbox": [record["norm_bboxes"] for record in records],
        "filename": [record["file_name"] for record in records],
    }


train_data = _load_records(train_json_path)
test_data = _load_records(test_json_path)

# Schema of both splits. The class names follow the key order of label2id,
# so class ids equal the label2id values.
features = Features({
    'image': Image(),
    'label': Sequence(feature=ClassLabel(names=list(label2id))),
    'words': Sequence(feature=Value(dtype='string')),
    'bbox': Sequence(feature=Sequence(feature=Value(dtype='int64'))),
    'filename': Value(dtype='string'),
})

train_formatted_data = _as_columns(train_data)
test_formatted_data = _as_columns(test_data)

train_dataset = Dataset.from_dict(train_formatted_data, features=features)
test_dataset = Dataset.from_dict(test_formatted_data, features=features)

dataset = DatasetDict({
    "train": train_dataset,
    "test": test_dataset
})

print(dataset)

DatasetDict({
    train: Dataset({
        features: ['image', 'label', 'words', 'bbox', 'filename'],
        num_rows: 626
    })
    test: Dataset({
        features: ['image', 'label', 'words', 'bbox', 'filename'],
        num_rows: 347
    })
})


## OP Dataset 생성

In [None]:
from datasets import Dataset, DatasetDict, Features, Sequence, Value, Image, ClassLabel
import json
from glob import glob

targ = 'op_test'

# Annotation file to parse and the folder that holds the image files.
image_path = f"data/{targ}_image.txt"
image_folder = "data/test/img/"


def _normalise_bbox(bbox, width, height):
    """Scale a pixel box (x0, y0, x1, y1) to integer coordinates on a 0-1000 grid.

    Integers are returned because the 'bbox' feature below is declared as
    int64.
    """
    x0, y0, x1, y1 = bbox
    return [
        int(1000 * (x0 / width)),
        int(1000 * (y0 / height)),
        int(1000 * (x1 / width)),
        int(1000 * (y1 / height)),  # bottom edge uses y1, not y0
    ]


def _parse_op_image_file(path, folder):
    """Read *path* and return one record per image.

    Every non-blank line holds four tab-separated fields: the word,
    "x0 y0 x1 y1", "width height" and the image name. Blank lines separate
    images; repeated or leading blank lines are ignored.
    """
    records = []
    current = None  # record being filled, or None between images

    with open(path, "r", encoding="utf-8") as img_file:
        for raw_line in img_file:
            line = raw_line.strip()
            if not line:
                # End of the current image (if one is open). Always skip the
                # blank line so it is never parsed as data.
                if current is not None:
                    records.append(current)
                    current = None
                continue

            parts = line.split('\t')
            word = parts[0]
            bbox = list(map(float, parts[1].split()))  # x0, y0, x1, y1
            width, height = map(int, parts[2].split())
            img_name = parts[3]

            if current is None:
                # Start a fresh record so no list is shared between images.
                current = {
                    "words": [],
                    "bboxes": [],
                    "norm_bboxes": [],
                    "image_path": f"{folder}{img_name}.jpg",
                    "max_x": width,
                    "max_y": height,
                }

            current["words"].append(word)
            current["bboxes"].append(bbox)
            current["norm_bboxes"].append(_normalise_bbox(bbox, width, height))

    # The last image is not followed by a blank line when the file ends.
    if current is not None:
        records.append(current)
    return records


opdata = _parse_op_image_file(image_path, image_folder)

label2id = {"S-COMPANY": 0, "S-DATE": 1, "S-ADDRESS": 2, "S-TOTAL": 3, "O": 4}

# The records are used directly; no intermediate JSON file is written.
op_data = opdata


def extract_filename(image_path):
    """Return the file name of *image_path* without directory and extension."""
    # Only the final extension is stripped, so dots inside the name survive.
    return image_path.split("/")[-1].rsplit('.', 1)[0]


# Schema of the OP dataset: no labels.
# NOTE: the column is called 'file_name' here but 'filename' in the
# train/test dataset; kept as is so the published schema does not change.
features = Features({
    'image': Image(),
    'words': Sequence(feature=Value(dtype='string', id=None), length=-1, id=None),
    'bbox': Sequence(feature=Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None), length=-1, id=None),
    'file_name': Value(dtype='string', id=None),
})

# Only image paths are stored here; the Image() feature loads the files.
op_formatted_data = {
    "image": [item["image_path"] for item in op_data],
    "words": [item["words"] for item in op_data],
    "bbox": [item["norm_bboxes"] for item in op_data],  # normalised boxes
    "file_name": [extract_filename(item["image_path"]) for item in op_data],
}

op_dataset = Dataset.from_dict(op_formatted_data, features=features)

opdataset = DatasetDict({
    "op": op_dataset,
})

print(opdataset)

DatasetDict({
    op: Dataset({
        features: ['image', 'words', 'bbox', 'file_name'],
        num_rows: 347
    })
})


In [None]:
#!pip install huggingface_hub

!huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) n
Token is valid (permission: write).
The token `p1` has been saved to /root/.cache/huggingface/stored_tokens
Your token has been saved to /root/.cache/huggingface/token
Login successful.
The current active token is: `p1`


## Huggingface 데이터셋 저장

- 이후 모델 훈련 혹은 테스트 할 경우 편리한 사용을 위해

In [None]:
# Upload the train/test DatasetDict and the OP DatasetDict to the Hugging Face
# Hub, each as a private dataset repository.
dataset.push_to_hub("Dongwookss/SROIE",private=True)
opdataset.push_to_hub("Dongwookss/SROIE_op",private=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Map:   0%|          | 0/347 [00:00<?, ? examples/s]

Creating parquet from Arrow format:   0%|          | 0/4 [00:00<?, ?ba/s]

README.md:   0%|          | 0.00/438 [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/datasets/Dongwookss/SROIE_op/commit/fb748d34ec5ccf42bc37d0482f8564cad526a7ba', commit_message='Upload dataset', commit_description='', oid='fb748d34ec5ccf42bc37d0482f8564cad526a7ba', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/Dongwookss/SROIE_op', endpoint='https://huggingface.co', repo_type='dataset', repo_id='Dongwookss/SROIE_op'), pr_revision=None, pr_num=None)

## Dataset_수정작업 for label prediction

In [None]:
!huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) n
Token is valid (permission: write).
The token `p1` has been saved to /root/.cache/huggingface/stored_tokens
Your token has been saved to /root/.cache/huggingface/token
Login successful.
The current active token is: `p1`


In [None]:
# Label names indexed by class id; same order as the ClassLabel names of the
# 'label' feature defined when the train/test dataset was built.
id2label = ["S-COMPANY", "S-DATE", "S-ADDRESS", "S-TOTAL", "O"]

In [None]:
import json
from pathlib import Path
import re
from difflib import SequenceMatcher

def load_entity_files(entity_dir):
    """Load the ground-truth entity files found in *entity_dir*.

    Every ``*.txt`` file directly inside the directory is parsed as JSON.
    Files that are not valid JSON are reported on stdout and skipped.

    Returns:
        A dict mapping each file's stem (name without extension) to its
        parsed JSON content.
    """
    loaded = {}
    for entity_file in Path(entity_dir).glob('*.txt'):
        with open(entity_file, 'r', encoding='utf-8') as handle:
            try:
                parsed = json.load(handle)
            except json.JSONDecodeError:
                print(f"Error loading {entity_file}")
                continue
        loaded[entity_file.stem] = parsed
    return loaded

def group_by_lines(words, bboxes, y_threshold=5):
    """Group words into text lines by their vertical position.

    Words are ordered by the vertical centre of their box. A word joins the
    current line while its centre lies within *y_threshold* of the centre of
    the word that opened that line; otherwise it opens a new line. Inside a
    line, words are ordered by their left edge (x0).

    Returns:
        A list of lines, each a list of ``(word, bbox, original_index)``
        tuples. Empty *words* gives an empty list.
    """
    if not words:
        return []

    def centre_y(entry):
        return (entry[1][1] + entry[1][3]) / 2

    def left_edge(entry):
        return entry[1][0]

    ordered = sorted(zip(words, bboxes, range(len(words))), key=centre_y)

    grouped = []
    bucket = [ordered[0]]
    anchor_y = centre_y(ordered[0])  # centre of the word that opened the line

    for entry in ordered[1:]:
        y = centre_y(entry)
        if abs(y - anchor_y) <= y_threshold:
            bucket.append(entry)
            continue
        # Too far from the anchor: close the line and open a new one here.
        grouped.append(sorted(bucket, key=left_edge))
        bucket, anchor_y = [entry], y

    grouped.append(sorted(bucket, key=left_edge))
    return grouped

def similar_text(text1, text2):
    """Return the case-insensitive similarity ratio of two strings.

    The value is ``difflib.SequenceMatcher(...).ratio()`` computed on the
    lowercased inputs: 1.0 for identical strings, 0.0 when nothing matches.
    """
    left, right = text1.lower(), text2.lower()
    matcher = SequenceMatcher(None, left, right)
    return matcher.ratio()

# A comma between a digit and a group of exactly three digits ("1,234.50").
_THOUSANDS_SEPARATOR = re.compile(r'(?<=\d),(?=\d{3}(?!\d))')
# An integer or decimal number.
_NUMBER = re.compile(r'\d+\.?\d*')


def _extract_amounts(raw):
    """Return every number in *raw* as a float, ignoring thousands separators."""
    cleaned = _THOUSANDS_SEPARATOR.sub('', raw)
    return [float(number) for number in _NUMBER.findall(cleaned)]


def is_valid_amount(text, target_amount):
    """Check whether *text* contains the amount given in *target_amount*.

    The first number found in *target_amount* is the value to look for;
    currency symbols and other surrounding characters are ignored. Thousands
    separators are removed first, so "1,234.50" is read as 1234.5 and not as
    the two numbers 1 and 234.50.

    Args:
        text: OCR word to inspect.
        target_amount: Ground-truth amount string.

    Returns:
        True if any number in *text* equals the target value, False
        otherwise (including when *target_amount* contains no number).
    """
    target_values = _extract_amounts(target_amount)
    if not target_values:
        return False
    return target_values[0] in _extract_amounts(text)

# Words that can open an address line (besides 'no.', 'no ', 'lot' and a
# leading house number).
_ADDRESS_START_WORDS = ('jalan', 'jln', 'lorong', 'taman')
# Content that is not part of an address; reaching it stops the search.
_ADDRESS_STOP_WORDS = (
    'tel:', 'fax:', 'gst', 'tax invoice', 'email:', 'www.',
    'guest check', 'cashier', 'server', 'invoice no',
)
# State/country names treated as the final line of an address.
_ADDRESS_END_WORDS = (
    'selangor', 'johor', 'kuala lumpur', 'penang', 'pahang',
    'malaysia', 'darul ehsan',
)


def _line_text(line):
    """Join the words of one line into a lowercase, space-separated string."""
    return ' '.join(word for word, _, _ in line).lower()


def _line_indices(line):
    """Return the original word indices of one line."""
    return [idx for _, _, idx in line]


def _find_company_indices(lines, company_text):
    """Return the indices of the first of the top five lines that resembles the company name."""
    for line in lines[:5]:
        if similar_text(_line_text(line), company_text) > 0.8:
            return _line_indices(line)
    return []


def _find_address_indices(lines, address_text):
    """Return the word indices of the consecutive lines that make up the address."""
    indices = []
    started = False
    for line in lines:
        text = _line_text(line)

        # The address starts on a line that looks like one and is at least
        # loosely similar to the expected address.
        if not started:
            looks_like_start = (
                'no.' in text or 'no ' in text or 'lot' in text
                or any(word in text for word in _ADDRESS_START_WORDS)
                or re.search(r'^\d+[,\s]', text)
            )
            if looks_like_start and similar_text(text, address_text) > 0.3:
                started = True
        if not started:
            continue

        # Stop as soon as non-address content shows up.
        if any(word in text for word in _ADDRESS_STOP_WORDS):
            break

        # A lower threshold is enough for continuation lines.
        if similar_text(text.strip(), address_text) > 0.2:
            indices.extend(_line_indices(line))
            # A state or country name closes the address.
            if any(state in text for state in _ADDRESS_END_WORDS):
                break
    return indices


def _find_date_index(lines, target_date):
    """Return the index of the first word containing the date, or -1.

    The date is matched literally in its original form (on word boundaries)
    and with '/' replaced by '-' or '.'. It is escaped before being used as a
    pattern, so '.' is not a wildcard and regex metacharacters in the date
    cannot raise re.error.
    """
    patterns = [
        re.compile(rf"\b{re.escape(target_date)}\b"),
        re.compile(re.escape(target_date.replace('/', '-'))),
        re.compile(re.escape(target_date.replace('/', '.'))),
    ]
    for line in lines:
        for word, _, idx in line:
            if any(pattern.search(word) for pattern in patterns):
                return idx
    return -1


def _find_total_index(lines, target_amount):
    """Return the index of the matching amount on the lowest 'total'/'amount' line, or -1."""
    for line in reversed(lines):  # search from the bottom of the document
        text = _line_text(line)
        if 'total' in text or 'amount' in text:
            for word, _, idx in line:
                if is_valid_amount(word, target_amount):
                    return idx
    return -1


def find_entities_in_document(lines, entity):
    """Locate the company, address, date and total of *entity* in a document.

    Args:
        lines: Lines as produced by ``group_by_lines``: each line is a list
            of ``(word, bbox, original_index)`` tuples.
        entity: Dict with the optional string keys 'company', 'address',
            'date' and 'total'. Missing or empty values are not searched.

    Returns:
        A tuple ``(company_indices, address_indices, date_idx, total_idx)``.
        The first two are lists of word indices (possibly empty); the last
        two are single word indices, or -1 when nothing was found.
    """
    company_text = entity.get('company', '').lower()
    address_text = entity.get('address', '').lower()
    target_date = entity.get('date', '')
    target_amount = entity.get('total', '')

    company_indices = _find_company_indices(lines, company_text) if company_text else []
    address_indices = _find_address_indices(lines, address_text) if address_text else []
    date_idx = _find_date_index(lines, target_date) if target_date else -1
    total_idx = _find_total_index(lines, target_amount) if target_amount else -1

    return company_indices, address_indices, date_idx, total_idx

def update_labels(dataset, entity_data):
    """Rebuild the 'label' column of *dataset* from ground-truth entities.

    Args:
        dataset: Object whose ``map`` method applies a function to every
            example; examples provide 'filename', 'words' and 'bbox'.
        entity_data: Dict mapping a file name to its entity dict.

    Returns:
        The result of ``dataset.map``. Examples whose file name is not in
        *entity_data* are returned unchanged.
    """
    def process_example(example):
        key = example['filename']
        if key in entity_data:
            tokens = example['words']
            tags = ['O'] * len(tokens)

            grouped = group_by_lines(tokens, example['bbox'])
            company, address, date_at, total_at = find_entities_in_document(
                grouped, entity_data[key]
            )

            # Later assignments overwrite earlier ones on the same index:
            # company < address < date < total.
            for tag, positions in (('S-COMPANY', company), ('S-ADDRESS', address)):
                for position in positions:
                    tags[position] = tag
            for tag, position in (('S-DATE', date_at), ('S-TOTAL', total_at)):
                if position != -1:  # -1 means "not found"
                    tags[position] = tag

            return {**example, 'label': tags}
        return example

    return dataset.map(process_example)

def update_full_dataset(dataset_dict):
    """Relabel the 'train' and 'test' splits of *dataset_dict*.

    Entity files are loaded for both splits first, then each split is
    relabelled with ``update_labels``. Progress is printed along the way.

    Returns:
        A new DatasetDict holding the updated 'train' and 'test' splits.
    """
    entity_dirs = {
        'train': "data/train/entities",
        'test': "data/test/entities",
    }

    # Load all entity files before touching any split.
    entities = {}
    for split, folder in entity_dirs.items():
        print(f"Loading {split} entities...")
        entities[split] = load_entity_files(folder)
        print(f"Loaded {len(entities[split])} {split} entity files")

    refreshed = {}
    for split in entity_dirs:
        print(f"Updating {split} dataset...")
        refreshed[split] = update_labels(dataset_dict[split], entities[split])

    return DatasetDict(refreshed)

# Run the relabelling on the train/test DatasetDict built above.
updated_dataset = update_full_dataset(dataset)



In [None]:
# Spot check: print the entity file and the resulting word labels for a few
# training samples.
train_split = updated_dataset['train']
for i in range(50, 55):
    print(f"\n=== Sample {i} ===")
    sample = train_split[i]  # fetch the row once instead of once per field
    print("Filename:", sample['filename'])

    entity_path = f"data/train/entities/{sample['filename']}.txt"
    with open(entity_path, 'r', encoding='utf-8') as f:
        print("\nEntity data:", json.dumps(json.load(f), indent=2))

    print("\nWords and Labels:")
    for word, label in zip(sample['words'], sample['label']):
        print(f"{word}: {id2label[label]}")

In [None]:
# Upload the relabelled train/test DatasetDict to the Hugging Face Hub as a
# private dataset repository.
updated_dataset.push_to_hub("Dongwookss/SROIE_lb1",private=True)