In [20]:
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from joblib import load

# Files read and written by this script
FEATURES_FILE = "features_test.csv"
LABELS_FILE = "labels_test.npy"
RESULTS_FILE = "results.csv"
MODEL_FILE = "trained_model.joblib"  # Path to the pre-trained model

# Logging setup: INFO level, each message prefixed with its timestamp
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_parse_datetime(date_str: str) -> Optional[datetime]:
    """Parse an ISO-format string into a ``datetime`` without raising.

    Args:
        date_str: Text in a format accepted by ``datetime.fromisoformat``.

    Returns:
        The parsed ``datetime``, or ``None`` (after logging an error) when
        ``date_str`` is malformed or is not a string at all.
    """
    try:
        return datetime.fromisoformat(date_str)
    except (TypeError, ValueError):
        # fromisoformat raises TypeError for non-string input (e.g. a NaN
        # read from an empty CSV cell) and ValueError for malformed text.
        logger.error(f"Invalid datetime format: {date_str}")
        return None

def preprocess_providers(providers: pd.DataFrame) -> pd.DataFrame:
    """Pin each provider's daily limits to that day's earliest snapshot.

    Rows are sorted by ``ID`` then ``TIME``. Within every (``ID``, calendar
    date of ``TIME``) group, ``LIMIT_MIN`` and ``LIMIT_MAX`` of all rows are
    overwritten with the values from the group's first row.

    Args:
        providers: Frame with at least ``ID``, ``TIME``, ``LIMIT_MIN`` and
            ``LIMIT_MAX`` columns. It is not modified.

    Returns:
        A new frame built from the groups, with a fresh ``RangeIndex``.
        NOTE(review): rows whose ``ID`` or ``TIME`` is missing do not belong
        to any group and are therefore absent from the result (pandas
        ``groupby`` default) -- confirm this is intended.
    """
    # Work on a copy so the caller's frame keeps its original TIME column.
    providers = providers.copy()
    providers['TIME'] = pd.to_datetime(providers['TIME'])
    providers = providers.sort_values(by=['ID', 'TIME'], ascending=[True, True])

    updated_groups = []
    for _, group in providers.groupby(['ID', providers['TIME'].dt.date]):
        earliest_row = group.iloc[0]
        # assign() returns a new frame, so no write ever goes through a
        # groupby slice (which would trigger SettingWithCopy warnings).
        updated_groups.append(
            group.assign(
                LIMIT_MIN=earliest_row['LIMIT_MIN'],
                LIMIT_MAX=earliest_row['LIMIT_MAX'],
            )
        )

    if updated_groups:
        updated_providers = pd.concat(updated_groups)
    else:
        # pd.concat raises on an empty list; return an empty frame that
        # still carries the expected columns.
        updated_providers = providers.iloc[0:0]

    logger.info("Providers processed with dynamic LIMIT_MIN and LIMIT_MAX updates based on the earliest time of each day.")
    return updated_providers.reset_index(drop=True)

