# Read

> Read X for LLM context

In [None]:
#| default_exp read

In [None]:
#| hide
from nbdev.showdoc import *

## Goals

## Imports

In [None]:
#| export
import httpx 
import html2text
from fastcore.all import delegates, ifnone

import re, os, glob, string, warnings, functools
import requests
import fnmatch, mimetypes

from pypdf import PdfReader
from toolslm.download import html2md, read_html

import tempfile, subprocess, os, re, shutil
from pathlib import Path

from typing import Optional, List, Dict, Union

## Defining read_ functions

### URL

In [None]:
#| export
def read_text(url, # URL to read
             ): # Text from page
    "Fetch `url` (following redirects) and return the response body as text"
    response = httpx.get(url, follow_redirects=True)
    return response.text

In [None]:
read_text('https://example.org/')[:80]

'<!doctype html>\n<html>\n<head>\n    <title>Example Domain</title>\n\n    <meta chars'

In [None]:
#| export
def read_link(url: str,   # URL to read
             heavy: bool = False,   # Use headless browser (requires extra setup steps before use)
             sel: Optional[str] = None,  # Css selector to pull content from
             useJina: bool = False, # Use Jina for the markdown conversion
             ignore_links: bool = False, # Whether to keep links or not
             ): 
    "Fetch `url` and return its content converted to markdown"
    if heavy:
        # Headless-browser path: combining it with Jina is rejected outright
        if useJina: raise NotImplementedError("Unsupported. No benefit to using Jina with playwrightnb")
        import playwrightnb
        # Without a selector the whole <body> is converted
        return playwrightnb.url2md(url, sel=ifnone(sel, 'body'))
    # Lightweight paths: either the Jina reader service or a plain HTML fetch
    if useJina: return httpx.get(f"https://r.jina.ai/{url}").text
    return read_html(url, sel=sel, ignore_links=ignore_links)

In [None]:
read_link('https://fastht.ml/docs/', sel='#quarto-content')[:200]

'  * [ Get Started](./index.html)\n\n  * [ Tutorials](./tutorials/index.html) __\n\n    * [ FastHTML By Example](./tutorials/by_example.html)\n\n    * [ Web Devs Quickstart](./tutorials/quickstart_for_web_de'

In [None]:
#| eval: false
read_link('https://fastht.ml/docs/',useJina=True)[:200]

'Title: FastHTML – fasthtml\n\nURL Source: https://fastht.ml/docs/\n\nPublished Time: Sun, 06 Jul 2025 21:56:52 GMT\n\nMarkdown Content:\nWelcome to the official FastHTML documentation.\n\nFastHTML is a new nex'

In [None]:
read_link('https://fastht.ml/docs/',sel='#quarto-margin-sidebar')

'## On this page\n\n  * Installation\n  * Usage\n    * Getting help from AI\n  * Next Steps\n  * Other languages and related projects\n\n  * [__Report an issue](https://github.com/AnswerDotAI/fasthtml/issues/new)\n\n## Other Formats\n\n  * [ __CommonMark](index.html.md)\n\n'

In [None]:
#| export
# Deprecated alias: `functools.wraps` copies `read_link`'s name, docstring and
# signature metadata onto the wrapper so it presents like the function it forwards to.
@functools.wraps(read_link)
def read_url(*args,**kwargs):
    # stacklevel=2 attributes the warning to the caller's line, not this wrapper
    warnings.warn("read_url() is deprecated, use read_link() instead", 
                  DeprecationWarning, stacklevel=2)
    return read_link(*args,**kwargs)

In [None]:
read_url('https://fastht.ml/docs/',sel='#quarto-margin-sidebar')

  read_url('https://fastht.ml/docs/',sel='#quarto-margin-sidebar')


'## On this page\n\n  * Installation\n  * Usage\n    * Getting help from AI\n  * Next Steps\n  * Other languages and related projects\n\n  * [__Report an issue](https://github.com/AnswerDotAI/fasthtml/issues/new)\n\n## Other Formats\n\n  * [ __CommonMark](index.html.md)\n\n'

### Github

#### Gist

In [None]:
#| export
def read_gist(url:str  # gist URL, of gist to read
             ):
    "Fetch the raw content of the gist at `url`; returns None when `url` is not a gist URL"
    found = re.match(r'https://gist\.github\.com/([^/]+)/([^/]+)', url)
    if not found: return None
    owner, gid = found.groups()
    # The `/raw` endpoint serves the gist content without the HTML page around it
    return httpx.get(f'https://gist.githubusercontent.com/{owner}/{gid}/raw').text

