## 1 Определение области проекта:
создание аналитической платформы, которая объединяет данные по странам (ВВП, сельскохозяйственный индекс, выбросы CO2)
создаёт локальную SQLite базу данных для хранения данных
фильтрует данные и строит визуализацию по запросам к бд



In [2]:
# Ячейка 1: Импорт библиотек
import pandas as pd
import sqlite3
import os
import glob
import re

In [3]:
# Ячейка 2: Определение путей
# Пути к папкам
start_input_folder = r"C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\инпут1" # Данные для инициализации БД (2000 г.)
update_input_folder = r"C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\инпут2" # Данные для обновления БД (остальные года)
output_db_folder = r"C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\база"
db_path = os.path.join(output_db_folder, "combined_data_normalized.db")

# Создаём папку для БД, если её нет
os.makedirs(output_db_folder, exist_ok=True)

print(f"Папка для начальных данных (2000 год): {start_input_folder}")
print(f"Папка для данных обновления (остальные года): {update_input_folder}")
print(f"Папка для базы данных: {output_db_folder}")
print(f"Путь к файлу БД: {db_path}")

Папка для начальных данных (2000 год): C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\инпут1
Папка для данных обновления (остальные года): C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\инпут2
Папка для базы данных: C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\база
Путь к файлу БД: C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\база\combined_data_normalized.db


## 2 Сбор данных
данные для базы данных берём из csv файлов с портала https://worlddataview.com/
особенности сайта не позволяют выгрузить одним файлом данные за все годы поэтому каждый год сохраняется отдельно отдельной csv таблицей для каждого исследуемого параметра

для первого запуска берём например данные за 2000 год из папки инпут1

следующие данные прошлых лет хранятся в отдельной папке инпут2 и уже позже добавляются к бд
база данных строится из таблицы в которой хранится название страны из исходного csv файла, страна получает свой уникальный id при появлении новых стран в следующие года новые страны получают свои id 
id короче чем текстовое название страны что позволяет экономить память в бд если там будут записи за миллионы лет


во второй таблице хранятся id страны и те параметры которые берутся из csv например выбросы co2 из таблицы csv по странам за 2000 год, каждая строка имеет свой айди страны год и уровень параметра , несколько параметров ( сейчас 3 ) сращиваются новыми столбцами в бд


отдельно генерируется условный код из 3х первых букв названия страны RUS и тд (некоторые страны не имеют настоящих кодов) просто для того чтобы показать хранение дополнительных данных, наприиер можно так хранить не код а изображение с флагом которое можно вызывать при визуализациях не храня в бд дубликаты по тысячам строк

In [5]:
# Ячейка 3: Проверка наличия файлов в инпут1
def check_csv_files(input_folder, patterns):
    """
    Проверяет наличие CSV-файлов по шаблонам сайта - все файлы имеют одинаковый формат названий и разный год в конце
    """
    all_csv_files = os.listdir(input_folder)
    
    for param_name, pattern in patterns.items():
        print(f"\n--- Проверка файлов для параметра: {param_name} ---")
        regex = re.compile(pattern)
        matched_files = [f for f in all_csv_files if regex.match(f)]
        
        if not matched_files:
            print(f"Файлы для параметра '{param_name}' не найдены по шаблону '{pattern}'.")
            continue
        
        years_found = []
        for file in matched_files:
            match = regex.match(file)
            if match:
                year = int(match.group(1))
                years_found.append(year)
        
        years_found = sorted(years_found)
        print(f"Найдено файлов: {len(years_found)}. Годы: {years_found}")

# Шаблоны файлов
patterns = {
    'gdp_growth': r'GDP growth \(annual \) (\d{4})\.csv',
    'crop_index': r'Crop production index \(2004-2006  100\) (\d{4})\.csv',
    'co2_emissions': r'CO2 emissions \(metric tons per capita\) (\d{4})\.csv'
}

print("Проверка файлов в папке начальных данных (2000):")
check_csv_files(start_input_folder, patterns)

Проверка файлов в папке начальных данных (2000):

--- Проверка файлов для параметра: gdp_growth ---
Найдено файлов: 1. Годы: [2000]

--- Проверка файлов для параметра: crop_index ---
Найдено файлов: 1. Годы: [2000]

--- Проверка файлов для параметра: co2_emissions ---
Найдено файлов: 1. Годы: [2000]


