In [None]:
" 1. Data Processing Module "

In [None]:
" 1a. Data Ingestion "

In [None]:
import pandas as pd
import json

def load_data(file_path):
    """Load a tabular data file into a pandas DataFrame.

    The reader is chosen from the file extension. The extension is compared
    case-insensitively, so names such as ``DATA.CSV`` are accepted.

    Args:
        file_path: Location of the input file, given as a ``str`` or a
            path-like object such as ``pathlib.Path``.

    Returns:
        pandas.DataFrame: The loaded data.

    Raises:
        ValueError: If the extension is not ``.csv``, ``.json``, ``.xlsx``
            or ``.xls``.
    """
    # str() accepts path-like objects; lower() makes the suffix test
    # independent of how the file name was capitalised.
    normalized_name = str(file_path).lower()

    if normalized_name.endswith('.csv'):
        return pd.read_csv(file_path)

    if normalized_name.endswith('.json'):
        # Read as UTF-8 explicitly rather than relying on the platform's
        # default encoding, which differs between operating systems.
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        return pd.DataFrame(data)

    if normalized_name.endswith(('.xlsx', '.xls')):
        return pd.read_excel(file_path)

    raise ValueError("Unsupported file format. Please provide a CSV, JSON, or Excel file.")

In [None]:
" 1b. Data Pre-Processing (& cleaning)"

In [None]:
from sklearn.preprocessing import StandardScaler, LabelEncoder

def preprocess_data(df):
    """Fill gaps, encode categoricals and standardise the numeric columns of ``df``.

    Steps, in order:
      1. Missing values in numeric columns are replaced with that column's mean.
      2. Every ``object`` column is replaced by integer codes from a fresh
         ``LabelEncoder``.
      3. The columns that were numeric on entry are rescaled with
         ``StandardScaler``.

    The numeric columns are determined *before* encoding, so the integer codes
    produced in step 2 are kept as codes instead of being standardised as if
    they were measurements.

    Note:
        ``df`` is modified in place and the same object is returned. Pass
        ``df.copy()`` if the original frame must stay untouched.
        Missing values in ``object`` columns are not imputed here; they are
        handed to ``LabelEncoder`` unchanged.

    Args:
        df (pandas.DataFrame): Data to clean.

    Returns:
        pandas.DataFrame: The same ``df`` object after the transformations.
    """
    # Snapshot the column roles up front; encoding below turns object columns
    # into integers, which would otherwise be picked up as "numeric" later.
    numeric_columns = df.select_dtypes(include='number').columns
    categorical_columns = df.select_dtypes(include=['object']).columns

    # Handling missing values by filling them with the mean of the column.
    # The mean is taken over numeric columns only: DataFrame.mean() over a
    # frame that still contains object columns raises in pandas >= 2.0.
    if len(numeric_columns) > 0:
        column_means = df[numeric_columns].mean()
        df[numeric_columns] = df[numeric_columns].fillna(column_means)

    # Encoding categorical variables
    for column in categorical_columns:
        df[column] = LabelEncoder().fit_transform(df[column])

    # Normalizing numerical features. StandardScaler rejects input with no
    # columns or no rows, so skip the step when there is nothing to scale.
    if len(numeric_columns) > 0 and len(df) > 0:
        df[numeric_columns] = StandardScaler().fit_transform(df[numeric_columns])

    return df

In [None]:
" 2. Analysis Engine "

In [None]:
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier

def analyze_data(df, n_clusters=3, random_state=42):
    """Run trend, clustering and feature-importance analyses on ``df``.

    Only numeric columns take part in the analyses, so frames that still hold
    text columns can be analysed without crashing the estimators. A ``Cluster``
    column left behind by an earlier call is ignored as input and overwritten,
    which makes repeated calls on the same frame give the same answer.

    Analyses performed:
      1. Trend: for each numeric column, the slope of a linear regression of
         the values against their row position, stored as ``'<column>_trend'``.
      2. Clustering: KMeans on the numeric columns. The labels are written to
         ``df['Cluster']`` (the frame is modified in place) and the size of
         each cluster is stored under ``'clusters'``.
      3. Feature importance: only when a ``'target'`` column exists. A
         ``RandomForestClassifier`` is fitted on the remaining numeric columns
         and the importances are stored under ``'feature_importances'``.
         ``Cluster`` is not used as a predictor because its labels were
         derived from the data, including ``target`` when that is numeric.

    Note:
        Missing values are not handled here; the scikit-learn estimators used
        below are expected to reject them, so impute beforehand.

    Args:
        df (pandas.DataFrame): Data to analyse. Gains or updates a
            ``'Cluster'`` column as a side effect.
        n_clusters (int): Requested number of KMeans clusters. Capped at the
            number of rows so that small frames do not raise.
        random_state (int | None): Seed passed to KMeans and the random forest
            so that results are reproducible. ``None`` restores unseeded runs.

    Returns:
        dict: Analysis results keyed as described above. Empty when ``df`` has
        no rows or no numeric columns.
    """
    results = {}

    # 'Cluster' is an artefact of a previous run, not a measured variable.
    feature_columns = [
        column
        for column in df.select_dtypes(include=[np.number]).columns
        if column != 'Cluster'
    ]
    n_rows = len(df)
    if n_rows == 0 or not feature_columns:
        return results

    # 1. Trend Analysis using Linear Regression
    # The regressor is the row position; it is identical for every column,
    # so build it once outside the loop.
    row_positions = np.arange(n_rows).reshape(-1, 1)
    for column in feature_columns:
        lr = LinearRegression()
        lr.fit(row_positions, df[column].values.reshape(-1, 1))
        results[f'{column}_trend'] = lr.coef_[0][0]

    # 2. Clustering to identify patterns using KMeans
    # KMeans cannot form more clusters than there are samples.
    effective_clusters = min(n_clusters, n_rows)
    kmeans = KMeans(n_clusters=effective_clusters, random_state=random_state, n_init=10)
    df['Cluster'] = kmeans.fit_predict(df[feature_columns])
    results['clusters'] = df['Cluster'].value_counts().to_dict()

    # 3. Feature Importance using RandomForestClassifier (Assuming 'target' is the column to predict)
    if 'target' in df.columns:
        predictor_columns = [column for column in feature_columns if column != 'target']
        if predictor_columns:
            rf = RandomForestClassifier(random_state=random_state)
            rf.fit(df[predictor_columns], df['target'])
            results['feature_importances'] = dict(zip(predictor_columns, rf.feature_importances_))

    return results

In [None]:
" 3. Report Generation "

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