In [None]:
# Keep the example URL in one place and reuse it in the call below
sample_gist_url = "https://gist.github.com/algal/a490024ad088de1b857531c83abef0a0"
read_gist(sample_gist_url)[:200]

"#!/usr/bin/env python3\nimport os, os.path, sys, urllib.parse, base64, subprocess\n\ndef on_iterm2(): return 'ITERM_SESSION_ID' in os.environ or os.environ.get('LC_TERMINAL','') == 'iTerm2'\n\ndef on_macOS"

#### URL

#### File

In [None]:
#| export
def read_gh_file(url:str # GitHub URL of the file to read
                ):
    "Fetch a file's contents given its GitHub `blob` URL"
    # Map github.com/<user>/<repo>/blob/<branch>/<path> onto the raw-content host;
    # a URL that does not match the pattern is passed through unchanged.
    raw_url = re.sub(
        r'https://github\.com/([^/]+)/([^/]+)/blob/([^/]+)/(.+)',
        r'https://raw.githubusercontent.com/\1/\2/refs/heads/\3/\4',
        url,
    )
    response = httpx.get(raw_url)
    return response.text

In [None]:
read_gh_file("https://github.com/AnswerDotAI/fasthtml/blob/main/README.md")[:200]



### Local Files

#### Files

In [None]:
#| export
def read_file(path:str):
    "Return the full text of the file at `path`"
    with open(path) as fh:
        contents = fh.read()
    return contents

In [None]:
#|export
def _is_unicode(filepath:str, sample_size:int=1024):
    "True if the first `sample_size` characters of `filepath` decode as text with the default encoding"
    try:
        # Only the decode attempt matters; the sampled text itself is discarded
        with open(filepath, 'r') as fh: fh.read(sample_size)
    except UnicodeDecodeError:
        return False
    return True

In [None]:
assert _is_unicode('_quarto.yml')

#### Directory

In [None]:
#| export
def read_dir(path: str,                          # path to read
             unicode_only: bool = True,             # ignore non-unicode files
             included_patterns: List[str] = ["*"],       # glob pattern of files to include
             excluded_patterns: List[str] = [".git/**"], # glob pattern of files to exclude
             verbose: bool = False,                # log paths of files being read
             as_dict: bool = False                  # returns dict of {path,content}
            ) -> Union[str, Dict[str, str]]:            # returns string with contents of files read
    """Recursively read the files under `path`.

    With `as_dict=True` the result maps each file path to its contents; otherwise the
    contents are concatenated into one string, each file wrapped in `--- File: ... ---` /
    `--- End of ... ---` markers. Include/exclude patterns are `fnmatch` patterns tested
    against the full globbed path (which includes the `path` prefix); exclusion wins."""
    def _selected(candidate):
        # Exclusions are checked first, then at least one include pattern must match
        if any(fnmatch.fnmatch(candidate, pat) for pat in excluded_patterns): return False
        return any(fnmatch.fnmatch(candidate, pat) for pat in included_patterns)

    contents = {}
    for candidate in glob.glob(os.path.join(path, '**/*'), recursive=True):
        if not _selected(candidate): continue
        if not os.path.isfile(candidate): continue  # the glob also yields directories
        if unicode_only and not _is_unicode(candidate): continue
        if verbose: print(f"Including {candidate}")
        # errors='ignore' drops undecodable bytes rather than failing mid-file
        with open(candidate, 'r', errors='ignore') as fh:
            contents[candidate] = fh.read()

    if as_dict: return contents
    return '\n'.join([f"--- File: {name} ---\n{text}\n--- End of {name} ---" for name,text in contents.items()])

In [None]:
read_dir('.',verbose=False)[:200]

'--- File: ./_quarto.yml ---\nproject:\n  type: website\n\nformat:\n  html:\n    theme: cosmo\n    css: styles.css\n    toc: true\n    keep-md: true\n  commonmark: default\n\nwebsite:\n  twitter-card: true\n  open-g'

### PDF reader

In [None]:
#| export
def read_pdf(file_path: str # path of PDF file to read
            ) -> str:
    "Extract the text of every page of the PDF at `file_path`, joined by single spaces"
    with open(file_path, 'rb') as fh:
        # Extract while the file is still open, since the reader works off this handle
        page_texts = [pg.extract_text() for pg in PdfReader(fh).pages]
    return ' '.join(page_texts)