In [6]:
# Ячейка 4: Функция чтения и предварительной обработки CSV (для инпут1)
def read_and_process_param_files(input_folder, param_name, pattern):
    """
    Читает CSV-файлы для одного параметра из указанной папки, обрабатывает и возвращает объединённый DataFrame.
    """
    all_csv_files = os.listdir(input_folder)
    regex = re.compile(pattern)

    dataframes = []
    for file in all_csv_files:
        if regex.match(file):
            year_match = regex.match(file)
            year = int(year_match.group(1))
            file_path = os.path.join(input_folder, file)

            try:
                df = pd.read_csv(file_path)
                expected_cols = ['Country', df.columns[1], 'Rank', 'Year']
                if not all(col in df.columns for col in expected_cols):
                    print(f"Файл {file} не содержит ожидаемых столбцов. Пропускаю.")
                    continue

                df = df.drop(columns=['Rank'])
                df = df.rename(columns={df.columns[1]: param_name})
                df['year'] = year
                df = df.drop(columns=['Year'])

                dataframes.append(df)
                print(f"Файл {file} (год {year}) успешно обработан.")
            except Exception as e:
                print(f"Ошибка при чтении файла {file}: {e}")

    if not dataframes:
        print(f"Не удалось обработать ни одного файла для параметра '{param_name}'.")
        return pd.DataFrame()

    combined_df = pd.concat(dataframes, ignore_index=True)
    print(f"DataFrame для параметра '{param_name}' объединён. Размер: {combined_df.shape}")
    return combined_df

In [7]:
# Ячейка 5: Функция создания базы данных из инпут1
def create_db_from_input1(input_folder, db_path, patterns):
    """
    Создаёт нормализованную SQLite базу данных из CSV-файлов в input_folder (инпут1).
    Создаёт таблицы countries и country_indicators.
    Добавляет уникальный индекс на (country_id, year) в country_indicators.
    Корректно вставляет/обновляет данные по параметрам.
    """
    # --- 1. Сбор всех уникальных стран из CSV инпут1 ---
    all_countries = set()
    for param_name, pattern in patterns.items():
        regex = re.compile(pattern)
        csv_files = [f for f in os.listdir(input_folder) if regex.match(f)]
        for file in csv_files:
            file_path = os.path.join(input_folder, file)
            try:
                df = pd.read_csv(file_path)
                countries_in_file = df.iloc[:, 0].dropna().unique()
                all_countries.update(countries_in_file)
            except Exception as e:
                print(f"Ошибка при чтении файла {file}: {e}")

    print(f"Найдено уникальных стран в инпут1: {len(all_countries)}")

    # --- 2. Создание и заполнение таблицы countries ---
    conn = sqlite3.connect(db_path)

    create_countries_sql = """
    CREATE TABLE IF NOT EXISTS countries (
        country_id INTEGER PRIMARY KEY AUTOINCREMENT,
        country_name TEXT UNIQUE NOT NULL,
        country_code TEXT
    );
    """
    conn.execute(create_countries_sql)

    # Генерация условного кода (первые 3 буквы, убираем скобки/пробелы, в верхний регистр)
    countries_with_codes = []
    for country_name in sorted(all_countries):
        clean_name = re.sub(r'[^a-zA-Z0-9]', '', country_name)
        code = clean_name[:3].upper()
        if len(code) < 3:
            code = (code + 'XXX')[:3]
        countries_with_codes.append((country_name, code))

    insert_countries_sql = "INSERT OR IGNORE INTO countries (country_name, country_code) VALUES (?, ?);"
    conn.executemany(insert_countries_sql, countries_with_codes)

    # --- 3. Создание таблицы country_indicators ---
    create_indicators_sql = """
    CREATE TABLE IF NOT EXISTS country_indicators (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        country_id INTEGER NOT NULL,
        year INTEGER NOT NULL,
        gdp_growth REAL,
        crop_index REAL,
        co2_emissions REAL,
        FOREIGN KEY (country_id) REFERENCES countries (country_id)
    );
    """
    conn.execute(create_indicators_sql)

    # --- 4. Добавление уникального индекса ---
    create_unique_index_sql = """
    CREATE UNIQUE INDEX IF NOT EXISTS idx_country_year ON country_indicators (country_id, year);
    """
    conn.execute(create_unique_index_sql)
    print("Уникальный индекс idx_country_year создан (если не существовал).")

    # --- 5. Загрузка данных из CSV инпут1 в country_indicators ---
    # Создаём словарь: country_name -> country_id
    cursor = conn.cursor()
    cursor.execute("SELECT country_name, country_id FROM countries;")
    country_name_to_id = dict(cursor.fetchall())

    for param_name, pattern in patterns.items():
        regex = re.compile(pattern)
        csv_files = [f for f in os.listdir(input_folder) if regex.match(f)]
        print(f"\nОбработка параметра '{param_name}' из инпут1, файлов: {len(csv_files)}")

        for file in csv_files:
            year_match = regex.match(file)
            year = int(year_match.group(1))
            file_path = os.path.join(input_folder, file)

            try:
                df = pd.read_csv(file_path)
                df = df[['Country', df.columns[1]]] # Оставляем только Country и Value
                df.columns = ['Country', param_name]
                df['year'] = year
                df['country_id'] = df['Country'].map(country_name_to_id)

                # Подготовим данные для вставки/обновления
                insert_data = df[['country_id', 'year', param_name]].dropna(subset=['country_id'])
                insert_data[param_name] = insert_data[param_name].where(pd.notnull(insert_data[param_name]), None)
                # Убираем строки, где значение параметра - NaN, чтобы не перезаписывать существующие данные на NULL
                insert_data = insert_data.dropna(subset=[param_name])

                tuples_to_insert = [tuple(row) for row in insert_data.values]

                # Используем INSERT ... ON CONFLICT UPDATE, чтобы обновить существующую строку
                # Это работает, потому что (country_id, year) уникальны
                insert_sql = f"""
                INSERT INTO country_indicators (country_id, year, {param_name})
                VALUES (?, ?, ?)
                ON CONFLICT (country_id, year) DO UPDATE SET {param_name} = excluded.{param_name};
                """
                conn.executemany(insert_sql, tuples_to_insert)

            except Exception as e:
                print(f"Ошибка при обработке файла {file}: {e}")

    conn.commit()
    conn.close()
    print(f"\nБаза данных создана и заполнена из инпут1: {db_path}")

