# About: Simple Spark script for Test

---

This notebook runs a basic check that Spark is working.

## *Operation Note*

*This is a cell for your own recording. Describe the background and circumstances here.*

# Binding the Notebook to the Environment

Specify the bind target by its group name in the Ansible inventory.

In [1]:
target_group = 'hadoop_all_testcluster'
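As a rough illustration of how the group name scopes the later commands, the sketch below assembles the argument list behind the `!ansible ... -l {target_group}` cells used in this notebook. The helper name `build_ansible_argv` is hypothetical, and only the `-m`, `-a`, and `-l` options that already appear in this notebook are assumed.

```python
def build_ansible_argv(host_pattern, command, limit, module=None):
    """Return the argv list for an ad-hoc ansible call (hypothetical helper)."""
    argv = ["ansible"]
    if module is not None:
        # -m selects the module, e.g. "copy" or "shell"
        argv += ["-m", module]
    # -a carries the module arguments, -l limits the run to one inventory group
    argv += ["-a", command, host_pattern, "-l", limit]
    return argv


target_group = "hadoop_all_testcluster"
argv = build_ansible_argv("hadoop_client", "spark-submit --help", target_group)
print(" ".join(argv))
```

Because the limit is passed on every call, changing `target_group` in one place should be enough to point the whole notebook at a different cluster.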

Check that the command works.

In [2]:
!ansible -a 'spark-submit --help' hadoop_client -l {target_group}

XXX.XXX.XXX.72 | SUCCESS | rc=0 >>
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the 

# Preparing the Dataset

In [3]:
import tempfile
work_dir = tempfile.mkdtemp()
work_dir

'/tmp/tmpDGame6'

In [4]:
%env WORK_DIR={work_dir}

env: WORK_DIR=/tmp/tmpDGame6


Any dataset will do for this test, so we load the Iris Data Set: [Iris Data Set - UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Iris)

In [5]:
%%bash
cd ${WORK_DIR}
curl -O https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  4551  100  4551    0     0   8250      0 --:--:-- --:--:-- --:--:--  8259


In [6]:
!head {work_dir}/iris.data

5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.4,3.9,1.7,0.4,Iris-setosa
4.6,3.4,1.4,0.3,Iris-setosa
5.0,3.4,1.5,0.2,Iris-setosa
4.4,2.9,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa
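Each record shown above is a comma-separated line with four numeric fields followed by a class label. A minimal sketch of parsing one record is given below; the function name `parse_iris_line` is hypothetical and not part of this notebook.

```python
def parse_iris_line(line):
    """Split one CSV record into (features, label); hypothetical helper."""
    fields = line.strip().split(",")
    # The first four fields are numeric measurements, the fifth is the class label
    features = [float(value) for value in fields[:4]]
    label = fields[4]
    return features, label


features, label = parse_iris_line("5.1,3.5,1.4,0.2,Iris-setosa")
print(features, label)
```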


Upload the data to HDFS.

First copy the file from this notebook environment's temporary directory to the `hadoop_client` machine, then copy it into HDFS with the `hadoop fs` command.

In [7]:
hdfs_dataset_dir = '/dataset/iris'
hadoop_client_dir = '~/iris'

Copy from this notebook environment to the `hadoop_client` machine:

In [8]:
!ansible -m copy -a 'src={work_dir}/iris.data dest={hadoop_client_dir}/' hadoop_client -l {target_group}

XXX.XXX.XXX.72 | SUCCESS => {
    "changed": true, 
    "checksum": "d11777fac6574637a4ec5f0effeab8542ae88b65", 
    "dest": "/home/ansible/iris/iris.data", 
    "gid": 500, 
    "group": "ansible", 
    "md5sum": "42615765a885ddf54427f12c34a0a070", 
    "mode": "0664", 
    "owner": "ansible", 
    "size": 4551, 
    "src": "/home/ansible/.ansible/tmp/ansible-tmp-1472182654.06-124636273342266/source", 
    "state": "file", 
    "uid": 500
}
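The `checksum` field in the output above is 40 hexadecimal characters long, which is consistent with a SHA-1 digest. Assuming that is what it is, the local file could be checked against it with a sketch like the following; `sha1_of_file` is a hypothetical helper, and the demo file is a stand-in rather than the real `iris.data`.

```python
import hashlib
import os
import tempfile


def sha1_of_file(path, chunk_size=65536):
    """Return the hex SHA-1 digest of a file, reading it in chunks."""
    digest = hashlib.sha1()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demo on a throwaway file (stand-in data, not the real dataset)
fd, demo_path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"abc")
demo_digest = sha1_of_file(demo_path)
os.remove(demo_path)
print(demo_digest)
```

In this notebook, the value to compare would be `sha1_of_file(work_dir + '/iris.data')` against the `checksum` reported by the copy.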


Run the `hadoop fs` commands.

In [9]:
!ansible -m shell -a "chdir={hadoop_client_dir} \
                      hadoop fs -mkdir -p {hdfs_dataset_dir} && \
                      hadoop fs -copyFromLocal iris.data {hdfs_dataset_dir}" hadoop_client -l {target_group}

XXX.XXX.XXX.72 | SUCCESS | rc=0 >>


As a precaution, confirm that the file has been uploaded to HDFS.

If `iris.data` appears under the `hdfs_dataset_dir` directory below, the upload succeeded.

In [10]:
!ansible -a 'hadoop fs -ls {hdfs_dataset_dir}' hadoop_client -l {target_group}

XXX.XXX.XXX.72 | SUCCESS | rc=0 >>
Found 1 items
-rw-r--r--   3 ansible supergroup       4551 2016-08-26 12:37 /dataset/iris/iris.data
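The listing can also be checked programmatically, for example by comparing the reported size with the 4551 bytes downloaded earlier. The sketch below parses one listing line, assuming the eight-column layout seen in the output above; `parse_hdfs_ls_line` is a hypothetical helper.

```python
def parse_hdfs_ls_line(line):
    """Parse one file entry of `hadoop fs -ls` output (layout assumed from the output above)."""
    # Columns: permissions, replication, owner, group, size, date, time, path
    perms, replication, owner, group, size, date, time, path = line.split(None, 7)
    return {
        "permissions": perms,
        "replication": int(replication),
        "owner": owner,
        "group": group,
        "size": int(size),
        "modified": date + " " + time,
        "path": path,
    }


entry = parse_hdfs_ls_line(
    "-rw-r--r--   3 ansible supergroup       4551 2016-08-26 12:37 /dataset/iris/iris.data"
)
print(entry["path"], entry["size"])
```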


The dataset is now in HDFS and ready to be read by the Spark application.

# Running the Application

Run a sample application. Reference: http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications

In [11]:
import os

In [12]:
%%writefile {work_dir}/SimpleApp.py
# -*- coding:utf-8 -*-
"""SimpleApp.py"""
from pyspark import SparkContext, SparkConf

logFile = "/dataset/iris/iris.data"  # Should be some file on your system
conf = SparkConf().setAppName('test-pyspark').setMaster('yarn-cluster')
sc = SparkContext(conf=conf)
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Writing /tmp/tmpDGame6/SimpleApp.py
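The script above filters the lines of the file and counts those containing `a` and `b`. The same logic can be tried without a cluster on a few of the sample lines shown earlier. This is only a plain-Python stand-in for the `filter(...).count()` calls, not a Spark job, and `count_lines_containing` is a hypothetical helper.

```python
def count_lines_containing(lines, needle):
    """Count lines that contain the given substring (local stand-in for filter().count())."""
    return sum(1 for line in lines if needle in line)


sample_lines = [
    "5.1,3.5,1.4,0.2,Iris-setosa",
    "4.9,3.0,1.4,0.2,Iris-setosa",
    "4.7,3.2,1.3,0.2,Iris-setosa",
]
num_as = count_lines_containing(sample_lines, "a")
num_bs = count_lines_containing(sample_lines, "b")
print("Lines with a: %i, lines with b: %i" % (num_as, num_bs))
# prints "Lines with a: 3, lines with b: 0"
```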


Copy the script to the `hadoop_client` machine and submit it with `spark-submit`.

In [13]:
!ansible -m copy -a 'src={work_dir}/SimpleApp.py dest={hadoop_client_dir}/' hadoop_client -l {target_group}
!ansible -m shell -a 'spark-submit --master yarn-cluster --num-executors 4 {hadoop_client_dir}/SimpleApp.py' hadoop_client -l {target_group}

XXX.XXX.XXX.72 | SUCCESS => {
    "changed": true, 
    "checksum": "68346c67549d133e8cd123f9e6dc94fd6c5a3e03", 
    "dest": "/home/ansible/iris/SimpleApp.py", 
    "gid": 500, 
    "group": "ansible", 
    "md5sum": "321e78f262bf625f5cdb840d3672f0cf", 
    "mode": "0664", 
    "owner": "ansible", 
    "size": 465, 
    "src": "/home/ansible/.ansible/tmp/ansible-tmp-1472182691.63-72101184265081/source", 
    "state": "file", 
    "uid": 500
}
XXX.XXX.XXX.72 | SUCCESS | rc=0 >>
16/08/26 12:38:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/26 12:38:17 INFO TimelineClientImpl: Timeline service address: http://testvm003:8188/ws/v1/timeline/
16/08/26 12:38:17 INFO Client: Requesting a new application from cluster with 4 NodeManagers
16/08/26 12:38:17 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (4096 MB per container)
16/

# Checking Execution Progress

## Checking the Logs

The logs can be viewed with the `yarn logs` command. This requires the Application ID, which can be found in the `spark-submit` log output:

> `16/08/26 12:38:23 INFO YarnClientImpl: Submitted application application_1472181274763_0002`
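Rather than copying the Application ID by hand, it could be pulled out of the captured log text with a regular expression. The sketch below assumes the ID always has the `application_<digits>_<digits>` shape seen in the line above; `find_application_id` is a hypothetical helper.

```python
import re

# Pattern assumed from the ID format seen in the log line above
APP_ID_PATTERN = re.compile(r"application_\d+_\d+")


def find_application_id(log_text):
    """Return the first YARN application ID found in the text, or None."""
    match = APP_ID_PATTERN.search(log_text)
    return match.group(0) if match else None


log_line = "16/08/26 12:38:23 INFO YarnClientImpl: Submitted application application_1472181274763_0002"
found_id = find_application_id(log_line)
print(found_id)
```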

In [15]:
yarn_application_id = 'application_1472181274763_0002'

In [16]:
!ansible -a 'yarn logs -applicationId {yarn_application_id}' hadoop_client -l {target_group}

XXX.XXX.XXX.72 | SUCCESS | rc=0 >>


Container: container_1472181274763_0002_01_000004 on testvm004_45454
LogType:directory.info
Log Upload Time:Fri Aug 26 12:39:03 +0900 2016
LogLength:3825
Log Contents:
ls -l:
total 40
lrwxrwxrwx 1 yarn yarn  128 Aug 26 12:38 __spark__.jar -> /hadoop/tmp/hadoop-yarn/nm-local-dir/usercache/ansible/filecache/13/spark-assembly-XXX.XXX.XXX.2.4.2.0-258-hadoopXXX.XXX.XXX.2.4.2.0-258.jar
lrwxrwxrwx 1 yarn yarn  105 Aug 26 12:38 __spark_conf__ -> /hadoop/tmp/hadoop-yarn/nm-local-dir/usercache/ansible/filecache/12/__spark_conf__7022645124653218094.zip
-rw-r--r-- 1 yarn yarn  109 Aug 26 12:38 container_tokens
-rwx------ 1 yarn yarn  754 Aug 26 12:38 default_container_executor.sh
-rwx------ 1 yarn yarn  700 Aug 26 12:38 default_container_executor_session.sh
-rwx------ 1 yarn yarn 6500 Aug 26 12:38 launch_container.sh
lrwxrwxrwx 1 yarn yarn   84 Aug 26 12:38 py4j-0.9-src.zip -> /hadoop/tmp/hadoop-yarn/nm-local-dir/usercache/ansible/filecac

# Cleanup

Delete the temporary directory.

In [17]:
!rm -fr {work_dir}
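As an alternative to the shell command, the same cleanup can be done from Python with `shutil.rmtree`. The sketch below uses its own scratch directory (`scratch_dir`, a name chosen here for illustration) and a `try`/`finally` block so the directory is removed even if an intermediate step fails.

```python
import os
import shutil
import tempfile

scratch_dir = tempfile.mkdtemp()
try:
    # Stand-in for the real work done in the temporary directory
    with open(os.path.join(scratch_dir, "example.txt"), "w") as f:
        f.write("scratch data\n")
finally:
    # Remove the directory and everything in it, like `rm -fr`
    shutil.rmtree(scratch_dir, ignore_errors=True)

removed = not os.path.exists(scratch_dir)
print(removed)
```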