In [26]:
from pathlib import Path


from mpflash.mpboard_id.board_id import read_known_boardinfo

from mpflash.mpboard_id.add_boards import boards_from_repo
# import jsons

from mpflash.vendor.board_database import Database



In [27]:
# known_boards = read_known_boardinfo()
# print(f"Known boards: {len(known_boards)}")

In [28]:
# Relative path to the MicroPython source checkout used by the cells below.
mpy_path = Path("../../micropython")

# all = boards_from_repo(mpy_path, version="")

# print(f"All boards_from_repo: {len(all)}")

In [29]:
# db = Database(mpy_root_directory = mpy_path)


# print(f"Database boards: {len(db.boards)}")



In [30]:
# boards = [(
#             db.boards[b].port.name,
#            db.boards[b].name,
#            db.boards[b].mcu,
#            ) for b in db.boards]

In [31]:

def iter_boards(db: Database, version: str = ""):
    """Yield one flat record per board, followed by one per board variant.

    Every record is an 8-tuple laid out as
    ``(version, board_id, base_name, mcu, variant, path, description, text)``.
    The record for the board itself uses the board name as ``board_id`` and
    leaves ``variant`` and ``text`` empty. A variant record uses
    ``"<board name>-<variant name>"`` as ``board_id`` and carries the
    variant's own description and text.

    Args:
        db: Object whose ``boards`` attribute maps keys to board objects.
        version: Label stored in every record; surrounding whitespace is removed.
    """
    label = version.strip()
    for board in db.boards.values():
        base = board.name
        # The board itself: no variant and no extra text.
        yield (label, base, base, board.mcu, "", board.path, board.description, "")
        # A falsy `variants` value (e.g. an empty list) produces no further records.
        for variant in board.variants or ():
            yield (
                label,
                f"{base}-{variant.name}",
                base,
                board.mcu,
                variant.name,
                board.path,
                variant.description,
                variant.text,
            )

# longlist = list(iter_boards(db))

# print(f"Database boards: {len(longlist)}")

In [32]:
    
from typing import List

from numpy import long
import mpflash.basicgit as git
from mpflash.versions import get_preview_mp_version, get_stable_mp_version, micropython_versions

def get_boards(versions:List[str], mpy_dir:Path, ):
    """Collect board and variant records for several MicroPython versions.

    For each version the repository at ``mpy_dir`` is switched to the matching
    revision, a fresh ``Database`` is built from it, and the records produced
    by ``iter_boards`` are appended to the result. Versions whose checkout
    fails are reported and skipped.

    Args:
        versions: Version names to check out. A name containing ``"preview"``
            is taken from the ``master`` branch instead of a tag.
        mpy_dir: Directory of the local MicroPython git checkout.

    Returns:
        List of record tuples as produced by ``iter_boards``; empty when
        ``mpy_dir`` is not a directory.
    """
    longlist = []
    if not mpy_dir.is_dir():
        print(f"Directory {mpy_dir} not found")
        # Without the repository nothing can be checked out; do not attempt
        # git operations against a missing directory.
        return longlist
    for version in versions:
        print("-" * 60)
        is_preview = "preview" in version
        ok = git.checkout_tag("master" if is_preview else version, mpy_dir)
        if not ok:
            print(f"Failed to checkout {version} in {mpy_dir}")
            continue

        # Query git once and reuse the result for both the build number and the log line.
        describe = git.get_git_describe(mpy_dir)
        build_nr = ""
        if is_preview and describe:
            # Third dash-separated field of the describe string;
            # presumably the commit count since the last tag - TODO confirm.
            parts = describe.split("-", 3)
            if len(parts) >= 3:
                build_nr = parts[2]

        print( f"{describe} - {build_nr}")
        # un-cached database
        db = Database(mpy_dir)
        shortlist = list(iter_boards(db, version=version))
        print (f"boards found {len(db.boards.keys())}")
        print (f"boards-variants found {len(shortlist)}")
        longlist.extend(shortlist)
    return longlist

# Root of the local MicroPython checkout that get_boards() switches between versions.
mpy_path = Path("../../micropython")

