# Masked word completion with BERT

In [5]:
pip install pyspellchecker

Collecting pyspellchecker
  Downloading pyspellchecker-0.6.2-py3-none-any.whl (2.7 MB)
[K     |████████████████████████████████| 2.7 MB 2.0 MB/s eta 0:00:01
[?25hInstalling collected packages: pyspellchecker
Successfully installed pyspellchecker-0.6.2
Note: you may need to restart the kernel to use updated packages.


In [59]:
import re
import torch
import logging
import numpy as np
import matplotlib.pyplot as plt

In [3]:
from transformers import BertTokenizer, BertModel, pipeline

In [6]:
from spellchecker import SpellChecker

In [2]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained("bert-base-uncased")
text = "Replace me by any text you'd like."
encoded_input = tokenizer(text, return_tensors='pt')
output = model(**encoded_input)

In [4]:
unmasker = pipeline('fill-mask', model='bert-base-uncased')

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


## 1. 입력 문장 처리
input으로 문장이 들어오면 공백 단위로 자르고 각각의 단어를[MASK]로 바꿔서 문장을 만든다.<br>
**Test Sentence: The man worked as a carpenter.**


In [None]:
text = "the man worked as a carpenter."

In [279]:
text = "i'm dying for a glass of waeter." # water찾기

In [280]:
text_split = list(map(str,text.split()))
text_list = []
for i in range(len(text_split)):
    temp = list(text_split)
    temp[i] = '[MASK]'
    if i == len(text_split)-1:
        temp[i] = '[MASK].'
    text_list.append(' '.join(map(str,temp)))
    print(' '.join(map(str,temp)))
text_split

[MASK] dying for a glass of waeter.
i'm [MASK] for a glass of waeter.
i'm dying [MASK] a glass of waeter.
i'm dying for [MASK] glass of waeter.
i'm dying for a [MASK] of waeter.
i'm dying for a glass [MASK] waeter.
i'm dying for a glass of [MASK].


["i'm", 'dying', 'for', 'a', 'glass', 'of', 'waeter.']

## 2. 오타 검사를 위해 spell checker 사용
spell checker에서는 구두점이 영향을 주는 것으로 판단.
숫자나 영어를 제외한 문자는 제거(특수문자 등)

In [281]:
spell = SpellChecker()

In [284]:
for i in range(len(text_split)):
    new = re.sub('[-=+,#/\:.?!<>]',"",text_split[i])
    text_split[i] = new
text_split

["i'm", 'dying', 'for', 'a', 'glass', 'of', 'waeter']

In [400]:
misspelled = spell.unknown(text_split) # find those words that may be misspelled

miss_word = [] # 오류가 있는 단어 리스트 
miss_index = [] # 오류가 있는 단어 인덱스
candidate = {} # 가능한 후보 딕셔너리
for word in misspelled:
    miss_word.append(word)
    miss_index.append(text_split.index(word))
    candidate[word] = list(spell.candidates(word))
    
    
    print("가장 가능성이 높은 결과:",spell.correction(word)) # 철자가 틀린 단어에 대해 가장 가능성이 높은 결과를 반환
    print("철자가 단어에 대해 가능한 후보 세트:", spell.candidates(word)) # 철자가 틀린 단어에 대해 가능한 후보 세트를 반환

가장 가능성이 높은 결과: water
철자가 단어에 대해 가능한 후보 세트: {'water', 'waiter', 'wheter', 'walter', 'waster'}


In [380]:
candidate.get('waeter')

['water', 'waiter', 'wheter', 'walter', 'waster']

## 3. 단어간 유사도 구하기
Levenshtein Distance (Edit Distance): 두 문자열 간의 차이를 거리로 계산하는 방법<br>
하나의 문자열 S를 수정하여 다른 문자열 T로 변환시키고자 할 때, 삽입 (insert), 삭제 (delete), 대체 (substitute) 연산이 사용된다. <br>S를 T로 변환시키는데 필요한 최소의 편집 연산 횟수를 편집 거리라고 한다.

### 1) 편집 거리 구하는 함수

In [265]:
# 두 문자열 길이비교
def compareLength(s1, s2):
    if len(s1) >= len(s2):
        longstr, shortstr = s1, s2
    else:
        longstr, shortstr = s2, s1
    return longstr, shortstr

def getDistanceList(s1, s2):
    M = [[0] * (len(s1)+1) for _ in range(len(s1)+1)]# 길이가 긴 문자열을 기준으로 2차원 리스트 초기
    long = len(s1)
    short = len(s2)        
    for i in range(long+1):
        M[i][0] = i
    for j in range(short+1):
        M[0][j] = j
    for i in range(1, long+1):
        for j in range(1, short+1):
            if s1[i-1] == s2[j-1]:
                M[i][j] = M[i-1][j-1]
            else:
                M[i][j] = min(M[i-1][j], M[i-1][j-1], M[i][j-1]) + 1
    return M, M[long][short] 

### 2) 문자열 바꾸는 과정(삽입, 삭제, 변경)을 출력하는 함수

In [266]:
def getTrace(m, s1, s2):
    print("[", s2, "을(를)", s1, "로 바꾸는 과정 ]")
    i, j = len(s1) - 1, len(s2) - 1
    while not(i == 0 and j ==0):
        s = min(m[i-1][j], m[i-1][j-1], m[i][j-1])
        if s == m[i][j]:
            i -= 1
            j -= 1
        elif s == m[i][j-1]:
            print(s1[i-1] + "을(를) 삭제")
            j -= 1
        elif s == m[i-1][j-1]:
            print(s2[j-1] + "을(를) " +s1[i-1]  + "(으)로 변경")
            i -= 1
            j -= 1
        else:
            print(s1[i] + "을(를) 추가")
            i -= 1

## edit distance 구하는 예시

In [289]:
# 비교할 문자열 s1, s2
s1="caert"
s2="dadrt"
s1, s2 = compareLength(s1, s2) #문자열의 길이를 비교해서 s1에 긴문자열, s2에 짧은 문자열을 넣는다.

M, edit_distance = getDistanceList(s1, s2) # list, 편집거리 

print(s2, "와(과)", s1, "의 최소 편집 거리는", edit_distance , "이다.", end="\n\n")
getTrace(M, s1, s2) # 문자열 바꾸는 과정 출력

dadrt 와(과) caert 의 최소 편집 거리는 2 이다.

[ dadrt 을(를) caert 로 바꾸는 과정 ]
d을(를) e(으)로 변경
d을(를) c(으)로 변경


&nbsp;&nbsp; **(1) spell checker의 단어 후보 들을 edit distance가 작은 순서로 정렬 (일치하는 단어가 없을 경우를 대비해)<br>**
만약  spell checker에서 오타가 발견되면 문장을 masked lm에 넣어 spell checker에서 나온 단어 후보들을 찾아 score 값이 어떻게 되는지 확인해 가장 확률이 높은 것을 반환해 오타를 교정하고 문장을 완성시킨다.

"i'm [MASK] for a glass of waeter."

In [None]:
spell checker에서 오타가 발견된 경우

In [406]:
if len(miss_index) > 0:
    for num in miss_index:
        masked_sentence = text_list[num]
        original_token = text_split[num]
        print("index number:",num,"\nmasked sentence:", masked_sentence, "\noriginal token:", original_token)

index number: 6 
masked sentence: i'm dying for a glass of [MASK]. 
original token: waeter


In [407]:
search_candidate = candidate.get(original_token)
candidate.get(original_token)

['water', 'waiter', 'wheter', 'walter', 'waster']

In [408]:
fill = unmasker(masked_sentence, original_token = original_token, top_k=30000)#"[MASK] man worked as a carpenter."

In [409]:
result1 = []

for i in search_candidate:
    find = i
    for j in range(len(fill)):
        if find == fill[j].get('token_str'):
            print(fill[j])
            result1.append(fill[j])

{'sequence': "i'm dying for a glass of water.", 'score': 0.3716032803058624, 'token': 2300, 'token_str': 'water', 'original_token': 'b'}
{'sequence': "i'm dying for a glass of waiter.", 'score': 9.63665002018388e-07, 'token': 15610, 'token_str': 'waiter', 'original_token': 'b'}
{'sequence': "i'm dying for a glass of walter.", 'score': 4.146808763039189e-08, 'token': 4787, 'token_str': 'walter', 'original_token': 'b'}


### spell checker에서 나온 단어 후보들을 찾고 score를 기준으로 내림차순으로 정렬한 결과

In [410]:
result1 = sorted(result1, key = lambda x:-x['score'])
result1

[{'sequence': "i'm dying for a glass of water.",
  'score': 0.3716032803058624,
  'token': 2300,
  'token_str': 'water',
  'original_token': 'b'},
 {'sequence': "i'm dying for a glass of waiter.",
  'score': 9.63665002018388e-07,
  'token': 15610,
  'token_str': 'waiter',
  'original_token': 'b'},
 {'sequence': "i'm dying for a glass of walter.",
  'score': 4.146808763039189e-08,
  'token': 4787,
  'token_str': 'walter',
  'original_token': 'b'}]

In [411]:
print('score가 가장 높은 단어:',result1[0]['token_str'])

score가 가장 높은 단어: water


&nbsp;&nbsp; **(2) masked language model의 결과에서 input word와 edit distance기준 오름차순 정렬 후 score 기준 내림차순 정렬한다.**

In [412]:
for i in range(len(fill)):
    s1 = fill[i].get('token_str')
    s2 = original_token
    s1, s2 = compareLength(s1, s2) #문자열의 길이를 비교해서 s1에 긴문자열, s2에 짧은 문자열을 넣는다.
    
    M, edit_distance = getDistanceList(s1, s2)
    #print(s2, "와(과)", s1, "의 최소 편집 거리는", edit_distance , "이다.", end="\n\n")
    fill[i]['distance'] = edit_distance

In [413]:
result2 = sorted(fill, key = lambda x:(x['distance'],-x['score']))
result2[:5]

[{'sequence': "i'm dying for a glass of water.",
  'score': 0.3716032803058624,
  'token': 2300,
  'token_str': 'water',
  'original_token': 'b',
  'distance': 1},
 {'sequence': "i'm dying for a glass of waiter.",
  'score': 9.63665002018388e-07,
  'token': 15610,
  'token_str': 'waiter',
  'original_token': 'b',
  'distance': 1},
 {'sequence': "i'm dying for a glass of walter.",
  'score': 4.146808763039189e-08,
  'token': 4787,
  'token_str': 'walter',
  'original_token': 'b',
  'distance': 1},
 {'sequence': "i'm dying for a glass of waters.",
  'score': 0.00033020376577042043,
  'token': 5380,
  'token_str': 'waters',
  'original_token': 'b',
  'distance': 2},
 {'sequence': "i'm dying for a glass of carter.",
  'score': 8.151549081958365e-06,
  'token': 5708,
  'token_str': 'carter',
  'original_token': 'b',
  'distance': 2}]

In [414]:
print('score가 가장 높은 단어:',result2[0]['token_str'])

score가 가장 높은 단어: water


## To Do
### 1. 정확도 측정: 음성인식결과로 돌려보기
#### 1) deep speech에 음성 데이터 셋을 넣은 결과와 그 결과를 '이 코드'를 돌려서 차이 비교하기
- 결과를 코드에 어떻게 돌려서 score를 측정할 것인지 고민

#### 2) 유튜브에서 음성 가져오기
- 유튜브 음성 가져와서 텍스트로 변환하는 작업 어떻게 할지
   - 기존 자막 서비스 이용해서 텍스트로 추출
   - 추출한 텍스트를 '이 코드' 돌리고 차이가 있는지 비교

### 2. 리팩토링 & 연결


