# Data preparation

## Reading the data in its original CSV format

In [1]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable


Restart the kernel after installation!

In [2]:
import findspark
findspark.init()

In [3]:
!hdfs dfs -ls

Found 1 items
drwxr-xr-x   - ubuntu hadoop          0 2025-05-30 18:51 data


In [9]:
!hdfs dfs -ls data

Found 5 items
-rw-r--r--   1 ubuntu hadoop 3136657969 2025-05-30 18:51 data/2022-11-04.txt
-rw-r--r--   1 ubuntu hadoop       9703 2025-05-30 18:50 data/lectures.csv
-rw-r--r--   1 ubuntu hadoop     296161 2025-05-30 18:50 data/questions.csv
drwxr-xr-x   - ubuntu hadoop          0 2025-05-30 18:56 data/train.parquet
-rw-r--r--   1 ubuntu hadoop   53700464 2025-05-30 18:50 data/train_1000k.csv


Create a SparkSession

In [5]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("OTUS")
        .getOrCreate()
)

Load the data

In [6]:
df = spark.read.csv("data/train_1000k.csv", inferSchema=True, header=True)
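
With `inferSchema=True`, Spark makes an extra pass over the CSV file to guess the column types. A minimal sketch of the alternative, passing an explicit schema as a DDL string, is shown below. The column types are assumptions read off the `df.show()` output later in this notebook (`timestamp` is widened to `BIGINT` as a precaution), and `COLUMNS`, `SCHEMA_DDL` and `read_train_csv` are hypothetical names introduced for illustration.

```python
# Assumed column types, based on the values printed by df.show();
# adjust them if your data differs.
COLUMNS = [
    ("row_id", "INT"),
    ("timestamp", "BIGINT"),  # assumption: widened in case values exceed 32 bits
    ("user_id", "INT"),
    ("content_id", "INT"),
    ("content_type_id", "INT"),
    ("task_container_id", "INT"),
    ("user_answer", "INT"),
    ("answered_correctly", "INT"),
    ("prior_question_elapsed_time", "DOUBLE"),
    ("prior_question_had_explanation", "BOOLEAN"),
]

# Build a DDL-formatted schema string; names are backtick-quoted so that
# a column called `timestamp` is not confused with the type keyword.
SCHEMA_DDL = ", ".join(f"`{name}` {dtype}" for name, dtype in COLUMNS)


def read_train_csv(spark, path="data/train_1000k.csv"):
    """Read the CSV with an explicit schema instead of inferSchema=True."""
    return spark.read.csv(path, schema=SCHEMA_DDL, header=True)
```

In the notebook this would be called as `df = read_train_csv(spark)`. Whether it is noticeably faster than schema inference on a file of this size has not been measured here.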

Time a row count on the CSV-backed DataFrame as a baseline, then split the data into 10 partitions and save it in Parquet format

In [7]:
%%time
df.count()

CPU times: user 3.82 ms, sys: 497 µs, total: 4.32 ms
Wall time: 1.13 s


1000000

In [8]:
(
    df
        .repartition(10)
        .write
        .mode("overwrite")
        .parquet("data/train.parquet")
)

Check the size of the saved files

In [10]:
!hdfs dfs -ls -h data/train.parquet

Found 11 items
-rw-r--r--   1 ubuntu hadoop          0 2025-05-30 18:56 data/train.parquet/_SUCCESS
-rw-r--r--   1 ubuntu hadoop      1.8 M 2025-05-30 18:56 data/train.parquet/part-00000-0d1c3d83-6870-4066-899a-0fe0198c2da0-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop      1.8 M 2025-05-30 18:56 data/train.parquet/part-00001-0d1c3d83-6870-4066-899a-0fe0198c2da0-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop      1.8 M 2025-05-30 18:56 data/train.parquet/part-00002-0d1c3d83-6870-4066-899a-0fe0198c2da0-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop      1.8 M 2025-05-30 18:56 data/train.parquet/part-00003-0d1c3d83-6870-4066-899a-0fe0198c2da0-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop      1.8 M 2025-05-30 18:56 data/train.parquet/part-00004-0d1c3d83-6870-4066-899a-0fe0198c2da0-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop      1.8 M 2025-05-30 18:56 data/train.parquet/part-00005-0d1c3d83-6870-4066-899a-0fe0198c2da0-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop      1.
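
To compare the total Parquet size with the source CSV, it can help to work with exact byte counts rather than the rounded `-h` values. The sketch below parses plain `hdfs dfs -ls` output (without `-h`) into a path-to-size mapping. `parse_ls_sizes` is a hypothetical helper, and it assumes paths contain no spaces. The sample input is the `hdfs dfs -ls data` listing shown earlier in this notebook.

```python
def parse_ls_sizes(ls_output):
    """Map path -> size in bytes for the file lines of plain `hdfs dfs -ls` output."""
    sizes = {}
    for line in ls_output.splitlines():
        fields = line.split()
        # File lines have 8 fields and start with '-'; directory lines
        # start with 'd' and the "Found N items" header has fewer fields.
        if len(fields) == 8 and fields[0].startswith("-"):
            sizes[fields[7]] = int(fields[4])
    return sizes


# Listing copied from the `hdfs dfs -ls data` cell above.
listing = """\
Found 5 items
-rw-r--r--   1 ubuntu hadoop 3136657969 2025-05-30 18:51 data/2022-11-04.txt
-rw-r--r--   1 ubuntu hadoop       9703 2025-05-30 18:50 data/lectures.csv
-rw-r--r--   1 ubuntu hadoop     296161 2025-05-30 18:50 data/questions.csv
drwxr-xr-x   - ubuntu hadoop          0 2025-05-30 18:56 data/train.parquet
-rw-r--r--   1 ubuntu hadoop   53700464 2025-05-30 18:50 data/train_1000k.csv
"""

sizes = parse_ls_sizes(listing)
csv_bytes = sizes["data/train_1000k.csv"]
print(f"train_1000k.csv: {csv_bytes / 2**20:.1f} MiB")
```

Applied to the output of `hdfs dfs -ls data/train.parquet`, `sum(parse_ls_sizes(output).values())` would give the total size of the part files, which can then be compared with `csv_bytes`.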

Compare the row-count speed when reading from Parquet

In [11]:
df_from_parquet = spark.read.parquet("data/train.parquet")

In [12]:
%%time
df_from_parquet.count()

CPU times: user 0 ns, sys: 2.04 ms, total: 2.04 ms
Wall time: 496 ms


1000000
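
The two `%%time` cells each report a single run, so the comparison is only indicative. The sketch below computes the ratio of the two reported wall times and adds a small helper for repeating a measurement. `time_action` is a hypothetical name introduced here, not part of Spark or IPython.

```python
import time

# Wall times reported by %%time for the two count() cells in this notebook.
csv_wall_s = 1.13
parquet_wall_s = 0.496

speedup = csv_wall_s / parquet_wall_s
print(f"Parquet count was about {speedup:.1f}x faster in this single run")


def time_action(action, repeats=5):
    """Call `action` several times and return each call's wall time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return timings
```

In the notebook, `time_action(df.count)` and `time_action(df_from_parquet.count)` would give several samples per format. The first call in each series is often slower than the rest, so it may be worth looking at the minimum or median rather than a single value.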

In [13]:
df_from_parquet.show()

+------+-----------+--------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|row_id|  timestamp| user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|
+------+-----------+--------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
| 62896|   96541616| 1400354|      1087|              0|               94|          3|                 0|                    17000.0|                         false|
|104413|   21162902| 2211492|      5262|              0|               95|          0|                 1|                    11000.0|                          true|
|537225|    4538020|10854346|      7880|              0|               26|          1|                 1|                    30000.0|                          true|
|158202|  