In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import os

os.environ['HADOOP_USER_NAME'] = 'root'

# 1. Создаем сессию Spark
# Имя хоста 'namenode' берем из нашего docker-compose
spark = SparkSession.builder \
    .appName("QuickHDFSTest") \
    .master("local[*]") \
    .config("dfs.client.use.datanode.hostname", "true") \
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse") \
    .getOrCreate()



In [None]:
# 2. Тестовые данные
data = [("Ivan", 25), ("Anna", 30), ("Petr", 18), ("Elena", 45)]

# 3. Описываем схему
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# 4. Создаем DataFrame
df = spark.createDataFrame(data, schema)

# 5. Показываем результат в консоли
print("Наш легкий DataFrame:")
df.show()

# 6. Записываем в HDFS (путь внутри контейнера к NameNode)
# Мы создадим папку /user/spark_test
try:
    df.write.mode("overwrite").parquet("hdfs://namenode:9000/user/spark_test")
    print("✅ Успешно сохранено в HDFS!")
except Exception as e:
    print(f"❌ Ошибка записи: {e}")

# 7. Читаем обратно для проверки
print("Чтение записанных данных из HDFS:")
df_load = spark.read.parquet("hdfs://namenode:9000/user/spark_test")
df_load.filter(df_load.age > 20).show()



In [26]:
data = [(14, "Gold"), (15, "Silver")]
df = spark.createDataFrame(data, ["client_id", "segment"])

# Сохраняем как таблицу (Spark создаст метаданные в папке spark-warehouse)
df.write.mode("append").saveAsTable("clients")



In [27]:
# Теперь SQL запрос будет работать!
spark.sql("SELECT * FROM clients").show()

+---------+---------+
|client_id|  segment|
+---------+---------+
|        4|corporate|
|       15|   Silver|
|        2|   Silver|
|        6|  premium|
|        3|  premium|
|        7|  premium|
|        8|  premium|
|        5|  premium|
|        9|  premium|
|       10|  premium|
|       11|  premium|
|       12|  premium|
|       13|  premium|
|       14|     Gold|
|        1|     Gold|
+---------+---------+



In [3]:
spark.sql('''
CREATE TABLE default.clients (
    client_id INTEGER,
    segment STRING
) USING PARQUET;
''').show()


++
||
++
++



In [6]:
spark.sql('''
INSERT INTO default.clients (client_id, segment) VALUES
--(1, 'retail'),
--(2, 'retail'),
(3, 'premium'),
(4, 'corporate'),
(5, 'premium')
''')

spark.sql("SELECT * FROM default.clients").show()


+---------+---------+
|client_id|  segment|
+---------+---------+
|        4|corporate|
|        2|   Silver|
|        3|  premium|
|        5|  premium|
|        1|     Gold|
+---------+---------+



In [30]:
spark.sql('''

INSERT INTO transactions (transaction_id,client_id, operation_dt, amount) VALUES
-- январь 2025
-- март 2025
(14, 1, '2026-02-01 08:00:00', 150.00),
(15, 2, '2026-02-02 09:30:00', 400.00),
(16, 3, '2025-12-31 10:10:00', 700.00);
''').show()
spark.sql("SELECT * FROM default.transactions").show()

++
||
++
++

+--------------+---------+-------------------+------+
|transaction_id|client_id|       operation_dt|amount|
+--------------+---------+-------------------+------+
|             6|        6|2026-01-05 10:10:00|  2700|
|            14|        1|2026-02-01 08:00:00|   150|
|             1|        1|2025-01-01 10:15:00|   100|
|             7|        7|2026-01-05 10:10:00|  5700|
|            15|        2|2026-02-02 09:30:00|   400|
|             2|        1|2025-01-01 15:30:00|   -20|
|             8|        8|2026-01-05 10:10:00|  7700|
|             9|        9|2026-01-05 10:10:00|  8700|
|            10|       10|2026-01-05 10:10:00|  2700|
|            11|       11|2026-01-05 10:10:00|    70|
|            12|       12|2026-01-05 10:10:00|     7|
|            13|       13|2026-01-05 10:10:00|    27|
|            16|        3|2025-12-31 10:10:00|   700|
|             3|        2|2025-01-01 11:00:00|   300|
|             4|        3|2025-01-01 09:45:00|   500|
+--------------