def generate_report(df, analysis_results, report_name="report"):
    """Write a text summary and save diagnostic plots for an analysis run.

    Files written (all prefixed with ``report_name``):
      * ``<report_name>.txt`` - trends, cluster sizes and feature importances.
      * ``<report_name>_trends.png`` - line plot of every numeric column.
      * ``<report_name>_clusters.png`` - seaborn pair plot coloured by cluster,
        only when ``df`` has a ``'Cluster'`` column and at least one other
        numeric column.
      * ``<report_name>_feature_importances.png`` - bar plot, only when
        ``analysis_results`` contains ``'feature_importances'``.

    Each figure is also shown with ``plt.show()`` and then closed so repeated
    calls do not accumulate open figures.

    Args:
        df (pandas.DataFrame): The analysed data.
        analysis_results (dict): Mapping with ``'<column>_trend'`` slopes and
            optionally ``'clusters'`` and ``'feature_importances'`` entries.
        report_name (str): Prefix (optionally including a directory) for the
            output files.

    Returns:
        None
    """
    # Creating a summary report as a text file.
    # UTF-8 is set explicitly so column names with non-ASCII characters can be
    # written regardless of the platform's default encoding.
    with open(f"{report_name}.txt", "w", encoding="utf-8") as report:
        report.write("Data Analysis Report\n")
        report.write("="*50 + "\n\n")

        # Summary of trends
        report.write("Trends Summary:\n")
        for key, value in analysis_results.items():
            if 'trend' in key:
                report.write(f"{key}: {'increasing' if value > 0 else 'decreasing' if value < 0 else 'stable'}\n")

        report.write("\n")

        # Summary of clustering
        if 'clusters' in analysis_results:
            report.write("Cluster Summary:\n")
            for cluster, count in analysis_results['clusters'].items():
                report.write(f"Cluster {cluster}: {count} instances\n")

        report.write("\n")

        # Summary of feature importance
        if 'feature_importances' in analysis_results:
            report.write("Feature Importance Summary:\n")
            sorted_importances = sorted(analysis_results['feature_importances'].items(), key=lambda x: x[1], reverse=True)
            for feature, importance in sorted_importances:
                report.write(f"{feature}: {importance:.4f}\n")

    # Visualizing trends using line plots.
    # 'Cluster' holds group labels rather than a measured series, so it is
    # left out of the trend plot.
    trend_columns = [
        column
        for column in df.select_dtypes(include='number').columns
        if column != 'Cluster'
    ]
    trend_fig, trend_ax = plt.subplots(figsize=(12, 6))
    for column in trend_columns:
        trend_ax.plot(df[column], label=column)
    trend_ax.set_title('Trends in Numerical Data')
    if trend_columns:
        # A legend without any labelled line only produces a warning.
        trend_ax.legend()
    trend_fig.savefig(f"{report_name}_trends.png")
    plt.show()
    plt.close(trend_fig)

    # Visualizing clusters using pair plots (if applicable).
    # A pair plot needs at least one numeric variable besides the hue column.
    if 'Cluster' in df.columns and trend_columns:
        sns.pairplot(df, hue="Cluster", palette="Set2")
        # pairplot creates its own figure; grab it before show() so it can be
        # saved and closed explicitly.
        cluster_fig = plt.gcf()
        cluster_fig.savefig(f"{report_name}_clusters.png")
        plt.show()
        plt.close(cluster_fig)

    # Visualizing feature importance using a bar plot
    if 'feature_importances' in analysis_results:
        importances = analysis_results['feature_importances']
        importance_fig, importance_ax = plt.subplots(figsize=(10, 5))
        sns.barplot(x=list(importances.keys()), y=list(importances.values()), ax=importance_ax)
        importance_ax.set_title('Feature Importances')
        plt.setp(importance_ax.get_xticklabels(), rotation=45, ha='right')
        importance_fig.tight_layout()
        importance_fig.savefig(f"{report_name}_feature_importances.png")
        plt.show()
        plt.close(importance_fig)

In [None]:
" 4. UI (CLI & NLP)"

In [None]:
import re

