## 1. Подготовка среды

Установка pyspark

In [1]:
!pip install pyspark



Установим драйвер JDBC для БД SQLite

In [2]:
!wget -q -O sqlite-jdbc.jar https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.41.2.1/sqlite-jdbc-3.41.2.1.jar

Инициализируем SparkSession для ETL-проекта и настраиваем подключение к SQLite через JDBC

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import sqlite3
spark = SparkSession.builder \
    .appName("FinanceETL") \
    .config("spark.driver.extraClassPath", "sqlite-jdbc.jar") \
    .getOrCreate()

## 2. Извлечение данных


Используем SparkSession.read API для загрузки данных из CSV:

In [None]:
df_stocks = spark.read.csv("major-tech-stock-2019-2024.csv", header=True, inferSchema=True)
df_stocks.printSchema()
df_stocks.show(5)

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Ticker: string (nullable = true)

+----------+------------------+------------------+------------------+------------------+------------------+---------+------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|   Volume|Ticker|
+----------+------------------+------------------+------------------+------------------+------------------+---------+------+
|2019-01-02| 38.72249984741211|39.712501525878906|38.557498931884766| 39.47999954223633|37.793785095214844|148158800|  AAPL|
|2019-01-03|35.994998931884766| 36.43000030517578|              35.5| 35.54750061035156| 34.02924346923828|365248800|  AAPL|
|2019-01-04| 36.13249969482422| 37.13750076293945| 35.95000076293945|37.0

Теперь извлечем данные из таблицы companies в SQLite. Spark предоставляет возможность читать из СУБД через формат jdbc:

In [None]:
df_companies = spark.read.format("jdbc") \
    .option("url", "jdbc:sqlite:companies_db.sqlite") \
    .option("dbtable", "companies") \
    .option("driver", "org.sqlite.JDBC") \
    .load()

df_companies.printSchema()
df_companies.show()

root
 |-- Symbol: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Country: string (nullable = true)

+------+--------------------+--------------------+-------+
|Symbol|         CompanyName|              Sector|Country|
+------+--------------------+--------------------+-------+
|  AAPL|          Apple Inc.|          Technology|    USA|
|  MSFT|Microsoft Corpora...|          Technology|    USA|
|  AMZN|    Amazon.com, Inc.|Consumer Discreti...|    USA|
| GOOGL|       Alphabet Inc.|Communication Ser...|    USA|
|  TSLA|         Tesla, Inc.|Consumer Discreti...|    USA|
+------+--------------------+--------------------+-------+



## 3. Объединение данных и проверки

In [None]:
df_joined = df_stocks.join(df_companies, # То с чем объединяем
                           df_stocks["Ticker"] == df_companies["Symbol"], # Условие объединения
                           how="inner") # Тип объединения
df_joined.printSchema()
df_joined = df_joined.drop("Ticker")# Удалим дублёр
df_joined.show(5)

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Ticker: string (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Country: string (nullable = true)

+----------+------------------+------------------+------------------+------------------+------------------+--------+------+-----------+----------+-------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|Symbol|CompanyName|    Sector|Country|
+----------+------------------+------------------+------------------+------------------+------------------+--------+------+-----------+----------+-------+
|2023-12-29|193.89999389648438|194.39999389648438|191.72999572753906|192.52

## 4. Очистка и преобразование данных

4.1 Проверка и обработка пропущенных значений.

In [None]:
null_counts = df_joined.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_joined.columns])
null_counts.show()

+----+----+----+---+-----+---------+------+------+------+-----------+------+-------+
|Date|Open|High|Low|Close|Adj Close|Volume|Ticker|Symbol|CompanyName|Sector|Country|
+----+----+----+---+-----+---------+------+------+------+-----------+------+-------+
|   0|   0|   0|  0|    0|        0|     0|     0|     0|          0|     0|      0|
+----+----+----+---+-----+---------+------+------+------+-----------+------+-------+



**Стратегии обработки пропусков:**


*   Если пропусков мало и они в несущественных строках – данные можно удалить. В PySpark: df_drop = df_joined.na.drop(subset=["ColumnName1","ColumnName2"]) – удалит строки, где любое из
указанных полей пустое. Либо .drop() без параметров – удалит любую строку, где какой-либо столбец пустой.
*   **Заполнение значениями:** метод df.fillna(value, subset=["col1","col2"]) или эквивалент df.na.fill(). Можно подставить фиксированное значение. Например, если в секторе у компании стоит null, можно заменить на "Unknown"; если пропущен объем – можно поставить 0 (или среднее значение, но для простоты 0).

*   Более сложные подходы: вычислить среднее/медиану и заполнить, использовать forward fill (протягивание предыдущего значения по дате, актуально для временных рядов), но эти методы требуют дополнительной логики.



Для примера применим комбинацию: удалим записи без объема (считаем, что объем критически важен), а категориальные пропуски, если бы были, заменим на "Unknown".

In [None]:
df_joined = df_joined.na.drop(subset=["Volume"])
df_joined = df_joined.fillna({"Sector": "Unknown"})

