In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, BooleanType
from pyspark.sql.window import Window
import re

In [2]:
# 创建 SparkSession
spark = SparkSession\
    .builder\
    .appName("batch_processing")\
    .master("local[*]")\
    .getOrCreate()

### 读取文件

In [3]:
base_path = "F:/大四/大数据/tennis_slam_pointbypoint/"
df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{base_path}*-points.csv")

In [4]:
df.columns

['match_id',
 'ElapsedTime',
 'SetNo',
 'P1GamesWon',
 'P2GamesWon',
 'SetWinner',
 'GameNo',
 'GameWinner',
 'PointNumber',
 'PointWinner',
 'PointServer',
 'Speed_KMH',
 'Rally',
 'P1Score',
 'P2Score',
 'P1Momentum',
 'P2Momentum',
 'P1PointsWon',
 'P2PointsWon',
 'P1Ace',
 'P2Ace',
 'P1Winner',
 'P2Winner',
 'P1DoubleFault',
 'P2DoubleFault',
 'P1UnfErr',
 'P2UnfErr',
 'P1NetPoint',
 'P2NetPoint',
 'P1NetPointWon',
 'P2NetPointWon',
 'P1BreakPoint',
 'P2BreakPoint',
 'P1BreakPointWon',
 'P2BreakPointWon',
 'P1FirstSrvIn',
 'P2FirstSrvIn',
 'P1FirstSrvWon',
 'P2FirstSrvWon',
 'P1SecondSrvIn',
 'P2SecondSrvIn',
 'P1SecondSrvWon',
 'P2SecondSrvWon',
 'P1ForcedError',
 'P2ForcedError',
 'History',
 'Speed_MPH',
 'P1BreakPointMissed',
 'P2BreakPointMissed',
 'ServeIndicator',
 'Serve_Direction',
 'Winner_FH',
 'Winner_BH',
 'ServingTo',
 'P1TurningPoint',
 'P2TurningPoint',
 'ServeNumber',
 'WinnerType',
 'WinnerShotType',
 'P1DistanceRun',
 'P2DistanceRun',
 'RallyCount',
 'ServeWidth'

### 筛选列 清理缺失值

In [5]:
columns = [
    "match_id", "SetNo", "P1GamesWon", "P2GamesWon", "SetWinner", "GameNo","GameWinner",
    "PointNumber", "PointWinner", "PointServer", "Speed_KMH", "P1PointsWon", "P2PointsWon", "P1Ace", "P2Ace", "P1Winner",
    "P2Winner", "P1DoubleFault", "P2DoubleFault", "P1UnfErr", "P2UnfErr",
    "P1NetPoint", "P2NetPoint", "P1NetPointWon", "P2NetPointWon", "P1BreakPoint",
    "P2BreakPoint", "P1BreakPointWon", "P2BreakPointWon", "Speed_MPH", "ServeIndicator","WinnerType", "WinnerShotType", 
]
selected_df = df.select(columns)
selected_df.show(n=1,vertical=True)

-RECORD 0------------------------------
 match_id        | 2023-wimbledon-1101 
 SetNo           | 1                   
 P1GamesWon      | 0                   
 P2GamesWon      | 0                   
 SetWinner       | 0                   
 GameNo          | 1                   
 GameWinner      | 0                   
 PointNumber     | 0X                  
 PointWinner     | 0                   
 PointServer     | 0                   
 Speed_KMH       | 0                   
 P1PointsWon     | 0                   
 P2PointsWon     | 0                   
 P1Ace           | 0                   
 P2Ace           | 0                   
 P1Winner        | 0                   
 P2Winner        | 0                   
 P1DoubleFault   | 0                   
 P2DoubleFault   | 0                   
 P1UnfErr        | 0                   
 P2UnfErr        | 0                   
 P1NetPoint      | 0                   
 P2NetPoint      | 0                   
 P1NetPointWon   | 0                   


### 数据清洗

In [6]:
for column in columns:
    unique_values_df = selected_df.select(column).distinct()
    print(f"Unique values for column '{column}':")
    unique_values_df.show(truncate=False)  

Unique values for column 'match_id':
+-------------------+
|match_id           |
+-------------------+
|2023-wimbledon-1147|
|2023-wimbledon-1227|
|2023-wimbledon-2108|
|2023-wimbledon-1150|
|2023-wimbledon-2220|
|2024-wimbledon-2112|
|2024-wimbledon-2502|
|2024-wimbledon-1158|
|2024-wimbledon-2110|
|2024-wimbledon-2124|
|2024-wimbledon-2212|
|2024-wimbledon-2501|
|2023-wimbledon-1125|
|2023-wimbledon-2138|
|2024-wimbledon-1134|
|2023-wimbledon-2225|
|2023-wimbledon-2406|
|2024-wimbledon-1139|
|2024-wimbledon-2228|
|2023-wimbledon-1111|
+-------------------+
only showing top 20 rows

Unique values for column 'SetNo':
+-----+
|SetNo|
+-----+
|3    |
|5    |
|1    |
|4    |
|2    |
|NULL |
+-----+

Unique values for column 'P1GamesWon':
+----------+
|P1GamesWon|
+----------+
|7         |
|3         |
|0         |
|5         |
|6         |
|1         |
|4         |
|2         |
|11        |
|8         |
|9         |
|10        |
|12        |
|13        |
|15        |
|22        |
|16     

In [7]:
#异常及含义不清数据查看  %PointsWon== 0 时表示比赛刚开始
#filtered_df.filter((F.col("Speed_MPH")=="0")&(F.col("P1PointsWon")=="0")&(F.col("P2PointsWon")=="0")).show(vertical=True)
#filtered_df.filter((F.col("WinnerType")=="S")&(F.col("P1PointsWon")=="0")&(F.col("P2PointsWon")=="0")).show(vertical=True)
#filtered_df.filter((F.col("WinnerType")=="S")&(F.col("P2Ace")=="1")).show(vertical=True)
#WinnerType == S 时表示发球直得,不含Ace
#filtered_df.filter((F.col("P1UnfErr")=="1")&(F.col("P1DoubleFault")=="1")).show(vertical=True) 非受迫失误包含双误
#filtered_df.filter((F.col("ServeNumber")=="1")).show(vertical=True)  servenumber意义不明
#filtered_df.filter((F.col("P1GamesWon")>7)).show(vertical=True) #count=2778
#filtered_df.filter((F.col("Speed_KMH")!= 0)).count() #非空值807119   空值364202
#selected_df.filter(F.col("Speed_KMH").isNull() & F.col("Speed_MPH").isNotNull()).show(vertical=True) Speed_KMH为null且Speed_MPH不为空的行

In [8]:
mph_to_kmh = 1.60934
selected_df = selected_df.withColumn(
    "Speed_KMH_Converted",
    F.when(F.col("Speed_KMH").isNull(), F.col("Speed_MPH") * mph_to_kmh).otherwise(F.col("Speed_KMH"))
)
selected_df = selected_df.drop("Speed_MPH")

In [9]:
filtered_df = selected_df.filter(
    ~(
        (F.col("PointNumber") == "0X") |
        (F.col("PointNumber") == "0Y") |
        (F.col("PointNumber") == "0")  |
        (F.col("PointWinner") == "0")  
    )
)

In [10]:
filtered_df_with_nulls = filtered_df.filter(
    F.concat(*[F.col(c).isNull().cast("int") for c in filtered_df.columns]).contains(1)
)

# 显示包含 null 值的行
filtered_df_with_nulls.show(vertical=True)

-RECORD 0-----------------------------------
 match_id            | 2011-frenchopen-1101 
 SetNo               | 1                    
 P1GamesWon          | 0                    
 P2GamesWon          | 0                    
 SetWinner           | 0                    
 GameNo              | 1                    
 GameWinner          | 0                    
 PointNumber         | 1                    
 PointWinner         | 1                    
 PointServer         | 2                    
 Speed_KMH           | 218                  
 P1PointsWon         | 1                    
 P2PointsWon         | 0                    
 P1Ace               | 0                    
 P2Ace               | 0                    
 P1Winner            | 0                    
 P2Winner            | 0                    
 P1DoubleFault       | 0                    
 P2DoubleFault       | 0                    
 P1UnfErr            | 0                    
 P2UnfErr            | 1                    
 P1NetPoin

In [11]:
# 获取 DataFrame 中每一列的 null 值数量
null_counts = filtered_df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in filtered_df.columns
])

