In [1]:
# --utf-8---
import gzip
import shutil
import pandas as pd
import sys
import io
import json
import re
import time
import sys
import ast
import astunparse
import autopep8

In [3]:
def code_visitor(node, extractor, indent=-1):
    '''
        Recursively visit AST nodes with indent for a Module.
        Generally, only Statements, excepthandler, comprehension are in the consideration.
        Other types like expr, expr_context, boolop, operator, unaryop, cmpop, arguments are not.
        
        Following the AST document of python3: https://docs.python.org/3/library/ast.html#
        
        We consider to visit the logic behind the program. Therefore, it's trivial to visit 
        low-level syntax node, such as Literals, Variables and the nodes inside Expression.
        
        We visit all Statement (stmt) nodes such as Assign, Raise, Delete, Pass, Break, Continue.,etc. 
        Imports Statements, Control Flow (While, For, Try, With), Function and Class Defination: 
        (FunctionDef, AsyncFunctionDef, ClassDef, Return, Global) are allowed as long as they are statements
        
        ExceptHandler and comprehension are categories that we considered as well. Some other sub-componets 
        such as alias, arg, arguments are ignored.
        
        Args: 
            node: AST parser root node
            extractor: Empty list to save extracted cmd-logic
            indent: the indent of each cmd-logic (1 indent = 4 space)
            
        Returns:
            extractor: a list saves all extracted cmd-logics and indents 
    '''
        
    if isinstance(node, ast.stmt):
        extractor.append([type(node).__name__, indent])
#         print(f"{indent} {type(node).__name__}")
        
    if isinstance(node, ast.excepthandler):
        indent -= 1
        extractor.append([type(node).__name__, indent])
#         print(f"{indent} {type(node).__name__}")  
        
        
    if sum(1 for x in ast.iter_child_nodes(node)) == 0:
        return
    
    else:
        
        # Special case for if condition
        # Causing ast syntax doesn't consider else/elif is a statement, but we do.
        # IF func is composed by test, body and orelse, while orelse can be empty or nested If.
        
        if "orelse" in node._fields and isinstance(node, ast.If) and len(node.orelse) > 0 :

            code_visitor(node.test, extractor, indent) # Visit test component (Actual only bool inside)
            
            indent += 1                                # Visit body component with incremental indent
            for j in node.body:                        
                code_visitor(j, extractor, indent)
                                                       
            indent -= 1                                # A decremental indent for else
            extractor.append(["Else", indent])  
#             print(f"{indent} Else")
            
            indent += 1                                # Visit orelse component with incremental indent
            for k in node.orelse:
                code_visitor(k, extractor, indent)
                    
        else:
            
            indent += 1
            for i in ast.iter_child_nodes(node):
                code_visitor(i, extractor, indent)
                
                

def process_elif(extractor):
    """
        Handle the situation of Elif
        From the logic aspects, Elif == Else + if
    """
    i = 0
    while i < len(extractor)-1:
        
        if extractor[i][0] == 'Elif':
            
            for k in range(i+1, len(extractor)):
                # When Indent of next line logic is less than that of current elif
                # or equal but next line logic is Else or Elif.
                if extractor[i][1] < extractor[k][1] or \
                    extractor[i][1] == extractor[k][1] and extractor[k][0] in ["Else", "Elif"] :
                    extractor[k][1] += 1
                else:
                    break
                    
            extractor[i][0] = "Else"
            extractor.insert(i+1,["If", extractor[i][1]+1 ])
        
        i+=1

        return extractor


In [9]:
keywords= {"class":"ClassDef",
           "def":"FunctionDef",
           "for":"For",
           "while":"While",
           "if":"If", 
           "with":"With",
           "try":"Try",
           "import":"Import",
           "except": "Excepthandler",
           "finally":"Finally",
           "else":"Else",
           "elif":"Elif"}


def recover_back(code):
    '''
        The special tokens of output code need to be replaced.
    '''
    code=  re.sub("§","    ", code)
    code=  re.sub("ø","\n", code)
    return code
    

def replace_indent_newline(code):
    """
        Replace indent and newline with special symbol § and ø separately.
    """
    code = code.strip()
    lines = re.split(r'[\n;]', code)
    for i in range(len(lines)):
        lines[i] =  re.sub("\s{4}","§",lines[i])
    code = "ø".join(lines)
    return code


def num_of_indent(line):
    """
        Return the number of indent in the line.
    """
    indent = 0
    if not line:
        return 0
    while line[indent] == " ":
        indent += 1
    indent = indent // 4
    
    return indent


