# Define Input/Output/Global Variables

In [176]:
# global vars ###################################
from opencc import OpenCC

toTrad = OpenCC('s2tw')
toSimp = OpenCC('t2s')
SEPARATOR = "\x1f"

# input vars #####################################

# folders
WORDLISTS = "wordlists"
APKGS = "apkgs"
EXPERIMENTS = f"{APKGS}/experiments"
OUTPUTS = "outputs"

# files
hsk3_wordlist_file = f"{WORDLISTS}/final_formatted_wordlist.txt"
pleco_imported_apkg_file = f"{APKGS}/backfilled_pleco_imports_v1.0.apkg"
original_deck_name = "Pleco Import"
new_deck_name = "HSK 3.0 (Pleco Definitions)"
extracted_anki_sqlite_folder = f"{EXPERIMENTS}/pleco_import_bkfill2_direct_sqlite_apkg"
output_apkg_file = f"{OUTPUTS}/pleco_import_bkfill2_direct_sqlite.apkg"

# Load HSK3 Wordlist

In [177]:
# define word entry class
from typing import List
import dataclasses

@dataclasses.dataclass
class WordEntry:
    sort_numbers: List[int]
    tags: List[str]

In [178]:
# process wordlist into dictionary
hsk3_wordlist = {}
n_words_in_wordlist, n_dup = 0, 0
current_tag = ""
for line in open(hsk3_wordlist_file, encoding="utf-8"):
    line = line.strip()
    if "hsk_level" in line:
        current_tag = line.replace("hsk_l", "HSK_L")
        continue
    else:
        trad_char = toTrad.convert(line)
        if trad_char not in hsk3_wordlist:
            hsk3_wordlist[trad_char] = WordEntry(sort_numbers=[n_words_in_wordlist], tags=[current_tag])
        else:
            n_dup += 1
            hsk3_wordlist[trad_char].sort_numbers.append(n_words_in_wordlist)
            hsk3_wordlist[trad_char].tags.append(current_tag)
        n_words_in_wordlist += 1

hsk3_wordlist_set = set(hsk3_wordlist.keys())

In [179]:
# print stats on wordlist dict
print("\n".join([str(t) for t in list(hsk3_wordlist.items())[:10]]), "\n...\n")

print(f"current wordlist is '{hsk3_wordlist_file}'")
print("number of words inside original wordlist:", n_words_in_wordlist)
print("number of unique words:                  ", len(hsk3_wordlist_set))
print("num_duplicates:                          ", n_dup)

('愛', WordEntry(sort_numbers=[0], tags=['HSK_Level_1']))
('愛好', WordEntry(sort_numbers=[1], tags=['HSK_Level_1']))
('八', WordEntry(sort_numbers=[2], tags=['HSK_Level_1']))
('爸爸', WordEntry(sort_numbers=[3], tags=['HSK_Level_1']))
('吧', WordEntry(sort_numbers=[4], tags=['HSK_Level_1']))
('白', WordEntry(sort_numbers=[5, 1275], tags=['HSK_Level_1', 'HSK_Level_3']))
('白天', WordEntry(sort_numbers=[6], tags=['HSK_Level_1']))
('百', WordEntry(sort_numbers=[7], tags=['HSK_Level_1']))
('班', WordEntry(sort_numbers=[8], tags=['HSK_Level_1']))
('半', WordEntry(sort_numbers=[9], tags=['HSK_Level_1'])) 
...

current wordlist is 'wordlists/final_formatted_wordlist.txt'
number of words inside original wordlist: 11073
number of unique words:                   10921
num_duplicates:                           152


# Connect to SQLite DB

In [180]:
# setup experiments folder (.apkg are compressed sqlite databases, this just unzips it)

from zipfile import ZipFile
import shutil
import os

if os.path.isdir(extracted_anki_sqlite_folder):
    # delete if target experiment folder already exists
    shutil.rmtree(extracted_anki_sqlite_folder)
 

with ZipFile(pleco_imported_apkg_file) as zf:
    # unzip apkg to target folder
    zf.extractall(extracted_anki_sqlite_folder)

In [181]:
# setup sqlite db connection
import sqlite3

con = sqlite3.connect(f"{extracted_anki_sqlite_folder}/collection.anki2")
cur = con.cursor()


In [182]:
# show all tables
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
print(cur.fetchall())

[('col',), ('notes',), ('cards',), ('revlog',), ('sqlite_stat1',), ('sqlite_stat4',), ('graves',)]


# missing words

In [183]:
# helper functions
def get_trad_word(simp_word):
    return toTrad.convert(toSimp.convert(simp_word))

def get_trad_word_from_row(row):
    _id, flds = row
    flds_arr = flds.split(SEPARATOR)
    return get_trad_word(flds_arr[0])

sqlite_wordset = set()
def refresh_sqlite_wordset():
    global sqlite_wordset
    sqlite_wordset = set([get_trad_word_from_row(row) for row in list(cur.execute('SELECT id, flds FROM notes;'))])

In [184]:
# Add exceptions
variants = [('燻', '薰')] 
exceptions = {}  # maps toTrad.convert(hsk3_simp_word) <-> anki_sqlite front field (traditional)
for w1, w2 in variants:
    exceptions[w1], exceptions[w2] = w2, w1

In [185]:
# extract words present inside anki apkg & compare to the hsk3 wordlist

refresh_sqlite_wordset()
misfits = sqlite_wordset - hsk3_wordlist_set
missing = hsk3_wordlist_set - sqlite_wordset 

# process variants/exceptions
for miss in missing.copy():
    if miss in exceptions:
        missing.remove(miss)
        misfits.remove(exceptions[miss])


