Вы будете работать с базой данных Yandex, которая хранит информацию о венчурных фондах и инвестициях в компании-стартапы. Эта база данных основана на датасете Startup Investments, опубликованном на популярной платформе для соревнований по исследованию данных Kaggle.

Анализировать рынок инвестиций без подготовки может быть непросто. Поэтому сначала познакомьтесь с важными понятиями, которые вам встретятся в работе с базой данных.

- Венчурные фонды — это финансовые организации, которые могут позволить себе высокий риск и инвестировать в компании с инновационной бизнес-идеей или разработанной новой технологией, то есть в стартапы.

- Цель венчурных фондов — в будущем получить значительную прибыль, которая в разы превысит размер их трат на инвестиции в компанию. Если стартап подорожает, венчурный фонд может получить долю в компании или фиксированный процент от её выручки.

- Чтобы процесс финансирования стал менее рискованным, его делят на стадии — раунды. Тот или иной раунд зависит от того, какого уровня развития достигла компания.

- Первые этапы — предпосевной и посевной раунды. Предпосевной раунд предполагает, что компания как таковая ещё не создана и находится в стадии замысла.

- Следующий — посевной — раунд знаменует рост проекта: создатели компании разрабатывают бизнес-модель и привлекают инвесторов.

- Если компании требуется ментор или наставник — она привлекает бизнес-ангела. Бизнес-ангелы — инвесторы, которые помимо финансовой поддержки предлагают экспертную помощь. Такой раунд называют ангельским.

- Когда стартап становится компанией с проверенной бизнес-моделью и начинает зарабатывать самостоятельно, предложений инвесторов становится больше. Это раунд A, а за ним следуют и другие: B, C, D — на этих этапах компания активно развивается и готовится к IPO.

Иногда выделяют венчурный раунд — финансирование, которое могло поступить от венчурного фонда на любом этапе: начальном или более позднем.
В данных об инвестициях вам встретятся упоминания раундов, но самостоятельный проект не предполагает, что вы должны разбираться в их специфике лучше любого инвестора. Главное — понимать, как устроены данные.
Вы уже знаете, что такое ER-диаграмма. Работу с новой базой данных лучше начать с изучения схемы.

In [None]:
# Стандартные библиотеки Python
import copy
import hashlib
import itertools
import json
import os

# Сторонние библиотеки для работы с базами данных
import vertica_python

# Библиотеки для анализа и визуализации данных
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Библиотеки для работы с PySpark и анализа данных
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import DataFrame, SparkSession, SQLContext, Window
from pyspark.sql.types import (
    ArrayType, DecimalType, DoubleType, FloatType, IntegerType, LongType, StringType, StructField, StructType, FloatType
)
from pyspark.sql import functions as F
import pyspark.sql.functions as psf
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import row_number


# Определение имени задачи (джобы)
JOB_NAME = "SQL"

In [None]:
# Настройка конфигурации Spark
spark = SparkSession.builder \
    .appName(JOB_NAME) \
    .config("spark.master", "yarn") \
    .config("spark.dynamicAllocation.enabled", "True") \
    .config("spark.dynamicAllocation.maxExecutors", "42") \
    .config("spark.driver.cores", 2) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.cores", 1) \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.driver.memoryOverhead", "6g") \
    .config('spark.jars', '/home/verbeckiyei/yandex_sql/spark-vertica-connector-all-3.3.5.jar')\
    .enableHiveSupport() \
    .getOrCreate()

# Получение контекста SparkContext из SparkSession
# SparkContext является точкой входа для любого Spark функционала => SparkContext нужен для взаимодействия с API Spark.
sc = spark.sparkContext

log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)


# Создание SQLContext на основе SparkContext
# SQLContext - это класс, который обеспечивает функциональные возможности для работы с данными, используя Spark SQL.
sqlContext = SQLContext(sc)

