In [None]:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from generate.constant import ignore_functions
import os
import json
import pickle
from tqdm import tqdm
from collections import Counter
from sklearn.model_selection import train_test_split

def filter_function(item):
    """Tell whether a call record should be skipped.

    Args:
        item: Mapping with a ``"name"`` entry holding the called function's name.

    Returns:
        True when the name is listed in ``ignore_functions``, ends with
        ``"Error"``, starts with ``"write"`` or ``"assert"``, or is one of a
        fixed set of builtin names; False otherwise.
    """
    name = item["name"]
    if name in ignore_functions:
        return True
    if name.endswith("Error") or name.startswith(("write", "assert")):
        return True
    # Builtin conversions / introspection helpers that are always skipped.
    skipped_builtins = ["isinstance", "repr", "str", "int", "bool", "float", "list", "tuple", "type", "set", "dict"]
    return name in skipped_builtins

def package_name(x):
    """Return the path component that directly follows ``site-packages``.

    Args:
        x: A ``/``-separated path or URI string.

    Returns:
        The component immediately after the first component that equals
        ``"site-packages"``.

    Raises:
        AssertionError: If no component equals ``"site-packages"``.
        IndexError: If ``"site-packages"`` is the last component.
    """
    parts = x.split("/")
    try:
        marker = parts.index("site-packages")
    except ValueError:
        # Raised explicitly rather than through an ``assert`` statement so the
        # check is not stripped under ``python -O`` (where the function would
        # otherwise silently return the first path component). The exception
        # type is kept as AssertionError for backward compatibility.
        raise AssertionError(f"no 'site-packages' component in path: {x!r}") from None
    return parts[marker + 1]

def rectify(x):
    """Normalise a comma-separated string.

    Splits ``x`` on ``","``, strips surrounding whitespace from every piece and
    joins the pieces back together with ``", "``.
    """
    pieces = x.split(",")
    return ", ".join(map(str.strip, pieces))

def indent(text, ind):
    """Drop blank lines from ``text`` and remove up to ``ind`` leading columns.

    A line loses its first ``ind`` characters only when those characters are
    all whitespace; otherwise the line is kept unchanged. Lines that are empty
    or whitespace-only are discarded. The surviving lines are joined with
    newlines.
    """
    kept = []
    for line in text.split("\n"):
        if not line.strip():
            continue  # blank line: dropped entirely
        leading = line[:ind]
        kept.append(line if leading.strip() else line[ind:])
    return "\n".join(kept)

def mask_single_function(folder, path, calls, edges, max_line=30):
    """Build masked-argument examples for the call records of one source file.

    Args:
        folder: String identifying the project; a definition whose URI contains
            it is treated as belonging to the project itself.
        path: Path of the source file the call records refer to.
        calls: Iterable of call records (dicts). Each record is read for the
            keys ``"name"``, ``"args_position"``, ``"context"`` and
            ``"definition"``; positions use 0-based ``line`` / ``end_line`` and
            ``character`` / ``end_character`` offsets into the file text.
        edges: Set that receives the package name of every third-party
            definition encountered; it is mutated in place.
        max_line: Unused by this function.

    Returns:
        A ``(records, edges)`` tuple. Every kept record is a shallow copy of
        the input record extended with ``"context_position"`` (the original
        context span), ``"context"`` (a pair: dedented text before and after
        the argument list), ``"target"`` (the normalised argument text without
        its opening parenthesis), ``"type"`` (0 = this project, 1 = other
        dependency, 2 = Python stdlib), ``"folder"`` and ``"path"``.
    """
    with open(path) as handle:
        text = handle.read()

    # line_starts[i] is the absolute offset at which 0-based line i begins.
    line_starts = [0]
    for line in text.split("\n"):
        line_starts.append(line_starts[-1] + len(line) + 1)
    line_starts[-1] -= 1  # the final line has no trailing newline
    assert line_starts[-1] == len(text)

    def absolute(lineno, column):
        return line_starts[lineno] + column

    masked = []
    for call in calls:
        if filter_function(call):
            continue

        args_pos = call["args_position"]
        args_start = absolute(args_pos["line"], args_pos["character"])
        args_end = absolute(args_pos["end_line"], args_pos["end_character"])
        target = rectify(text[args_start:args_end])

        # Keep only non-empty, parenthesised argument lists without quotes.
        if not (target.startswith("(") and target.endswith(")")):
            continue
        if target == "()":
            continue
        if "'" in target or '"' in target:
            continue
        target = target[1:]  # drop the opening parenthesis only

        context_pos = call["context"]
        if context_pos is None:
            continue
        dedent_by = context_pos["character"]
        context_start = absolute(context_pos["line"], 0)
        context_end = absolute(context_pos["end_line"], context_pos["end_character"])
        if context_start >= args_start:
            continue

        definition = call["definition"]
        if definition is None:
            continue
        uri = definition[0]["uri"]
        if folder in uri:
            origin = 0  # in this project
        elif "/home/ubuntu/mydata/jedi/" in uri or "/home/ubuntu/anaconda3/lib/" in uri:
            origin = 2  # stdlib of python
        else:
            origin = 1  # from other dependencies
            dep_path = uri[len("file://"):] if uri.startswith("file://") else uri
            edges.add(package_name(dep_path))

        record = call.copy()
        record["context_position"] = record["context"].copy()
        record["context"] = (
            indent(text[context_start:args_start], dedent_by),
            indent(text[args_end:context_end], dedent_by),
        )
        record["target"] = target
        record["type"] = origin
        record["folder"] = folder
        record["path"] = path
        masked.append(record)

    return masked, edges


