In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
from fastlite import Database
from pathlib import Path
from datetime import datetime, timedelta
import my_blog.config as config
from urllib.parse import quote, unquote
from fasthtml.common import *
from monsterui.all import *
from fasthtml_auth import AuthManager
from fasthtml.jupyter import *
import re
import frontmatter

In [None]:
#| hide
import nbdev

In [None]:
@dataclass
class AppState:
    """Container for the database and auth handles built by `create_app`."""
    pdb: Database # for managing posts
    posts_t: Table # `pdb.t.posts` - one row per blog post
    tags_t: Table # `pdb.t.tags` - one row per tag name
    post_tags_t: Table # `pdb.t.post_tags` - link rows joining posts to tags
    auth: AuthManager # authentication manager whose routes/beforeware are installed on the app
    db: Database # For managing users and authorising access

In [None]:
def create_database_tables(pdb: Database):
    """Create the `posts`, `tags` and `post_tags` tables (plus unique indexes) in `pdb`.

    Each table is declared as a small annotated class and handed to `pdb.create`.
    Nothing is returned; callers reach the tables afterwards through `pdb.t`.
    """

    class Posts:
        id: int # primary key
        title: str
        slug: str # unique
        content: str
        created: datetime
        updated: datetime
        published: bool
        excerpt: str

    # NOTE(review): `transform=True` presumably lets an existing table be migrated
    # to match the class definition - confirm against the fastlite documentation.
    posts = pdb.create(Posts, pk='id', defaults={'published': False}, transform=True)
    # Uniqueness of the slug is enforced by this index, not by the class annotation.
    posts.create_index(['slug'], unique=True, if_not_exists=True)

    class Tags:
        id: int # primary key
        name: str # unique
    
    tags = pdb.create(Tags, pk='id', transform=True)
    # Tag names are unique so that a name maps to exactly one tag id.
    tags.create_index(['name'], unique=True, if_not_exists=True)

    class PostTags:
        post_id: int # foreign key > posts.id
        tag_id: int # foreign key > tags.id
    
    # Composite primary key: a post can be linked to a given tag only once.
    # NOTE(review): no `foreign_keys` argument is passed here, so the "foreign key"
    # comments above look like documentation only - confirm whether enforcement is wanted.
    post_tags = pdb.create(PostTags, pk=['post_id', 'tag_id'], transform=True)

    


In [None]:
def create_post_database(db_path: str):
    """Open the posts database at `db_path`, make sure its tables exist, and return it."""
    database = Database(db_path)
    create_database_tables(database)
    return database

In [None]:
def create_app():
    """Build the FastHTML application and its shared state.

    Opens the posts database, initialises authentication, creates the app with
    the auth beforeware, mounts the static directory and registers the auth
    routes.

    Returns:
        A tuple `(app, state)` where `state` is an `AppState`.

    After calling this, start the server with `srv = serve()` and add routes
    with `rt = app.route`.
    """
    # --- posts database ---------------------------------------------------
    pdb = create_post_database(config.POSTS_DB_PATH)

    # --- authentication ---------------------------------------------------
    auth_settings = {
        'allow_registration': config.ALLOW_REGISTRATION,
        # No public paths - all routes require auth except /auth/*
        'public_paths': [],
        'login_path': '/auth/login',
    }
    auth = AuthManager(db_path=str(config.USERS_DB_PATH), config=auth_settings)
    db = auth.initialize()
    beforeware = auth.create_beforeware()

    # --- application ------------------------------------------------------
    theme_headers = Theme.blue.headers(highlightjs=True)
    hyperscript = Script(src="https://unpkg.com/hyperscript.org@0.9.12")
    app = FastHTML(
        before=beforeware,
        secret_key=config.SECRET_KEY,
        hdrs=(*theme_headers, hyperscript),
        exts='ws',  # Enable WebSocket support
    )

    # The static directory must exist before it can be mounted.
    config.STATIC_DIR.mkdir(parents=True, exist_ok=True)
    static_files = StaticFiles(directory=str(config.STATIC_DIR))
    app.mount("/static", static_files, name="static")
    auth.register_routes(app, include_admin=True)

    tables = pdb.t
    state = AppState(
        pdb=pdb,
        posts_t=tables.posts,
        tags_t=tables.tags,
        post_tags_t=tables.post_tags,
        auth=auth,
        db=db,
    )
    return app, state