# 显示含有 null 值的列及其数量
null_counts.show(vertical=True)

-RECORD 0---------------------
 match_id            | 0      
 SetNo               | 0      
 P1GamesWon          | 369445 
 P2GamesWon          | 369445 
 SetWinner           | 369445 
 GameNo              | 0      
 GameWinner          | 369445 
 PointNumber         | 0      
 PointWinner         | 0      
 PointServer         | 0      
 Speed_KMH           | 91714  
 P1PointsWon         | 369445 
 P2PointsWon         | 369445 
 P1Ace               | 0      
 P2Ace               | 0      
 P1Winner            | 0      
 P2Winner            | 0      
 P1DoubleFault       | 0      
 P2DoubleFault       | 0      
 P1UnfErr            | 0      
 P2UnfErr            | 0      
 P1NetPoint          | 369445 
 P2NetPoint          | 369445 
 P1NetPointWon       | 369445 
 P2NetPointWon       | 369445 
 P1BreakPoint        | 369445 
 P2BreakPoint        | 369445 
 P1BreakPointWon     | 369445 
 P2BreakPointWon     | 369445 
 ServeIndicator      | 369445 
 WinnerType          | 748523 
 WinnerS

In [12]:
# 查看指定列含有 null 的行（例如 "SetWinner" 列）
null_in_column_df = filtered_df.filter(F.col("SetWinner").isNull())
null_in_column_df.show(vertical=True, truncate=False)

