In [12]:
import sqlite3
import os
import shutil
from pathlib import Path
import re
from anki.collection import Collection

def setup_database(db_path):
    """Open (or create) the SQLite database at *db_path* and ensure the ``cards`` table exists.

    The table has an auto-incrementing ``id``, required ``front``/``back``
    text columns and optional ``front_audio``/``back_audio`` text columns.
    Calling this on a database that already has the table is a no-op for
    the schema (``CREATE TABLE IF NOT EXISTS``).

    Args:
        db_path: Database location, passed straight to ``sqlite3.connect``.

    Returns:
        A ``(connection, cursor)`` tuple. The caller owns the connection and
        must close it.

    Raises:
        sqlite3.Error: If the schema cannot be created. The connection is
            closed before the exception propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""CREATE TABLE IF NOT EXISTS cards (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    front TEXT NOT NULL,
                    back TEXT NOT NULL,
                    front_audio TEXT,
                    back_audio TEXT
                )""")
        conn.commit()
    except Exception:
        # The caller never receives the handle on failure, so release it
        # here instead of leaking an open connection.
        conn.close()
        raise
    return conn, cursor

def list_decks(collection_path):
    """Show a numbered menu of the collection's decks and return the user's pick.

    Opens the collection at *collection_path*, prints each deck name with its
    ID (numbered from 1), then keeps prompting on stdin until a number inside
    the menu range is entered. The collection is closed before returning.

    Returns:
        A ``(deck_id, deck_name)`` tuple for the chosen deck.
    """
    col = Collection(collection_path)
    try:
        decks = col.decks.all_names_and_ids()
        total = len(decks)

        print("\nAvailable decks:")
        for number, entry in enumerate(decks, start=1):
            print(f"{number}. {entry.name} (ID: {entry.id})")

        prompt = "\nEnter the number of the deck you want to extract (1-{}): ".format(total)
        while True:
            try:
                picked = int(input(prompt))
            except ValueError:
                # Non-numeric answer: ask again.
                print("Please enter a valid number.")
                continue

            if picked < 1 or picked > total:
                print("Invalid choice. Please try again.")
                continue

            chosen = decks[picked - 1]
            return chosen.id, chosen.name
    finally:
        col.close()

def extract_audio_filenames(field_content):
    """Return the filenames inside every ``[sound:...]`` tag, in order of appearance."""
    sound_tag = re.compile(r'\[sound:(.*?)\]')
    return [hit.group(1) for hit in sound_tag.finditer(field_content)]

def copy_media_file(media_dir, filename, output_dir):
    """Copy *filename* from *media_dir* into *output_dir*.

    Returns True only when the file was copied. An empty filename, a missing
    source file, or any error raised while copying yields False; the latter
    two also print a message.
    """
    if filename:
        try:
            src = os.path.join(media_dir, filename)
            if os.path.exists(src):
                shutil.copy2(src, os.path.join(output_dir, filename))
                return True
            print(f"Warning: Audio file not found: {src}")
        except Exception as err:
            print(f"Error copying media file {filename}: {err}")
    return False

def clean_field_content(content):
    """Drop every ``[sound:...]`` tag from *content* and trim surrounding whitespace."""
    sound_tag = re.compile(r'\[sound:.*?\]')
    without_audio = sound_tag.sub('', content)
    return without_audio.strip()

