In [1]:
from typing import *
import libcst as cst
import networkx as nx
import pandas as pd
import numpy as np

In [2]:
from alkh import cst_utils

In [3]:
file_path = 'play.py'

In [4]:
# Read the source once inside a context manager so the file handle is closed
# as soon as the lines are in memory.
with open(file_path, 'r') as source_file:
    file_lines = source_file.readlines()
# The joined lines are exactly the text that was read from the file.
file_content = ''.join(file_lines)
# Parse the module and wrap it so metadata providers can be resolved on it.
wrapper = cst.metadata.MetadataWrapper(cst.parse_module(file_content))
# Distinct scope objects found in the module.
scopes = set(wrapper.resolve(cst.metadata.ScopeProvider).values())
# Mapping from CST node to its position in the source.
ranges = wrapper.resolve(cst.metadata.PositionProvider)
file_number_of_lines = len(file_lines)

In [None]:
def get_range(scope, file_number_of_lines, ranges):
    """Summarise one scope as a labelled ``pd.Series``.

    The global scope is reported as running from line 1 to
    ``file_number_of_lines`` under the name ``'global'``. Any other scope takes
    its first and last line from ``ranges[scope.node]`` and its name from
    ``scope.name``.

    Returns a Series indexed by ``scope``, ``start_line_number``,
    ``end_line_number``, ``length`` (inclusive line count) and ``name``.
    """
    if isinstance(scope, cst.metadata.scope_provider.GlobalScope):
        first_line, last_line, label = 1, file_number_of_lines, 'global'
    else:
        first_line = ranges[scope.node].start.line
        last_line = ranges[scope.node].end.line
        label = scope.name
    labelled_values = [
        ("scope", scope),
        ("start_line_number", first_line),
        ("end_line_number", last_line),
        # Both ends are inclusive, hence the +1.
        ("length", last_line - first_line + 1),
        ("name", label),
    ]
    return pd.Series(
        [value for _, value in labelled_values],
        index=[key for key, _ in labelled_values],
    )

In [None]:
# Put the scopes in a one-column frame so each can be expanded into a row.
# `scopes` is a set, so the row order (and therefore scope_index below) follows
# set iteration order.
a = pd.Series(list(scopes)).to_frame('scope')
# One row per scope: span, length and name as produced by get_range.
scopes_df = a["scope"].apply(get_range, args=(file_number_of_lines, ranges))
# Positional identifier for each scope row.
scopes_df["scope_index"] = range(len(scopes_df))
scopes_df

In [None]:
def _get_call_graph_with_df(wrapper, ranges, scopes_df) -> Tuple[nx.DiGraph, pd.DataFrame]:
    """Tabulate every assignment in the wrapped module and build its graph.

    Parameters
    ----------
    wrapper : libcst ``MetadataWrapper`` around the parsed module.
    ranges : mapping from CST node to code range; when ``None`` it is resolved
        from ``wrapper`` via ``PositionProvider``.
    scopes_df : frame with ``start_line_number``, ``end_line_number``,
        ``length`` and ``scope_index`` columns (one row per scope).

    Returns
    -------
    (di_graph, call_df) where ``call_df`` has the columns ``assigned``,
    ``data``, ``line``, ``assigner`` (the names read on the right-hand side),
    ``scope_index`` and ``hash_name`` (``(assigned, scope_index)``), and
    ``di_graph`` is built from it by ``_create_di_graph_from_call_df``.
    """
    # Honour the mapping handed in by the caller; only resolve when absent.
    if ranges is None:
        ranges = wrapper.resolve(cst.metadata.PositionProvider)
    visitor = FunctionCollector(ranges)
    wrapper.visit(visitor)
    call_df = pd.DataFrame(visitor.get_info(), columns=['assigned', 'data', 'line'])
    call_df['assigner'] = call_df['data'].apply(lambda x: x['names'])
    call_df["scope_index"] = call_df["line"].apply(get_scope_index, args=(scopes_df,))
    # Pair the columns directly: unlike a row-wise apply this yields an empty
    # list (not a frame) when there are no assignments at all.
    call_df['hash_name'] = list(zip(call_df["assigned"], call_df["scope_index"]))
    di_graph = _create_di_graph_from_call_df(call_df)
    return di_graph, call_df