In [8]:
# Вызов функции для создания БД из инпут1
create_db_from_input1(start_input_folder, db_path, patterns)

Найдено уникальных стран в инпут1: 218
Уникальный индекс idx_country_year создан (если не существовал).

Обработка параметра 'gdp_growth' из инпут1, файлов: 1

Обработка параметра 'crop_index' из инпут1, файлов: 1

Обработка параметра 'co2_emissions' из инпут1, файлов: 1

База данных создана и заполнена из инпут1: C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\база\combined_data_normalized.db


In [9]:
# Ячейка 6: Проверка содержимого созданной БД
def check_db_contents(db_path):
    """
    Выводит количество строк в обеих таблицах.
    """
    conn = sqlite3.connect(db_path)

    cursor = conn.cursor()

    cursor.execute("SELECT COUNT(*) FROM countries;")
    count_countries = cursor.fetchone()[0]
    print(f"Строк в таблице 'countries': {count_countries}")

    cursor.execute("SELECT COUNT(*) FROM country_indicators;")
    count_indicators = cursor.fetchone()[0]
    print(f"Строк в таблице 'country_indicators': {count_indicators}")

    # Пример JOIN запроса
    print("\nПример данных (JOIN):")
    join_query = """
    SELECT c.country_name, ci.year, ci.gdp_growth, ci.crop_index, ci.co2_emissions
    FROM country_indicators ci
    JOIN countries c ON ci.country_id = c.country_id
    WHERE c.country_name = 'Russian Federation (the)' AND ci.year = 2000
    LIMIT 10;
    """
    result_df = pd.read_sql_query(join_query, conn)
    print(result_df)

    conn.close()

# Вызов функции
check_db_contents(db_path)

Строк в таблице 'countries': 218
Строк в таблице 'country_indicators': 194

Пример данных (JOIN):
               country_name  year  gdp_growth  crop_index  co2_emissions
0  Russian Federation (the)  2000        10.0       82.26          10.62


файл с бд combined_data_normalized успешно создан и хранится в заданной папке

In [11]:
# пути к новым файлам и вызов функции обновления
update_input_folder = r"C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\инпут2"
print(f"Папка с файлами для обновления: {update_input_folder}")

Папка с файлами для обновления: C:\Users\filos\Desktop\итмо\маг\инжиниринг данных\данные\инпут2


далее функция сканирует input_folder (инпут2) и находит все CSV-файлы по шаблонам.
Собирает уникальные названия стран из этих файлов.
Сравнивает с уже существующими в countries и находит новые.
Добавляет новые страны в countries с условными кодами.
Обновляет словарь existing_countries, чтобы включить новые country_id.
Читает каждый файл, сопоставляет Country с country_id.
Вставляет строки в country_indicators

