In [1]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,count,mean,udf,sum,when
import pandas as pd

spark = SparkSession.builder \
    .appName("Typhoon Analyze") \
    .master("local[*]") \
    .getOrCreate()
spark.conf.set("spark.rapids.sql.enable","true")


df_grade=spark.read.option("header", True).csv(r"../design/result/grade_trend/part-00000-7dcf4de8-72b3-42bb-bf65-4d2d581f866e-c000.csv")
df_intensity=spark.read.option("header", True).csv(r"../design/result/intensity_trend/part-00000-230f148b-8c77-4f42-bc0c-4d0236d48799-c000.csv")



24/12/26 00:58:02 WARN Utils: Your hostname, user-System-Product-Name resolves to a loopback address: 127.0.1.1; using 114.213.214.100 instead (on interface enp36s0f1)
24/12/26 00:58:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/26 00:58:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.types import IntegerType
df_intensity = df_intensity.withColumn("year", df_intensity["year"].cast(IntegerType()))
df_intensity = df_intensity.withColumn("avg_central_pressure", df_intensity["avg_central_pressure"].cast("double"))
df_intensity = df_intensity.withColumn("avg_wind_speed", df_intensity["avg_wind_speed"].cast("double"))
df_intensity = df_intensity.filter(df_intensity.year >= 1977)
df_intensity.show()

+----+--------------------+------------------+
|year|avg_central_pressure|    avg_wind_speed|
+----+--------------------+------------------+
|1977|   986.1454311454312|31.975546975546976|
|1978|   986.0371747211896| 34.02416356877323|
|1979|   983.2565997888067| 35.45406546990496|
|1980|    984.497641509434|  36.6627358490566|
|1981|   985.9777777777778|  30.7979797979798|
|1982|   981.8138195777351|41.602687140115165|
|1983|   985.1509433962265|34.782293178519595|
|1984|   985.1949339207049|35.401982378854626|
|1985|   989.1456815816857| 32.43496357960458|
|1986|   985.3704891740176| 34.86367281475541|
|1987|   978.7153679653679| 43.52272727272727|
|1988|     988.74715261959|  32.2380410022779|
|1989|   983.5480116391852| 37.59456838021339|
|1990|   981.2694198623402| 40.08849557522124|
|1991|   978.0804416403786| 43.71845425867508|
|1992|   981.2928679817906| 39.55993930197268|
|1993|   987.3385321100917| 33.41284403669725|
|1994|   983.7460857726345| 38.35602450646699|
|1995|   987.

In [3]:
df_intensiy_wind = df_intensity.filter(df_intensity.avg_wind_speed.isNotNull()).drop('avg_central_pressure')
df_intensiy_wind.show(10)


+----+------------------+
|year|    avg_wind_speed|
+----+------------------+
|1977|31.975546975546976|
|1978| 34.02416356877323|
|1979| 35.45406546990496|
|1980|  36.6627358490566|
|1981|  30.7979797979798|
|1982|41.602687140115165|
|1983|34.782293178519595|
|1984|35.401982378854626|
|1985| 32.43496357960458|
|1986| 34.86367281475541|
+----+------------------+
only showing top 10 rows



In [4]:
df_intensity_pressure = df_intensity.drop('avg_wind_speed')
df_intensity_pressure.show(10)
df_intensity_pressure

+----+--------------------+
|year|avg_central_pressure|
+----+--------------------+
|1977|   986.1454311454312|
|1978|   986.0371747211896|
|1979|   983.2565997888067|
|1980|    984.497641509434|
|1981|   985.9777777777778|
|1982|   981.8138195777351|
|1983|   985.1509433962265|
|1984|   985.1949339207049|
|1985|   989.1456815816857|
|1986|   985.3704891740176|
+----+--------------------+
only showing top 10 rows



DataFrame[year: int, avg_central_pressure: double]

In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator


#预测强度的回归模型
# year作为输入，输出强度
# 提取特征向量
assembler = VectorAssembler(inputCols=["year"], outputCol="features")
df_intensity_features = assembler.transform(df_intensity_pressure)