-RECORD 0-----------------------------------
 match_id            | 2021-frenchopen-1101 
 SetNo               | 1                    
 P1GamesWon          | NULL                 
 P2GamesWon          | NULL                 
 SetWinner           | NULL                 
 GameNo              | 1                    
 GameWinner          | NULL                 
 PointNumber         | 1                    
 PointWinner         | 1                    
 PointServer         | 2                    
 Speed_KMH           | 166                  
 P1PointsWon         | NULL                 
 P2PointsWon         | NULL                 
 P1Ace               | 0                    
 P2Ace               | 0                    
 P1Winner            | 0                    
 P2Winner            | 0                    
 P1DoubleFault       | 0                    
 P2DoubleFault       | 0                    
 P1UnfErr            | 0                    
 P2UnfErr            | 1                    
 P1NetPoin

In [13]:
# 计算非零 Speed_KMH 的平均值
#mean_speed = filtered_df.filter(F.col("Speed_KMH") != 0).agg(F.avg("Speed_KMH")).collect()[0][0]
avg_speed_KMH = filtered_df.filter(F.col("Speed_KMH").isNotNull() & (F.col("Speed_KMH") != 0)) \
                           .agg(F.avg("Speed_KMH").alias("avg_Speed_KMH")).collect()[0]["avg_Speed_KMH"]
#对于Speed_MPH 为null和0值时取平均值，对于WinnerType，WinnerShotType空时取0，其他行drop
# 使用平均值替换 Speed_KMH 中的0值
filtered_df = filtered_df.withColumn(
    "Speed_KMH",
    F.when((F.col("Speed_KMH").isNull()) | (F.col("Speed_KMH") == 0), avg_speed_KMH)
     .otherwise(F.col("Speed_KMH"))
)
# 对 WinnerType 和 WinnerShotType 列进行处理
filtered_df = filtered_df.withColumn(
    "WinnerType", 
    F.when(F.col("WinnerType").isNull(), 0)
     .otherwise(F.col("WinnerType"))
)

filtered_df = filtered_df.withColumn(
    "WinnerShotType", 
    F.when(F.col("WinnerShotType").isNull(), 0)
     .otherwise(F.col("WinnerShotType"))
)

filtered_df = filtered_df.dropna(how="any")
# 显示最终的 DataFrame
filtered_df.show(truncate=False,vertical=True)