# Вывод версии Spark
spark.version

Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/spark-vertica-connector-assembly-3.3.5_scala-2.13_v3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/10/16 14:17:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/16 14:17:08 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/10/16 14:17:08 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/10/16 14:17:08 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
23/10/16 14:17:08 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
23/10/16 14:17:09 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
23/10/16 14:17:26 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!




'3.3.0'

In [None]:
# # Остановка сессии Spark.
# sc.stop()
# spark.stop()

In [1]:
# Проверка валидности сессии
spark.sparkContext

---

In [None]:
# Конфиг с параметрами подключения к базе данных Vertica
with open('/home/verbeckiyei/config/config_opts.json') as f:
    opts = json.load(f)


# Словарь с параметрами подключения к базе данных Vertica с использованием Kerberos
with open('/home/verbeckiyei/config/config_KERBEROS_VERTICA.json') as f:
    KERBEROS_VERTICA = json.load(f)


# Функция для получения данных из таблицы Vertica с использованием Spark
def get_table(tablename, scheme=opts['db'], spark_session=sqlContext):
    """
    Функция для получения данных из таблицы Vertica с использованием Spark.

    Параметры:
    tablename (str): Имя таблицы.
    scheme (str, optional): Схема базы данных. По умолчанию используется схема, указанная в словаре opts.
    spark_session (SparkSession, optional): Сессия Spark. По умолчанию используется sqlContext.

    Возвращает:
    DataFrame: Датафрейм с данными из указанной таблицы.
    """
    task_opts = copy.deepcopy(opts) # создание глубокой копии словаря opts
    task_opts['table'] = tablename  # добавление имени таблицы в словарь task_opts
    task_opts['dbschema'] = scheme  # добавление схемы базы данных в словарь task_opts

    # Чтение данных из таблицы Vertica с использованием Spark
    df_from_vertica = spark_session.read \
        .format("com.vertica.spark.datasource.DefaultSource") \
        .options(**task_opts).load()
    return df_from_vertica

# Функция для получения данных из таблицы Vertica с использованием JDBC
def get_table_via_jdbc(tablename, scheme=opts['db'], spark_session=sqlContext):
    """
    Функция для получения данных из таблицы Vertica с использованием JDBC.

    Параметры:
    tablename (str): Имя таблицы.
    scheme (str, optional): Схема базы данных. По умолчанию используется схема, указанная в словаре opts.
    spark_session (SparkSession, optional): Сессия Spark. По умолчанию используется sqlContext.

    Возвращает:
    DataFrame: Датафрейм с данными из указанной таблицы.
    """
    return spark_session.read.jdbc(
        table=f"( SELECT * FROM {scheme}.{tablename} ) as {tablename}",     # SQL-запрос для выбора всех данных из таблицы
        url="jdbc:vertica://udrvs-ver-1x-cluster.data.corp:5433/verticadb", # URL подключения
        properties={
            "driver": "com.vertica.jdbc.Driver", # драйвер JDBC
            "fetchsize": "10000",                # размер выборки
            "user": opts['user'],                # имя пользователя
            "password": opts['password'],        # пароль
        },
    )

# Функция для записи данных в таблицу Vertica с использованием Spark
def write_table(df, tablename, mode='append', scheme=None):
    """
    Функция для записи данных в таблицу Vertica с использованием Spark.

    Параметры:
    df (DataFrame): Датафрейм с данными для записи.
    tablename (str): Имя таблицы.
    mode (str, optional): Режим записи. По умолчанию 'append' (добавление данных к существующей таблице). Другой возможный вариант - 'overwrite' (перезапись таблицы).
    scheme (str, optional): Схема базы данных. По умолчанию None (будет использована схема, указанная в словаре opts).
    """
    task_opts = copy.deepcopy(opts) # создание глубокой копии словаря opts
    task_opts['table'] = tablename  # добавление имени таблицы в словарь task_opts
    task_opts['dbschema'] = scheme  # добавление схемы базы данных в словарь task_opts

    # Сохранение данных в таблицу Vertica с использованием Spark
    df.write.save(format="com.vertica.spark.datasource.DefaultSource",
                  mode=mode, **task_opts)


