# Interacting with Spark using sparkmagic


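When analyzing static data, we usually want Spark to act as a compute resource: it runs on a remote cluster, we call it to do the computation, and afterwards we may still need to visualize the results. In that situation you can combine Livy with sparkmagic and interact with Spark from a Jupyter notebook. Note that this approach is generally used for analyzing static data; its advantage is that you can make full use of local Python packages.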


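Generally, as long as the Livy service is running on the master machine and its port 8998 is reachable from outside, you can connect to Livy by editing the local `~/.sparkmagic/config.json`. The default configuration is shown below: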

```json
{
  "kernel_python_credentials" : {
    "username": "",
    "password": "",
    "url": "http://localhost:8998",
    "auth": "None"
  },

  "kernel_scala_credentials" : {
    "username": "",
    "password": "",
    "url": "http://localhost:8998",
    "auth": "None"
  },
  "kernel_r_credentials": {
    "username": "",
    "password": "",
    "url": "http://localhost:8998"
  },

  "logging_config": {
    "version": 1,
    "formatters": {
      "magicsFormatter": { 
        "format": "%(asctime)s\t%(levelname)s\t%(message)s",
        "datefmt": ""
      }
    },
    "handlers": {
      "magicsHandler": { 
        "class": "hdijupyterutils.filehandler.MagicsFileHandler",
        "formatter": "magicsFormatter",
        "home_path": "~/.sparkmagic"
      }
    },
    "loggers": {
      "magicsLogger": { 
        "handlers": ["magicsHandler"],
        "level": "DEBUG",
        "propagate": 0
      }
    }
  },

  "wait_for_idle_timeout_seconds": 15,
  "livy_session_startup_timeout_seconds": 60,

  "fatal_error_suggestion": "The code failed because of a fatal error:\n\t{}.\n\nSome things to try:\na) Make sure Spark has enough available resources for Jupyter to create a Spark context.\nb) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.\nc) Restart the kernel.",

  "ignore_ssl_errors": false,

  "session_configs": {
    "driverMemory": "1000M",
    "executorCores": 2
  },

  "use_auto_viz": true,
  "coerce_dataframe": true,
  "max_results_sql": 2500,
  "pyspark_dataframe_encoding": "utf-8",
  
  "heartbeat_refresh_seconds": 30,
  "livy_server_heartbeat_timeout_seconds": 0,
  "heartbeat_retry_seconds": 10,

  "server_extension_default_kernel_name": "pysparkkernel",
  "custom_headers": {},
  
  "retry_policy": "configurable",
  "retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5],
  "configurable_retry_policy_max_retries": 8
}
```

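Usually we only need to change the `url` of each kernel to configure the connection. Two other important parameters are `wait_for_idle_timeout_seconds` and `livy_session_startup_timeout_seconds`, which control timeouts and may need some tuning.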
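The edit described above can also be scripted. The following is a minimal sketch, assuming the config file has the same layout as the default shown earlier; `point_config_at_livy` and `rewrite_config_file` are hypothetical helper names, and the host name in the example is a placeholder.

```python
import copy
import json
from pathlib import Path

# Kernel credential sections that appear in the default config.
KERNEL_SECTIONS = (
    "kernel_python_credentials",
    "kernel_scala_credentials",
    "kernel_r_credentials",
)


def point_config_at_livy(config, livy_url, idle_timeout=None, startup_timeout=None):
    """Return a copy of `config` with every kernel URL set to `livy_url`."""
    updated = copy.deepcopy(config)
    for section in KERNEL_SECTIONS:
        # setdefault keeps the helper usable on a config that lacks a section.
        updated.setdefault(section, {})["url"] = livy_url
    if idle_timeout is not None:
        updated["wait_for_idle_timeout_seconds"] = idle_timeout
    if startup_timeout is not None:
        updated["livy_session_startup_timeout_seconds"] = startup_timeout
    return updated


def rewrite_config_file(path, livy_url, **timeouts):
    """Load the JSON config at `path`, update it, and write it back."""
    path = Path(path).expanduser()
    config = json.loads(path.read_text(encoding="utf-8"))
    updated = point_config_at_livy(config, livy_url, **timeouts)
    path.write_text(json.dumps(updated, indent=2), encoding="utf-8")
    return updated


if __name__ == "__main__":
    # Placeholder host name; replace it with the machine that runs Livy.
    example = {"kernel_python_credentials": {"url": "http://localhost:8998"}}
    print(point_config_at_livy(example, "http://spark-master.example:8998",
                               startup_timeout=120))
```

Calling `rewrite_config_file("~/.sparkmagic/config.json", "http://spark-master.example:8998")` would apply the change in place; the pure function is kept separate so the update can be checked without touching the real file.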

Each time you start a sparkmagic notebook, run `sc` to initialize it. This creates a Livy session and, with it, a Spark context. The variable `sc` is that Spark context.

In [1]:
sc

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 6 | | pyspark | idle | | | ✔ |


SparkSession available as 'spark'.
<SparkContext master=local appName=livy-session-6>
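Session creation goes through Livy's REST API. The sketch below builds, without sending it, the kind of `POST /sessions` request that starts a PySpark session; the body fields mirror the `session_configs` entry of the default config. `build_create_session_request` is a hypothetical helper for illustration, not sparkmagic's own code.

```python
import json
import urllib.request


def build_create_session_request(livy_url, kind="pyspark", session_configs=None):
    """Build (but do not send) a Livy POST /sessions request."""
    body = {"kind": kind}
    # Extra creation parameters such as driverMemory or executorCores.
    body.update(session_configs or {})
    return urllib.request.Request(
        url=livy_url.rstrip("/") + "/sessions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_session_request(
    "http://localhost:8998",
    session_configs={"driverMemory": "1000M", "executorCores": 2},
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# Sending the request needs a reachable Livy server:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```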

sparkmagic provides a number of magics that make it easier to manage Spark jobs. Use `%%help` to see which magics are available.

In [2]:
%%help

| Magic | Example | Explanation |
|---|---|---|
| info | `%%info` | Outputs session information for the current Livy endpoint. |
| cleanup | `%%cleanup -f` | Deletes all sessions for the current Livy endpoint, including this notebook's session. The force flag is mandatory. |
| delete | `%%delete -f -s 0` | Deletes a session by number for the current Livy endpoint. Cannot delete this kernel's session. |
| logs | `%%logs` | Outputs the current session's Livy logs. |
| configure | `%%configure -f {"executorMemory": "1000M", "executorCores": 4}` | Configure the session creation parameters. The force flag is mandatory if a session has already been created and the session will be dropped and recreated. Look at Livy's POST /sessions Request Body for a list of valid parameters. Parameters must be passed in as a JSON string. |
| spark | `%%spark -o df df = spark.read.parquet('...` | Executes spark commands. Parameters: -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a Pandas dataframe with the same name. -m METHOD: Sample method, either take or sample. -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter. If this number is negative, then the number of rows will be unlimited. -r FRACTION: Fraction used for sampling. |
| sql | `%%sql -o tables -q SHOW TABLES` | Executes a SQL query against the variable sqlContext (Spark v1.x) or spark (Spark v2.x). Parameters: -o VAR_NAME: The result of the SQL query will be available in the %%local Python context as a Pandas dataframe. -q: The magic will return None instead of the dataframe (no visualization). -m, -n, -r are the same as the %%spark parameters above. |
| local | `%%local a = 1` | All the code in subsequent lines will be executed locally. Code must be valid Python code. |
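The `configure` magic expects its parameters as a JSON string. As a small sketch, the text of such a cell can be generated from a Python dict with `json.dumps`, which avoids quoting mistakes. `render_configure_cell` is a hypothetical helper, and it assumes the JSON is placed in the cell body on the line after the magic.

```python
import json


def render_configure_cell(params, force=True):
    """Return the text of a %%configure cell for the given parameters."""
    flag = " -f" if force else ""
    # json.dumps guarantees double-quoted keys and strings, as JSON requires.
    return "%%configure{}\n{}".format(flag, json.dumps(params))


print(render_configure_cell({"executorMemory": "1000M", "executorCores": 4}))
```

The printed text can be pasted into a notebook cell; the parameter names follow the example in the help output.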


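The data we analyze is usually stored in one of the following places:

+ `hdfs`
+ Object storage. Amazon offers the S3 object store, which can be accessed and manipulated locally with [aws-cli](https://github.com/aws/aws-cli) or [Boto 3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html)

For both of these, you can install [smart_open](https://github.com/RaRe-Technologies/smart_open) to work with the data locally, and use the `read` commands on the Spark side.

+ `hive`: if Spark has already been configured with a Hive connection, it can be accessed through the global variable `sqlContext`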

In [3]:
import pandas as pd
import numpy as np

In [4]:
pdf = pd.DataFrame(np.random.rand(100, 3))

In [5]:
pdf[:10]

          0         1         2
0  0.043678  0.931353  0.335554
1  0.741314  0.419054  0.830939
2  0.631265  0.369277  0.705801
3  0.919189  0.532052  0.250095
4  0.861283  0.711060  0.763949
5  0.605339  0.282307  0.552550
6  0.321202  0.111927  0.464540
7  0.413631  0.190857  0.855158
8  0.311511  0.864287  0.367467
9  0.852249  0.743370  0.415710

In [6]:
df = spark.createDataFrame(pdf)

In [7]:
df.count()

100

In [8]:
df[df[0]>0.5].count()

50

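## Reading data

Data analysis and model training both depend on having data. Our data usually comes from the following sources:

+ Databases, usually relational databases such as MySQL and PostgreSQL
+ Shared in-memory stores, typified by Redis, generally used to cache hot data
+ Kafka, usually used for stream processing
+ Hive, the most common data warehouse
+ HDFS, which stores data as files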

### Reading MySQL data

In [9]:
mysql_df = spark.read.format("jdbc").options(
    url="jdbc:mysql://172.16.1.77:3306/samh",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="(SELECT * FROM cartoon) tmp", user="recommend",
    password="R1QaL1^G0Fpv8^ZT"
).load()

In [14]:
mysql_df.count()

101639

In [19]:
mysql_pdf = mysql_df.sample(False, fraction=0.01).toPandas()

In [21]:
mysql_pdf[:10]

   cartoon_id cartoon_name  ... uploader_Uid uploader_Uname
0         113        70亿之针  ...         1971          张洁80后
1         130        稻荷恋之歌  ...         1971          张洁80后
2         259         花的名字  ...         1977           扬州酒版
3         262        美女是野兽  ...         1977           扬州酒版
4         265      汤尼岳崎的钢弹  ...         1977           扬州酒版
5         425         REAL  ...         1985         紫藤花恋98
6         529         欺诈游戏  ...         2618      爱听歌的牛牛爱大齐
7         607          秦始皇  ...         2284           万度HR
8         715         野蛮人乔  ...         1998          小炎865
9         767       魔兽世界漫画  ...         2000        跟着时尚感觉走

[10 rows x 76 columns]

In [17]:
help(mysql_df.sample)

Help on method sample in module pyspark.sql.dataframe:

sample(withReplacement=None, fraction=None, seed=None) method of pyspark.sql.dataframe.DataFrame instance
    Returns a sampled subset of this :class:`DataFrame`.
    
    :param withReplacement: Sample with replacement or not (default False).
    :param fraction: Fraction of rows to generate, range [0.0, 1.0].
    :param seed: Seed for sampling (default a random seed).
    
    .. note:: This is not guaranteed to provide exactly the fraction specified of the total
        count of the given :class:`DataFrame`.
    
    .. note:: `fraction` is required and, `withReplacement` and `seed` are optional.
    
    >>> df = spark.range(10)
    >>> df.sample(0.5, 3).count()
    4
    >>> df.sample(fraction=0.5, seed=3).count()
    4
    >>> df.sample(withReplacement=True, fraction=0.5, seed=3).count()
    1
    >>> df.sample(1.0).count()
    10
    >>> df.sample(fraction=1.0).count()
    10
    >>> df.sample(False, fraction=1.0).count()
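
The note in the help text about the result size can be illustrated without Spark. The sketch below assumes that sampling without replacement keeps each row independently with probability `fraction` (Bernoulli sampling), so the number of rows returned varies around `fraction * count` from run to run. `bernoulli_sample` is a hypothetical helper, not part of PySpark.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0.0, 1.0]")
    rng = random.Random(seed)
    # random() returns a value in [0.0, 1.0), so fraction=1.0 keeps every row.
    return [row for row in rows if rng.random() < fraction]


rows = list(range(1000))
# Different seeds generally give different sample sizes for the same fraction.
sizes = [len(bernoulli_sample(rows, 0.01, seed=s)) for s in range(5)]
print(sizes)
```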
 

In [None]:
mysql_df.write.format("jdbc").options(url="jdbc:mysql://172.16.1.77:3306/test",
                                      driver="com.mysql.cj.jdbc.Driver",
                                      dbtable="spark_cartoon", user="recommend",
                                      password="R1QaL1^G0Fpv8^ZT").save()
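
Both JDBC cells repeat the same connection options and embed the password in the notebook. One possible alternative, sketched below, builds the options in a single place and reads the password from an environment variable. `mysql_jdbc_options` and the variable name `MYSQL_PASSWORD` are assumptions made for illustration; in a sparkmagic Spark cell the variable would have to be set in the environment where the code actually runs.

```python
import os


def mysql_jdbc_options(host, database, dbtable, user, port=3306,
                       password_env="MYSQL_PASSWORD", environ=None):
    """Build the keyword options for a Spark JDBC read or write."""
    environ = os.environ if environ is None else environ
    if password_env not in environ:
        raise KeyError("environment variable {} is not set".format(password_env))
    return {
        "url": "jdbc:mysql://{}:{}/{}".format(host, port, database),
        "driver": "com.mysql.cj.jdbc.Driver",
        "dbtable": dbtable,
        "user": user,
        "password": environ[password_env],
    }


# Demonstration with an explicit mapping instead of the real environment.
options = mysql_jdbc_options(
    "172.16.1.77", "samh", "(SELECT * FROM cartoon) tmp", "recommend",
    environ={"MYSQL_PASSWORD": "example-only"},
)
print(options["url"])

# With a live session the dict can be unpacked into the reader or writer:
# mysql_df = spark.read.format("jdbc").options(**options).load()
```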

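### Reading a local file and writing it to HDFS

Unfortunately, this cannot be done directly: Spark code runs on the remote cluster and cannot see files on the local machine. All we can do is read the data locally, convert it to a dict, and copy it over to Spark. A more reliable approach is to upload the file to HDFS or object storage.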

In [9]:
%%local
import json
with open("data/user.json") as f:
    localjson = json.load(f)
print(localjson)

[{'name': 'hsz', 'age': 18}, {'name': 'hzj', 'age': 20}, {'name': 'zyf', 'age': 28}, {'name': 'ykl', 'age': 38}]


In [10]:
spark.read.parquet("userdata.parquet")

'Path does not exist: hdfs://iZbp13z41dx386bju3wb1zZ:8020/user/huangsizhe/userdata.parquet;'
Traceback (most recent call last):
  File "/data/spark/spark-2.4.0-bin-2.6.0-cdh5.7.0/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 316, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
  File "/data/spark/spark-2.4.0-bin-2.6.0-cdh5.7.0/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/data/spark/spark-2.4.0-bin-2.6.0-cdh5.7.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Path does not exist: hdfs://iZbp13z41dx386bju3wb1zZ:8020/user/huangsizhe/userdata.parquet;'



In [11]:
pdf = pd.DataFrame([{'name': 'hsz', 'age': 18}, {'name': 'hzj', 'age': 20}, {'name': 'zyf', 'age': 28}, {'name': 'ykl', 'age': 38}])

In [12]:
pdf

   age name
0   18  hsz
1   20  hzj
2   28  zyf
3   38  ykl

In [13]:
df = spark.createDataFrame(pdf)
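
The literal passed to `pd.DataFrame` in cell 11 was copied by hand from the `%%local` output. As a rough sketch of the same copy step, the line can be generated in the local context and then pasted into a Spark cell. `records_to_cell_source` is a hypothetical helper, and it assumes the records contain only plain JSON types, so that their `repr` is valid Python source.

```python
import json


def records_to_cell_source(records, var_name="pdf"):
    """Return a Python source line that rebuilds `records` as a pandas DataFrame."""
    if not var_name.isidentifier():
        raise ValueError("var_name must be a valid Python identifier")
    # Raises TypeError if the records hold anything other than JSON types.
    json.dumps(records)
    return "{} = pd.DataFrame({!r})".format(var_name, records)


localjson = [{"name": "hsz", "age": 18}, {"name": "hzj", "age": 20}]
print(records_to_cell_source(localjson))
```

This only suits small datasets, since the whole payload travels as source code; larger files are better uploaded to HDFS or object storage.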

In [14]:
df.write.parquet("hdfs://iZbp13z41dx386bju3wb1zZ:8020/user/huangsizhe/userdata.parquet")

An error occurred while calling o107.parquet.
: org.apache.hadoop.security.AccessControlException: Permission denied: user=huangsizhe, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x
	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281)
	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262)
	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:242)
	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:169)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6590)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6572)
	at org.apache.hado