In [None]:
def intro():
    """Return the welcome `Article` shown at the top of the home page.

    The article holds a heading and three paragraphs of introductory text with
    links to the About page, the Blog page and two external sites.
    """
    return Article(
        H3("Welcome", cls="text-2xl font-semibold mb-4"),
        Div(cls="text-base gap-1 text-muted-foreground leading-relaxed space-y-4")(
        P("I'm a retired engineer and AI developer.  This website is an opportunity to develop my web development skills as well as to post and record things that interest me and that I have learnt."),

        P("Specific areas I would like to cover include cycling, motorhome trips (often combined with cycling), coding and development and application of AI and machine learning.  In previous roles I was responsible for advanced engineering in an automotive company and worked extensively upon the development of early electric and hybrid vehicles.  I have also been a director of AI in a major pharma company looking to apply AI to drug discovery and development. For more information see my ", hx_link("About", about), " page. "),

        # Fixed the user-facing typo "desgn" -> "design" in this paragraph.
        P("This site is developed using fastHTML and the Solveit platform, both technologies developed by Jeremy Howard and ",hx_link('Answer.ai.', href='https://answer.ai'), " The design is based upon the site of ",hx_link('Jack Hogan.', href='https://jackhogan.net/'),

        " See my latest blog posts below or find the full list on my ", hx_link("Blog", blog), " page. ")
        )
    )


In [None]:
def hx_attrs(target="#main-content"):
    """Return the htmx attributes shared by in-page navigation links.

    The result targets `target`, pushes the URL into browser history and swaps
    the inner HTML while scrolling the window to the top.
    """
    attrs = {"hx_target": target}
    attrs["hx_push_url"] = "true"
    attrs["hx_swap"] = "innerHTML show:window:top"
    return attrs

def hx_link(txt, href, cls="text-primary underline", target="#main-content", **kw):
    """Return an anchor for `href` labelled `txt`.

    Internal links get `hx_get` plus the attributes from `hx_attrs(target)` so
    the page content is swapped in place. Links to another site are returned
    as ordinary anchors opening in a new tab: attaching `hx_get` to them would
    make htmx issue an AJAX request to a foreign origin instead of navigating.

    Args:
        txt: Link text (or child element).
        href: Destination; converted with `str()` only for the external check,
            so non-string values are passed through to `A` unchanged.
        cls: CSS classes for the anchor.
        target: htmx target selector used for internal links.
        **kw: Extra attributes forwarded to `A`.
    """
    if str(href).startswith(("http://", "https://", "//")):
        # Caller-supplied attributes win over the defaults for external links.
        external = {"target": "_blank", "rel": "noopener noreferrer", **kw}
        return A(txt, href=href, cls=cls, **external)
    return A(txt, href=href, hx_get=href, cls=cls, **hx_attrs(target), **kw)

In [None]:
def navbar():
    """Return the top navigation bar: avatar/name brand link plus About and Blog links."""
    avatar = Img(src="/static/image/john_pixelated.png", alt="John Richmond", cls="w-6 h-6 rounded-full")
    brand = A(
        avatar,
        Span("John Richmond "),
        href="/",
        hx_get="/",
        cls="flex items-center gap-2 text-lg font-bold",
        **hx_attrs(),
    )
    page_links = [hx_link("About", "/about"), hx_link("Blog", "/blog")]
    links = Div(*page_links, cls="flex gap-4")
    bar = Div(brand, links, cls="flex items-center gap-2 justify-between p-4")
    return Nav(bar, cls="border rounded-lg shadow bg-background")