In [None]:
# Загрузка данных из БД Hive
df = spark.sql('select * from company')

#### Задание № 1

- Отобразите все записи из таблицы company по компаниям, которые закрылись.

In [None]:
# Применение фильтра
df.filter(F.col('status') == 'closed')

#### Задание № 2

- Отобразите количество привлечённых средств для новостных компаний США. Используйте данные из таблицы company;

- Отсортируйте таблицу по убыванию значений в поле funding_total.

In [None]:
# Применение фильтра
df = df.filter(df['country_code'] == 'USA')
df = df.filter(df['category_code'] == 'news')

# Сортировка по полю funding_total в порядке убывания
df = df.orderBy(df['funding_total'].desc())

# Выбор столбца funding_total
df = df.select('funding_total')

# Вывод результатов
df.show()

#### Задание № 3

- Найдите общую сумму сделок по покупке одних компаний другими в долларах. Отберите сделки, которые осуществлялись только за наличные с 2011 по 2013 год включительно.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from acquisition')

# Применение фильтра
df = df.filter(df['term_code'] == 'cash')
df = df.filter(df['acquired_at'].year.isin([2011, 2012, 2013]))

# Вычисление суммы значений в столбце price_amount
total_price = df.agg({'price_amount': 'sum'}).collect()[0][0]

# Вывод результата
print(total_price)

#### Задание № 4

- Найдите общую сумму сделок по покупке одних компаний другими в долларах. Отберите сделки, которые осуществлялись только за наличные с 2011 по 2013 год включительно.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from people')

# Применение фильтра
df = df.filter(df['twitter_username'].like('Silver%'))

# Выбор столбцов first_name, last_name и twitter_username
df = df.select('first_name', 'last_name', 'twitter_username')

# Вывод результатов
df.show()

#### Задание № 5

- Выведите на экран всю информацию о людях, у которых названия аккаунтов в твиттере содержат подстроку 'money', а фамилия начинается на 'K'.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from people')

# SQL запрос
df = df.where('twitter_username like "%money%" and last_name like "%K%"')

# Вывод результата
df.show()

#### Задание № 6

- Для каждой страны отобразите общую сумму привлечённых инвестиций, которые получили компании, зарегистрированные в этой стране;

- Страну, в которой зарегистрирована компания, можно определить по коду страны. Отсортируйте данные по убыванию суммы.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from company')

# Группировка данных по столбцу country_code
df = df.groupBy('country_code')

# Применение агрегатной функции sum() к столбцу funding_total
df = df.agg(F.sum('funding_total').alias('total_funding'))

# Сортировка данных по столбцу total_funding по убыванию
df = df.orderBy('total_funding', descending=True)

# Вывод результата
df.show()

#### Задание № 7

- Составьте таблицу, в которую войдёт дата проведения раунда, а также минимальное и максимальное значения суммы инвестиций, привлечённых в эту дату.

- Оставьте в итоговой таблице только те записи, в которых минимальное значение суммы инвестиций не равно нулю и не равно максимальному значению.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from funding_round')

# Группировка данных по столбцу funded_at
df = df.groupBy('funded_at')

# Применение агрегатных функций min() и max() к столбцу raised_amount
df = df.agg(
    min_raised_amount=F.min('raised_amount'),
    max_raised_amount=F.max('raised_amount')
)

# Фильтрация данных по условиям MIN(raised_amount) != 0 AND MIN(raised_amount) != MAX(raised_amount)
df = df.filter(lambda row: row['min_raised_amount'] != 0 and row['min_raised_amount'] != row['max_raised_amount'])

# Вывод результата
df.show()

#### Задание № 8

Создайте поле с категориями:

- Для фондов, которые инвестируют в 100 и более компаний, назначьте категорию high_activity;

