## Sparkをはじめてみよう
Sparkを使って、README.mdを読み込み、いろいろと操作してみます。

In [1]:
# scはSparkContextといって、sparkの基本となるオブジェクト。
textFile = sc.textFile("../README.md")

In [2]:
# 行数をカウント
textFile.count()

99

In [3]:
# "Spark"を含む行を抽出。
sparkLines = textFile.filter(lambda l: "Spark" in l)
sparkLines.count()

### filter-> pythonのfilterと同じ

19

In [4]:
# "Spark"を含む行を3行取得
sparkLines.take(3)

[u'# Apache Spark',
 u'Spark is a fast and general cluster computing system for Big Data. It provides',
 u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,']

In [5]:
# 出現単語ごとの件数を取得
textFile.flatMap(
    lambda l: l.strip().split()  # 行を単語に分解
).map(
    lambda w: (w, 1)  # 単語の数をカウントするため、（単語、1）というタプルに変換
).reduceByKey(
    lambda x, y: x + y  # 単語ごとに集計
).take(10)

### flatMap -> mapしつつ、内部のコレクションを展開する関数。[[1,2], [3,4], [4,5]]をflatMapすると、[1,2,3,4,5]になる。
### map -> pythonのmapと同じ
### reduceByKey -> (key, value)のコレクションに対して、キーごとに値を集計する。

[(u'when', 1),
 (u'R,', 1),
 (u'including', 3),
 (u'computation', 1),
 (u'using:', 1),
 (u'guidance', 2),
 (u'Scala,', 1),
 (u'environment', 1),
 (u'only', 1),
 (u'rich', 1)]

## RDDの処理イメージ
RDDのTransformationとActionがどのように実行されるのか見てみます。

In [6]:
# テキストファイルからRDD作成。この時点ではRDDオブジェクトが作成されるだけで、実際の読み込みは発生しない。
linesRDD = sc.textFile("../README.md")
print type(linesRDD)

<class 'pyspark.rdd.RDD'>


In [7]:
# "Spark"というワードを持つ行のみ抽出。ここでも新たなRDDオブジェクトが作成されるだけ。
filteredRDD =  linesRDD.filter(lambda l: "Spark" in l)
print type(filteredRDD)

<class 'pyspark.rdd.PipelinedRDD'>


In [9]:
# 行数取得。ここでやっとテキストファイルから読み込んでフィルタするという操作が実行される。
cnt = filteredRDD.count()
print cnt

19


## DataFrameを動かしてみよう

In [10]:
# DataFrameを使うためのエントリーポイントとなるオブジェクト作成
from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName("pycon2016").getOrCreate()

In [11]:
# 下記のようなjsonがある
%cat ../examples/src/main/resources/people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}


In [12]:
# これを読み込む
df = spark.read.json("../examples/src/main/resources/people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [13]:
# 特定のカラムを選択
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [14]:
# 絞り込み選択
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [15]:
# 集約関数
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [16]:
# メモリ上にテンポラリテーブルとして保存することも可能。
df.createOrReplaceTempView("people")

In [17]:
# テーブルには直接SQLを発行することができます。戻り値はDataFrame。
res = spark.sql("select age from people where name = 'Andy'")
res.show()

+---+
|age|
+---+
| 30|
+---+



In [18]:
# もちろん、データを書き込むことも可能。jsonで読んだデータをcsvで書き込む。
newDf = df.select(df["name"], df["age"] + 10)
newDf.write.mode("overwrite").csv("/tmp/new_people.csv")

In [19]:
spark.read.csv("/tmp/new_people.csv").show()

+-------+---+
|    _c0|_c1|
+-------+---+
|Michael|   |
|   Andy| 40|
| Justin| 29|
+-------+---+



## DataFrameを使うメリット
名前、部署、年齢からなる社員名簿から部署ごとの平均年齢をとる例を実装してみる。


In [20]:
from pyspark.sql import Row
import pyspark.sql.functions as func

# 元データ
# (名前、部署名、年齢)
raw = [("Pete", "dev", 20), ("Keith", "dev", 22),  ("Roger", "sales", 30), ("John", "sales", 28)]

# RDD版
rdd = sc.parallelize(raw)

# DataFrame版
df = spark.createDataFrame(
    rdd.map(
        lambda r: Row(name=r[0], department=r[1], age=r[2])))

In [21]:
# RDDで処理するケース
rdd.map(
    lambda r: (r[1], (float(r[2]), 1))  # (部署名、(年齢、カウント))のタプルに変換
).reduceByKey(
    lambda x, y: (x[0] + y[0], x[1] + y[1])  # 部署ごとに年齢と人数の合計を算出
).mapValues(
    lambda v: v[0] / v[1]  # 年齢の合計と人数の合計割り算
).collect()

[('sales', 29.0), ('dev', 21.0)]

In [22]:
# DataFrameで処理するケース
df.groupBy("department").agg(func.avg("age")).show()

+----------+--------+
|department|avg(age)|
+----------+--------+
|       dev|    21.0|
|     sales|    29.0|
+----------+--------+



## メールのスパム判定

In [24]:
# まずは元データの中身を見る。
sc.textFile("../data/smsspam/SMSSpamCollection").take(3)

# ラベルと文章からなるtsvファイル

[u'ham\tGo until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...',
 u'ham\tOk lar... Joking wif u oni...',
 u"spam\tFree entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's"]

In [25]:
from pyspark.sql.types import *

# オリジナルデータをロード
# スキーマを個別で指定したい場合は、以下のようにStructTypeを作成する。
schema = StructType([StructField("label", StringType(), True), StructField("body", StringType(), True)])
df = spark.read.csv("../data/smsspam/SMSSpamCollection", schema=schema, sep="\t")

In [26]:
from pyspark.sql.functions import when

# ham/spamといったラベルを0/1に変換
labeled = df.select(
    when(df["label"] == "ham", 0).otherwise(1)
    .cast(IntegerType())
    .alias("label"),
    df["body"])

# トレーニングデータとテストデータに分割
training, test = labeled.randomSplit([0.7, 0.3])

In [27]:
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# 以下の流れのパイプラインを構築する。
# 文章を単語に分割 => 単語の選別 => 単語の出現頻度カウント => ロジスティック回帰モデル作成

# まずは、パイプラインのそれぞれのステージを構築するオブジェクトを作成する。
## 文章を単語に分割するやつ
tokenizer = Tokenizer(inputCol="body", outputCol="words")

## 単語を選別するやつ
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")

## 単語を数えるやつ
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")

## ロジスティック回帰モデルを作成するやつ
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [28]:
# 上記のオブジェクトを組み合わせてパイプラインを構築。
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, lr])

In [29]:
# 学習モデル構築
model = pipeline.fit(training)

In [30]:
# モデルの精度（AUC）を計測
from pyspark.ml.evaluation import BinaryClassificationEvaluator

prediction = model.transform(test)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
evaluator.evaluate(prediction)

0.9805776464864642