In [None]:
def x_icon():
    """Return a 20x20 inline SVG used as the icon for the "twitter" social link."""
    outline = ft_hx(
        "path",
        d="M12.6.75h2.454l-5.36 6.142L16 15.25h-4.937l-3.867-5.07-4.425 5.07H.316l5.733-6.57L0 .75h5.063l3.495 4.633L12.601.75Zm-.86 13.028h1.36L4.323 2.145H2.865z",
    )
    return Svg(
        outline,
        width=20,
        height=20,
        fill="currentColor",
        viewBox="0 0 16 16",
        aria_hidden="true",
    )



In [None]:
def social_link(k, v):
    """Return an icon anchor for one social/contact link.

    An earlier `social_link(icon, href, **kw)` definition was removed: it
    referenced an undefined name `k`, could pass `target`/`rel` twice, and was
    shadowed by this definition anyway.

    Args:
        k: Link kind; also used as the icon name and (title-cased) aria label.
            "twitter" uses the custom `x_icon()`, "mail" gets
            `rel="nofollow noindex"`, "rss" gets no extra attributes, and
            anything else opens in a new tab with `rel="noopener noreferrer"`.
        v: The href for the anchor.
    """
    if k == "mail":
        ext = dict(rel="nofollow noindex")
    elif k == "rss":
        ext = {}
    else:
        ext = dict(target="_blank", rel="noopener noreferrer")
    icon = x_icon() if k == "twitter" else UkIcon(k, width=20, height=20)
    return A(icon, href=v, aria_label=k.title(), cls="hover:text-primary transition-colors", **ext)


def footer():
    """Return the page footer: a divider followed by a centred row of social icons."""
    profiles = {
        "twitter": "https://x.com/YOUR_HANDLE",
        "youtube": "https://youtube.com/@YOUR_CHANNEL",
        "github": "https://github.com/YOUR_USERNAME",
    }
    anchors = [social_link(site, url) for site, url in profiles.items()]
    # The mail link always comes last in the row.
    anchors.append(social_link("mail", "mailto:confusedjohn46@gmail.com"))
    icon_row = Div(*anchors, cls="flex justify-center gap-6 text-muted-foreground")
    return Footer(Divider(), icon_row, cls="max-w-2xl mx-auto px-6 mt-auto mb-6")

In [None]:
def layout(*content, htmx, title=None):
    """Wrap `content` in the site chrome, or return it bare for htmx requests.

    When `htmx` is truthy and reports a request, only the title and the content
    are returned so htmx can swap them into the existing page. Otherwise the
    full page (navbar, main area with id "main-content", footer) is returned.
    """
    is_partial = htmx and htmx.request
    if is_partial:
        return (Title(title), *content)

    header = Div(navbar(), cls='max-w-2xl mx-auto px-4 mt-4')
    body = Main(*content, cls='w-full max-w-2xl mx-auto px-6 py-8 space-y-8', id="main-content")
    page = Div(header, body, footer(), cls="flex flex-col min-h-screen")
    return Title(title), page

In [None]:
def slug_exists(slug):
    """Return the id of the post whose slug equals `slug`, or `False` if none exists.

    The table is queried once (the previous version ran the same query twice).
    """
    rows = state.posts_t.rows_where("slug = ?", [slug], limit=1)
    row = next(iter(rows), None)
    return False if row is None else row['id']

In [None]:
from datetime import datetime

