In [1]:
%load_ext autoreload
%autoreload 2

from coeditor.common import *
import os

# Switch the working directory to whatever `proj_root()` returns (presumably
# the repository root) so that the relative paths used by later cells,
# e.g. "output/api_change_to_callsite.txt", resolve against it.
os.chdir(proj_root())

In [2]:
from coeditor.history import *

# Toy "before" snapshot used as demo input: `apply_edit` takes a `File`, and
# `suggest_edit` passes its `file` argument straight through to it.
# The snippet is only handled as text in this cell.
old_code = """
def apply_edit(edit: Edit, file: File):
    "Apply the edit to the content of given file"
    # implementation ...
    
def suggest_edit(
    file: Path, 
    line: int, 
    apply: bool = False
):
    suggestion = ...
    if apply:
        apply_edit(suggestion, file)
"""

# Toy "after" snapshot: `apply_edit` now takes and returns a string, and the
# call site in `suggest_edit` is updated to match.
# NOTE(review): `current_code` is not defined inside the snippet itself; it
# looks purely illustrative since the text is only diffed here -- confirm.
new_code = """
def apply_edit(edit: Edit, text: str) -> str:
    "Apply the edit to the given string"
    # implementation ...
    
def suggest_edit(
    file: Path, 
    line: int, 
    apply: bool = False
):
    suggestion = ...
    if apply:
        new_code = apply_edit(suggestion, current_code)
        file.write(new_code)
"""

# Pair the two snapshots as a `Modified` change and print how `show_change`
# renders it. `code_change` is reused by the encoding demo cell.
code_change = Modified(old_code, new_code)
print(show_change(code_change))

* Modified: 
 
    - def apply_edit(edit: Edit, file: File):
    + def apply_edit(edit: Edit, text: str) -> str:
    -     "Apply the edit to the content of given file"
    +     "Apply the edit to the given string"
          # implementation ...
     
      def suggest_edit(
          file: Path, 
          line: int, 
          apply: bool = False
      ):
          suggestion = ...
          if apply:
    -         apply_edit(suggestion, file)
    +         new_code = apply_edit(suggestion, current_code)
    +         file.write(new_code)
 


In [13]:
from coeditor.encoders import change_tks_to_query_context, change_to_tokens
from coeditor.encoding import *

# Tokenize the demo change, then split the tokens into a (query, context) pair.
# The literal 10 is forwarded unchanged as the second argument.
change_tks = change_to_tokens(code_change)
query, ctx = change_tks_to_query_context(change_tks, 10)

# Printed under "input:": the context followed by the first query element.
print("input:")
for input_part in (ctx, query[0]):
    print(decode_tokens(input_part, prettify=True))

# Printed under "output:": the second query element.
print("output:")
print(decode_tokens(query[1], prettify=True))

input:

 <del> def apply_edit(edit: Edit, file: File):
 <add> def apply_edit(edit: Edit, text: str) -> str:
 <del>     "Apply the edit to the content of given file"
 <add>     "Apply the edit to the given string"
    # implementation...
    
def suggest_edit(
    file: Path, 
    line: int, 
    apply: bool = False
):
<mask_0>    suggestion =...
<mask_1>    if apply:
<mask_2>        apply_edit(suggestion, file)
<mask_3>
<mask_4>
output:
<mask_0><mask_1><mask_2> <del> <mask_3> <add>         new_code = apply_edit(suggestion, current_code)
 <add>         file.write(new_code)
<mask_4>


In [3]:
from coeditor.history import *

# Fetch the project's commit history. The second argument (25) is presumably a
# cap on the number of commits -- confirm against `get_commit_history`.
history = get_commit_history(proj_root(), 25)

# Preview the messages of the first ten entries.
for cinfo in history[:10]:
    print(cinfo.msg)

# `edits` is read by the following cells (filtering `Modified` changes,
# counting change kinds, `analyze_edits`), so it has to be defined here for a
# fresh-kernel "Restart & Run All" to succeed. Previously this line was
# commented out and those cells only worked off leftover kernel state.
# NOTE: this step is slow; the recorded run took several minutes.
edits = edits_from_commit_history(proj_root(), history)


