In [1]:
import pandas as pd
import subprocess
import os
import requests
from dotenv import load_dotenv
from typing import Dict, List, Tuple, Any
import json
import pickle
import textwrap
import ast

# Load environment variables from .env file
load_dotenv()

# Define the workflow steps with assigned numbers and dependencies
workflow_steps = {
    11: {
        "description": "Load the CSV file as pandas DataFrame",
        "dependencies": []
    },
    21: {
        "description": "Examine the structure and characteristics of the data",
        "dependencies": [11]
    },
    22: {
        "description": "Identify missing values, data types, and statistical summary",
        "dependencies": [21]
    },
    31: {
        "description": "Handle missing values (remove or impute) if there are so",
        "dependencies": [22]
    },
    32: {
        "description": "Identify if there is a need to convert categorical variables to numerical representations. If yes, then convert them.",
        "dependencies": [31]
    },
    35: {
        "description": "Split the preprocessed data into training and testing sets",
        "dependencies": [32]
    },
    51: {
        "description": "Implement a single most appropriate machine learning algorithm for the dataset (choose from scikit-learn, XGBoost, LightGBM, or CatBoost).",
        "dependencies": [35]
    },
    52: {
        "description": "Fine-tune the model if necessary",
        "dependencies": [51]
    },
    53: {
        "description": "Train the selected model on the training data and evaluate its performance on the training data",
        "dependencies": [52]
    },
    61: {
        "description": "Evaluate the trained model's performance on the testing data",
        "dependencies": [53]
    },
    62: {
        "description": "Calculate evaluation metrics (e.g., accuracy, precision, recall, F1-score)",
        "dependencies": [61]
    }
}

# OpenAI API configuration
api_url = "https://openrouter.ai/api/v1"
api_key = os.getenv('OPENROUTER_API_KEY')

