In [6]:
import numpy as np
import pandas as pd

class DataGenerator:
    def __init__(self, seed=42):
        np.random.seed(seed)

    def generate_data(self, n_samples=1000):
        data = {
            'transaction_amount': np.random.uniform(10, 1000, n_samples),
            'transaction_type': np.random.choice(['online', 'in-store', 'mobile'], n_samples),
            'customer_address': np.random.choice(['Urban', 'Suburban', 'Rural'], n_samples),
            'customer_email_domain': np.random.choice(['gmail.com', 'yahoo.com', 'outlook.com'], n_samples),
            'customer_bank_account': np.random.choice(['Bank A', 'Bank B', 'Bank C'], n_samples),
            'country': np.random.choice(['Country A', 'Country B', 'Country C'], n_samples),
            'aml_risk_flag': np.random.choice([0, 1], n_samples, p=[0.9, 0.1]),
            'is_fraud': np.random.choice([0, 1], n_samples, p=[0.95, 0.05])
        }
        return pd.DataFrame(data)

data_generator = DataGenerator()
df = data_generator.generate_data()
print(df.head())


   transaction_amount transaction_type customer_address customer_email_domain  \
0          380.794718           mobile         Suburban             yahoo.com   
1          951.207163         in-store            Urban             yahoo.com   
2          734.674002           mobile         Suburban           outlook.com   
3          602.671899           mobile            Rural           outlook.com   
4          164.458454           online            Urban             gmail.com   

  customer_bank_account    country  aml_risk_flag  is_fraud  
0                Bank B  Country C              0         0  
1                Bank A  Country B              0         0  
2                Bank A  Country B              0         1  
3                Bank A  Country C              0         0  
4                Bank B  Country C              0         0  


In [4]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

class DataPreprocessor:
    def __init__(self):
        self.numerical_cols = ['transaction_amount']
        self.categorical_cols = ['transaction_type', 'customer_address', 'customer_bank_account', 'country', 'aml_risk_flag']
        self.high_cardinality_cols = ['customer_email_domain']

    def get_preprocessor(self):
        numerical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='mean')),
            ('scaler', StandardScaler())])

        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('onehot', OneHotEncoder(handle_unknown='ignore'))])

        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numerical_transformer, self.numerical_cols),
                ('cat', categorical_transformer, self.categorical_cols + self.high_cardinality_cols)])
        return preprocessor


In [5]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import classification_report

class ModelTrainer:
    def __init__(self, df):
        self.df = df
        self.X = df.drop('is_fraud', axis=1)
        self.y = df['is_fraud']
        self.preprocessor = DataPreprocessor().get_preprocessor()

    def train_model(self):
        X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, test_size=0.2, random_state=42)
        
        # Create a preprocessing and modeling pipeline
        model = Pipeline(steps=[('preprocessor', self.preprocessor),
                                ('classifier', RandomForestClassifier(random_state=42))])

        # Define a grid of hyperparameters
        param_grid = {
            'classifier__n_estimators': [100, 200],
            'classifier__max_depth': [None, 10, 20],
            'classifier__min_samples_split': [2, 5],
            'classifier__min_samples_leaf': [1, 2]
        }

        # Set up the grid search
        grid_search = GridSearchCV(model, param_grid, cv=5, scoring='accuracy', n_jobs=-1, verbose=1)
        grid_search.fit(X_train, y_train)

        best_params = grid_search.best_params_
        best_score = grid_search.best_score_
        best_model = grid_search.best_estimator_

        # Predictions and evaluation
        y_pred = best_model.predict(X_test)
        report = classification_report(y_test, y_pred)
        
        # Feature importances (assuming the best model is a RandomForest)
        feature_importances = best_model.named_steps['classifier'].feature_importances_
        
        return best_params, best_score, report, feature_importances

model_trainer = ModelTrainer(df)
best_params, best_score, report, feature_importances = model_trainer.train_model()
print("Best Parameters:", best_params)
print("Best Score:", best_score)
print(report)

              precision    recall  f1-score   support

           0       0.97      0.99      0.98       194
           1       0.00      0.00      0.00         6

    accuracy                           0.96       200
   macro avg       0.48      0.49      0.49       200
weighted avg       0.94      0.96      0.95       200



# Conformal Prediction

In [None]:
from nonconformist.cp import IcpClassifier
from nonconformist.nc import NcFactory

# Create a nonconformity function based on the RandomForest model
nc = NcFactory.create_nc(clf)

# Initialize the Inductive Conformal Predictor
icp = IcpClassifier(nc)

# Fit the ICP using the training data
icp.fit(X_train, y_train)

# Calibrate the ICP with a separate calibration dataset if available
# For simplicity, we use the test set here, but it's better to use a separate calibration set
icp.calibrate(X_test, y_test)

# Make predictions with confidence and credibility intervals
prediction_intervals = icp.predict(X_test, significance=0.05)


The prediction_intervals will contain the prediction regions for each instance in X_test at the specified significance level (e.g., 0.05 for 95% confidence). For each instance, you'll get a set of possible labels (in this binary classification case, fraudulent or not) along with the measure of confidence (how confident the model is about the prediction) and credibility (how likely the prediction is correct given the data).

In [None]:
# Example evaluation code
correct = 0
for i, interval in enumerate(prediction_intervals):
    if y_test.iloc[i] in interval:
        correct += 1
accuracy = correct / len(y_test)
print(f"Conformal prediction accuracy: {accuracy}")