Implement file-level dataset creation.
Implement file-level edit encoder.
Update installation instructions.
Add DevGuide.md.
Implement encoding format for CodeT5.
Line-diff-based format for encoding edits.
Implement EditSelectors.
Improve diff visualization.
Improve edit context construction.
- Bugfix: `from_code_changes` mutates original copy. - Collect only usees in editing ctx. - Improve editing visualization.


Retriving edits:   0%|          | 0/1 [00:00<?, ?it/s]Starting task: Retriving initial project from commit: 3c17c4ea794ce495edd62698a46aa696e384bed1
(0.1s) Finished task: Retriving initial project from commit: 3c17c4ea794ce495edd62698a46aa696e384bed1
Edits from commits: 100%|██████████| 215/215 [03:32<00:00,  1.01it/s]
Retriving edits: 100%|██████████| 1/1 [05:24<00:00, 324.13s/it]
Encoding edits: 100%|██████████| 215/215 [05:21<00:00,  1.50s/it]

In [3]:
from coeditor.history import *
from coeditor.encoding import *

# Gather every element-level change that is a `Modified` instance, across all edits.
all_mods = [
    change
    for edit in edits
    for change in edit.all_elem_changes()
    if isinstance(change, Modified)
]

# Display the first collected modification as a sample.
c = all_mods[0]
print(show_change(c))


* Modified: 
    def preds_to_accuracies(
        preds: Sequence[Sequence[PythonType]],
        dataset: ChunkedDataset,
        metric: AccuracyMetric,
    ):
            cats = [an.cat for info in dataset.chunks_info for an in info.annots_info]
            labels = [ty for info in dataset.chunks_info for ty in info.types]
    -       poses = [i for info in dataset.chunks_info for i in info.label_ids]
            return type_accuracies(
                list(seq_flatten(preds)),
                labels,
                cats,
    -           poses,
                metric=metric,
            )


In [4]:
from coeditor.history import *

# Tally element-level changes by kind (added / deleted / modified).
n_add, n_del, n_mod = 0, 0, 0
for e in edits:
    for c in e.all_elem_changes():
        if isinstance(c, Added):
            n_add = n_add + 1
            continue
        if isinstance(c, Deleted):
            n_del = n_del + 1
            continue
        if isinstance(c, Modified):
            n_mod = n_mod + 1
            continue
        # Any other kind of change is unexpected here.
        raise ValueError(c)

# Summary: number of edits, then one line per change kind.
print("n_commit:", len(edits))
print(f"n_add: {n_add}")
print(f"n_del: {n_del}")
print(f"n_mod: {n_mod}")


n_commit: 49
n_add: 339
n_del: 246
n_mod: 240


In [5]:
# Run `analyze_edits` over all collected edits; the result is the input to the
# `select_edits` calls in the cells below.
# Slow: the recorded run shows roughly four minutes for 49 edits.
analyzed_edits = analyze_edits(edits)


Starting task: Performing intial module-level analysis...
(6.4s) Finished task: Performing intial module-level analysis...


Analyzing edits: 100%|██████████| 49/49 [03:52<00:00,  4.75s/it]


Unnamed: 0,name,count,avg_time,total_time
1,UsageAnalysis,98,1.579122,154.753982
2,ModuleAnlaysis/Incremental,182,0.412723,75.115604
0,ModuleAnlaysis/Initial,1,6.426628,6.426628
3,_select_change_ctx,240,5e-06,0.001288


In [27]:
# Pick edits using the `api_change_to_callsite` selector. `select_edits`
# returns the selected edits together with the full list they were chosen from.
selected, all_cedits = select_edits(
    analyzed_edits, EditSelectors.api_change_to_callsite
)
# Distinct (change path, commit hash) pairs seen among the "users" context changes.
coverage = set[tuple[ProjectPath, str]]()

