# Setup

## imports

In [38]:
import time
import os
import json
import glob
import re
import requests
import xml.etree.ElementTree as ET
import datetime
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

## base helper methods

In [39]:
def find_file_with_wildcard(pattern):
    """Resolve a glob pattern that is expected to match exactly one path.

    Args:
        pattern: Glob expression handed to ``glob.glob``.

    Returns:
        The single path that matches ``pattern``.

    Raises:
        FileNotFoundError: If nothing matches the pattern.
        ValueError: If more than one path matches the pattern.
    """
    candidates = glob.glob(pattern)

    # Guard clauses: reject the "none" and "ambiguous" outcomes up front.
    if not candidates:
        raise FileNotFoundError(f"No files found matching pattern: {pattern}")
    if len(candidates) != 1:
        raise ValueError(f"Multiple files found matching pattern: {pattern}")

    (only_match,) = candidates
    return only_match

## base vars

In [40]:
# Define scopes for accessing Google Drive
# (passed to both the stored-token loader and the OAuth flow in authenticate_google_drive)
SCOPES = ['https://www.googleapis.com/auth/drive']
# JSON file in which load_processed_files / save_processed_files persist processed file IDs
MEMORY_FILE = 'processed_files.json'
# Glob for the OAuth client-secret file; it must match exactly one file
CREDENTIALS_FILE_WILDCARDED = './.credentials/client_secret*.json'
# Resolved when this cell runs: raises FileNotFoundError on no match, ValueError on several
CREDENTIALS_FILE = find_file_with_wildcard(CREDENTIALS_FILE_WILDCARDED)
# Token cache that authenticate_google_drive reads and rewrites
TOKEN_FILE = './.credentials/token.json'
# NOTE(review): not referenced anywhere in the visible part of this notebook — confirm it is still needed
SERVER_FOLDER = "server_files"

## drive helper methods

In [41]:
def authenticate_google_drive():
    """Return a Drive v3 service object, obtaining credentials as needed.

    Credentials are loaded from ``TOKEN_FILE`` when that file exists. If they
    are missing or not valid they are either refreshed (expired credentials
    that carry a refresh token) or obtained through the local-server OAuth flow
    using ``CREDENTIALS_FILE``; in both cases the resulting credentials are
    written back to ``TOKEN_FILE``.

    Returns:
        The object returned by ``build('drive', 'v3', credentials=...)``.
    """
    creds = None
    if os.path.exists(TOKEN_FILE):
        print("Loading credentials from token.json")
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)

    # Fast path: stored credentials are usable as-is, nothing to persist.
    if creds and creds.valid:
        return build('drive', 'v3', credentials=creds)

    refreshable = creds and creds.expired and creds.refresh_token
    if refreshable:
        print("Refreshing expired credentials")
        creds.refresh(Request())
    else:
        print("Authenticating with Google Drive")
        oauth_flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
        creds = oauth_flow.run_local_server(port=0)

    # Persist the refreshed / newly obtained credentials for the next run.
    with open(TOKEN_FILE, 'w') as token_handle:
        token_handle.write(creds.to_json())

    return build('drive', 'v3', credentials=creds)

