We will build a simple ingestion pipeline that loads PDF documents into a litesearch database for searching.

In [None]:
#| default_exp data

In [None]:
#| export
from fastcore.all import L, concat, patch, ifnone, Path, delegates, globtastic, parallel, type2str
from pymupdf import Document, Pixmap, csRGB, Page

Extensions to the pymupdf `Document` class to extract text, images and links.

In [None]:
#| export
@patch
def get_texts(self: Document, st=0, end=-1, **kw):
    '''Return the text of every page in the selected range as an `L` of strings.

    `st` and `end` are slice bounds over the document's pages (`end` is exclusive).
    The default `end=-1` means "through the last page"; used as a raw slice bound
    it would silently drop the final page (and return nothing for a 1-page file).
    `**kw` is forwarded unchanged to `Page.get_text` for each page.
    '''
    stop = None if end == -1 else end  # -1 is the "to the end" sentinel, not a slice bound
    return L(self[st:stop]).map(lambda page: page.get_text(**kw))

@patch
def get_links(self: Document, st=0, end=-1):
    '''Return the links of every page in the selected range, flattened into one `L`.

    `st` and `end` are slice bounds over the document's pages (`end` is exclusive).
    The default `end=-1` means "through the last page"; used as a raw slice bound
    it would silently skip the links on the final page.
    '''
    stop = None if end == -1 else end  # -1 is the "to the end" sentinel, not a slice bound
    return L(self[st:stop]).map(lambda page: page.get_links()).concat()

@patch
def ext_im(self: Document, it=None):
    '''Extract a single image described by `it`, an entry of `Page.get_images(full=True)`.

    Only the first two fields of `it` are read: the image xref and the xref of its
    mask (treated as "no mask" unless it is > 0). Returns None when `it` is empty.
    Otherwise returns a dict with `ext`, `colorspace` and `image` (encoded bytes)
    when a conversion was needed, or the result of `Document.extract_image` as is.
    Raises AssertionError if `it` is not a tuple with more than two fields.
    '''
    if not it: return None
    assert isinstance(it, tuple) and len(it) > 2, 'Invalid image tuple'
    xref, smask = it[0], it[1]
    if smask > 0:
        # The image has a separate mask: recombine base image and mask into one pixmap.
        base = Pixmap(self.extract_image(xref)['image'])
        if base.alpha: base = Pixmap(base, 0)  # remove alpha channel
        mask = Pixmap(self.extract_image(smask)['image'])
        # Best effort: if base and mask cannot be combined, fall back to the plain
        # image. Catch `Exception` (not a bare `except`) so KeyboardInterrupt and
        # SystemExit still propagate.
        try: pix = Pixmap(base, mask)
        except Exception: pix = Pixmap(self.extract_image(xref)['image'])
        # More than 3 components cannot be written as PNG here, so use PAM instead.
        ext = 'pam' if base.n > 3 else 'png'
        return dict(ext=ext, colorspace=pix.colorspace.n, image=pix.tobytes(ext))
    if '/ColorSpace' in self.xref_object(xref, compressed=True):
        # An explicit colorspace definition exists: normalise to an RGB PNG.
        pix = Pixmap(csRGB, Pixmap(self, xref))
        return dict(ext='png', colorspace=3, image=pix.tobytes('png'))
    return self.extract_image(xref)

@patch
def ext_imgs(self: Document, st=0, end=-1):
    '''Extract all images from the pages in the selected range, flattened into one `L`.

    Each page's images are listed with `Page.get_images(full=True)` and converted
    with `Document.ext_im`. `st` and `end` are slice bounds over the pages (`end`
    exclusive); the default `end=-1` means "through the last page".
    '''
    stop = None if end == -1 else end  # -1 is the "to the end" sentinel, not a slice bound
    # `ext_im` is attached to `Document` by `@patch`; the decorator does not leave a
    # usable module-level function behind, so it has to be called as a method.
    def page_images(page): return [self.ext_im(it) for it in page.get_images(full=True)]
    return L(self[st:stop]).map(page_images).concat()

