In [None]:
#| default_exp diff

# diff

> Get ipynb diffs by cell

In [None]:
#| export
import json
from fastcore.utils import *
from fastcore.meta import delegates
from difflib import unified_diff
from fastgit import Git
from execnb.nbio import *

In [None]:
import shutil, tempfile, random

In [None]:
# Fixed seed; presumably this keeps the generated cell ids shown in the outputs below stable -- TODO confirm
random.seed(42)

In [None]:
# Build a throwaway git repo in a temp dir, containing one committed two-cell notebook
td = Path(tempfile.mkdtemp(prefix='nbdiff_test_'))
g = Git(td)
g.init(b='main')
nb_path = td/'test.ipynb'
nb = new_nb(['x=1', 'y=2'])
write_nb(nb, nb_path)
g.add('test.ipynb')
g.commit(m='initial notebook')
# Edit the working copy WITHOUT committing: change the first cell and append a new one,
# so HEAD and the working dir now differ by one changed cell and one added cell
nb.cells[0].source = 'x = 100'
nb.cells.append(mk_cell('z=3'))
write_nb(nb, nb_path)

In [None]:
#| export
def read_nb_from_git(
    g:Git, # The git object
    path, # The path to the notebook (absolute or relative to git root)
    ref=None # The git ref to read from (e.g. HEAD); None for working dir
)->AttrDict: # The notebook
    "Read notebook from git ref (e.g. HEAD) at path, or working dir if ref is None"
    path = Path(path)
    # Both branches below want a path relative to the repo root.
    # `relative_to` raises ValueError if an absolute `path` is not inside the repo.
    if path.is_absolute(): path = path.relative_to(g.top())
    if ref is None: return read_nb(g.top()/path)
    # git's `<rev>:<path>` syntax needs forward slashes on every OS, whereas `str(path)`
    # uses backslashes on Windows -- so always render the path in posix form
    raw = g.show(f'{ref}:{path.as_posix()}', split=False)
    return dict2nb(json.loads(raw))

In [None]:
# Cells as committed at HEAD; the uncommitted working-dir edits are not included
read_nb_from_git(g, 'test.ipynb', 'HEAD').cells

[{'cell_type': 'code',
  'execution_count': 0,
  'id': '390c8c7d',
  'metadata': {},
  'outputs': [],
  'source': 'x=1',
  'idx_': 0},
 {'cell_type': 'code',
  'execution_count': 0,
  'id': '7247342c',
  'metadata': {},
  'outputs': [],
  'source': 'y=2',
  'idx_': 1}]

In [None]:
#| export
def _nb_srcdict(g:Git, nb_path, ref=None, f=noop):
    "Dict of cell id -> `f(cell)` for the notebook at `nb_path`, read from `ref` (working dir if None)"
    cells = read_nb_from_git(g, nb_path, ref).cells
    return dict((cell['id'], f(cell)) for cell in cells)

In [None]:
#| export
def nbs_pair(
    nb_path, # Path to the notebook
    ref_a='HEAD', # First git ref (None for working dir)
    ref_b=None, # Second git ref (None for working dir)
    f=noop  # Function to call on contents
): # Tuple of two `{cell_id: f(cell)}` dicts, one per ref
    "Per-cell dicts for a notebook at two refs; None means working dir. Defaults to HEAD vs working dir"
    abs_path = Path(nb_path).resolve()
    # The git object is created on the notebook's own directory
    repo = Git(abs_path.parent)
    side_a,side_b = (_nb_srcdict(repo, abs_path, ref, f) for ref in (ref_a, ref_b))
    return side_a,side_b

In [None]:
# Defaults compare HEAD (`a`) with the working dir (`b`); since `f` defaults to `noop`,
# each value in the dict is the whole cell
a,b = nbs_pair(nb_path)
a

{'390c8c7d': {'cell_type': 'code',
  'execution_count': 0,
  'id': '390c8c7d',
  'metadata': {},
  'outputs': [],
  'source': 'x=1',
  'idx_': 0},
 '7247342c': {'cell_type': 'code',
  'execution_count': 0,
  'id': '7247342c',
  'metadata': {},
  'outputs': [],
  'source': 'y=2',
  'idx_': 1}}