- Для фондов, которые инвестируют в 20 и более компаний до 100, назначьте категорию middle_activity;

- Если количество инвестируемых компаний фонда не достигает 20, назначьте категорию low_activity;

- Отобразите все поля таблицы fund и новое поле с категориями.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from fund')

# Применяем условную логику с использованием функции when
df = df.withColumn("category",
                  F.when(F.col("invested_companies") >= 100, "high_activity")
                  F.when(F.col("invested_companies") < 20, "low_activity")
                  F.otherwise("middle_activity")
)

# Вывод результата
df.show()

#### Задание № 9

- Для каждой из категорий, назначенных в предыдущем задании, посчитайте округлённое до ближайшего целого числа среднее количество инвестиционных раундов, в которых фонд принимал участие;

- Выведите на экран категории и среднее число инвестиционных раундов;

- Отсортируйте таблицу по возрастанию среднего.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from fund')

# Создание новой колонки для категоризации 'activity'
df = df.withColumn("activity",
                  F.when(F.col("invested_companies") >= 100, "high_activity")
                  F.when(F.col("invested_companies") >= 20, "middle_activity")
                  F.otherwise("low_activity")
)

# Группировка по 'activity' и расчет среднего значения для 'investment_rounds'
df = df.groupBy("activity").agg(F.round(F.avg("investment_rounds")).alias("avg_investment_rounds"))

# Сортировка по среднему значению 'investment_rounds'
df = df.orderBy("avg_investment_rounds")

# Вывод результата
df.show()

#### Задание № 10

- Проанализируйте, в каких странах находятся фонды, которые чаще всего инвестируют в стартапы;

- Для каждой страны посчитайте минимальное, максимальное и среднее число компаний, в которые инвестировали фонды этой страны, основанные с 2010 по 2012 год включительно. Исключите страны с фондами, у которых минимальное число компаний, получивших инвестиции, равно нулю;

- Выгрузите десять самых активных стран-инвесторов: отсортируйте таблицу по среднему количеству компаний от большего к меньшему. Затем добавьте сортировку по коду страны в лексикографическом порядке.

In [None]:
# Загрузка данных из БД
df = spark.sql('select * from fund')

# Фильтрация данных по году основания
df = df.filter(F.year("founded_at").isin([2010, 2011, 2012]))

# Группировка по 'country_code' и агрегация данных
df = df.groupBy("country_code") .agg(F.min("invested_companies").alias("min_invested_companies"),
                                     F.max("invested_companies").alias("max_invested_companies"),
                                     F.avg("invested_companies").alias("avg_invested_companies"))

# Применение условия HAVING
df = df.filter(F.col("min_invested_companies") > 0)

# Сортировка и ограничение количества строк
df = df.orderBy(F.desc("avg_invested_companies"), "country_code").limit(10)

# Вывод результата
df.show()

#### Задание № 11

- Отобразите имя и фамилию всех сотрудников стартапов;

- Добавьте поле с названием учебного заведения, которое окончил сотрудник, если эта информация известна.

In [None]:
# Загрузка данных из БД
people_df = spark.sql('select * from people')
education_df = spark.sql('select * from education')

# Производим LEFT JOIN между people_df и education_df
df = people_df.alias("p") \
    .join(education_df.alias("e"), (people_df.id == education_df.person_id), how='left') \
    .select("p.first_name", "p.last_name", "e.institution")

# Вывод результата
df.show()

#### Задание № 12

- Для каждой компании найдите количество учебных заведений, которые окончили её сотрудники;

- Выведите название компании и число уникальных названий учебных заведений;

- Составьте топ-5 компаний по количеству университетов.

In [None]:
# Загрузка данных из БД
people_df = spark.sql('select * from people')
education_df = spark.sql('select * from education')
company_df = spark.sql('select * from company')