def get_items_by_name(service, name, is_folder=False, is_root=False):
    """Return every Drive item with an exact name, newest modification first.

    Args:
        service: Drive API service object (as returned by
            ``authenticate_google_drive``).
        name: Exact item name to match. Backslashes and single quotes are
            escaped so the name cannot break the query string.
        is_folder: When True only folders are returned, otherwise only
            non-folder items are returned.
        is_root: When True the search is restricted to items whose parent is
            the Drive root.

    Returns:
        A list of file resources (``id``, ``name``, ``mimeType``,
        ``modifiedTime``, ``parents``) gathered across all result pages and
        ordered by ``modifiedTime`` descending. An empty list is returned when
        nothing matches or when the API raises ``HttpError`` (the error is
        printed).
    """
    # Drive query literals are single-quoted; escape backslashes first, then
    # quotes, so names such as "Valentine's Day" still form a valid query.
    escaped_name = str(name).replace("\\", "\\\\").replace("'", "\\'")

    q = f"name = '{escaped_name}'"
    if is_folder:
        q += " and mimeType = 'application/vnd.google-apps.folder'"
    else:
        q += " and mimeType != 'application/vnd.google-apps.folder'"
    if is_root:
        q += " and 'root' in parents"

    items = []
    page_token = None
    try:
        # files.list is paginated: keep following nextPageToken until exhausted.
        while True:
            results = service.files().list(
                q=q,
                fields="nextPageToken, files(id, name, mimeType, modifiedTime, parents)",
                orderBy="modifiedTime desc",
                pageToken=page_token,
            ).execute()
            items.extend(results.get('files', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                return items
    except HttpError as error:
        print(f"An error occurred: {error}")
        return []

def move_file_to_folder(service, file_id, folder_id):
    """Re-parent a Drive file so that ``folder_id`` becomes its only parent.

    Args:
        service: Drive API service object.
        file_id: ID of the file to move.
        folder_id: ID of the destination folder.

    The file's current parents are fetched and all of them are removed in the
    same update call that adds the destination folder. ``HttpError`` is caught
    and reported on stdout; nothing is returned.
    """
    try:
        current = service.files().get(fileId=file_id, fields='parents').execute()

        # The API expects the parents to drop as one comma-separated string.
        update_args = {
            'fileId': file_id,
            'addParents': folder_id,
            'removeParents': ",".join(current.get('parents', [])),
            'fields': 'id, parents',
        }
        service.files().update(**update_args).execute()
        print(f"File {file_id} has been moved to folder {folder_id}.")
    except HttpError as error:
        print(f"An error occurred while moving the file: {error}")

## drive variables

In [42]:
# Shared Drive client used by the main Drive functions below
# (running this cell may trigger a token refresh or the OAuth flow)
SERVICE = authenticate_google_drive()
# Name of the root-level folder that superseded flashcard exports are moved into
FLASCHARD_FILE_ARCHIVE_FOLDER = "_Pleco"
# NOTE: get_items_by_name returns [] when nothing matches or the API call fails,
# so the [0] lookup raises IndexError if the archive folder is missing from the Drive root
FLASCHARD_FILE_ARCHIVE_FOLDER_ID = get_items_by_name(SERVICE, FLASCHARD_FILE_ARCHIVE_FOLDER, is_folder=True, is_root=True)[0]['id']
# File name that get_latest_flashcard_xml / archive_flashcard_xmls look for in the Drive root
FLASHCARD_FILE_NAME = "pleco_flashcards.xml"

Loading credentials from token.json
Refreshing expired credentials


## anki connect variables

In [43]:
# AnkiConnect API endpoint
# (the AnkiConnect helpers below POST their JSON action payloads to this URL;
# localhost means Anki must be running on this machine — presumably with the
# AnkiConnect add-on installed, TODO confirm)
ANKI_CONNECT_URL = "http://localhost:8765"

## anki connect methods

In [44]:
def get_card_info(deck_name):
    """Fetch the ``cardsInfo`` records for every card in one Anki deck.

    Two AnkiConnect calls are made: ``findCards`` to collect the card IDs of
    the deck, then ``cardsInfo`` for those IDs.

    Args:
        deck_name: Deck name, used in the ``deck:"..."`` search query.

    Returns:
        The non-empty ``result`` list of the ``cardsInfo`` response.

    Raises:
        ConnectionError: If either request returns a non-200 status, or the
            ``cardsInfo`` response has no (or an empty) ``result``.
        ValueError: If ``findCards`` returns no card IDs or an error.
    """
    find_response = requests.post(
        ANKI_CONNECT_URL,
        json={
            "action": "findCards",
            "version": 6,
            # Restrict the search to cards in the requested deck
            "params": {"query": f'deck:"{deck_name}"'},
        },
    )
    if find_response.status_code != 200:
        raise ConnectionError("Failed to connect to AnkiConnect for finding cards.")

    find_result = find_response.json()
    if not ('result' in find_result and find_result['result']):
        raise ValueError(f"No cards found or error: {find_result.get('error')}")

    info_response = requests.post(
        ANKI_CONNECT_URL,
        json={
            "action": "cardsInfo",
            "version": 6,
            "params": {"cards": find_result['result']},
        },
    )
    if info_response.status_code != 200:
        raise ConnectionError("Failed to connect to AnkiConnect for card information.")

    info_result = info_response.json()
    if not ('result' in info_result and info_result['result']):
        raise ConnectionError(f"Error retrieving card information: {info_result.get('error')}")

    return info_result['result']

def sync_anki():
    """Ask AnkiConnect to run a sync and report success on stdout.

    Raises:
        ConnectionError: If the HTTP status is not 200 or the response carries
            a truthy ``error`` field.
    """
    reply = requests.post(ANKI_CONNECT_URL, json={"action": "sync", "version": 6})
    if reply.status_code != 200:
        raise ConnectionError("Failed to connect to AnkiConnect for syncing.")

    failure = reply.json().get('error')
    if failure:
        raise ConnectionError(f"Error syncing Anki: {failure}")

    print("Anki sync successful.")

def get_latest_anki_flaschard_words():
    """Sync Anki, then return the set of headwords in the "Pleco Import" deck.

    Each card's ``Front`` field is reduced to the characters in the range
    U+4E00..U+9FFF before being added to the set.

    Returns:
        A set of strings, one per distinct filtered ``Front`` value.
    """
    # Sync first so the deck reflects the latest state.
    sync_anki()

    deck_cards = get_card_info("Pleco Import")
    print(f"Total cards found in 'Pleco Import': {len(deck_cards)}")

    # Strip everything outside the CJK range matched below.
    non_chinese = re.compile(r'[^\u4e00-\u9fff]')
    words = {
        non_chinese.sub('', card['fields']['Front']['value'])
        for card in deck_cards
    }
    print("success")

    return words


# Main

In [45]:
# main google drive functions
def get_latest_flashcard_xml():
    """Download the most recently modified flashcard export from the Drive root.

    Looks up non-folder items named ``FLASHCARD_FILE_NAME`` in the root, takes
    the first one (the lookup sorts by modified time, newest first), downloads
    it and prints its modification time in the local timezone.

    Returns:
        The file content decoded as UTF-8, or ``None`` when no file is found.
    """
    candidates = get_items_by_name(SERVICE, FLASHCARD_FILE_NAME, is_folder=False, is_root=True)
    if not candidates:
        print("No files found.")
        return None

    newest = candidates[0]
    raw_bytes = SERVICE.files().get_media(fileId=newest['id']).execute()
    xml_text = raw_bytes.decode('utf-8')

    # Rewrite the trailing "Z" as an explicit UTC offset for fromisoformat.
    utc_stamp = newest['modifiedTime'].replace('Z', '+00:00')
    local_stamp = datetime.datetime.fromisoformat(utc_stamp).astimezone()
    print("Latest flashcard xml last modified time:", local_stamp.strftime('%Y-%m-%d %H:%M:%S %Z%z'))
    return xml_text

def archive_flashcard_xmls(archive_latest=False):
    """Move flashcard exports from the Drive root into the archive folder.

    Args:
        archive_latest: When False (default) the first listed file — the most
            recently modified one — stays in the root and only the remaining
            ones are moved. When True every matching file is moved.
    """
    candidates = get_items_by_name(SERVICE, FLASHCARD_FILE_NAME, is_folder=False, is_root=True)
    if not candidates:
        print("No files found.")
        return

    # Index 0 is the newest export; skip it unless asked to archive it too.
    first_to_archive = 0 if archive_latest else 1
    for stale in candidates[first_to_archive:]:
        move_file_to_folder(SERVICE, stale['id'], FLASCHARD_FILE_ARCHIVE_FOLDER_ID)

In [46]:
# main flashcard processing functions
def convert_pinyin(pinyin):
    """Convert numbered pinyin (``you2xi4``) to tone-marked pinyin (``yóu xì``).

    Whitespace is removed, the text is split after every tone digit, and each
    syllable gets its tone mark placed by the standard rule: ``a`` or ``e``
    takes the mark if present, otherwise the ``o`` of ``ou``, otherwise the
    last vowel of the final (so ``liu2`` becomes ``liú`` and ``gui4`` becomes
    ``guì``). Exactly one letter per syllable is marked.

    Args:
        pinyin: Numbered pinyin string, e.g. ``"ge2xin1"``.

    Returns:
        The syllables joined by single spaces with tone marks applied. Tone 5
        and any digit outside 1-4 are treated as neutral (no mark). An empty
        or whitespace-only input yields ``""``.
    """
    tone_marks = {
        'a': 'āáǎàa',
        'e': 'ēéěèe',
        'i': 'īíǐìi',
        'o': 'ōóǒòo',
        'u': 'ūúǔùu',
        'ü': 'ǖǘǚǜü',
        # Capitalised syllables (proper nouns such as "Ao4men2") keep their case.
        'A': 'ĀÁǍÀA',
        'E': 'ĒÉĚÈE',
        'I': 'ĪÍǏÌI',
        'O': 'ŌÓǑÒO',
        'U': 'ŪÚǓÙU',
        'Ü': 'ǕǗǙǛÜ',
    }
    vowels = ''.join(tone_marks)
    # The vowel run closest to the tone digit, i.e. the final of the syllable.
    last_vowel_run = re.compile(f'[{vowels}]+(?=[^{vowels}]*$)')

    cleaned_pinyin = re.sub(r'\s', '', pinyin)
    if not cleaned_pinyin:
        return ''

    result = []
    for syllable in re.split(r'(?<=\d)(?=\D)', cleaned_pinyin):
        if syllable and syllable[-1].isdigit():
            tone = int(syllable[-1])
            syllable = syllable[:-1]
            # Index 4 of every table entry is the unmarked (neutral) vowel.
            tone_index = tone - 1 if 1 <= tone <= 4 else 4

            final = last_vowel_run.search(syllable)
            if final:
                cluster = final.group().lower()
                if 'a' in cluster:
                    offset = cluster.index('a')
                elif 'e' in cluster:
                    offset = cluster.index('e')
                elif 'ou' in cluster:
                    offset = cluster.index('ou')
                else:
                    offset = len(cluster) - 1
                position = final.start() + offset
                marked = tone_marks[syllable[position]][tone_index]
                syllable = syllable[:position] + marked + syllable[position + 1:]
        result.append(syllable)
    return ' '.join(result)

def process_flashcard_xml(xml_text):
    """Parse a Pleco flashcard export into entry dictionaries.

    Args:
        xml_text: The export as a string. Each ``<card>`` is expected to hold
            exactly one ``<entry>`` with ``sc`` and ``tc`` headwords, a
            ``<pron>`` element, an optional ``<defn>``, plus a ``<dictref>``
            carrying a ``dictid`` attribute.

    Returns:
        A tuple ``(entries_data, problematic_cards)``. Both are lists of dicts
        with the keys ``simplified``, ``traditional``, ``pinyin``,
        ``definition`` and ``dictid``; cards whose definition is missing or
        empty go to the second list.

    Raises:
        ValueError: If a card does not contain exactly one ``<entry>``.
        Exception: Any other error raised while reading a card is re-raised
            after the offending card's XML has been printed.
    """
    # example card: <card language="chinese" created="1735513394" modified="1735513394"><entry><headword charset="sc">游戏</headword><headword charset="tc">遊戲</headword><pron type="hypy" tones="numbers">you2xi4</pron><defn>noun recreation; game ...</defn></entry><dictref dictid="PACE" entryid="35050240"/></card>

    complete_entries = []
    missing_definition = []

    for card in ET.fromstring(xml_text).findall('.//card'):
        try:
            entry_nodes = card.findall('entry')
            if len(entry_nodes) != 1:
                print(ET.tostring(card, encoding='unicode'))
                raise ValueError("Card does not contain exactly one entry.")
            (entry,) = entry_nodes

            defn_node = entry.find('defn')
            record = {
                'simplified': entry.find('headword[@charset="sc"]').text,
                'traditional': entry.find('headword[@charset="tc"]').text,
                'pinyin': convert_pinyin(entry.find('pron').text),
                'definition': "" if defn_node is None else defn_node.text,
                'dictid': card.find('dictref').attrib['dictid'],
            }

            # Entries without a usable definition are reported separately.
            bucket = complete_entries if record['definition'] else missing_definition
            bucket.append(record)
        except Exception:
            # Show the offending card before propagating the failure.
            print(ET.tostring(card, encoding='unicode'))
            raise

    return complete_entries, missing_definition


## extra functions

In [47]:
def list_files_by_name(service, file_name):
    """List every Drive item with an exact name, newest modification first.

    Unlike ``get_items_by_name`` this applies no folder or parent filter.

    Args:
        service: Drive API service object.
        file_name: Exact name to match. Backslashes and single quotes are
            escaped so the name cannot break the query string.

    Returns:
        A list of file resources (``id``, ``name``, ``mimeType``,
        ``modifiedTime``, ``parents``) gathered across all result pages. An
        empty list is returned when nothing matches or when the API raises
        ``HttpError`` (the error is printed).
    """
    # Drive query literals are single-quoted; escape backslashes, then quotes.
    escaped_name = str(file_name).replace("\\", "\\\\").replace("'", "\\'")

    matches = []
    page_token = None
    try:
        # files.list is paginated: keep following nextPageToken until exhausted.
        while True:
            results = service.files().list(
                q=f"name = '{escaped_name}'",
                fields="nextPageToken, files(id, name, mimeType, modifiedTime, parents)",
                orderBy="modifiedTime desc",
                pageToken=page_token,
            ).execute()
            matches.extend(results.get('files', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                return matches
    except HttpError as error:
        print(f"An error occurred: {error}")
        return []

def process_file(file):
    """Placeholder processing step: announce the file on stdout.

    Args:
        file: Mapping with at least the keys ``name`` and ``id``.
    """
    file_name, file_id = file['name'], file['id']
    print(f"Processing file: {file_name} (ID: {file_id})")


def load_processed_files():
    """Read the processed file IDs persisted in ``MEMORY_FILE``.

    Returns:
        A set built from the JSON content of ``MEMORY_FILE``, or an empty set
        when that file does not exist.
    """
    if not os.path.exists(MEMORY_FILE):
        return set()

    with open(MEMORY_FILE, 'r') as memory_handle:
        stored_ids = json.load(memory_handle)
    return set(stored_ids)

def save_processed_files(processed_files):
    """Overwrite ``MEMORY_FILE`` with the given processed file IDs.

    Args:
        processed_files: Iterable of file IDs; it is converted to a list
            before being serialised as JSON.
    """
    with open(MEMORY_FILE, 'w') as memory_handle:
        id_list = list(processed_files)
        json.dump(id_list, memory_handle)

def monitor_google_drive(target_folder_id, interval=60):
    """Poll Drive forever for unprocessed 'pleco_flashcards.xml' files.

    Every not-yet-processed file is passed to ``process_file``, moved to
    ``target_folder_id`` and recorded in the persisted processed set. The
    loop never exits on its own.

    Args:
        target_folder_id: Drive folder ID that processed files are moved into.
        interval: Seconds to sleep between polls.
    """
    service = authenticate_google_drive()
    processed_files = load_processed_files()

    while True:
        print("Checking for new files named 'pleco_flashcards.xml'...")

        for candidate in list_files_by_name(service, "pleco_flashcards.xml"):
            file_id = candidate['id']
            if file_id in processed_files:
                continue

            process_file(candidate)
            move_file_to_folder(service, file_id, target_folder_id)

            # Persist after every file so progress is kept between runs.
            processed_files.add(file_id)
            save_processed_files(processed_files)

        time.sleep(interval)


## sandbox 1

In [None]:
import html

def convert_unicode_segments(text):
    """Replace HTML character references (such as &#33368;) with their characters.

    Args:
        text: String that may contain character references.

    Returns:
        The result of ``html.unescape(text)``.
    """
    decoded = html.unescape(text)
    return decoded

In [48]:
def main():
    """Export the latest Pleco flashcards, enriched with Anki card backs, to JSON.

    Steps:
        1. Sync Anki and report the size of the "Pleco Import" deck.
        2. Download and parse the newest flashcard XML from Drive.
        3. Attach each entry's Anki ``Back`` field (character references
           decoded) as ``formatted_back``, matched on the traditional headword.
        4. Write the entries to ``flashcard_entries.json`` as UTF-8.

    Returns early, writing nothing, when no flashcard XML is found on Drive.

    Raises:
        KeyError: If an entry's traditional headword has no matching Anki card.
    """
    # Called for its side effects (Anki sync and the card-count report); the
    # returned word set is not needed here.
    get_latest_anki_flaschard_words()

    xml_text = get_latest_flashcard_xml()
    if xml_text is None:
        # get_latest_flashcard_xml already reported that no file was found.
        print("No flashcard XML to process; nothing exported.")
        return

    flashcard_entries, error_entries = process_flashcard_xml(xml_text)
    print(len(flashcard_entries), "flashcard entries found|", len(error_entries), "error entries found")

    anki_cards = get_card_info("Pleco Import")
    # Key each card by its Front field reduced to characters in U+4E00..U+9FFF.
    anki_cards_dict = {re.sub(r'[^\u4e00-\u9fff]', '', card['fields']['Front']['value']): card for card in anki_cards}
    for entry in flashcard_entries:
        headword = entry['traditional']
        if headword not in anki_cards_dict:
            raise KeyError(f"No card in 'Pleco Import' matches headword {headword!r}")
        entry['formatted_back'] = convert_unicode_segments(anki_cards_dict[headword]['fields']['Back']['value'])

    # ensure_ascii=False writes raw CJK characters, so the encoding must be
    # explicit rather than left to the platform's locale default.
    with open("flashcard_entries.json", "w", encoding="utf-8") as f:
        json.dump(flashcard_entries, f, ensure_ascii=False, indent=4)

Anki sync successful.
Total cards found in 'Pleco Import': 1659
success
Latest flashcard xml last modified time: 2025-01-02 12:17:32 EST-0500
1648 flashcard entries found| 1 error entries found