#划分训练集和测试集
pressure_train, pressure_test = df_intensity_features.randomSplit([0.8, 0.2], seed=1234)

#训练
pressure_model = LinearRegression(featuresCol="features", labelCol="avg_central_pressure", regParam=0.1)
lr_model_central_pressure = pressure_model.fit(pressure_train)

#预测
pressure_prediction = lr_model_central_pressure.transform(pressure_test)

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="avg_central_pressure", metricName="rmse")
rmse_central_pressure = evaluator.evaluate(pressure_prediction)

print(f"RMSE for central pressure prediction: {rmse_central_pressure}")




RMSE for central pressure prediction: 3.473577220150774


In [6]:
# 预测
future_years = spark.createDataFrame([(year,) for year in range(2015, 2033)], ["year"])
future_years_features = assembler.transform(future_years)
pressure_predictions = lr_model_central_pressure.transform(future_years_features)
pressure_predictions.show()

                                                                                

+----+--------+-----------------+
|year|features|       prediction|
+----+--------+-----------------+
|2015|[2015.0]|983.6482701040463|
|2016|[2016.0]|983.6388986396908|
|2017|[2017.0]|983.6295271753352|
|2018|[2018.0]|983.6201557109796|
|2019|[2019.0]| 983.610784246624|
|2020|[2020.0]|983.6014127822684|
|2021|[2021.0]| 983.592041317913|
|2022|[2022.0]|983.5826698535574|
|2023|[2023.0]|983.5732983892018|
|2024|[2024.0]|983.5639269248462|
|2025|[2025.0]|983.5545554604906|
|2026|[2026.0]| 983.545183996135|
|2027|[2027.0]|983.5358125317795|
|2028|[2028.0]|983.5264410674239|
|2029|[2029.0]|983.5170696030683|
|2030|[2030.0]|983.5076981387128|
|2031|[2031.0]|983.4983266743573|
|2032|[2032.0]|983.4889552100017|
+----+--------+-----------------+



In [7]:
# 提取特征向量
assembler_wind = VectorAssembler(inputCols=["year"], outputCol="features")
df_intensiy_wind_features = assembler_wind.transform(df_intensiy_wind)

# 划分训练集和测试集
wind_train, wind_test = df_intensiy_wind_features.randomSplit([0.8, 0.2], seed=1234)

# 训练
wind_model = LinearRegression(featuresCol="features", labelCol="avg_wind_speed",regParam=0.1)
lr_model_wind_speed = wind_model.fit(wind_train)

# 预测
wind_prediction = lr_model_wind_speed.transform(wind_test)

evaluator_wind = RegressionEvaluator(predictionCol="prediction", labelCol="avg_wind_speed", metricName="rmse")
rmse_wind_speed = evaluator_wind.evaluate(wind_prediction)

print(f"RMSE for wind speed prediction: {rmse_wind_speed}")

RMSE for wind speed prediction: 3.987021392568346


In [8]:

future_years = spark.createDataFrame([(year,) for year in range(2015, 2033)], ["year"])
future_years_features = assembler_wind.transform(future_years)
wind_predictions = lr_model_wind_speed.transform(future_years_features)
wind_predictions.show()

+----+--------+------------------+
|year|features|        prediction|
+----+--------+------------------+
|2015|[2015.0]|  37.5224634705423|
|2016|[2016.0]|  37.5457055616171|
|2017|[2017.0]|  37.5689476526919|
|2018|[2018.0]| 37.59218974376669|
|2019|[2019.0]| 37.61543183484149|
|2020|[2020.0]| 37.63867392591629|
|2021|[2021.0]| 37.66191601699108|
|2022|[2022.0]| 37.68515810806588|
|2023|[2023.0]| 37.70840019914068|
|2024|[2024.0]| 37.73164229021547|
|2025|[2025.0]| 37.75488438129027|
|2026|[2026.0]| 37.77812647236507|
|2027|[2027.0]|37.801368563439866|
|2028|[2028.0]| 37.82461065451466|
|2029|[2029.0]|37.847852745589456|
|2030|[2030.0]|37.871094836664255|
|2031|[2031.0]| 37.89433692773905|
|2032|[2032.0]|37.917579018813846|
+----+--------+------------------+