In [32]:
spark.sql('''

INSERT INTO transactions (transaction_id,client_id, operation_dt, amount) VALUES
-- январь 2025
-- март 2025
(17, 1, '2026-01-31 08:00:00', 150.00),
(18, 2, '2026-01-22 09:30:00', 400.00),
(19, 3, '2026-01-01 10:10:00', 700.00);
''').show()
spark.sql("SELECT * FROM default.transactions").show()

++
||
++
++

+--------------+---------+-------------------+------+
|transaction_id|client_id|       operation_dt|amount|
+--------------+---------+-------------------+------+
|             6|        6|2026-01-05 10:10:00|  2700|
|            14|        1|2026-02-01 08:00:00|   150|
|             1|        1|2025-01-01 10:15:00|   100|
|            18|        2|2026-01-22 09:30:00|   400|
|             7|        7|2026-01-05 10:10:00|  5700|
|            15|        2|2026-02-02 09:30:00|   400|
|             2|        1|2025-01-01 15:30:00|   -20|
|             8|        8|2026-01-05 10:10:00|  7700|
|             9|        9|2026-01-05 10:10:00|  8700|
|            10|       10|2026-01-05 10:10:00|  2700|
|            11|       11|2026-01-05 10:10:00|    70|
|            12|       12|2026-01-05 10:10:00|     7|
|            13|       13|2026-01-05 10:10:00|    27|
|            17|        1|2026-01-31 08:00:00|   150|
|            19|        3|2026-01-01 10:10:00|   700|
|            16

In [29]:
spark.sql("SELECT * FROM default.transactions").show()

+--------------+---------+-------------------+------+
|transaction_id|client_id|       operation_dt|amount|
+--------------+---------+-------------------+------+
|             6|        6|2026-01-05 10:10:00|  2700|
|             1|        1|2025-01-01 10:15:00|   100|
|             7|        7|2026-01-05 10:10:00|  5700|
|             2|        1|2025-01-01 15:30:00|   -20|
|             8|        8|2026-01-05 10:10:00|  7700|
|             9|        9|2026-01-05 10:10:00|  8700|
|            10|       10|2026-01-05 10:10:00|  2700|
|            11|       11|2026-01-05 10:10:00|    70|
|            12|       12|2026-01-05 10:10:00|     7|
|            13|       13|2026-01-05 10:10:00|    27|
|             3|        2|2025-01-01 11:00:00|   300|
|             4|        3|2025-01-01 09:45:00|   500|
+--------------+---------+-------------------+------+



In [38]:
spark.sql('''

SELECT * FROM transactions
WHERE operation_dt >= date_trunc('month', add_months(current_date(), -1))
  AND operation_dt <  date_trunc('month', current_date())


''').show()

+--------------+---------+-------------------+------+
|transaction_id|client_id|       operation_dt|amount|
+--------------+---------+-------------------+------+
|             6|        6|2026-01-05 10:10:00|  2700|
|            18|        2|2026-01-22 09:30:00|   400|
|             7|        7|2026-01-05 10:10:00|  5700|
|             8|        8|2026-01-05 10:10:00|  7700|
|             9|        9|2026-01-05 10:10:00|  8700|
|            10|       10|2026-01-05 10:10:00|  2700|
|            11|       11|2026-01-05 10:10:00|    70|
|            12|       12|2026-01-05 10:10:00|     7|
|            13|       13|2026-01-05 10:10:00|    27|
|            17|        1|2026-01-31 08:00:00|   150|
|            19|        3|2026-01-01 10:10:00|   700|
+--------------+---------+-------------------+------+