Code extraction utilities

In [None]:
#| export
def pyparse(p:Path=None,    # path to a python file
            code:str=None,  # code string to parse
            imports=False   # include import statements as code chunks
) -> L:
    '''Parse a code string or python file and return its top-level code chunks.

    Exactly one of `p` and `code` must be given (AssertionError otherwise).
    A chunk is produced for every module-level function, async function, class and
    assignment with a value; with `imports=True` both `import x` and
    `from x import y` statements are included as well. Each chunk is a dict with
    `content` (the stripped source segment) and `metadata` (path, uploaded_at,
    name, type, lineno, end_lineno). `uploaded_at` is the file's mtime, or None
    when parsing a code string.
    '''
    assert bool(code) ^ bool(p), 'Either code or p must be provided, not both.'
    if not code: code = Path(p).read_text(encoding='utf-8')
    import ast
    # Stat the file once: every chunk of the same file shares this timestamp.
    mtime = Path(p).stat().st_mtime if p else None
    wanted = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    if imports: wanted += (ast.Import, ast.ImportFrom)
    def is_chunk(n):
        # Bare annotations such as `x: int` carry no value and are skipped.
        if isinstance(n, (ast.Assign, ast.AnnAssign)): return n.value is not None
        return isinstance(n, wanted)
    def n2c(n):
        md = dict(path=p, uploaded_at=mtime, name=getattr(n,'name',None), type=type2str(n.__class__),
                  lineno=getattr(n,'lineno',None), end_lineno=getattr(n,'end_lineno',None))
        return dict(content=ast.get_source_segment(code, n).strip(), metadata=md)
    # Only module-level statements are chunked, so the module body is all we need
    # to look at (no need to walk nested nodes or tag them with their parent).
    return L(ast.parse(code).body).filter(is_chunk).map(n2c)

You can use `pyparse` to extract code chunks from a python file or code string.

In [None]:
# Sample module source: a star import, a top-level assignment and a class with two methods.
txt = """
from fastcore.all import *
a=1
class SomeClass:
    def __init__(self,x): store_attr()
    def method(self): return self.x + a
 """
# With the default `imports=False` the import line is not returned as a chunk.
pyparse(code=txt)