# Disabled by default: enabling the guard checks out every listed version in
# mpy_path and rebuilds `longlist` from scratch.
if False:
    assert mpy_path.exists()
    # To scan only the current releases, pass instead:
    #   versions=[get_stable_mp_version(), get_preview_mp_version()]
    longlist = get_boards(
        mpy_dir=mpy_path,
        versions=micropython_versions(minver="1.9.4"),
    )

    print("=" * 60)
    print(f"Total boards-variants: {len(longlist)}")


In [None]:
import os
import zipfile
import pandas as pd

# Name of the CSV member and of the zip archive. These two names are defined
# once so that the export below and any later reader of `zip_filename` refer
# to the same file.
csv_filename = 'micropython_boards.csv'
zip_filename = 'micropython_boards.zip'

# Disabled by default: needs `longlist` from the (also disabled) scan cell.
if False:
    columns = ['version', 'board_id', 'base_name', 'mcu', 'variant', 'path', 'description', 'text']
    df = pd.DataFrame(longlist, columns=columns)

    # Write the CSV text directly into the archive; no intermediate file is created.
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        csv_data = df.to_csv(index=False)
        zipf.writestr(csv_filename, csv_data)

    zip_size = os.path.getsize(zip_filename)
    print(f"ZIP file created: {zip_filename} ({zip_size:,} bytes)")

Now load the board list directly from the zip file into a SQLite database.

In [None]:
import zipfile
import io
import sqlite3
import pandas as pd
# Scratch database that is rebuilt from the zipped CSV each time this cell runs.
db_path2 = 'temp_micropython_boards_from_zip.db'

# Connect to the new database
conn2 = sqlite3.connect(db_path2)
conn2.row_factory = sqlite3.Row  # rows support access by column name

# Start from an empty table with an explicit schema, so re-running the cell
# neither duplicates rows nor keeps a table left over from an earlier run.
conn2.execute('DROP TABLE IF EXISTS boards')
conn2.execute('''
CREATE TABLE boards (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    version TEXT,
    board_id TEXT,
    base_name TEXT,
    mcu TEXT,
    variant TEXT,
    path TEXT,
    description TEXT,
    text TEXT
)
''')

# Load data directly from the zip file
with zipfile.ZipFile(zip_filename, 'r') as zipf:
    with zipf.open(csv_filename) as csv_file:
        df_from_zip = pd.read_csv(io.TextIOWrapper(csv_file, 'utf-8'))

# Replace NaN values with empty strings to avoid NULL values in the database
df_from_zip = df_from_zip.fillna('')

# 'append' inserts into the table created above and keeps its schema
# (including the id primary key). 'replace' would drop that table and
# recreate it from the DataFrame dtypes, discarding the declared schema.
df_from_zip.to_sql('boards', conn2, if_exists='append', index=False)

# Create indices for faster searching
conn2.execute('CREATE INDEX IF NOT EXISTS idx_version ON boards (version)')
conn2.execute('CREATE INDEX IF NOT EXISTS idx_board_id ON boards (board_id)')
conn2.execute('CREATE INDEX IF NOT EXISTS idx_mcu ON boards (mcu)')

conn2.commit()

# Sanity check: count the rows that were stored.
cursor2 = conn2.cursor()
cursor2.execute("SELECT COUNT(*) FROM boards")
record_count = cursor2.fetchone()[0]

print(f"Total records stored in database from zip: {record_count}")



Total records stored in database from zip: 2256


In [38]:
import sqlite3


In [None]:
from os import path
from pydantic import BaseModel
from typing import List
import json

class BoardVersion(BaseModel):
    """A board/variant combination together with the versions it appears in."""

    # Identifier of the board (including any variant suffix stored in the table).
    board_id: str
    # Variant name; may be an empty string.
    variant: str
    # Human-readable board description.
    description: str
    # Version strings collected for this board/variant.
    versions: List[str]
    # Board directory; defaults to an empty string.
    path: str = ""

    @classmethod
    def from_db_row(cls, row):
        """Build an instance from a row that supports lookup by column name.

        ``row["versions"]`` must contain JSON text; it is decoded with
        ``json.loads`` before the model is constructed. All other columns
        are passed through unchanged.
        """
        fields = {
            "board_id": row["board_id"],
            "variant": row["variant"],
            "description": row["description"],
            "versions": json.loads(row["versions"]),
            "path": row["path"],
        }
        return cls(**fields)

In [None]:
import sqlite3
    # conn = sqlite3.connect(db_path)
    # conn.row_factory = sqlite3.Row  # Access columns by name
    # cursor = conn.cursor()

