Скачаем датасет из Kaggle

In [2]:
import kagglehub

# Загрузка датасета
path = kagglehub.dataset_download("joannpeeler/labeled-chess-positions-109m-csv-format")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/joannpeeler/labeled-chess-positions-109m-csv-format?dataset_version_number=1...


100%|██████████| 4.25G/4.25G [22:23<00:00, 3.40MB/s] 

Extracting files...





Path to dataset files: /home/user0/.cache/kagglehub/datasets/joannpeeler/labeled-chess-positions-109m-csv-format/versions/1


In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

In [2]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

In [3]:
conf = create_spark_configuration()

In [4]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

:: loading settings :: url = jar:file:/opt/spark-3.5.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/user0/.ivy2/cache
The jars for the packages stored in: /home/user0/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7b35bfbd-f28f-45df-b464-4b9ae98149e6;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 in central
:: resolution report :: resolve 601ms :: artifacts dl 25ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spa

In [5]:
path = "hdfs:///user/user0/master_training_data_ver7.0d.csv"
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

                                                                                

In [6]:
df.show()

                                                                                

+----------------+---+-------+--------------------+----------+----+------+
|            Hash|Ply|GamePly|                 FEN|HasCastled|Eval|Result|
+----------------+---+-------+--------------------+----------+----+------+
|77C51D9E08C4CFE5| 44|    113|3r1r1k/1pq2pp1/p4...|         3| -21|   0.5|
|40E22C66F91971B5| 44|     97|2q3rk/1rnbppbn/p5...|         3| 223|   1.0|
|05F5B9C1E13B6DB2| 55|    204|r2br2k/3b1pp1/2p4...|         3|-111|   0.0|
|B7F72DC9E8B257F2| 97|    131|8/3b1pk1/3Q2p1/1p...|         3|-252|   1.0|
|D30ED7CEEA0347BF| 45|    105|3r2k1/1p3ppp/rnp1...|         3| 135|   0.5|
|5FBA2109603D3D66|139|    162|8/4k3/r6p/4P1pP/4...|         3|-835|   0.0|
|B616FCC024D2C623|165|    178|8/5K2/3k4/5P2/4R3...|         3| 412|   1.0|
|6C02A2DF1DFF27D4| 71|    137|8/2p3b1/1p2k2p/4p...|         0| 342|   1.0|
|FF9F28B98001BC81| 88|    110|2r2rk1/5p2/b3p2p/...|         3|   4|   1.0|
|C804494384061CA0| 26|    207|r1bq1rk1/pp2b1pp/...|         3| 119|   1.0|
|BC575E8C388FDC8F| 13|   

In [7]:
df.printSchema()

root
 |-- Hash: string (nullable = true)
 |-- Ply: string (nullable = true)
 |-- GamePly: string (nullable = true)
 |-- FEN: string (nullable = true)
 |-- HasCastled: string (nullable = true)
 |-- Eval: string (nullable = true)
 |-- Result: string (nullable = true)



-- Hash: Строка. Уникальные строки (например, хеши). 
    Уникальный идентификатор позиции на шахматной доске.

-- Ply: Целое число. От 0 и выше (обычно от 0 до 100, в зависимости от хода). 
    Количество полуходов (ходов для белых и черных) в текущей игре. Это число увеличивается на 1 с каждым ходом.

-- GamePly: Целое число. От 1 и выше (обычно от 1 до 200 и более). 
    Номер полного хода в игре (для белых и черных). Например, если в игре было сделано 10 полных ходов, то GamePly будет 10.

-- FEN: Строка. Строки в формате FEN. 
    Представление позиции на доске в формате Forsyth-Edwards Notation (FEN), которое включает положение фигур, кто ходит, и возможность рокировки.

-- HasCastled: Целое число. 0 или 3. Показывает, было ли сделано рокирование в игре: 
    0 — не было, 
    3 — было (для белых или черных, в зависимости от контекста).