def add_post(title, content, excerpt="", tags=None, published=True):
    """Insert a post, or update the existing post with the same slug.

    The slug is the lower-cased title with spaces turned into hyphens, stripped
    of everything except alphanumerics and hyphens, truncated to 60 characters.

    Args:
        title: Post title; also the source of the slug.
        content: Post body.
        excerpt: Short summary shown in listings.
        tags: Optional iterable of tag names (a single string is treated as one
            tag). Missing tags are created; existing post/tag links are kept.
        published: Stored in the `published` column.

    Returns:
        The id of the inserted or updated post.
    """
    slug = title.lower().replace(" ", "-")
    slug = ''.join(c for c in slug if c.isalnum() or c == '-')[:60]
    posts = state.pdb.t.posts
    tags_tbl = state.pdb.t.tags
    post_tags = state.pdb.t.post_tags

    now = datetime.now()
    fields = dict(title=title, slug=slug, content=content, excerpt=excerpt,
                  updated=now, published=published)
    post_id = slug_exists(slug)
    if post_id:
        # Leave `created` untouched on update: it records when the post was
        # first added and drives the ordering and the date shown to readers.
        post = posts.update(dict(id=post_id, **fields))
    else:
        post = posts.insert(dict(created=now, **fields))
    post_id = post.id

    # A bare string would otherwise be iterated character by character.
    if isinstance(tags, str):
        tags = [tags]
    for tag in tags or ():
        existing = list(tags_tbl.rows_where("name = ?", [tag], limit=1))
        # Reuse the tag's id when it exists, otherwise create the tag.
        # NOTE(review): the insert result is indexed with ['id'] here while the
        # post result above uses `.id` - confirm both access styles are valid.
        tag_id = existing[0]['id'] if existing else tags_tbl.insert(dict(name=tag))['id']
        # Only add the link row when this post/tag pair is not linked yet.
        linked = list(post_tags.rows_where("post_id= ? and tag_id= ?", [post_id, tag_id], limit=1))
        if not linked:
            post_tags.insert(dict(post_id=post_id, tag_id=tag_id))
    return post_id

In [None]:
@rt('/blog/{slug}')
def blogpost(htmx, slug: str):
    """Render the post identified by `slug`, or a "Not Found" page when no post matches."""
    matches = state.posts_t.rows_where("slug = ?", [slug], limit=1)
    p = None
    for match in matches:
        p = dict(match)
        break
    if not p:
        return layout(H2("Not Found"), P("Post not found."), title="Not Found", htmx=htmx)

    # The creation time may come back as an ISO string; normalise to datetime.
    if isinstance(p['created'], str):
        p['created'] = datetime.fromisoformat(p['created'])
    content = render_md(p['content'])
    heading = H1(p['title'], cls="text-3xl font-bold mb-2")
    dateline = Span(p['created'].strftime('%B %d, %Y'), cls="text-muted-foreground text-sm mb-8 block")
    return layout(heading, dateline, content, title=p['title'], htmx=htmx)

In [None]:
@rt
def index(htmx):
    """Render the home page: the intro article followed by up to four posts from `get_posts`."""
    items = []
    for p in get_posts(n=4):
        post_url = f"/blog/{p['slug']}"
        items.append(A(
            H3(p['title']),
            P(p['excerpt'], cls="text-muted-foreground"),
            Span(p['created'].strftime('%d %b %Y'), cls="text-sm text-muted-foreground"),
            href=post_url,
            hx_get=post_url,
            cls="block border-b pb-4 hover:bg-muted/50 transition-colors",
            **hx_attrs(),
        ))

    if items:
        content = Div(*items, cls="space-y-4")
    else:
        content = P("No posts yet.", cls="text-muted-foreground")

    latest = Section(H3("Latest Posts", cls="text-xl font-semibold mb-4"), content)
    # The three parts are passed to layout as a single tuple argument.
    return layout((intro(), Divider(), latest), title="Welcome to my Blog", htmx=htmx)

In [None]:
def get_tags(tags_tbl):
    """Return the `name` attribute of every row produced by calling `tags_tbl()`."""
    names = []
    for record in tags_tbl():
        names.append(record.name)
    return names


In [None]:
def get_post_tags(post_id: int):
    """Return the names of all tags linked to the post with id `post_id`."""
    sql = """
    SELECT name FROM tags WHERE id IN (SELECT tag_id FROM post_tags WHERE post_id=?)
    """
    return [record['name'] for record in state.pdb.q(sql, [post_id])]