def get_board_versions(cursor, search_desc: str, search_variant: str) -> List[BoardVersion]:
    """Find boards by description and variant and list the versions of each.

    Rows of the ``boards`` table are grouped by upper-cased ``board_id``,
    upper-cased ``variant`` and ``description``; the versions of each group
    are aggregated into a JSON array by the query.

    Args:
        cursor: Database cursor to run the query on. Its rows must support
            lookup by column name, since ``BoardVersion.from_db_row`` is
            applied to each of them.
        search_desc: SQL ``LIKE`` pattern matched against ``description``.
        search_variant: SQL ``LIKE`` pattern matched against ``variant``.

    Returns:
        One ``BoardVersion`` per group returned by the query.
    """
    sql = """
    SELECT DISTINCT
        UPPER(board_id) as board_id,
        UPPER(variant) as variant,
        description,
        json_group_array(version) as versions,
        path
    FROM boards
    WHERE description LIKE ?
    AND variant like ?
    GROUP BY UPPER(board_id) , UPPER(variant), description;
    """
    params = (search_desc, search_variant)

    cursor.execute(sql, params)
    matches = cursor.fetchall()

    return list(map(BoardVersion.from_db_row, matches))

In [55]:
# db_path = r"d:\mypython\mpflash\scripts\micropython_boards_from_zip.db"

conn2.row_factory = sqlite3.Row  # rows support access by column name
cursor = conn2.cursor()

# Board to look up. Another example to try:
#   description = "Pimoroni Pico LiPo", variant = "FLASH_16M"
description = "PYBv1.1"
variant = "DP"

# Drop a trailing " with <mcu>" part, if present, so that only the board part
# of the description is used as the search prefix.
descr = description.rsplit(" with ",1)[0].strip()
print(f"Searching for description: {descr} and variant: {variant}")
results = get_board_versions(cursor, f"{descr}%", variant)


for board in results:
    print(board)

Searching for description: PYBv1.1 and variant: DP
board_name='PYBV11-DP' variant='DP' description='PYBv1.1 with STM32F405RG' versions=['v1.18', 'v1.19', 'v1.19.1', 'v1.20.0', 'v1.21.0', 'v1.22.0', 'v1.22.1', 'v1.22.2', 'v1.23.0', 'v1.24.0', 'v1.24.1', 'v1.25.0-preview'] path='../../micropython/ports/stm32/boards/PYBV11'


In [56]:
import pandas as pd
import sqlite3
import os
from pathlib import Path

def load_jsonl_to_sqlite_pandas(jsonl_path, db_path):
    """
    Load a JSONL file into the ``firmware`` table of a SQLite database using pandas.

    An existing ``firmware`` table is replaced. The DataFrame index is stored
    as an ``id`` column, and an index is created for each column whose
    lower-cased name is ``name``, ``version``, ``board`` or ``mcu``.

    Args:
        jsonl_path (str or Path): Path to the JSONL file
        db_path (str or Path): Path to the SQLite database

    Returns:
        int: Number of records imported (0 when the file holds no records,
        in which case the database is not touched)

    Raises:
        FileNotFoundError: If ``jsonl_path`` does not exist
    """
    # Convert to Path objects
    jsonl_path = Path(jsonl_path)
    db_path = Path(db_path)

    # Ensure file exists
    if not jsonl_path.exists():
        raise FileNotFoundError(f"JSONL file not found: {jsonl_path}")

    # Read JSONL file into pandas DataFrame
    print("Reading JSONL file into DataFrame...")
    df = pd.read_json(jsonl_path, lines=True)
    record_count = len(df)

    if record_count == 0:
        print("JSONL file is empty")
        return 0

    indexed_columns = {'name', 'version', 'board', 'mcu'}

    conn = sqlite3.connect(str(db_path))
    try:
        # Write DataFrame to SQLite
        print(f"Writing {record_count} records to database...")
        df.to_sql('firmware', conn, if_exists='replace', index=True, index_label='id')

        # Create indices for faster searching. Identifiers are quoted because
        # the column names come from the input file.
        for col in df.columns:
            if str(col).lower() in indexed_columns:
                conn.execute(f'CREATE INDEX IF NOT EXISTS "idx_{col}" ON firmware ("{col}")')

        conn.commit()
    finally:
        # Always release the database file, even when the import fails part-way.
        conn.close()

    print(f"Successfully imported {record_count} records to {db_path}")
    return record_count

