# 概述
EMR Serverless Spark是一款云原生，专为大规模数据处理和分析而设计的全托管Serverless产品。它为企业提供了一站式的数据平台服务，包括Spark任务调试、调度和运维等，极大地简化了数据处理的全生命周期工作流程。
您可以在DSW中，利用Serverless Spark提供的Livy API，远程连接Serverless Spark集群，并提交Spark任务到服务端执行。关于Livy API，请参见[REST API](https://livy.incubator.apache.org/docs/latest/rest-api.html)。
![image.png](_html/1.jpg)

# 使用限制
- DSW实例作为客户端，对资源规格没有特殊要求，推荐规格仅为建议，用户可按需选择需要的CPU/GPU规格。
- 推荐使用官方镜像 pytorch-develop:2.1-cpu-py311-ubuntu22.04

# 前提条件
1. 开通并创建[EMR Serverless Spark工作空间](https://help.aliyun.com/zh/emr/emr-serverless-spark/getting-started/create-a-workspace)
2. 在Spark控制台[创建Gateway及访问Token](https://help.aliyun.com/zh/emr/emr-serverless-spark/use-cases/use-the-sparkmagic-plugin-of-jupyter-notebook-to-interact-with-serverless-spark)，**后续的连接步骤需要Gateway的Endpoint及Token信息**

# 使用步骤


## 步骤一：安装sparkmagic插件
执行以下命令，安装sparkmagic插件。Sparkmagic插件的更多详细信息和高级配置选项，请参见[sparkmagic](https://github.com/jupyter-incubator/sparkmagic)。

In [0]:
# install library
!pip install sparkmagic

## 步骤二：配置与启动Spark Session

In [0]:
# 载入sparkmagic插件
%load_ext sparkmagic.magics

### 2.1（推荐）使用sparkmagic的管理界面进行session管理
详细配置及参数文档参考[通过Jupyter Notebook的sparkmagic插件与Serverless Spark进行交互](https://help.aliyun.com/zh/emr/emr-serverless-spark/use-cases/use-the-sparkmagic-plugin-of-jupyter-notebook-to-interact-with-serverless-spark)

In [0]:
# 打开sparkmagic配置UI，在界面上操作之前，请先执行下一个cell调大session创建超时时间，再继续创建Endpoint及Session
%manage_spark

In [0]:
# 注意，在启动session前，需要调大sparkmagic插件的启动session超时时间，否则可能会出现无法启动session的情况。
import sparkmagic.utils.configuration as conf
conf.override("livy_session_startup_timeout_seconds", 1000)

#### 在管理界面中配置Spark Session
在上一步中打开的插件管理界面中，按如下操作步骤（a~c）进行配置并创建Session，推荐在红框区域进行配置操作，非红框区域用作配置预览：
  ![image.png](_html/2.jpg)
a. 管理Endpoint配置，在【Add Endpoint】页签填写相关参数，填写完成后点击配置页面最右侧**Add endpoint**按钮
  ![image.png](_html/3.jpg)
  【参数说明】其中Address和Password需要在EMR Spark控制台-工作空间-管理员配置-Compute-Gateway中获取，获取方法[参考文档](https://help.aliyun.com/zh/emr/emr-serverless-spark/use-cases/use-the-sparkmagic-plugin-of-jupyter-notebook-to-interact-with-serverless-spark#b742f2a133x2y)
|参数|说明|
|-|-|
|Auth type|选择Basic_Access。|
|Address|填写格式为https://<Gateway的Endpoint信息>。|
|Username|使用默认值即可。|
|Password|在EMR Spark控制台-工作空间-管理员配置-Compute-Gateway-Token管理中获取的Token。|

b. 切换到【Create Session】页签，选择刚创建好的Endpoint，自定义一个session名称，语言选择Python，其他参数保持默认，点击配置页面最右侧**Create Session**按钮
  ![image.png](_html/4.jpg)
c. 这时DSW实例的kernel会进入busy状态，创建session大约需要1~5分钟，创建完成后，可以在【Manage Sessions】页签看到刚创建好的session。


### 2.2（可选）不使用UI，直接使用命令创建spark session

In [0]:
# 同样在新建session前进行超时配置
import sparkmagic.utils.configuration as conf
conf.override("livy_session_startup_timeout_seconds", 1000)

In [0]:
# 如不习惯使用管理界面操作，上一小节中的配置可以用如下命令替换，执行后新建一个spark session
# Endpoint和Token需要在EMR Spark控制台-工作空间-管理员配置-Compute-Gateway中获取
# 例如：%spark add -s customsession -l python -u http://emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-f***********/livycompute/lc-fim********* -a username -p u3gg*********
%spark add -s 自定义session名称 -l python -u https://填写Gateway的Endpoint -a username -p 填写Gateway的Token

In [0]:
# 更多%spark命令参数说明可以执行%spark?来查看
%spark?

In [0]:
%%spark
# 如果多个session同时在运行，在执行spark migic时，可以通过这种方式做session指定 %%spark --session 运行中的sessionname

#查看Spark版本
print("Spark Version:", sc.version)

## 步骤三：使用PySpark开发并提交训练任务
以训练一个线性回归模型为例，我们通过spark magic提交模型训练及预测任务到Spark集群计算，并获取打印结果。

In [0]:
%%spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# 构造一个简单的DataFrame作为示例数据
data = [(1, 1), (2, 3), (3, 5), (4, 7), (5, 9)]  # 特征和标签数据
columns = ["feature", "label"]
df = spark.createDataFrame(data, columns)

# 使用VectorAssembler将特征转换为向量，因为许多ML算法要求输入数据为向量形式
assembler = VectorAssembler(inputCols=["feature"], outputCol="features_vector")
df_transformed = assembler.transform(df)

# 选择LinearRegression模型进行训练
lr = LinearRegression(featuresCol="features_vector", labelCol="label")

# 划分训练集（这里为了简化直接使用全部数据训练，实际应用中应划分为训练集和测试集）
train_data = df_transformed

# 训练模型
model = lr.fit(train_data)

# 打印模型参数
print("模型系数:", model.coefficients)
print("模型截距:", model.intercept)

# 使用模型进行预测（以第一行数据为例）
input_data = [[1]]  # 特征数据
input_df = spark.createDataFrame(input_data, ["feature"])
input_df = assembler.transform(input_df)
prediction = model.transform(input_df)
print("预测结果:", prediction.select("prediction").collect()[0][0])

## 步骤四：释放Session资源
创建的Session会在闲置达到两小时后自动终止，确保资源的及时回收。此外，您也可以手动单击sparkmagic插件界面上的Delete来提前结束并释放会话资源。