### Главная задача

- Создать структурированный список ключевых отраслей экономики и крупных организаций в этих отраслях, зарегистрированных в г. Москва, по степени их влияния на экономический рост страны.

#### Подзадачи:

- Исследование и выбор отраслей: Определить критически важные отрасли экономики на основе данных о ВВП; ✔

- Сбор данных об организациях: Для каждой выбранной отрасли, составить список крупных организаций, учитывая их рыночную долю, оборот, и другие параметры; ✔

- Фильтрация по географическому признаку: Отфильтровать организации согласно заданным географическим критериям (ИНН или КПП начинается на 77 — г. Москва); ✔

#### Источники данных:

- Официальные статистические источники (Росстат, Федеральная налоговая служба);

- Отчеты и исследования отраслевых аналитиков;

- Открытые базы данных и API для получения информации о организациях.

#### Согласно данным Росстата, в 2022 году ВВП города Москвы составил 41,4 трлн рублей. При этом вклад в ВВП по отраслям распределился следующим образом:

- Торговля (оптовая и розничная) - 28,6%;
- Операции с недвижимостью (аренда и предоставление услуг) - 27,4%;
- Промышленность - 17,8%;
- Финансовая деятельность - 12,6%;
- Транспорт и связь - 7,2%;
- Строительство - 6,2%;
- Сельское хозяйство, охота и лесное хозяйство - 2,8%;
- Здравоохранение и социальные услуги - 2,2%;
- Образование - 1,6%.

#### Данные, взяты из следующих источников:

- Росстат - Федеральная служба государственной статистики. Официальный сайт: https://rosstat.gov.ru/;
- Сайт мэра Москвы. Официальный сайт: https://www.mos.ru/.

#### В частности, данные о вкладе отраслей в ВВП города Москвы взяты из следующих публикаций Росстата:

- Статистический бюллетень "Регионы России. Социально-экономические показатели", 2023 год;
- Статистический бюллетень "Москва в цифрах", 2023 год.

#### Данные о структуре торговли, операций с недвижимостью и промышленности города Москвы взяты из следующих публикаций сайта мэра Москвы:

- Отчет о социально-экономическом развитии города Москвы в 2022 году.

#### Данные о вкладе других отраслей в ВВП города Москвы взяты из следующих публикаций сайта мэра Москвы:

- Отчет о социально-экономическом развитии города Москвы в 2022 году.

In [None]:
#!pip install openpyxl
#!pip install Levenshtein

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: http://nexus-reader:****@10.15.61.1:8081/repository/pypi/simple
Collecting Levenshtein
  Downloading http://10.15.61.1:8081/repository/pypi/packages/levenshtein/0.21.1/Levenshtein-0.21.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (171 kB)