class AIEmployee:
    """Text-driven front end around the load / analyse / report functions.

    The instance loads a dataset on construction, keeps the latest analysis
    results, and maps free-text queries to actions via keyword matching.
    """

    def __init__(self, data_file):
        """Load ``data_file`` with ``load_data``; no analysis is run yet.

        Args:
            data_file: Path passed straight to ``load_data``.
        """
        self.data = load_data(data_file)
        # None means "analysis has not been run"; a dict (even an empty one)
        # means it has.
        self.analysis_results = None

    def analyze(self):
        """Run ``analyze_data`` on the loaded data and store the results."""
        self.analysis_results = analyze_data(self.data)
        print("Data analysis completed.")

    def generate_report(self, report_name="report"):
        """Write the report files for the stored analysis results.

        Args:
            report_name (str): File-name prefix forwarded to the module-level
                ``generate_report`` function.
        """
        if self.analysis_results is not None:
            # Inside a method body this name resolves to the module-level
            # function, not to this method.
            generate_report(self.data, self.analysis_results, report_name)
            print(f"Report '{report_name}' generated successfully.")
        else:
            print("Please run the analysis first.")

    def handle_query(self, query):
        """Dispatch a free-text query to the matching action.

        Keywords are checked in this order, case-insensitively, and the first
        match wins: ``trend``, ``cluster``, ``feature importance``,
        ``analyze``/``analyse``, ``report``. For report queries, the word that
        follows ``report`` is used as the report name (default ``"report"``).

        Args:
            query: The user's request. Anything that is not a string is
                answered with the "didn't understand" message.
        """
        # Basic NLP to understand user queries
        if not isinstance(query, str):
            # re.search would raise TypeError on None or other non-strings.
            print("Sorry, I didn't understand that. Please try again.")
            return

        if re.search(r'trend', query, re.I):
            self.display_trends()
        elif re.search(r'cluster', query, re.I):
            self.display_clusters()
        elif re.search(r'feature\s+importance', query, re.I):
            self.display_feature_importance()
        elif re.search(r'analy[sz]e', query, re.I):
            # Accept both the American and the British spelling.
            self.analyze()
        elif re.search(r'report', query, re.I):
            name_match = re.search(r'report\s+(\w+)', query, re.I)
            report_name = name_match.group(1) if name_match else "report"
            self.generate_report(report_name)
        else:
            print("Sorry, I didn't understand that. Please try again.")

    def display_trends(self):
        """Print the direction of every ``*_trend`` entry in the results."""
        if self.analysis_results is not None:
            print("Trends Summary:")
            for key, value in self.analysis_results.items():
                if key.endswith('_trend'):
                    print(f"{key}: {'increasing' if value > 0 else 'decreasing' if value < 0 else 'stable'}")
        else:
            print("Please run the analysis first.")

    def display_clusters(self):
        """Print the number of rows assigned to each cluster."""
        if self.analysis_results and 'clusters' in self.analysis_results:
            print("Cluster Summary:")
            for cluster, count in self.analysis_results['clusters'].items():
                print(f"Cluster {cluster}: {count} instances")
        else:
            print("No clusters found or analysis not run yet.")

    def display_feature_importance(self):
        """Print feature importances, highest first, to four decimals."""
        if self.analysis_results and 'feature_importances' in self.analysis_results:
            print("Feature Importance Summary:")
            sorted_importances = sorted(self.analysis_results['feature_importances'].items(), key=lambda x: x[1], reverse=True)
            for feature, importance in sorted_importances:
                print(f"{feature}: {importance:.4f}")
        else:
            print("No feature importance data found or analysis not run yet.")

In [None]:
" 5. Documentation & Testing "

In [None]:
import unittest
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.cluster import KMeans

class TestAIEmployee(unittest.TestCase):

    def setUp(self):
        # Sample data for testing
        self.df = pd.DataFrame({
            'Country': ['A', 'B', 'C'],
            'Gold': [1, 2, 3],
            'Silver': [2, 1, 3],
            'Bronze': [3, 3, 1],
            'Total': [6, 6, 7]
        })

    def test_linear_regression(self):
        lr_model = LinearRegression().fit(self.df[['Gold', 'Silver', 'Bronze']], self.df['Total'])
        self.assertEqual(len(lr_model.coef_), 3)

    def test_kmeans_clustering(self):
        kmeans = KMeans(n_clusters=2, random_state=0, n_init=10)
        clusters = kmeans.fit_predict(self.df[['Gold', 'Silver', 'Bronze']])
        self.assertEqual(len(set(clusters)), 2)

    def test_random_forest(self):
        rf_model = RandomForestRegressor(random_state=0).fit(self.df[['Gold', 'Silver', 'Bronze']], self.df['Total'])
        self.assertEqual(len(rf_model.feature_importances_), 3)

# Run the tests inside the notebook. argv is replaced with a placeholder so
# unittest does not try to parse the kernel's own command-line arguments, and
# exit=False stops unittest from calling sys.exit() once the run finishes.
unittest.main(argv=['first-arg-is-ignored'], exit=False)

...
----------------------------------------------------------------------
Ran 3 tests in 0.297s

OK


<unittest.main.TestProgram at 0x77fe27406920>