In [None]:
# Build the project <-> package-name mappings.
# `src` maps a project id (the JSON file name without ".json") to the folder
# string that package_name() is applied to.
# NOTE(review): pickle.load executes arbitrary code from the file; only load
# pickles that come from a trusted source.
with open("/home/ubuntu/mydata/src_env.pkl", "rb") as f:
    src = pickle.load(f)
src = {x: y for x, y in src}
pj2name = {}
name2pj = {}
dir_lis = ["/home/ubuntu/mydata/extract/"]
for direc in dir_lis:
    for file in tqdm(os.listdir(direc)):
        if not file.endswith(".json"):
            continue
        with open(os.path.join(direc, file)) as f:
            dic = json.load(f)
        pj = file[:-5]  # strip the ".json" suffix
        folder = src[pj]
        name = package_name(folder)
        # When several projects resolve to the same package name, keep the one
        # whose JSON has the most entries (ties keep the first one seen).
        if name not in name2pj or name2pj[name][1] < len(dic):
            name2pj[name] = (pj, len(dic))

# Invert the mapping: only the project chosen for each name is registered.
for name, (pj, _) in name2pj.items():
    pj2name[pj] = name
print(len(pj2name))
print(len(name2pj))


In [None]:
# For every registered project, collect its masked call records (`tot`) and the
# set of third-party packages its calls resolve into (`all_edges`).
tot = {}
all_edges = {}
for direc in dir_lis:
    for file in tqdm(os.listdir(direc)):
        if not file.endswith(".json"):
            continue
        with open(os.path.join(direc, file)) as f:
            dic = json.load(f)
        pj = file[:-len(".json")]
        if pj not in pj2name:
            continue  # project was not selected for any package name
        folder = src[pj]
        tot[pj] = {}  # overwritten below once every file has been processed
        collected = []
        deps = set()
        for source_path, call_items in dic.items():
            if len(call_items) == 0:
                continue
            masked, deps = mask_single_function(folder, source_path, call_items, deps)
            collected.extend(masked)
        tot[pj] = collected
        all_edges[pj2name[pj]] = deps



In [None]:
# du[name] counts how many packages list `name` among their dependency edges;
# edges pointing at names that are not registered packages are ignored.
du = {pkg: 0 for pkg in pj2name.values()}
for deps in all_edges.values():
    for dep in deps:
        if dep in du:
            du[dep] += 1

In [None]:
# Per-project license strings; check_license() looks them up by project id.
# The file is opened in a `with` block so the handle is closed after loading.
# NOTE(review): pickle.load executes arbitrary code from the file; only load
# pickles that come from a trusted source.
with open("/home/ubuntu/mydata/license.pkl", "rb") as license_file:
    license = pickle.load(license_file)
# License families (as returned by rough()) that are accepted.
allow = ["MIT", "Apache", "BSD", "CC0", "ZPL 2.1", "ISCL", "PSF", "Python Software Foundation License", "HPND"]

def rough(x):
    """Map a license string to a coarse license family.

    Returns the first family (in the fixed order below) that occurs as a
    substring of ``x``. When nothing matches, returns ``""`` for an empty
    string and ``None`` otherwise.
    """
    # Order matters: "LGPL" is listed before "GPL" so it wins the substring test.
    families = ("BSD", "MIT", "Apache", "MPL", "LGPL", "GPL", "AFL", "ISCL", "PSF", "ZPL 2.1", "CC0", "HPND", "EPL",
                "Python Software Foundation License", "Other/Proprietary License", "Public Domain")
    match = next((family for family in families if family in x), None)
    if match is not None:
        return match
    return "" if not x else None

def check_license(name):
    """Tell whether package ``name`` and all its dependency edges are allowed.

    A project is allowed when the coarse family of its license string (looked
    up in ``license``, empty string when missing) is in ``allow``. The package
    is rejected when its own project is not allowed, when any dependency is
    not a registered package name, or when any dependency's project is not
    allowed.
    """
    def permitted(project):
        return rough(license.get(project, "")) in allow

    if not permitted(name2pj[name][0]):
        return False
    for dep in all_edges[name]:
        if dep not in name2pj:
            return False
        dep_project = name2pj[dep][0]
        assert dep_project in pj2name
        if not permitted(dep_project):
            return False
    return True

# Package names in sorted order, then the subset that passes the license check.
lis = sorted(name2pj)
pub_lis = list(filter(check_license, lis))
print(len(pub_lis))

