In [None]:
import concurrent.futures
import requests
import time
import json
import pandas as pd
import logging
import sys
from typing import List, Dict, Any, Optional

class ExistingDatasetAPIClient:
    def __init__(self, api_key: str, config_file: str = 'config_new.json'):
        self.api_key = api_key
        self.base_url = "https://api.rawg.io/api"
        logging.getLogger().handlers = []
        self._setup_logging_from_config(config_file)
        self.logger = logging.getLogger(__name__)

    def _setup_logging_from_config(self, config_file: str):
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)

            log_config = config.get('logging', {})
            if not log_config.get('enabled', True):
                logging.getLogger().disabled = True
                return

            level = getattr(logging, log_config.get('level', 'INFO'))
            handlers = []
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )

            if log_config.get('log_to_file', True):
                file_handler = logging.FileHandler(
                    log_config.get('filename', 'api_client_more_info.log'),
                    encoding='utf-8'
                )
                file_handler.setFormatter(formatter)
                handlers.append(file_handler)

            if log_config.get('log_to_console', True):
                console_handler = logging.StreamHandler(sys.stdout)
                console_handler.setFormatter(formatter)
                handlers.append(console_handler)

            logging.basicConfig(level=level, handlers=handlers)

        except Exception as e:
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(levelname)s - %(message)s',
                handlers=[logging.StreamHandler(sys.stdout)]
            )

    def _make_api_request(self, endpoint: str, params: Optional[Dict] = None,
                          max_retries: int = 5) -> Optional[Dict]:
        """Send a GET request to ``{base_url}/{endpoint}`` and return its JSON.

        The API key is added as the ``key`` query parameter. Responses with
        status 429 are retried after 60 seconds and 502 after 3 seconds, up
        to ``max_retries`` times in total.

        Args:
            endpoint: Path relative to ``self.base_url`` (e.g. ``"games/42"``).
            params: Extra query parameters; the dict is not modified.
            max_retries: Maximum number of retries for 429/502 responses.

        Returns:
            The decoded JSON body, or ``None`` when the request fails, the
            body is not valid JSON, or the retries are exhausted.
        """
        # Work on a copy so the caller's dict never picks up the API key.
        query = dict(params) if params else {}
        query['key'] = self.api_key

        url = f"{self.base_url}/{endpoint}"

        # status code -> (pause in seconds, message logged before pausing)
        retry_policy = {
            429: (60, "Лимит запросов. Ждем 60 секунд..."),
            502: (3, "Ошибка 502, ждем 3 секунды..."),
        }
        retries_left = max(max_retries, 0)

        try:
            # A loop instead of recursion: a long outage can neither grow the
            # call stack nor retry forever.
            while True:
                response = requests.get(url, params=query, timeout=30)

                policy = retry_policy.get(response.status_code)
                if policy is None:
                    response.raise_for_status()
                    return response.json()

                if retries_left == 0:
                    self.logger.error(
                        f"Превышено число повторов ({max_retries}) для {endpoint}, "
                        f"последний статус: {response.status_code}"
                    )
                    return None

                retries_left -= 1
                pause, message = policy
                self.logger.warning(message)
                time.sleep(pause)

        except requests.exceptions.RequestException as e:
            # The exception text may contain the full URL including the key
            # query parameter; keep the credential out of the log.
            detail = str(e)
            if self.api_key:
                detail = detail.replace(str(self.api_key), '***')
            self.logger.error(f"Ошибка запроса к {endpoint}: {detail}")
            return None
        except ValueError as e:
            # Body was not valid JSON (raised as a plain ValueError by
            # requests versions whose JSON error is not a RequestException).
            self.logger.error(f"Ошибка запроса к {endpoint}: {e}")
            return None

    def load_existing_dataset(self, filepath: str, id_column: str = 'id') -> tuple[pd.DataFrame, List[int]]:
        self.logger.info(f"Загрузка датасета из {filepath}")

        try:
            if filepath.endswith('.csv'):
                df = pd.read_csv(filepath)
            elif filepath.endswith('.xlsx'):
                df = pd.read_excel(filepath)
            elif filepath.endswith('.json'):
                df = pd.read_json(filepath)
            else:
                raise ValueError("Поддерживаются только CSV, Excel и JSON форматы")

            # Проверяем наличие колонки с ID
            if id_column not in df.columns:
                raise ValueError(f"Колонка '{id_column}' не найдена в датасете")

            # Преобразуем ID в список, убирая NaN значения
            game_ids = df[id_column].dropna().astype(int).tolist()

            self.logger.info(f"Загружено {len(df)} записей, {len(game_ids)} валидных ID игр")
            return df, game_ids

        except Exception as e:
            self.logger.error(f"Ошибка загрузки датасета: {e}")
            raise

    # МЕТОДЫ ДЛЯ СБОРА ДОПОЛНИТЕЛЬНЫХ ДАННЫХ

    def get_game_details(self, game_id: int) -> Optional[Dict]:
        self.logger.debug(f"Загрузка деталей игры {game_id}")

        endpoint = f"games/{game_id}"
        data = self._make_api_request(endpoint)

        if data:
            # Добавляем аналитические метрики
            playtime = data.get('playtime', 0)
            data['completion_ratio'] = playtime / max(playtime, 1) if playtime > 0 else 0

            rating = data.get('rating', 0)
            rating_top = data.get('rating_top', 5)
            data['rating_ratio'] = rating / max(rating_top, 1) if rating > 0 else 0

            # Извлекаем дополнительные метрики
            data['platforms_count'] = len(data.get('platforms', []))
            data['genres_count'] = len(data.get('genres', []))
            data['developers_count'] = len(data.get('developers', []))
            data['publishers_count'] = len(data.get('publishers', []))

            return data
        return None

    def get_game_additions(self, game_id: int) -> Optional[Dict]:
        self.logger.debug(f"Загрузка дополнений игры {game_id}")

        endpoint = f"games/{game_id}/additions"
        data = self._make_api_request(endpoint)

        if data:
            additions = data.get('results', [])

            return {
                'game_id': game_id,
                'additions': additions,
                'additions_count': len(additions)
            }
        return None

    def get_game_series(self, game_id: int) -> Optional[Dict]:
        self.logger.debug(f"Загрузка серии игры {game_id}")

        endpoint = f"games/{game_id}/game-series"
        data = self._make_api_request(endpoint)

        if data:
            series_games = data.get('results', [])

            return {
                'game_id': game_id,
                'series_games': series_games,
                'series_count': len(series_games)
            }
        return None

    def get_game_achievements(self, game_id: int) -> Optional[Dict]:
        self.logger.debug(f"Загрузка достижений игры {game_id}")

        endpoint = f"games/{game_id}/achievements"
        data = self._make_api_request(endpoint)

        if data:
            achievements = data.get('results', [])

            total_achievements = len(achievements)

            # Считаем сложность достижений по проценту completion
            completion_rates = []
            for ach in achievements:
                if ach.get('percent') is not None:
                    try:
                        percent_value = ach['percent']
                        if isinstance(percent_value, str):
                            percent_value = percent_value.replace('%', '').strip()
                            percent_value = float(percent_value)
                        else:
                            percent_value = float(percent_value)
                        completion_rates.append(percent_value)
                    except (ValueError, TypeError):
                        continue

            return {
                'game_id': game_id,
                'achievements': achievements,
                'achievements_count': total_achievements
            }
        return None

    # ПАРАЛЛЕЛЬНЫЕ МЕТОДЫ

    def get_details_parallel(self, game_ids: List[int], max_workers: int = 3) -> List[Dict]:
        self.logger.info(f"Параллельная загрузка деталей для {len(game_ids)} игр")

        game_details = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_id = {
                executor.submit(self.get_game_details, game_id): game_id
                for game_id in game_ids
            }

            completed = 0
            total = len(game_ids)

            for future in concurrent.futures.as_completed(future_to_id):
                game_id = future_to_id[future]
                try:
                    details = future.result()
                    if details:
                        game_details.append(details)
                    completed += 1
                    if completed % 10 == 0:
                        self.logger.info(f"Детали: {completed}/{total} ({completed/total*100:.1f}%)")
                except Exception as e:
                    completed += 1
                    self.logger.error(f"Ошибка загрузки деталей для игры {game_id}: {e}")

        self.logger.info(f"Загружено деталей: {len(game_details)}/{len(game_ids)}")
        return game_details

    def get_additions_parallel(self, game_ids: List[int], max_workers: int = 2) -> List[Dict]:
        self.logger.info(f"Параллельная загрузка дополнений для {len(game_ids)} игр")

        additions_data = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_id = {
                executor.submit(self.get_game_additions, game_id): game_id
                for game_id in game_ids
            }

            completed = 0
            total = len(game_ids)

            for future in concurrent.futures.as_completed(future_to_id):
                game_id = future_to_id[future]
                try:
                    additions = future.result()
                    if additions:
                        additions_data.append(additions)
                    completed += 1
                    if completed % 20 == 0:
                        self.logger.info(f"Дополнения: {completed}/{total} ({completed/total*100:.1f}%)")
                except Exception as e:
                    completed += 1
                    self.logger.error(f"Ошибка загрузки дополнений для игры {game_id}: {e}")

        self.logger.info(f"Загружено данных о дополнениях: {len(additions_data)}/{len(game_ids)}")
        return additions_data

    def get_series_parallel(self, game_ids: List[int], max_workers: int = 2) -> List[Dict]:
        self.logger.info(f"Параллельная загрузка серий для {len(game_ids)} игр")

        series_data = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_id = {
                executor.submit(self.get_game_series, game_id): game_id
                for game_id in game_ids
            }

            completed = 0
            total = len(game_ids)

            for future in concurrent.futures.as_completed(future_to_id):
                game_id = future_to_id[future]
                try:
                    series = future.result()
                    if series:
                        series_data.append(series)
                    completed += 1
                    if completed % 20 == 0:
                        self.logger.info(f"Серии: {completed}/{total} ({completed/total*100:.1f}%)")
                except Exception as e:
                    completed += 1
                    self.logger.error(f"Ошибка загрузки серий для игры {game_id}: {e}")

        self.logger.info(f"Загружено данных о сериях: {len(series_data)}/{len(game_ids)}")
        return series_data

    def get_achievements_parallel(self, game_ids: List[int], max_workers: int = 2) -> List[Dict]:
        self.logger.info(f"Параллельная загрузка достижений для {len(game_ids)} игр")

        achievements_data = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_id = {
                executor.submit(self.get_game_achievements, game_id): game_id
                for game_id in game_ids
            }

            completed = 0
            total = len(game_ids)

            for future in concurrent.futures.as_completed(future_to_id):
                game_id = future_to_id[future]
                try:
                    achievements = future.result()
                    if achievements:
                        achievements_data.append(achievements)
                    completed += 1
                    if completed % 20 == 0:
                        self.logger.info(f"Достижения: {completed}/{total} ({completed/total*100:.1f}%)")
                except Exception as e:
                    completed += 1
                    self.logger.error(f"Ошибка загрузки достижений для игры {game_id}: {e}")

        self.logger.info(f"Загружено данных о достижениях: {len(achievements_data)}/{len(game_ids)}")
        return achievements_data

    # МЕТОДЫ ДЛЯ ОБЪЕДИНЕНИЯ И СОХРАНЕНИЯ ДАННЫХ

    def save_to_csv(self, data: List[Dict], filename: str):
        if not data:
            self.logger.warning(f"Нет данных для сохранения в {filename}")
            return

        df = pd.DataFrame(data)
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        self.logger.info(f"Сохранено: {filename} ({len(df)} записей)")

    def merge_with_original_dataset(self, original_df: pd.DataFrame,
                                  additional_data: Dict[str, List[Dict]],
                                  data_types: List[str]) -> pd.DataFrame:
        self.logger.info("Объединение данных с оригинальным датасетом")

        result_df = original_df.copy()

        # Объединяем детальную информацию
        if 'details' in data_types and additional_data.get('details'):
            details_df = pd.DataFrame(additional_data['details'])

            # Выбираем только новые колонки, которых нет в оригинальном датасете
            new_columns = [col for col in details_df.columns
                         if col not in result_df.columns or col == 'id']

            if new_columns:
                # Убираем дубликаты по ID на случай, если их несколько
                details_df_clean = details_df.drop_duplicates(subset=['id'])[new_columns]
                result_df = result_df.merge(details_df_clean, on='id', how='left', suffixes=('', '_detail'))
                self.logger.info(f"Добавлено {len(new_columns)-1} колонок с деталями")

        # Объединяем данные о дополнениях
        if 'additions' in data_types and additional_data.get('additions'):
            additions_df = pd.DataFrame(additional_data['additions'])
            additions_columns = [col for col in additions_df.columns
                               if col not in result_df.columns or col == 'game_id']

            if additions_columns:
                additions_df_clean = additions_df.drop_duplicates(subset=['game_id'])[additions_columns]
                result_df = result_df.merge(additions_df_clean, left_on='id', right_on='game_id', how='left')
                result_df = result_df.drop('game_id', axis=1, errors='ignore')
                self.logger.info(f"Добавлено {len(additions_columns)-1} колонок с дополнениями")

        # Объединяем данные о сериях
        if 'series' in data_types and additional_data.get('series'):
            series_df = pd.DataFrame(additional_data['series'])
            series_columns = [col for col in series_df.columns
                            if col not in result_df.columns or col == 'game_id']

            if series_columns:
                series_df_clean = series_df.drop_duplicates(subset=['game_id'])[series_columns]
                result_df = result_df.merge(series_df_clean, left_on='id', right_on='game_id', how='left')
                result_df = result_df.drop('game_id', axis=1, errors='ignore')
                self.logger.info(f"Добавлено {len(series_columns)-1} колонок с сериями")

        # Объединяем данные о достижениях
        if 'achievements' in data_types and additional_data.get('achievements'):
            achievements_df = pd.DataFrame(additional_data['achievements'])
            achievements_columns = [col for col in achievements_df.columns
                                  if col not in result_df.columns or col == 'game_id']

            if achievements_columns:
                achievements_df_clean = achievements_df.drop_duplicates(subset=['game_id'])[achievements_columns]
                result_df = result_df.merge(achievements_df_clean, left_on='id', right_on='game_id', how='left')
                result_df = result_df.drop('game_id', axis=1, errors='ignore')
                self.logger.info(f"Добавлено {len(achievements_columns)-1} колонок с достижениями")

        return result_df

    # ОСНОВНОЙ МЕТОД

    def enrich_existing_dataset(self,
                              input_file: str,
                              output_file: str = None,
                              data_types: List[str] = None,
                              id_column: str = 'id',
                              max_workers: Dict[str, int] = None) -> pd.DataFrame:

        if data_types is None:
            data_types = ['details', 'additions', 'series', 'achievements']

        if max_workers is None:
            max_workers = {
                'details': 3,
                'additions': 2,
                'series': 2,
                'achievements': 2
            }

        if output_file is None:
            base_name = input_file.split('.')[0]
            output_file = f"{base_name}_enriched.csv"

        self.logger.info(f"Исходный файл: {input_file}")
        self.logger.info(f"Выходной файл: {output_file}")
        self.logger.info(f"Типы данных: {data_types}")

        start_time = time.time()

        # 1. Загружаем существующий датасет
        original_df, game_ids = self.load_existing_dataset(input_file, id_column)

        if not game_ids:
            self.logger.error("Не найдено ID игр для обработки")
            return original_df

        # 2. Собираем дополнительные данные
        additional_data = {}

        if 'details' in data_types:
            self.logger.info("=== ЭТАП 1: СБОР ДЕТАЛЬНОЙ ИНФОРМАЦИИ ===")
            additional_data['details'] = self.get_details_parallel(
                game_ids, max_workers=max_workers['details']
            )
            self.save_to_csv(additional_data['details'], f"{output_file.split('.')[0]}_details.csv")

        if 'additions' in data_types:
            self.logger.info("=== ЭТАП 2: СБОР ДАННЫХ О ДОПОЛНЕНИЯХ ===")
            additional_data['additions'] = self.get_additions_parallel(
                game_ids, max_workers=max_workers['additions']
            )
            self.save_to_csv(additional_data['additions'], f"{output_file.split('.')[0]}_additions.csv")

        if 'series' in data_types:
            self.logger.info("=== ЭТАП 3: СБОР ДАННЫХ О СЕРИЯХ ===")
            additional_data['series'] = self.get_series_parallel(
                game_ids, max_workers=max_workers['series']
            )
            self.save_to_csv(additional_data['series'], f"{output_file.split('.')[0]}_series.csv")

        if 'achievements' in data_types:
            self.logger.info("=== ЭТАП 4: СБОР ДАННЫХ О ДОСТИЖЕНИЯХ ===")
            additional_data['achievements'] = self.get_achievements_parallel(
                game_ids, max_workers=max_workers['achievements']
            )
            self.save_to_csv(additional_data['achievements'], f"{output_file.split('.')[0]}_achievements.csv")

        # 3. Объединяем все данные
        self.logger.info("=== ЭТАП 5: ОБЪЕДИНЕНИЕ ДАННЫХ ===")
        enriched_df = self.merge_with_original_dataset(original_df, additional_data, data_types)

        # 4. Сохраняем результат
        enriched_df.to_csv(output_file, index=False, encoding='utf-8-sig')

        total_time = time.time() - start_time

        # 5. Финальная статистика
        self.logger.info("=" * 60)
        self.logger.info("ФИНАЛЬНАЯ СТАТИСТИКА:")
        self.logger.info(f"Общее время: {total_time:.1f} секунд")
        self.logger.info(f"Обработано игр: {len(game_ids)}")
        self.logger.info(f"Исходных колонок: {len(original_df.columns)}")
        self.logger.info(f"Финальных колонок: {len(enriched_df.columns)}")
        self.logger.info(f"Новых колонок: {len(enriched_df.columns) - len(original_df.columns)}")
        self.logger.info(f"Результат сохранен в: {output_file}")
        self.logger.info("=" * 60)

        return enriched_df

