In [1]:
# Char Preprocessor
__author__ = "Shihao Lin"
## Acknowledge:
#  https://github.com/mozillazg/python-pinyin
!pip install pypinyin

Collecting pypinyin
  Downloading pypinyin-0.44.0-py2.py3-none-any.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 18.4 MB/s eta 0:00:01
[?25hInstalling collected packages: pypinyin
Successfully installed pypinyin-0.44.0


In [2]:
from pypinyin import pinyin, Style #, load_single_dict, lazy_pinyin
import re,json
# from pypinyin_dict.pinyin_data import kxhc1983
# kxhc1983.load()
# load_single_dict({ord('凉'):'liáng'})

# Example
# zha4o : TONE2
# neutral_tone_with_five: 声调使用数字表示的相关拼音风格下的结果是否使用 5 标识轻声
# heteronym: 是否启用多音字
pinyin('朝阳,',style=Style.TONE2, heteronym=False,neutral_tone_with_five=True)

[['zha1o'], ['ya2ng'], [',']]

In [3]:
# tranform the wrong Chinese Char in dataset to match Char in pinyin library
char_correct = {'凉':'凉','裏':'裹','郎':'郎','ㄚ':'丫','—':'一'}

In [4]:
# Example
pinyin('ㄚ丫',style=Style.TONE2)

[['ㄚ'], ['ya1']]

In [5]:
def collect_char_info(filenames:list,unique_pinyin:set=set(),
                      unique_char:set=set(),unique_sym: set =set(),
                      char_correct:dict = dict(),encoding='utf8'):
    """
    Find out all unique Chinese Char
    Find out all unique Chinese pingyin
    Find out all unique Non-Char symbol
    Find out all Char that cannot be translated by pinyin function
    (ex. \ue846)
    """
    trans2pinyin = lambda sent: {_[0] for _ in pinyin(sent,style=Style.TONE2, heteronym=False,neutral_tone_with_five=True)}
    errors = set()
    for filename in filenames:
        with open(filename,encoding=encoding) as f:
            for row in f.readlines():
                row = row.strip().split()
                tmp = []
                for i in range(len(row)):
                    if re.match(r'[\ue2a5-\ue8f0]+',row[i]): # detect improper word
                        errors.add(row[i])
                        print(''.join(row))
                    elif row[i] in char_correct:
                        tmp.append(char_correct[row[i]])
                        
                    elif re.match('\W',row[i]):
                        unique_sym.add(row[i])
                    else:
                        tmp.append(row[i])
                    
                try:
                    res = trans2pinyin(''.join(tmp))
                    if '' in res:
                        print("'' occurs:",tmp)
                    unique_pinyin.update(res)
                except:
                    print('Parsing fails, Non proper word exists:\n',
                          ''.join(tmp),'\n','-'*10)
                    continue
                unique_char.update(set(tmp))
    return unique_pinyin,unique_char,unique_sym,errors

In [6]:
train_fns = ["couplet/train/in.txt","couplet/train/out.txt"]
u_pinyin_tr, u_char_tr, u_sym_tr, err_tr = collect_char_info(train_fns,char_correct=char_correct)

创业艰辛业怠
败草腐花，挖锨铲连根去
八共会共青会
癯然山泽风姿，试引鹓花树下
特科正待醇儒，天不遗，镜水稽山销气色
叠起一房山，大好园林，最难得茅屋买春，竹消夏
几多神秘醉心头，看它峰谷藏奇，灵崖起将军骨
一老不遗，章云贡日含愁思
双桨碧云东，翠萦渚、锦翻葵经
何须掌指头皮
轻戒疤，或枯禅坐久，两行小字现盘陁
讲堂刊定本，奈校方半，九经中大义，从此付何人
一百年系定赤绳，愿李夭桃，都成眷属，情天不老月长圆
生烟、熟烟、姑烟、兰州水烟、鸦片公烟
一时酒遍骚人


In [7]:
# test filenames
test_fns =["couplet/test/in.txt","couplet/test/out.txt"]
u_pinyin_te, u_char_te, u_sym_te, err_te = collect_char_info(test_fns,char_correct=char_correct)

In [8]:
print('Unique pinyin in test:',u_pinyin_te.difference(u_pinyin_tr))
print('Unique Char in test:',u_char_te.difference(u_char_tr))
print('Unique Symbol in test:',u_sym_te.difference(u_sym_tr))
print('Size of unique pinyin:',len(u_char_tr))
print('Size of unique Char:',len(u_pinyin_tr))
print('Size of Non Char Symbol:', len(u_sym_tr))
print('Size of Unusual Char:', len(err_tr))
display('Num of error Char in train:',len(err_tr))
display(' '.join(err_tr))
display('Num of error Char in test:',len(err_te))
display(' '.join(err_te))

Unique pinyin in test: set()
Unique Char in test: set()
Unique Symbol in test: set()
Size of unique pinyin: 9098
Size of unique Char: 1286
Size of Non Char Symbol: 9
Size of Unusual Char: 15


'Num of error Char in train:'

15

'\ue44d \ue579 \ue846 \ue84f \ue83d \ue2a5 \ue828 \ue829 \ue494 \ue467 \ue82c \ue4a8 \ue847 \ue2b2 \ue4a3'

'Num of error Char in test:'

0

''

In [9]:
max([len(_) for _ in u_pinyin_tr])

7

In [10]:
# Check for improper pinyin translation
for i in u_pinyin_tr:
    if not re.match(u'^[a-z]+[0-9]{1}[a-z]*$',i):
        print(i)

π


In [15]:
# Consider all unique Symbol and π as Pinyin 
# and then intergate with unique pinyin
# to create a large pinyin dictionary
pinyin2idx = {}
# 0 is save for unknow
temp = u_pinyin_tr.union(u_sym_tr)
# temp.update({'[CLS]', '[SEP]','。'})
temp.update({'。'})
for i, _ in enumerate(temp,2):
    pinyin2idx[_] = i
del temp
with open('pinyin_map.json','w',encoding='utf-8')as f:
    json.dump(pinyin2idx,f)
len(pinyin2idx)

1295

In [16]:
# Consider all unique Symbol and π as Char 
# and then intergate with unique Char
# to create a large pinyin dictionary
char2idx = {}
# 0 is save for unknow
temp = u_char_tr.union(u_sym_tr)
temp.update({'。'})
for i, _ in enumerate(temp,2):
    char2idx[_] = i
del temp
with open('char_map.json','w',encoding='utf-8')as f:
    json.dump(char2idx,f)
len(char2idx)

9108

### Compare with vocab.txt

In [17]:
err = []
unique_pinyin3,unique_char3 =set(),set()
trans2pinyin = lambda sent: {_[0] for _ in pinyin(sent,style=Style.TONE2, heteronym=False,neutral_tone_with_five=True)}
with open('couplet/vocabs',encoding='utf-8') as f:
    for row in f.readlines():
        row = row.strip()
        if re.match(r'[\ue2a5-\ue8f0]+',row):
            err.append(row)
            continue
        if row in char_correct:
            row = char_correct[row]
        res = trans2pinyin(row)
        unique_pinyin3.update(res)
        unique_char3.add(row)

In [18]:
unique_char3.difference(u_char_tr).difference(u_sym_tr)

{'</s>', '<s>', '。'}

In [19]:
unique_pinyin3.difference(u_pinyin_tr)

{',', '-', '</s>', '<s>', '…', '、', '。', '！', '，', '：', '；', '？'}

# Check the correctness of the Data Set

In [20]:
# train 上联
with open("couplet/train/in.txt",encoding='utf8') as f:
    tr_in =  [row.strip().split() for row in f.readlines()]
# train 下联  
with open("couplet/train/out.txt",encoding='utf8') as f:
    tr_out = [row.strip().split() for row in f.readlines()]

#test
with open("couplet/test/in.txt",encoding='utf8') as f:
    te_in = [row.strip().split() for row in f.readlines()]
with open("couplet/test/out.txt",encoding='utf8') as f:
    te_out = [row.strip().split() for row in f.readlines()]

In [21]:
# make sure the length match
for i,j in zip(tr_in,tr_out):
    if len(i) != len(j):
        print(i,j)
for i,j in zip(te_in,te_out):
    if len(i) != len(j):
        print(i,j)