def process_transaction(transaction: pd.Series, providers: pd.DataFrame, rate_dict: Dict[str, float]) -> Tuple[List[Dict[str, Any]], List[int]]:
    """Build one feature row and one heuristic label per compatible provider.

    A provider row is compatible when its ``CURRENCY`` equals the
    transaction's ``cur``, the transaction ``amount`` lies within
    [``MIN_SUM``, ``MAX_SUM``] and does not exceed ``LIMIT_MAX``, and the
    row's ``TIME`` is not later than the transaction time. Only the first
    compatible row (in ``providers`` order) is used for each provider ``ID``.

    Args:
        transaction: Row with ``eventTimeRes`` (ISO string), ``amount`` and
            ``cur``; its ``name`` becomes ``transaction_id``.
        providers: Provider snapshots with ``ID``, ``TIME``, ``CURRENCY``,
            ``LIMIT_MIN``, ``LIMIT_MAX``, ``MIN_SUM``, ``MAX_SUM``,
            ``CONVERSION``, ``AVG_TIME`` and ``COMMISSION`` columns.
        rate_dict: Currency code -> multiplier applied to ``amount`` to get
            ``amount_in_usd``; unknown currencies use 1.

    Returns:
        ``(features, labels)``: two parallel lists, one entry per compatible
        provider. Both are empty when ``eventTimeRes`` cannot be parsed.
    """
    time_of_transaction = safe_parse_datetime(transaction['eventTimeRes'])
    if time_of_transaction is None:
        # The helper has already logged the bad value; without a timestamp
        # no provider can be matched, so contribute nothing.
        return [], []

    # Coarse pseudo-timestamp (365-day years, 30-day months). It is not
    # monotonic across month boundaries, so it is kept only as the stored
    # `time_in_seconds` feature and is no longer used to order events.
    time_of_transaction_seconds = (
        (time_of_transaction.year * 365 + time_of_transaction.month * 30 + time_of_transaction.day) * 86400
        + time_of_transaction.hour * 3600
        + time_of_transaction.minute * 60
        + time_of_transaction.second
    )

    amount = transaction['amount']
    currency = transaction['cur']
    transaction_amount_in_usd = amount * rate_dict.get(currency, 1)

    # Compare real timestamps. Any timezone info is dropped on both sides so
    # the comparison stays on wall-clock fields, as the old arithmetic did.
    event_ts = pd.Timestamp(time_of_transaction)
    if event_ts.tzinfo is not None:
        event_ts = event_ts.tz_localize(None)
    provider_times = pd.to_datetime(providers['TIME'])
    if provider_times.dt.tz is not None:
        provider_times = provider_times.dt.tz_localize(None)

    compatible_mask = (
        (providers['CURRENCY'] == currency)
        & (providers['LIMIT_MAX'] >= amount)
        & (providers['MAX_SUM'] >= amount)
        & (providers['MIN_SUM'] <= amount)
        & (provider_times <= event_ts)
    )
    # Keep only the first compatible snapshot of each provider.
    providers_compatible = providers[compatible_mask].drop_duplicates(subset='ID', keep='first')

    # These depend on the transaction only, so compute them once.
    time_of_day = time_of_transaction.hour
    day_of_week = time_of_transaction.weekday()
    is_weekend = 1 if day_of_week >= 5 else 0

    features = []
    labels = []
    for _, provider in providers_compatible.iterrows():
        penalty = max(0, provider['LIMIT_MIN'] * 0.01)
        # Heuristic label: success when at least three of five checks pass.
        success_score = (
            (provider['CONVERSION'] > 0.5)
            + (provider['AVG_TIME'] < 20)
            + (provider['COMMISSION'] <= 0.04)
            + (penalty < 600)
            + (transaction_amount_in_usd <= 20)
        )
        labels.append(1 if success_score >= 3 else 0)
        features.append({
            'conversion': provider['CONVERSION'],
            'avg_time': provider['AVG_TIME'],
            'commission': provider['COMMISSION'] * amount,
            'penalty': penalty,
            'amount_in_usd': transaction_amount_in_usd,
            'limits_ratio': amount / provider['LIMIT_MAX'],
            'provider_id': provider['ID'],
            'transaction_id': transaction.name,
            'eventTimeRes': transaction['eventTimeRes'],
            'time_in_seconds': time_of_transaction_seconds,
            'time_of_day': time_of_day,
            'day_of_week': day_of_week,
            'is_weekend': is_weekend,
        })

    return features, labels