# Производим JOIN между company_df, people_df и education_df
joined_df = company_df.alias("c") \
           .join(people_df.alias("p"), "c.id = p.company_id") \
           .join(education_df.alias("e"), "p.id = e.person_id")

# Группировка по 'c.name' и агрегация данных
df = joined_df \
    .groupBy("c.name") \
    .agg(F.countDistinct("e.institution").alias("distinct_institutions")) \
    .orderBy(F.desc("distinct_institutions")) \
    .limit(5)

# Вывод результата
df.show()

#### Задание № 13

- Составьте список с уникальными названиями закрытых компаний, для которых первый раунд финансирования оказался последним.

In [None]:
# Загрузка данных из БД
company_df = spark.sql('select * from company')
funding_round_df = spark.sql('select * from funding_round')

# Производим JOIN между company_df и funding_round_df
joined_df = company_df.alias("c") \
            .join(funding_round_df.alias("fr"), "c.id = fr.company_id")

# Применяем фильтры и группируем по 'name'
df = joined_df.filter((F.col("fr.status") == 'closed') &
                      (F.col("fr.is_first_round") == 1) &
                      (F.col("fr.is_last_round") == 1)) \
              .groupBy("c.name") \
              .agg(F.count("c.name").alias("count"))

# Вывод результата
df.show()

#### Задание № 14

- Составьте список уникальных номеров сотрудников, которые работают в компаниях, отобранных в предыдущем задании.

In [None]:
# Загрузка данных из БД
people_df = spark.sql('select * from people')
company_df = spark.sql('select * from company')
funding_round_df = spark.sql('select * from funding_round')

# Получение списка company_id, соответствующих условиям в подзапросе
subquery_df = company_df.alias("c") \
                        .join(funding_round_df.alias("fr"), "c.id = fr.company_id") \
                        .filter((F.col("fr.status") == 'closed') &
                                (F.col("fr.is_first_round") == 1) &
                                (F.col("fr.is_last_round") == 1)) \
                        .groupBy("c.id") \
                        .agg(F.first("c.id").alias("company_id"))

# Производим фильтрацию people_df на основе полученных company_id
df = people_df.alias("p") \
              .join(subquery_df.alias("s"), F.col("p.company_id") == F.col("s.company_id"), "inner") \
              .select("p.id") \
              .distinct()

# Вывод результата
df.show()

#### Задание № 15

- Составьте таблицу, куда войдут уникальные пары с номерами сотрудников из предыдущей задачи и учебным заведением, которое окончил сотрудник.

In [None]:
# Загрузка данных из БД
people_df = spark.sql('select * from people')
education_df = spark.sql('select * from education')
company_df = spark.sql('select * from company')
funding_round_df = spark.sql('select * from funding_round')

# Получение списка company_id, соответствующих условиям в подзапросе
subquery_df = company_df.alias("c") \
                        .join(funding_round_df.alias("fr"), "c.id = fr.company_id") \
                        .filter((F.col("fr.status") == 'closed') &
                                (F.col("fr.is_first_round") == 1) &
                                (F.col("fr.is_last_round") == 1)) \
                        .groupBy("c.id") \
                        .agg(F.first("c.id").alias("company_id"))

# Производим LEFT JOIN между people_df и education_df
joined_df = people_df.alias("p") \
    .join(education_df.alias("e"), F.col("p.id") == F.col("e.person_id"), "left_outer")

# Применяем фильтры и группируем по p.id и e.institution
df = joined_df \
    .join(subquery_df.alias("s"), F.col("p.company_id") == F.col("s.company_id"), "inner") \
    .groupBy("p.id", "e.institution") \
    .agg(F.count("e.institution").alias("count"))\
    .filter(F.col("e.institution").isNotNull())

# Вывод результата
df.show()


#### Задание № 16

- Посчитайте количество учебных заведений для каждого сотрудника из предыдущего задания;

- При подсчёте учитывайте, что некоторые сотрудники могли окончить одно и то же заведение дважды.