In [None]:
read_pdf('./test_dir/test.pdf')

' \n \n \n \n \n \nThis is a test PDF document. \nIf you can read this, you have Adobe Acrobat Reader installed on your computer. '

### YT Transcript

In [None]:
#| hide
# def read_yt_transcript(yt_url: str):
#     "Gets the text of a YouTube transcript"
#     from pytube import YouTube
#     from youtube_transcript_api import YouTubeTranscriptApi
#     try:
#         yt = YouTube(yt_url)
#         video_id = yt.video_id
#     except Exception as e:
#         print(f"An error occurred parsing yt urul: {e}")
#         return None
#     transcript = YouTubeTranscriptApi.get_transcript(video_id)
#     return ' '.join(entry['text'] for entry in transcript) 

# yt_url = "https://www.youtube.com/watch?v=BGgsoIgbT_Y"
# s = read_yt_transcript(yt_url)
# s[:200]
# Currently seems broken, removing #| export 

### Google Sheet

In [None]:
#| export
def read_google_sheet(url: str # URL of a Google Sheet to read
                     ):
    "Download a Google Sheet's CSV export (requested with `gid=0`) and return the raw response bytes"
    # The sheet id is the path segment that follows `/d/`
    after_marker = url.split('/d/')[1]
    sheet_id = after_marker.split('/')[0]
    response = requests.get(url=f'https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&id={sheet_id}&gid=0')
    return response.content

In [None]:
read_google_sheet('https://docs.google.com/spreadsheets/d/17Q3LzRCyM4md28IBxzSSERpaafLgOH8MjH5r6UkyVz8/edit?gid=0#gid=0')

b'Band Pull Around/Aparts\r\nShoulder Dislocations Straight\r\nShoulder Dislocations Side\r\nSuperman Dislocation\r\nScorpion Chest Stretch\r\nLatt Pulldown\r\nTwisty Shoulders\r\nRotator Cuff Pull\r\nWide bent over row'

### Google Doc

In [None]:
def _gdoc_url_to_parseable(url: str):
    "Rewrite a Google Doc `/edit` URL into its HTML export URL; other URLs are returned unchanged"
    edit_url = re.compile(r'(https://docs\.google\.com/document/d/[^/]+)/edit')
    return edit_url.sub(r'\1/export?format=html', url)

In [None]:
result = _gdoc_url_to_parseable("https://docs.google.com/document/d/13g-IDyuJyk5wE60bOH1YhhFgW8rlh2LnSXccBS0CQd0/edit")
print(result)

https://docs.google.com/document/d/13g-IDyuJyk5wE60bOH1YhhFgW8rlh2LnSXccBS0CQd0/export?format=html


In [None]:
#| export
def read_gdoc(url: str  # URL of Google Doc to read
             ):
    "Export a Google Doc as HTML and return it converted to text by html2text"
    import html2text
    # The document id is the path segment that follows `/d/`
    doc_id = url.split('/d/')[1].split('/')[0]
    exported_html = requests.get(f'https://docs.google.com/document/d/{doc_id}/export?format=html').text
    return html2text.html2text(exported_html)

In [None]:
read_gdoc("https://docs.google.com/document/d/13g-IDyuJyk5wE60bOH1YhhFgW8rlh2LnSXccBS0CQd0/edit")[:200]

'# Top heading\n\nHello this is a context reading test\n\n## Heading 2\n\nBolded text is here as well as italisized\n\n  * I have bullets\n  * Of things\n\n## Heading 3\n\nAnd ordered\n\n  1. Lists\n  2. Of\n  3. Thing'

### Arxiv

