diff --git a/codeflash/code_utils/formatter.py b/codeflash/code_utils/formatter.py index 927a4d4cb..b1cb58540 100644 --- a/codeflash/code_utils/formatter.py +++ b/codeflash/code_utils/formatter.py @@ -1,35 +1,78 @@ from __future__ import annotations +import difflib import os +import re import shlex +import shutil import subprocess -from typing import TYPE_CHECKING +import tempfile +from pathlib import Path +from typing import Optional, Union import isort from codeflash.cli_cmds.console import console, logger -if TYPE_CHECKING: - from pathlib import Path +def generate_unified_diff(original: str, modified: str, from_file: str, to_file: str) -> str: + line_pattern = re.compile(r"(.*?(?:\r\n|\n|\r|$))") -def format_code(formatter_cmds: list[str], path: Path, print_status: bool = True) -> str: # noqa + def split_lines(text: str) -> list[str]: + lines = [match[0] for match in line_pattern.finditer(text)] + if lines and lines[-1] == "": + lines.pop() + return lines + + original_lines = split_lines(original) + modified_lines = split_lines(modified) + + diff_output = [] + for line in difflib.unified_diff(original_lines, modified_lines, fromfile=from_file, tofile=to_file, n=5): + if line.endswith("\n"): + diff_output.append(line) + else: + diff_output.append(line + "\n") + diff_output.append("\\ No newline at end of file\n") + + return "".join(diff_output) + + +def apply_formatter_cmds( + cmds: list[str], + path: Path, + test_dir_str: Optional[str], + print_status: bool, # noqa +) -> tuple[Path, str]: # TODO: Only allow a particular whitelist of formatters here to prevent arbitrary code execution - formatter_name = formatter_cmds[0].lower() + formatter_name = cmds[0].lower() + should_make_copy = False + file_path = path + + if test_dir_str: + should_make_copy = True + file_path = Path(test_dir_str) / "temp.py" + + if not cmds or formatter_name == "disabled": + return path, path.read_text(encoding="utf8") + if not path.exists(): - msg = f"File {path} does not exist. 
Cannot format the file." + msg = f"File {path} does not exist. Cannot apply formatter commands." raise FileNotFoundError(msg) - if formatter_name == "disabled": - return path.read_text(encoding="utf8") + + if should_make_copy: + shutil.copy2(path, file_path) + file_token = "$file" # noqa: S105 - for command in formatter_cmds: + + for command in cmds: formatter_cmd_list = shlex.split(command, posix=os.name != "nt") - formatter_cmd_list = [path.as_posix() if chunk == file_token else chunk for chunk in formatter_cmd_list] + formatter_cmd_list = [file_path.as_posix() if chunk == file_token else chunk for chunk in formatter_cmd_list] try: result = subprocess.run(formatter_cmd_list, capture_output=True, check=False) if result.returncode == 0: if print_status: - console.rule(f"Formatted Successfully with: {formatter_name.replace('$file', path.name)}") + console.rule(f"Formatted Successfully with: {command.replace('$file', path.name)}") else: logger.error(f"Failed to format code with {' '.join(formatter_cmd_list)}") except FileNotFoundError as e: @@ -44,7 +87,60 @@ def format_code(formatter_cmds: list[str], path: Path, print_status: bool = True raise e from None - return path.read_text(encoding="utf8") + return file_path, file_path.read_text(encoding="utf8") + + +def get_diff_lines_count(diff_output: str) -> int: + lines = diff_output.split("\n") + + def is_diff_line(line: str) -> bool: + return line.startswith(("+", "-")) and not line.startswith(("+++", "---")) + + diff_lines = [line for line in lines if is_diff_line(line)] + return len(diff_lines) + + +def format_code( + formatter_cmds: list[str], + path: Union[str, Path], + optimized_function: str = "", + check_diff: bool = False, # noqa + print_status: bool = True, # noqa +) -> str: + with tempfile.TemporaryDirectory() as test_dir_str: + if isinstance(path, str): + path = Path(path) + + original_code = path.read_text(encoding="utf8") + original_code_lines = len(original_code.split("\n")) + + if check_diff and 
original_code_lines > 50: + # we don't count the formatting diff for the optimized function as it should be well-formatted + original_code_without_opfunc = original_code.replace(optimized_function, "") + + original_temp = Path(test_dir_str) / "original_temp.py" + original_temp.write_text(original_code_without_opfunc, encoding="utf8") + + formatted_temp, formatted_code = apply_formatter_cmds( + formatter_cmds, original_temp, test_dir_str, print_status=False + ) + + diff_output = generate_unified_diff( + original_code_without_opfunc, formatted_code, from_file=str(original_temp), to_file=str(formatted_temp) + ) + diff_lines_count = get_diff_lines_count(diff_output) + + max_diff_lines = min(int(original_code_lines * 0.3), 50) + + if diff_lines_count > max_diff_lines and max_diff_lines != -1: + logger.debug( + f"Skipping formatting {path}: {diff_lines_count} lines would change (max: {max_diff_lines})" + ) + return original_code + # TODO: We can avoid formatting the whole file again and only formatting the optimized code standalone and replace in formatted file above. 
+ _, formatted_code = apply_formatter_cmds(formatter_cmds, path, test_dir_str=None, print_status=print_status) + logger.debug(f"Formatted {path} with commands: {formatter_cmds}") + return formatted_code def sort_imports(code: str) -> str: diff --git a/codeflash/optimization/function_optimizer.py b/codeflash/optimization/function_optimizer.py index 4edbf8974..fd33b2386 100644 --- a/codeflash/optimization/function_optimizer.py +++ b/codeflash/optimization/function_optimizer.py @@ -335,7 +335,10 @@ def optimize_function(self) -> Result[BestOptimization, str]: # noqa: PLR0911 ) new_code, new_helper_code = self.reformat_code_and_helpers( - code_context.helper_functions, explanation.file_path, self.function_to_optimize_source_code + code_context.helper_functions, + explanation.file_path, + self.function_to_optimize_source_code, + optimized_function=best_optimization.candidate.source_code, ) existing_tests = existing_tests_source_for( @@ -642,20 +645,23 @@ def write_code_and_helpers(original_code: str, original_helper_code: dict[Path, f.write(helper_code) def reformat_code_and_helpers( - self, helper_functions: list[FunctionSource], path: Path, original_code: str + self, helper_functions: list[FunctionSource], path: Path, original_code: str, optimized_function: str ) -> tuple[str, dict[Path, str]]: should_sort_imports = not self.args.disable_imports_sorting if should_sort_imports and isort.code(original_code) != original_code: should_sort_imports = False - new_code = format_code(self.args.formatter_cmds, path) + new_code = format_code(self.args.formatter_cmds, path, optimized_function=optimized_function, check_diff=True) if should_sort_imports: new_code = sort_imports(new_code) new_helper_code: dict[Path, str] = {} - helper_functions_paths = {hf.file_path for hf in helper_functions} - for module_abspath in helper_functions_paths: - formatted_helper_code = format_code(self.args.formatter_cmds, module_abspath) + for hp in helper_functions: + module_abspath = hp.file_path + 
hp_source_code = hp.source_code + formatted_helper_code = format_code( + self.args.formatter_cmds, module_abspath, optimized_function=hp_source_code, check_diff=True + ) if should_sort_imports: formatted_helper_code = sort_imports(formatted_helper_code) new_helper_code[module_abspath] = formatted_helper_code diff --git a/tests/test_formatter.py b/tests/test_formatter.py index 5c0a91c38..fbd7d0b9d 100644 --- a/tests/test_formatter.py +++ b/tests/test_formatter.py @@ -1,12 +1,17 @@ +import argparse import os import tempfile from pathlib import Path import pytest +import shutil from codeflash.code_utils.config_parser import parse_config_file from codeflash.code_utils.formatter import format_code, sort_imports +from codeflash.discovery.functions_to_optimize import FunctionToOptimize +from codeflash.optimization.function_optimizer import FunctionOptimizer +from codeflash.verification.verification_utils import TestConfig def test_remove_duplicate_imports(): """Test that duplicate imports are removed when should_sort_imports is True.""" @@ -209,3 +214,589 @@ def foo(): tmp_path = tmp.name with pytest.raises(FileNotFoundError): format_code(formatter_cmds=["exit 1"], path=Path(tmp_path)) + + +def _run_formatting_test(source_code: str, should_content_change: bool, expected = None, optimized_function: str = ""): + try: + import ruff # type: ignore + except ImportError: + pytest.skip("ruff is not installed") + + with tempfile.TemporaryDirectory() as test_dir_str: + test_dir = Path(test_dir_str) + source_file = test_dir / "source.py" + + source_file.write_text(source_code) + original = source_code + target_path = test_dir / "target.py" + + shutil.copy2(source_file, target_path) + + function_to_optimize = FunctionToOptimize( + function_name="process_data", + parents=[], + file_path=target_path + ) + + test_cfg = TestConfig( + tests_root=test_dir, + project_root_path=test_dir, + test_framework="pytest", + tests_project_rootdir=test_dir, + ) + + args = argparse.Namespace( + 
disable_imports_sorting=False, + formatter_cmds=[ + "ruff check --exit-zero --fix $file", + "ruff format $file" + ], + ) + + optimizer = FunctionOptimizer( + function_to_optimize=function_to_optimize, + test_cfg=test_cfg, + args=args, + ) + + optimizer.reformat_code_and_helpers( + helper_functions=[], + path=target_path, + original_code=optimizer.function_to_optimize_source_code, + optimized_function=optimized_function, + ) + + content = target_path.read_text(encoding="utf8") + + if expected is not None: + assert content == expected, f"Expected content to be \n===========\n{expected}\n===========\nbut got\n===========\n{content}\n===========\n" + + if should_content_change: + assert content != original, f"Expected content to change for source.py" + else: + assert content == original, f"Expected content to remain unchanged for source.py" + + + +def test_formatting_file_with_many_diffs(): + """Test that files with many formatting errors are skipped (content unchanged).""" + source_code = '''import os,sys,json,datetime,re +from collections import defaultdict,OrderedDict +import numpy as np,pandas as pd + +class DataProcessor: + def __init__(self,config_path,data_path,output_path): + self.config_path=config_path + self.data_path=data_path + self.output_path=output_path + self.config={} + self.data=[] + self.results={} + + def load_config(self): + with open(self.config_path,'r') as f: + self.config=json.load(f) + if 'required_fields' not in self.config:self.config['required_fields']=[] + if 'optional_fields' not in self.config:self.config['optional_fields']=[] + return self.config + + def validate_data(self,data): + errors=[] + for idx,record in enumerate(data): + if not isinstance(record,dict): + errors.append(f"Record {idx} is not a dictionary") + continue + for field in self.config.get('required_fields',[]): + if field not in record: + errors.append(f"Record {idx} missing required field: {field}") + elif record[field] is None or record[field]=='': + 
errors.append(f"Record {idx} has empty required field: {field}") + return errors + + def process_data(self,data,filter_func=None,transform_func=None,sort_key=None): + if filter_func:data=[item for item in data if filter_func(item)] + if transform_func:data=[transform_func(item) for item in data] + if sort_key:data=sorted(data,key=sort_key) + aggregated_data=defaultdict(list) + for item in data: + category=item.get('category','unknown') + aggregated_data[category].append(item) + final_results={} + for category,items in aggregated_data.items(): + total_value=sum(item.get('value',0) for item in items) + avg_value=total_value/len(items) if items else 0 + final_results[category]={'count':len(items),'total':total_value,'average':avg_value,'items':items} + return final_results + + def save_results(self,results): + with open(self.output_path,'w') as f: + json.dump(results,f,indent=2,default=str) + print(f"Results saved to {self.output_path}") + + def run_pipeline(self): + try: + config=self.load_config() + with open(self.data_path,'r') as f: + raw_data=json.load(f) + validation_errors=self.validate_data(raw_data) + if validation_errors: + print("Validation errors found:") + for error in validation_errors:print(f" - {error}") + return False + processed_results=self.process_data(raw_data,filter_func=lambda x:x.get('active',True),transform_func=lambda x:{**x,'processed_at':datetime.datetime.now().isoformat()},sort_key=lambda x:x.get('name','')) + self.save_results(processed_results) + return True + except Exception as e: + print(f"Pipeline failed: {str(e)}") + return False + +def main(): + processor=DataProcessor('/path/to/config.json','/path/to/data.json','/path/to/output.json') + success=processor.run_pipeline() + if success:print("Pipeline completed successfully") + else:print("Pipeline failed") + +if __name__=='__main__':main() +''' + _run_formatting_test(source_code, False) + + +def test_formatting_file_with_few_diffs(): + """Test that files with few formatting errors 
are formatted (content changed).""" + source_code = '''import json +from datetime import datetime + +def process_data(data, config=None): + """Process data with optional configuration.""" + if not data: + return {"success": False, "error": "No data provided"} + + if config is None: + config = {"filter_active": True} + + # Minor formatting issues that should be fixed + result=[] + for item in data: + if config.get("filter_active") and not item.get("active",True): + continue + processed_item={ + "id": item.get("id"), + "name": item.get("name",""), + "value": item.get("value",0), + "processed_at": datetime.now().isoformat() + } + result.append(processed_item) + + return {"success": True, "data": result, "count": len(result)} +''' + _run_formatting_test(source_code, True) + + +def test_formatting_file_with_no_diffs(): + """Test that files with no formatting errors are unchanged.""" + # this test assumes you use ruff defaults for formatting + source_code = '''from datetime import datetime + + +def process_data(data, config=None): + """Process data with optional configuration.""" + if not data: + return {"success": False, "error": "No data provided"} + + if config is None: + config = {"filter_active": True} + + result = [] + for item in data: + if config.get("filter_active") and not item.get("active", True): + continue + + processed_item = { + "id": item.get("id"), + "name": item.get("name", ""), + "value": item.get("value", 0), + "processed_at": datetime.now().isoformat(), + } + result.append(processed_item) + + return {"success": True, "data": result, "count": len(result)} +''' + _run_formatting_test(source_code, False) + + +def test_formatting_extremely_messy_file(): + """Test that extremely messy files with 100+ potential changes are skipped.""" + source_code = '''import os,sys,json,datetime,re,collections,itertools,functools,operator +from pathlib import Path +from typing import Dict,List,Optional,Union,Any,Tuple +import numpy as np,pandas as pd,matplotlib.pyplot as 
plt +from dataclasses import dataclass,field + +@dataclass +class Config: + input_path:str + output_path:str + batch_size:int=100 + max_retries:int=3 + timeout:float=30.0 + debug:bool=False + filters:List[str]=field(default_factory=list) + transformations:Dict[str,Any]=field(default_factory=dict) + +class DataProcessorAdvanced: + def __init__(self,config:Config): + self.config=config + self.data=[] + self.results={} + self.errors=[] + self.stats={'processed':0,'failed':0,'skipped':0} + + def load_data(self,file_path:str)->List[Dict]: + try: + with open(file_path,'r',encoding='utf-8') as f: + if file_path.endswith('.json'):data=json.load(f) + elif file_path.endswith('.csv'): + import csv + reader=csv.DictReader(f) + data=[row for row in reader] + else:raise ValueError(f"Unsupported file format: {file_path}") + return data + except Exception as e:self.errors.append(f"Failed to load {file_path}: {str(e)}");return[] + + def validate_record(self,record:Dict,schema:Dict)->Tuple[bool,List[str]]: + errors=[] + for field,rules in schema.items(): + if rules.get('required',False) and field not in record: + errors.append(f"Missing required field: {field}") + elif field in record: + value=record[field] + if 'type' in rules and not isinstance(value,rules['type']): + errors.append(f"Field {field} has wrong type") + if 'min_length' in rules and isinstance(value,str) and len(value)rules['max_length']: + errors.append(f"Field {field} too long") + if 'min_value' in rules and isinstance(value,(int,float)) and valuerules['max_value']: + errors.append(f"Field {field} above maximum") + return len(errors)==0,errors + + def apply_filters(self,data:List[Dict])->List[Dict]: + filtered_data=data + for filter_name in self.config.filters: + if filter_name=='active_only':filtered_data=[r for r in filtered_data if r.get('active',True)] + elif filter_name=='has_value':filtered_data=[r for r in filtered_data if r.get('value') is not None] + elif filter_name=='recent_only': + 
cutoff=datetime.datetime.now()-datetime.timedelta(days=30) + filtered_data=[r for r in filtered_data if datetime.datetime.fromisoformat(r.get('created_at','1970-01-01'))>cutoff] + return filtered_data + + def apply_transformations(self,data:List[Dict])->List[Dict]: + for transform_name,params in self.config.transformations.items(): + if transform_name=='add_timestamp': + for record in data:record['processed_at']=datetime.datetime.now().isoformat() + elif transform_name=='normalize_names': + for record in data: + if 'name' in record:record['name']=record['name'].strip().title() + elif transform_name=='calculate_derived': + for record in data: + if 'value' in record and 'multiplier' in params: + record['derived_value']=record['value']*params['multiplier'] + return data + + def process_batch(self,batch:List[Dict])->Dict[str,Any]: + try: + processed_batch=[] + for record in batch: + try: + processed_record=dict(record) + processed_record['batch_id']=len(self.results) + processed_record['processed_at']=datetime.datetime.now().isoformat() + processed_batch.append(processed_record) + self.stats['processed']+=1 + except Exception as e: + self.errors.append(f"Failed to process record: {str(e)}") + self.stats['failed']+=1 + return {'success':True,'data':processed_batch,'count':len(processed_batch)} + except Exception as e: + self.errors.append(f"Batch processing failed: {str(e)}") + return {'success':False,'error':str(e)} + + def run_processing_pipeline(self)->bool: + try: + raw_data=self.load_data(self.config.input_path) + if not raw_data:return False + filtered_data=self.apply_filters(raw_data) + transformed_data=self.apply_transformations(filtered_data) + batches=[transformed_data[i:i+self.config.batch_size] for i in range(0,len(transformed_data),self.config.batch_size)] + all_results=[] + for i,batch in enumerate(batches): + if self.config.debug:print(f"Processing batch {i+1}/{len(batches)}") + result=self.process_batch(batch) + if 
result['success']:all_results.extend(result['data']) + else:self.stats['failed']+=len(batch) + with open(self.config.output_path,'w',encoding='utf-8') as f: + json.dump({'results':all_results,'stats':self.stats,'errors':self.errors},f,indent=2,default=str) + return True + except Exception as e: + self.errors.append(f"Pipeline failed: {str(e)}") + return False + +def create_sample_config()->Config: + return Config(input_path='input.json',output_path='output.json',batch_size=50,max_retries=3,timeout=60.0,debug=True,filters=['active_only','has_value'],transformations={'add_timestamp':{},'normalize_names':{},'calculate_derived':{'multiplier':1.5}}) + +def main(): + config=create_sample_config() + processor=DataProcessorAdvanced(config) + success=processor.run_processing_pipeline() + print(f"Processing {'completed' if success else 'failed'}") + print(f"Stats: {processor.stats}") + if processor.errors: + print("Errors encountered:") + for error in processor.errors:print(f" - {error}") + +if __name__=='__main__':main() +''' + _run_formatting_test(source_code, False) + + +def test_formatting_edge_case_exactly_100_diffs(): + """Test behavior when exactly at the threshold of 100 changes.""" + # Create a file with exactly 100 minor formatting issues + source_code = '''import json\n''' + ''' +def func{}(): + x=1;y=2;z=3 + return x+y+z +'''.replace('{}', '_{i}').format(i='{i}') * 33 # This creates exactly 100 potential formatting fixes + + _run_formatting_test(source_code, False) + + +def test_formatting_with_syntax_errors(): + """Test that files with syntax errors are handled gracefully.""" + source_code = '''import json + +def process_data(data): + if not data: + return {"error": "No data" + # Missing closing brace above + + result = [] + for item in data + # Missing colon above + result.append(item) + + return result +''' + _run_formatting_test(source_code, False) + + +def test_formatting_mixed_quotes_and_spacing(): + """Test files with mixed quote styles and inconsistent 
spacing.""" + source_code = '''import json +from datetime import datetime + +def process_mixed_style(data): + """Process data with mixed formatting styles.""" + config={'default_value':0,'required_fields':["id","name"],'optional_fields':["description","tags"]} + + results=[] + for item in data: + if not isinstance(item,dict):continue + + # Mixed quote styles + item_id=item.get("id") + item_name=item.get('name') + item_desc=item.get("description",'') + + # Inconsistent spacing + processed={ + 'id':item_id, + "name": item_name, + 'description':item_desc, + "processed_at":datetime.now().isoformat( ), + 'status':'processed' + } + results.append(processed) + + return {'data':results,"count":len(results)} +''' + _run_formatting_test(source_code, True) + + +def test_formatting_long_lines_and_imports(): + """Test files with long lines and import formatting issues.""" + source_code = '''import os, sys, json, datetime, re, collections, itertools +from pathlib import Path +from typing import Dict, List, Optional + +def process_with_long_lines(data, filter_func=lambda x: x.get('active', True) and x.get('value', 0) > 0, transform_func=lambda x: {**x, 'processed_at': datetime.datetime.now().isoformat(), 'status': 'processed'}): + """Function with very long parameter line.""" + return [transform_func(item) for item in data if filter_func(item) and isinstance(item, dict) and 'id' in item] + +def another_function_with_long_line(): + very_long_dictionary = {'key1': 'value1', 'key2': 'value2', 'key3': 'value3', 'key4': 'value4', 'key5': 'value5'} + return very_long_dictionary +''' + _run_formatting_test(source_code, True) + + +def test_formatting_class_with_methods(): + """Test formatting of classes with multiple methods and minor issues.""" + source_code = '''class DataProcessor: + def __init__(self, config): + self.config=config + self.data=[] + + def load_data(self,file_path): + with open(file_path,'r') as f: + self.data=json.load(f) + return len(self.data) + + def process(self): 
+ result=[] + for item in self.data: + if item.get('active',True): + result.append({ + 'id':item['id'], + 'processed':True + }) + return result +''' + _run_formatting_test(source_code, True) + + +def test_formatting_with_complex_comprehensions(): + """Test files with complex list/dict comprehensions and formatting.""" + source_code = '''def complex_comprehensions(data): + # Various comprehension styles with formatting issues + result1=[item['value'] for item in data if item.get('active',True) and 'value' in item] + + result2={item['id']:item['name'] for item in data if item.get('type')=='user'} + + result3=[[x,y] for x in range(10) for y in range(5) if x*y>10] + + # Nested comprehensions + nested=[[item for item in sublist if item%2==0] for sublist in data if isinstance(sublist,list)] + + return { + 'simple':result1, + 'mapping':result2, + 'complex':result3, + 'nested':nested + } +''' + _run_formatting_test(source_code, True) + + +def test_formatting_with_decorators_and_async(): + """Test files with decorators and async functions.""" + source_code = '''import asyncio +from functools import wraps + +def timer_decorator(func): + @wraps(func) + def wrapper(*args,**kwargs): + start=time.time() + result=func(*args,**kwargs) + end=time.time() + print(f"{func.__name__} took {end-start:.2f} seconds") + return result + return wrapper + +@timer_decorator +async def async_process_data(data): + result=[] + for item in data: + await asyncio.sleep(0.01) # Simulate async work + processed_item={'id':item.get('id'),'processed':True} + result.append(processed_item) + return result + +class AsyncProcessor: + @staticmethod + async def process_batch(batch): + return [{'id':item['id'],'status':'done'} for item in batch if 'id' in item] +''' + _run_formatting_test(source_code, True) + + +def test_formatting_threshold_configuration(): + """Test that the diff threshold can be configured (if supported).""" + # This test assumes the threshold might be configurable + source_code = '''import 
json,os,sys +def func1():x=1;y=2;return x+y +def func2():a=1;b=2;return a+b +def func3():c=1;d=2;return c+d +''' + # Test with a file that has moderate formatting issues + _run_formatting_test(source_code, True, optimized_function="def func2():a=1;b=2;return a+b") + + +def test_formatting_empty_file(): + """Test formatting of empty or minimal files.""" + source_code = '''# Just a comment pass +''' + _run_formatting_test(source_code, False) + + +def test_formatting_with_docstrings(): + """Test files with various docstring formats.""" + source_code = """def function_with_docstring( data): + ''' + This is a function with a docstring. + + Args: + data: Input data to process + + Returns: + Processed data + ''' + return [item for item in data if item.get('active',True)] + +class ProcessorWithDocs: + '''A processor class with documentation.''' + + def __init__(self,config): + '''Initialize with configuration.''' + self.config=config + + def process(self,data): + '''Single quote docstring with formatting issues.''' + return{'result':[item for item in data if self._is_valid(item)]} + + def _is_valid(self,item): + return isinstance(item,dict) and 'id' in item""" + expected = '''def function_with_docstring(data): + """This is a function with a docstring. 
+ + Args: + data: Input data to process + + Returns: + Processed data + + """ + return [item for item in data if item.get("active", True)] + + +class ProcessorWithDocs: + """A processor class with documentation.""" + + def __init__(self, config): + """Initialize with configuration.""" + self.config = config + + def process(self, data): + """Single quote docstring with formatting issues.""" + return {"result": [item for item in data if self._is_valid(item)]} + + def _is_valid(self, item): + return isinstance(item, dict) and "id" in item +''' + + optimization_function = """ def process(self,data): + '''Single quote docstring with formatting issues.''' + return{'result':[item for item in data if self._is_valid(item)]}""" + _run_formatting_test(source_code, True, optimized_function=optimization_function, expected=expected) \ No newline at end of file