In [None]:
# Загрузка данных из БД
people_df = spark.sql('select * from people')
education_df = spark.sql('select * from education')
company_df = spark.sql('select * from company')
funding_round_df = spark.sql('select * from funding_round')

# Подзапрос для фильтрации company_id в основном запросе
subquery_df = company_df.join(funding_round_df, company_df.id == funding_round_df.company_id) \
                        .filter((F.col('status') == 'closed') &
                                (F.col('is_first_round') == 1) &
                                (F.col('is_last_round') == 1)) \
                        .select('id') \
                        .distinct()

# Основной запрос
# 1) Сделаем LEFT JOIN между people и education
# 2) Применим фильтр для company_id из подзапроса
# 3) Произведем агрегацию данных
# 4) Отфильтруем результаты с учетом условия HAVING в SQL запросе
df = people_df.join(education_df, people_df.id == education_df.person_id, 'left') \
    .filter(people_df.company_id.isin([row.id for row in subquery_df.collect()])) \
    .groupBy('id') \
    .agg(F.count('instituition').alias('count_instituition'),
         F.countDistinct('instituition').alias('count_distinct_instituition')) \
    .filter(F.col('count_distinct_instituition') > 0) \
    .select('id', 'count_instituition')

# Вывод результата
df.show()


#### Задание № 17

- Дополните предыдущий запрос и выведите среднее число учебных заведений (всех, не только уникальных), которые окончили сотрудники разных компаний;

- Нужно вывести только одну запись, группировка здесь не понадобится.

In [None]:
# Создание базового DataFrame (аналог WITH base AS в SQL)
# Подзапрос для фильтрации company_id
subquery_df = company_df.join(funding_round_df, company_df.id == funding_round_df.company_id) \
                        .filter((F.col('status') == 'closed') &
                                (F.col('is_first_round') == 1) &
                                (F.col('is_last_round') == 1)) \
                        .select('id') \
                        .distinct()

# Создание базового DataFrame
base_df = people_df.join(education_df, people_df.id == education_df.person_id, 'left') \
                   .filter(people_df.company_id.isin([row.id for row in subquery_df.collect()])) \
                   .groupBy('id') \
                   .agg(F.count('instituition').alias('count'),
                        F.countDistinct('instituition').alias('count_distinct_instituition')) \
                   .filter(F.col('count_distinct_instituition') > 0)

# Вычисление среднего значения (аналог SELECT AVG(COUNT) FROM base в SQL)
df = base_df.agg(F.avg('count').alias('avg_count'))

# Вывод результата
df.show()

#### Задание № 18

- Напишите похожий запрос: выведите среднее число учебных заведений (всех, не только уникальных), которые окончили сотрудники Facebook.

In [None]:
# Создание базового DataFrame (аналог WITH base AS в SQL)
# Подзапрос для фильтрации company_id
subquery_df = company_df.filter(F.col('name') == 'Facebook') \
                        .select('id')

# Создание базового DataFrame
# Здесь используется RIGHT JOIN вместо LEFT JOIN
base_df = people_df.join(education_df, people_df.id == education_df.person_id, 'right') \
                   .filter(people_df.company_id.isin([row.id for row in subquery_df.collect()])) \
                   .groupBy('id') \
                   .agg(F.count('instituition').alias('count'))

# Вычисление среднего значения (аналог SELECT AVG(COUNT) FROM base в SQL)
df = base_df.agg(F.avg('count').alias('avg_count'))

# Вывод результата
df.show()

#### Задание № 19

Составьте таблицу из полей:

- name_of_fund — название фонда;

- name_of_company — название компании;

- amount — сумма инвестиций, которую привлекла компания в раунде.

В таблицу войдут данные о компаниях, в истории которых было больше шести важных этапов, а раунды финансирования проходили с 2012 по 2013 год включительно.

In [None]:
# Загрузка данных из БД
investment_df = spark.sql('select * from investment')
fund_df = spark.sql('select * from fund')