In [9]:
# 合并两个预测结果
combined_predictions = pressure_predictions.select("year", "prediction").withColumnRenamed("prediction", "predicted_pressure") \
    .join(wind_predictions.select("year", "prediction").withColumnRenamed("prediction", "predicted_wind_speed"), on="year", how="inner")

combined_predictions.show()



+----+------------------+--------------------+
|year|predicted_pressure|predicted_wind_speed|
+----+------------------+--------------------+
|2015| 983.6482701040463|    37.5224634705423|
|2016| 983.6388986396908|    37.5457055616171|
|2017| 983.6295271753352|    37.5689476526919|
|2018| 983.6201557109796|   37.59218974376669|
|2019|  983.610784246624|   37.61543183484149|
|2020| 983.6014127822684|   37.63867392591629|
|2021|  983.592041317913|   37.66191601699108|
|2022| 983.5826698535574|   37.68515810806588|
|2023| 983.5732983892018|   37.70840019914068|
|2024| 983.5639269248462|   37.73164229021547|
|2025| 983.5545554604906|   37.75488438129027|
|2026|  983.545183996135|   37.77812647236507|
|2027| 983.5358125317795|  37.801368563439866|
|2028| 983.5264410674239|   37.82461065451466|
|2029| 983.5170696030683|  37.847852745589456|
|2030| 983.5076981387128|  37.871094836664255|
|2031| 983.4983266743573|   37.89433692773905|
|2032| 983.4889552100017|  37.917579018813846|
+----+-------

                                                                                

In [10]:
combined_predictions.coalesce(1).write.mode("overwrite").option("header",True).csv("result/intensity_prediction")

                                                                                

单台风预测

In [11]:
df_single_pre=spark.read.option("header", True).csv(r"../typhoon_data.csv")

In [12]:
df_single_pre

DataFrame[_c0: string, International number ID: string, year: string, month: string, day: string, hour: string, grade: string, Latitude of the center: string, Longitude of the center: string, Central pressure: string, Maximum sustained wind speed: string, Direction of the longest radius of 50kt winds or greater: string, The longeast radius of 50kt winds or greater: string, The shortest radius of 50kt winds or greater: string, Direction of the longest radius of 30kt winds or greater: string, The longeast radius of 30kt winds or greater: string, The shortest radius of 30kt winds or greater: string, Indicator of landfall or passage: string]

In [13]:
from pyspark.sql.functions import to_timestamp, concat_ws
from pyspark.sql.functions import unix_timestamp

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# 将数据类型转换为合适的类型
df_single_pre = df_single_pre.withColumn(
    "date",
    to_timestamp(concat_ws("-", col("year"), col("month"), col("day"), col("hour")), "yyyy-MM-dd-HH")
)
df_single_pre = df_single_pre.select("International number ID","date", "Latitude of the center", "Longitude of the center", "Central pressure")
# 获取时间戳
df_single_pre = df_single_pre.withColumn("timestamp", unix_timestamp("date"))

df_single_pre = df_single_pre.withColumn("Latitude of the center", df_single_pre["Latitude of the center"].cast("double")/10.)
df_single_pre = df_single_pre.withColumn("Longitude of the center", df_single_pre["Longitude of the center"].cast("double")/10.)
df_single_pre = df_single_pre.withColumn("Central pressure", df_single_pre["Central pressure"].cast("double"))
# 按照台风ID进行分组
typhoon_groups = df_single_pre.groupBy("International number ID").count()


In [14]:
typhoon_id="9001"
typhoon_data = df_single_pre.filter(col("International number ID") == typhoon_id)
typhoon_data.show(100)