In [13]:
def update_db_from_input2(input_folder, db_path, patterns):
    """
    Обновляет существующую SQLite базу данных из CSV-файлов в input_folder (инпут2).
    Добавляет новые страны в таблицу countries.
    Корректно вставляет/обновляет данные в таблицу country_indicators, избегая дубликатов.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # --- 1. Получить существующие страны из БД ---
    cursor.execute("SELECT country_name, country_id FROM countries;")
    existing_countries = dict(cursor.fetchall())
    print(f"В БД уже содержится {len(existing_countries)} стран.")

    # --- 2. Собрать все уникальные новые страны из CSV ---
    all_new_countries_from_csv = set()
    for param_name, pattern in patterns.items():
        regex = re.compile(pattern)
        csv_files = [f for f in os.listdir(input_folder) if regex.match(f)]
        print(f"\n--- Обработка параметра '{param_name}' ---")
        print(f"Найдено файлов: {len(csv_files)}")

        for file in csv_files:
            file_path = os.path.join(input_folder, file)
            try:
                df = pd.read_csv(file_path)
                countries_in_file = df.iloc[:, 0].dropna().unique()
                all_new_countries_from_csv.update(countries_in_file)
                print(f"  - {file}: найдено {len(countries_in_file)} стран")
            except Exception as e:
                print(f"  - Ошибка при чтении файла {file}: {e}")

    print(f"\nВсего уникальных стран найдено в инпут2: {len(all_new_countries_from_csv)}")

    # --- 3. Найти новые страны, которых нет в БД ---
    new_countries_to_add = all_new_countries_from_csv - set(existing_countries.keys())
    print(f"Новых стран для добавления в таблицу 'countries': {len(new_countries_to_add)}")

    # --- 4. Добавить новые страны в таблицу countries ---
    if new_countries_to_add:
        new_countries_with_codes = []
        for country_name in sorted(new_countries_to_add):
            # Генерация условного кода
            clean_name = re.sub(r'[^a-zA-Z0-9]', '', country_name)
            code = clean_name[:3].upper()
            if len(code) < 3:
                code = (code + 'XXX')[:3]
            new_countries_with_codes.append((country_name, code))

        insert_new_countries_sql = "INSERT OR IGNORE INTO countries (country_name, country_code) VALUES (?, ?);"
        cursor.executemany(insert_new_countries_sql, new_countries_with_codes)
        conn.commit()
        print(f"  - Добавлено {len(new_countries_with_codes)} новых строк в таблицу 'countries'.")

        # Обновить словарь existing_countries, чтобы включить новые ID
        cursor.execute("SELECT country_name, country_id FROM countries;")
        existing_countries = dict(cursor.fetchall())

    # --- 5. Загрузить данные из CSV в country_indicators ---
    print("\n--- Загрузка данных в таблицу 'country_indicators' ---")
    rows_inserted_or_updated_count = 0
    for param_name, pattern in patterns.items():
        regex = re.compile(pattern)
        csv_files = [f for f in os.listdir(input_folder) if regex.match(f)]

        for file in csv_files:
            year_match = regex.match(file)
            year = int(year_match.group(1))
            file_path = os.path.join(input_folder, file)

            try:
                df = pd.read_csv(file_path)
                df = df[['Country', df.columns[1]]] # Оставляем только Country и Value
                df.columns = ['Country', param_name]
                df['year'] = year
                # Сопоставляем с существующим country_id
                df['country_id'] = df['Country'].map(existing_countries)

                # Оставляем только строки, для которых удалось найти country_id
                df_to_insert = df[['country_id', 'year', param_name]].dropna(subset=['country_id'])
                df_to_insert = df_to_insert.copy() # Избегаем SettingWithCopyWarning
                df_to_insert['country_id'] = df_to_insert['country_id'].astype(int) # Убедимся, что ID - целое
                df_to_insert[param_name] = df_to_insert[param_name].where(pd.notnull(df_to_insert[param_name]), None)
                # Убираем строки, где значение параметра - NaN, чтобы не перезаписывать существующие данные на NULL
                df_to_insert = df_to_insert.dropna(subset=[param_name])

                if not df_to_insert.empty:
                    # Подготовим данные для вставки/обновления
                    tuples_to_insert = [tuple(row) for row in df_to_insert.values]

                    # Используем INSERT ... ON CONFLICT UPDATE
                    insert_sql = f"""
                    INSERT INTO country_indicators (country_id, year, {param_name})
                    VALUES (?, ?, ?)
                    ON CONFLICT (country_id, year) DO UPDATE SET {param_name} = excluded.{param_name};
                    """
                    cursor.executemany(insert_sql, tuples_to_insert)
                    rows_inserted_or_updated_count += len(tuples_to_insert)
                    print(f"  - Вставлено/обновлено {len(tuples_to_insert)} строк из файла {file}")

            except Exception as e:
                print(f"  - Ошибка при обработке файла {file}: {e}")

    conn.commit()
    conn.close()
    print(f"\n--- Обновление завершено ---")
    print(f"Всего строк вставлено или обновлено в 'country_indicators': {rows_inserted_or_updated_count}")

In [14]:
# Вызов функции обновления
update_db_from_input2(update_input_folder, db_path, patterns)

В БД уже содержится 218 стран.

--- Обработка параметра 'gdp_growth' ---
Найдено файлов: 20
  - GDP growth (annual ) 2001.csv: найдено 218 стран
  - GDP growth (annual ) 2002.csv: найдено 218 стран
  - GDP growth (annual ) 2003.csv: найдено 218 стран
  - GDP growth (annual ) 2004.csv: найдено 218 стран
  - GDP growth (annual ) 2005.csv: найдено 218 стран
  - GDP growth (annual ) 2006.csv: найдено 218 стран
  - GDP growth (annual ) 2007.csv: найдено 218 стран
  - GDP growth (annual ) 2008.csv: найдено 218 стран
  - GDP growth (annual ) 2009.csv: найдено 218 стран
  - GDP growth (annual ) 2010.csv: найдено 218 стран
  - GDP growth (annual ) 2011.csv: найдено 218 стран
  - GDP growth (annual ) 2012.csv: найдено 218 стран
  - GDP growth (annual ) 2013.csv: найдено 218 стран
  - GDP growth (annual ) 2014.csv: найдено 218 стран
  - GDP growth (annual ) 2015.csv: найдено 218 стран
  - GDP growth (annual ) 2016.csv: найдено 218 стран
  - GDP growth (annual ) 2017.csv: найдено 218 стран
  - GDP

данные успешно добавлены в бд теперь там данные не за 1 год а с 2000 по 2020

тестовый запрос к бд который возвращает список стран в базе и годы


In [17]:
import sqlite3
import pandas as pd

def print_countries_and_years(db_path):
    """
    Подключается к БД и выводит список уникальных стран и лет.
    """
    conn = sqlite3.connect(db_path)

    # --- Получить список стран ---
    cursor = conn.cursor()
    cursor.execute("SELECT DISTINCT country_name FROM countries ORDER BY country_name;")
    countries_list = [row[0] for row in cursor.fetchall()]

    # --- Получить список лет ---
    cursor.execute("SELECT DISTINCT year FROM country_indicators ORDER BY year;")
    years_list = [row[0] for row in cursor.fetchall()]

    conn.close()

    print("--- Список стран в БД ---")
    print(f"Всего стран: {len(countries_list)}")
    for country in countries_list:
        print(country)

    print("\n--- Список лет в БД ---")
    print(f"Всего лет: {len(years_list)}")
    print(years_list)

# --- Вызов функции ---
print_countries_and_years(db_path)

--- Список стран в БД ---
Всего стран: 218
Afghanistan
Albania
Algeria
American Samoa
Andorra
Angola
Antigua and Barbuda
Argentina
Armenia
Australia
Austria
Azerbaijan
Bahamas (the)
Bahrain
Bajo Nuevo Bank (Petrel Is.)
Bangladesh
Barbados
Belarus
Belgium
Belize
Benin
Bhutan
Bolivia (Plurinational State of)
Bonaire
Bosnia and Herzegovina
Botswana
Brazil
Brunei Darussalam
Bulgaria
Burkina Faso
Burundi
Cabo Verde
Cambodia
Cameroon
Canada
Central African Republic (the)
Chad
Chile
China
Colombia
Comoros (the)
Congo (the Democratic Republic of the)
Congo (the)
Costa Rica
Croatia
Cuba
Curaçao
Cyprus
Cyprus No Mans Area
Czechia
Côte d'Ivoire
Denmark
Djibouti
Dominica
Dominican Republic (the)
Ecuador
Egypt
El Salvador
Equatorial Guinea
Eritrea
Estonia
Eswatini
Ethiopia
Faroe Islands
Fiji
Finland
France
Gabon
Gambia (the)
Georgia
Germany
Ghana
Greece
Greenland
Grenada
Guam
Guatemala
Guinea
Guinea-Bissau
Guyana
Haiti
Holy See (the)
Honduras
Hungary
Iceland
India
Indonesia
Iran (Islamic Republic o

база данных успешно создана и сохранена в файл

из отдельной папки с новыми данынми успешно обновлена