# Создание подзапроса для таблицы funding_round, фильтрация по датам
filtered_funding_round_df = funding_round_df.filter((F.col('funded_at') >= '2012-01-01') &
                                                    (F.col('funded_at') <= '2013-12-31'))

# Произведем соединения между таблицами
# LEFT JOIN между investment и company
join_1 = investment_df.join(company_df, investment_df.company_id == company_df.id, 'left')

# LEFT JOIN между результатом предыдущего JOIN и fund
join_2 = join_1.join(fund_df, join_1.fund_id == fund_df.id, 'left')

# INNER JOIN между результатом предыдущих JOIN и отфильтрованным funding_round
final_join = join_2.join(filtered_funding_round_df, join_2.funding_round_id == filtered_funding_round_df.id, 'inner')

# Применение фильтров и выбор столбцов
df = final_join.filter(F.col('milestones') > 6) \
                      .select(F.col('f.name').alias('name_of_fund'),
                              F.col('c.name').alias('name_of_company'),
                              F.col('fr.raised_amount').alias('amount'))

# Вывод результата
df.show()

#### Задание № 20

Выгрузите таблицу, в которой будут такие поля:

- Название компании-покупателя;

- сумма сделки;

- название компании, которую купили;

- сумма инвестиций, вложенных в купленную компанию;

- доля, которая отображает, во сколько раз сумма покупки превысила сумму вложенных в компанию инвестиций, округлённая до ближайшего целого числа.

Не учитывайте те сделки, в которых сумма покупки равна нулю. Если сумма инвестиций в компанию равна нулю, исключите такую компанию из таблицы.
Отсортируйте таблицу по сумме сделки от большей к меньшей, а затем по названию купленной компании в лексикографическом порядке. Ограничьте таблицу первыми десятью записями.

In [None]:
# Загрузка данных из БД
acquisition_df = spark.sql('select * from acquisition')

# Создание базового DataFrame 'acquiring'
acquiring_df = acquisition_df.join(company_df, acquisition_df.acquiring_company_id == company_df.id, 'left') \
                             .filter(F.col('price_amount') > 0) \
                             .select(F.col('c.name').alias('buyer'),
                                     F.col('a.price_amount').alias('price'),
                                     F.col('a.id').alias('key'))

# Создание базового DataFrame 'acquired'
acquired_df = acquisition_df.join(company_df, acquisition_df.acquired_company_id == company_df.id, 'left') \
                            .filter(F.col('funding_total') > 0) \
                            .select(F.col('c.name').alias('acquisition'),
                                    F.col('c.funding_total').alias('investment'),
                                    F.col('a.id').alias('key'))

# Произведение соединения и вычисление итогового результата
df = acquiring_df.join(acquired_df, acquiring_df.key == acquired_df.key) \
                        .withColumn('uplift', F.round(acquiring_df.price / acquired_df.investment)) \
                        .select('buyer', 'price', 'acquisition', 'investment', 'uplift') \
                        .orderBy(F.desc('price'), 'acquisition') \
                        .limit(10)

# Вывод результата
df.show()

#### Задание № 21

- Выгрузите таблицу, в которую войдут названия компаний из категории social, получившие финансирование с 2010 по 2013 год включительно;

- Проверьте, что сумма инвестиций не равна нулю;

- Выведите также номер месяца, в котором проходил раунд финансирования.

In [None]:
# Произведем LEFT JOIN между company и funding_round
join_df = company_df.join(funding_round_df, company_df.id == funding_round_df.company_id, 'left')

# Применение фильтров согласно условиям
filtered_df = join_df.filter((F.col('category_code') == 'social') &
                             (F.col('funded_at') >= '2010-01-01') &
                             (F.col('funded_at') <= '2013-12-31') &
                             (F.col('raised_amount') != 0))