+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+
|International number ID|               date|Latitude of the center|Longitude of the center|Central pressure|timestamp|
+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+
|                   9001|1990-01-12 00:00:00|                   7.0|                  153.0|          1002.0|632073600|
|                   9001|1990-01-12 06:00:00|                   7.0|                  152.5|          1000.0|632095200|
|                   9001|1990-01-12 12:00:00|                   8.0|                  151.5|          1000.0|632116800|
|                   9001|1990-01-12 18:00:00|                   8.5|                  150.5|           998.0|632138400|
|                   9001|1990-01-13 00:00:00|                   9.0|                  149.5|           998.0|632160000|
|                   9001|1990-01-13 06:0

In [15]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead

#最后一个时间点的数据
last_recode=typhoon_data.orderBy("timestamp", ascending=False).limit(1)

# 获得下一个时间点的位置和强度
window_spec = Window.partitionBy("International number ID").orderBy("date")
typhoon_data = typhoon_data.withColumn("next_latitude", lead("Latitude of the center").over(window_spec))
typhoon_data = typhoon_data.withColumn("next_longitude", lead("Longitude of the center").over(window_spec))
typhoon_data = typhoon_data.withColumn("next_central_pressure", lead("Central pressure").over(window_spec))
typhoon_data = typhoon_data.dropna(subset=["next_latitude", "next_longitude", "next_central_pressure"])


typhoon_data = typhoon_data.filter(col("next_latitude").isNotNull())
#预测

In [16]:
last_recode.show()

+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+
|International number ID|               date|Latitude of the center|Longitude of the center|Central pressure|timestamp|
+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+
|                   9001|1990-01-17 18:00:00|                  31.0|                  166.0|           996.0|632570400|
+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+



In [17]:
typhoon_data.show(100)

+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+-------------+--------------+---------------------+
|International number ID|               date|Latitude of the center|Longitude of the center|Central pressure|timestamp|next_latitude|next_longitude|next_central_pressure|
+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+-------------+--------------+---------------------+
|                   9001|1990-01-12 00:00:00|                   7.0|                  153.0|          1002.0|632073600|          7.0|         152.5|               1000.0|
|                   9001|1990-01-12 06:00:00|                   7.0|                  152.5|          1000.0|632095200|          8.0|         151.5|               1000.0|
|                   9001|1990-01-12 12:00:00|                   8.0|                  151.5|          1000.0|632116800|          8.5|         150

In [18]:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


# 提取特征向量
assembler_location = VectorAssembler(inputCols=["timestamp", "Latitude of the center", "Longitude of the center",
                                                "Central pressure"], outputCol="features")
typhoon_data_features = assembler_location.transform(typhoon_data)

# 训练模型
latitude_model = LinearRegression(featuresCol="features", labelCol="next_latitude", regParam=0.1)
lr_single_pre_latitude = latitude_model.fit(typhoon_data_features)

# 训练模型
longitude_model = LinearRegression(featuresCol="features", labelCol="next_longitude", regParam=0.1)
lr_single_pre_longitude = longitude_model.fit(typhoon_data_features)

pressure_model = LinearRegression(featuresCol="features", labelCol="next_central_pressure", regParam=0.1)
lr_single_pre_pressure = pressure_model.fit(typhoon_data_features)


In [19]:
last_recode = assembler_location.transform(last_recode)

In [20]:

# 预测未来的位置和强度
predictions = lr_single_pre_latitude.transform(last_recode) \
    .withColumnRenamed("prediction", "predicted_latitude")

predictions = lr_single_pre_longitude.transform(predictions) \
    .withColumnRenamed("prediction", "predicted_longitude")

predictions = lr_single_pre_pressure.transform(predictions) \
    .withColumnRenamed("prediction", "predicted_central_pressure")

predictions.show()