(#2) [{'content': 'a=1', 'metadata': {'path': None, 'uploaded_at': None, 'name': None, 'type': 'Assign', 'lineno': 3, 'end_lineno': 3}},{'content': 'class SomeClass:\n    def __init__(self,x): store_attr()\n    def method(self): return self.x + a', 'metadata': {'path': None, 'uploaded_at': None, 'name': 'SomeClass', 'type': 'ClassDef', 'lineno': 4, 'end_lineno': 6}}]

Setting imports to True will also include import statements as code chunks.

In [None]:
# Same sample source, this time asking for import statements as chunks too.
pyparse(code=txt, imports=True)

(#3) [{'content': 'from fastcore.all import *', 'metadata': {'path': None, 'uploaded_at': None, 'name': None, 'type': 'ImportFrom', 'lineno': 2, 'end_lineno': 2}},{'content': 'a=1', 'metadata': {'path': None, 'uploaded_at': None, 'name': None, 'type': 'Assign', 'lineno': 3, 'end_lineno': 3}},{'content': 'class SomeClass:\n    def __init__(self,x): store_attr()\n    def method(self): return self.x + a', 'metadata': {'path': None, 'uploaded_at': None, 'name': 'SomeClass', 'type': 'ClassDef', 'lineno': 4, 'end_lineno': 6}}]

In [None]:
#| export
# Folder names that are skipped when collecting package sources
# (tests, caches, examples, docs, build output, VCS and notebook checkpoints).
py_dir_skip_re = r'(^tests?$|^__pycache__$|^\.eggs$|^\.mypy_cache$|^\.tox$|^examples?$|^docs?$|^build$|^dist$|^\.git$|^\.ipynb_checkpoints$)'
# File names that are skipped: package init, setup/conftest scripts and test modules.
py_file_skip_re = r'(^__init__\.py$|^setup\.py$|^conftest\.py$|^test_.*\.py$|^tests?\.py$|^.*_test\.py$)'
# Default file globs: take every `.py` file ...
py_glob = '*.py'
# ... except those whose name starts with an underscore.
skip_py_glob = '_*'

@delegates(globtastic)
def pkg2files(pkg:str,                          # package name
              file_glob:str=py_glob,            # file glob to match
              skip_file_glob:str=skip_py_glob,  # file glob to skip
              skip_file_re=py_file_skip_re,     # regex to skip files
              skip_folder_re=py_dir_skip_re,    # regex to skip folders
              **kwargs                          # additional args to pass to globtastic
)->L:
    '''Return list of python files in a package excluding tests and setup files.

    The package is located with `importlib.util.find_spec` and the folder holding
    its origin file is scanned with `globtastic`. An empty `L` is returned when the
    package cannot be found or has no file location to scan.
    '''
    from importlib.util import find_spec
    spec = find_spec(pkg)  # resolve once and reuse the result
    # Namespace packages have `origin=None` and built-in/frozen modules have no
    # file location, so `Path(origin).parent` would either raise or point at the
    # current directory. There is no package folder to scan in those cases.
    if spec is None or not spec.has_location or not spec.origin: return L()
    return globtastic(Path(spec.origin).parent, file_glob=file_glob, skip_file_glob=skip_file_glob,
        folder_re=pkg, skip_folder_re=skip_folder_re, skip_file_re=skip_file_re, **kwargs)

def pkg2chunks(pkg:str,             # package name
               imports:bool=False,  # include import statements as code chunks
               **kw                 # additional args to pass to pkg2files
)->L:
    '''Return code chunks from a package with extra metadata.

    Every file found by `pkg2files` is parsed with `pyparse` (in parallel) and each
    chunk's metadata gains `package` and `version` keys. The version comes from
    `importlib.metadata.version(pkg)`, which raises `PackageNotFoundError` if no
    installed distribution has that name.
    '''
    from importlib.metadata import version
    chunks = parallel(pyparse, pkg2files(pkg,**kw), imports=imports).concat()
    # Nothing to annotate: return before the version lookup, which is only needed
    # (and only allowed to fail) when there is at least one chunk.
    if len(chunks) == 0: return chunks
    # The version is the same for every chunk, so look it up a single time.
    extra = dict(package=pkg, version=version(pkg))
    for chunk in chunks: chunk['metadata'].update(extra)
    return chunks

`pkg2chunks` can be used to extract code chunks from an entire package installed in your environment.

In [None]:
# Chunk every source file of the installed `fastlite` package.
chunks=pkg2chunks('fastlite')
# Show the first chunk whose metadata type is a function definition.
chunks.filter(lambda d: d['metadata']['type']=='FunctionDef')[0]

{'content': 'def t(self:Database): return _TablesGetter(self)',
 'metadata': {'path': '/Users/71293/code/litesearch/.venv/lib/python3.13/site-packages/fastlite/core.py',
  'uploaded_at': 1752468812.9739048,
  'name': 't',
  'type': 'FunctionDef',
  'lineno': 44,
  'end_lineno': 44,
  'package': 'fastlite',
  'version': '0.2.1'}}

In [None]:
#| export
def installed_packages(nms:list=None # list of package names
)->L:
    '''List the names of installed packages, optionally restricted to `nms`.

    With `nms`, only names that `importlib.util.find_spec` can resolve are looked
    up; without it, every installed distribution is considered. Distributions
    whose `Author-email` metadata is missing or equal to 'Python' are left out.
    '''
    from importlib.util import find_spec
    from importlib.metadata import distributions, distribution
    if nms:
        # Drop names that cannot be resolved, then load metadata for the rest.
        candidates = L(nms).filter(find_spec).map(distribution)
    else:
        candidates = L(distributions())
    def has_author(d): return d.metadata.get('Author-email') not in ('Python', None)
    def name_of(d): return d.metadata['Name']
    return candidates.filter(has_author).map(name_of)

Get the list of installed packages in your environment using `installed_packages`. If you pass a list of package names, only the ones that exist in your environment are returned.

In [None]:
# Three example calls; the notebook displays only the value of the last expression.
installed_packages(['fstlite']) # non existent package
installed_packages(['fastlite']) # existing package
installed_packages() # all installed packages that are not stdlib

(#179) ['litesearch','shellingham','jiter','ipykernel','simsimd','threadpoolctl','coloredlogs','uri-template','humanfriendly','socksio','rfc3339-validator','pexpect','jupyterlab-quarto','fqdn','requests','babel','rich','traitlets','tokenizers','urllib3'...]

Query Preprocessing utilities

In [None]:
#| export
def clean(q:str  # query to be passed for fts search
          ):
    '''Strip every `*` from the query; a blank (empty or whitespace-only) query gives None.'''
    if not q.strip(): return None
    return q.replace('*', '')

def add_wc(q:str  # query to be passed for fts search
           ):
    '''Add wild card * to each word in the query.

    Words are split on any run of whitespace, so repeated, leading or trailing
    spaces never produce a stray `*` term; a blank query gives ''.
    '''
    # `split()` (no argument) drops empty tokens, unlike `split(' ')`.
    return ' '.join(f'{word}*' for word in q.split())

def mk_wider(q:str  # query to be passed for fts search
             ):
    '''Widen the query by joining words with OR operator.

    Words are split on any run of whitespace, so repeated, leading or trailing
    spaces never produce an empty OR operand; a blank query gives ''.
    '''
    # `split()` (no argument) drops empty tokens, unlike `split(' ')`.
    return ' OR '.join(q.split())

def kw(q:str  # query to be passed for fts search
       ):
    '''Extract keywords from the query using YAKE library.

    Every extracted key phrase is split into words and the unique words are
    returned as one space-separated string, in first-seen order. (De-duplicating
    through a `set` made the word order vary between interpreter runs.)
    '''
    from yake import KeywordExtractor as KW
    # Each extracted item unpacks to a pair; only its first element (the phrase) is used.
    words = [w for phrase, _ in KW().extract_keywords(q) for w in phrase.split()]
    # dict.fromkeys de-duplicates while keeping insertion order.
    return ' '.join(dict.fromkeys(words))

def pre(q:str,          # query to be passed for fts search
        wc=True,        # add wild card to each word
        wide=True,      # widen the query with OR operator
        extract_kw=True # extract keywords from the query
        ):
    '''Preprocess the query for fts search.

    Runs `clean` first, then optionally `kw`, `add_wc` and `mk_wider`, in that
    order. Returns '' when the query is empty or blank, or when keyword
    extraction leaves nothing to search for.
    '''
    # `clean` returns None for a blank query, so check for that before calling any
    # string method on the result (calling `.strip()` on None raises AttributeError).
    q = clean(q) if q else None
    if not q or not q.strip(): return ''
    if extract_kw:
        q = kw(q)
        if not q.strip(): return ''  # no keywords survived; nothing to expand
    if wc: q = add_wc(q)
    if wide: q = mk_wider(q)
    return q

You can clean queries passed to FTS search using `clean`, add wildcards using `add_wc`, widen the query using `mk_wider` and extract keywords using `kw`. The `pre` function combines all of these steps.

In [None]:
# Run the same query through `pre` with different step combinations.
q = 'This is a sample query'
# All steps enabled (keyword extraction, wild cards, OR-widening).
print('preprocessed q with defaults: `%s`' %pre(q))
# Keyword extraction only.
print('keywords extracted: `%s`' %pre(q, wc=False, wide=False))
# Wild cards only.
print('q with wild card: `%s`' %pre(q, extract_kw=False, wide=False, wc=True))

preprocessed q with defaults: `query* OR sample*`
keywords extracted: `query sample`
q with wild card: `This* is* a* sample* query*`


In [None]:
#| hide
# Run nbdev's export step from within the notebook.
import nbdev; nbdev.nbdev_export()