-RECORD 0----------------------------------
 match_id            | 2023-wimbledon-1101 
 SetNo               | 1                   
 P1GamesWon          | 0                   
 P2GamesWon          | 0                   
 SetWinner           | 0                   
 GameNo              | 1                   
 GameWinner          | 0                   
 PointNumber         | 1                   
 PointWinner         | 2                   
 PointServer         | 2                   
 Speed_KMH           | 188                 
 P1PointsWon         | 0                   
 P2PointsWon         | 1                   
 P1Ace               | 0                   
 P2Ace               | 0                   
 P1Winner            | 0                   
 P2Winner            | 0                   
 P1DoubleFault       | 0                   
 P2DoubleFault       | 0                   
 P1UnfErr            | 0                   
 P2UnfErr            | 0                   
 P1NetPoint          | 0        

<!--# 定义窗口，按照 match_id 和 SetNo 排序
window_spec = Window.partitionBy("match_id").orderBy("PointNumber").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 创建 P1SetWon 和 P2SetWon 列，基于 SetWinner 列
df_with_sets = filtered_df.withColumn(
    "P1SetWon", F.when(F.col("SetWinner") == 1, 1).otherwise(0)
).withColumn(
    "P2SetWon", F.when(F.col("SetWinner") == 2, 1).otherwise(0)
)

# 计算 P1SetWonAcc 和 P2SetWonAcc，确保累加从 0 开始
df_with_sets = df_with_sets.withColumn(
    "P1SetWonAcc", F.sum("P1SetWon").over(window_spec)
).withColumn(
    "P2SetWonAcc", F.sum("P2SetWon").over(window_spec)
)

# 添加新列 WP，当 P1SetWonAcc 或 P2SetWonAcc 累加到 3 后，设置 WP 为 0 或 1
df_with_sets = df_with_sets.withColumn(
    "WP", 
    F.when(F.col("P1SetWonAcc") >= 3, 1)
    .when(F.col("P2SetWonAcc") >= 3, 0)
    .otherwise(None)  # 保持为 None 直到累加到 3
)

# 使用窗口函数传播 WP 值，使得相同 match_id 下所有行的 WP 值一致
window_spec_match_id = Window.partitionBy("match_id").orderBy("PointNumber").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# 使用 coalesce 函数来填充 WP 的空值，确保相同 match_id 下的所有行有统一的 WP 值
df_with_sets = df_with_sets.withColumn(
    "WP", F.coalesce(F.last("WP", True).over(window_spec_match_id), F.lit(0))  # 如果 WP 为空，默认设置为 0
)

# 删除 SetWinner 列
df_result = df_with_sets.drop("SetWinner")

# 显示结果
#df_result.filter((F.col("SetNo")==1) & (F.col("WP")==1)).show(vertical=True)
-->

#### 基于局数分析

In [14]:
df = filtered_df.withColumn(
    "P1FWinner", F.when((F.col("WinnerShotType") == "F") & (F.col("PointWinner") == 1), 1).otherwise(0)
).withColumn(
    "P1BWinner", F.when((F.col("WinnerShotType") == "B") & (F.col("PointWinner") == 1), 1).otherwise(0)
).withColumn(
    "P2FWinner", F.when((F.col("WinnerShotType") == "F") & (F.col("PointWinner") == 2), 1).otherwise(0)
).withColumn(
    "P2BWinner", F.when((F.col("WinnerShotType") == "B") & (F.col("PointWinner") == 2), 1).otherwise(0)
).withColumn(
    "P1SerWin", F.when((F.col("WinnerType") == "S") & (F.col("PointWinner") == 1), 1).otherwise(0)
).withColumn(
    "P2SerWIn", F.when((F.col("WinnerType") == "S") & (F.col("PointWinner") == 2), 1).otherwise(0)
).withColumn(
    "P1PtWin", F.when((F.col("WinnerType") == "S") & (F.col("PointWinner") == 1), 1).otherwise(0)
).withColumn(
    "P2PtWIn", F.when((F.col("WinnerType") == "S") & (F.col("PointWinner") == 2), 1).otherwise(0)
)
filtered_df = filtered_df.drop("WinnerShotType","WinnerType")

filtered_df.show(vertical=True)