def remove_parentheses_newline_quotes(text):  
    """
        Place all command in oneline, discard the multiple line style in string.
    """
    stack = [] 
    quote = []
    status = False       # Indicates Whether parentheses and quotes are matched
    i = 0
    
    # Special Case: Remove s = “”“ multi-line string ”“”
    text = re.sub(r"(\"{3}|\'{3})[\s\S]*(\"{3}|\'{3})","\"\"", text)

    
    while i < len(text):
        
        if text[i] in ["(", "{", "["] and len(quote) == 0: 
            stack.append(i)
            
        elif text[i] in ["\'","\""] :                
            if len(quote) < 1:
                quote.append(i)
                
            elif len(quote) == 1:
                j = quote[0]
                if (text[j] == text[i]):
                    text = text[:j+1] + text[i:]
                    quote.pop()
                    i = j+1
                else:
                    quote.append(i)
                    
            elif len(quote) > 1:
                j1 = quote[0]
                j2 = quote[1]
                
                if (text[j1] == text[i]):
                    text = text[:j1+1] + text[i:]
                    quote.pop()
                    quote.pop()
                    i = j1 + 1
                else:
                    text = text[:j2+1] + text[i:]
                    quote.pop()
                    i = j2 + 1

        elif text[i] in [")", "}", "]"] and len(quote) == 0:
            j = stack.pop()
            
            if (text[j] == '(' and text[i] == ")") or \
               (text[j] == '{' and text[i] == "}") or \
               (text[j] == '[' and text[i] == "]"):
                
                if "\n" in text[j:i]:
                    tmp = re.sub(r"\n\s*","",text[j:i])
                    text = text[:j] +  tmp + text[i:]
                    i = j + len(tmp)+1

        i += 1
        
    # Check Empty Stack 
    if len(stack) == 0 and len(quote) == 0: 
        status = True

    return text, status
    

    
def extractor_generated_code(code):
    extractor = []
#     code = autopep8.fix_code(tmp, options={'aggressive': 1, 'ignore': ['W'], "jobs":4 })

    code, _ = remove_parentheses_newline_quotes(code) # Remove newline in the parentheses of string
    code = re.sub(r'(?m)^\s*@.*?\n', '', code) # Remove decorator like @staticmethod
    code = code.strip()
    
    list_code = re.split('\n|;', code)
    
    for i in list_code:
        
        indent = num_of_indent(i)
        i = i.strip()
        
        try:
            code_visitor(ast.parse(i), indent-1, extractor)
            
        except:

            i = i.rstrip(":")           # Remove symbol : at the end before split into words            
            items = list(set(keywords.keys())& set(i.split()))
            
            if len(items) > 0:
                kw = keywords.get(items.pop(0))
                extractor.append([kw,indent])
#                 print(f"{indent} {kw}")
                
            else:
                extractor.append(["X","unable_parse"])
#                 print(f'{"X"} {"Bad"}')
                      
    return extractor
            

In [115]:
def code_logic(text):
    '''
    Args:
        text: A string of code to be scanned.

    Returns:
        exec_: A Bool value to show whether the code is compilable under AST Parser.
        cmd_logics: A list of code logic with its indent.
    
    Minor-error exists
    
    '''
    
    exec_ = False
    cmd_logics = []
    
    try:    
#         text = autopep8.fix_code(text, options={'aggressive': 1, 'ignore': ['W']})
#         text = remove_parentheses_newline_quotes(text)
        code = ast.parse(text)
        code_visitor(code, cmd_logics)
        exec_ = True

    except:
        
        # Parse cmd by cmd
        cmd_logics = process_elif(extractor_generated_code(text))
        exec_ = False 
    
    return cmd_logics, exec_

## Test

In [21]:
test = pd.read_csv("./processed_data/finetuning/test.csv")

In [122]:
k = 0
unvalid =[]
exect =[]

for i, text in enumerate(test["code"][:400000]):
    
    try:
        output, exec_ = code_logic(recover_back(text))
        if not exec_:
            exect.append(i)
    except:
        unvalid.append(i)

In [123]:
print(len(test))
print(len(exect))
print(len(unvalid))

21948
409
0


In [80]:
# BLEU
from nltk.translate.bleu_score import sentence_bleu

def BLEU(reference, candidate, weights):
    '''
        Todo: Smooth function may required later.
    '''
    ref = reference.split()
    cand = candidate.split()
    score = sentence_bleu([ref], cand,  weights=weights)
    return score

In [132]:
# Rouge F1
from rouge_score import rouge_scorer, scoring

ROUGE_KEYS = ["rouge1", "rouge2", "rougeL"]

def rouge(reference_lns, output_lns):
    scorer = rouge_scorer.RougeScorer(ROUGE_KEYS, use_stemmer=True)
    aggregator = scoring.BootstrapAggregator()

    for reference_ln, output_ln in zip(reference_lns, output_lns):
        scores = scorer.score(reference_ln, output_ln)
        aggregator.add_scores(scores)

    result = aggregator.aggregate()
    return {k: v.mid.fmeasure for k, v in result.items()}

In [218]:
# Evaluate

def logic_evaluate(ref_code, hypo_code, sigma):
    '''
        There are two important criterions to evalute whether the logic of generated 
        code is qualified. One is whether the code is executable, the other one is 
        the control flow of the code, which includes indent and cmd_logics.

        We define the following formula to evalute the logic:

            logic_score = exec_ * sigma + BLEU * (1-sigma), where sigma can be 0.5. 

        Given a higher logic score, the hypothesis code is closer to reference code.

        Considering the logic comparsion doesn't require semantic and contextual embedding,
        therefore, normal MT metrics is sufficent for evaluation. We are going to use BLUE, 
        given more weights to n-gram, which n is larger than 1, such as bigram, tri-gram, etc.

    '''
    
    logic_score = []
    assert len(ref_code) == len(hypo_code)

    for r,h  in zip(ref_code, hypo_code):

        ref_logic, _ = code_logic(recover_back(r))  # Doesn't consider syntax error in ref
        hypo_logic, hypo_exec_ = code_logic(recover_back(h))