In [None]:
def get_posts(n: Union[int, None]=None, tags: Union[List, None] = None):
    """Return posts, newest first, as dicts with `created` parsed and `tags` attached.

    Args:
        n: Maximum number of posts to return; `None` means no limit.
        tags: Optional collection of tag names (list or set). When given, only
            posts linked to at least one of these tags are returned.

    Returns:
        A list of dicts, one per post; empty when nothing matches.

    NOTE(review): the `published` flag is not consulted here, so unpublished
    posts are listed too - confirm that this is intended.
    """
    # `order_by` is emitted as a raw SQL ORDER BY fragment. The previous value
    # "-created" negated the ISO text timestamp, which SQLite reduces to its
    # numeric prefix (the year), so posts within a year were not ordered.
    newest_first = "created desc"
    if tags:
        # Materialise once so sets can be bound as positional SQL parameters.
        tag_names = list(tags)
        place_holders = ','.join('?'*len(tag_names))
        post_query = f"SELECT DISTINCT post_id FROM post_tags WHERE tag_id IN (SELECT id FROM tags WHERE name IN ({place_holders}))"
        post_ids = [row['post_id'] for row in state.pdb.q(post_query, tag_names)]
        if post_ids:
            place_holders = ','.join('?'*len(post_ids))
            posts = list(state.posts_t.rows_where(f"id IN ({place_holders})", post_ids, limit=n, order_by=newest_first))
        else:
            posts = []
    else:
        posts = list(state.posts_t.rows_where(order_by=newest_first, limit=n))

    result = [dict(r) for r in posts]
    for p in result:
        # Timestamps may come back as ISO strings; normalise to datetime.
        if isinstance(p['created'], str):
            p['created'] = datetime.fromisoformat(p['created'])
        p['tags'] = get_post_tags(p['id'])
    return result

In [None]:
def tag_pill(tag_name, selected_tags):
    """Return a button that toggles `tag_name` in the blog's tag filter.

    Clicking the button requests `/blog` with the tag added to (or removed
    from) the current selection and swaps the result into `#posts-list`.

    Args:
        tag_name: The tag this pill represents.
        selected_tags: Set of currently selected tag names.

    Tag names are percent-encoded before being joined into the query string so
    that characters such as `+`, `&`, `#` or spaces survive the round trip; the
    `/blog` handler unquotes each name after splitting on commas. Names are
    sorted so the generated URL is stable between renders.
    """
    selected = tag_name in selected_tags
    new_tags = selected_tags - {tag_name} if selected else selected_tags | {tag_name}
    if new_tags:
        encoded = ','.join(quote(t, safe='') for t in sorted(new_tags))
        link = f"/blog?tags={encoded}"
    else:
        link = "/blog"
    cls = [ButtonT.primary if selected else ButtonT.secondary, ButtonT.sm, "rounded-lg"]
    return Button(tag_name, cls=cls, hx_get=link, **hx_attrs("#posts-list"))

In [None]:
def tag_filter(selected):
    """Return the tag filter bar (id "tag-filter").

    `selected` is a set containing the names of the selected tags. The bar
    holds a "Filter: " label, one pill per tag in `state.tags_t` showing its
    selection status, and a button that clears the current selection.
    """
    pills = []
    for name in get_tags(state.tags_t):
        pills.append(tag_pill(name, selected))
    reset = Button(
        "X Clear",
        cls=[ButtonT.default, ButtonT.sm, "rounded-lg"],
        hx_get="/blog",
        **hx_attrs("#posts-list"),
    )
    return Div(P("Filter: "), *pills, reset, cls="flex flex-wrap gap-2 items-center", id="tag-filter")

In [None]:
def tag_badge(name):
    """Return a small muted `Span` showing the tag `name`."""
    badge_cls = "text-xs px-2 py-1 rounded bg-muted"
    return Span(name, cls=badge_cls)

In [None]:
@rt
def blog(htmx, tags:str=None):
    """Render the blog listing, optionally filtered by a comma-separated `tags` value.

    When the htmx request targets "posts-list", only the post list is returned,
    together with the tag filter bar marked for an out-of-band swap. Otherwise
    the full page is rendered through `layout`.
    """
    # `selected` is a SET of the names of the selected tags.
    pieces = (tags or '').split(',')
    selected = {unquote(piece.strip()) for piece in pieces if piece.strip()}

    matching = get_posts(tags=selected)
    filter_bar = tag_filter(selected)
    cards = [post_card(entry) for entry in matching]
    if cards:
        listing = Div(*cards, cls="space-y-2", id="posts-list")
    else:
        listing = P("No posts yet.", cls="text-muted-foreground", id="posts-list")

    if htmx and htmx.target == "posts-list":
        # Refresh the filter bar alongside the list so pill states stay in sync.
        filter_bar.attrs['hx-swap-oob'] = 'true'
        return listing, filter_bar
    return layout(H2("Blog"), filter_bar, Divider(cls=('my-2')), listing, title="Blog", htmx=htmx)