-RECORD 0----------------------------------
 match_id            | 2023-wimbledon-1101 
 SetNo               | 1                   
 P1GamesWon          | 0                   
 P2GamesWon          | 0                   
 SetWinner           | 0                   
 GameNo              | 1                   
 GameWinner          | 0                   
 PointNumber         | 1                   
 PointWinner         | 2                   
 PointServer         | 2                   
 Speed_KMH           | 188                 
 P1PointsWon         | 0                   
 P2PointsWon         | 1                   
 P1Ace               | 0                   
 P2Ace               | 0                   
 P1Winner            | 0                   
 P2Winner            | 0                   
 P1DoubleFault       | 0                   
 P2DoubleFault       | 0                   
 P1UnfErr            | 0                   
 P2UnfErr            | 0                   
 P1NetPoint          | 0        

In [15]:
df.columns

['match_id',
 'SetNo',
 'P1GamesWon',
 'P2GamesWon',
 'SetWinner',
 'GameNo',
 'GameWinner',
 'PointNumber',
 'PointWinner',
 'PointServer',
 'Speed_KMH',
 'P1PointsWon',
 'P2PointsWon',
 'P1Ace',
 'P2Ace',
 'P1Winner',
 'P2Winner',
 'P1DoubleFault',
 'P2DoubleFault',
 'P1UnfErr',
 'P2UnfErr',
 'P1NetPoint',
 'P2NetPoint',
 'P1NetPointWon',
 'P2NetPointWon',
 'P1BreakPoint',
 'P2BreakPoint',
 'P1BreakPointWon',
 'P2BreakPointWon',
 'ServeIndicator',
 'WinnerType',
 'WinnerShotType',
 'Speed_KMH_Converted',
 'P1FWinner',
 'P1BWinner',
 'P2FWinner',
 'P2BWinner',
 'P1SerWin',
 'P2SerWIn',
 'P1PtWin',
 'P2PtWIn']

In [16]:
wp = (df.groupBy(
    "match_id", "SetNo"
).agg(
    F.max("SetWinner").alias("SetWinP")))

In [19]:
joined_df = df.join(wp, on=['match_id', 'SetNo'], how='inner')
final_df = joined_df.filter(F.col("SetWinP") != 0)
joined_df.filter(F.col("SetWinP")==0).count()

10835

In [20]:
game_based_df = (final_df.groupBy(
    "match_id", "SetNo", "GameNo"
).agg(
    F.max("SetWinP").alias("SetWinP"),
    F.max("GameWinner").alias("G_Winner"),
    F.max("P1GamesWon").alias("G_P1GmWon"),
    F.max("P2GamesWon").alias("G_P2GmWon"),
    F.max("ServeIndicator").alias("G_Ser"),
    F.sum("P1Ace").alias("G_P1Ace"),
    F.sum("P2Ace").alias("G_P2Ace"),
    F.sum("P1Winner").alias("G_P1Wn"),
    F.sum("P2Winner").alias("G_P2Wn"),
    F.sum("P1DoubleFault").alias("G_P1Df"),
    F.sum("P2DoubleFault").alias("G_P2Df"),
    F.sum("P1UnfErr").alias("G_P1UE"),
    F.sum("P2UnfErr").alias("G_P2UE"),
    F.sum("P1NetPoint").alias("G_P1NP"),
    F.sum("P2NetPoint").alias("G_P2NP"),
    F.sum("P1NetPointWon").alias("G_P1NPW"),
    F.sum("P2NetPointWon").alias("G_P2NPW"),
    F.sum("P1BreakPoint").alias("G_P1BP"),
    F.sum("P2BreakPoint").alias("G_P2BP"),
    F.sum("P1BreakPointWon").alias("G_P1BPWon"),
    F.sum("P2BreakPointWon").alias("G_P2BPWon"),
    F.sum("P1FWinner").alias("G_P1FW"),
    F.sum("P1BWinner").alias("G_P1BW"),
    F.sum("P2FWinner").alias("G_P2FW"),
    F.sum("P2BWinner").alias("G_P2BW"),
    F.sum("P1SerWin").alias("G_P1SerW"),
    F.sum("P2SerWin").alias("G_P2SerW"),
    F.avg("Speed_KMH").alias("G_avg_SerSp"),
    F.count("*").alias("G_totalPoints")
    )
)
#.withColumn("id", F.concat(F.col("match_id"), F.lit("_"), F.col("SetNo"), F.lit("_"), F.col("GameNo")))

In [21]:
game_based_df.describe().show(vertical=True)