In [None]:
def find_t1(t):
    """Return the members of ``t`` together with all of their dependency edges.

    The result is a new set; ``t`` itself is not modified.
    """
    expanded = set(t)
    for member in t:
        expanded.update(all_edges[member])
    return expanded

def potential_training_set(t):
    """Group the licensed packages into four nested training-candidate levels.

    Args:
        t: Collection of held-out (test) package names.

    Returns:
        Dict mapping level -> list of package names taken from ``pub_lis``
        (order preserved). Each level adds one restriction to the previous:
            1: the package is not in ``t``;
            2: and none of its dependency edges is in ``t``;
            3: and it is not in ``find_t1(t)`` (``t`` plus its dependencies);
            4: and none of its dependency edges is in ``find_t1(t)``.
    """
    # Build the lookup set once instead of on every loop iteration; this also
    # turns the `x in t` membership test into a constant-time lookup.
    held_out = set(t)
    t1 = find_t1(t)
    potentials = {1: [], 2: [], 3: [], 4: []}
    for x in pub_lis:
        if x in held_out:
            continue
        potentials[1].append(x)
        deps = all_edges[x]
        if deps & held_out:
            continue
        potentials[2].append(x)
        if x in t1:
            continue
        potentials[3].append(x)
        if deps & t1:
            continue
        potentials[4].append(x)

    return potentials

def calls(lis):
    """Return the total number of call records of the packages named in ``lis``."""
    projects = [name2pj[name][0] for name in lis]
    total = 0
    for project in projects:
        total += len(tot[project])
    return total

def types(lis):
    """Print a Counter of the ``"type"`` values over all call records of ``lis``."""
    projects = [name2pj[name][0] for name in lis]
    histogram = Counter(record["type"] for project in projects for record in tot[project])
    print(histogram)
    
# Survey: for several held-out fractions, report the size of the test split and
# of each candidate training level.
for f in (0.01, 0.033, 0.05, 0.1, 0.11):
    #lis = [x for x in du if du[x]==0]
    _, test = train_test_split(pub_lis, test_size=f, random_state=42)
    p = potential_training_set(test)
    t1 = find_t1(test)

    test_calls = calls(test)
    print("test set, projects:", len(test), ", t1 projects", len(t1), ", function calls:", test_calls)
    #types(test)
    for k, members in p.items():
        num = calls(members)
        print(f"level {k}, training projects:", len(members), ", function calls: ", num)
        types(members)
    print()


In [None]:
from training.utils import make_sure_path_exists
def flatten(lis):
    """Concatenate the call records of every package named in ``lis`` into one list."""
    return [record for name in lis for record in tot[name2pj[name][0]]]

def level4check(s0, t0):
    """Assert that two package groups and their dependency edges do not overlap.

    Each group is expanded with the dependency edges of its members; an
    AssertionError is raised when the two expanded sets share any name.
    """
    def expand(group):
        reach = set(group)
        for member in group:
            reach.update(all_edges[member])
        return reach

    train_side = expand(s0)
    held_out_side = expand(t0)
    assert train_side.isdisjoint(held_out_side)
    
# Write the level-4 train / dev / test pickles.
# 10% of the licensed packages are held out and split evenly into dev and test;
# only the level-4 training candidates are written.
path = "/home/ubuntu/mydata/pkl_data/distributable/"
make_sure_path_exists(path)
for f in [0.1]:
    _, test = train_test_split(pub_lis, test_size=f, random_state=42)
    p = potential_training_set(test)
    t1 = find_t1(test)
    n0 = calls(test)
    print("valid + test set, projects:", len(test), ", t1 projects", len(t1), ", function calls:", n0)
    #types(test)
    valid, test = train_test_split(test, test_size=0.5, random_state=2333)
    test_data = flatten(test)
    valid_data = flatten(valid)
    # Splitting the held-out packages must not lose or duplicate call records.
    assert len(test_data) + len(valid_data) == n0
    print(len(valid), len(valid_data), len(test), len(test_data))
    for k in p:
        if k != 4:
            continue
        level4check(p[k], valid + test)
        # Derive the output directory from `path` instead of repeating the
        # absolute path (and avoid the double slash of f"{path}/level{k}").
        cur_path = os.path.join(path, f"level{k}")
        make_sure_path_exists(cur_path)
        num = calls(p[k])
        print(f"level {k}, training projects:", len(p[k]), ", function calls: ", num)
        train_data = flatten(p[k])
        # Disabled down-sampling of the training data, kept for reference:
        # if len(train_data) > len(test_data) * 8:
        #     cur_path += "/sampled/"
        #     _, train_data = train_test_split(train_data, test_size=len(test_data) * 8, random_state=42)
        #     print("sample", len(train_data))
        # else:
        #     print(len(train_data))

        # The file handle gets its own name so it no longer rebinds the loop
        # variable `f` (the held-out fraction).
        for split_name, split_data in (("train", train_data), ("dev", valid_data), ("test", test_data)):
            with open(os.path.join(cur_path, f"{split_name}.pkl"), "wb") as out_file:
                pickle.dump(split_data, out_file)

    print()