# Выбор необходимых столбцов и выполнение операций над ними
df = filtered_df.select(F.col('c.name').alias('social_co'),
                               F.month('funded_at').alias('funding_month'))

# Вывод результата
df.show()

#### Задание № 22

Отберите данные по месяцам с 2010 по 2013 год, когда проходили инвестиционные раунды. Сгруппируйте данные по номеру месяца и получите таблицу, в которой будут поля:

- Номер месяца, в котором проходили раунды;

- количество уникальных названий фондов из США, которые инвестировали в этом месяце;

- количество компаний, купленных за этот месяц;

- общая сумма сделок по покупкам в этом месяце.

In [None]:
# Загрузка данных из БД
fund_df = spark.sql('select * from fund')
investment_df = spark.sql('select * from investment')

# Создание базового DataFrame 'fundings'
fundings_df = fund_df.join(investment_df, fund_df.id == investment_df.fund_id, 'left') \
                     .join(funding_round_df, investment_df.funding_round_id == funding_round_df.id, 'left') \
                     .filter((F.col('country_code') == 'USA') &
                             (F.year(F.col('funded_at').cast('date')).between(2010, 2013))) \
                     .groupBy(F.month(F.col('funded_at').cast('date')).alias('funding_month')) \
                     .agg(F.countDistinct('f.id').alias('us_funds'))

# Создание базового DataFrame 'acquisitions'
acquisitions_df = acquisition_df.filter(F.year(F.col('acquired_at').cast('date')).between(2010, 2013)) \
                                .groupBy(F.month(F.col('acquired_at').cast('date')).alias('funding_month')) \
                                .agg(F.count('acquired_company_id').alias('bought_co'),
                                     F.sum('price_amount').alias('sum_total'))

# Произведение LEFT JOIN для получения итогового результата
df = fundings_df.join(acquisitions_df, fundings_df.funding_month == acquisitions_df.funding_month, 'left') \
                       .select('fnd.funding_month', 'fnd.us_funds', 'acq.bought_co', 'acq.sum_total')

# Вывод результата
df.show()

#### Задание № 23

- Составьте сводную таблицу и выведите среднюю сумму инвестиций для стран, в которых есть стартапы, зарегистрированные в 2011, 2012 и 2013 годах;

- Данные за каждый год должны быть в отдельном поле;

- Отсортируйте таблицу по среднему значению инвестиций за 2011 год от большего к меньшему.

In [None]:
# Создание базового DataFrame 'y_11'
y_11 = company_df.filter(F.year(F.col('founded_at').cast('date')).between(2011, 2013)) \
                 .groupBy(F.col('country_code').alias('country'),
                          F.year(F.col('founded_at').cast('date')).alias('year_founded')) \
                 .agg(F.avg('funding_total').alias('y_2011')) \
                 .filter(F.col('year_founded') == 2011)

# Создание базового DataFrame 'y_12'
y_12 = company_df.filter(F.year(F.col('founded_at').cast('date')).between(2011, 2013)) \
                 .groupBy(F.col('country_code').alias('country'),
                          F.year(F.col('founded_at').cast('date')).alias('year_founded')) \
                 .agg(F.avg('funding_total').alias('y_2012')) \
                 .filter(F.col('year_founded') == 2012)

# Создание базового DataFrame 'y_13'
y_13 = company_df.filter(F.year(F.col('founded_at').cast('date')).between(2011, 2013)) \
                 .groupBy(F.col('country_code').alias('country'),
                          F.year(F.col('founded_at').cast('date')).alias('year_founded')) \
                 .agg(F.avg('funding_total').alias('y_2013')) \
                 .filter(F.col('year_founded') == 2013)

# Произведение JOIN для получения итогового результата
df = y_11.join(y_12, y_11.country == y_12.country) \
         .join(y_13, y_12.country == y_13.country) \
         .select(y_11.country, 'y_2011', 'y_2012', 'y_2013') \
         .orderBy(F.desc('y_2011'))

# Вывод результата
df.show()