-RECORD 0-----------------------------
 summary       | count                
 match_id      | 241170               
 SetNo         | 241170               
 GameNo        | 241170               
 SetWinP       | 241170               
 G_Winner      | 241170               
 G_P1GmWon     | 241170               
 G_P2GmWon     | 241170               
 G_Ser         | 241170               
 G_P1Ace       | 241170               
 G_P2Ace       | 241170               
 G_P1Wn        | 241170               
 G_P2Wn        | 241170               
 G_P1Df        | 241170               
 G_P2Df        | 241170               
 G_P1UE        | 241170               
 G_P2UE        | 241170               
 G_P1NP        | 241170               
 G_P2NP        | 241170               
 G_P1NPW       | 241170               
 G_P2NPW       | 241170               
 G_P1BP        | 241170               
 G_P2BP        | 241170               
 G_P1BPWon     | 241170               
 G_P2BPWon     | 241170  

In [22]:
game_based_df.filter(F.col("G_totalPoints")==1).show(vertical=True)

-RECORD 0-----------------------------
 match_id      | 2012-frenchopen-1129 
 SetNo         | 4                    
 GameNo        | 3                    
 SetWinP       | 1                    
 G_Winner      | 1                    
 G_P1GmWon     | 1                    
 G_P2GmWon     | 1                    
 G_Ser         | 1                    
 G_P1Ace       | 0.0                  
 G_P2Ace       | 0.0                  
 G_P1Wn        | 0.0                  
 G_P2Wn        | 0.0                  
 G_P1Df        | 0.0                  
 G_P2Df        | 0.0                  
 G_P1UE        | 0.0                  
 G_P2UE        | 0.0                  
 G_P1NP        | 0.0                  
 G_P2NP        | 0.0                  
 G_P1NPW       | 0.0                  
 G_P2NPW       | 0.0                  
 G_P1BP        | 0.0                  
 G_P2BP        | 0.0                  
 G_P1BPWon     | 0.0                  
 G_P2BPWon     | 0.0                  
 G_P1FW        | 0       

In [23]:
# 将所有列（除了 match_id 和 G_avg_SerSp）转换为 int 类型
columns_to_cast = [col for col in game_based_df.columns if col not in ["match_id", "G_avg_SerSp"]]
# 使用 select 和 cast 逐列转换
game_based_df = game_based_df.select(
    [col if col in ["match_id", "G_avg_SerSp"] else F.col(col).cast(IntegerType()).alias(col) for col in game_based_df.columns]
)

In [24]:
game_based_df.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- SetNo: integer (nullable = true)
 |-- GameNo: integer (nullable = true)
 |-- SetWinP: integer (nullable = true)
 |-- G_Winner: integer (nullable = true)
 |-- G_P1GmWon: integer (nullable = true)
 |-- G_P2GmWon: integer (nullable = true)
 |-- G_Ser: integer (nullable = true)
 |-- G_P1Ace: integer (nullable = true)
 |-- G_P2Ace: integer (nullable = true)
 |-- G_P1Wn: integer (nullable = true)
 |-- G_P2Wn: integer (nullable = true)
 |-- G_P1Df: integer (nullable = true)
 |-- G_P2Df: integer (nullable = true)
 |-- G_P1UE: integer (nullable = true)
 |-- G_P2UE: integer (nullable = true)
 |-- G_P1NP: integer (nullable = true)
 |-- G_P2NP: integer (nullable = true)
 |-- G_P1NPW: integer (nullable = true)
 |-- G_P2NPW: integer (nullable = true)
 |-- G_P1BP: integer (nullable = true)
 |-- G_P2BP: integer (nullable = true)
 |-- G_P1BPWon: integer (nullable = true)
 |-- G_P2BPWon: integer (nullable = true)
 |-- G_P1FW: integer (nullable = true)
 |

In [25]:
game_based_df.write.csv("./game_based_df.csv",header=True)

In [26]:
spark.stop()

#### 基于盘数分析

In [None]:
# 定义窗口，按照 match_id 和 SetNo 排序，并调整窗口范围
window_spec = Window.partitionBy("match_id", "SetNo").orderBy("PointNumber").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# 计算每个分区（match_id 和 SetNo）中的最大 SetWinner 值
df_with_max_setwinner = filtered_df.withColumn(
    "SetWinner_Max", F.max("SetWinner").over(window_spec)
)