out_file = Path("output/api_change_to_callsite.txt")
# `open(..., "w")` does not create missing directories, so make sure the
# output folder exists (e.g. on a fresh checkout) before writing.
out_file.parent.mkdir(parents=True, exist_ok=True)
with open(out_file, "w") as f:
    for ce in selected:
        for c in ce.grouped_ctx_changes["users"]:
            coverage.add((get_change_path(c), not_none(ce.commit_info).hash))

        # Dump the edit to the report file, followed by a separator line.
        ce.pprint(file=f)
        print("~" * 50, "\n", file=f)

# Coverage = distinct user changes relative to the number of candidate edits.
print("All modifications:", len(all_cedits))
print("User changes:", len(coverage))
print("Coverage:", f"{len(coverage) / len(all_cedits):.1%}")


All modifications: 240
User changes: 29
Coverage: 12.1%


In [6]:
# Pick edits using the `usee_changes_to_user` selector. `select_edits`
# returns the selected edits together with the full list they were chosen from.
selected2, all_cedits2 = select_edits(
    analyzed_edits, EditSelectors.usee_changes_to_user
)

out_file = Path("output/pretrain.txt")
# `open(..., "w")` does not create missing directories, so make sure the
# output folder exists (e.g. on a fresh checkout) before writing.
out_file.parent.mkdir(parents=True, exist_ok=True)
with open(out_file, "w") as f:
    for ce in selected2:
        # Dump the edit to the report file, followed by a separator line.
        ce.pprint(file=f)
        print("~" * 50, "\n", file=f)

# Coverage = number of selected edits relative to the number of candidates.
print("All modifications:", len(all_cedits2))
print("User changes:", len(selected2))
print("Coverage:", f"{len(selected2) / len(all_cedits2):.1%}")


All modifications: 240
User changes: 156
Coverage: 65.0%


In [None]:
# ==== End of new contents ====


In [None]:
dataset = "ManyTypes4Py"

# Second argument handed to `get_eval_dir` for the TypeT5 entry; the CodeT5
# entry passes an empty string instead.
typet5_eval_name = "(implicit_imports, new) model-v7--TrainingConfig(drop_env_types=False, add_implicit_rel_imports=True)"

# Evaluation directory per model name, all for the same dataset.
result_paths = dict(
    CodeT5=get_eval_dir(dataset, ""),
    TypeT5=get_eval_dir(dataset, typet5_eval_name),
)


In [None]:
# Root of the example project to analyze. The previously hard-coded path only
# exists on one machine, so it can now be overridden through the
# TYPE4PY_ROOT environment variable; the old path remains the default.
ex_proj_root = Path(os.environ.get("TYPE4PY_ROOT", "/home/jiayi/Projects/type4py"))
ex_proj = PythonProject.from_root(ex_proj_root)

# Build the usage analysis with implicit relative imports and override usages
# both enabled, then print its statistics.
analysis = UsageAnalysis(
    ex_proj, add_implicit_rel_imports=True, add_override_usages=True
)
pretty_print_dict(analysis.get_stats())


In [None]:
from spot.data import (
    create_tokenized_srcsets,
    get_tk_dataset_name,
    load_tokenized_srcsets,
    TypeCheckSettings,
)
from spot.tokenized_src import PreprocessArgs

# Dataset to tokenize and the (default) preprocessing options to use.
dataset = "InferTypes4Py"
pre_args = PreprocessArgs()

# Where the tokenized source sets are stored under the data root.
sdata_name = get_tk_dataset_name(dataset, pre_args, False)
sdata_path = get_dataroot() / "TokenizedSrcSets" / sdata_name

# Create the tokenized source sets at that location, then load them back.
create_tokenized_srcsets(dataset, sdata_path, func_only=False, pre_args=pre_args)
tk_dataset = load_tokenized_srcsets(sdata_path)

# Report statistics for the "test" split only.
test_split = tk_dataset["test"]
test_split.print_stats()


In [None]:
from spot import proj_root
from spot.static_analysis import ProjectPath, UsageAnalysis, PythonProject
from pprint import pprint


