# Как парсил данные по характеристикам компаний из СПАРК

> Парсинг проводился по XML файлам из базы СПАРК

> XML файлы по конкретным поставщикам (ИНН из РНП)

> В доступе были наборы по ИП и по ЮЛ (по ЮЛ более подробная информация)

## Необходимые библиотеки

In [1]:
# для работы с XML
import xmltodict
import pandas as pd
import os
# для визуального отслеживания прогресса длительных операций
from tqdm import tqdm

## Парсим данные по ИП

In [31]:
def read_xml_to_df(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        xml_content = file.read()
        data = xmltodict.parse(xml_content).get('Response', {}).get('Data', {})
    
    report = data.get('Report', {})
    reg_date_spark = report.get('DateReg', 'N/A')
    inn_spark = report.get('INN', 'N/A')
    region_name_spark = report.get('OKATO', {}).get('@RegionName', 'N/A')
    region_code_spark = report.get('OKATO', {}).get('@RegionCode', 'N/A')
    
    # Проверка и обработка структуры OKVED
    okved_data = report.get('OKVED2List', {}).get('OKVED', {})
    if isinstance(okved_data, list):
        main_okved_code_spark = okved_data[0].get('@Code', 'N/A') if okved_data else 'N/A'
        main_okved_name_spark = okved_data[0].get('@Name', 'N/A') if okved_data else 'N/A'
    elif isinstance(okved_data, dict):
        main_okved_code_spark = okved_data.get('@Code', 'N/A')
        main_okved_name_spark = okved_data.get('@Name', 'N/A')
    else:
        main_okved_code_spark = 'N/A'
        main_okved_name_spark = 'N/A'

    df_spark = pd.DataFrame({
        'reg_date': [reg_date_spark],
        'inn': [inn_spark],
        'region_name': [region_name_spark],
        'region_code': [region_code_spark],
        'main_okved_code': [main_okved_code_spark],
        'main_okved_name': [main_okved_name_spark]
    })
    
    # Обработка данных по законам
    laws_data = {}
    state_contracts = data.get('Report', {}).get('StateContracts', {})
    for law, law_data in state_contracts.items():
        year_info_list = law_data.get('Year', [])
        if not isinstance(year_info_list, list):
            year_info_list = [year_info_list]  # Превращаем в список, если это одиночный объект
        for year_info in year_info_list:
            year = year_info.get('@Year', 'N/A')
            contracts = year_info.get('Contracts', {})
            signed_number = contracts.get('@SignedNumber', 'N/A')
            sum_contracts = contracts.get('@Sum', 'N/A')
            laws_data[f'{law}_{year}_SignedNumber'] = signed_number
            laws_data[f'{law}_{year}_Sum'] = sum_contracts

    df_laws = pd.DataFrame([laws_data])

    final_df = pd.concat([df_spark, df_laws], axis=1)
    
    return final_df

def process_all_xml_files(directory):
    all_files_df = pd.DataFrame()
    
    for filename in os.listdir(directory):
        if filename.endswith(".xml"):
            file_path = os.path.join(directory, filename)
            try:
                df = read_xml_to_df(file_path)
                all_files_df = pd.concat([all_files_df, df], ignore_index=True)
            except Exception as e:
                print(f"Failed to process {filename}: {e}")
    
    return all_files_df

# Директория с XML файлами
directory_path = r'D:\Jupyter_Notebook\2024_ИПС_Данные\SPARK\GetEntrepreneurShortReport'

# Обработка всех XML файлов и создание общего DataFrame
df = process_all_xml_files(directory_path)

In [32]:
df.to_csv('enterpreneurs_spark_info.csv', index=False)

## Парсим данные по ЮЛ

Будет 3 датасета для ЮЛ:
1) ИНН + общая информация (ОКВЭД, регион)
2) ИНН + кол-во сотрудников за каждый год (формат: ИНН-год-число)
3) ИНН + фин. показатели из GetCompanyAccountingReport (формат: ИНН-год-фин.показатели)