# ФУНКЦИИ ДЛЯ БЫСТРОГО ИСПОЛЬЗОВАНИЯ

def full_enrich_dataset(api_key: str, input_file: str, output_file: str = None) -> pd.DataFrame:
    """Enrich ``input_file`` with all four data types using default pool sizes.

    Args:
        api_key: RAWG API key.
        input_file: Dataset to enrich.
        output_file: Destination of the enriched CSV; passed through as-is.

    Returns:
        The frame returned by ``enrich_existing_dataset``.
    """
    all_types = ['details', 'additions', 'series', 'achievements']
    pool_sizes = dict(details=3, additions=2, series=2, achievements=2)

    return ExistingDatasetAPIClient(api_key).enrich_existing_dataset(
        input_file=input_file,
        output_file=output_file,
        data_types=all_types,
        max_workers=pool_sizes,
    )


if __name__ == "__main__":
    import os

    # The RAWG key is a credential and must not live in the source: read it
    # from the environment instead.
    # NOTE(review): a literal key was previously committed on this line;
    # treat it as exposed and revoke/rotate it.
    API_KEY = os.environ.get("RAWG_API_KEY")
    if not API_KEY:
        raise SystemExit("Задайте переменную окружения RAWG_API_KEY с ключом RAWG API.")

    full_result = full_enrich_dataset(
        api_key=API_KEY,
        input_file="games_parallel_2023_2023.csv",
        output_file="games_new_info_2023.csv"
    )

    print("Все операции завершены успешно!")