4.2 Преобразование типов данных.

 В нашем датафрейме Date сейчас представлена как строка (формат "yyyy-MM-dd"). Для удобства агрегирования по датам можно преобразовать ее в тип DateType. Spark предоставляет функцию to_date:\

In [None]:
df_joined = df_joined.withColumn("Date", F.to_date("Date", "yyyy-MM-dd"))
df_joined.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- Sector: string (nullable = false)
 |-- Country: string (nullable = true)



Теперь столбец Date станет типа date. Также можем выделить из даты дополнительные атрибуты, если понадобится, например год или месяц:

In [None]:
df_joined = df_joined.withColumn(
    "Year", F.year("Date")).withColumn("Month", F.month("Date"))
df_joined.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- Sector: string (nullable = false)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)



Это добавит два новых столбца: год и месяц, извлеченные из даты.

Другие преобразования типов, которые могут понадобиться:


*   Приведение строковых чисел к числовым типам
`(df.withColumn("col_new", df.col_old.cast("integer")))`.
*   Разбиение составных полей, объединение полей (конкатенация строк).
*   Например, можно объединить столбцы Name и Sector в одно поле или наоборот, разнести сектор на более общую категорию (в данном случае нет необходимости, просто отмечаем возможности).



Итак, мы выполнили все шаги очистки и преобразования: устранили пропуски, привели типы данных, добавили нужные столбцы. Теперь перейдем к аналитическим операциям – агрегациям и фильтрации, т.е. собственно к анализу данных.

## 5. Агрегация данных

Для демонстрации возможностей Spark по агрегированию данных придумаем несколько задач:

**5.1 Средняя цена закрытия по каждому тикеру** (Symbol). Используем groupBy и avg:

In [None]:
avg_close_by_symbol = df_joined.groupBy("Symbol").agg(F.avg("Close").alias("AvgClose"))
avg_close_by_symbol.show()

+------+------------------+
|Symbol|          AvgClose|
+------+------------------+
|  AAPL| 123.0310850674094|
|  TSLA|  170.887469464531|
| GOOGL| 98.19300560276535|
|  AMZN|127.64970470119167|
|  MSFT| 236.2598490146461|
+------+------------------+



**5.2 Суммарный объем и количество дней по годам для каждой компании.** Здесь сгруппируем по двум полям: Symbol и Year. В агрегатах посчитаем сумму Volume и количество записей. Spark имеет функцию count, а сумму – sum.

In [None]:
stats_by_year = df_joined.groupBy("Symbol","Year").agg(
        F.count("*").alias("DaysCount"),
        F.sum("Volume").alias("TotalVolume")
        ).orderBy("Symbol","Year")
stats_by_year.show(10)

+------+----+---------+-----------+
|Symbol|Year|DaysCount|TotalVolume|
+------+----+---------+-----------+
|  AAPL|2019|      252|28254942800|
|  AAPL|2020|      253|39863855600|
|  AAPL|2021|      252|22812206100|
|  AAPL|2022|      251|22065504500|
|  AAPL|2023|      250|14804257200|
|  AMZN|2019|      252|19493002000|
|  AMZN|2020|      253|24950814000|
|  AMZN|2021|      252|17076362000|
|  AMZN|2022|      251|19096256300|
|  AMZN|2023|      250|14707898000|
+------+----+---------+-----------+
only showing top 10 rows



## 6. Фильтрация и поиск данных

В Spark можно легко отфильтровать данные по условиям, аналогичным WHERE в SQL. Мы покажем несколько примеров фильтрации на нашем DataFrame df_joined (или на агрегированных результатах):

**Пример 1:** Фильтрация по значению числового поля. Найдем все записи, где цена закрытия акции превысила определенный порог, скажем $1000 (значимо для акций с высокой ценой, таких как Google или Amazon в некоторые периоды).

In [None]:
expensive_days = df_joined.filter(df_joined.Close > 1000)
expensive_days.select("Date","Symbol","Close").show(5)

+----+------+-----+
|Date|Symbol|Close|
+----+------+-----+
+----+------+-----+



**Пример 2:** Фильтрация по текстовому полю (строке). Допустим, хотим получить все записи для компании Apple (тикер AAPL).

In [None]:
apple_df = df_joined.filter(df_joined.Symbol == "AAPL")
apple_df.orderBy("Date").show()

+----------+------------------+------------------+------------------+------------------+------------------+---------+------+-----------+----------+-------+----+-----+
|      Date|              Open|              High|               Low|             Close|         Adj Close|   Volume|Symbol|CompanyName|    Sector|Country|Year|Month|
+----------+------------------+------------------+------------------+------------------+------------------+---------+------+-----------+----------+-------+----+-----+
|2019-01-02| 38.72249984741211|39.712501525878906|38.557498931884766| 39.47999954223633|37.793785095214844|148158800|  AAPL| Apple Inc.|Technology|    USA|2019|    1|
|2019-01-03|35.994998931884766| 36.43000030517578|              35.5| 35.54750061035156| 34.02924346923828|365248800|  AAPL| Apple Inc.|Technology|    USA|2019|    1|
|2019-01-04| 36.13249969482422| 37.13750076293945| 35.95000076293945|37.064998626708984| 35.48192596435547|234428400|  AAPL| Apple Inc.|Technology|    USA|2019|    1