In [None]:
def get_post_image(p):
    """Return the path of the first post image referenced in `p["content"]`, or `None`.

    Only markdown images whose target starts with `/static/image/post_images/`
    are considered.
    """
    img_ptn = r"!\[.*?\]\((/static/image/post_images/[^)]+)\)"
    first = re.search(img_ptn, p["content"])
    if first is None:
        return None
    # NOTE(review): the captured target always starts with "/", so on POSIX this
    # join yields the captured path itself rather than a path under STATIC_DIR's
    # parent - confirm that is what the caller expects for an image `src`.
    return config.STATIC_DIR.parent / Path(first.group(1))

In [None]:
def post_card(p):
    """Build the summary card for one post.

    The card links to the post and shows:
    - the title
    - the excerpt
    - the creation date
    - the post's tags as small badges
    - a thumbnail to the right of the text when the post contains an image

    The outer container carries a hover highlight.
    """
    thumb_src = get_post_image(p)
    post_url = f"/blog/{p['slug']}"

    dateline = Span(p['created'].strftime('%d %b %Y'), cls="text-sm text-muted-foreground")
    badges = Div([tag_badge(tag) for tag in p["tags"]], cls="flex gap-2 flex-wrap")
    summary = Div(
        H3(p['title']),
        P(p['excerpt'], cls="text-muted-foreground"),
        Div(dateline, badges),
    )
    thumbnail = None
    if thumb_src:
        thumbnail = Img(src=thumb_src, cls="p-4 max-w-48 h-auto object-contain ml-auto")

    link = A(cls="flex gap-4 block border-b pb-4 transition-colors")(
        summary,
        thumbnail,
        href=post_url,
        hx_get=post_url, **hx_attrs()
    )
    column = Div(cls="flex-1")(link)
    return Div(cls="flex gap-2 p-3 -mx-3 rounded-lg hover:bg-muted/50 hover:shadow-lg transition-all cursor-pointer")(column)

In [None]:
def process_upload(content: bytes, filename: str, slug:str=None):
    """Store one uploaded file: a markdown post or an image belonging to a post.

    Args:
        content: Raw bytes of the uploaded file.
        filename: Client-supplied file name; only its final component and
            suffix are used.
        slug: Slug of the post an image belongs to. Ignored for `.md` files,
            whose slug is derived from the front-matter title.

    Returns:
        Always a 3-tuple `(success, message, slug)`. For a saved post, `slug`
        is the post's slug; on failure the incoming `slug` is returned as is.
    """
    image_suffixes = {'.jpg', '.jpeg', '.png', '.gif', '.tif', '.tiff', '.svg'}
    # Path() is used so that only the name/suffix of the upload are considered.
    file_path = Path(filename)
    suffix = file_path.suffix.lower()  # accept ".JPG", ".MD", ...

    if suffix == '.md':
        # Post content: extract metadata and save contents to the posts table.
        try:
            post = frontmatter.loads(content.decode('utf-8'))
        except Exception as err:
            return False, f"Unable to read post: {err}", slug
        title = post.metadata.get('title')
        if not title:
            return False, "Post metadata must include a title", slug
        title = str(title)  # YAML may parse a numeric-looking title as a number
        tags = post.metadata.get('tags')
        excerpt = post.metadata.get('excerpt', "")
        post_slug = title.lower().replace(" ", "-")
        post_slug = ''.join(c for c in post_slug if c.isalnum() or c == '-')[:60]
        post.content = rewrite_image_paths(post.content, post_slug)
        try:
            add_post(title=title, content=post.content, excerpt=excerpt, tags=tags)
        except Exception:
            # Report the failure with the same 3-tuple shape callers unpack.
            return False, "Unable to save post", slug
        return True, "Post saved", post_slug

    if suffix in image_suffixes:
        if not slug:
            # Without a slug there is no post directory to store the image in.
            return False, f"No post to attach image {file_path.name} to", slug
        path_to_save = Path(config.POST_IMAGE_DIR) / Path(slug) / file_path.name
        try:
            # Create the directory if it doesn't exist.
            path_to_save.parent.mkdir(parents=True, exist_ok=True)
            path_to_save.write_bytes(content)
        except OSError as err:
            return False, f"Unable to save file {path_to_save.name}: {err}", slug
        return True, f"File {path_to_save.name} saved", slug

    # Unknown file type.
    return False, f"Unknown file type {file_path.suffix}", slug