In [None]:
#| export
def _cell_changes(
    nb_path, # Path to the notebook
    fn, # function to call to get dict values
    ref_a='HEAD', # First git ref (None for working dir)
    ref_b=None, # Second git ref (None for working dir)
    adds=True, # Include cells in b but not in a
    changes=True, # Include cells with different content
    dels=False, # Include cells in a but not in b
    metadata=False, # Consider cell metadata when comparing
    outputs=False # Consider cell outputs when comparing
): # Dict of results
    "Dict of cell id -> fn(cell_id, old_content, new_content) for cells that differ between two refs"
    def cell_content(c):
        # Comparable text for one cell: its source, plus (optionally) canonical JSON of metadata/outputs
        parts = [c.get('source', '')]
        if metadata: parts.append('\n# metadata: ' + json.dumps(c.get('metadata', {}), sort_keys=True))
        if outputs: parts.append('\n# outputs: ' + json.dumps(c.get('outputs', []), sort_keys=True))
        return ''.join(parts)
    before,after = nbs_pair(nb_path, ref_a, ref_b, f=cell_content)
    found = {}
    # Three separate passes keep the result ordered as: added, then changed, then deleted
    if adds:
        for cid,cur in after.items():
            if cid not in before: found[cid] = fn(cid, '', cur)
    if changes:
        for cid,cur in after.items():
            if cid in before and cur != before[cid]: found[cid] = fn(cid, before[cid], cur)
    if dels:
        for cid,prev in before.items():
            if cid not in after: found[cid] = fn(cid, prev, '')
    return found

In [None]:
#| export
@delegates(_cell_changes)
def changed_cells(nb_path, **kwargs):
    "Set of ids of the cells that `_cell_changes` reports between two refs"
    # Only the keys are used, so the callback simply hands back the cell id
    by_id = _cell_changes(nb_path, lambda cid,old,new: cid, **kwargs)
    return set(by_id)

In [None]:
# With the default flags only added and changed cells are reported (`dels` is off)
changed_cells(td/'test.ipynb')

{'390c8c7d', 'd8100f2f'}

In [None]:
#| export
def source_diff(
    old_source, # Original source string
    new_source # New source string
): # Unified diff string
    "Unified diff between two source strings, compared line by line"
    old_lines,new_lines = old_source.splitlines(),new_source.splitlines()
    # `lineterm=''` because the lines carry no trailing newlines; they are joined with '\n' below
    diff_lines = unified_diff(old_lines, new_lines, lineterm='')
    return '\n'.join(diff_lines)

In [None]:
# Only the first line differs; the unchanged second line shows up as context
print(source_diff('x = 1\ny=2', 'x = 100\ny=2'))

--- 
+++ 
@@ -1,2 +1,2 @@
-x = 1
+x = 100
 y=2


In [None]:
#| export
@delegates(_cell_changes)
def cell_diffs(nb_path, **kwargs):
    "Dict of cell id -> unified diff of that cell's content, for cells reported by `_cell_changes`"
    # The diff is built from the before/after text alone; the cell id argument is unused
    return _cell_changes(nb_path, lambda cid,old,new: source_diff(old, new), **kwargs)

In [None]:
# One unified diff per added/changed cell, keyed by cell id (HEAD vs working dir)
d = cell_diffs(td/'test.ipynb')
d

{'d8100f2f': '--- \n+++ \n@@ -0,0 +1 @@\n+z=3',
 '390c8c7d': '--- \n+++ \n@@ -1 +1 @@\n-x=1\n+x = 100'}

In [None]:
# Commit the working-dir notebook; HEAD and the working dir then hold the same content,
# so neither function should report anything
g.add('test.ipynb')
g.commit(m='update notebook')
assert not changed_cells(td/'test.ipynb')
assert not cell_diffs(td/'test.ipynb')

In [None]:
# Remove the temporary repo created for the examples above
shutil.rmtree(td)

## export -

In [None]:
#| hide
# Run nbdev's export step
from nbdev import nbdev_export
nbdev_export()