In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, datediff, lit,desc
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Create (or reuse) the SparkSession used by every cell in this notebook.
spark = SparkSession.builder \
    .appName("Linear Regression Forecast") \
    .getOrCreate()

# 1. Load the daily purchase/redeem totals from CSV.
# Read the whole output directory instead of one specific part file: the
# part-file name embeds a job-specific UUID, so a hard-coded file name stops
# working as soon as the upstream job is re-run. Spark reads every data file in
# the directory and skips its own marker files (names starting with "_" or ".").
file_path = "output/task1.in_out_daily"
data = spark.read.csv(file_path, header=True, inferSchema=True)

# 2. Derive a numeric time feature from report_date:
# day_index = number of days between report_date and the base date 2013-07-01.
base_date = "2013-07-01"
# With inferSchema a yyyyMMdd value may be inferred as an integer; cast to
# string explicitly so the pattern-based parse does not rely on implicit casts.
data = data.withColumn(
    "report_date",
    to_date(col("report_date").cast("string"), 'yyyyMMdd')
)

data = data.withColumn("day_index", datediff(col("report_date"), lit(base_date)))
data.show()

# One (day_index, target) frame per series, ordered by time.
purchase_df = data.select("day_index", "total_purchase_amt").orderBy("day_index")
redeem_df = data.select("day_index", "total_redeem_amt").orderBy("day_index")




+-----------+------------------+----------------+---------+
|report_date|total_purchase_amt|total_redeem_amt|day_index|
+-----------+------------------+----------------+---------+
| 2014-08-08|         233903717|       311648757|      403|
| 2014-08-24|         130195484|       191080151|      419|
| 2014-07-28|         371762756|       345986909|      392|
| 2014-07-04|         211649838|       264494550|      368|
| 2014-03-28|         225966355|       405443946|      270|
| 2014-08-16|         215059736|       219214339|      411|
| 2014-07-20|         176449304|       174462836|      384|
| 2014-04-08|         354770149|       250015131|      281|
| 2014-05-04|         303087562|       413222034|      307|
| 2014-05-28|         276134813|       415891684|      331|
| 2014-06-24|         245450766|       428471509|      358|
| 2014-03-20|         365011495|       336076380|      262|
| 2014-04-16|         387847838|       255914640|      289|
| 2014-05-12|         325108597|       2

In [16]:
# 3. Data preprocessing
def preprocess_data(df, index_col, value_col):
    """Assemble feature column(s) into a vector and expose the target as ``label``.

    Args:
        df: Spark DataFrame containing the feature column(s) and ``value_col``.
        index_col: Name of a single feature column, or a list/tuple of feature
            column names.
        value_col: Name of the column to use as the regression target.

    Returns:
        A DataFrame with exactly two columns: ``features`` (the assembled
        vector) and ``label`` (``value_col`` renamed).
    """
    # VectorAssembler needs a list of names; wrap a bare string so that passing
    # a single column name works as the parameter name suggests.
    input_cols = [index_col] if isinstance(index_col, str) else list(index_col)
    assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
    return assembler.transform(df).select("features", col(value_col).alias("label"))
# Build the (features, label) training frames for the purchase and redeem series.
processed_purchase_df = preprocess_data(
    df=purchase_df,
    index_col=["day_index"],
    value_col="total_purchase_amt",
)
processed_redeem_df = preprocess_data(
    df=redeem_df,
    index_col=["day_index"],
    value_col="total_redeem_amt",
)

In [18]:
# 4. Train the linear regression model for the purchase series.
# The estimator is configured with an iteration cap and elastic-net
# regularisation (regParam / elasticNetParam) before fitting.
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
model1 = lr.fit(processed_purchase_df)

# 5. Forecast the days that follow the training data
def predict_future(model, start_index, num_days):
    """Score ``model`` on the ``num_days`` day indices after ``start_index``.

    Args:
        model: Fitted model whose ``transform`` reads a ``features`` vector column.
        start_index: Last known day index; the forecast starts at ``start_index + 1``.
        num_days: Number of consecutive days to forecast (0 yields an empty frame).

    Returns:
        DataFrame with ``day_index``, ``features`` and the columns added by
        ``model.transform``.

    Raises:
        ValueError: If ``num_days`` is negative.
    """
    if num_days < 0:
        raise ValueError(f"num_days must be non-negative, got {num_days}")

    # spark.range has a fixed schema (a single long column), so an empty horizon
    # produces an empty frame instead of failing schema inference on an empty
    # Python list. Relies on the notebook-level `spark` session.
    first_index = int(start_index) + 1
    future_df = spark.range(first_index, first_index + int(num_days)).toDF("day_index")

    assembler = VectorAssembler(inputCols=["day_index"], outputCol="features")
    return model.transform(assembler.transform(future_df))