In [None]:
#| export
def read_arxiv(url:str, # arxiv PDF URL, or arxiv abstract URL, or arxiv ID
               save_pdf:bool=False, # True, will save the downloaded PDF
               save_dir:str='.' # directory in which to save the PDF
              ):
    """Get paper information from arxiv URL or ID, optionally saving PDF to disk

    Returns a dict with `title`, `authors`, `summary`, `published`, `link` and `pdf_url`.
    It also holds `pdf_path` when the PDF was saved, `source` when a `.tex` member could be
    read from the e-print archive, and `source_error` when fetching or unpacking the source
    raised. Raises `Exception` when the API request does not return 200 or has no entry.
    """
    import re, httpx, tarfile, io, os
    import xml.etree.ElementTree as ET
    
    if save_pdf: os.makedirs(save_dir, exist_ok=True)

    # The ID is the last path segment. Drop any query string / fragment, a trailing
    # slash and a `.pdf` suffix first, so PDF URLs such as
    # https://arxiv.org/pdf/2301.00001v2.pdf resolve to the same ID as the abstract URL.
    cleaned = re.split(r'[?#]', url.strip(), maxsplit=1)[0].rstrip('/')
    arxiv_id = cleaned.split('/')[-1] if '/' in cleaned else cleaned
    arxiv_id = re.sub(r'\.pdf$', '', arxiv_id, flags=re.IGNORECASE)
    
    # Remove version number if present but save it for downloads
    version = re.search(r'v(\d+)$', arxiv_id)
    version_num = version.group(1) if version else None
    arxiv_id = re.sub(r'v\d+$', '', arxiv_id)
    versioned_id = f"{arxiv_id}{'v'+version_num if version_num else ''}"
    
    api_url = f'https://export.arxiv.org/api/query?id_list={arxiv_id}'
    
    # httpx does not follow redirects unless asked; without this a redirected
    # response would be reported as a failure (or silently skipped further down).
    response = httpx.get(api_url, follow_redirects=True)
    
    if response.status_code != 200: raise Exception(f"Failed to fetch arxiv data: {response.status_code}")
    
    root = ET.fromstring(response.text)
    # The prefix is named `arxiv`, but it is bound to the Atom namespace used by the feed
    ns = {'arxiv': 'http://www.w3.org/2005/Atom'}
    entry = root.find('arxiv:entry', ns)
    if entry is None: raise Exception("No paper found")
    
    links = entry.findall('arxiv:link', ns)
    pdf_url = next((l.get('href') for l in links if l.get('title') == 'pdf'), None)
    
    result = {
        'title': entry.find('arxiv:title', ns).text.strip(),
        'authors': [author.find('arxiv:name', ns).text for author in entry.findall('arxiv:author', ns)],
        'summary': entry.find('arxiv:summary', ns).text.strip(),
        'published': entry.find('arxiv:published', ns).text,
        'link': entry.find('arxiv:id', ns).text,
        'pdf_url': pdf_url
    }
    
    if save_pdf and pdf_url:
        pdf_response = httpx.get(pdf_url, follow_redirects=True)
        # A non-200 response is skipped quietly: the result then has no `pdf_path`
        if pdf_response.status_code == 200:
            pdf_path = os.path.join(save_dir, f"{versioned_id}.pdf")
            with open(pdf_path, 'wb') as f:
                f.write(pdf_response.content)
            result['pdf_path'] = pdf_path
    
    source_url = f'https://arxiv.org/e-print/{versioned_id}'
    try:
        source_response = httpx.get(source_url, follow_redirects=True)
        if source_response.status_code == 200:
            # Treat the download as a tar archive (any compression, via mode 'r:*')
            tar_content = io.BytesIO(source_response.content)
            with tarfile.open(fileobj=tar_content, mode='r:*') as tar:
                # Takes the first `.tex` member listed; no attempt is made to find the main file
                tex_files = [f for f in tar.getnames() if f.endswith('.tex')]
                if tex_files:
                    main_tex = tar.extractfile(tex_files[0])
                    result['source'] = main_tex.read().decode('utf-8', errors='ignore')
    except Exception as e:
        # Best effort: source problems are recorded in the result instead of raised
        result['source_error'] = str(e)
    
    return result

### GitHub Repo

In [None]:
#| export
def _gh_ssh_from_gh_url(gh_repo_address:str):
    "Given a GH URL or SSH remote address, returns the GH SSH remote address, or None if it is neither"
    # The repo segment stops at `/`, `?` or `#`, so sub-paths, query strings and
    # fragments are never captured as part of the repository name.
    pattern = r'https://github\.com/([^/]+)/([^/?#]+)'
    if gh_repo_address.startswith("git@github.com:"): return gh_repo_address
    elif match := re.match(pattern, gh_repo_address):
        user, repo = match.groups()
        # HTTPS clone URLs already end in `.git`; strip it so the suffix is not doubled
        if repo.endswith('.git'): repo = repo[:-len('.git')]
        return f'git@github.com:{user}/{repo}.git'
    # Not a GitHub URL or a GitHub SSH remote address
    else: return None