def create_features(providers: pd.DataFrame, transactions: pd.DataFrame, rate_dict: Dict[str, float], features_file: str, labels_file: str) -> Tuple[pd.DataFrame, pd.Series]:
    """Return provider-transaction features and labels, cached or rebuilt.

    The user is asked on stdin whether to regenerate (``1``) or reuse the
    cached files (``0``). Reuse falls back to regeneration when either file
    is missing. Regenerated data is written to ``features_file`` (CSV) and
    ``labels_file`` (``.npy``).

    Args:
        providers: Pre-processed provider snapshots.
        transactions: Transactions, one per row.
        rate_dict: Currency code -> conversion rate.
        features_file: Path of the features CSV cache.
        labels_file: Path of the labels ``.npy`` cache.

    Returns:
        ``(features_df, labels)`` with one entry per provider-transaction pair.
    """
    print("Вы хотите сформировать новые признаки для данных?")
    # Re-prompt until the answer is exactly 0 or 1: any other integer used to
    # skip both branches and overwrite the cache files with empty data, and
    # non-numeric text crashed with ValueError.
    while True:
        raw_answer = input("1 - да, 0 - нет (использовать имеющиеся, если есть):")
        try:
            choice = int(raw_answer)
        except ValueError:
            choice = None
        if choice in (0, 1):
            break
        logger.warning(f"Unrecognised answer {raw_answer!r}; expected 0 or 1.")

    if choice == 0:
        try:
            cached_features = pd.read_csv(features_file)
            # NOTE(review): allow_pickle=True executes pickled payloads on
            # load; only point labels_file at files this script produced.
            cached_labels = np.load(labels_file, allow_pickle=True)
        except FileNotFoundError:
            logger.error("Features or labels file not found, generating new ones.")
        else:
            # The cache is already on disk, so there is nothing to rewrite.
            logger.info(f"Features loaded from {features_file}")
            logger.info(f"Labels loaded from {labels_file}")
            return cached_features, pd.Series(cached_labels)

    all_features = []
    all_labels = []
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(process_transaction, transaction, providers, rate_dict)
            for _, transaction in transactions.iterrows()
        ]
        # Collect in submission order (not completion order) so the saved
        # rows come out in the same order on every run.
        for future in futures:
            features, labels = future.result()
            all_features.extend(features)
            all_labels.extend(labels)

    features_df = pd.DataFrame(all_features)
    features_df.to_csv(features_file, index=False)
    np.save(labels_file, all_labels)
    logger.info(f"Features saved to {features_file}")
    logger.info(f"Labels saved to {labels_file}")
    return features_df, pd.Series(all_labels)

def load_data(providers_file: str, payments_file: str, rates_file: str) -> (pd.DataFrame, pd.DataFrame, Dict[str, float]):
    """Read the three input CSV files.

    Returns:
        The providers frame, the transactions frame, and a dict mapping each
        ``destination`` value of the rates file to its ``rate``.
    """
    provider_table = pd.read_csv(providers_file)
    payment_table = pd.read_csv(payments_file)
    rate_table = pd.read_csv(rates_file)
    # Index by destination, then turn the `rate` column into a plain dict.
    rate_by_destination = rate_table.set_index('destination')['rate'].to_dict()
    return provider_table, payment_table, rate_by_destination


