In [1]:
import platform, socket, os

print("platform:", platform.platform())
print("hostname:", socket.gethostname())
print("cwd:", os.getcwd())

platform: Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.35
hostname: e1aa0d4ec9d0
cwd: /home/jovyan/work


In [2]:
from pyspark.sql import SparkSession 
#проверяем есть ли spark в образе jupyter. Если есть - запускаем, если нет - создаем локально
try:
    spark
    print("Using existing SparkSession ✅", spark.version)
except NameError:
    spark = SparkSession.builder.master("local[*]").appName("features-from-hdfs").getOrCreate() #Почему local[*]? Это самый стабильный старт в ноутбуке.
    print("Created SparkSession ✅", spark.version)

Created SparkSession ✅ 3.5.0


In [3]:
#важное: указываем правильный HDFS
spark.sparkContext._jsc.hadoopConfiguration().set("fs.defaultFS", "hdfs://namenode:8020")

In [4]:
#путь до файла с сырыми данными
raw_path = "hdfs://namenode:8020/raw/vacancies/vacancies_raw.json"

In [5]:
df = spark.read.option("multiLine", True).json(raw_path)

In [6]:
df.show(truncate=False)

+------------+------+------+---------------------+------------------+----------+
|published_at|region|salary|skills               |title             |vacancy_id|
+------------+------+------+---------------------+------------------+----------+
|2026-01-31  |Москва|250000|[python, ml, spark]  |Python ML Engineer|1         |
|2026-01-31  |Москва|200000|[sql, spark, airflow]|Data Scientist    |2         |
+------------+------+------+---------------------+------------------+----------+



In [9]:
from pyspark.sql import functions as F

exploded = (
    df
    .withColumn("published_date", F.to_date("published_at")) #нормализую дату для агрегаций
    .withColumn("skill", F.explode("skills")) #денормализация массива
)

exploded.createOrReplaceTempView("vacancies_exploded") #создаю sql-таблицу

In [10]:
#агрегация в sql (делаем sql-витрину, где по дате и региону имеем: 1) сколько вакансий с этим skill, 2) средняя зп по этому skill)
features = spark.sql("""
SELECT
  published_date,
  region,
  skill,
  COUNT(*) AS vacancies_cnt,
  AVG(salary) AS avg_salary
FROM vacancies_exploded
GROUP BY published_date, region, skill
""")

In [13]:
features.show(truncate=False) #truncate выравнивает ячейки

+--------------+------+-------+-------------+----------+
|published_date|region|skill  |vacancies_cnt|avg_salary|
+--------------+------+-------+-------------+----------+
|2026-01-31    |Москва|ml     |1            |250000.0  |
|2026-01-31    |Москва|spark  |2            |225000.0  |
|2026-01-31    |Москва|airflow|1            |200000.0  |
|2026-01-31    |Москва|python |1            |250000.0  |
|2026-01-31    |Москва|sql    |1            |200000.0  |
+--------------+------+-------+-------------+----------+



In [18]:
out_path = "hdfs://namenode:8020/features/vacancies_daily_skill"
features.write.mode("overwrite").parquet(out_path) #сохраняю витрину в hdfs
out_path

'hdfs://namenode:8020/features/vacancies_daily_skill'

In [19]:
check = spark.read.parquet(out_path) #чтение parquet (контрольная проверка)
check.show(truncate=False)

+--------------+------+-------+-------------+----------+
|published_date|region|skill  |vacancies_cnt|avg_salary|
+--------------+------+-------+-------------+----------+
|2026-01-31    |Москва|ml     |1            |250000.0  |
|2026-01-31    |Москва|spark  |2            |225000.0  |
|2026-01-31    |Москва|airflow|1            |200000.0  |
|2026-01-31    |Москва|python |1            |250000.0  |
|2026-01-31    |Москва|sql    |1            |200000.0  |
+--------------+------+-------+-------------+----------+