def extract_anki_deck(anki_collection_path, deck_id, deck_name, output_db_path, audio_output_dir):
    """
    Extract cards from specified Anki deck and save to new database with audio files.

    For every card in the deck the note's fields are read positionally as
    ``[front, back, front_audio, back_audio]``; missing trailing fields are
    treated as empty strings. Files referenced by ``[sound:...]`` tags in the
    two audio fields are copied from the ``collection.media`` directory next
    to the collection file into *audio_output_dir*, and one row per card is
    inserted into the ``cards`` table of *output_db_path*. When a field
    references several audio files, their names are stored joined by ", ".

    A card that fails to process is reported and skipped; it does not abort
    the run. Rows are committed every 100 cards and once more at the end.

    Args:
        anki_collection_path: Path to the Anki collection file.
        deck_id: ID of the deck to export (used in a ``did:`` search).
        deck_name: Deck name, used only in progress messages.
        output_db_path: SQLite file that receives the extracted cards.
        audio_output_dir: Directory that receives the copied audio files;
            created if it does not exist.

    Returns:
        None.
    """
    # Ensure output directories exist
    os.makedirs(audio_output_dir, exist_ok=True)

    # Connect to Anki collection
    col = Collection(anki_collection_path)
    conn = None

    try:
        # Set up the output database inside the try so the collection is
        # still closed if this step fails.
        conn, cursor = setup_database(output_db_path)

        # Get all cards from the selected deck
        card_ids = col.find_cards(f"did:{deck_id}")

        if not card_ids:
            print(f"No cards found in deck: {deck_name}")
            return

        total_cards = len(card_ids)
        print(f"\nExtracting {total_cards} cards from deck: {deck_name}")

        # Get media directory
        media_dir = os.path.join(os.path.dirname(anki_collection_path), "collection.media")
        print(f"Media directory: {media_dir}")

        audio_files_found = 0

        for i, card_id in enumerate(card_ids, 1):
            try:
                note = col.get_card(card_id).note()

                # Fields are taken by position:
                # [front_content, back_content, front_audio, back_audio].
                # Pad with empty strings so notes with fewer fields still unpack.
                front_content, back_content, front_audio, back_audio = (
                    list(note.fields) + [""] * 4
                )[:4]

                # Extract audio files from the audio fields
                front_audio_files = extract_audio_filenames(front_audio)
                back_audio_files = extract_audio_filenames(back_audio)

                # Copy audio files (front first, then back)
                for audio_file in front_audio_files + back_audio_files:
                    if copy_media_file(media_dir, audio_file, audio_output_dir):
                        audio_files_found += 1

                # Text fields are stored as-is apart from surrounding whitespace
                front_clean = front_content.strip()
                back_clean = back_content.strip()

                # Create comma-separated lists of audio files (NULL when none)
                front_audio_str = ', '.join(front_audio_files) if front_audio_files else None
                back_audio_str = ', '.join(back_audio_files) if back_audio_files else None

                # Insert into database
                cursor.execute("""
                    INSERT INTO cards (front, back, front_audio, back_audio)
                    VALUES (?, ?, ?, ?)
                """, (front_clean, back_clean, front_audio_str, back_audio_str))

                # Show progress and checkpoint the work done so far
                if i % 100 == 0 or i == total_cards:
                    print(f"Processed {i}/{total_cards} cards... Found {audio_files_found} audio files so far")
                    conn.commit()

            except Exception as e:
                # Best-effort export: report the bad card and keep going.
                print(f"Error processing card {i}: {e}")
                continue

        conn.commit()
        print("\nExtraction completed successfully!")
        print(f"Total audio files found and copied: {audio_files_found}")

    finally:
        # Close both handles even if closing the collection itself raises.
        try:
            col.close()
        finally:
            if conn is not None:
                conn.close()

In [None]:

def main():
    """Interactive entry point: locate the collection, pick a deck, export it.

    Falls back to asking for the collection path when the default location
    does not exist, and exits quietly if the supplied path is also missing.
    Output file and directory names are derived from the deck name with
    every non-alphanumeric character replaced by an underscore. Any error
    raised during selection or extraction is printed rather than propagated.
    """
    # Configuration
    collection_path = Path("/mnt/c/Users/ambru/AppData/Roaming/Anki2/User 1/collection.anki2")  # Adjust path as needed

    # Verify collection path
    if not os.path.exists(collection_path):
        print(f"Error: Anki collection not found at {collection_path}")
        entered_path = input("Please enter the path to your Anki collection file: ")
        if not os.path.exists(entered_path):
            print("Invalid path provided. Exiting...")
            return
        collection_path = entered_path

    try:
        # Let user choose a deck
        deck_id, deck_name = list_decks(collection_path)

        # Create output paths based on deck name
        slug_chars = [ch if ch.isalnum() else "_" for ch in deck_name]
        slug = "".join(slug_chars)
        db_path = f"extracted_cards_{slug}.db"
        audio_dir = f"extracted_audio_{slug}"

        # Extract the chosen deck
        extract_anki_deck(collection_path, deck_id, deck_name, db_path, audio_dir)

        print(f"\nOutput database: {db_path}")
        print(f"Audio files: {audio_dir}")

    except Exception as err:
        print(f"Error: {err}")