def _get_default_branch(repo_path:str):
    "Name of the branch `origin/HEAD` points at in `repo_path`, falling back to 'main' if git cannot tell"
    try:
        result = subprocess.run(['git', 'symbolic-ref', 'refs/remotes/origin/HEAD'], 
                                cwd=repo_path, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError:
        return 'main'  # Default to 'main' if we can't determine the branch
    ref = result.stdout.strip()
    prefix = 'refs/remotes/origin/'
    # Strip the known prefix rather than taking the last `/` segment, so branch
    # names that contain slashes (e.g. `release/v1`) are returned whole.
    if ref.startswith(prefix): return ref[len(prefix):]
    return ref.split('/')[-1]

def _get_git_repo(gh_ssh:str):
    "Fetches from a GH SSH address into a local cache; returns the cached repo path, or None if cloning fails"
    # Strip only a trailing `.git`: `str.replace` would also remove it from the middle
    # of names such as `user.github.io`, giving a directory name git never creates.
    repo_name = gh_ssh.split('/')[-1]
    if repo_name.endswith('.git'): repo_name = repo_name[:-len('.git')]
    # NOTE(review): the cache is keyed by repo name only, so two owners with a
    # same-named repo share one cache directory — confirm whether that is intended.
    cache_dir = Path(os.environ.get('XDG_CACHE_HOME', Path.home() / '.cache')) / 'contextkit_git_clones'
    cache_dir.mkdir(parents=True, exist_ok=True)
    repo_dir = cache_dir / repo_name

    if repo_dir.exists():
        # Refresh the cached clone and hard-reset it onto the remote default branch
        try:
            subprocess.run(['git', 'fetch'], cwd=repo_dir, check=True, capture_output=True)
            default_branch = _get_default_branch(repo_dir)
            subprocess.run(['git', 'reset', '--hard', f'origin/{default_branch}'], 
                           cwd=repo_dir, check=True, capture_output=True)
            return str(repo_dir)
        except subprocess.CalledProcessError:
            shutil.rmtree(repo_dir)  # Remove the cached directory if update fails

    # No usable cache: clone into a scratch directory, then move the result into place
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            print("Cloning repo.")
            subprocess.run(['git', 'clone', gh_ssh], cwd=temp_dir, check=True, capture_output=False)
            cloned_dir = Path(temp_dir) / repo_name
            shutil.move(str(cloned_dir), str(repo_dir))
            return str(repo_dir)
        except subprocess.CalledProcessError as e:
            print(f"Error cloning repo from cwd {temp_dir} with error {e}")
            return None

In [None]:
#| export
def read_gh_repo(path_or_url:str,    # Repo's GitHub URL, or GH SSH address, or file path
                 as_dict:bool=True,  # if True, will return repo contents {path,content} dict
                 verbose:bool=False  # if True, will log paths of files being read
                ):
    "Repo contents from path, GH URL, or GH SSH address"
    gh_ssh = _gh_ssh_from_gh_url(path_or_url)
    if not gh_ssh:
        # Neither a GitHub URL nor an SSH address: treat the argument as a local path
        path = path_or_url
    else:
        path = _get_git_repo(gh_ssh)
        # `_get_git_repo` returns None when cloning fails; report that here instead of
        # letting `read_dir` fail on a None path with an unrelated-looking TypeError.
        if path is None: raise RuntimeError(f"Could not clone repository {gh_ssh}")
    return read_dir(path,verbose=verbose,as_dict=as_dict)

How to use it:

In [None]:
ghurl="https://github.com/AnswerDotAI/claudette"
d = read_gh_repo(ghurl,as_dict=True)
list(d.keys())[:5]

['/Users/jhoward/.cache/contextkit_git_clones/claudette/llms.txt',
 '/Users/jhoward/.cache/contextkit_git_clones/claudette/00_core.ipynb',
 '/Users/jhoward/.cache/contextkit_git_clones/claudette/_quarto.yml',
 '/Users/jhoward/.cache/contextkit_git_clones/claudette/LICENSE',
 '/Users/jhoward/.cache/contextkit_git_clones/claudette/styles.css']

## Export -

In [None]:
#|hide
#|eval: false
from nbdev.doclinks import nbdev_export
nbdev_export()