**Пример 3:** Фильтрация по дате или диапазону дат. Предположим, нужно взять данные за 2020 год для Amazon.

In [None]:
amazon_2020 = df_joined.filter((df_joined.Symbol == "AMZN") &
 (F.year("Date") == 2020))
amazon_2020.select("Date","Close","Volume").show(5)

+----------+------------------+---------+
|      Date|             Close|   Volume|
+----------+------------------+---------+
|2020-12-31|162.84649658203125| 59144000|
|2020-12-30|164.29249572753906| 64186000|
|2020-12-29|166.10000610351562| 97458000|
|2020-12-28|  164.197998046875|113736000|
|2020-12-24|158.63450622558594| 29038000|
+----------+------------------+---------+
only showing top 5 rows



**Пример 4:** Поиск по подстроке. Представим, что нужно найти компании, название которых содержит слово "Inc". Это скорее опрос справочной таблицы компаний:

In [None]:
df_companies.filter(df_companies.CompanyName.contains("Inc")).show()

+------+----------------+--------------------+-------+
|Symbol|     CompanyName|              Sector|Country|
+------+----------------+--------------------+-------+
|  AAPL|      Apple Inc.|          Technology|    USA|
|  AMZN|Amazon.com, Inc.|Consumer Discreti...|    USA|
| GOOGL|   Alphabet Inc.|Communication Ser...|    USA|
|  TSLA|     Tesla, Inc.|Consumer Discreti...|    USA|
+------+----------------+--------------------+-------+



Фильтрация – очень распространенная операция при анализе данных. В контексте ETL ее применяют и на этапе Transform (например, отфильтровать нерелевантные данные, как мы делали при очистке, удаляя строки с null, или исключая дубли). Здесь же мы демонстрируем ее для решения "поисковых" задач, когда из объединенного набора данных нужно выбрать интересующую информацию.
После выполнения фильтраций, убедитесь, что результаты соответствуют ожиданиям (правильное количество строк, корректные значения). Если нужно, можно также сочетать сортировку (orderBy) и ограничение вывода (limit) для удобства просмотра.

## 7. Загрузка результатов (Load) в хранилище

Финальный шаг – сохранить полученные данные. Предположим, по результатам наших преобразований мы хотим сформировать итоговый набор данных – например, очищенные и обогащенные данные о ценах акций, готовые для дальнейшего использования или передачи аналитикам. Мы покажем два способа загрузки: в файловую систему (формат Parquet) и обратно в базу данных.

**7.1 Сохранение DataFrame в файл Parquet.**\
Формат Parquet – это эффективный колонковый формат хранения данных, часто используемый в хранилищах big data. Он сохраняет схему и сжатые данные, оптимизирован для последующего чтения Spark'ом и другими инструментами.

In [None]:
df_joined.write.mode("overwrite"
#мод overwrite использован на случай,
#если вы будете несколько раз запускать ячейку
#и спарк спокойно переписывал данные
).parquet("output/stock_data_cleaned.parquet")

После выполнения этой команды в директории `output/stock_data_cleaned.parquet` (либо указанной вами) появится набор файлов Parquet, которые представляют разделы датасета.

**7.2 Загрузка данных в базу через JDBC**.\
Аналогично тому, как мы читали таблицу через JDBC, можно и записать DataFrame в таблицу базы данных. Предположим, мы хотим записать итоговый объединенный и очищенный набор цен акций в новую таблицу, скажем stock_prices_cleaned в нашей SQLite базе finance.db. Мы можем использовать метод write с форматом jdbc:

In [None]:
df_joined.write.format("jdbc") \
    .option("url", "jdbc:sqlite:finance.db") \
    .option("dbtable", "stock_prices_cleaned") \
    .option("driver", "org.sqlite.JDBC") \
    .mode("overwrite") \
    .save()

Для проверки можно подключиться к БД и посмотреть корреткность сохраненных данных

In [None]:
import sqlite3
conn = sqlite3.connect("finance.db")
cur = conn.cursor()
cur.execute("SELECT Symbol, Date, Close, Sector FROM stock_prices_cleaned LIMIT 5;")
rows = cur.fetchall()
for r in rows:
    print(r)
conn.close()

('AAPL', 1703808000000, 192.52999877929688, 'Technology')
('AAPL', 1703721600000, 193.5800018310547, 'Technology')
('AAPL', 1703635200000, 193.14999389648438, 'Technology')
('AAPL', 1703548800000, 193.0500030517578, 'Technology')
('AAPL', 1703203200000, 193.60000610351562, 'Technology')


## Задания для закрепления

Попробуйте усложнить ETL-пайплайн:


1. Добавьте данные о курсе рубля и расчитайте цены в рублях
`USD_RUB Historical Data.csv`

2. Определите, в какие месяцы курс доллара был наиболее выгодным для покупки акций.

3. Сохраните данные с рублевыми ценами для дальнейшего использования.

4. загрузите данные из parquet и выведите топ 5 самых дорогих акций в рублях