In [None]:
@rt('/admin/upload')
def get(htmx):
    """Return the upload form for posts.

    The form has a Cancel link, an upload button, a message area
    (`#upload-message`) and an upload zone whose response is swapped into that
    message area.
    """
    # NOTE(review): the upload zone accepts only '.md' although the POST handler
    # also processes images, and no `hx_post` is set here - confirm both.
    return Div(Div(A('Cancel', href='/', cls=f"{ButtonT.secondary} px-4 py-2"), Upload("Upload Button!", id='upload1', multiple=True), cls='flex gap-2'),
               Div(id='upload-message'),
               # `hx_swap` was misspelled `hx_swop`, which htmx would ignore.
               UploadZone(DivCentered(Span("Upload Zone"), UkIcon("upload")), id='upload2', accept='.md',
               hx_target='#upload-message', hx_swap='innerHTML'),
               cls='space-y-4')

In [None]:
@rt('/admin/upload')    
def post(upload2: list[UploadFile]):
    """Handle uploaded files and return a table of per-file results.

    Markdown files are processed first so that the slug of the (last) post is
    known before the images are stored. Images uploaded without any markdown
    file produce a warning instead.
    """
    md_files, img_files = [], []
    for upload in upload2:
        entry = (upload.filename, upload.file.read())
        bucket = md_files if entry[0].endswith('.md') else img_files
        bucket.append(entry)

    if img_files and not md_files:
        return Div(Alert("Please upload a post (.md file) with images, or upload images separately", cls=AlertT.warning))

    slug = None
    body = []
    for name, content in md_files:
        success, message, slug = process_upload(content, name)
        body.append([name, success, message])
    for name, content in img_files:
        success, message, _ = process_upload(content, name, slug=slug)
        body.append([name, success, message])

    header = ["Name", 'Success', 'Message']
    return Div(H2("Post upload results"), TableFromLists(header, body))

In [None]:
def rewrite_image_paths(content: str, slug: str) -> str:
    """Point bare-filename markdown images at the post's static image folder.

    `![alt](pic.png)` becomes `![alt](/static/image/post_images/<slug>/pic.png)`.
    Targets containing a `/` (absolute paths, sub-folders, URLs) are left
    untouched. Extensions are matched case-insensitively and include `tif`/
    `tiff`, which the upload handler also accepts.

    Args:
        content: Markdown text of the post.
        slug: Post slug, inserted literally into the new path.

    Returns:
        The markdown with matching image targets rewritten.
    """
    img_ptn = r"(!\[.*?\])\(([^/)]+\.(jpg|jpeg|png|gif|svg|tiff?))\)"

    def _with_post_dir(match):
        # A callable replacement keeps `slug` literal; in a template string any
        # backslash in it would be interpreted as a group reference or escape.
        return f"{match.group(1)}(/static/image/post_images/{slug}/{match.group(2)})"

    return re.sub(img_ptn, _with_post_dir, content, flags=re.IGNORECASE)

In [None]:
# Export this notebook's cells to the package module via nbdev
# (presumably the `core` module named by the `default_exp` directive - confirm).
nbdev.nbdev_export()