class FunctionCollector(cst.CSTVisitor):
    """Record every name bound by an ``Assign`` statement.

    One tuple ``(assigned, value_dict, line)`` is stored per bound name:
    ``assigned`` is the name as a string, ``value_dict`` holds what a
    ``ValueCollector`` found on the right-hand side (keys ``'names'``,
    ``'ints'``, ``'floats'``) and ``line`` is the line the statement starts on.
    """
    METADATA_DEPENDENCIES = (cst.metadata.PositionProvider,)

    def __init__(self, ranges):
        """``ranges`` is indexed by ``Assign`` node to obtain its code range."""
        super().__init__()
        self._ranges = ranges
        self._assign_info: List[Tuple] = []

    def get_info(self):
        """Return the collected ``(assigned, value_dict, line)`` tuples."""
        return self._assign_info

    def visit_Assign(self, node: cst.Assign) -> None:
        """Store one record for each name bound by ``node``.

        Every target of a chained assignment (``a = b = 1``) and every element
        of an unpacking target (``a, b = ...``) gets its own record. All
        records of one statement share the same ``value_dict``, i.e. each
        bound name is treated as depending on the whole right-hand side.
        """
        pos = self._ranges[node].start
        collector = ValueCollector()
        node.value.visit(collector)
        value_dict = {'names': collector.names, 'ints': collector.ints, 'floats': collector.floats}
        for assign_target in node.targets:
            for assigned_name in self._target_names(assign_target.target):
                self._assign_info.append((assigned_name, value_dict, pos.line))

    @classmethod
    def _target_names(cls, target) -> List[str]:
        """Return the names bound by one assignment target expression.

        * ``Name`` -> its identifier.
        * ``Tuple`` / ``List`` -> the names of every element (starred or not).
        * ``Attribute`` -> dotted path such as ``'self.k'`` when the base
          resolves to a name, otherwise nothing.
        * ``Subscript`` -> the name of the object being indexed.
        * anything else -> nothing.
        """
        if isinstance(target, cst.Name):
            return [target.value]
        if isinstance(target, (cst.Tuple, cst.List)):
            collected: List[str] = []
            for element in target.elements:
                # Element and StarredElement both expose the wrapped expression as .value
                collected.extend(cls._target_names(element.value))
            return collected
        if isinstance(target, cst.Attribute):
            base_names = cls._target_names(target.value)
            if not base_names:
                return []
            return [f"{base_names[0]}.{target.attr.value}"]
        if isinstance(target, cst.Subscript):
            return cls._target_names(target.value)
        return []


class ValueCollector(cst.CSTVisitor):
    """Gather identifiers and numeric literals found under a visited node.

    ``names`` receives the text of every ``Name`` node, ``ints`` the source
    text of every ``Integer`` literal and ``floats`` the source text of every
    ``Float`` literal, each in visiting order.
    """

    def __init__(self):
        super().__init__()
        self.names: List[str] = []
        self.ints: List[str] = []
        self.floats: List[str] = []

    def visit_Name(self, node: cst.Name) -> None:
        """Record the identifier text of a ``Name`` node."""
        self.names.append(node.value)

    def visit_Integer(self, node: cst.Integer) -> None:
        """Record the source text of an integer literal."""
        self.ints.append(node.value)

    def visit_Float(self, node: cst.Float) -> None:
        """Record the source text of a float literal."""
        self.floats.append(node.value)

        
