In [11]:
import ast

In [12]:
# Sample module source held as a plain string. The cells shown below only hand
# it to `ast.parse` to inspect its classes and functions; nothing here executes it.
code = """
import uuid
from dataclasses import dataclass
from typing import List, Optional
import pandas as pd
from synesis_data_structures.time_series.df_dataclasses import TimeSeriesStructure, TimeSeriesAggregationStructure

@dataclass
class FunctionArgs:
    window_size: int = 100
    overlap: int = 0
    target_columns: Optional[List[str]] = None

@dataclass
class OutputVariables:
    number_of_windows: int

@dataclass
class FunctionInput:
    function_args: FunctionArgs
    input_time_series: TimeSeriesStructure

@dataclass
class FunctionOutput:
    aggregated_means: TimeSeriesAggregationStructure
    output_variables: OutputVariables

def window_mean_aggregation(input: FunctionInput) -> FunctionOutput:
    time_series_data = input.input_time_series.time_series_data
    entity_metadata = input.input_time_series.entity_metadata
    feature_info = input.input_time_series.feature_information
    window_size = input.function_args.window_size
    overlap = input.function_args.overlap
    if input.function_args.target_columns is None:
        target_columns = list(time_series_data.columns)
    else:
        target_columns = input.function_args.target_columns

    # Prepare outputs
    outputs_list = []
    inputs_list = []
    agg_metadata_list = []
    number_of_windows = 0

    for entity in time_series_data.index.get_level_values(0).unique():
        entity_df = time_series_data.loc[entity].sort_index()
        timestamps = entity_df.index
        num_points = len(entity_df)
        if overlap == 0:
            step = window_size
        else:
            step = window_size - overlap
        windows = []
        for start_idx in range(0, num_points - window_size + 1, step):
            end_idx = start_idx + window_size
            windows.append((start_idx, end_idx))
        number_of_windows += len(windows)
        for start_idx, end_idx in windows:
            agg_id = str(uuid.uuid4())
            start_ts = timestamps[start_idx]
            end_ts = timestamps[end_idx - 1]
            # Compute means
            mean_values = {}
            for col in target_columns:
                mean_val = entity_df.iloc[start_idx:end_idx][col].mean()
                mean_values[f'mean_{col}'] = mean_val
                # Add to inputs
                inputs_list.append({
                    'aggregation_id': agg_id,
                    'time_series_id': entity,
                    'input_feature_name': col,
                    'start_timestamp': start_ts,
                    'end_timestamp': end_ts
                })
            # Add to outputs
            outputs_list.append({**mean_values, 'aggregation_id': agg_id})
            # Add to agg metadata
            agg_metadata_list.append({
                'aggregation_id': agg_id,
                'is_multi_series_computation': False,
                'window_size': window_size
            })

    # Create dataframes
    outputs_df = pd.DataFrame(outputs_list).set_index('aggregation_id')
    inputs_df = pd.DataFrame(inputs_list)
    agg_metadata_df = pd.DataFrame(agg_metadata_list).set_index('aggregation_id')

    # Feature information for outputs
    output_features = []
    for col in target_columns:
        input_feat = feature_info.loc[col]
        output_feat = {
            'name': f'mean_{col}',
            'unit': input_feat['unit'],
            'description': f'Mean value of {input_feat["description"]}',
            'type': 'numerical',
            'subtype': 'continuous',
            'scale': 'ratio',
            'source': 'data',
            'category_id': pd.NA
        }
        output_features.append(output_feat)
    feature_info_outputs = pd.DataFrame(output_features).set_index('name')

    # Create the structure
    agg_structure = TimeSeriesAggregationStructure(
        time_series_aggregation_outputs=outputs_df,
        time_series_aggregation_inputs=inputs_df,
        entity_metadata=agg_metadata_df,
        feature_information=feature_info_outputs
    )

    output_vars = OutputVariables(number_of_windows=number_of_windows)

    return FunctionOutput(
        aggregated_means=agg_structure,
        output_variables=output_vars
    )
"""

In [13]:
# Gather the name of every plain (non-async) `def` found anywhere in the
# sample source, then show the names separated by blank lines.
descs = []
tree = ast.parse(code)
for node in ast.walk(tree):
    if not isinstance(node, ast.FunctionDef):
        continue  # skip everything that is not a function definition
    descs.append(node.name)
"\n\n".join(descs)

'window_mean_aggregation'

