In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col,
    length,
    monotonically_increasing_id,
    regexp_replace,
    split,
    trim,
    when,
)

# Start (or reuse) the Spark session for this notebook.
# `spark.jars.packages` pulls the spark-excel package, which provides the
# "com.crealytics.spark.excel" data source used to read the workbook;
# driver and executor memory limits are set explicitly.
spark = (
    SparkSession.builder
    .appName("Excel to PySpark")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Show the URL of this session's Spark web UI.
print("Активные Spark сессии:", spark.sparkContext.uiWebUrl)

:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
com.crealytics#spark-excel_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a99afd07-45b7-4d99-919d-f2cd25eb8dc5;1.0
	confs: [default]
	found com.crealytics#spark-excel_2.12;0.14.0 in central
	found org.apache.poi#poi;4.1.2 in central
	found commons-codec#commons-codec;1.13 in central
	found org.apache.commons#commons-collections4;4.4 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found com.zaxxer#SparseBitSet;1.2 in central
	found org.apache.poi#poi-ooxml;4.1.2 in central
	found org.apache.poi#poi-ooxml-schemas;4.1.2 in central
	found org.apache.xmlbeans#xmlbeans;3.1.0 in central
	found com.github.virtuald#curvesapi;1.06 in central
	found com.norbitltd#spoiwo_2.12;1.8.0 in central
	found org.scala-lang.modules#scala-xml_2.12;1.3.0 in central
	found com.github.pjfanning#excel-streaming-reader;2.3.6 in central
	

Активные Spark сессии: http://7476e7a1d8f9:4040


In [2]:
# Load the sheet named "Данные" from the workbook. Reading starts at cell A3,
# and with header=true that first row supplies the column names.
EXCEL_PATH = "/home/jovyan/PySpark_test/config/Численность выбывших.xls"
EXCEL_DATA_ADDRESS = "Данные!A3"

excel_reader = (
    spark.read
    .format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("dataAddress", EXCEL_DATA_ADDRESS)
)
df = excel_reader.load(EXCEL_PATH)

In [3]:
# Clean the raw sheet and reshape it from one column per year into one row per
# (source row, year). Intermediate results go to `df_clean`, leaving `df` as
# the raw load, so re-running this cell does not re-process processed data.


def _split_code_and_name(frame, column, code_column):
    """Split a "<code> <name>" column into a code column and a name column.

    The value is trimmed; its first whitespace-delimited token is written to
    `code_column`, and everything after it (which may itself contain spaces,
    e.g. a multi-word region or country name) replaces `column`. A value with
    no whitespace yields a null name.
    """
    # limit=2: split only at the first whitespace run so the name stays whole.
    parts = split(trim(col(column)), "\\s+", 2)
    return (
        frame
        .withColumn(code_column, parts.getItem(0))
        .withColumn(column, parts.getItem(1))
    )


df_clean = (
    df
    .withColumnRenamed("Классификатор объектов административно-территориального деления (ОКАТО)", "region")
    .withColumnRenamed("Классификатор стран мира ", "world")  # note: this header ends with a space
    .withColumnRenamed("Единица измерения", "unit_name")
    .withColumnRenamed("Период", "period_name")
    .withColumnRenamed("Потоки миграции", "migration")
)

# Region: keep only rows whose code is 11 characters long
# (presumably full OKATO codes — confirm against the source data).
df_clean = (
    _split_code_and_name(df_clean, "region", "region_okato")
    .filter(length(col("region_okato")) == 11)
)

# World: if the code contains ":", keep the segment after the first colon
# (up to the next colon, if any); otherwise keep the code as is.
world_code = col("world_okato")
df_clean = _split_code_and_name(df_clean, "world", "world_okato").withColumn(
    "world_okato",
    when(world_code.contains(":"), split(world_code, ":").getItem(1)).otherwise(world_code),
)

# Strip digits (and then surrounding spaces) from the descriptive columns and
# keep only rows whose period is a month name.
months = ['январь', 'февраль', 'март', 'апрель', 'май', 'июнь', 'июль', 'август', 'сентябрь', 'октябрь', 'ноябрь', 'декабрь']
df_clean = (
    df_clean
    .withColumn("unit_name", trim(regexp_replace(col("unit_name"), "\\d+", "")))
    .withColumn("period_name", trim(regexp_replace(col("period_name"), "\\d+", "")))
    .withColumn("migration", trim(regexp_replace(col("migration"), "\\d+", "")))
    .filter(col("period_name").isin(months))
)

# Melt: year columns FIRST_YEAR..LAST_YEAR become (years, leaving) rows.
FIRST_YEAR, LAST_YEAR = 1993, 2023
cols_to_melt = [str(year) for year in range(FIRST_YEAR, LAST_YEAR + 1)]
missing_years = [c for c in cols_to_melt if c not in df_clean.columns]
if missing_years:
    raise ValueError(f"Year columns not found in the sheet: {missing_years}")
cols_to_keep = [c for c in df_clean.columns if c not in cols_to_melt]

# The stack row count is derived from the column list instead of hard-coded.
stack_pairs = ", ".join(f"'{year}', `{year}`" for year in cols_to_melt)
melt_expr = F.expr(f"stack({len(cols_to_melt)}, {stack_pairs}) as (years, leaving)")

# The index is assigned *before* stacking, so ordering by (index, years) keeps
# the sheet's row order and sorts the years within each source row.
# Null `leaving` values are replaced with 0.
df_melted = (
    df_clean
    .withColumn("index", monotonically_increasing_id())
    .select(*cols_to_keep, "index", melt_expr)
    .withColumn("leaving", F.coalesce(col("leaving"), F.lit(0)))
    .withColumn("years", col("years").cast("int"))
    .orderBy("index", "years")
    .select("region", "region_okato", "world", "world_okato", "unit_name",
            "period_name", "migration", "years", "leaving")
)

In [4]:
# Save the final DataFrame as CSV with a header row, replacing any previous
# output. coalesce(1) reduces the data to a single partition before writing;
# `output_path` is the location handed to Spark's CSV writer.
output_path = "/home/jovyan/PySpark_test/config/result.csv"
(
    df_melted
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)

24/07/29 14:30:29 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/07/29 14:30:33 WARN TaskSetManager: Stage 0 contains a task of very large size (3324 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 14:30:42 WARN TaskSetManager: Stage 1 contains a task of very large size (3324 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [5]:
# Stop the Spark session once all the work above is finished.
spark.stop()