def main(providers_file: str, payments_file: str, rates_file: str, features_for_model: str, labels_for_model: str):
    """Run the pipeline: load, preprocess, featurise, predict, save results.

    Args:
        providers_file: CSV with provider snapshots.
        payments_file: CSV with transactions.
        rates_file: CSV with exchange rates.
        features_for_model: Path of the features CSV cache.
        labels_for_model: Path of the labels ``.npy`` cache.

    Returns:
        ``(predictions, probabilities, results_df)``: the model's class and
        positive-class probability for every feature row, and the
        per-transaction results that were also written to ``RESULTS_FILE``.

    Raises:
        Exception: any failure is logged with its traceback and re-raised.
    """
    # Columns the model consumes, in the order used when predicting.
    model_columns = ["conversion", "avg_time", "commission", "penalty", "amount_in_usd"]

    try:
        logger.info("Загрузка данных...")
        providers, transactions, rates = load_data(providers_file, payments_file, rates_file)
        logger.debug(f"Провайдеры загружены: {len(providers)} записей")
        logger.debug(f"Транзакции загружены: {len(transactions)} записей")
        logger.debug(f"Курсы валют: {rates}")

        logger.info("Предобработка данных провайдеров...")
        providers = preprocess_providers(providers)
        logger.debug(f"Обработанные провайдеры: {providers.head()}")

        logger.info("Генерация признаков и меток...")
        features_df, labels = create_features(providers, transactions, rates, features_for_model, labels_for_model)
        logger.debug(f"Генерированные признаки: {features_df.shape}")
        logger.debug(f"Пример первых признаков: {features_df.head()}")

        logger.info("Загрузка модели...")
        model = load(MODEL_FILE)
        logger.debug(f"Модель загружена из {MODEL_FILE}")

        logger.info("Выполнение предсказаний для всех транзакций...")
        if features_df.empty:
            # No provider-transaction pairs: an empty frame may not even have
            # the model columns, so skip the model instead of failing.
            logger.warning("Признаки отсутствуют; предсказания пропущены.")
            predictions = np.array([], dtype=int)
            probabilities = np.array([], dtype=float)
        else:
            # Build the model input once and reuse it for both calls.
            model_input = features_df[model_columns]
            predictions = model.predict(model_input)
            probabilities = model.predict_proba(model_input)[:, 1]
        logger.debug(f"Предсказания: {predictions}")
        logger.debug(f"Вероятности: {probabilities}")

        logger.info("Начало обработки транзакций через predict_flow...")
        results = []
        for _, transaction in transactions.iterrows():
            logger.debug(f"Обработка транзакции ID {transaction.name}...")
            results.append(predict_flow(model, transaction, providers, rates, features_df))

        logger.info("Сохранение результатов...")
        # pd.concat raises on an empty list (no transactions at all).
        results_df = pd.concat(results, ignore_index=True) if results else pd.DataFrame()
        results_df.to_csv(RESULTS_FILE, index=False)
        logger.info(f"Результаты сохранены в {RESULTS_FILE}")

        return predictions, probabilities, results_df

    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"Ошибка в main: {e}")
        raise


