In [1]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit
)

In [2]:
def create_spark_configuration() -> SparkConf:
    """
    Create and configure the SparkConf for this Spark application.

    The configuration targets YARN in client deploy mode with fixed
    driver/executor resources, pulls the Iceberg Spark runtime package, and
    makes ``spark_catalog`` an Iceberg ``SparkCatalog`` of type ``hadoop``
    whose warehouse is ``hdfs:///user/<user>/warehouse``.

    Returns:
        SparkConf: The configured SparkConf instance.

    Raises:
        Whatever ``getpass.getuser()`` raises when ``$USER`` is unset or empty
        and no user name can be determined by other means.
    """
    import getpass

    # Resolve the user name that scopes the HDFS warehouse directory.
    # os.getenv returns None when $USER is unset (e.g. under some kernel
    # launchers), which would silently yield "hdfs:///user/None/warehouse";
    # fall back to getpass.getuser() so the path always names a real user.
    user_name = os.getenv("USER") or getpass.getuser()

    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    # Executor and driver resources.
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    # Iceberg runtime built for Spark 3.5 / Scala 2.12.
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    # Enable Iceberg SQL extensions and back the built-in spark_catalog with a
    # Hadoop-type Iceberg catalog rooted in the user's HDFS warehouse.
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

In [3]:
# Build the Spark configuration (YARN client mode + Iceberg-backed spark_catalog).
conf = create_spark_configuration()

In [4]:
# Start (or reuse an already running) SparkSession with that configuration;
# the bare `spark` on the last line renders the session summary.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

:: loading settings :: url = jar:file:/opt/spark-3.5.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/user1/.ivy2/cache
The jars for the packages stored in: /home/user1/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e7021a85-2bd4-4608-aaba-f90fc85e3780;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 in central
:: resolution report :: resolve 599ms :: artifacts dl 20ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spa

In [5]:
# Source CSV, presumably on the cluster's default filesystem (HDFS).
# NOTE(review): hardcodes the user1 home directory, while the Iceberg
# warehouse path is derived from $USER — consider deriving both the same way.
path = "/user/user1/2019oct.csv"

In [7]:
# Load the raw events CSV; its first line supplies the column names.
# No schema inference is requested, so every column arrives as a string.
df = spark.read.csv(path, header=True)

                                                                                

In [8]:
# Preview the raw data; long values are truncated in the rendered table.
df.show()

                                                                                

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    NULL| 543.10|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|205301355563188265

In [9]:
# Keep only the columns used downstream; category_id, user_id and
# user_session are dropped.
df = df.select(
    "event_time","event_type", "product_id","category_code", "brand", "price"
)

In [10]:
# Preview the projected columns.
df.show()

                                                                                

+--------------------+----------+----------+--------------------+--------+-------+
|          event_time|event_type|product_id|       category_code|   brand|  price|
+--------------------+----------+----------+--------------------+--------+-------+
|2019-10-01 00:00:...|      view|  44600062|                NULL|shiseido|  35.79|
|2019-10-01 00:00:...|      view|   3900821|appliances.enviro...|    aqua|  33.20|
|2019-10-01 00:00:...|      view|  17200506|furniture.living_...|    NULL| 543.10|
|2019-10-01 00:00:...|      view|   1307067|  computers.notebook|  lenovo| 251.74|
|2019-10-01 00:00:...|      view|   1004237|electronics.smart...|   apple|1081.98|
|2019-10-01 00:00:...|      view|   1480613|   computers.desktop|  pulser| 908.62|
|2019-10-01 00:00:...|      view|  17300353|                NULL|   creed| 380.96|
|2019-10-01 00:00:...|      view|  31500053|                NULL|luminarc|  41.16|
|2019-10-01 00:00:...|      view|  28719074|  apparel.shoes.keds|   baden| 102.71|
|201

In [11]:
# Inspect the schema: every column is still a string at this point.
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)



In [12]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Cast the numeric columns of the events DataFrame to typed columns.

    ``product_id`` is cast to integer and ``price`` to float; every other
    column is passed through unchanged and keeps its position.

    Args:
        data (DataFrame): Source DataFrame containing ``product_id`` and
            ``price`` columns.

    Returns:
        DataFrame: A new DataFrame with the casts applied.
    """
    # Target Spark SQL type name for each column that needs a cast.
    # NOTE(review): "Float" is 32-bit; consider Double/Decimal if exact
    # prices matter.
    casts = {
        "product_id": "Integer",
        "price": "Float",
    }

    # withColumns replaces each existing column in place, exactly like the
    # equivalent chain of withColumn calls, but in a single projection.
    return data.withColumns(
        {name: col(name).cast(spark_type) for name, spark_type in casts.items()}
    )

In [13]:
# Apply the casts (product_id -> integer, price -> float).
df = transform_dataframe(df)

In [14]:
# Preview after casting (price no longer shows trailing zeros, e.g. 33.2).
df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+----------+----------+--------------------+--------+-------+
|          event_time|event_type|product_id|       category_code|   brand|  price|
+--------------------+----------+----------+--------------------+--------+-------+
|2019-10-01 00:00:...|      view|  44600062|                NULL|shiseido|  35.79|
|2019-10-01 00:00:...|      view|   3900821|appliances.enviro...|    aqua|   33.2|
|2019-10-01 00:00:...|      view|  17200506|furniture.living_...|    NULL|  543.1|
|2019-10-01 00:00:...|      view|   1307067|  computers.notebook|  lenovo| 251.74|
|2019-10-01 00:00:...|      view|   1004237|electronics.smart...|   apple|1081.98|
|2019-10-01 00:00:...|      view|   1480613|   computers.desktop|  pulser| 908.62|
|2019-10-01 00:00:...|      view|  17300353|                NULL|   creed| 380.96|
|2019-10-01 00:00:...|      view|  31500053|                NULL|luminarc|  41.16|
|2019-10-01 00:00:...|      view|  28719074|  apparel.shoes.keds|   baden| 102.71|
|201

                                                                                

In [15]:
# Verify the new column types for product_id and price.
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)



In [16]:
# Database (namespace) in spark_catalog that will hold the Iceberg table.
database_name = "lopin_database1"

In [17]:
# DDL creating the database in spark_catalog; IF NOT EXISTS keeps the
# statement safe to re-run.
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS spark_catalog.{database_name}
"""

In [18]:
# Execute the DDL; the returned DataFrame has no columns.
spark.sql(create_database_sql)

DataFrame[]

In [19]:
# Make the new database the default for unqualified table names.
spark.catalog.setCurrentDatabase(database_name)

In [20]:
# Save the DataFrame as an Iceberg table.
# createOrReplace() (rather than create()) keeps this cell re-runnable:
# create() fails when the table already exists from a previous run. The
# fully qualified name avoids depending on the current-database state.
# The "Error reading version hint file" WARN seen on first creation appears
# benign: the table has no metadata yet at that point.
df.writeTo(f"spark_catalog.{database_name}.sobd_lab1_table").using("iceberg").createOrReplace()

24/12/15 21:05:57 WARN HadoopTableOperations: Error reading version hint file hdfs:/user/user1/warehouse/lopin_database1/sobd_lab1_table/metadata/version-hint.text
java.io.FileNotFoundException: File does not exist: /user/user1/warehouse/lopin_database1/sobd_lab1_table/metadata/version-hint.text
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:87)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:77)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:159)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2198)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:795)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:468)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolPro

In [21]:
# List tables in the current database to confirm the table was created.
for table in spark.catalog.listTables():
    print(table.name)

sobd_lab1_table


In [22]:
# Stop the SparkSession (and its underlying SparkContext) when finished.
spark.stop()