In [39]:
spark.sql('''

SELECT * FROM transactions
WHERE operation_dt <= date_trunc('month', add_months(current_date(), 1))
  AND operation_dt >=  date_trunc('month', current_date())


''').show()

+--------------+---------+-------------------+------+
|transaction_id|client_id|       operation_dt|amount|
+--------------+---------+-------------------+------+
|            14|        1|2026-02-01 08:00:00|   150|
|            15|        2|2026-02-02 09:30:00|   400|
+--------------+---------+-------------------+------+



In [37]:
spark.sql('''

SELECT date_trunc('month', current_date()) FROM transactions



''').show()

+---------------------------------+
|date_trunc(month, current_date())|
+---------------------------------+
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
|              2026-02-01 00:00:00|
+---------------------------------+



In [20]:
spark.sql('''

WITH last_month_tx AS (
    SELECT
        client_id,
        operation_dt,
        amount
    FROM transactions
    WHERE operation_dt >= date_trunc('month', add_months(current_date(), -1))
      AND operation_dt <  date_trunc('month', current_date())
),
top_clients AS (
    SELECT
        client_id
    FROM last_month_tx
    GROUP BY client_id
    ORDER BY SUM(amount) DESC
    LIMIT 10
),
ranked_tx AS (
    SELECT
        t.client_id,
        t.operation_dt,
        t.amount,
        ROW_NUMBER() OVER (
            PARTITION BY t.client_id
            ORDER BY t.operation_dt DESC
        ) AS rn
    FROM last_month_tx t
    JOIN top_clients tc
      ON t.client_id = tc.client_id
)
SELECT
    client_id,
    operation_dt,
    amount
FROM ranked_tx
WHERE rn <= 3;

''').show()

+---------+-------------------+------+
|client_id|       operation_dt|amount|
+---------+-------------------+------+
|        6|2026-01-05 10:10:00|  2700|
|        7|2026-01-05 10:10:00|  5700|
|        8|2026-01-05 10:10:00|  7700|
|        9|2026-01-05 10:10:00|  8700|
|       10|2026-01-05 10:10:00|  2700|
|       11|2026-01-05 10:10:00|    70|
|       12|2026-01-05 10:10:00|     7|
|       13|2026-01-05 10:10:00|    27|
+---------+-------------------+------+



In [22]:
spark.sql('''
CREATE TABLE daily_client_balance (
    client_id INTEGER,
    balance_date STRING,
    balance_amount DECIMAL,
    load_dt STRING
) USING PARQUET;
''').show()

++
||
++
++



In [23]:
spark.sql('''
INSERT INTO daily_client_balance (client_id, balance_date, balance_amount, load_dt) VALUES
(1, '2025-01-01', 80.00, '2025-01-02'),
(2, '2025-01-01', 300.00, '2025-01-02'),
(3, '2025-01-01', 500.00, '2025-01-02');
''').show()

++
||
++
++



In [24]:
spark.sql('''
SELECT
    d.balance_date,
    SUM(d.balance_amount)      AS balance_sum,
    SUM(t.amount)              AS transactions_sum,
    SUM(d.balance_amount) - SUM(t.amount) AS diff
FROM daily_client_balance d
JOIN transactions t
  ON d.client_id = t.client_id
 AND DATE(t.operation_dt) = DATE(d.balance_date)
-- WHERE DATE(d.balance_date) = '2025-01-01'
GROUP BY d.client_id, d.balance_date;
''').show()

+------------+-----------+----------------+----+
|balance_date|balance_sum|transactions_sum|diff|
+------------+-----------+----------------+----+
|  2025-01-01|        160|              80|  80|
|  2025-01-01|        300|             300|   0|
|  2025-01-01|        500|             500|   0|
+------------+-----------+----------------+----+



In [None]:

spark.stop()