=== ПРИМЕР 2: ПОЛНОЕ ОБОГАЩЕНИЕ ===
2025-10-30 01:03:48,677 - __main__ - INFO - === НАЧАЛО ОБОГАЩЕНИЯ ДАТАСЕТА ===
2025-10-30 01:03:48,678 - __main__ - INFO - Исходный файл: games_parallel_2023_2023.csv
2025-10-30 01:03:48,679 - __main__ - INFO - Выходной файл: games_new_info_2023.csv
2025-10-30 01:03:48,680 - __main__ - INFO - Типы данных: ['details', 'additions', 'series', 'achievements']
2025-10-30 01:03:48,683 - __main__ - INFO - Загрузка датасета из games_parallel_2023_2023.csv
2025-10-30 01:03:48,895 - __main__ - INFO - Загружено 1000 записей, 1000 валидных ID игр
2025-10-30 01:03:48,898 - __main__ - INFO - === ЭТАП 1: СБОР ДЕТАЛЬНОЙ ИНФОРМАЦИИ ===
2025-10-30 01:03:48,900 - __main__ - INFO - Параллельная загрузка деталей для 1000 игр
2025-10-30 01:03:52,837 - __main__ - INFO - Детали: 10/1000 (1.0%)
2025-10-30 01:03:55,930 - __main__ - INFO - Детали: 20/1000 (2.0%)
2025-10-30 01:03:58,876 - __main__ - INFO - Детали: 30/1000 (3.0%)
2025-10-30 01:04:03,285 - __main__ - INFO - Дета