print(f"""
words inside anki sqlite cards: {len(sqlite_wordset)}
words inside hsk3 wordlist:     {len(hsk3_wordlist_set)}
misfits (in anki, but not hsk3 wordlist): {len(misfits)}
missing (in hsk 3wordlist, but not anki): {len(missing)}
""")


words inside anki sqlite cards: 10926
words inside hsk3 wordlist:     10921
misfits (in anki, but not hsk3 wordlist): 5
missing (in hsk 3wordlist, but not anki): 0



In [186]:
# Analyze misfits
print(f"""
num misfits (in anki but not in hsk3 wordlist): {len(misfits)}
simplified:  {[toSimp.convert(word) for word in list(misfits)[:10]]}
traditional: {list(misfits)[:10]}
""")


num misfits (in anki but not in hsk3 wordlist): 5
simplified:  ['不难理会', '揹', '见过世面', '指著和尚骂秃子', '怀']
traditional: ['不難理會', '揹', '見過世面', '指著和尚罵禿子', '懷']



In [187]:
# Analyze missing
print(f"""
num missing (in hsk3 wordlist but not in anki): {len(missing)}
simplified:  {[toSimp.convert(word) for word in list(missing)[:10]]}
traditional: {list(missing)[:10]}
""")


num missing (in hsk3 wordlist but not in anki): 0
simplified:  []
traditional: []



In [188]:
# write missing words to a txt file

# backfill_todo_wordlist_file = "backfill_todo_wordlist_file.txt"
# with open(backfill_todo_wordlist_file, "w", encoding="utf-8") as f:
#     f.write("\n".join(sorted(list(missing))))

# Delete Misfit Words

In [189]:
# helper function
def delete_words(cur, words):
    word_to_id = {get_trad_word_from_row(row) : row[0] 
          for row in list(cur.execute('SELECT id, flds FROM notes;'))}
    for word in words:
        del_id = word_to_id[word]
        cur.execute("DELETE FROM notes WHERE id=?", (del_id,))

In [190]:
# delete misfit words and print out stats

print("sqlite wordset length:", len(sqlite_wordset))
print("deleting misfit words...")

delete_words(cur, misfits)
refresh_sqlite_wordset()

print("sqlite wordset length:", len(sqlite_wordset))

sqlite wordset length: 10926
deleting misfit words...
sqlite wordset length: 10921


# Update New Fields for Cards

In [191]:
# helper functions
import re

def extract_pinyin(back):
    """Extract pinyin from card definition html."""
    x = re.match(r"""[\s\S]*?<br>([\s\S]+?>PY <[\s\S]+?<\/span>)<\/p>""", back).group(1)
    x = re.sub("<.*?>", '', x)
    x = re.sub("//", '', x)
    x = re.sub("PY", '', x)
    x = x.strip()
    return x
    

def process_flds(flds):
    """Given anki fields, extract separate fields & combine with hsk3 tagging information."""
    flds_arr = flds.split(SEPARATOR)
    front, back = flds_arr[0], flds_arr[1]
    traditional = get_trad_word(front)
    simplified = toSimp.convert(traditional)

    word_entry = hsk3_wordlist[traditional] if traditional in hsk3_wordlist else hsk3_wordlist[exceptions[traditional]]
    definition = re.match(r"""[\s\S]*?<\/div>(<div align="left">[\s\S]*?<\/div>)""", back).group(1)
    pinyin = extract_pinyin(back)
    learning_order = str(min(word_entry.sort_numbers))
    metadata = str(word_entry)
    new_tags = f" {' '.join(word_entry.tags)} "
    
    new_flds = SEPARATOR.join([front, back, traditional, simplified, definition, pinyin, learning_order, metadata])
    return new_flds, new_tags

def update(cur, _id, new_flds, new_tags):
    """Update fields and tags."""
    cur.execute("UPDATE notes SET flds = ?, tags = ? WHERE id = ?", (new_flds, new_tags, _id))

def process_row(cur, row):
    """For the given row, generates new fields and updates the sqlite db."""
    _id, flds = row
    new_flds, new_tags = process_flds(flds)
    if new_flds.count(SEPARATOR) != flds.count(SEPARATOR):
        raise Exception("Generated fields do not match the number of old fields")
    update(cur, _id, new_flds, new_tags)

def update_all(cur):
    """Runs the process on all rows."""
    for row in list(cur.execute('SELECT id, flds FROM notes;')):
        process_row(cur, row)

In [192]:
# update all rows
update_all(cur)

# Update Deck Name

In [193]:
# Update the saved deck name
import json

# extract collection information
collections = list(cur.execute("SELECT id, decks FROM col;"))
assert len(collections) == 1
col_id, decks_json = collections[0]
decks = json.loads(decks_json)

# retrieve target deck object and rename
for deck_id, deck in decks.items():
    if deck["name"] == original_deck_name:
        deck["name"] = new_deck_name
        break
else:
    raise Exception(f"Deck '{original_deck_name}' not found in anki collection")

# update database
cur.execute("UPDATE col SET decks= ? WHERE id = ?", (json.dumps(decks), col_id))
print(f"Update deck name from '{original_deck_name}' to '{new_deck_name}'")

Update deck name from 'Pleco Import' to 'HSK 3.0 (Pleco Definitions)'


# Commit to DB

In [194]:
con.commit()

In [195]:
con.close()

# Zip to Output Apkg File

In [196]:
shutil.make_archive(output_apkg_file, "zip", extracted_anki_sqlite_folder)
os.rename(output_apkg_file + ".zip", output_apkg_file)