#### 1 - ИНН + общая информация (ОКВЭД, регион)

In [10]:
def read_xml_to_df(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        xml_content = file.read()
        data = xmltodict.parse(xml_content)['Response']['Data']['Report']
    
    reg_date_spark = data.get('DateFirstReg', 'N/A')
    shortname_spark = data.get('ShortNameRus', 'N/A')
    inn_spark = data.get('INN', 'N/A')
    kpp_spark = data.get('KPP', 'N/A')
    region_name_spark = data['OKATO'].get('@RegionName', 'N/A') if 'OKATO' in data else 'N/A'
    region_code_spark = data['OKATO'].get('@RegionCode', 'N/A') if 'OKATO' in data else 'N/A'
    
    # Обработка OKVED2List как списка или одиночного элемента
    okved_data = data.get('OKVED2List', {}).get('OKVED', {})
    if isinstance(okved_data, list):
        main_okved_code_spark = okved_data[0].get('@Code', 'N/A') if okved_data else 'N/A'
        main_okved_name_spark = okved_data[0].get('@Name', 'N/A') if okved_data else 'N/A'
    elif isinstance(okved_data, dict):
        main_okved_code_spark = okved_data.get('@Code', 'N/A')
        main_okved_name_spark = okved_data.get('@Name', 'N/A')
    else:
        main_okved_code_spark = 'N/A'
        main_okved_name_spark = 'N/A'

    df_spark = pd.DataFrame({
        'reg_date': [reg_date_spark],
        'shortname': [shortname_spark],
        'inn': [inn_spark],
        'kpp': [kpp_spark],
        'region_name': [region_name_spark],
        'region_code': [region_code_spark],
        'main_okved_code': [main_okved_code_spark],
        'main_okved_name': [main_okved_name_spark]
    })

    # Обработка данных по законам
    laws_data = {}
    state_contracts = data.get('StateContracts', {})
    for law, law_data in state_contracts.items():
        year_info_list = law_data.get('Year', [])
        if not isinstance(year_info_list, list):
            year_info_list = [year_info_list]  # Превращаем в список, если это одиночный объект
        for year_info in year_info_list:
            year = year_info.get('@Year', 'N/A')
            contracts = year_info.get('Contracts', {})
            signed_number = contracts.get('@SignedNumber', 'N/A')
            sum_contracts = contracts.get('@Sum', 'N/A')
            laws_data[f'{law}_{year}_SignedNumber'] = signed_number
            laws_data[f'{law}_{year}_Sum'] = sum_contracts

    df_laws = pd.DataFrame([laws_data])

    final_df = pd.concat([df_spark, df_laws], axis=1)
    
    return final_df

def process_all_xml_files(directory):
    all_files_df = pd.DataFrame()
    
    for filename in os.listdir(directory):
        if filename.endswith(".xml"):
            file_path = os.path.join(directory, filename)
            try:
                df = read_xml_to_df(file_path)
                all_files_df = pd.concat([all_files_df, df], ignore_index=True)
            except Exception as e:
                print(f"Failed to process {filename}: {e}")
    
    return all_files_df

# Директория с XML файлами
directory_path = r'D:\Jupyter_Notebook\2024_ИПС_Данные\SPARK\GetCompanyExtendedReport'

# Обработка всех XML файлов и создание общего DataFrame
df_comp_main_info = process_all_xml_files(directory_path)

Failed to process 7734001829.xml: 'list' object has no attribute 'get'


In [12]:
df_comp_main_info.to_csv('comp_main_spark_info.csv', index=False)

#### 2 - ИНН + кол-во сотрудников за каждый год (формат: ИНН-год-число)

In [None]:
def read_xml_to_df(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        xml_content = file.read()
        data = xmltodict.parse(xml_content).get('Response', {}).get('Data', {}).get('Report', {})
    
    inn_spark = data.get('INN', 'N/A')

    employees_list = data.get('StaffNumberFTS', {}).get('Number', [])
    if isinstance(employees_list, dict):  # Если есть только одна запись, преобразуем в список
        employees_list = [employees_list]

    rows = []
    for employee in employees_list:
        year_spark = employee.get('@ActualDate', 'N/A')[:4]  # Получаем только год из даты
        employees_spark = employee.get('#text', 'N/A')
        rows.append({
            'inn': inn_spark,
            'year': year_spark,
            'employees': employees_spark
        })

    return pd.DataFrame(rows)

def process_all_xml_files(directory):
    all_files_df = pd.DataFrame()
    
    files = [f for f in os.listdir(directory) if f.endswith(".xml")]
    for filename in tqdm(files, desc="Processing XML files"):
        file_path = os.path.join(directory, filename)
        try:
            df = read_xml_to_df(file_path)
            all_files_df = pd.concat([all_files_df, df], ignore_index=True)
        except Exception as e:
            print(f"Failed to process {filename}: {e}")
    
    return all_files_df

# Директория с XML файлами
directory_path = r'D:\Jupyter_Notebook\2024_ИПС_Данные\SPARK\GetCompanyExtendedReport'

# Обработка всех XML файлов и создание общего DataFrame
df_comp_empl_info = process_all_xml_files(directory_path)

In [31]:
df_comp_empl_info.to_csv('comp_empl_spark_info.csv', index=False)

#### 3 - ИНН + фин. показатели из GetCompanyAccountingReport (формат: ИНН-год-фин.показатели)

In [69]:
def read_xml_to_df(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        xml_content = file.read()
        data = xmltodict.parse(xml_content).get('Response', {}).get('Data', {}).get('Report', {})
    
    inn = data.get('INN', 'N/A')
    period_data = data.get('Period', {})
    year = period_data.get('@PeriodName', 'N/A')
    forms = period_data.get('Form', [])
    
    if isinstance(forms, dict):  # Если Form представлен одним словарем, обернем его в список
        forms = [forms]
    
    metrics = {}
    metrics['INN'] = inn
    metrics['Year'] = year
    seen_names = set()  # Для отслеживания уникальных названий показателей

    for form in forms:
        power = float(form.get('@Power', 1))  # Получаем множитель и по умолчанию ставим 1
        values = form.get('Value', [])
        
        if isinstance(values, dict):  # Если Value представлен одним словарем, обернем его в список
            values = [values]
        
        for value in values:
            metric_name = value.get('@Name')
            if metric_name not in seen_names:  # Учитываем только первое вхождение каждого показателя
                seen_names.add(metric_name)
                metric_value = float(value.get('#text', 0)) * power
                metrics[metric_name] = metric_value

    df_financial = pd.DataFrame([metrics])  # Создаем DataFrame из словаря, чтобы каждый ключ стал столбцом
    
    return df_financial

def process_all_xml_files(directory):
    all_files_df = pd.DataFrame()
    
    files = [f for f in os.listdir(directory) if f.endswith(".xml")]
    for filename in tqdm(files, desc="Processing XML files"):
        file_path = os.path.join(directory, filename)
        try:
            df = read_xml_to_df(file_path)
            all_files_df = pd.concat([all_files_df, df], ignore_index=True)
        except Exception as e:
            print(f"Failed to process {filename}: {e}")
    
    return all_files_df

# Директория с XML файлами
directory_path = r'D:\Jupyter_Notebook\2024_ИПС_Данные\SPARK\GetCompanyAccountingReport\GetCompanyAccountingReport'

# Обработка всех XML файлов и создание общего DataFrame
df_comp_fin_info = process_all_xml_files(directory_path)

Processing XML files: 100%|█████████████████████████████████████████████████| 213462/213462 [26:11:18<00:00,  2.26it/s]


In [70]:
df_comp_fin_info.to_csv('comp_fin_spark_info.csv', index=False)