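## Running EMR Spark in Jupyter Notebook

Spark is one of the most widely used big data compute engines. We have implemented several magic commands in Jupyter Notebook's default Python runtime so that you can easily run PySpark from Jupyter.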


#### Load the emrmagic extension before running Spark

In [1]:
%load_ext emrmagic

The emrmagic extension is already loaded. To reload it, use:
  %reload_ext emrmagic


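#### Magic commands provided by emrmagic

- %%spark.set_config: configures Spark runtime parameters. In principle you can set any parameter supported by the EMR Spark version in use, such as the number of executors, driver/executor memory, and dynamicAllocation.
- %spark.load_config: shows the current Spark configuration.
- %spark.get_session: starts a Spark job on the EMR cluster in YARN client mode, using the parameters set with %%spark.set_config.
- %%spark.sql: runs Spark SQL once the Spark job has been started, equivalent to `spark.sql('sql statement')`. It can also be used as the line magic %spark.sql.
- %spark.stop_session: stops the Spark job started by the current notebook.

The cells below show how to use each of these magic commands.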

#### spark.set_config

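This is a cell magic. Configuration entries use the k=v format. Each notebook holds a single Spark configuration, which you can change at any time before the Spark job is started.
To view Spark's default configuration, run the following command: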

In [10]:
!cat /etc/emr/spark-conf/spark-defaults.conf

cat: /etc/emr/spark-conf/spark-defaults.conf: No such file or directory


In [4]:
%%spark.set_config

spark.executor.instances=1
spark.executor.memory=512m
spark.executor.cores=2
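
The k=v cell body maps naturally onto a dictionary of string values, which matches the shape that %spark.load_config prints below. The following is a minimal sketch of such parsing, assuming blank lines are skipped and each entry is split on its first `=`; `parse_spark_config` is a hypothetical helper for illustration, not part of emrmagic's API.

```python
def parse_spark_config(cell: str) -> dict[str, str]:
    """Parse a k=v cell body into a dict of Spark properties (hypothetical helper)."""
    config: dict[str, str] = {}
    for raw_line in cell.splitlines():
        line = raw_line.strip()
        # Skip blank lines, such as the one right after the magic line
        if not line:
            continue
        # Split on the first '=' only, so a value may itself contain '='
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"expected k=v, got: {line!r}")
        config[key.strip()] = value.strip()
    return config


cell_body = """
spark.executor.instances=1
spark.executor.memory=512m
spark.executor.cores=2
"""
print(parse_spark_config(cell_body))
```

All values stay strings, as in the %spark.load_config output; Spark itself interprets units such as `512m`.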

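Note that emrmagic internally passes the path of the Python virtual environment selected when the notebook was started to the spark.yarn.dist.archives parameter, and combines it with the spark.pyspark.python parameter so that the Spark job uses the same Python environment as the notebook.

If you are not familiar with how PySpark manages Python environments, we recommend leaving these parameters unchanged.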
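
In plain Spark on YARN, this idea is commonly expressed by shipping a packed environment archive with a `#alias` suffix and pointing the Python interpreter at the unpacked alias directory. The sketch below builds those two properties; the archive URI and the `environment` alias are hypothetical, and exactly how emrmagic fills them in is not documented here.

```python
def venv_spark_conf(archive_uri: str, alias: str = "environment") -> dict[str, str]:
    """Build Spark properties that ship a packed Python environment to YARN containers.

    archive_uri: location of a packed virtual environment (hypothetical example below).
    alias: directory name that YARN unpacks the archive into inside each container.
    """
    return {
        # YARN distributes the archive and unpacks it under ./<alias> in each container
        "spark.yarn.dist.archives": f"{archive_uri}#{alias}",
        # Python workers use the interpreter from the unpacked archive; in yarn client
        # mode launched from a notebook, the driver is the kernel process itself
        "spark.pyspark.python": f"./{alias}/bin/python",
    }


# Hypothetical archive location, for illustration only
print(venv_spark_conf("oss://example-bucket/envs/venv.tar.gz"))
```

The relative interpreter path only resolves inside the YARN containers, which is one reason to leave these two settings to emrmagic.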

#### spark.load_config

This is a line magic that returns the current Spark configuration. It can be run at any time.

In [5]:
%spark.load_config

{'spark.executor.instances': '1',
 'spark.executor.memory': '512m',
 'spark.executor.cores': '2'}

#### spark.get_session

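This is a line magic. It takes one argument on the same line, the Spark job name, and returns a SparkSession and a SparkContext for running Spark code. Each notebook can start only one Spark job; running the command again returns the existing SparkSession. The SparkSession has Hive support enabled by default (that is, enableHiveSupport) and connects to the Hive Metastore of the EMR cluster.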
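
The "one Spark job per notebook" rule is a get-or-create pattern: the first call builds the session, and later calls return the cached one. The sketch below models that behavior in plain Python. `NotebookSessionManager` and the `factory` callable are hypothetical stand-ins, not emrmagic internals; in real PySpark the factory would be something like `SparkSession.builder.appName(name).enableHiveSupport().getOrCreate()`.

```python
from typing import Callable, Generic, Optional, TypeVar

S = TypeVar("S")


class NotebookSessionManager(Generic[S]):
    """Keep at most one session per notebook (hypothetical model of %spark.get_session)."""

    def __init__(self, factory: Callable[[str], S]) -> None:
        self._factory = factory
        self._session: Optional[S] = None

    def get_session(self, app_name: str) -> S:
        # First call: create the session with the given job name
        if self._session is None:
            self._session = self._factory(app_name)
        # Later calls return the existing session; the new name is ignored
        return self._session

    def stop_session(self) -> None:
        # Forget the cached session so the next get_session creates a new one
        # (a real implementation would also call spark.stop())
        self._session = None


# Usage with a stand-in factory; a real one would build a SparkSession
manager = NotebookSessionManager(lambda name: {"appName": name})
first = manager.get_session("test")
second = manager.get_session("another-name")
print(first is second)
```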

In [6]:
spark, sc = %spark.get_session test

ERROR StatusLogger Reconfiguration failed: No configuration found for '452b3a41' at 'null' in 'null'
ERROR StatusLogger Reconfiguration failed: No configuration found for 'Default' at 'null' in 'null'
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/10 16:08:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/10 16:08:11 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
23/08/10 16:08:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [7]:
spark

You can now start writing PySpark code.


In [9]:
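# Use "total" rather than shadowing Python's built-in sum(); the end of sc.range is exclusive: 1 + 2 + ... + 9
total = sc.range(1, 10).sum()
print("Sum = " + str(total))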

df = spark.read.json("file:///opt/apps/SPARK3/spark-current/examples/src/main/resources/people.json")
df.printSchema()

df.show()

Sum = 45

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
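
Both outputs can be cross-checked in plain Python. `sc.range(start, end)` excludes `end`, just like Python's `range`, so the sum is 1 + 2 + ... + 9 = 45. Spark's JSON source expects JSON Lines (one object per line) by default, and a field missing from a record becomes `null`; the printed schema lists `age` before `name` because Spark's JSON schema inference orders fields by name. The records below are assumed to mirror `people.json`, consistent with the `df.show()` output above.

```python
import json

# sc.range(1, 10) covers 1..9, the same half-open interval as Python's range
total = sum(range(1, 10))
print("Sum =", total)

# JSON Lines: one JSON object per line, the default layout Spark's JSON reader expects
people_jsonl = """{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}"""

rows = [json.loads(line) for line in people_jsonl.splitlines()]
# A field absent from a record appears as None here, and as null in Spark
for row in rows:
    print(row.get("age"), row["name"])
```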



You can read data in OSS directly from EMR Spark:

In [8]:
import os

# Assumes VENV is an OSS URI like oss://<bucket>/..., so the third '/'-separated field is the bucket
bucket = os.environ['VENV'].split('/')[2]
username = os.environ['USER']

spark.read.json(f'oss://{bucket}/jupyter/notebook/{username}/tutorial').show()

+--------------------+--------------------+--------+--------------+
|               cells|            metadata|nbformat|nbformat_minor|
+--------------------+--------------------+--------+--------------+
|[{markdown, null,...|{{Spark - Python ...|       4|             5|
|[{markdown, null,...|{{Spark - Python ...|       4|             5|
|[{markdown, null,...|{{Spark - Python ...|       4|             5|
|[{code, 1, c254df...|{{Spark - Python ...|       4|             5|
+--------------------+--------------------+--------+--------------+
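
The cell above takes the bucket from index 2 of `VENV.split('/')`, which presumes that `VENV` is a URI of the form `oss://<bucket>/...`. Under that same assumption, `urllib.parse` makes the intent explicit and rejects unexpected values. The example `VENV` value and username below are hypothetical.

```python
from urllib.parse import urlparse


def oss_bucket(uri: str) -> str:
    """Return the bucket name of an oss://<bucket>/<key> URI."""
    parsed = urlparse(uri)
    if parsed.scheme != "oss" or not parsed.netloc:
        raise ValueError(f"not an OSS URI: {uri!r}")
    # For oss://bucket/key, the network-location part is the bucket
    return parsed.netloc


def tutorial_path(venv_uri: str, username: str) -> str:
    """Build the OSS path that the cell above reads."""
    return f"oss://{oss_bucket(venv_uri)}/jupyter/notebook/{username}/tutorial"


# Hypothetical VENV value and username, for illustration only
print(tutorial_path("oss://example-bucket/envs/venv.tar.gz", "alice"))
```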



Run a simple machine learning program with Spark MLlib:

In [15]:
!pip install pandas numpy -i http://mirrors.cloud.aliyuncs.com/pypi/simple --trusted-host mirrors.cloud.aliyuncs.com

Looking in indexes: http://mirrors.cloud.aliyuncs.com/pypi/simple
Collecting pandas
  Downloading http://mirrors.cloud.aliyuncs.com/pypi/packages/e3/59/35a2892bf09ded9c1bf3804461efe772836a5261ef5dfb4e264ce813ff99/pandas-2.0.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.3/12.3 MB 68.6 MB/s eta 0:00:00
Collecting numpy
  Downloading http://mirrors.cloud.aliyuncs.com/pypi/packages/71/3c/3b1981c6a1986adc9ee7db760c0c34ea5b14ac3da9ecfcf1ea2a4ec6c398/numpy-1.25.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 18.2/18.2 MB 91.1 MB/s eta 0:00:00
Collecting pytz>=2020.1 (from pandas)
  Downloading http://mirrors.cloud.aliyuncs.com/pypi/packages/7f/99/ad6bd37e748257dd70d6f85d916cafe79c0b0f5e2e95b11f7fbc82bf3110/pytz-2023.3-py2.py3-none-an

In [16]:
from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("file:///opt/apps/SPARK3/spark-current/data/mllib/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

23/08/10 16:24:03 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.

23/08/10 16:24:06 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/08/10 16:24:06 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/08/10 16:24:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/08/10 16:24:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
Coefficients: (692,[272,300,323,350,351,378,379,405,406,407,428,433,434,435,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.520689871384157e-05,-8.11577314684704e-05,3.814692771846389e-05,0.0003776490540424341,0.0003405148366194407,0.0005514455157343111,0.00040853861160969167,0.00041974673327494573,0.0008119171358670032,0.0005027708372668752,-2.392926040660149e-05,0.0005745048020902299,0.000903754642680371,7.818229700243959e-05,-2.17875519529124e-05,-3.402165821789581e-05,0.0004966517360637634,0.0008190557828370371,-8.01
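
With `regParam=0.3` and `elasticNetParam=0.8`, Spark's logistic regression adds an elastic-net penalty λ(α‖w‖₁ + (1 − α)/2 · ‖w‖₂²) to the loss, where λ = regParam and α = elasticNetParam. The printed coefficient vector lists only 29 non-zero entries out of 692 features; the L1 share of the penalty is what pushes the remaining weights to exactly zero. The plain-Python sketch below computes only the penalty term for a given weight vector, not Spark's optimizer.

```python
def elastic_net_penalty(weights: list[float], reg_param: float, elastic_net_param: float) -> float:
    """Elastic-net penalty: reg_param * (alpha * L1 + (1 - alpha) / 2 * squared L2)."""
    alpha = elastic_net_param
    l1 = sum(abs(w) for w in weights)           # drives weights to exactly zero
    l2_squared = sum(w * w for w in weights)    # shrinks weights smoothly
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)


# Same settings as the cell above: regParam=0.3, elasticNetParam=0.8
print(elastic_net_penalty([1.0, -2.0, 0.0], reg_param=0.3, elastic_net_param=0.8))
```

Setting `elasticNetParam` to 1.0 gives a pure L1 (lasso) penalty and 0.0 a pure L2 (ridge) penalty.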

#### spark.sql

This command is both a line magic and a cell magic. The examples below introduce its main features.

Run a single SQL statement in a cell:

In [17]:
%%spark.sql

show tables

2023-08-10 16:24:20,228 Thread-4 ERROR Reconfiguration failed: No configuration found for '22c6bb19' at 'null' in 'null'
16:24:24.050 [Thread-4] ERROR com.aliyun.datalake.metastore.common.STSHelper - can't get ststoken afer retry:3 times, due to Failed to connect to /100.100.100.200:80
java.net.ConnectException: Failed to connect to /100.100.100.200:80

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient


Multiple SQL statements are supported, separated by semicolons; the result of the last statement is displayed.

In [17]:
%%spark.sql

use default;
drop table if exists test_table;
create table test_table (a string, b int);
insert into test_table values("abc", 1), ("def", 2);
select a, sum(b) from test_table group by a

|   | a   | sum(b) |
|---|-----|--------|
| 0 | abc | 1      |
| 1 | def | 2      |
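
Splitting a cell on semicolons has one pitfall: a `;` inside a quoted string literal must not end a statement. The sketch below is a hypothetical quote-aware splitter that returns all statements, the last of which would be the one displayed; emrmagic's actual splitting rules may differ, for example around comments or backslash escapes.

```python
from typing import Optional


def split_sql(cell: str) -> list[str]:
    """Split a cell into SQL statements on ';' outside single or double quotes."""
    statements: list[str] = []
    current: list[str] = []
    quote: Optional[str] = None  # quote character we are currently inside, if any
    for ch in cell:
        if quote is not None:
            current.append(ch)
            if ch == quote:
                quote = None  # closing quote ends the literal (backslash escapes not handled)
        elif ch in ("'", '"'):
            quote = ch
            current.append(ch)
        elif ch == ";":
            statements.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    statements.append("".join(current).strip())
    # Drop empty pieces, e.g. from a trailing ';' or blank lines
    return [s for s in statements if s]


cell = """use default;
drop table if exists test_table;
create table test_table (a string, b int);
insert into test_table values("abc", 1), ("def", 2);
select a, sum(b) from test_table group by a"""

statements = split_sql(cell)
print(statements[-1])  # the statement whose result is displayed
```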


In [18]:
x = 'abc'

Variables can be passed into SQL statements:

In [19]:
%%spark.sql
select * from test_table where a = '{x}'

|   | a   | b |
|---|-----|---|
| 0 | abc | 1 |
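
The `{x}` placeholder matches Python's `str.format` syntax. Assuming emrmagic fills placeholders from the notebook's variables that way (an assumption, not stated above), the rendering step amounts to the sketch below; `render_sql` is hypothetical and the dict stands in for the notebook's user namespace.

```python
def render_sql(template: str, namespace: dict[str, object]) -> str:
    """Fill {name} placeholders from a namespace (hypothetical model of the substitution)."""
    return template.format(**namespace)


# Stand-in for the notebook namespace after running x = 'abc'
user_ns = {"x": "abc"}
print(render_sql("select * from test_table where a = '{x}'", user_ns))
```

If substitution does work this way, literal braces in the SQL would have to be doubled (`{{` and `}}`), and because the value is pasted into the SQL text as-is, the quotes around `'{x}'` are still needed for string columns.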


The result of the last SQL statement can be assigned to a variable with the `-o` option; the variable is a Spark DataFrame.

In [20]:
%%spark.sql -o df

select a, sum(b) from test_table group by a

|   | a   | sum(b) |
|---|-----|--------|
| 0 | abc | 1      |
| 1 | def | 2      |


In [21]:
df.show()

+---+------+
|  a|sum(b)|
+---+------+
|abc|     1|
|def|     2|
+---+------+
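
The `-o df` option binds the last query's DataFrame to a name in the notebook. The sketch below shows one way such a magic line could be parsed and the result bound, using `argparse` and a plain dict in place of the notebook namespace. `parse_magic_line`, `bind_output`, and `run_last_statement` are hypothetical names, not emrmagic internals, and a string stands in for the Spark DataFrame.

```python
import argparse
import shlex
from typing import Any, Callable


def parse_magic_line(line: str) -> argparse.Namespace:
    """Parse the arguments after %%spark.sql, e.g. '-o df' (hypothetical parser)."""
    parser = argparse.ArgumentParser(prog="spark.sql", add_help=False)
    parser.add_argument("-o", dest="output", default=None)
    return parser.parse_args(shlex.split(line))


def bind_output(line: str, run_last_statement: Callable[[], Any], user_ns: dict[str, Any]) -> Any:
    """Run the query and, if -o was given, store its result under that name."""
    args = parse_magic_line(line)
    result = run_last_statement()
    if args.output:
        user_ns[args.output] = result
    return result


# Usage: a placeholder string stands in for the DataFrame returned by spark.sql(...)
user_ns: dict[str, Any] = {}
bind_output("-o df", lambda: "<DataFrame placeholder>", user_ns)
print(user_ns)
```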