class DataAnalysisPipeline:
    def __init__(self, csv_path: str):
        self.csv_path = csv_path
        self.df = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.model = None
        self.cache = {}
        self.executed_steps = set()

    def openai_chat(self, request: str) -> str:
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": "meta-llama/llama-3-70b-instruct",
            "messages": [{"role": "user", "content": request}]
        }
        try:
            response = requests.post(f"{api_url}/chat/completions", headers=headers, json=data)
            response.raise_for_status()
            result = response.json()
            if 'choices' not in result or not result['choices']:
                raise KeyError("No 'choices' in the API response")
            return result["choices"][0]["message"]["content"]
        except requests.RequestException as e:
            print(f"API request failed: {e}")
            print(f"Response content: {response.text if 'response' in locals() else 'No response'}")
            raise
        except (KeyError, IndexError) as e:
            print(f"Unexpected API response format: {e}")
            print(f"Response content: {response.text if 'response' in locals() else 'No response'}")
            raise


    def validate_code(self, code: str) -> bool:
        try:
            ast.parse(code)
            return True
        except SyntaxError:
            return False
    
    def generate_code_snippet(self, request: str) -> str:
        return self.openai_chat(request)

    def clean_and_correct_code(self, generated_code: str) -> str:
        cleaned_code = generated_code.replace("```python", "").replace("```", "").strip()
        cleaned_code_lines = [line for line in cleaned_code.split("\n") if not line.lower().startswith("here is the")]
        cleaned_code = "\n".join(cleaned_code_lines)
        return cleaned_code

    def get_dataset_info(self) -> Dict[str, Any]:
        if 11 not in self.cache:
            self.df = pd.read_csv(self.csv_path)
            columns = self.df.columns.tolist()
            types = {k: str(v) for k, v in self.df.dtypes.to_dict().items()}
            sample_data = self.df.head().to_dict(orient='list')
            value_counts = {col: self.df[col].value_counts().to_dict() for col in self.df.columns}
            description = self.df.describe().to_dict()
            
            self.cache[11] = {
                'columns': columns,
                'types': types,
                'sample_data': sample_data,
                'value_counts': value_counts,
                'description': description
            }
        
        return self.cache[11]

    def summarize_dataset_info(self, dataset_info: Dict[str, Any]) -> Dict[str, Any]:
        summary = {
            'columns': dataset_info['columns'],
            'types': dataset_info['types'],
            'sample_data': {k: v[:3] for k, v in dataset_info['sample_data'].items()},
            'value_counts': {k: dict(list(v.items())[:3]) for k, v in dataset_info['value_counts'].items()},
            'description': {k: {sk: sv for sk, sv in v.items() if sk in ['count', 'mean', 'min', 'max']} 
                            for k, v in dataset_info['description'].items()}
        }
        return summary

    def generate_code_for_step(self, step: int) -> str:
        dataset_info = self.get_dataset_info()
        summarized_info = self.summarize_dataset_info(dataset_info)
        
        base_request = (
            f"Write a Python function named 'step_{step}' that takes a pandas DataFrame 'df' as input and returns the modified DataFrame. "
            f"The function should perform the following step: {workflow_steps[step]['description']}. "
            f"The dataset has the following columns: {summarized_info['columns']}. "
            f"The data types are: {summarized_info['types']}. "
            f"Here's a sample of the data: {summarized_info['sample_data']}. "
            f"The current state of the DataFrame is:\n{self.df.head() if self.df is not None else 'Not loaded yet'}\n"
            f"Only return the function definition without any additional explanations. "
            f"Use the actual column names from the dataset in your code."
        )

        # Split the request into smaller chunks if it's too large
        chunks = textwrap.wrap(base_request, width=4000)  # Adjust width as needed
        code_snippets = []

        for i, chunk in enumerate(chunks):
            if i == 0:
                chunk_request = chunk + "\nProvide the first part of the function."
            elif i == len(chunks) - 1:
                chunk_request = chunk + "\nComplete the function."
            else:
                chunk_request = chunk + "\nContinue the function."

            code_snippet = self.generate_code_snippet(chunk_request)
            code_snippets.append(self.clean_and_correct_code(code_snippet))

        full_code = "\n".join(code_snippets)
        return full_code

    def execute_step(self, step: int) -> None:
        if step in self.executed_steps:
            return

        for prereq in workflow_steps[step]['dependencies']:
            self.execute_step(prereq)

        code = self.generate_code_for_step(step)
        print(f"Generated code for step {step}:\n{code}\n")
        
        if not self.validate_code(code):
            print(f"Invalid code generated for step {step}. Attempting to fix...")
            code = self.fix_code(code, step)
        
        try:
            exec(code, globals())
            step_function = globals()[f"step_{step}"]
            
            if step == 35:  # Split data step
                self.X_train, self.X_test, self.y_train, self.y_test = step_function(self.df)
            elif step > 35:  # Model-related steps
                result = step_function(self.X_train, self.X_test, self.y_train, self.y_test, self.model)
                if isinstance(result, tuple):
                    self.model, metrics = result
                else:
                    self.model = result
            else:
                self.df = step_function(self.df)
            
            print(f"DataFrame after step {step}:")
            print(self.df.head())
            print(self.df.dtypes)
            print("\n")
            
            self.cache[step] = code
            self.executed_steps.add(step)
        except Exception as e:
            print(f"Error executing step {step}: {str(e)}")
            print("Attempting to fix the code...")
            fixed_code = self.fix_code(code, step, str(e))
            self.execute_step(step)  # Recursively try again with fixed code

    def fix_code(self, code: str, step: int, error_message: str = "") -> str:
        request = (
            f"The following Python code for step {step} has an error:\n\n{code}\n\n"
            f"Error message: {error_message}\n"
            "Please fix the code and provide only the corrected function without any explanations."
        )
        fixed_code = self.generate_code_snippet(request)
        return self.clean_and_correct_code(fixed_code)

    def run_pipeline(self, steps: List[int]) -> None:
        for step in steps:
            try:
                self.execute_step(step)
            except Exception as e:
                print(f"Failed to execute step {step}: {str(e)}")
                print("Skipping to the next step...")

    def get_combined_code(self) -> str:
        return "\n\n".join([self.cache[step] for step in sorted(self.executed_steps)])

    def save_results(self) -> None:
        with open("combined_code.py", "w") as f:
            f.write(self.get_combined_code())
        
        with open("model.pkl", "wb") as f:
            pickle.dump(self.model, f)
        
        results = {
            "X_train": self.X_train.to_dict() if self.X_train is not None else None,
            "X_test": self.X_test.to_dict() if self.X_test is not None else None,
            "y_train": self.y_train.to_list() if self.y_train is not None else None,
            "y_test": self.y_test.to_list() if self.y_test is not None else None,
        }
        with open("results.json", "w") as f:
            json.dump(results, f)

def main():
    csv_path = "/Users/ilya/Desktop/GitHub_Repositories/HW_University/Data_Mining/datasets/insurance.csv"
    pipeline = DataAnalysisPipeline(csv_path)
    steps_to_execute = [11, 21, 22, 31, 32, 35, 51, 52, 53, 61, 62]
    pipeline.run_pipeline(steps_to_execute)
    pipeline.save_results()
    print("Pipeline execution completed. Results saved.")

if __name__ == "__main__":
    main()

Generated code for step 11:

def step_11(df):
    # Assuming the input DataFrame 'df' is already loaded from the CSV file
    # with the correct column names and data types
    
    # No modifications needed in this step, return the original DataFrame
    return df

DataFrame after step 11:
   age     sex     bmi  children smoker     region      charges
0   19  female  27.900         0    yes  southwest  16884.92400
1   18    male  33.770         1     no  southeast   1725.55230
2   28    male  33.000         3     no  southeast   4449.46200
3   33    male  22.705         0     no  northwest  21984.47061
4   32    male  28.880         0     no  northwest   3866.85520
age           int64
sex          object
bmi         float64
children      int64
smoker       object
region       object
charges     float64
dtype: object


Failed to execute step 21: string indices must be integers
Skipping to the next step...
Failed to execute step 22: string indices must be integers
Skipping to the next 