# Analyze the project at `proj_root()` and list, for every user that lives in
# the "spot.static_analysis" module, the elements it uses.
proj = PythonProject.from_root(proj_root())
usage_map = UsageAnalysis(proj).user2used
for caller, callees in usage_map.items():
    if caller.module != "spot.static_analysis":
        continue
    print(caller)
    for callee in callees:
        # Usages not flagged as certain get a "(maybe)" marker.
        certainty_note = "" if callee.is_certain else "  (maybe)"
        print("\t", callee.used, certainty_note)


In [None]:
import libcst as cst

from spot.tokenized_src import TokenizedSrc, PreprocessArgs
from spot.utils import Path, decode_tokens

# Sample source text used as a tokenization fixture. It mixes leading comments,
# a module-level string, several import forms, decorators, a nested function
# with annotations, a docstring placed after an inner import, and inline
# comments. The text is data for the parser; it is not executed in this cell.
ex_code = '''# document comment 1
  # document comment 2
"""String document commnet"""
import os; import spot;
from sys import argv, exit
# after import
@wraps(function)
def catch_permission_denied(function):
    import some.inner.imports
    """
    Decorator to catch :class:`psycopg2.ProgrammingError` exceptions with the
    ``INSUFFICIENT_PRIVILEGE`` error code and rethrow them as
    :class:`~werkzeug.exceptions.Forbidden` exceptions instead.
    """
    @wraps(function)
    def decorated(x: str, y: int) -> str:
        try:
            # comment 1
            # comment 1 cont
            return function(*args, **kwargs)

        except InsufficientPrivilege as error:
            LOG.error("Forbidden: %s", error) # comment 2
            raise Forbidden()

    return decorated
'''
# Parse the fixture with `stub_in_preamble=True` and print the decoded token
# sequence. "test_file" / "test_repo" are the two path arguments handed to
# `TokenizedSrc.parse`.
pre_args = PreprocessArgs(stub_in_preamble=True)
ex_src = TokenizedSrc.parse(ex_code, Path("test_file"), Path("test_repo"), pre_args)
print(decode_tokens(ex_src.tokenized_code))


In [None]:
from spot.data import src_to_chunks_, CtxArgs, PreprocessArgs
from ipywidgets import interactive

# Parse `ex_code` with `stub_in_preamble=True`; `ex_src` is the module-level
# value that `print_code` below reads.
# NOTE(review): `ex_code`, `TokenizedSrc` and `Path` are not defined or imported
# in this cell, so it still relies on the previous cell having been run.
pre_args = PreprocessArgs(stub_in_preamble=True)
ex_src = TokenizedSrc.parse(ex_code, Path("test_file"), Path("test_repo"), pre_args)


def print_code(
    preamble: int,
    left: int,
    right: int,
    ctx_size: int,
    max_labels: int,
    chunk_id: int,
    inline_prev: bool,
):
    """Chunk the module-level `ex_src` and print one chunk's decoded input.

    `ctx_size`, `preamble`, `left` and `right` are passed positionally (in
    that order) to `CtxArgs`; `max_labels` and `inline_prev` are passed as its
    `max_labels` and `inline_prev_gold` keywords.

    Args:
        chunk_id: Index of the produced chunk whose "input_ids" are printed.
            An index beyond the number of produced chunks raises IndexError.
    """
    ctx_args = CtxArgs(
        ctx_size,
        preamble,
        left,
        right,
        max_labels=max_labels,
        inline_prev_gold=inline_prev,
    )
    # `src_to_chunks_` fills the list passed as its first argument in place.
    collected = []
    src_to_chunks_(collected, [], ex_src, (0, len(ex_src.types)), ctx_args)
    selected_chunk = collected[chunk_id]
    print(decode_tokens(selected_chunk["input_ids"]))


# Widget specification for each `print_code` parameter: integer (min, max)
# tuples and a boolean default, handed to `interactive` as keyword arguments.
widget_specs = dict(
    preamble=(1, 100),
    left=(1, 200),
    right=(1, 100),
    ctx_size=(1, 500),
    max_labels=(1, 10),
    chunk_id=(0, 1),
    inline_prev=True,
)

# Keep the call as the last expression so the notebook displays the widget.
interactive(print_code, **widget_specs)
