In [94]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType
from pyspark.sql.window import Window

# Create a SparkSession
spark = SparkSession.builder \
.appName("ReadExcelWithHeader") \
.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5") \
.getOrCreate()

excel_file_path = "sampleCur.xlsx"

In [95]:
df = spark.read \
.format("com.crealytics.spark.excel") \
.option("header", "true") \
.option("inferSchema", "true") \
.option("dataAddress", "'0'!A7") \
.load(excel_file_path)

# Show the DataFrame
df.show()

+----+---------------------+----------------------------------+--------------------+------------------------------+----+---+--------------------+-----+----------------------------+------------------------------------------------+------------------+
|   #|Number of Generations|Club Member (First name/Last name)|                 _c3|CNV (Cumulated Network Volume)|Rank|  P|PV (Personal Volume)|  PVG|MCR (Maximum Confirmed Rank)|Number of months with zero Volume Points accrued|Club Member Status|
+----+---------------------+----------------------------------+--------------------+------------------------------+----+---+--------------------+-----+----------------------------+------------------------------------------------+------------------+
| 1.0|                  .0 |                         6111685.0|         Галюк Елена|                         > 500|   E|   |                 100|  230|                         E/1|                                                |                  |
| 2.

In [99]:
from pyspark.sql import functions as F

# @udf(returnType=IntegerType())
# def parseGen(dotted: str) -> int:
#     return int(dotted.split(".")[-1])

excel2Field = {}
excel2Field["#"] = "rownum"
excel2Field["Number of Generations"] = "gen"
excel2Field["Club Member (First name/Last name)"] = "Id"
excel2Field["_c3"] = "Name"
excel2Field["CNV (Cumulated Network Volume)"] = "cnv"
excel2Field["Rank"] = "Rank"
excel2Field["P"] = "Pending"
excel2Field["PV (Personal Volume)"] = "Personal"
excel2Field["PVG"] = "Group"
excel2Field["MCR (Maximum Confirmed Rank)"] = "maxrank"
excel2Field["Number of months with zero Volume Points accrued"] = "zeroMonths"
excel2Field["Club Member Status"] = "status"

df_remapColumns = df.withColumnsRenamed(excel2Field)
df_dropRownum = df_remapColumns.drop(df_remapColumns.rownum)
df_recastId = df_dropRownum.withColumn("Id", df_dropRownum.Id.cast(IntegerType()))

df_recastPersonal: DataFrame = df_recastId.withColumn("Personal", df_dropRownum.Personal.cast(IntegerType()))
df_nonNullPersonal = df_recastPersonal.withColumn("Personal", F.when(F.isnotnull(df_recastPersonal.Personal), df_recastPersonal.Personal).otherwise(0))

parseGen = udf(lambda dotted:  int(dotted.split(".")[-1]), IntegerType()).asNondeterministic()
df_digitizeGen = df_nonNullPersonal.withColumn("gen", parseGen(df_nonNullPersonal.gen))

df_digitizeGen.show()


+---+-------+--------------------+-----+----+-------+--------+-----+-------+----------+------+
|gen|     Id|                Name|  cnv|Rank|Pending|Personal|Group|maxrank|zeroMonths|status|
+---+-------+--------------------+-----+----+-------+--------+-----+-------+----------+------+
|  0|6111685|         Галюк Елена|> 500|   E|       |     100|  230|    E/1|          |      |
|  1|6190205|       Галюк Дмитрий|> 500|   D|       |       0|     |  SD/28|         1|     m|
|  2|6202838| Tabolich Anastasiya|> 500|  PM|       |       0|     |       |        12|     m|
|  2|6866923|Фалавандишвили Натия|> 500|  PM|       |       0|    7|   D/28|         7|     m|
|  3|6959087|    Мурванидзе Луиза|> 500|  PM|       |       7|     |       |          |     m|
|  4|7309266|  Murvanidze Violeta|135.5|    |       |       0|     |       |         3|     m|
|  3|7575828|Фалавандишвили Ти...|   35|    |       |       0|     |       |         7|     m|
|  2|7688719|       Konidala Anna|> 500|  PM|     

In [101]:
# @udf(returnType=IntegerType())
def id_gen_2_parent(id, gen) -> int:
    id_gen_2_parent.levelId[gen] = id
    return id_gen_2_parent.levelId[gen-1] if gen > 0 else None
id_gen_2_parent.levelId = {}

id_gen_2_parent_udf = udf(id_gen_2_parent, IntegerType()).asNondeterministic()
df_withParent: DataFrame = df_digitizeGen.withColumn("ParentId", id_gen_2_parent_udf(df_digitizeGen.Id, df_digitizeGen.gen))
df_withParent.show()

+---+-------+--------------------+-----+----+-------+--------+-----+-------+----------+------+--------+
|gen|     Id|                Name|  cnv|Rank|Pending|Personal|Group|maxrank|zeroMonths|status|ParentId|
+---+-------+--------------------+-----+----+-------+--------+-----+-------+----------+------+--------+
|  0|6111685|         Галюк Елена|> 500|   E|       |     100|  230|    E/1|          |      |    NULL|
|  1|6190205|       Галюк Дмитрий|> 500|   D|       |       0|     |  SD/28|         1|     m| 6111685|
|  2|6202838| Tabolich Anastasiya|> 500|  PM|       |       0|     |       |        12|     m| 6190205|
|  2|6866923|Фалавандишвили Натия|> 500|  PM|       |       0|    7|   D/28|         7|     m| 6190205|
|  3|6959087|    Мурванидзе Луиза|> 500|  PM|       |       7|     |       |          |     m| 6866923|
|  4|7309266|  Murvanidze Violeta|135.5|    |       |       0|     |       |         3|     m| 6959087|
|  3|7575828|Фалавандишвили Ти...|   35|    |       |       0|  

In [102]:
# df_persist = df_withParent.cache().show()
df_summed = df_withParent.groupBy("ParentId").count().show()
# df_withOveral: DataFrame = df_withParent.join(df_summed, df_withParent.Id==df_summed.ParentId).show()

24/05/01 01:07:34 ERROR Executor: Exception in task 3.0 in stage 83.0 (TID 345)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/f2/gqjlvqbd2d16pr5dbzwpt1cw0000gp/T/ipykernel_65190/66415511.py", line 4, in id_gen_2_parent
KeyError: 3

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/f2/gqjlvqbd2d16pr5dbzwpt1cw0000gp/T/ipykernel_65190/66415511.py", line 4, in id_gen_2_parent
KeyError: 3