def _get_all_variables_names(call_df):
    assigned_list = list(call_df.apply(lambda x: (x["assigned"], x["scope_index"]), axis=1))
    assigners_list = list(call_df.explode(['assigner']).dropna().apply(lambda x: (x["assigner"], x["scope_index"]), axis=1))
    return set(assigned_list + assigners_list)


def _create_di_graph_from_call_df(call_df):
    """Build the dependency graph described by ``call_df``.

    Nodes are the ``(name, scope_index)`` pairs returned by
    ``_get_all_variables_names``. For every row with a non-empty ``assigner``
    list, an edge runs from each ``(assigner, scope_index)`` to the row's
    ``(assigned, scope_index)``.
    """
    graph = nx.DiGraph()
    graph.add_nodes_from(_get_all_variables_names(call_df))
    for _, row in call_df.iterrows():
        sources = row['assigner']
        if not sources:
            # Nothing was read on the right-hand side: no incoming edges.
            continue
        scope = row["scope_index"]
        target_node = (row['assigned'], scope)
        graph.add_edges_from(((source, scope), target_node) for source in sources)
    return graph

def get_scope_index(line_number, scopes_df):
    """Return the ``scope_index`` of the shortest scope containing ``line_number``.

    A scope contains the line when
    ``start_line_number <= line_number <= end_line_number``; among the
    candidates the one with the smallest ``length`` wins.

    Raises
    ------
    IndexError
        If no row of ``scopes_df`` contains ``line_number``.
    """
    c = scopes_df.query(f"start_line_number <= {line_number} and end_line_number >= {line_number}").sort_values("length")
    if c.empty:
        # Same exception type iloc[0] would raise, but with usable context.
        raise IndexError(f"no scope contains line {line_number}")
    scope_index = c.iloc[0]['scope_index']
    return scope_index

In [None]:
di_graph, call_df = _get_call_graph_with_df(wrapper, ranges, scopes_df)

In [None]:
call_df

In [None]:
nx.draw_networkx(di_graph)

In [None]:
scopes_df

In [None]:
line_number = 17

In [None]:
a_series = call_df.query(f"line == {line_number}").iloc[0]
a_series

In [None]:
graph_node_name = a_series['hash_name']
graph_node_name

In [None]:
ancestors = nx.ancestors(di_graph, graph_node_name)
ancestors

In [None]:
call_df[call_df['hash_name'].isin(ancestors)]

# Try 3: wrap the pipeline in a `CallGraphManager` class

In [None]:
class FunctionCollector(cst.CSTVisitor):
    """Record every name bound by an ``Assign`` statement.

    One tuple ``(assigned, value_dict, line)`` is stored per bound name:
    ``assigned`` is the name as a string, ``value_dict`` holds what a
    ``ValueCollector`` found on the right-hand side (keys ``'names'``,
    ``'ints'``, ``'floats'``) and ``line`` is the line the statement starts on.
    """
    METADATA_DEPENDENCIES = (cst.metadata.PositionProvider,)

    def __init__(self, ranges):
        """``ranges`` is indexed by ``Assign`` node to obtain its code range."""
        super().__init__()
        self._ranges = ranges
        self._assign_info: List[Tuple] = []

    def get_info(self):
        """Return the collected ``(assigned, value_dict, line)`` tuples."""
        return self._assign_info

    def visit_Assign(self, node: cst.Assign) -> None:
        """Store one record for each name bound by ``node``.

        Every target of a chained assignment (``a = b = 1``) and every element
        of an unpacking target (``a, b = ...``) gets its own record. All
        records of one statement share the same ``value_dict``, i.e. each
        bound name is treated as depending on the whole right-hand side.
        """
        pos = self._ranges[node].start
        collector = ValueCollector()
        node.value.visit(collector)
        value_dict = {'names': collector.names, 'ints': collector.ints, 'floats': collector.floats}
        for assign_target in node.targets:
            for assigned_name in self._target_names(assign_target.target):
                self._assign_info.append((assigned_name, value_dict, pos.line))

    @classmethod
    def _target_names(cls, target) -> List[str]:
        """Return the names bound by one assignment target expression.

        * ``Name`` -> its identifier.
        * ``Tuple`` / ``List`` -> the names of every element (starred or not).
        * ``Attribute`` -> dotted path such as ``'self.k'`` when the base
          resolves to a name, otherwise nothing.
        * ``Subscript`` -> the name of the object being indexed.
        * anything else -> nothing.
        """
        if isinstance(target, cst.Name):
            return [target.value]
        if isinstance(target, (cst.Tuple, cst.List)):
            collected: List[str] = []
            for element in target.elements:
                # Element and StarredElement both expose the wrapped expression as .value
                collected.extend(cls._target_names(element.value))
            return collected
        if isinstance(target, cst.Attribute):
            base_names = cls._target_names(target.value)
            if not base_names:
                return []
            return [f"{base_names[0]}.{target.attr.value}"]
        if isinstance(target, cst.Subscript):
            return cls._target_names(target.value)
        return []