-- Eval: Число с плавающей точкой. От -∞ до +∞.
    Оценка текущей позиции в терминах силы (например, положительное значение для белых, отрицательное — для черных).

-- Result: Число с плавающей точкой. 0.0, 0.5, 1.0.
    0.0 — поражение для игрока, чей ход зафиксирован,
    0.5 — ничья,
    1.0 — победа для игрока, чей ход зафиксирован.

In [8]:
# Уберем лишние колонки
df = df.select(
    "Ply", "GamePly", "HasCastled", "Eval", "Result"
)

df.show()

# Уберем колонку Hash (Уникальный идентификатор позиции) и FEN (Представление позиции на шахматной доске в формате FEN), которые 

+---+-------+----------+----+------+
|Ply|GamePly|HasCastled|Eval|Result|
+---+-------+----------+----+------+
| 44|    113|         3| -21|   0.5|
| 44|     97|         3| 223|   1.0|
| 55|    204|         3|-111|   0.0|
| 97|    131|         3|-252|   1.0|
| 45|    105|         3| 135|   0.5|
|139|    162|         3|-835|   0.0|
|165|    178|         3| 412|   1.0|
| 71|    137|         0| 342|   1.0|
| 88|    110|         3|   4|   1.0|
| 26|    207|         3| 119|   1.0|
| 13|    136|         2| -12|   0.0|
| 85|    168|         3|-325|   0.0|
| 62|     94|         3|-819|   0.0|
| 82|    146|         3|-375|   0.0|
|138|    151|         1| -85|   0.5|
| 41|    112|         3| 249|   1.0|
| 87|    371|         3| -22|   0.5|
| 79|     96|         3| -69|   0.5|
| 79|    113|         3| 330|   1.0|
| 92|    133|         3| 465|   1.0|
+---+-------+----------+----+------+
only showing top 20 rows



In [9]:
# Поменяем изначальный строковый тип данных на подходящий

from pyspark.sql.functions import col, regexp_replace, regexp_extract_all, lit

def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных и
    выполняет необходимые преобразования.

    Args:
        data (DataFrame): Исходный DataFrame.

    Returns:
        DataFrame: Преобразованный DataFrame.
    """
    # Преобразуем столбцы в соответствующие типы данных
    data = data.withColumn("Ply", col("Ply").cast("Integer"))
    data = data.withColumn("GamePly", col("GamePly").cast("Integer"))
    data = data.withColumn("HasCastled", col("HasCastled").cast("Integer"))
    data = data.withColumn("Eval", col("Eval").cast("Float"))
    data = data.withColumn("Result", col("Result").cast("Float"))

    # Возвращаем преобразованный DataFrame
    return data

In [10]:
# Проверим измененный тип данных
df = transform_dataframe(df)
df.printSchema()

root
 |-- Ply: integer (nullable = true)
 |-- GamePly: integer (nullable = true)
 |-- HasCastled: integer (nullable = true)
 |-- Eval: float (nullable = true)
 |-- Result: float (nullable = true)



Сначала создадим базу данных

In [19]:
database_name = "samorokov_db"

In [20]:
# Создадим инструкцию SQL для добавления базы данных в каталог Apache Spark
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

In [21]:
spark.sql(create_database_sql)

DataFrame[]

In [22]:
# Установим созданную базу данных как текущую.
spark.catalog.setCurrentDatabase(database_name)

In [23]:
# Сохранение DataFrame в виде таблицы
# записываем преобразованный датафрейм в таблицу sobd_lab1_table
df.writeTo("sobd_table_lab1").using("iceberg").create()

                                                                                

БД успешно создана, глянем, какие таблицы туда входят

In [24]:
for table in spark.catalog.listTables():
    print(table.name)

sobd_table_lab1


ПРИМЕР (удаления таблицы):
##### spark.sql("DROP TABLE spark_catalog.ivanov_database.sobd_lab1_table")
##### spark.sql("DROP DATABASE spark_catalog.ivanov_database")

In [25]:
# После удачной записи остановим сессию
spark.stop()