# Run the interactive extraction only when this code is executed directly
# (``__name__`` is "__main__"), not when it is imported as a module.
if __name__ == "__main__":
    main()

In [1]:
# Scratch cell: prints the literal 2. Presumably a quick check that the
# kernel executes code; nothing else in the visible notebook depends on it.
print(2)

2


In [4]:
import sqlite3
import psycopg2
import os
from psycopg2 import sql
from pathlib import Path

# SQLite and PostgreSQL database details

# SQLite file to migrate from.
sqlite_db_path = 'extracted_cards_Danish_Learning_Deck.db'

# Keyword arguments for psycopg2.connect(). Each value can be overridden with
# the matching standard libpq environment variable; the literals are the
# defaults used when the variable is not set.
# NOTE(review): the password default is a real credential committed to the
# notebook. Rotate it, set PGPASSWORD instead, and remove the literal.
postgres_conn_details = {
    'dbname': os.environ.get('PGDATABASE', 'cards'),
    'user': os.environ.get('PGUSER', 'cards_db'),
    'password': os.environ.get('PGPASSWORD', 'Q9W8E7R0'),
    'host': os.environ.get('PGHOST', '192.168.0.225'),
    'port': os.environ.get('PGPORT', '5432'),
}

def file_to_bytes(file_path):
    """Read the file at *file_path* in binary mode and return its full contents."""
    with open(file_path, mode='rb') as stream:
        payload = stream.read()
    return payload

# Migrate every row of the SQLite ``cards`` table into the PostgreSQL ``cards``
# table, replacing each audio filename with the bytes of that file.

# Directory the audio filenames stored in SQLite are resolved against.
media_dir = Path("/mnt/c/Users/ambru/OneDrive/Projects/anki-words-builder/notebooks/media")

# The insert statement is the same for every row, so build it once.
insert_query = sql.SQL("""
        INSERT INTO cards (front, back, front_audio, back_audio)
        VALUES (%s, %s, %s, %s)
    """)

# Connect to SQLite database and fetch all cards; the connection is released
# as soon as the rows are in memory, even if the query fails.
sqlite_conn = sqlite3.connect(sqlite_db_path)
try:
    sqlite_cursor = sqlite_conn.cursor()
    try:
        sqlite_cursor.execute("SELECT front, back, front_audio, back_audio FROM cards")
        cards = sqlite_cursor.fetchall()
    finally:
        sqlite_cursor.close()
finally:
    sqlite_conn.close()

# Connect to PostgreSQL database
postgres_conn = psycopg2.connect(**postgres_conn_details)
try:
    postgres_cursor = postgres_conn.cursor()
    try:
        # Insert data into PostgreSQL
        for front, back, front_audio, back_audio in cards:
            # Convert audio files to bytes if they exist (NULL otherwise).
            # NOTE(review): the extraction cell stores several filenames in one
            # column joined by ", "; such a value is treated here as a single
            # filename and will fail with FileNotFoundError.
            front_audio_bytes = file_to_bytes(media_dir / front_audio) if front_audio else None
            back_audio_bytes = file_to_bytes(media_dir / back_audio) if back_audio else None

            postgres_cursor.execute(insert_query, (front, back, front_audio_bytes, back_audio_bytes))

        # All rows are written in one transaction.
        postgres_conn.commit()
    except Exception:
        # Leave the target table untouched when any row fails, then surface
        # the original error.
        postgres_conn.rollback()
        raise
    finally:
        postgres_cursor.close()
finally:
    postgres_conn.close()

print("Data migration complete.")


Data migration complete.