#         ref = " ".join([f'{i[1]}_{i[0]}' for i in ref_logic])
#         hypo = " ".join([f'{i[1]}_{i[0]}' for i in hypo_logic])

        ref = " ".join([f'{i[1]}_{i[0]}' for i in ref_logic])
        hypo = " ".join([f'{i[1]}_{i[0]}' for i in hypo_logic])

    #     print(ref)
    #     print(hypo)
        
        score = BLEU(ref, hypo, (0.25, 0.25, 0.25, 0.25))

        logic_score.append(score)

    return logic_score


In [193]:
def expression_evaluate(ref_code, hypo_code):
    '''
        Simply evaluate what the code express.
    '''
    metric_bleu = []
    
    for r,h in zip(ref_code, hypo_code):

        metric_bleu.append(BLEU(r, h, (0.25,0.25,0.25,0.25)))
    
    return metric_bleu

In [229]:

ref = test["code"][10:30]
hypo = test["code"][40:60]

m1 = expression_evaluate(ref, hypo)

m2 = logic_evaluate(ref, hypo, 0.5)

In [230]:
# Expression score
for j, i in enumerate(m1):
    print(f'{j} {i}')
    
print("********************"*5)
# Logic score  
for j,k in enumerate(m2):
    print(f'{j} {k}')

0 1.0639732016681926e-233
1 5.403971076097566e-234
2 9.363140134330432e-232
3 1.0332112772921225e-231
4 7.850707993042515e-232
5 7.296382734947757e-232
6 7.337741777064293e-232
7 6.492476721861418e-232
8 8.416851712392762e-232
9 8.844844403089351e-232
10 5.3790889305317126e-232
11 3.983151682186866e-232
12 8.34060433022243e-232
13 1.0024921848014553e-231
14 6.3917876705550364e-232
15 2.030206422866862e-233
16 1.054198589880717e-245
17 1.2395288183339461e-231
18 1.7817694247232256e-232
19 8.614911585158347e-232
****************************************************************************************************
0 8.14192275585615e-233
1 1.3254154709578097e-233
2 0.21725443231455424
3 2.2080919053537955e-78
4 1.0518351895246305e-231
5 0.537845493675679
6 1.1200407237786664e-231
7 9.893133360884868e-232
8 5.635809992474887e-232
9 1.2882297539194154e-231
10 1.5364452057404436e-79
11 7.623236468879228e-232
12 1.384292958842266e-231
13 5.477489369001354e-155
14 5.287667392736108e-155
15 1.219

In [227]:
test1 = recover_back(test["code"][42])
print(test1)
print("***************"*5)
output, exec_ = code_logic(test1)
for i in output:
    print(i)

def get_content(url, headers={}, decoded=True):
    logging.debug(('get_content: %s' % url))
    req = request.Request(url, headers=headers)
    if cookies:
        cookies.add_cookie_header(req)
        req.headers.update(req.unredirected_hdrs)
    response = urlopen_with_retry(req)
    data = response.read()
    content_encoding = response.getheader('Content-Encoding')
    if (content_encoding == 'gzip'):
        data = ungzip(data)
    elif (content_encoding == 'deflate'):
        data = undeflate(data)
    if decoded:
        charset = match1(response.getheader('Content-Type', ''), 'charset=([\\w-]+)')
        if (charset is not None):
            data = data.decode(charset, 'ignore')
        else:
            data = data.decode('utf-8', 'ignore')
    return data
***************************************************************************
['FunctionDef', 0]
['Expr', 1]
['Assign', 1]
['If', 1]
['Expr', 2]
['Expr', 2]
['Assign', 1]
['Assign', 1]
['Assign', 1]
['If', 1]
['Assign', 2]
[

In [228]:
test2= recover_back(test["code"][12])
print(test2)
print("***************"*5)
output, exec_ = code_logic(test2)
for i in output:
    print(i)

def veoh_download(url, output_dir='.', merge=False, info_only=False, **kwargs):
    if re.match('http://www.veoh.com/watch/\\w+', url):
        item_id = match1(url, 'http://www.veoh.com/watch/(\\w+)')
    elif re.match('http://www.veoh.com/m/watch.php\\?v=\\.*', url):
        item_id = match1(url, 'http://www.veoh.com/m/watch.php\\?v=(\\w+)')
    else:
        raise NotImplementedError('Cannot find item ID')
    veoh_download_by_id(item_id, output_dir='.', merge=False, info_only=info_only, **kwargs)
***************************************************************************
['FunctionDef', 0]
['If', 1]
['Assign', 2]
['Else', 1]
['If', 2]
['Assign', 3]
['Else', 2]
['Raise', 3]
['Expr', 1]