+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+--------------------+------------------+-------------------+--------------------------+
|International number ID|               date|Latitude of the center|Longitude of the center|Central pressure|timestamp|            features|predicted_latitude|predicted_longitude|predicted_central_pressure|
+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+--------------------+------------------+-------------------+--------------------------+
|                   9001|1990-01-17 18:00:00|                  31.0|                  166.0|           996.0|632570400|[6.325704E8,31.0,...|33.231401653063585|  172.9574220958981|         1000.903918070846|
+-----------------------+-------------------+----------------------+-----------------------+----------------+---------+--------------------+------------------+-------------

In [21]:
from pyspark.sql.functions import expr, from_unixtime

# 修改
predictions = predictions.withColumn("date", col("date") + expr("INTERVAL 6 HOURS"))
predictions = predictions.withColumn("timestamp", unix_timestamp(col("date")))

predictions = predictions.withColumn("Latitude of the center", col("predicted_latitude")) \
    .withColumn("Longitude of the center", col("predicted_longitude")) \
    .withColumn("Central pressure", col("predicted_central_pressure"))
predictions = predictions.select("International number ID","date","Latitude of the center","Longitude of the center","Central pressure","timestamp")
predictions.show()


+-----------------------+-------------------+----------------------+-----------------------+-----------------+---------+
|International number ID|               date|Latitude of the center|Longitude of the center| Central pressure|timestamp|
+-----------------------+-------------------+----------------------+-----------------------+-----------------+---------+
|                   9001|1990-01-18 00:00:00|    33.231401653063585|      172.9574220958981|1000.903918070846|632592000|
+-----------------------+-------------------+----------------------+-----------------------+-----------------+---------+



In [22]:
from pyspark.sql.functions import expr

def predict(df_single_pre,typhoon_id,k=5):
    typhoon_data = df_single_pre.filter(col("International number ID") == typhoon_id)
    #最后一个时间点的数据
    last_recode=typhoon_data.orderBy("timestamp", ascending=False).limit(1)
    # last_recode.show()
    # 获得下一个时间点的位置和强度
    window_spec = Window.partitionBy("International number ID").orderBy("date")
    typhoon_data = typhoon_data.withColumn("next_latitude", lead("Latitude of the center").over(window_spec))
    typhoon_data = typhoon_data.withColumn("next_longitude", lead("Longitude of the center").over(window_spec))
    typhoon_data = typhoon_data.withColumn("next_central_pressure", lead("Central pressure").over(window_spec))
    typhoon_data = typhoon_data.dropna(subset=["next_latitude", "next_longitude", "next_central_pressure"])


    typhoon_data = typhoon_data.filter(col("next_latitude").isNotNull())
    
    # 提取特征向量
    assembler_location = VectorAssembler(inputCols=["timestamp", "Latitude of the center", "Longitude of the center","Central pressure"], outputCol="features")
    typhoon_data_features = assembler_location.transform(typhoon_data)

    # 训练模型
    latitude_model = LinearRegression(featuresCol="features", labelCol="next_latitude", regParam=0.1)
    lr_single_pre_latitude = latitude_model.fit(typhoon_data_features)

    # 训练模型
    longitude_model = LinearRegression(featuresCol="features", labelCol="next_longitude", regParam=0.1)
    lr_single_pre_longitude = longitude_model.fit(typhoon_data_features)

    pressure_model = LinearRegression(featuresCol="features", labelCol="next_central_pressure", regParam=0.1)
    lr_single_pre_pressure = pressure_model.fit(typhoon_data_features)
    
    
    
    last_recodes = spark.createDataFrame([], last_recode.schema)
    for i in range(k):
        last_recode = assembler_location.transform(last_recode)

        # 预测未来的位置和强度
        predictions = lr_single_pre_latitude.transform(last_recode) \
            .withColumnRenamed("prediction", "predicted_latitude")

        predictions = lr_single_pre_longitude.transform(predictions) \
            .withColumnRenamed("prediction", "predicted_longitude")

        predictions = lr_single_pre_pressure.transform(predictions) \
            .withColumnRenamed("prediction", "predicted_central_pressure")
                
        predictions = predictions.withColumn("date", col("date") + expr("INTERVAL 6 HOURS"))
        predictions = predictions.withColumn("timestamp", unix_timestamp(col("date")))

        predictions = predictions.withColumn("Latitude of the center", col("predicted_latitude")) \
            .withColumn("Longitude of the center", col("predicted_longitude")) \
            .withColumn("Central pressure", col("predicted_central_pressure"))
        predictions = predictions.select("International number ID","date","Latitude of the center",
                                         "Longitude of the center","Central pressure","timestamp")
        last_recodes = last_recodes.union(predictions)
        last_recode = predictions
    return last_recodes

ls=predict(df_single_pre,"9001",k=5)
ls.show()
    
    
    

+-----------------------+-------------------+----------------------+-----------------------+------------------+---------+
|International number ID|               date|Latitude of the center|Longitude of the center|  Central pressure|timestamp|
+-----------------------+-------------------+----------------------+-----------------------+------------------+---------+
|                   9001|1990-01-18 00:00:00|    33.231401653063585|      172.9574220958981| 1000.903918070846|632592000|
|                   9001|1990-01-18 06:00:00|     35.97998259422093|     181.45605981910722|1007.5304003121637|632613600|
|                   9001|1990-01-18 12:00:00|     39.28653466335345|     191.86018671195103|1015.9752030709315|632635200|
|                   9001|1990-01-18 18:00:00|    43.219083852964104|     204.59116244062574|1026.4695233951097|632656800|
|                   9001|1990-01-19 00:00:00|     47.87439519360487|     220.14449125080813|1039.3650741703968|632678400|
+-----------------------

In [23]:
#所有台风数
print(df_single_pre.filter(col("year") >= 1990).filter(col("Indicator of landfall or passage")=="#").select("International number ID").distinct().count())

123


In [26]:
# 选取有登陆的台风
typhoon_ids = df_single_pre.filter(col("year") >= 1990).filter(col("Indicator of landfall or passage")=="#").select("International number ID").distinct().rdd.flatMap(lambda x: x).collect()


counter=0
for typhoon_id in typhoon_ids:
    counter=counter+1
    prog=counter/123*100
    print(f"handling id: {typhoon_id} {prog:.2f}%")
    predictions = predict(df_single_pre, typhoon_id, k=5)
    all_predictions = all_predictions.union(predictions)
    predictions=predictions.toPandas()
    if counter == 1:
        predictions.to_csv("result/position_predict.csv", index=False)
    else:
        with open("result/position_predict.csv", 'a') as f:
            predictions.to_csv(f, header=False, index=False)
    # 释放之前使用的变量空间
    del predictions
    


handling id: 9407 0.81%
handling id: 9314 1.63%
handling id: 9119 2.44%
handling id: 9708 3.25%
handling id: 9609 4.07%
handling id: 9114 4.88%
handling id: 9305 5.69%
handling id: 9311 6.50%
handling id: 9211 7.32%
handling id: 9606 8.13%
handling id: 9313 8.94%
handling id: 9209 9.76%
handling id: 9304 10.57%
handling id: 9808 11.38%
handling id: 9918 12.20%
handling id: 9306 13.01%
handling id: 9719 13.82%
handling id: 9411 14.63%
handling id: 9426 15.45%
handling id: 9916 16.26%
handling id: 9621 17.07%
handling id: 9807 17.89%
handling id: 9612 18.70%
handling id: 9709 19.51%
handling id: 9805 20.33%
handling id: 14 21.14%
handling id: 9210 21.95%
handling id: 9117 22.76%
handling id: 9514 23.58%
handling id: 9707 24.39%
handling id: 9810 25.20%
handling id: 1512 26.02%
handling id: 613 26.83%
handling id: 205 27.64%
handling id: 1418 28.46%
handling id: 1518 29.27%
handling id: 415 30.08%
handling id: 422 30.89%
handling id: 1112 31.71%
handling id: 1318 32.52%
handling id: 1910 