In [42]:
# reload python modules
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [43]:
import pandas as pd
from snapdiff.snapper import snapper

In [45]:
from snapdiff.utils import get_path, get_normalized_code

In [46]:
get_path(get_normalized_code)

'snapdiff/utils.py'

In [13]:
@snapper(mode='diff')
def function(s, x, y):
    return x + y + s

In [15]:
function(1, 2, 3)

6

In [6]:
from deepdiff import DeepDiff, Delta

In [7]:
from pathlib import Path

In [8]:
pathone = Path('snapdiff/snapshots/function/1/2/3/1.json')
pathtwo = Path('snapdiff/snapshots/function/1/2/3/1.json')

In [9]:
str(pathone)

'snapdiff/snapshots/function/1/2/3/1.json'

In [10]:
pathone == pathtwo

True

In [11]:
pathone = Path('snapdiff/snapshots/function/1/2/3/1.json')
pathtwo = Path('snapdiff/snapshots/function/1/2/3/1.json')
b = DeepDiff([2,[pathone, 1,6], 1,2,8], [0, [pathtwo, 1,6],pathtwo,2,3])
Delta(b)

<Delta: {'type_changes': {'root[2]': {'old_type': <class 'int'>, 'new_type': <class 'pathlib.PosixPath'>,...}>

In [12]:
from datadiff import diff

In [33]:
from deepdiff import DeepDiff
from deepdiff.operator import BaseOperator
from pathlib import Path

class PathInsensitiveOperator(BaseOperator):
    def match(self, level):
        # Only match when both objects are Path instances
        return isinstance(level.t1, Path) and isinstance(level.t2, Path)
    
    def give_up_diffing(self, level, diff_instance):
        # Compare paths as lowercase resolved strings for insensitivity
        path1, path2 = map(lambda p: p.resolve().as_posix().lower(), (level.t1, level.t2))
        return path1 == path2  # Return True if paths are considered the same

# Instantiate the operator and use it in DeepDiff
path_operator = PathInsensitiveOperator()
diff = DeepDiff([pathtwo, 5, 7], [1, 5,7],
    custom_operators=[PathInsensitiveOperator()]
)
print(diff)  # Should show no difference if paths match under custom comparison


{'type_changes': {'root[0]': {'old_type': <class 'pathlib.PosixPath'>, 'new_type': <class 'int'>, 'old_value': PosixPath('snapdiff/snapshots/function/1/2/3/1.json'), 'new_value': 1}}}


In [50]:
from deepdiff import DeepDiff
from deepdiff.operator import BaseOperator
from pathlib import Path

class PathComparisonOperator(BaseOperator):
    def match(self, level):
        # Ensure both objects at this level are Path instances
        return isinstance(level.t1, Path) and isinstance(level.t2, Path)
    
    def give_up_diffing(self, level, diff_instance):
        # Convert paths to lowercase and resolve for comparison
        path1, path2 = map(lambda p: p.resolve().as_posix().lower(), (level.t1, level.t2))
        if path1 != path2:
            # Use custom_report_result to add a custom difference entry
            diff_instance.custom_report_result(
                "path_difference", 
                level, 
                {"old_value": path1, "new_value": path2}
            )
            return True  # Stop further diffing at this level
        return False  # Paths are the same, continue with standard diffing


# Testing the custom operator in DeepDiff
path_operator = PathComparisonOperator()
diff = DeepDiff(
    {"file_path": Path("path/to/file")},
    {"file_path": pathtwo},
    custom_operators=[path_operator]
)
print(diff)


here
{'path_difference': {"root['file_path']": {'old_value': '/home/ahmed/desktop/learning/snapdiff/snapdiff/path/to/file', 'new_value': '/home/ahmed/desktop/learning/snapdiff/snapdiff/snapdiff/snapshots/function/1/2/3/1.json'}}}


In [36]:
import pandas as pd
from deepdiff import DeepDiff
from deepdiff.operator import BaseOperator

class DataFrameComparisonOperator(BaseOperator):
    def __init__(self, ignore_columns=None, ignore_index=True):
        self.ignore_columns = ignore_columns or []
        self.ignore_index = ignore_index
    
    def match(self, level):
        # Ensure both objects at this level are DataFrames
        return isinstance(level.t1, pd.DataFrame) and isinstance(level.t2, pd.DataFrame)
    
    def give_up_diffing(self, level, diff_instance):
        # Remove specified columns
        df1 = level.t1.drop(columns=self.ignore_columns, errors='ignore')
        df2 = level.t2.drop(columns=self.ignore_columns, errors='ignore')
        
        # Reset indices if ignoring index
        if self.ignore_index:
            df1 = df1.reset_index(drop=True)
            df2 = df2.reset_index(drop=True)
        
        # Compare DataFrames for equality
        return df1.equals(df2)

# Sample DataFrames for comparison
df1 = pd.DataFrame({
    'A': [1, 2, 3],
    'B': ['x', 'y', 'z']
})
df2 = pd.DataFrame({
    'A': [1, 2, 3],
    'B': ['x', 'y', 'w']  # Note the change in the last row
})

# Instantiate and use the custom operator in DeepDiff
df_operator = DataFrameComparisonOperator(ignore_columns=['B'], ignore_index=True)
diff = DeepDiff(
    {'data': df1},
    {'data': df2},
    custom_operators=[df_operator]
)
print(diff)  # Displays differences if the DataFrames are not equal


{}


In [20]:
from deepdiff import DeepDiff
from deepdiff.operator import PrefixOrSuffixOperator
t1 = {
    "key1": ["foo", "bar's food", "jack", "joe"]
}
t2 = {
    "key1": ["foo", "bar", "jill", "joe'car"]
}

DeepDiff(t1, t2)


{'values_changed': {"root['key1'][1]": {'new_value': 'bar',
   'old_value': "bar's food"},
  "root['key1'][2]": {'new_value': 'jill', 'old_value': 'jack'},
  "root['key1'][3]": {'new_value': "joe'car", 'old_value': 'joe'}}}

In [21]:

DeepDiff(t1, t2, custom_operators=[
    PrefixOrSuffixOperator()
])


{'values_changed': {"root['key1'][2]": {'new_value': 'jill',
   'old_value': 'jack'}}}

In [74]:
[i for i in dir(a) if not i.startswith('_')]

['CACHE_AUTO_ADJUST_THRESHOLD',
 'affected_paths',
 'affected_root_keys',
 'cache_size',
 'cache_tuning_sample_size',
 'clear',
 'copy',
 'custom_operators',
 'custom_report_result',
 'cutoff_distance_for_pairs',
 'cutoff_intersection_for_pairs',
 'deephash_parameters',
 'encodings',
 'exclude_obj_callback',
 'exclude_obj_callback_strict',
 'exclude_paths',
 'exclude_regex_paths',
 'exclude_types',
 'exclude_types_tuple',
 'from_json_pickle',
 'fromkeys',
 'get',
 'get_ignore_types_in_groups',
 'get_significant_digits',
 'get_stats',
 'group_by',
 'group_by_sort_key',
 'hasher',
 'ignore_encoding_errors',
 'ignore_nan_inequality',
 'ignore_numeric_type_changes',
 'ignore_order',
 'ignore_order_func',
 'ignore_private_variables',
 'ignore_string_case',
 'ignore_string_type_changes',
 'ignore_type_in_groups',
 'ignore_type_subclasses',
 'include_obj_callback',
 'include_obj_callback_strict',
 'include_paths',
 'is_root',
 'items',
 'iterable_compare_func',
 'keys',
 'log_scale_similarity

In [32]:
aa = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}
bb = aa.copy()
bb['b'] = 33
aadf = pd.DataFrame(aa, index=[0])
bbdf = pd.DataFrame(bb, index=[0])


diff = DeepDiff(aadf, bbdf)


In [37]:
import inspect

In [39]:
def function(s, x, y):
    return x + y + s

In [41]:
inspect.getfile(function)

'/tmp/ipykernel_326473/156289867.py'

In [34]:
a.to_dict()

{}

In [12]:
from snapdiff.utils import get_normalized_code

In [21]:
import ast
import hashlib
import inspect

class NormalizeNames(ast.NodeTransformer):
    def __init__(self):
        self.func_name_counter = 0
        self.var_name_counter = 0
        self.func_name_map = {}
        self.var_name_map = {}

    def visit_FunctionDef(self, node):
        # Assign a generic name to function names
        if node.name not in self.func_name_map:
            self.func_name_map[node.name] = f"func_{self.func_name_counter}"
            self.func_name_counter += 1
        node.name = self.func_name_map[node.name]
        # Continue transforming function arguments and body
        self.generic_visit(node)
        return node

    def visit_Name(self, node):
        # Assign generic names to variable names used in the function
        if isinstance(node.ctx, ast.Store) or isinstance(node.ctx, ast.Load):
            if node.id not in self.var_name_map:
                self.var_name_map[node.id] = f"var_{self.var_name_counter}"
                self.var_name_counter += 1
            node.id = self.var_name_map[node.id]
        return node

def get_normalized_code(func):
    # Get the source code of the function
    source_code = inspect.getsource(func)
    # Parse the source code into an Abstract Syntax Tree
    parsed_code = ast.parse(source_code)
    # Normalize function and variable names
    normalizer = NormalizeNames()
    normalized_tree = normalizer.visit(parsed_code)
    # Convert the normalized AST back to source code (as a string) for hashing
    normalized_code = ast.dump(normalized_tree, annotate_fields=False)
    # Hash the normalized code
    code_hash = hashlib.sha256(normalized_code.encode()).hexdigest()
    return code_hash, normalized_code

In [22]:
def buble_sort(l):
    n = len(l)
    for i in range(n):
        for j in range(0, n-i-1):
            if l[j] > l[j+1]:
                l[j], l[j+1] = l[j+1], l[j]
    return l

In [23]:
hash_, norm = get_normalized_code(buble_sort)

In [25]:
def buble_sort2(l):
    n = len(l)
    for i in range(n):
        for j in range(0, n-i-1):
            if l[j] > l[j+1]:
                l[j], l[j+1] = l[j+1], l[j]
    return l

In [26]:
hash_2, norm2 = get_normalized_code(buble_sort2)

In [34]:
print(norm2)

Module([FunctionDef('func_0', arguments([], [arg('l')], kwonlyargs=[], kw_defaults=[], defaults=[]), [Assign([Name('var_0', Store())], Call(Name('var_1', Load()), [Name('var_2', Load())], [])), For(Name('var_3', Store()), Call(Name('var_4', Load()), [Name('var_0', Load())], []), [For(Name('var_5', Store()), Call(Name('var_4', Load()), [Constant(0), BinOp(BinOp(Name('var_0', Load()), Sub(), Name('var_3', Load())), Sub(), Constant(1))], []), [If(Compare(Subscript(Name('var_2', Load()), Name('var_5', Load()), Load()), [Gt()], [Subscript(Name('var_2', Load()), BinOp(Name('var_5', Load()), Add(), Constant(1)), Load())]), [Assign([Tuple([Subscript(Name('var_2', Load()), Name('var_5', Load()), Store()), Subscript(Name('var_2', Load()), BinOp(Name('var_5', Load()), Add(), Constant(1)), Store())], Store())], Tuple([Subscript(Name('var_2', Load()), BinOp(Name('var_5', Load()), Add(), Constant(1)), Load()), Subscript(Name('var_2', Load()), Name('var_5', Load()), Load())], Load()))], [])], [])], [

In [49]:
import inspect
import ast
import hashlib
import yaml
import os
import json
from deepdiff import DeepDiff
import astor


def get_normalized_code(func):
    # get the source code of the function
    source_code = inspect.getsource(func)
    # parse the source code into an Abstract Syntax Tree
    parsed_code = ast.parse(source_code)
    # convert the Abstract Syntax Tree back to normalized source code
    normalized_code = ast.dump(parsed_code, annotate_fields=True, indent=4)
    # hash the normalized code
    formatted_code = astor.to_source(parsed_code)
    code_hash = hashlib.sha256(normalized_code.encode()).hexdigest()
    return code_hash, normalized_code, formatted_code


In [50]:
get_normalized_code(function)[1])

Module(
    body=[
        FunctionDef(
            name='function',
            args=arguments(
                posonlyargs=[],
                args=[
                    arg(arg='s'),
                    arg(arg='x'),
                    arg(arg='y')],
                kwonlyargs=[],
                kw_defaults=[],
                defaults=[]),
            body=[
                Expr(
                    value=Constant(value='asdfasdf')),
                Return(
                    value=BinOp(
                        left=BinOp(
                            left=Name(id='x', ctx=Load()),
                            op=Add(),
                            right=Name(id='y', ctx=Load())),
                        op=Add(),
                        right=Call(
                            func=Name(id='str', ctx=Load()),
                            args=[
                                Name(id='s', ctx=Load())],
                            keywords=[])))],
            decorator_list=[
      

In [52]:
DeepDiff('asdf', 'sdfg', )

{'values_changed': {'root': {'new_value': 'sdfg', 'old_value': 'asdf'}}}

In [55]:
i = 0
a = i if i else (2 if 2==2 else 3)

In [56]:
a

2

In [None]:
# 1 - the function name changed but the hash is the same
# 2 - the function name changed and the hash is different
# 3 - the function name is the same and the hash is different
# 4 - the function name is the same and the hash is the same

In [21]:
import json
import uuid

ID_FILE = "snapper_ids.json"

# Load ID mappings from a file
def load_id_map():
    try:
        with open(ID_FILE, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}

# Save ID mappings to a file
def save_id_map(id_map):
    with open(ID_FILE, "w") as f:
        json.dump(id_map, f)

# Decorator function to automatically assign a unique ID to each function
def snapper():
    def decorator(func):
        id_map = load_id_map()

        # If the function has been wrapped before, reuse its ID
        if func.__name__ in id_map:
            unique_id = id_map[func.__name__]
        else:
            # Generate a new ID for a new function
            unique_id = str(uuid.uuid4())
            id_map[func.__name__] = unique_id
            save_id_map(id_map)  # Save the updated mapping

        # Assign the ID to the function (or Snapper instance)
        func._snapper_id = unique_id

        # Wrap the function as needed here
        def wrapper(*args, **kwargs):
            print(f"Function {func.__name__} has ID: {func._snapper_id}")
            return func(*args, **kwargs)

        return wrapper

    return decorator



In [35]:

# Example usage
@snapper()
def func3(x, y, z):
    return x + y + z


In [36]:

# Run functions to see the ID assignment
func3(1, 2, 3)  # This should print the ID for func1

Function func3 has ID: a3d671bc-ede7-4653-914a-5f41df6b2b64


6

Function func1 has ID: fe79bafc-1880-4ff8-8c24-9a69973fb80e


3

In [25]:

func2(3, 4)  # This should print the same ID if it's logically the same function


Function func2 has ID: d543b916-a0a7-49e9-b2d1-a859aec8f11e


12

In [57]:
a = set([1,3,2, 8]) 
b = set([1,2,3, 7])

In [59]:
# diff between a and b what's in a that's not in b and what's in b that's not in a
a.difference(b), b.difference(a)

({8}, {7})

In [63]:
df = pd.DataFrame({'a': [1, 2, 3, ], 'b': [4, 5, 6, ], 'string': ['a', 'b', 'c']})
df2 = df = pd.DataFrame({'a': [1, 2, 3, ], 'b': ['4', '5', '6'], 'string': ['a', 'b', 'c']})

In [64]:
aa = df.dtypes.to_dict()
bb = df2.dtypes.to_dict()

In [65]:
DeepDiff(aa, bb)

{}

In [91]:


class compareDataFrames(BaseOperator):
    def __init__(self, shallow_diff=True):
        self.shallow_diff = shallow_diff
    
    def match(self, level):
        # Only match when both objects are DataFrames
        return (
            level.t1.__class__.__name__ == "DataFrame"
            and level.t2.__class__.__name__ == "DataFrame"
        )

    def shalow_diff(self, level):
        diff_shape = False
        
        shadow_diff = {}
        if level.t1.shape != level.t2.shape:
            shadow_diff["shape"] = {
                    "old_value": level.t1.shape,
                    "new_value": level.t2.shape,
                }
            diff_shape = True
            
        old_cols = set(level.t1.columns)
        new_cols = set(level.t2.columns)
        if old_cols != new_cols:
            shadow_diff["columns"] = {
                    "columns in old df not in new": old_cols - new_cols,
                    "columns in new df not in old": new_cols - old_cols,
                }

        old_types_dict = level.t1.dtypes.to_dict()
        new_types_dict = level.t2.dtypes.to_dict()
        types_diff = {}
        for col in old_cols.intersection(new_cols):
            if old_types_dict[col] != new_types_dict[col]:
                types_diff[col] = {
                    "old_value": old_types_dict[col],
                    "new_value": new_types_dict[col],
                }
        if types_diff:
            shadow_diff["column_types"] = types_diff
        
        if not diff_shape:
            # index changed
            if not level.t1.index.equals(level.t2.index):
                shadow_diff["index"] = 'index changed'
        
        return shadow_diff
        
    def give_up_diffing(self, level, diff_instance):
        if self.shallow_diff:
            diff = self.shalow_diff(level)
            if diff:
                diff_instance.custom_report_result("DataFrame_difference", level, diff)
                return True
        return False
        

In [92]:
df2

Unnamed: 0,a,b,string
0,1,4,a
1,2,5,b
2,3,6,c


In [106]:
df['a'][1] = 15

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['a'][1] = 15


In [120]:
df['string'][2] = 'asdf'

You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

  df['string'][2] = 'asdf'
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['string'][2] = 'asdf'


In [114]:
df2 = df.copy()

In [117]:
df

Unnamed: 0,a,b,string
0,1,4,a
1,15,5,b
2,3,6,c


In [121]:
df.compare(df2)

Unnamed: 0_level_0,a,a,string,string
Unnamed: 0_level_1,self,other,self,other
1,15.0,16.0,,
2,,,asdf,c


In [122]:
df_operator = compareDataFrames(shallow_diff=True)

DeepDiff(
    [df1, 5, 7],
    [df2, 5, 9],
    custom_operators=[df_operator]
)

{'values_changed': {'root[2]': {'new_value': 9, 'old_value': 7}},
 'DataFrame_difference': {'root[0]': {'shape': {'old_value': (3, 2),
    'new_value': (3, 3)},
   'columns': {'columns in old df not in new': {'A', 'B'},
    'columns in new df not in old': {'a', 'b', 'string'}}}}}

pandas.core.series.Series

In [125]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC

classification_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),  # Reducing to 2 principal components for simplicity
    ('svc', SVC(kernel='linear', C=1.0, random_state=42))
])


In [131]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.linear_model import Ridge

regression_pipeline1 = Pipeline([
    ('poly_features', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
    ('ridge', Ridge(alpha=1.0))
])

regression_pipeline2 = Pipeline([
    ('poly_features', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
    ('ridge', Ridge(alpha=0.9))
])


In [132]:
DeepDiff(regression_pipeline1.steps, regression_pipeline2.steps)

{'values_changed': {'root[2][1].alpha': {'new_value': 0.9, 'old_value': 1.0}},
 'unprocessed': ['root[0][1]: PolynomialFeatures(include_bias=False) and PolynomialFeatures(include_bias=False)']}

In [133]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.linear_model import Ridge

# Create regression pipelines with different configurations
regression_pipeline1 = Pipeline(steps=[
    ('poly_features', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
    ('ridge', Ridge(alpha=1.0))
])

regression_pipeline2 = Pipeline(steps=[
    ('poly_features', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
    ('ridge', Ridge(alpha=0.9))
])

# Convert pipeline steps to a dictionary to provide custom keys in DeepDiff
pipeline1_dict = {name: step for name, step in regression_pipeline1.steps}
pipeline2_dict = {name: step for name, step in regression_pipeline2.steps}

# Perform comparison with DeepDiff
from deepdiff import DeepDiff
diff = DeepDiff(pipeline1_dict, pipeline2_dict)
print(diff)


{'values_changed': {"root['ridge'].alpha": {'new_value': 0.9, 'old_value': 1.0}}, 'unprocessed': ["root['poly_features']: PolynomialFeatures(include_bias=False) and PolynomialFeatures(include_bias=False)"]}


In [139]:
type(PolynomialFeatures())

sklearn.preprocessing._polynomial.PolynomialFeatures

In [140]:
type(Ridge())

sklearn.linear_model._ridge.Ridge

In [142]:
#  make classificaiton pipeline
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.datasets import make_classification

In [143]:


classification_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),  # Reducing to 2 principal components for simplicity
    ('svc', SVC(kernel='linear', C=1.0, random_state=42))
])

X, y = make_classification(n_samples=100, n_features=10, n_classes=2, random_state=42)

classification_pipeline.fit(X, y)

In [150]:
classification_pipeline.steps[1][1].get_params()

{'copy': True,
 'iterated_power': 'auto',
 'n_components': 2,
 'n_oversamples': 10,
 'power_iteration_normalizer': 'auto',
 'random_state': None,
 'svd_solver': 'auto',
 'tol': 0.0,
 'whiten': False}

In [169]:
from sklearn.pipeline import Pipeline, FeatureUnion

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer

def get_pipeline_params(pipeline):
    def retrieve_params(step, step_name=""):
        params_dict = {}
        
        if isinstance(step, Pipeline):
            # Handle standard pipeline
            for sub_step_name, sub_step in step.steps:
                full_step_name = f"{step_name}__{sub_step_name}" if step_name else sub_step_name
                params_dict[full_step_name] = retrieve_params(sub_step, full_step_name)
                
        elif isinstance(step, FeatureUnion):
            # Handle FeatureUnion specifically
            for sub_step_name, sub_step in step.transformer_list:
                full_step_name = f"{step_name}__{sub_step_name}" if step_name else sub_step_name
                params_dict[full_step_name] = retrieve_params(sub_step, full_step_name)
                
        elif isinstance(step, ColumnTransformer):
            # Handle ColumnTransformer specifically
            for sub_step_name, sub_step, _ in step.transformers:
                full_step_name = f"{step_name}__{sub_step_name}" if step_name else sub_step_name
                params_dict[full_step_name] = retrieve_params(sub_step, full_step_name)
                
        else:
            # Handle individual transformers or estimators
            params_dict = step.get_params()
        
        return params_dict

    return retrieve_params(pipeline)


# Example usage with a nested pipeline
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler, PolynomialFeatures

nested_pipeline = Pipeline([
    ('preprocessing', FeatureUnion([
        ('poly_features', Pipeline([
            ('poly', PolynomialFeatures(degree=2, include_bias=False)),
            ('scaler', StandardScaler())
        ])),
        
        ('scaler', StandardScaler())
    ])),
    ('col_transformer', ColumnTransformer([
            ('poly_features', PolynomialFeatures(degree=2, include_bias=False), ['feature1', 'feature2']),
            ('scaler', StandardScaler(), ['feature3'])
        ])),
    ('regressor', Ridge(alpha=1.0))
])

# Retrieve the parameters as a nested dictionary
params1 = get_pipeline_params(nested_pipeline)
nested_pipeline.steps[0][1].transformer_list[0][1].steps[0][1].include_bias = True
params2 = get_pipeline_params(nested_pipeline)


In [170]:
DeepDiff(params1, params2)

{'values_changed': {"root['preprocessing']['preprocessing__poly_features']['preprocessing__poly_features__poly']['include_bias']": {'new_value': True,
   'old_value': False}}}

In [172]:
type(regression_pipeline1)

sklearn.pipeline.Pipeline

In [1]:
from snapdiff.invoke_utils import add_decorator_to_functions

In [2]:
add_decorator_to_functions('example.py', 'snapper', {'mode': 'diff'})

Decorator 'snapper' added to all functions in example.py


In [87]:
file_path = 'example.py'


In [88]:


with open(file_path, "r") as file:
    file_content = file.read()


In [89]:
import ast
# Parse the file content into an AST
tree = ast.parse(file_content)


In [90]:
clean = lambda x: [i for i in dir(x) if not i.startswith('_')]

In [91]:
tree.body[0].decorator_list[0].func.id

'snapper'

In [92]:
[i for i in dir(tree.body[0].decorator_list[0]) if not i.startswith('_')]

['args',
 'col_offset',
 'end_col_offset',
 'end_lineno',
 'func',
 'keywords',
 'lineno']

In [93]:
decorator_params = {'mode': 'snap'}
decorator_name = 'snapper'

# Build the decorator string with or without parameters
if decorator_params:
    params = "("
    for key, value in decorator_params.items():
        params += f"""{key}="{value}", """
    params += ")"
    decorator_with_params = f"{decorator_name}" + params
else:
    raise ValueError("Decorator parameters are required")


# Define the decorator node
decorator_node = ast.parse(decorator_with_params).body[0].value

# Loop through all the nodes in the AST and find function definitions
for node in ast.walk(tree):
    if isinstance(node, ast.FunctionDef):  # Check if it's a function
        # Check if the function already has the decorator if it already has the same decorator then delte the old one and add the new one
        for decorator in node.decorator_list:
            # print(decorator)
            if decorator.func.id == decorator_name:
                node.decorator_list.remove(decorator)
                break
        # Add the decorator to the function
        node.decorator_list.append(decorator_node)

modified_code = astor.to_source(tree)

# Write the modified code back to the file (or you could return it)
with open(file_path, "w") as file:
    file.write(modified_code)

print(f"Decorator '{decorator_name}' added to all functions in {file_path}")


In [10]:
from snapdiff.utils import load_snapper_config


In [33]:
res = load_snapper_config('models')

In [35]:
res.log_to_file

False