In [14]:
def extract_dataclass_definition(source_code):
    """Return the source text of every dataclass-decorated class in ``source_code``.

    A class is treated as a dataclass when one of its decorators is named
    ``dataclass`` -- either the bare name (``@dataclass``) or an attribute
    access ending in it (``@dataclasses.dataclass``) -- written with or without
    call parentheses (``@dataclass(frozen=True)``).

    Args:
        source_code: Python source to analyse, as a string.

    Returns:
        A list with one string per matching class, in ``ast.walk`` order. Each
        string is what ``ast.get_source_segment`` reports for the class node;
        the decorator lines themselves are not part of it. A class appears at
        most once even if several of its decorators match.

    Raises:
        SyntaxError: if ``ast.parse`` cannot parse ``source_code``.
    """

    def _is_dataclass_decorator(decorator):
        # `@dataclass(...)` is a Call wrapping the real target; unwrap it first.
        target = decorator.func if isinstance(decorator, ast.Call) else decorator
        if isinstance(target, ast.Name):
            return target.id == "dataclass"
        if isinstance(target, ast.Attribute):
            # Covers module-qualified spellings such as `dataclasses.dataclass`.
            return target.attr == "dataclass"
        return False

    tree = ast.parse(source_code)
    dataclass_definitions = []

    for node in ast.walk(tree):
        if not isinstance(node, ast.ClassDef):
            continue
        # `any` stops at the first hit, so a class is never appended twice.
        if not any(_is_dataclass_decorator(dec) for dec in node.decorator_list):
            continue
        source_segment = ast.get_source_segment(source_code, node)
        if source_segment:
            dataclass_definitions.append(source_segment)

    return dataclass_definitions

# Run the extractor on the sample source in `code`; the returned list is the cell's output.
extract_dataclass_definition(code)

['class FunctionArgs:\n    window_size: int = 100\n    overlap: int = 0\n    target_columns: Optional[List[str]] = None',
 'class OutputVariables:\n    number_of_windows: int',
 'class FunctionInput:\n    function_args: FunctionArgs\n    input_time_series: TimeSeriesStructure',
 'class FunctionOutput:\n    aggregated_means: TimeSeriesAggregationStructure\n    output_variables: OutputVariables']

In [15]:
def extract_function_definitions(source_code):
    """Summarise the signature of every plain ``def`` found in ``source_code``.

    Each summary lists the function name, its parameters in signature order
    (positional-only, regular, ``*args``, keyword-only, ``**kwargs``) with their
    annotations, and the return annotation. Missing annotations are shown as
    ``Any``. Default values are not included. Only ``ast.FunctionDef`` nodes are
    summarised, so ``async def`` functions are skipped.

    Args:
        source_code: Python source to analyse, as a string.

    Returns:
        A list of multi-line summary strings, one per function, in ``ast.walk``
        order.

    Raises:
        SyntaxError: if ``ast.parse`` cannot parse ``source_code``.
    """

    def _annotation(arg):
        # Render the annotation back to source text, or fall back to "Any".
        return ast.unparse(arg.annotation) if arg.annotation else "Any"

    tree = ast.parse(source_code)
    function_summaries = []

    for node in ast.walk(tree):
        if not isinstance(node, ast.FunctionDef):
            continue

        signature = node.args
        # Build the list in the same order the parameters appear in a signature.
        params = [
            f"{arg.arg}: {_annotation(arg)} (positional-only)"
            for arg in signature.posonlyargs
        ]
        params += [f"{arg.arg}: {_annotation(arg)}" for arg in signature.args]
        if signature.vararg:
            params.append(
                f"*{signature.vararg.arg}: {_annotation(signature.vararg)}")
        params += [
            f"{arg.arg}: {_annotation(arg)} (keyword-only)"
            for arg in signature.kwonlyargs
        ]
        if signature.kwarg:
            params.append(
                f"**{signature.kwarg.arg}: {_annotation(signature.kwarg)}")

        return_type = ast.unparse(node.returns) if node.returns else "Any"

        # A parameterless function still gets one placeholder bullet.
        param_lines = [f"    - {param}\n" for param in params] or ["    - None\n"]
        summary = (
            f"Function: {node.name}\n\n"
            "  Parameters:\n"
            + "".join(param_lines)
            + f"\nReturn Type: {return_type}"
        )
        function_summaries.append(summary)

    return function_summaries

# Run the summariser on the sample source in `code`; the returned list is the cell's output.
extract_function_definitions(code)

['Function: window_mean_aggregation\n\n  Parameters:\n    - input: FunctionInput\n\nReturn Type: FunctionOutput']