# 显示 SetWinner_Max 为 0 的行
df_with_max_setwinner.filter(F.col("SetWinner_Max") == 0).show(n=60, vertical=True)

# 查看特定条件下的数据
#df_with_max_setwinner.filter((F.col("match_id") == "2011-ausopen-1101") & (F.col("PointNumber") == 28)).show(vertical=True)


In [None]:
filtered_df.show(vertical=True)

In [None]:
# 定义窗口，按照 match_id 和 SetNo 排序
window_spec1 = Window.partitionBy("match_id").orderBy("SetNo").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 创建 P1SetWon 和 P2SetWon 列，基于 SetWinner 列
df_with_sets = df_with_max_setwinner.withColumn(
    "P1SetWon", F.when(F.col("SetWinner") == 1, 1).otherwise(0)
).withColumn(
    "P2SetWon", F.when(F.col("SetWinner") == 2, 1).otherwise(0)
)

# 计算 P1SetWonAcc 和 P2SetWonAcc，确保累加从 0 开始
df_with_sets = df_with_sets.withColumn(
    "P1SetWonAcc", F.sum("P1SetWon").over(window_spec1)
).withColumn(
    "P2SetWonAcc", F.sum("P2SetWon").over(window_spec1)
)

# 添加新列 WP，当 P1SetWonAcc 或 P2SetWonAcc 累加到 3 后，设置 WP 为 0 或 1
df_with_sets = df_with_sets.withColumn(
    "WP", 
    F.when(F.col("P1SetWonAcc") >= 3, 1)
    .when(F.col("P2SetWonAcc") >= 3, 0)
    .otherwise(None)  # 保持为 None 直到累加到 3
)

# 使用窗口函数，填充 WP 值，在同一 match_id 分区内填充 WP 值
df_with_wp = df_with_sets.withColumn(
    "WP", F.first("WP", True).over(Window.partitionBy("match_id"))
)

# 显示结果
df_with_wp.show(vertical=True)


In [None]:
df_with_wp.filter(df_with_wp["WP"].isNotNull()).count()#null 609696  notnull 931846

In [None]:
set_based_df = (filtered_df.groupBy(
    "match_id", "SetNo"
).agg(
    F.max("P1SetWon").alias("S_P1SW"),
    F.max("P2SetWon").alias("S_P2SW"),
    F.max("P2GamesWon").alias("S_P2GmWon"),
    F.max("ServeIndicator").alias("S_Ser"),
    F.sum("P1Ace").alias("S_P1Ace"),
    F.sum("P2Ace").alias("S_P2Ace"),
    F.sum("P1Winner").alias("S_P1Wn"),
    F.sum("P2Winner").alias("S_P2Wn"),
    F.sum("P1DoubleFault").alias("S_P1Df"),
    F.sum("P2DoubleFault").alias("S_P2Df"),
    F.sum("P1UnfErr").alias("S_P1UE"),
    F.sum("P2UnfErr").alias("S_P2UE"),
    F.sum("P1NetPoint").alias("S_P1NP"),
    F.sum("P2NetPoint").alias("S_P2NP"),
    F.sum("P1NetPointWon").alias("S_P1NPW"),
    F.sum("P2NetPointWon").alias("S_P2NPW"),
    F.sum("P1BreakPoint").alias("S_P1BP"),
    F.sum("P2BreakPoint").alias("S_P2BP"),
    F.sum("P1BreakPointWon").alias("S_P1BPWon"),
    F.sum("P2BreakPointWon").alias("S_P2BPWon"),
    F.sum("P1FWinner").alias("S_P1FW"),
    F.sum("P1BWinner").alias("S_P1BW"),
    F.sum("P2FWinner").alias("S_P2FW"),
    F.sum("P2BWinner").alias("S_P2BW"),
    F.sum("P1SerWin").alias("S_P1SerW"),
    F.sum("P2SerWin").alias("S_P2SerW"),
    F.avg("Speed_KMH").alias("S_avg_SerSp"),  
    F.count("*").alias("S_totalPoints")  
))