def predict_flow(model, transaction, providers, rates, features_df, results_file="results.csv") -> pd.DataFrame:
    """Predict the outcome of one transaction for each candidate provider.

    For every feature row of the transaction whose provider's first ``TIME``
    falls on the transaction's calendar date, one result row is produced.
    ``providers`` is updated in place: the provider's ``TIME`` advances by its
    ``AVG_TIME`` seconds, and after a predicted success its ``LIMIT_MIN`` and
    ``LIMIT_MAX`` are reduced by the transaction's ``amount_in_usd``.

    Args:
        model: Fitted estimator exposing ``predict``.
        transaction: Transaction row; ``transaction.name`` is matched against
            ``features_df['transaction_id']``.
        providers: Provider snapshots; mutated in place (see above).
        rates: Unused; kept for interface compatibility.
        features_df: Output of ``create_features``.
        results_file: Unused; kept for interface compatibility.

    Returns:
        One row per processed provider with ``terminal_id``, ``eventTimeRes``,
        ``prediction`` and ``amount_in_usd``. When the transaction has no
        features, a single placeholder row (``terminal_id='Z'``,
        ``prediction='-'``, ``amount_in_usd=-1``). An empty frame when an
        unexpected error occurs (it is logged, not raised).
    """
    model_columns = ["conversion", "avg_time", "commission", "penalty", "amount_in_usd"]

    try:
        logger.debug(f"Начало обработки транзакции {transaction.name}...")

        if 'transaction_id' in features_df.columns:
            transaction_features = features_df[features_df['transaction_id'] == transaction.name]
        else:
            # A completely empty feature frame has no columns at all; treat
            # it the same as "no features for this transaction".
            transaction_features = features_df.iloc[0:0]

        if transaction_features.empty:
            logger.warning(f"Нет признаков для транзакции ID {transaction.name}.")
            return pd.DataFrame([{
                'terminal_id': 'Z',
                'eventTimeRes': transaction['eventTimeRes'],
                'prediction': '-',
                'amount_in_usd': -1
            }])

        logger.debug(f"Найдено {len(transaction_features)} признаков для транзакции ID {transaction.name}.")
        predicted_success = model.predict(transaction_features[model_columns])
        logger.debug(f"Предсказанный результат: {predicted_success}")

        results = []
        # Pair every feature row with its own prediction; previously the
        # first prediction was reused for all providers of the transaction.
        for (_, feature_row), prediction in zip(transaction_features.iterrows(), predicted_success):
            terminal_id = feature_row['provider_id']
            amount_in_usd = feature_row['amount_in_usd']
            logger.debug(f"Обработка провайдера ID {terminal_id}...")

            provider_mask = providers['ID'] == terminal_id
            matching_provider = providers[provider_mask]
            if matching_provider.empty:
                logger.warning(f"Нет данных о провайдере ID {terminal_id}.")
                continue

            current_time_dt = pd.to_datetime(matching_provider['TIME'].values[0])
            event_time_res_dt = pd.to_datetime(transaction['eventTimeRes'])

            logger.debug(f"Текущая дата провайдера: {current_time_dt}, дата транзакции: {event_time_res_dt}")

            if pd.isna(current_time_dt) or pd.isna(event_time_res_dt):
                logger.warning(f"Некорректные даты для провайдера ID {terminal_id} или транзакции.")
                continue

            if current_time_dt.date() != event_time_res_dt.date():
                continue
            logger.debug("Совпадение дат найдено.")

            if prediction == 1:
                logger.debug("Обновление лимитов провайдера после успешной транзакции.")
                # The limit columns are LIMIT_MIN / LIMIT_MAX; the former
                # MIN_LIMITS / MAX_LIMITS names raised a KeyError that the
                # except below swallowed, dropping the transaction's results.
                # NOTE(review): limits are reduced by the USD amount while
                # compatibility is checked against the raw amount -- confirm
                # the intended unit.
                providers.loc[provider_mask, 'LIMIT_MIN'] -= amount_in_usd
                providers.loc[provider_mask, 'LIMIT_MAX'] -= amount_in_usd

            # Advance all rows of this provider by its average handling time.
            avg_time = matching_provider['AVG_TIME'].values[0]
            providers.loc[provider_mask, 'TIME'] += pd.Timedelta(seconds=avg_time)

            results.append({
                'terminal_id': terminal_id,
                'eventTimeRes': transaction['eventTimeRes'],
                'prediction': int(prediction),
                'amount_in_usd': amount_in_usd
            })
            logger.debug(f"Результат добавлен: {results[-1]}")

        results_df = pd.DataFrame(results)
        logger.info(f"Результаты транзакции {transaction.name} сохранены.")
        return results_df

    except Exception as e:
        # Best effort: one bad transaction must not abort the whole run, but
        # keep the traceback so the failure can be diagnosed.
        logger.exception(f"Ошибка в predict_flow: {e}")
        return pd.DataFrame()




if __name__ == "__main__":
    # Example run against the sample input files
    providers_file, payments_file, rates_file = "providers_2.csv", "payments_2.csv", "ex_rates.csv"
    features_for_model, labels_for_model = FEATURES_FILE, LABELS_FILE

    main(
        providers_file,
        payments_file,
        rates_file,
        features_for_model,
        labels_for_model,
    )


2024-12-22 02:01:08,155 - Загрузка данных...
2024-12-22 02:01:08,433 - Предобработка данных провайдеров...
2024-12-22 02:01:08,463 - Providers processed with dynamic LIMIT_MIN and LIMIT_MAX updates based on the earliest time of each day.
2024-12-22 02:01:08,469 - Генерация признаков и меток...


Вы хотите сформировать новые признаки для данных?