class ValueCollector(cst.CSTVisitor):
    """Gather identifiers and numeric literals found under a visited node.

    ``names`` receives the text of every ``Name`` node, ``ints`` the source
    text of every ``Integer`` literal and ``floats`` the source text of every
    ``Float`` literal, each in visiting order.
    """

    def __init__(self):
        super().__init__()
        self.names: List[str] = []
        self.ints: List[str] = []
        self.floats: List[str] = []

    def visit_Name(self, node: cst.Name) -> None:
        """Record the identifier text of a ``Name`` node."""
        self.names.append(node.value)

    def visit_Integer(self, node: cst.Integer) -> None:
        """Record the source text of an integer literal."""
        self.ints.append(node.value)

    def visit_Float(self, node: cst.Float) -> None:
        """Record the source text of a float literal."""
        self.floats.append(node.value)

In [None]:
class CallGraphManager:
    """Answer "which lines feed the assignment on line N?" for one Python file.

    On construction the file is parsed with libcst, its scopes are tabulated
    (``self._scopes_df``), its assignments are tabulated (``self._call_df``)
    and a directed graph (``self._call_graph``) is built. Graph nodes are
    ``(variable_name, scope_index)`` pairs; edges run from each name read on a
    right-hand side to the name being assigned in the same scope.
    """

    def __init__(self, file_path):
        self._call_graph, self._call_df = self._get_call_graph_with_df(file_path)

    def get_variable_affecting_lines_numbers(self, line_number: Union[int, str]) -> List[int]:
        """Return the lines of all assignments the assignment on ``line_number`` depends on.

        The first assignment recorded on ``line_number`` is looked up, its
        graph ancestors are collected, and the lines of every assignment whose
        ``hash_name`` is that node or one of its ancestors are returned.

        Raises
        ------
        IndexError
            If no assignment was recorded on ``line_number``.
        """
        matching_rows = self._call_df.query(f"line == {line_number}")
        if matching_rows.empty:
            # Same exception type iloc[0] would raise, but with usable context.
            raise IndexError(f"no assignment recorded on line {line_number}")
        graph_node_name = matching_rows.iloc[0]['hash_name']
        ancestors = nx.ancestors(self._call_graph, graph_node_name)
        ancestors_df = self._get_ancestors_call_df(ancestors, graph_node_name)
        return self._get_lines_numbers_list(ancestors_df)

    def _get_ancestors_call_df(self, ancestors, graph_node_name):
        """Rows of the assignment table whose node is ``graph_node_name`` or an ancestor."""
        return self._call_df[self._call_df['hash_name'].isin(ancestors.union({graph_node_name}))]

    @staticmethod
    def _get_lines_numbers_list(ancestors_df: pd.DataFrame) -> List[int]:
        """Return the ``line`` column as a list of plain Python values."""
        # tolist() converts numpy scalars to built-in ints, matching List[int].
        return ancestors_df['line'].tolist()

    # The return annotation is quoted: it is documentation only and is not
    # evaluated when the class body runs.
    def _get_call_graph_with_df(self, file_path: str) -> "Tuple[nx.DiGraph, pd.DataFrame]":
        """Parse ``file_path`` and build the graph and assignment table.

        Also stores the scope table and the node-position mapping on the
        instance as ``_scopes_df`` and ``_ranges``.
        """
        # Read once and close the handle immediately.
        with open(file_path, 'r') as source_file:
            file_lines = source_file.readlines()
        file_content = ''.join(file_lines)
        wrapper = cst.metadata.MetadataWrapper(cst.parse_module(file_content))
        scopes = set(wrapper.resolve(cst.metadata.ScopeProvider).values())
        ranges = wrapper.resolve(cst.metadata.PositionProvider)
        file_number_of_lines = len(file_lines)

        # One row per scope; `scopes` is a set, so row order follows set iteration.
        a = pd.Series(list(scopes)).to_frame('scope')
        scopes_df = a["scope"].apply(self._get_range, args=(file_number_of_lines, ranges))
        scopes_df["scope_index"] = range(len(scopes_df))

        self._scopes_df = scopes_df
        self._ranges = ranges

        di_graph, call_df = self._get_call_graph_with_df_from_objects(wrapper, ranges, scopes_df)
        return di_graph, call_df

    def _get_range(self, scope, file_number_of_lines, ranges):
        """Summarise one scope as a labelled ``pd.Series``.

        The global scope spans line 1 to ``file_number_of_lines`` and is named
        ``'global'``. Other scopes take their span from ``ranges[scope.node]``
        and their name from ``scope.name``. ``header_end_line_number`` starts
        as the first line and is moved to the end line of the node's
        ``params`` and then of its ``returns`` when the node has them.
        """
        if isinstance(scope, cst.metadata.scope_provider.GlobalScope):
            start_line_number = 1
            end_line_number = file_number_of_lines
            scope_name = 'global'
            header_end_line_number = start_line_number
        else:
            start_line_number = ranges[scope.node].start.line
            end_line_number = ranges[scope.node].end.line
            scope_name = scope.name
            header_end_line_number = start_line_number
            if hasattr(scope.node, 'params') and scope.node.params:
                header_end_line_number = ranges[scope.node.params].end.line
            # A return annotation comes after the parameters, so it wins.
            if hasattr(scope.node, 'returns') and scope.node.returns:
                header_end_line_number = ranges[scope.node.returns].end.line
        # Both ends are inclusive, hence the +1.
        scope_length = end_line_number - start_line_number + 1
        values = [scope, start_line_number, end_line_number, header_end_line_number, scope_length, scope_name]
        names = ["scope", "start_line_number", "end_line_number", "header_end_line_number", "length", "name"]
        output_series = pd.Series(values, index=names)
        return output_series

    def _get_call_graph_with_df_from_objects(self, wrapper, ranges, scopes_df) -> "Tuple[nx.DiGraph, pd.DataFrame]":
        """Tabulate every assignment of the wrapped module and build its graph.

        ``ranges`` is used as given; it is resolved from ``wrapper`` only when
        ``None`` is passed. The returned frame has the columns ``assigned``,
        ``data``, ``line``, ``assigner``, ``scope_index`` and ``hash_name``.
        """
        if ranges is None:
            ranges = wrapper.resolve(cst.metadata.PositionProvider)
        visitor = FunctionCollector(ranges)
        wrapper.visit(visitor)
        call_df = pd.DataFrame(visitor.get_info(), columns=['assigned', 'data', 'line'])
        call_df['assigner'] = call_df['data'].apply(lambda x: x['names'])
        call_df["scope_index"] = call_df["line"].apply(self._get_scope_index, args=(scopes_df,))
        # Pair the columns directly so an empty table yields an empty column.
        call_df['hash_name'] = list(zip(call_df["assigned"], call_df["scope_index"]))
        di_graph = self._create_di_graph_from_call_df(call_df)
        return di_graph, call_df

    def _get_all_variables_names(self, call_df):
        """Return every ``(name, scope_index)`` pair mentioned in ``call_df``.

        Union of the assigned names and of the names in the ``assigner``
        lists, each paired with the row's ``scope_index``. Columns are zipped
        rather than row-applied because, depending on the pandas version,
        ``apply(axis=1)`` on an empty frame can return the frame itself.
        """
        assigned_pairs = set(zip(call_df["assigned"], call_df["scope_index"]))
        # explode turns an empty assigner list into NaN; dropna discards those rows.
        exploded = call_df.explode('assigner').dropna()
        assigner_pairs = set(zip(exploded["assigner"], exploded["scope_index"]))
        return assigned_pairs | assigner_pairs

    def _create_di_graph_from_call_df(self, call_df):
        """Build the graph: one node per variable pair, one edge per dependency."""
        var_names = self._get_all_variables_names(call_df)
        di_graph = nx.DiGraph()
        for name in var_names:
            di_graph.add_node(name)
        for index, a_series in call_df.iterrows():
            scope_index = a_series["scope_index"]
            if a_series['assigner']:
                for assigner in a_series['assigner']:
                    di_graph.add_edge((assigner, scope_index), (a_series['assigned'], scope_index))
        return di_graph

    def _get_scope_index(self, line_number, scopes_df):
        """Return the ``scope_index`` of the shortest scope containing ``line_number``.

        Raises ``IndexError`` when no scope contains the line.
        """
        c = scopes_df.query(f"start_line_number <= {line_number} and end_line_number >= {line_number}").sort_values("length")
        if c.empty:
            raise IndexError(f"no scope contains line {line_number}")
        scope_index = c.iloc[0]['scope_index']
        return scope_index