# The forecast begins the day after the largest day_index present in the data.
last_day_index = purchase_df.agg({"day_index": "max"}).first()[0]
purchase_predictions = predict_future(model1, start_index=last_day_index, num_days=30)
# Display the forecast
purchase_predictions.show()

24/12/13 16:06:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

+---------+--------+--------------------+
|day_index|features|          prediction|
+---------+--------+--------------------+
|      427| [427.0]| 3.617769699730947E8|
|      428| [428.0]| 3.624542411184964E8|
|      429| [429.0]|  3.63131512263898E8|
|      430| [430.0]|3.6380878340929973E8|
|      431| [431.0]| 3.644860545547013E8|
|      432| [432.0]| 3.651633257001029E8|
|      433| [433.0]|3.6584059684550464E8|
|      434| [434.0]|3.6651786799090624E8|
|      435| [435.0]|3.6719513913630795E8|
|      436| [436.0]|3.6787241028170955E8|
|      437| [437.0]|3.6854968142711115E8|
|      438| [438.0]|3.6922695257251287E8|
|      439| [439.0]|3.6990422371791446E8|
|      440| [440.0]| 3.705814948633162E8|
|      441| [441.0]| 3.712587660087178E8|
|      442| [442.0]| 3.719360371541194E8|
|      443| [443.0]| 3.726133082995211E8|
|      444| [444.0]| 3.732905794449227E8|
|      445| [445.0]| 3.739678505903244E8|
|      446| [446.0]|  3.74645121735726E8|
+---------+--------+--------------

In [19]:
# Fit the redeem model and forecast the same 30-day horizon.
# NOTE(review): this estimator uses LinearRegression's default hyper-parameters,
# whereas the purchase model sets maxIter/regParam/elasticNetParam explicitly —
# confirm the difference is intentional.
lr = LinearRegression(featuresCol="features", labelCol="label")
model2 = lr.fit(processed_redeem_df)
redeem_predictions = predict_future(model2, start_index=last_day_index, num_days=30)
redeem_predictions.show()

24/12/13 16:07:05 WARN Instrumentation: [7e078376] regParam is zero, which might cause numerical instability and overfitting.
24/12/13 16:07:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


+---------+--------+--------------------+
|day_index|features|          prediction|
+---------+--------+--------------------+
|      427| [427.0]| 3.452366435413134E8|
|      428| [428.0]| 3.460541023303232E8|
|      429| [429.0]| 3.468715611193331E8|
|      430| [430.0]| 3.476890199083429E8|
|      431| [431.0]|3.4850647869735277E8|
|      432| [432.0]| 3.493239374863626E8|
|      433| [433.0]|3.5014139627537245E8|
|      434| [434.0]|3.5095885506438226E8|
|      435| [435.0]|3.5177631385339206E8|
|      436| [436.0]|3.5259377264240193E8|
|      437| [437.0]|3.5341123143141174E8|
|      438| [438.0]| 3.542286902204216E8|
|      439| [439.0]| 3.550461490094314E8|
|      440| [440.0]| 3.558636077984413E8|
|      441| [441.0]| 3.566810665874511E8|
|      442| [442.0]|3.5749852537646097E8|
|      443| [443.0]| 3.583159841654708E8|
|      444| [444.0]| 3.591334429544806E8|
|      445| [445.0]|3.5995090174349046E8|
|      446| [446.0]|3.6076836053250027E8|
+---------+--------+--------------

In [21]:
from pyspark.sql.functions import col, expr, date_format
# Keep only the join key and the forecast from each model; rename the redeem
# forecast so the two prediction columns do not collide after the join.
result1 = purchase_predictions.select(
    col("day_index"),
    col("prediction")
)
result2 = redeem_predictions.select(
    col("day_index"),
    col("prediction").alias("prediction2")
)
df = result1.join(result2, on=["day_index"], how="inner")

# Convert the day offset back into a calendar date (base_date + day_index)
# and render it as a yyyyMMdd string.
df_with_date = df.withColumn(
    "date",
    date_format(
        expr(f"date_add(to_date('{base_date}'), cast(day_index as INT))"),
        "yyyyMMdd"
    )
)
result = df_with_date.select(
    col("date").alias('report_date'),
    col("prediction").alias('total_purchase_amt'),
    col("prediction2").alias('total_redeem_amt')
)

# Convert the forecasts to whole numbers. Round to the nearest integer rather
# than truncating, and use a 64-bit type so large amounts cannot exceed the
# 32-bit integer range.
result = result.withColumn(
    "total_purchase_amt", expr("CAST(ROUND(total_purchase_amt) AS BIGINT)")
)
result = result.withColumn(
    "total_redeem_amt", expr("CAST(ROUND(total_redeem_amt) AS BIGINT)")
)

# A join gives no ordering guarantee, so sort inside the single output
# partition; yyyyMMdd strings sort chronologically.
result.coalesce(1) \
    .sortWithinPartitions("report_date") \
    .write.csv("output/task3.result", header=False, mode="overwrite")