# Execute the function
# Build the paths from the current user's home directory instead of a
# hard-coded, user-specific absolute path. The previous raw strings also
# contained doubled backslashes. Both values stay plain strings.
firmware_dir = Path.home() / "Downloads" / "firmware"
jsonl_path = str(firmware_dir / "firmware.jsonl")
db_path = str(firmware_dir / "firmware.db")
record_count = load_jsonl_to_sqlite_pandas(jsonl_path, db_path)
print(f"Total records imported: {record_count}")


Reading JSONL file into DataFrame...
Writing 91 records to database...
Successfully imported 91 records to C:\Users\josverl\Downloads\firmware\firmware.db
Total records imported: 91


In [None]:
# Show the ATTACH statement for the firmware database path (displayed as the cell's output).
f"ATTACH DATABASE '{db_path}' AS downloads"

"ATTACH DATABASE 'C:\\\\Users\\\\josverl\\\\Downloads\\\\firmware\\\\firmware.db' AS firmware_db"

In [79]:
import sqlite3

# Connect to the first database (this will be your main connection)
conn = sqlite3.connect('temp_micropython_boards_from_zip.db')
conn.row_factory = sqlite3.Row  # rows support access by column name

try:
    # Attach the second database. The filename is bound as a parameter rather
    # than formatted into the SQL text, so quotes or backslashes in the path
    # cannot break the statement.
    conn.execute("ATTACH DATABASE ? AS downloads", (str(db_path),))

    # Now you can perform joins between tables in both databases
    cursor = conn.cursor()

    # Join each board with the firmware files whose `board` value is a prefix
    # of the board_id. The WHERE clause rejects rows where f.filename is NULL,
    # so unmatched boards are dropped even though the join is a LEFT JOIN.
    # To restrict to certain boards, add `AND b.board_id LIKE ?` and pass a
    # pattern such as ('PYBD%',) to execute().
    query = """
SELECT 
    b.board_id, b.variant, b.version, f.filename
FROM boards b
LEFT JOIN 
    downloads.firmware f ON b.board_id like f.board || '%'
WHERE 
    f.filename LIKE '%'
"""

    cursor.execute(query )
    results = cursor.fetchall()

    # Print results
    for row in results:
        print(dict(row))

    # Detach the database when done
    conn.execute("DETACH DATABASE downloads")
finally:
    # Close the connection even if attaching or querying failed.
    conn.close()

{'board_id': 'ARDUINO_NANO_RP2040_CONNECT', 'variant': '', 'version': 'v1.18', 'filename': 'rp2\\ARDUINO_NANO_RP2040_CONNECT-v1.24.1.uf2'}
{'board_id': 'ARDUINO_NANO_RP2040_CONNECT', 'variant': '', 'version': 'v1.18', 'filename': 'rp2\\ARDUINO_NANO_RP2040_CONNECT-v1.25.0-preview.304.uf2'}
{'board_id': 'PIMORONI_PICOLIPO_16MB', 'variant': '', 'version': 'v1.18', 'filename': 'rp2\\PIMORONI_PICOLIPO-FLASH_16M-v1.24.1.uf2'}
{'board_id': 'PIMORONI_PICOLIPO_16MB', 'variant': '', 'version': 'v1.18', 'filename': 'rp2\\PIMORONI_PICOLIPO-FLASH_16M-v1.25.0-preview.389.uf2'}
{'board_id': 'PIMORONI_PICOLIPO_16MB', 'variant': '', 'version': 'v1.18', 'filename': 'rp2\\PIMORONI_PICOLIPO-v1.24.1.uf2'}
{'board_id': 'PIMORONI_PICOLIPO_16MB', 'variant': '', 'version': 'v1.18', 'filename': 'rp2\\PIMORONI_PICOLIPO-v1.25.0-preview.389.uf2'}
{'board_id': 'PIMORONI_PICOLIPO_4MB', 'variant': '', 'version': 'v1.18', 'filename': 'rp2\\PIMORONI_PICOLIPO-FLASH_16M-v1.24.1.uf2'}
{'board_id': 'PIMORONI_PICOLIPO_4MB',