In [None]:
mm = CallGraphManager('play.py')

In [None]:
mm.get_variable_affecting_lines_numbers(17)

In [None]:
scopes_df = mm._scopes_df
ranges = mm._ranges

In [None]:
scopes_df

In [None]:
c = scopes_df.query(f"start_line_number <= {line_number} and end_line_number >= {line_number}").sort_values("length")

In [None]:
c.iloc[:-1]["start_line_number"].tolist()

In [None]:
c

In [None]:
def _get_scope_hierarchy_starts_list(scopes_df):
    c = scopes_df.query(f"start_line_number <= {line_number} and end_line_number >= {line_number}").sort_values("length")
    lines_numbers_list = c.iloc[:-1]["start_line_number"].tolist()
    return lines_numbers_list

In [None]:
# wrapper

In [None]:
scopes_df

In [None]:
scope = scopes_df.iloc[2]['scope']

In [None]:
scope.node

In [None]:
scope.node.params

In [None]:
ranges[scope.node.params]

In [None]:
ranges[scope.node]

In [None]:
ranges[scope.node.returns]

In [None]:
hasattr(scope.node, 'returns')

In [None]:
ranges[scope.node.params].end.line

In [None]:
scope.node

In [None]:
ranges[scope.node.returns]

In [None]:
scope.node.decorators

