# <a id='toc1_'></a>[PySparkのTips](#toc0_)

**Table of contents**<a id='toc0_'></a>    
- [PySparkのTips](#toc1_)    
- [特定カラムのユニークな値をリストとしてすべて取得](#toc2_)    
- [joinの結合条件にcontainsを用いる](#toc3_)    
- [Dataframeの任意の行番号範囲を取り出す](#toc4_)    
- [pyspark.sql.functions.coalesceの挙動](#toc5_)    
- [Nullの置換](#toc6_)    
  - [pyspark.sql.DataFrame.fillnaによる置換](#toc6_1_)    
  - [coalesceによる置換](#toc6_2_)    
- [FULL OUTER JOIN(外部結合の注意点)](#toc7_)    
- [主キー候補を機械的に抽出する](#toc8_)    
  - [考え方](#toc8_1_)    
- [時間帯の重複箇所を識別](#toc9_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

細かいTips、テクニックをまとめる。

In [1]:
from glob import glob
from datetime import datetime

import polars as pl
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType, StructType, StructField
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.functions import vector_to_array
import numpy as np

# Create a SparkSession。pythonからsparkを使う場合、セッションの作成が必要。
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

# デフォルトのログレベルだと大量にログが出力されるので限定する。
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/01 11:11:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# 各データ読み込み
df_receipt = spark.read.parquet("../../../100knocks-preprocess/docker/work/data/receipt.parquet")

# 店舗データ
df_store = spark.read.parquet("../../../100knocks-preprocess/docker/work/data/store.parquet")

# 顧客データ
df_customer = spark.read.parquet("../../../100knocks-preprocess/docker/work/data/customer.parquet")

# 製品データ
df_product = spark.read.parquet("../../../100knocks-preprocess/docker/work/data/product.parquet")

# 製品データ
df_category = spark.read.parquet("../../../100knocks-preprocess/docker/work/data/category.parquet")

                                                                                

# <a id='toc2_'></a>[特定カラムのユニークな値をリストとしてすべて取得](#toc0_)
下記のような方法がある。どちらにせよめんどくさい。

In [4]:
# collectの結果の各値は対象カラムをキーとする辞書のような形で取得できる
[v["gender_cd"] for v in df_customer.select("gender_cd").distinct().collect()]

                                                                                

['0', '9', '1']

In [5]:
df_customer.select("gender_cd").dropDuplicates().rdd.flatMap(lambda x: x).collect()

                                                                                

['0', '9', '1']

In [None]:
# Dataframeとして取得したいなら.distinctでOK
df_customer.select("gender_cd").distinct().show()

+---------+
|gender_cd|
+---------+
|        0|
|        9|
|        1|
+---------+



# <a id='toc3_'></a>[joinの結合条件にcontainsを用いる](#toc0_)
結構便利。left joinで左のカラムに右のカラムの値が含まれている行に対して結合したい場合などに使える。

In [29]:
# サンプルデータの作成
data_a = [
    (1, "This is a sample message"),
    (2, "Another example message"),
    (3, "Message with a keyword"),
    (4, "No match here"),
    (5, "samplesample"),
    (6, "sample keyword"),
]
columns_a = ["id", "message"]
df_a = spark.createDataFrame(data_a, columns_a)

data_b = [
    (1, "sample", "MSG001"),
    (2, "example", "MSG002"),
    (3, "keyword", "MSG003")
]
columns_b = ["id" ,"message_key", "MSG_No"]
df_b = spark.createDataFrame(data_b, columns_b)


In [30]:
df_a.show()

+---+--------------------+
| id|             message|
+---+--------------------+
|  1|This is a sample ...|
|  2|Another example m...|
|  3|Message with a ke...|
|  4|       No match here|
|  5|        samplesample|
|  6|      sample keyword|
+---+--------------------+



In [31]:
df_b.show()

+---+-----------+------+
| id|message_key|MSG_No|
+---+-----------+------+
|  1|     sample|MSG001|
|  2|    example|MSG002|
|  3|    keyword|MSG003|
+---+-----------+------+



In [32]:
df_a.join(
    df_b,
    # 結合条件にcontainsを使用。messageにmessage_keyが含まれていれば紐づける
    F.contains(df_a.message, df_b.message_key),
    "left"
).show()

+---+--------------------+----+-----------+------+
| id|             message|  id|message_key|MSG_No|
+---+--------------------+----+-----------+------+
|  1|This is a sample ...|   1|     sample|MSG001|
|  2|Another example m...|   2|    example|MSG002|
|  3|Message with a ke...|   3|    keyword|MSG003|
|  4|       No match here|NULL|       NULL|  NULL|
|  5|        samplesample|   1|     sample|MSG001|
|  6|      sample keyword|   1|     sample|MSG001|
|  6|      sample keyword|   3|    keyword|MSG003|
+---+--------------------+----+-----------+------+



"sample keyword"のように２つの単語に紐づく場合は2行紐づく。

# <a id='toc4_'></a>[Dataframeの任意の行番号範囲を取り出す](#toc0_)

In [None]:
dataset = df_receipt.withColumn(
    "sales_month",
    F.col("sales_ymd").substr(0, 6) # yyyyMMdd -> yyyyMM形式にする
).groupBy("sales_month").agg(   # 月次ごとに集計
    F.sum("amount").alias("total_amount")
).withColumn(
    "row_num",
    F.row_number().over(Window.orderBy("sales_month")) - 1
)

dataset.show(18)

+-----------+------------+-------+
|sales_month|total_amount|row_num|
+-----------+------------+-------+
|     201701|      902056|      0|
|     201702|      764413|      1|
|     201703|      962945|      2|
|     201704|      847566|      3|
|     201705|      884010|      4|
|     201706|      894242|      5|
|     201707|      959205|      6|
|     201708|      954836|      7|
|     201709|      902037|      8|
|     201710|      905739|      9|
|     201711|      932157|     10|
|     201712|      939654|     11|
|     201801|      944509|     12|
|     201802|      864128|     13|
|     201803|      946588|     14|
|     201804|      937099|     15|
|     201805|     1004438|     16|
|     201806|     1012329|     17|
+-----------+------------+-------+
only showing top 18 rows



※PySparkのDataframeの部分行だけ取り出すには上記のようWindow関数を使って行番号を振るしかなさそう。  
  メモリに読み込まずに先頭から指定したレコード数だけ取り出す場合はlimitが使えるが、どこからどこまで取り出すという指定はできない。

In [None]:
train_size = 12
val_size = 6
offset = 6 # 次のtrainをどこから始めるか

In [None]:
train_data = []
val_data = []

# 12か月ごとに学習データ、6か月ごとに検証データを定義する
for i in range(3):
    train_start = offset * i
    train_end = train_start + train_size - 1
    val_start = train_end + 1
    val_end = val_start + offset - 1
    train_data.append(dataset.filter(F.col("row_num").between(train_start, train_end)))
    val_data.append(dataset.filter(F.col("row_num").between(val_start, val_end)))

# <a id='toc5_'></a>[pyspark.sql.functions.coalesceの挙動](#toc0_)
名前から何をするのかわかりづらいので、挙動をおさらいしておく。  
coalesceは”合体する”といった意味の英単語で、引数を左端から調べて最初に現れた非NULL値を返す関数である。  
PySparkだけでなくSQLの文法にもある。  

[pyspark.sql.functions.coalesceのドキュメント](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.coalesce.html?highlight=coale#pyspark.sql.functions.coalesce)

In [3]:
df = spark.createDataFrame([
    (None, None, None),
    (1, None, None),
    (None, 2, None),
    (None, None, 3),
    (None, 4, 5),
], schema='a long, b long, c long')
df.show()

                                                                                

+----+----+----+
|   a|   b|   c|
+----+----+----+
|NULL|NULL|NULL|
|   1|NULL|NULL|
|NULL|   2|NULL|
|NULL|NULL|   3|
|NULL|   4|   5|
+----+----+----+



In [17]:
# 2つのカラムをcoalesceする
df.select(F.coalesce("a", "b")).show()

+--------------+
|coalesce(a, b)|
+--------------+
|          NULL|
|             1|
|             2|
|          NULL|
|             4|
+--------------+



In [16]:
# 3つのカラムをcoalesceする
df.select(F.coalesce("a", "b", "c")).show()

+-----------------+
|coalesce(a, b, c)|
+-----------------+
|             NULL|
|                1|
|                2|
|                3|
|                4|
+-----------------+



上記のように各行を左のカラムから右のカラムへ（a->b）調べていき、  
最初にヒットしたNullでない値を採用して一つのカラムを返す。  
全カラムNullの場合は単にNullになる。

# <a id='toc6_'></a>[Nullの置換](#toc0_)

## <a id='toc6_1_'></a>[pyspark.sql.DataFrame.fillnaによる置換](#toc0_)
王道

In [18]:
df.show()

+----+----+----+
|   a|   b|   c|
+----+----+----+
|NULL|NULL|NULL|
|   1|NULL|NULL|
|NULL|   2|NULL|
|NULL|NULL|   3|
|NULL|   4|   5|
+----+----+----+



In [None]:
# 複数列のNullを置換
df.fillna({"a": 0, "b": 999}).show()

+---+---+----+
|  a|  b|   c|
+---+---+----+
|  0|999|NULL|
|  1|999|NULL|
|  0|  2|NULL|
|  0|999|   3|
|  0|  4|   5|
+---+---+----+



## <a id='toc6_2_'></a>[coalesceによる置換](#toc0_)

In [24]:
df.show()

+----+----+----+
|   a|   b|   c|
+----+----+----+
|NULL|NULL|NULL|
|   1|NULL|NULL|
|NULL|   2|NULL|
|NULL|NULL|   3|
|NULL|   4|   5|
+----+----+----+



In [4]:
expr = {}
for col in df.columns:
    expr[col] = F.coalesce(col, F.lit(0))

In [5]:
df.withColumns(
    expr
).show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  0|  0|  0|
|  1|  0|  0|
|  0|  2|  0|
|  0|  0|  3|
|  0|  4|  5|
+---+---+---+



0の定数カラムとcoalesceするところがポイント。  
複数行にやろうとすると面倒。  
他にもF.whenを使用する方法があるが、当然なので割愛。

# <a id='toc7_'></a>[FULL OUTER JOIN(外部結合の注意点)](#toc0_)
PySparkに限った話ではないが、JOIN後に残すカラムは慎重に選ぶべき。  
例えば、テーブルAの主キーカラムAとテーブルBの主キーカラムBについて、FULL JOINで  
カラムA==カラムBを条件としてJoinする場合、  
Join後に残す主キーをカラムAだけとした場合、カラムBにしかなかった値はNULLになってしまう。  
カラムBにあった値もJOIN後の主キーに残すためには追加の処理が必要。  
JOINの仕組みを考えれば当たり前だが、注意すること。

In [2]:
# テーブルAのサンプルデータの作成
data_a = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
]
columns_a = ["id_a", "name_a"]
df_a = spark.createDataFrame(data_a, columns_a)

# テーブルBのサンプルデータの作成
data_b = [
    (3, "David"),
    (4, "Eve"),
    (5, "Frank")
]
columns_b = ["id_b", "name_b"]
df_b = spark.createDataFrame(data_b, columns_b)

In [4]:
# 残すカラムを指定しない場合
df_a.join(
    df_b, 
    df_a.id_a == df_b.id_b, 
    "full_outer"
).show()



+----+-------+----+------+
|id_a| name_a|id_b|name_b|
+----+-------+----+------+
|   1|  Alice|NULL|  NULL|
|   2|    Bob|NULL|  NULL|
|   3|Charlie|   3| David|
|NULL|   NULL|   4|   Eve|
|NULL|   NULL|   5| Frank|
+----+-------+----+------+



                                                                                

In [5]:
# 主キーとしてid_aだけ残す
df_a.join(
    df_b, 
    df_a.id_a == df_b.id_b, 
    "full_outer"
).select(
    "id_a", "name_a", "name_b"
).show()



+----+-------+------+
|id_a| name_a|name_b|
+----+-------+------+
|   1|  Alice|  NULL|
|   2|    Bob|  NULL|
|   3|Charlie| David|
|NULL|   NULL|   Eve|
|NULL|   NULL| Frank|
+----+-------+------+



                                                                                

思いとして、id_bに相当するname_bはあるので、id_aにid_bの値も残っててくれたらありがたいが、  
当然そうはならない。対処としては下記のようにid_a, id_bをcoalesceに指定して、NULLを埋めて結合する方法と、  
whenを使用する方法がある。

In [7]:
# coalesceを使う方法
df_a.join(
    df_b, 
    df_a.id_a == df_b.id_b, 
    "full_outer"
).withColumn(
    "id",
    F.coalesce("id_a", "id_b")
).select(
    "id", "name_a", "name_b"
).show()



+---+-------+------+
| id| name_a|name_b|
+---+-------+------+
|  1|  Alice|  NULL|
|  2|    Bob|  NULL|
|  3|Charlie| David|
|  4|   NULL|   Eve|
|  5|   NULL| Frank|
+---+-------+------+



                                                                                

In [None]:
# whenを使う方法
df_a.join(
    df_b, 
    df_a.id_a == df_b.id_b, 
    "full_outer"
).withColumn(
    "id",
    F.when(F.col("id_a").isNull(), F.col("id_b")).otherwise(F.col("id_a"))
).select(
    "id", "name_a", "name_b"
).show()



+---+-------+------+
| id| name_a|name_b|
+---+-------+------+
|  1|  Alice|  NULL|
|  2|    Bob|  NULL|
|  3|Charlie| David|
|  4|   NULL|   Eve|
|  5|   NULL| Frank|
+---+-------+------+



                                                                                

# <a id='toc8_'></a>[主キー候補を機械的に抽出する](#toc0_)
数百のような大量のカラムを持つデータがあり、  
かつ主キー（全行について一意かつNullの列）が不明といったダーティデータの主キー候補を機械的に抽出する方法を考える。

In [None]:
# サンプルデータの作成
data = [
    (1, "Alice", "C", "001", 170, "apple", "AAA", "BBB", "CCC"),
    (2, "Bob", "A", "001", 170, "orange", "BBB", "CCC", "BBB"),
    (3, None, "B", "001", 156, "orange", "CCC", "CCC", "BBB"),
    (None, "Mike", "B", "002", 160, "orange", "AAA", "", "AAA"),
    (5, "Alice", "A", "002", 156, "apple", "AAA", "AAA", "BBB"),
    (6, "John", "A", "003", 170, "lemon", "CCC", "AAA", "CCC"),

]
cols = ["id", "name", "Class", "student_number", "height", "favorite_fruits", "col_A", "col_B", "col_C"]
df = spark.createDataFrame(data, cols)


## <a id='toc8_1_'></a>[考え方](#toc0_)
1. 文字列型のカラムのみを対象にする
2. Nullを含まないカラムを対象とする
3. 対象カラムをユニーク件数が多い順に並び替える
4. 対象カラムから一つずつカラムを取り出し、ユニーク件数が行数と一致するか判定する
5. 一致しない場合、別のカラムを一つ選び複合キーを作成し、新たな主キー候補とする
6. 一致するまで1.~2.を繰り返す

総当たりで複合キーを作成すると、カラム数が大量のときに計算量が膨大になるので避ける。  
Nullを含まない、かつユニーク件数が多いカラムを軸にどんどんユニーク件数が多くなるように複合キーを作成していくイメージ。  


In [67]:
# 主キーの候補になり得るのは基本的に文字列型のため、文字列型のカラムのみを対象とする。
string_cols = [
    field.name for field in df.schema.fields
    if isinstance(field.dataType, (StringType))
]

In [68]:
string_cols

['name',
 'Class',
 'student_number',
 'favorite_fruits',
 'col_A',
 'col_B',
 'col_C']

In [69]:
# Nullを含まない列の抽出
cols_not_contain_nulls = []

for col in string_cols:
    if df.filter(F.col(col).isNull()).count() == 0:
        cols_not_contain_nulls.append(col)

In [70]:
# 全てのカラムのユニーク件数を算出
unique_nums_dict = {}

for col in cols_not_contain_nulls:
    unique_nums_dict[col] = df.select(col).distinct().count()


# 効率化のため、ユニーク件数が多い順にカラムを並べておく。
target_cols = sorted(unique_nums_dict, key=unique_nums_dict.get, reverse=True)
target_cols

['col_B', 'Class', 'student_number', 'favorite_fruits', 'col_A', 'col_C']

In [72]:
# 複合キーの最大サイズ（=複合キーに採用した最大カラム数）
# max_composite_key_len = 3
pk_candidate = []

for col in target_cols:
    if df.select(pk_candidate).distinct().count() == df.count():
        break
    else:
        pk_candidate.append(col)
    

In [73]:
pk_candidate

['col_B', 'Class', 'student_number']

上記の処理で大雑把に主キー候補を抽出することは出来る。  
ただし、col_Bのようなあまり意味がなさそうなカラムも複合キーに含まれ、  
残りのClassとstudent_numberだけで複合キーとしては十分な場合も抽出されてしまう。  
なので、実作業ではカラムの意味も踏まえつつ有用なカラムだけを残して複合キーを作成すること。  

# <a id='toc9_'></a>[時間帯の重複箇所を識別](#toc0_)

下記のような時間帯A～Dがあったとき、  
重複している時間（重複開始時刻～重複終了時刻）、各重複に関わる時間帯、その時間帯総数を求めたい場合を考える。  
* 時間帯A: 2024年12月29日 9:00-17:00  
* 時間帯B: 2024年12月29日 12:00-23:00  
* 時間帯C: 2024年12月29日 10:00-13:00  
* 時間帯D: 2024年12月29日 12:30-16:00  

ただし、各重複に関わる時間帯の総数が最も小さくなるように重複時間を分けることとする。  
例えば、時間帯Aと時間帯Cは10:00-13:00で重複するが、12:00から時間帯Bとも重複し始める。  
この場合、時間帯Aと時間帯Cは10:00-12:00で重複しているものとし、重複している時間帯の総数は2として数える。  
12:00-12:30では時間帯A,B,Cが重複しており、重複している時間帯の総数は3となる。

In [23]:
# 時間帯データ
data = [
    ("A", datetime.strptime("2024-12-29 09:00:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2024-12-29 17:00:00", "%Y-%m-%d %H:%M:%S")),
    ("B", datetime.strptime("2024-12-29 12:00:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2024-12-29 23:00:00", "%Y-%m-%d %H:%M:%S")),
    ("C", datetime.strptime("2024-12-29 10:00:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2024-12-29 13:00:00", "%Y-%m-%d %H:%M:%S")),
    ("D", datetime.strptime("2024-12-29 12:30:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2024-12-29 16:00:00", "%Y-%m-%d %H:%M:%S")),
    # ("A", datetime.strptime("2024-12-29 09:00:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2024-12-29 9:15:00", "%Y-%m-%d %H:%M:%S")),
    # ("B", datetime.strptime("2024-12-29 09:05:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2024-12-29 9:20:00", "%Y-%m-%d %H:%M:%S")),
    # ("C", datetime.strptime("2024-12-29 09:00:00", "%Y-%m-%d %H:%M:%S"), datetime.strptime("2024-12-29 9:15:00", "%Y-%m-%d %H:%M:%S")),
]

# データスキーマの定義
schema = StructType([
    StructField("時間帯", StringType(), True),
    StructField("開始時刻", TimestampType(), True),
    StructField("終了時刻", TimestampType(), True),
])

# データをデータフレームに変換
df = spark.createDataFrame(data, schema)
df.show()

+------+-------------------+-------------------+
|時間帯|           開始時刻|           終了時刻|
+------+-------------------+-------------------+
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|
|     B|2024-12-29 12:00:00|2024-12-29 23:00:00|
|     C|2024-12-29 10:00:00|2024-12-29 13:00:00|
|     D|2024-12-29 12:30:00|2024-12-29 16:00:00|
+------+-------------------+-------------------+



In [3]:
# 境界時刻の抽出
boundaries = df.select(col("開始時刻")).union(df.select(col("終了時刻"))).distinct().orderBy("開始時刻")
boundaries.show()



+-------------------+
|           開始時刻|
+-------------------+
|2024-12-29 09:00:00|
|2024-12-29 10:00:00|
|2024-12-29 12:00:00|
|2024-12-29 12:30:00|
|2024-12-29 13:00:00|
|2024-12-29 16:00:00|
|2024-12-29 17:00:00|
|2024-12-29 23:00:00|
+-------------------+



                                                                                

重複している時刻は各時間帯の区切り（境界）となる時刻から特定できる。  
ゆえに、開始時刻と終了時刻を一つにまとめてユニーク＆昇順にしたDataframeを作成しておく。

In [4]:
# 各セグメントの作成。F.leadにより、重複開始時刻の一つあと(9:00開始なら10:00)の境界時刻を取得している
segments = boundaries.withColumnRenamed("開始時刻", "重複開始時刻") \
    .withColumn("重複終了時刻", F.lead("重複開始時刻").over(Window.orderBy("重複開始時刻")))
segments.show()

                                                                                

+-------------------+-------------------+
|       重複開始時刻|       重複終了時刻|
+-------------------+-------------------+
|2024-12-29 09:00:00|2024-12-29 10:00:00|
|2024-12-29 10:00:00|2024-12-29 12:00:00|
|2024-12-29 12:00:00|2024-12-29 12:30:00|
|2024-12-29 12:30:00|2024-12-29 13:00:00|
|2024-12-29 13:00:00|2024-12-29 16:00:00|
|2024-12-29 16:00:00|2024-12-29 17:00:00|
|2024-12-29 17:00:00|2024-12-29 23:00:00|
|2024-12-29 23:00:00|               NULL|
+-------------------+-------------------+



上記の重複開始時刻・重複終了時刻１レコードが重複時間を特定できる最小の時刻範囲（重複候補時間帯）となる。  

In [24]:
# 時間帯との重複を判定
overlap_conditions = (
    (col("開始時刻") < col("重複終了時刻")) & (col("終了時刻") > col("重複開始時刻"))
)
# whereはfilterの別名
overlap_segments = df.crossJoin(segments).where(overlap_conditions) \
    .groupBy("重複開始時刻", "重複終了時刻") \
    .agg(
        F.collect_list("時間帯").alias("重複時間帯"),
        F.size(F.collect_list("時間帯")).alias("重複時間帯の総数")
    ).filter(
        # segmentを含む時間帯が一つのものは除外
        F.col("重複時間帯の総数") > 1
    )

# 結果の表示
overlap_segments.show(truncate=False)

                                                                                

+-------------------+-------------------+------------+----------------+
|重複開始時刻       |重複終了時刻       |重複時間帯  |重複時間帯の総数|
+-------------------+-------------------+------------+----------------+
|2024-12-29 10:00:00|2024-12-29 12:00:00|[A, C]      |2               |
|2024-12-29 12:30:00|2024-12-29 13:00:00|[A, B, C, D]|4               |
|2024-12-29 12:00:00|2024-12-29 12:30:00|[A, B, C]   |3               |
|2024-12-29 13:00:00|2024-12-29 16:00:00|[A, B, D]   |3               |
|2024-12-29 16:00:00|2024-12-29 17:00:00|[A, B]      |2               |
+-------------------+-------------------+------------+----------------+



In [13]:
# 重複開始・終了時刻の差分から重複時間を求めるには下記のようにする。
# PySparkにおいて、Timestamp型同時の差分結果はDayTimeIntervalとなるが、longにキャストすれば秒を取得できる
overlap_segments.withColumn(
    "重複時間",
    (F.col("重複終了時刻") - F.col("重複開始時刻")).cast("long") / 3600
).show()

                                                                                

+-------------------+-------------------+------------+----------------+--------+
|       重複開始時刻|       重複終了時刻|  重複時間帯|重複時間帯の総数|重複時間|
+-------------------+-------------------+------------+----------------+--------+
|2024-12-29 10:00:00|2024-12-29 12:00:00|      [A, C]|               2|     2.0|
|2024-12-29 12:30:00|2024-12-29 13:00:00|[A, B, C, D]|               4|     0.5|
|2024-12-29 12:00:00|2024-12-29 12:30:00|   [A, B, C]|               3|     0.5|
|2024-12-29 13:00:00|2024-12-29 16:00:00|   [A, B, D]|               3|     3.0|
|2024-12-29 16:00:00|2024-12-29 17:00:00|      [A, B]|               2|     1.0|
+-------------------+-------------------+------------+----------------+--------+



In [59]:
# 配列が扱いずらいのであればexplodeすればよい
overlap_segments.withColumn(
    "重複時間帯",
    F.explode(F.col("重複時間帯"))
).show()

                                                                                

+-------------------+-------------------+----------+----------------+
|       重複開始時刻|       重複終了時刻|重複時間帯|重複時間帯の総数|
+-------------------+-------------------+----------+----------------+
|2024-12-29 10:00:00|2024-12-29 12:00:00|         A|               2|
|2024-12-29 10:00:00|2024-12-29 12:00:00|         C|               2|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         A|               4|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         B|               4|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         C|               4|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         D|               4|
|2024-12-29 12:00:00|2024-12-29 12:30:00|         A|               3|
|2024-12-29 12:00:00|2024-12-29 12:30:00|         B|               3|
|2024-12-29 12:00:00|2024-12-29 12:30:00|         C|               3|
|2024-12-29 13:00:00|2024-12-29 16:00:00|         A|               3|
|2024-12-29 13:00:00|2024-12-29 16:00:00|         B|               3|
|2024-12-29 13:00:00|2024-12-29 16:00:00|    

肝となるのはsegment（重複候補時間帯）を元のdfにcrossjoinさせるところ。  
こうすることでfor文を使わなくても各セグメントを含むか否かについて条件判定ができる。

In [36]:
df.crossJoin(segments).show()

                                                                                

+------+-------------------+-------------------+-------------------+-------------------+
|時間帯|           開始時刻|           終了時刻|       重複開始時刻|       重複終了時刻|
+------+-------------------+-------------------+-------------------+-------------------+
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 09:00:00|2024-12-29 10:00:00|
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 10:00:00|2024-12-29 12:00:00|
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 12:00:00|2024-12-29 12:30:00|
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 12:30:00|2024-12-29 13:00:00|
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 13:00:00|2024-12-29 16:00:00|
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 16:00:00|2024-12-29 17:00:00|
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 17:00:00|2024-12-29 23:00:00|
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|2024-12-29 23:00:00|               NULL|
|     B|2024-12-29 12:00:00|2024-12-29 23:00

## 重複時間を考慮した時間の計算
重複が発生している場合、各時間帯の開始～終了時間（所要時間）を合計すると、  
クリティカルパスの所要時間以上になってしまうので、  
重複していても合計すればクリティカルパスの所要時間と等しくなるように重複時間を補正する。

In [39]:
result = df.withColumn(
    "開始～終了までの時間",
    (F.col("終了時刻") - F.col("開始時刻")).cast("long") / 3600
)

# 重複開始・終了時刻の差分から重複時間を求めるには下記のようにする。
# PySparkにおいて、Timestamp型同時の差分結果はDayTimeIntervalとなるが、longにキャストすれば秒を取得できる
overlap_segments_mod = overlap_segments.withColumn(
    "重複時間",
    (F.col("重複終了時刻") - F.col("重複開始時刻")).cast("long") / 3600
).withColumns({
    "重複時間（補正）":
    F.col("重複時間") / F.col("重複時間帯の総数"),
    "重複時間帯":
    F.explode(F.col("重複時間帯"))
})

In [31]:
result.show()

+------+-------------------+-------------------+--------------------+
|時間帯|           開始時刻|           終了時刻|開始～終了までの時間|
+------+-------------------+-------------------+--------------------+
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|                 8.0|
|     B|2024-12-29 12:00:00|2024-12-29 23:00:00|                11.0|
|     C|2024-12-29 10:00:00|2024-12-29 13:00:00|                 3.0|
|     D|2024-12-29 12:30:00|2024-12-29 16:00:00|                 3.5|
+------+-------------------+-------------------+--------------------+



In [None]:
overlap_segments_mod.show()

                                                                                

+-------------------+-------------------+----------+----------------+--------+-------------------+
|       重複開始時刻|       重複終了時刻|重複時間帯|重複時間帯の総数|重複時間|   重複時間（補正）|
+-------------------+-------------------+----------+----------------+--------+-------------------+
|2024-12-29 10:00:00|2024-12-29 12:00:00|         A|               2|     2.0|                1.0|
|2024-12-29 10:00:00|2024-12-29 12:00:00|         C|               2|     2.0|                1.0|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         A|               4|     0.5|              0.125|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         B|               4|     0.5|              0.125|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         C|               4|     0.5|              0.125|
|2024-12-29 12:30:00|2024-12-29 13:00:00|         D|               4|     0.5|              0.125|
|2024-12-29 12:00:00|2024-12-29 12:30:00|         A|               3|     0.5|0.16666666666666666|
|2024-12-29 12:00:00|2024-12-29 12:30:00|      

In [42]:
overlap_segments_total = overlap_segments_mod.groupBy("重複時間帯").agg(
    F.sum("重複時間").alias("重複時間合計"),
    F.sum("重複時間（補正）").alias("重複時間合計（補正）")
)
overlap_segments_total.show()

                                                                                

+----------+------------+--------------------+
|重複時間帯|重複時間合計|重複時間合計（補正）|
+----------+------------+--------------------+
|         B|         5.0|  1.7916666666666665|
|         D|         3.5|               1.125|
|         C|         3.0|  1.2916666666666667|
|         A|         7.0|   2.791666666666667|
+----------+------------+--------------------+



In [53]:
result = result.join(
    overlap_segments_total,
    result.時間帯==overlap_segments_total.重複時間帯,
    "left"
).select(
    "時間帯", 
    "開始時刻", 
    "終了時刻", 
    "開始～終了までの時間", 
    F.round(
        (F.col("開始～終了までの時間") - F.col("重複時間合計") + F.col("重複時間合計（補正）")), 1
    ).alias("開始～終了までの時間（補正）"),
    "重複時間合計", 
    "重複時間合計（補正）"
)
result.show()

                                                                                

+------+-------------------+-------------------+--------------------+----------------------------+------------+--------------------+
|時間帯|           開始時刻|           終了時刻|開始～終了までの時間|開始～終了までの時間（補正）|重複時間合計|重複時間合計（補正）|
+------+-------------------+-------------------+--------------------+----------------------------+------------+--------------------+
|     A|2024-12-29 09:00:00|2024-12-29 17:00:00|                 8.0|                         3.8|         7.0|   2.791666666666667|
|     B|2024-12-29 12:00:00|2024-12-29 23:00:00|                11.0|                         7.8|         5.0|  1.7916666666666665|
|     C|2024-12-29 10:00:00|2024-12-29 13:00:00|                 3.0|                         1.3|         3.0|  1.2916666666666667|
|     D|2024-12-29 12:30:00|2024-12-29 16:00:00|                 3.5|                         1.1|         3.5|               1.125|
+------+-------------------+-------------------+--------------------+----------------------------+------------+---------

In [54]:
result.select(
    F.sum("開始～終了までの時間").alias("開始～終了までの時間合計"),
    F.sum(
        "開始～終了までの時間（補正）"
    ).alias("開始～終了までの時間合計（補正）")
).show()

                                                                                

+------------------------+--------------------------------+
|開始～終了までの時間合計|開始～終了までの時間合計（補正）|
+------------------------+--------------------------------+
|                    25.5|                            14.0|
+------------------------+--------------------------------+



クリティカルパスは9:00:00~23:00:00の計14時間なのであっている。