[K     |████████████████████████████████| 171 kB 19.2 MB/s eta 0:00:01
[?25hCollecting rapidfuzz<4.0.0,>=2.3.0
  Downloading http://10.15.61.1:8081/repository/pypi/packages/rapidfuzz/2.11.1/rapidfuzz-2.11.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.2 MB)
[K     |████████████████████████████████| 2.2 MB 18.6 MB/s eta 0:00:01K     |███████████▍                    | 768 kB 18.6 MB/s eta 0:00:01
[?25hInstalling collected packages: rapidfuzz, Levenshtein
Successfully installed Levenshtein-0.21.1 rapidfuzz-2.11.1


In [None]:
# Импорт необходимых библиотек для работы с PySpark и анализа данных
import pyspark
from   pyspark               import SparkContext, SparkConf
from   pyspark.sql           import DataFrame, SparkSession, SQLContext, Row
from   pyspark.sql           import functions as F
from   pyspark.sql.functions import lit, col, concat, concat_ws, md5, avg, countDistinct, regexp_replace, regexp_extract, substring, coalesce, row_number, expr, current_date, year, month, dayofmonth, datediff, when
from   pyspark.sql.types     import *
from   pyspark.sql.types     import ArrayType, StructField, StructType, StringType, IntegerType, DecimalType, FloatType, DoubleType
from pyspark.sql.functions   import udf
from   pyspark.sql           import types
from   pyspark.sql.window    import Window
import vertica_python
import copy
import json
from pyspark.sql.functions import to_timestamp, date_format


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from Levenshtein import ratio

# Определение имени задачи (джобы)
JOB_NAME = "list_ofs_companies"

In [None]:
!klist

In [None]:
# Настройка конфигурации Spark
spark = SparkSession.builder \
    .appName(JOB_NAME) \
    .config("spark.master", "yarn") \
    .config("spark.dynamicAllocation.enabled", "True") \
    .config("spark.dynamicAllocation.maxExecutors", "42") \
    .config("spark.driver.cores", 2) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.cores", 1) \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.driver.memoryOverhead", "6g") \
    .config("spark.kryoserializer.buffer.max", "1g")\
    .config('spark.jars', '/home/verbeckiyei/jar/spark-vertica-connector-all-3.3.5.jar')\
    .enableHiveSupport() \
    .getOrCreate()

# Получение контекста SparkContext из SparkSession
# SparkContext является точкой входа для любого Spark функционала => SparkContext нужен для взаимодействия с API Spark.
sc = spark.sparkContext

# log4j = sc._jvm.org.apache.log4j
# log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)

# Создание SQLContext на основе SparkContext
# SQLContext - это класс, который обеспечивает функциональные возможности для работы с данными, используя Spark SQL.
sqlContext = SQLContext(sc)

# Вывод версии Spark
spark.version

'2.4.0-cdh6.3.2'

In [None]:
# # Остановка сессии Spark.
# spark.stop()
# sc.stop()

In [None]:
# Проверка валидности сессии
spark.sparkContext

In [None]:
# Конфиг с параметрами подключения к базе данных Vertica
with open('/home/verbeckiyei/Config/config_opts.json') as f:
    opts = json.load(f)

# Словарь с параметрами подключения к базе данных Vertica с использованием Kerberos
with open('/home/verbeckiyei/Config/config_KERBEROS_VERTICA.json') as f:
    KERBEROS_VERTICA = json.load(f)


# Функция для получения данных из таблицы Vertica с использованием Spark
def get_table(scheme, tablename, spark_session=sqlContext):
    """
    Функция для получения данных из таблицы Vertica с использованием Spark.

    Параметры:
    scheme (str, optional): Схема базы данных.
    tablename (str): Имя таблицы.
    spark_session (SparkSession, optional): Сессия Spark. По умолчанию используется sqlContext.

    Возвращает:
    DataFrame: Датафрейм с данными из указанной таблицы.
    """
    task_opts = copy.deepcopy(opts) # создание глубокой копии словаря opts
    task_opts['table'] = tablename  # добавление имени таблицы в словарь task_opts
    task_opts['dbschema'] = scheme  # добавление схемы базы данных в словарь task_opts

    # Чтение данных из таблицы Vertica с использованием Spark
    df_from_vertica = spark_session.read \
        .format("com.vertica.spark.datasource.DefaultSource") \
        .options(**task_opts).load()
    return df_from_vertica

# Функция для получения данных из таблицы Vertica с использованием JDBC
def get_table_via_jdbc(scheme, tablename, spark_session=sqlContext):
    """
    Функция для получения данных из таблицы Vertica с использованием JDBC.

    Параметры:
    scheme (str, optional): Схема базы данных.
    tablename (str): Имя таблицы.
    spark_session (SparkSession, optional): Сессия Spark. По умолчанию используется sqlContext.

    Возвращает:
    DataFrame: Датафрейм с данными из указанной таблицы.
    """
    return spark_session.read.jdbc(
        table=f"( SELECT * FROM {scheme}.{tablename} ) as {tablename}",     # SQL-запрос для выбора всех данных из таблицы
        url="jdbc:",                                                        # URL подключения
        properties={
            "driver": "com.vertica.jdbc.Driver", # драйвер JDBC
            "fetchsize": "10000",                # размер выборки
            "user": opts['user'],                # имя пользователя
            "password": opts['password'],        # пароль
        },
    )

# Функция для записи данных в таблицу Vertica с использованием Spark
def write_table(df, tablename, mode='append', scheme=None):
    """
    Функция для записи данных в таблицу Vertica с использованием Spark.

    Параметры:
    df (DataFrame): Датафрейм с данными для записи.
    tablename (str): Имя таблицы.
    mode (str, optional): Режим записи. По умолчанию 'append' (добавление данных к существующей таблице). Другой возможный вариант - 'overwrite' (перезапись таблицы).
    scheme (str, optional): Схема базы данных. По умолчанию None (будет использована схема, указанная в словаре opts).
    """
    task_opts = copy.deepcopy(opts) # создание глубокой копии словаря opts
    task_opts['table'] = tablename  # добавление имени таблицы в словарь task_opts
    task_opts['dbschema'] = scheme  # добавление схемы базы данных в словарь task_opts

    # Сохранение данных в таблицу Vertica с использованием Spark
    df.write.save(format="com.vertica.spark.datasource.DefaultSource",
                  mode=mode, **task_opts)

### Сборка и анализ данных

#### Этап 1. Исследуем данные витрин 'компании': 'orgs_general', 'orgs_activity'

In [None]:
# Загрузим витрину с организациями 'orgs_general'
df_organization = get_table_via_jdbc('kd_datamart', 'orgs_general')

In [None]:
df_organization.limit(5).toPandas()

In [None]:
df_organization.count()

                                                                                

25886823

In [None]:
# Фильтруем датафрейм по условию начальных значений в колонке "inn" и "kpp" на "77" (г.Москва) и компании котрые не ликвидированы
df_organization = df_organization.filter(((F.col("inn").startswith("77")) | (F.col("kpp").startswith("77"))) & (F.col("entity_flag") != "ИП") & (F.col("termination_status_flag") == False))

In [None]:
df_organization.limit(5).toPandas()

In [None]:
print("Колличество компаний зарегестрированных в г.Москве:", df_organization.count())

Колличество компаний зарегестрированных в г.Москве: 676328


---

In [None]:
# Загрузим витрину с номерами ОКВЭД организациями 'orgs_activity'
df_orgs_activity = get_table_via_jdbc('kd_datamart', 'orgs_activity')

In [None]:
df_orgs_activity.limit(5).toPandas()

In [None]:
# Фильтруем датафрейм по условию начальных значений в колонке "inn" и "kpp" на "77" (г.Москва)
df_orgs_activity = df_orgs_activity.filter(((F.col("inn").startswith("77")) | (F.col("kpp").startswith("77"))))

In [None]:
df_orgs_activity.printSchema()

In [None]:
# Выполняем операцию присоединения (join) по колонке "id"
df = df_organization.join(df_orgs_activity.select('id', 'okved'), on='id', how='left')

In [None]:
df = df.dropDuplicates(["id"])

In [None]:
df.printSchema()

In [None]:
df.limit(5).toPandas()

In [None]:
df.count()

676328

In [None]:
# Фильтр для кодов ОКВЭД
industry_keywords = {
    "Торговля (оптовая и розничная)": ["46.1", "46.2", "46.3", "46.4", "46.5", "46.6", "46.7", "46.9"],
    "Операции с недвижимостью (аренда и предоставление услуг)": ["68.1", "68.2", "68.3", "68.4", "68.5", "68.6", "68.7", "68.8"],
    "Промышленность": ["10.0", "11.0", "12.0", "13.0", "14.0", "15.0", "16.0", "17.0", "18.0", "19.0", "20.0", "21.0", "22.0", "23.0", "24.0", "25.0", "26.0", "27.0", "28.0", "29.0", "30.0", "31.0", "32.0", "33.0", "34.0", "35.0", "36.0", "37.0", "38.0", "39.0"],
    "Финансовая деятельность": ["64.0", "65.0", "66.0", "67.0"],
    "Транспорт и связь": ["49.0", "50.0", "51.0", "52.0", "53.0"],
    "Строительство": ["41.0", "42.0", "43.0", "44.0"],
    "Сельское хозяйство, охота и лесное хозяйство": ["01.0", "02.0", "03.0", "04.0", "05.0", "06.0", "07.0", "08.0", "09.0"],
    "Здравоохранение и социальные услуги": ["80.0", "81.0", "82.0", "83.0", "84.0", "85.0", "86.0", "87.0", "88.0"],
    "Образование": ["85.1", "85.2", "85.3", "85.4", "85.5", "85.6", "85.7", "85.8", "85.9"]
}


def classify_industry(activity_name):
    """
    Классифицирует отрасль по заданному названию деятельности.

    Функция принимает строку с названием деятельности, преобразует её в верхний регистр,
    и сравнивает с предопределенным списком ключевых слов для каждой отрасли.
    Используется подход с N-граммами для более точного сопоставления.

    Параметры:
    -----------
    activity_name : str
        Строка с названием деятельности для классификации.

    Возвращает:
    -----------
    str
        Название отрасли. Если совпадений нет, возвращает 'Другое'.
    """

    if isinstance(activity_name, str):
        activity_name = activity_name.upper()

        # N-граммы: создаем подстроки из исходной строки для более детального анализа.
        n_grams = [activity_name[i: j] for i in range(len(activity_name)) for j in range(i + 1, len(activity_name) + 1)]

        max_match = 0                 # Максимальное количество совпадений
        matching_industry = 'Другое'  # Инициализируем категорию по умолчанию

        # Перебираем каждую отрасль и связанные с ней ключевые слова
        for industry, keywords in industry_keywords.items():
            match_count = 0  # Инициализация счетчика для текущей отрасли
            for keyword in keywords:
                # Подсчитываем количество N-грамм, содержащих ключевое слово
                match_count += sum(1 for n_gram in n_grams if keyword in n_gram)

            # Обновляем наиболее подходящую отрасль, если найдено больше совпадений
            if match_count > max_match:
                max_match = match_count
                matching_industry = industry

        return matching_industry

    return 'Другое'

In [None]:
# Применим udf функцию к PySpark DataFrame что бы класифицировать компании по основным отраслям экономической деятельности
udf_classify_industry = udf(classify_industry, StringType())

In [None]:
df = df.withColumn("industry", udf_classify_industry("okved"))

In [None]:
df.printSchema()

In [None]:
df.limit(5).toPandas()

#### Итог:

- Мы получили DataFrame Московских организаций, классифицированных по экономическим отраслям;

- количество компаний зарегестрированных  в г.Москва состовляет 676 328.

In [None]:
# Используйте функцию when для создания условного столбца
df = df.withColumn('not_other', when(col('industry') != 'Другое', 1).otherwise(0))

# Подсчет количества значений, не равных 'Другое'
count_not_other = df.filter(col('not_other') == 1).count()

print("Количество значений, не равных 'Другое':", count_not_other)

Количество значений, не равных 'Другое': 172434


In [None]:
# Фильтрация строк, где not_other == 1
df = df.filter(col('not_other') == 1)

In [None]:
# Количество компаний, для которых определена экономическая отрасль из списка отраслей (за исключением 'Другое'),
# которые вносят основной вклад в ВВП г.Москвы
df.count()

172483

In [None]:
df.limit(5).toPandas()

In [None]:
industries = [
    "Торговля (оптовая и розничная)",
    "Операции с недвижимостью (аренда и предоставление услуг)",
    "Промышленность",
    "Финансовая деятельность",
    "Транспорт и связь",
    "Строительство",
    "Сельское хозяйство, охота и лесное хозяйство",
    "Здравоохранение и социальные услуги",
    "Образование"
]

# Создание отдельных DataFrame'ов для каждой отрасли
dfs_by_industry = {}
for industry in industries:dfs_by_industry[industry] = df.filter(df.industry == industry)

In [None]:
# Присваиваем названия DataFrame'ам и вытягиваем их из словаря
df_trade        = dfs_by_industry.get("Торговля (оптовая и розничная)", None)
df_real_estate  = dfs_by_industry.get("Операции с недвижимостью (аренда и предоставление услуг)", None)
df_industry     = dfs_by_industry.get("Промышленность", None)
df_finance      = dfs_by_industry.get("Финансовая деятельность", None)
df_transport    = dfs_by_industry.get("Транспорт и связь", None)
df_construction = dfs_by_industry.get("Строительство", None)
df_agriculture  = dfs_by_industry.get("Сельское хозяйство, охота и лесное хозяйство", None)
df_healthcare   = dfs_by_industry.get("Здравоохранение и социальные услуги", None)
df_education    = dfs_by_industry.get("Образование", None)

df_trade, df_real_estate, df_industry, df_finance, df_transport, df_construction, df_agriculture, df_healthcare, df_education

In [None]:
# Торговля (оптовая и розничная)
df_trade.count()

134247

In [None]:
# Операции с недвижимостью (аренда и предоставление услуг)
df_real_estate.count()

35799

In [None]:
# Промышленность
df_industry.count()

3215

In [None]:
# Финансовая деятельность
df_finance.count()

2

In [None]:
# Транспорт и связь
df_transport.count()

2

In [None]:
# Строительство
df_construction.count()

1

In [None]:
# Сельское хозяйство, охота и лесное хозяйство
df_agriculture.count()

3

In [None]:
# Здравоохранение и социальные услуги
df_healthcare.count()

1

In [None]:
# Образование
df_education.count()

6006

#### Вывод

- Не равномерное распределение компаний среди экономических отраслей

In [None]:
# df = df.toPandas()

In [None]:
# df.to_csv('df.csv', index=False)

In [None]:
# df = pd.read_csv('df.csv', sep = ',')

In [None]:
# for col in df.columns:
#     df[col] = df[col].astype(str)

In [None]:
# df = spark.createDataFrame(df)

---

#### Этап 2. Исследуем данные полученные в ходе Data_parsing-Web_scraping сайта spark-interfax.ru

In [None]:
# Загрузим DataFrame
data = pd.read_csv('/home/verbeckiyei/Project/list_of_companies/companies.csv')

In [None]:
# Удаление строк, где значение в столбце 'Revenue' равно None
data.dropna(subset=['Revenue'], inplace=True)

In [None]:
data.head()

Unnamed: 0,Company Name,INN,Revenue
0,"ПАО ""ЛУКОЙЛ""",7708004767,3 трлн
1,"АО ""ТОРГОВЫЙ ДОМ ""ПЕРЕКРЕСТОК""",7728029110,815 млрд
2,"АО ""КОНЦЕРН РОСЭНЕРГОАТОМ""",7721632827,541 млрд
3,"АО ""МОСЭНЕРГОСБЫТ""",7736520080,448 млрд
4,"ООО ""ДЖ.Т.И. РОССИЯ""",7703386329,431 млрд


In [None]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 70000 entries, 0 to 69999
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   Company Name  70000 non-null  object
 1   INN           70000 non-null  int64 
 2   Revenue       70000 non-null  object
dtypes: int64(1), object(2)
memory usage: 1.6+ MB


In [None]:
# Функция для преобразования строки в числовое значение
def convert_revenue_to_numeric(revenue_str):
    """
    Конвертирует строковое представление дохода в числовое.

    Параметры:
        revenue_str (str): Строковое представление дохода с единицами измерения.
        Например, "10 млрд" или "5 млн".

    Возвращает:
        float или None: Числовое представление дохода. В случае невозможности конвертации возвращает None.
    """
    try:
        value_str, unit = revenue_str.split(' ')
        value = float(value_str)
        if unit == 'трлн':
            value *= 1e12
        elif unit == 'млрд':
            value *= 1e9
        elif unit == 'млн':
            value *= 1e6
        elif unit == 'тыс':
            value *= 1e3
        return value
    except ValueError:
        return None

In [None]:
# Применение функции к столбцу Revenue
data['Revenue'] = data['Revenue'].apply(convert_revenue_to_numeric)

In [None]:
# Общая прибыль все компаний из списка
data['Revenue'].sum()

63522055000000.0

In [None]:
# 155% покрытие ВВП г.Москва за 2022 год
63522055000000/(41 * 1e12)

1.5493184146341463

- Международная Деятельность: Если московские компании имеют значительную долю выручки из-за рубежа, это может не отражаться в ВВП Москвы, но будет в их доходах.

---

#### Этап 3. Исследуем пересечение двух DataFrames (множеств): df и data

In [None]:
data = spark.createDataFrame(data)

In [None]:
# Найдем пересечения двух множеств: df:полученный на основе витрины данных, data:полученный в ходе парсинга.
df.select("inn").distinct().intersect(data.select("INN").distinct()).count()

203026

In [None]:
# На основе пересечения сформируем новый DataFrame
df_companies = data.join(df.withColumnRenamed("inn", "INN"), "INN", "inner")

In [None]:
# Компании, у которых не известна прибль дропним
df_companies = df_companies.filter(df_companies["Revenue"].isNotNull())
df_companies = df_companies.filter(~F.isnan("Revenue"))

In [None]:
# Сгрупперуем
df_companies = df_companies.orderBy(F.desc("Revenue"))

In [None]:
df_companies.limit(5).toPandas()

In [None]:
# Применим udf функцию к PySpark DataFrame что бы класифицировать компании по основным отраслям экономической деятельности
udf_classify_industry = udf(classify_industry, StringType())

In [None]:
df_companies = df_companies.withColumn("okved", udf_classify_industry("okved"))

In [None]:
# Получаем уникальные значения
unique_values = df_companies.select("industry").distinct().collect()

# Преобразование в Python-список
unique_values_list = [row.industry for row in unique_values]

print(unique_values_list)

['Торговля (оптовая и розничная)', 'Образование', 'Операции с недвижимостью (аренда и предоставление услуг)', 'Промышленность', 'Транспорт и связь', 'Другое', 'Здравоохранение и социальные услуги', 'Сельское хозяйство, охота и лесное хозяйство']


In [None]:
df_companies.count()

203023

In [None]:
# Список отраслей
industries = [
    "Торговля (оптовая и розничная)",
    "Операции с недвижимостью (аренда и предоставление услуг)",
    "Промышленность",
    "Финансовая деятельность",
    "Транспорт и связь",
    "Строительство",
    "Сельское хозяйство, охота и лесное хозяйство",
    "Здравоохранение и социальные услуги",
    "Образование",
    "Другое"
]

# Создание отдельных DataFrame'ов для каждой отрасли
dfs_by_industry = {}
for industry in industries:
    dfs_by_industry[industry] = df_companies.filter(df_companies.industry == industry)

In [None]:
# Присваиваем названия DataFrame'ам и вытягиваем их из словаря
df_trade        = dfs_by_industry.get("Торговля (оптовая и розничная)", None)
df_real_estate  = dfs_by_industry.get("Операции с недвижимостью (аренда и предоставление услуг)", None)
df_industry     = dfs_by_industry.get("Промышленность", None)
df_finance      = dfs_by_industry.get("Финансовая деятельность", None)
df_transport    = dfs_by_industry.get("Транспорт и связь", None)
df_construction = dfs_by_industry.get("Строительство", None)
df_agriculture  = dfs_by_industry.get("Сельское хозяйство, охота и лесное хозяйство", None)
df_healthcare   = dfs_by_industry.get("Здравоохранение и социальные услуги", None)
df_education    = dfs_by_industry.get("Образование", None)
df_another      = dfs_by_industry.get("Другое", None)

In [None]:
# Торговля (оптовая и розничная)
df_trade.limit(5).toPandas()

In [None]:
# Торговля (оптовая и розничная)
df_trade.count()

41659

In [None]:
# Оборот в рублях
df_trade.agg(F.sum('Revenue')).collect()[0][0]

18220284390212.0

In [None]:
df_trade = df_trade.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_trade.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_trade')
# df_trade = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_trade')

---

In [None]:
# Операции с недвижимостью (аренда и предоставление услуг)
df_real_estate.limit(5).toPandas()

In [None]:
# Операции с недвижимостью (аренда и предоставление услуг)
df_real_estate.count()

15014

In [None]:
df_real_estate.agg(F.sum('Revenue')).collect()[0][0]

1707216870964.0

In [None]:
df_real_estate = df_real_estate.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_real_estate.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_real_estate')
# df_real_estate = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_real_estate')

---

In [None]:
# Промышленность
df_industry.limit(5).toPandas()

In [None]:
# Промышленность
df_industry.count()

1069

In [None]:
df_industry.agg(F.sum('Revenue')).collect()[0][0]

170914047801.0

In [None]:
df_industry = df_industry.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_industry.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_industry')
# df_industry = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_industry')

---

In [None]:
# Финансовая деятельность
df_finance.limit(5).toPandas()

Unnamed: 0,INN,Company Name,Revenue,id,kpp,ogrn,org_name,entity_flag,org_type,reg_date,legal_form_code,legal_form_name,termination_status_flag,termination_dt,termination_reason,site,update_timestamp,sources,okved


In [None]:
# Финансовая деятельность
df_finance.count()

0

---

In [None]:
# Транспорт и связь
df_transport.limit(5).toPandas()

In [None]:
# Транспорт и связь
df_transport.count()

0

In [None]:
df_transport = df_transport.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_transport.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_transport')
# df_transport = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_transport')

---

In [None]:
# Строительство
df_construction.limit(5).toPandas()

In [None]:
# Строительство
df_construction.count()

0

---

In [None]:
# Сельское хозяйство, охота и лесное хозяйство
df_agriculture.limit(5).toPandas()

In [None]:
# Сельское хозяйство, охота и лесное хозяйство
df_agriculture.count()

1

In [None]:
df_agriculture = df_agriculture.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_agriculture.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_agriculture')
# df_agriculture = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_agriculture')

---

In [None]:
# Здравоохранение и социальные услуги
df_healthcare.limit(5).toPandas()

In [None]:
# Здравоохранение и социальные услуги
df_healthcare.count()

0

In [None]:
df_healthcare = df_healthcare.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_healthcare.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_healthcare')
# df_healthcare = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_healthcare')

---

In [None]:
# Образование
df_education.limit(5).toPandas()

In [None]:
# Образование
df_education.count()

1756

In [None]:
df_education.agg(F.sum('Revenue')).collect()[0][0]

250258117061.0

In [None]:
df_education = df_education.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_education.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_education')
# df_education = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_education')

---

In [None]:
# Другое
df_another.limit(5).toPandas()

In [None]:
df_another.limit(10498).agg(F.sum('Revenue')).collect()[0][0]

40176234000000.0

In [None]:
df_another = df_another.limit(10498)

In [None]:
df_another = df_another.withColumnRenamed("Company Name", "Company_Name")

In [None]:
# df_another.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_another')
# df_another = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_another')

---

### Общий вывод:

- Целью задачи являлась создать структурированный список ключевых отраслей экономики и крупных организаций в этих отраслях, зарегистрированных в г.Москва, по степени их влияния на экономический рост страны. На основе следующих данных был сформирован результирующий DataFrame с компаниями по отраслям.

#### 1) Распределение по экономическим отраслям было сформировано исходя из данных Росстата:

##### Согласно данным Росстата, в 2022 году ВВП города Москвы составил 41,4 трлн рублей. При этом вклад в ВВП по отраслям распределился следующим образом:

- Торговля (оптовая и розничная) - 28,6%;
- Операции с недвижимостью (аренда и предоставление услуг) - 27,4%;
- Промышленность - 17,8%;
- Финансовая деятельность - 12,6%;
- Транспорт и связь - 7,2%;
- Строительство - 6,2%;
- Сельское хозяйство, охота и лесное хозяйство - 2,8%;
- Здравоохранение и социальные услуги - 2,2%;
- Образование - 1,6%.
---
#### 2) За основу для анализа были взяты два DataFrames:
   - df - витрина данных о компаниях;
   - data - фрейм с данными полученный в ходе парсинга сайта spark-interfax.
   
#### 3) На основе пересечений df и data по INN, был сформирован результирующий DataFrame: df_companies;

#### 4) df_companies был сгруппирован по экономическим отраслям и отсортирован по прибыли компаний, в ходе чего были получены 10 отдельных DataFrames, каждый для своей экокномической отрасли.


#### Распределение компаний по экономическим отраслям:

- Торговля (оптовая и розничная) (59%) 41659 компаний; (df_trade)
- Операции с недвижимостью (аренда и предоставление услуг) (21%) 15014 компвний; (df_real_estate)
- Промышленность (1,5%) 1069 компаний; (df_industry)
- Финансовая деятельность (0%) 0 компаний; -
- Транспорт и связь (0%) 1 компания; (df_transport)
- Строительство (0%) 0 компаний; -
- Сельское хозяйство, охота и лесное хозяйство (0%) 2 компании; (df_agriculture)
- Здравоохранение и социальные услуги (0%) 1 компания; (df_healthcare)
- Образование (2%) 1756 компаний; (df_education)
- Другое (15%) 10498 компаний. (df_another)
---
- Покрытие общей выручки компаний относительно ВВП 147%
---
* Международная Деятельность: Если московские компании имеют значительную долю выручки из-за рубежа, это может не отражаться в ВВП Москвы, но будет в их доходах.
---

### Сравнительный анализ данных

#### Провести сравнительный анализ отличий между предоставленном списком компаний от заказчика и сформированными dataframes по экономическим отраслям

In [None]:
# Прочитаем файл Excel в DataFrame
df = pd.read_excel('/home/verbeckiyei/Project/list_of_companies/list_of_companies.xlsx', engine='openpyxl')

In [None]:
def pandas_to_pyspark(pandas_df):
    """
    Преобразует Pandas DataFrame в PySpark DataFrame после некоторой предварительной обработки.

    Параметры:
    pandas_df (pd.DataFrame): Входной Pandas DataFrame

    Возвращает:
    pyspark.sql.DataFrame: Преобразованный PySpark DataFrame
    """

    # Пример предварительной обработки: преобразуем все столбцы в строковый тип для простоты
    for col in pandas_df.columns:
        pandas_df[col] = pandas_df[col].astype(str)

    # Выводим схему для PySpark DataFrame
    schema = StructType()

    for col, dtype in zip(pandas_df.columns, pandas_df.dtypes):
        if dtype == 'int64':
            schema.add(StructField(col, IntegerType(), True))
        elif dtype == 'float64':
            schema.add(StructField(col, FloatType(), True))
        else:
            schema.add(StructField(col, StringType(), True))

    # Преобразуем Pandas DataFrame в PySpark DataFrame
    pyspark_df = spark.createDataFrame(pandas_df, schema=schema)

    return pyspark_df

In [None]:
df_list_of_companies = pandas_to_pyspark(df)

In [None]:
df_list_of_companies = df_list_of_companies.withColumnRenamed("Unnamed: 0", "Company Name") \
                                           .withColumnRenamed("Unnamed: 1", "INN") \
                                           .withColumnRenamed("Unnamed: 2", "okved") \
                                           .withColumnRenamed("Unnamed: 3", "type") \
                                           .withColumnRenamed("Unnamed: 4", "AO")

# Добавление временного индекса и удаление первой строки
df_list_of_companies = df_list_of_companies.withColumn("temp_id", F.monotonically_increasing_id())
df_list_of_companies = df_list_of_companies.filter(F.col("temp_id") > 0).drop("temp_id")

In [None]:
df_list_of_companies.limit(5).toPandas()

In [None]:
df_list_of_companies.count()

594322

In [None]:
def find_common_values(df1: DataFrame, col1: str, df2: DataFrame, col2: str) -> int:
    """
    Находит количество общих значений в заданных колонках двух DataFrame'ов в PySpark.

    Параметры:
    df1 (DataFrame) : Первый DataFrame, в котором нужно искать общие значения.
    col1 (str) : Имя колонки в первом DataFrame, по которой будет производиться поиск.
    df2 (DataFrame) : Второй DataFrame, в котором нужно искать общие значения.
    col2 (str) : Имя колонки во втором DataFrame, по которой будет производиться поиск.

    Возвращает:
    int : Количество общих значений в заданных колонках.
    """

    # Производим соединение двух DataFrame по заданным колонкам.
    # Используем внутреннее соединение ('inner join'), чтобы оставить только общие значения.
    joined_df = df1.join(df2, df1[col1] == df2[col2], 'inner')

    # Считаем количество строк в получившемся DataFrame, которое будет равно количеству общих значений.
    count_common_values = joined_df.count()

    return count_common_values

In [None]:
def find_non_common_values(df1: DataFrame, col1: str, df2: DataFrame, col2: str) -> DataFrame:
    """
    Находит значения в заданной колонке первого DataFrame, которые отсутствуют во втором DataFrame.

    Параметры:
    df1 (DataFrame) : Первый DataFrame, в котором нужно искать непересекающиеся значения.
    col1 (str) : Имя колонки в первом DataFrame, по которой будет производиться поиск.
    df2 (DataFrame) : Второй DataFrame, в котором нужно искать непересекающиеся значения.
    col2 (str) : Имя колонки во втором DataFrame, по которой будет производиться поиск.

    Возвращает:
    DataFrame : DataFrame, содержащий значения из df1, которые не вошли в df2.
    """

    # Производим соединение двух DataFrame по заданным колонкам.
    # Используем тип соединения 'left_anti', чтобы оставить только те строки из df1, которых нет в df2.
    non_common_df = df1.join(df2, df1[col1] == df2[col2], 'left_anti')

    return non_common_df

In [None]:
# В результирующем DataFrame будут только те строки, для которых степень схожести названий компаний равна 0.25 или ниже.
def compare_company_names(df1: DataFrame, col1: str, col1_name: str, df2: DataFrame, col2: str, col2_name: str, threshold: float = 0.25) -> DataFrame:
    """
    Сравнивает названия компаний в пересечениях двух DataFrame на основе заданных колонок с ИНН и возвращает новый DataFrame
    с парами полных строк, степень схожести которых ниже заданного порога.

    Параметры:
    df1 (DataFrame)   : Первый DataFrame с колонками ИНН и названиями компаний.
    col1 (str)        : Имя колонки с ИНН в первом DataFrame.
    col1_name (str)   : Имя колонки с названиями компаний в первом DataFrame.
    df2 (DataFrame)   : Второй DataFrame с колонками ИНН и названиями компаний.
    col2 (str)        : Имя колонки с ИНН во втором DataFrame.
    col2_name (str)   : Имя колонки с названиями компаний во втором DataFrame.
    threshold (float) : Пороговое значение для степени схожести (по умолчанию 0.25).

    Возвращает:
    DataFrame : DataFrame с парами полных строк, степень схожести которых ниже заданного порога.
    """

    # Находим пересечения по ИНН
    joined_df = df1.join(df2, df1[col1] == df2[col2], 'inner')

    # Инициализация списка для нового DataFrame
    low_similarity_rows = []

    # Проходим по всем строкам в пересечениях и сравниваем названия компаний
    for row in joined_df.collect():
        name1 = row[col1_name]
        name2 = row[col2_name]

        # Вычисляем степень сходства между названиями с использованием расстояния Левенштейна
        similarity = ratio(name1, name2)

        # Если степень схожести ниже порогового значения, добавляем в список
        if similarity <= threshold:
            row_dict1 = row.asDict()  # Получаем полную строку из первого DataFrame в виде словаря
            row_dict2 = {k.replace(col2, col1): v for k, v in row.asDict().items() if k.startswith(col2)}  # Получаем полную строку из второго DataFrame в виде словаря

            low_similarity_rows.append(Row(**row_dict1))
            low_similarity_rows.append(Row(**row_dict2))

    # Создаем новый DataFrame из списка
    if low_similarity_rows:
        result_df = spark.createDataFrame(low_similarity_rows)
    else:
        print(f"Сходство всех названий не ниже порогового значения: {threshold}")

---

- Торговля (оптовая и розничная) - df_trade

In [None]:
print(f'Количество записей в df_trade: {df_trade.count()}')
print(f'Количество общих значений: {find_common_values(df_trade, "INN", df_list_of_companies, "INN")}')

Количество записей в df_trade: 41706
Количество общих значений: 41687


In [None]:
print('Количество команий не вошедших в итоговый список:', 41706 - 41687)
non_common_df_one = find_non_common_values(df_trade, "INN", df_list_of_companies, "INN")
non_common_df_one.limit(20).toPandas()

In [None]:
df_cross_analysis = compare_company_names(df_trade, "INN", "Company Name", df_list_of_companies, "INN", "Company Name", threshold = 0.1)

Сходство всех названий не ниже порогового значения: 0.1


---

- Операции с недвижимостью (аренда и предоставление услуг) - df_real_estate

In [None]:
print(f'Количество записей в df_real_estate: {df_real_estate.count()}')
print(f'Количество общих значений: {find_common_values(df_real_estate, "INN", df_list_of_companies, "INN")}')

Количество записей в df_real_estate: 14957
Количество общих значений: 14951


In [None]:
print('Количество команий не вошедших в итоговый список:', 14957 - 14951)
non_common_df_two = find_non_common_values(df_real_estate, "INN", df_list_of_companies, "INN")
non_common_df_two.limit(6).toPandas()

In [None]:
df_cross_analysis = compare_company_names(df_real_estate, "INN", "Company Name", df_list_of_companies, "INN", "Company Name", threshold = 0.1)

Сходство всех названий не ниже порогового значения: 0.1


---

- Промышленность - df_industry

In [None]:
print(f'Количество записей в df_real_estate: {df_industry.count()}')
print(f'Количество общих значений: {find_common_values(df_industry, "INN", df_list_of_companies, "INN")}')

Количество записей в df_real_estate: 1108
Количество общих значений: 1107


In [None]:
print('Количество команий не вошедших в итоговый список:', 1108 - 1107)
non_common_df_three = find_non_common_values(df_industry, "INN", df_list_of_companies, "INN")
non_common_df_three.limit(5).toPandas()

In [None]:
df_cross_analysis = compare_company_names(df_real_estate, "INN", "Company Name", df_list_of_companies, "INN", "Company Name", threshold = 0.1)

Сходство всех названий не ниже порогового значения: 0.1


---
- Образование - df_education

In [None]:
print(f'Количество записей в df_real_estate: {df_education.count()}')
print(f'Количество общих значений: {find_common_values(df_education, "INN", df_list_of_companies, "INN")}')

Количество записей в df_real_estate: 1744
Количество общих значений: 1742


In [None]:
print('Количество команий не вошедших в итоговый список:', 1744 - 1742)
non_common_df_four = find_non_common_values(df_education, "INN", df_list_of_companies, "INN")
non_common_df_four.limit(5).toPandas()

In [None]:
df_cross_analysis = compare_company_names(df_education, "INN", "Company Name", df_list_of_companies, "INN", "Company Name", threshold = 0.1)

Сходство всех названий не ниже порогового значения: 0.1


---

- Другое - df_another

In [None]:
print(f'Количество записей в df_real_estate: {df_another.count()}')
print(f'Количество общих значений: {find_common_values(df_another, "INN", df_list_of_companies, "INN")}')

Количество записей в df_real_estate: 10498
Количество общих значений: 10496


In [None]:
print('Количество команий не вошедших в итоговый список:', 10498 - 10496)
non_common_df_five = find_non_common_values(df_another, "INN", df_list_of_companies, "INN")
non_common_df_five.limit(5).toPandas()

In [None]:
df_cross_analysis = compare_company_names(df_another, "INN", "Company Name", df_list_of_companies, "INN", "Company Name", threshold = 0.1)

Сходство всех названий не ниже порогового значения: 0.1


In [None]:
non_common_df_one   = non_common_df_one.drop('not_other')
non_common_df_two   = non_common_df_two.drop('not_other')
non_common_df_three = non_common_df_three.drop('not_other')
non_common_df_four  = non_common_df_four.drop('not_other')

In [None]:
# Склеивание все DataFrame'ы в один
non_common_df = non_common_df_one.union(non_common_df_two)\
                                 .union(non_common_df_three)\
                                 .union(non_common_df_four)\
                                 .union(non_common_df_five)

In [None]:
non_common_df.limit(5).toPandas()

In [None]:
non_common_df.count()

31

In [None]:
# non_common_df.write.parquet('hdfs:///user/verbeckiyei/list_of_companies/non_common_df')
# df_another = spark.read.parquet('hdfs:///user/verbeckiyei/list_of_companies/df_another')

#### Итог

- В предоставленном стороннем датасете отражены информационные записи о 69 983 компаниях из 70 014, идентифицированных в ходе этапа сегментации по экономическим отраслям;

- Идентификационные номера (ИНН) и наименования компаний в предоставленном стороннем датасете полностью соответствуют данным, собранным на этапе сегментации;

- В стороннем датасете отсутствует информация о 31 компании. Сведения об этих компаниях содержатся в DataFrame с именем 'non_common_df', расположенному по следующему пути: hdfs:///user/verbeckiyei/list_of_companies/df_another.