In [None]:
ranges[scope.node.decorators[0]]

# Attribute access (`self.x`)

In [5]:
# wrapper.module.body[3].body.body[1].body.body[0].body
wrapper.module.body[3].body.body[1]

SimpleStatementLine(
    body=[
        Assign(
            targets=[
                AssignTarget(
                    target=Name(
                        value='m',
                        lpar=[],
                        rpar=[],
                    ),
                    whitespace_before_equal=SimpleWhitespace(
                        value=' ',
                    ),
                    whitespace_after_equal=SimpleWhitespace(
                        value=' ',
                    ),
                ),
            ],
            value=Call(
                func=Attribute(
                    value=Name(
                        value='pd',
                        lpar=[],
                        rpar=[],
                    ),
                    attr=Name(
                        value='Series',
                        lpar=[],
                        rpar=[],
                    ),
                    dot=Dot(
                        whitespace_before=SimpleWhitespace(
        

In [None]:
# import libcst

# function_def = libcst.parse_statement("def hello_world():\n  print('Hello World')")
# print(libcst.Module([]).code_for_node(function_def))

In [6]:
class ValueCollector(cst.CSTVisitor):
    """Gather identifiers, attribute chains and numeric literals under a node.

    ``names`` receives a plain string for every stand-alone ``Name`` and a
    tuple such as ``('self', 'm', 'c')`` for every attribute chain rooted at a
    name. ``ints`` and ``floats`` receive the source text of integer and float
    literals.
    """

    def __init__(self):
        super().__init__()
        self.names: List[Union[str, Tuple]] = []
        self.ints: List[str] = []
        self.floats: List[str] = []

    def visit_Name(self, node: cst.Name) -> None:
        """Record a stand-alone identifier.

        Names that are part of an attribute chain never reach this method,
        because ``visit_Attribute`` does not descend into the chain.
        """
        self.names.append(node.value)

    def visit_Integer(self, node: cst.Integer) -> None:
        """Record the source text of an integer literal."""
        self.ints.append(node.value)

    def visit_Float(self, node: cst.Float) -> None:
        """Record the source text of a float literal."""
        self.floats.append(node.value)

    def visit_Attribute(self, node: cst.Attribute) -> bool:
        """Record a whole attribute chain once, at its outermost node.

        The chain is walked from the outermost attribute down to its base.
        When the base is a ``Name`` the full chain is stored as one tuple.
        When the base is some other expression (a call, a subscript, ...),
        only that base expression is visited so the names inside it are still
        collected; the trailing attribute names are not recorded.

        Returns ``False`` so the traversal does not descend into the chain a
        second time.
        """
        parts = [node.attr.value]
        base = node.value
        while isinstance(base, cst.Attribute):
            parts.append(base.attr.value)
            base = base.value
        if isinstance(base, cst.Name):
            parts.append(base.value)
            self.names.append(tuple(reversed(parts)))
        else:
            base.visit(self)
        return False

In [7]:
collector = ValueCollector()

In [8]:
# wrapper.module.body[2].body.body[1].body.body[0].body[0].visit(collector)

In [10]:
collector.names

[]

In [11]:
print(file_content)

import alkh
alkh.analyze()
import pandas as pd


class A:
    k = 8
    m = pd.Series({"c": 20})

    def __init__(self):
        self.k, ll = 9, 10
        b, mm = 8 + self.k + self.m.c, self.k
        pass

    @staticmethod
    def run(
            n)\
            -> int:
        a = 5
        b = a + 7 + 5.0
        ll = a + 6.4
        c = a + b + 3
        d = b + c
        k = int(d * 2)
        return k


class B:
    def __init__(self):
        b = 8
        pass

    def run(self):
        a = 5
        b = a + 7 + 5.0
        ll = a + 6.4
        c = a + b + 3
        d = b + c



In [12]:
class ValueCollector(cst.CSTVisitor):
    """Gather identifiers, attribute chains and numeric literals under a node.

    Every entry of ``names`` is a list: ``['a']`` for a stand-alone ``Name``
    and ``['self', 'm', 'c']`` for an attribute chain rooted at a name.
    ``ints`` and ``floats`` receive the source text of integer and float
    literals.
    """

    def __init__(self):
        super().__init__()
        self.names: List[Union[str, Tuple]] = []
        self.ints: List[str] = []
        self.floats: List[str] = []

    def visit_Name(self, node: cst.Name) -> None:
        """Record a stand-alone identifier as a one-element list.

        Names that are part of an attribute chain never reach this method,
        because ``visit_Attribute`` does not descend into the chain.
        """
        self.names.append([node.value])

    def visit_Integer(self, node: cst.Integer) -> None:
        """Record the source text of an integer literal."""
        self.ints.append(node.value)

    def visit_Float(self, node: cst.Float) -> None:
        """Record the source text of a float literal."""
        self.floats.append(node.value)

    def visit_Attribute(self, node: cst.Attribute) -> bool:
        """Record a whole attribute chain once, at its outermost node.

        The chain is walked from the outermost attribute down to its base.
        When the base is a ``Name`` the full chain is stored as one list, base
        first. When the base is some other expression (a call, a subscript,
        ...), only that base expression is visited so the names inside it are
        still collected; the trailing attribute names are not recorded, which
        avoids attaching them to an unrelated earlier entry.

        Returns ``False`` so the traversal does not descend into the chain a
        second time.
        """
        parts = [node.attr.value]
        base = node.value
        while isinstance(base, cst.Attribute):
            parts.append(base.attr.value)
            base = base.value
        if isinstance(base, cst.Name):
            parts.append(base.value)
            self.names.append(list(reversed(parts)))
        else:
            base.visit(self)
        return False

In [21]:
# wrapper.module.body[3].body.body[2].body.body[1].body[0].value

In [22]:
# a = wrapper.module.body[3].body.body[2].body.body[1].body[0].value.right
# a

In [23]:
collector = ValueCollector()

In [24]:
k = wrapper.module.body[3].body.body[2].body.body[0].body[0].value.right.visit(collector)

AttributeError: 'Tuple' object has no attribute 'right'

In [None]:
type(a.value.value)

In [None]:
type(a.value)

In [None]:
collector.names

In [27]:
collector = ValueCollector()
k = wrapper.module.body[3].body.body[2].body.body[1].body[0].visit(collector)
collector.names

[['b'], ['mm'], ['self', 'k'], ['self', 'm', 'c'], ['self', 'k']]

In [26]:
collector = ValueCollector()
k = wrapper.visit(collector)
collector.names

[['alkh'],
 ['alkh', 'analyze'],
 ['pandas'],
 ['pd'],
 ['A'],
 ['k'],
 ['m'],
 ['pd', 'Series'],
 ['__init__'],
 ['self'],
 ['self', 'k'],
 ['ll'],
 ['b'],
 ['mm'],
 ['self', 'k'],
 ['self', 'm', 'c'],
 ['self', 'k'],
 ['staticmethod'],
 ['run'],
 ['n'],
 ['int'],
 ['a'],
 ['b'],
 ['a'],
 ['ll'],
 ['a'],
 ['c'],
 ['a'],
 ['b'],
 ['d'],
 ['b'],
 ['c'],
 ['k'],
 ['int'],
 ['d'],
 ['k'],
 ['B'],
 ['__init__'],
 ['self'],
 ['b'],
 ['run'],
 ['self'],
 ['a'],
 ['b'],
 ['a'],
 ['ll'],
 ['a'],
 ['c'],
 ['a'],
 ['b'],
 ['d'],
 ['b'],
 ['c']]

In [29]:
from alkh.cst_utils import CallGraphManager

file_path = '/mnt/dev/open_source_projects/alkh/notebooks/play.py'
line_number = 22
call_graph_manager = CallGraphManager(file_path)
call_graph_manager.get_variable_affecting_lines_numbers(line_number)

[6, 15, 16, 17, 18, 19, 20, 22]

In [44]:
wrapper.module.body[3].body.body[2].body.body[1]

SimpleStatementLine(
    body=[
        Assign(
            targets=[
                AssignTarget(
                    target=Tuple(
                        elements=[
                            Element(
                                value=Name(
                                    value='b',
                                    lpar=[],
                                    rpar=[],
                                ),
                                comma=Comma(
                                    whitespace_before=SimpleWhitespace(
                                        value='',
                                    ),
                                    whitespace_after=SimpleWhitespace(
                                        value=' ',
                                    ),
                                ),
                            ),
                            Element(
                                value=Name(
                                    value='mm',
                