# Spark ML

Last updated 17:15 20161125


## Goals

* Perform statistical analysis with Spark.
* Build recommendation, classification, and prediction analyses with Spark.

## Before and after

* Prerequisites
    * Python
    * Data handling: pandas, SQL
    * Machine learning: scikit-learn
* After completing this material
    * You can process big data with Spark.

## Exercises

* Exercise S-11: Spark MLlib Decision Tree
* Exercise S-12: Spark MLlib Logistic regression
    * https://www.codementor.io/spark/tutorial/spark-mllib-logistic-regression
* Exercise S-13: Spark MLlib movie recommendation case study
* Exercise S-14: Spark Streaming
* Exercise S-15: GraphX
* Exercise: visualization with Bokeh
    * http://www.blog.pythonlibrary.org/2016/07/27/python-visualization-with-bokeh/

* spark-submit (see the self-contained application example in the Spark Quick Start)

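## What is machine learning?

* Why machine learning? Man vs. machine
    * The data is too large for people to process
    * There are patterns people fail to notice
    * People easily fall into bias

* Supervised vs. unsupervised learning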

## M.1 Data collection

* Data

Name | Data source
-----|-----
t1df | MongoDB `ds_twitter.seoul`
t2df | Twitter JSON file

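### M.1.1 MongoDB Spark Connector

* Reference: https://docs.mongodb.com/spark-connector/
* Prerequisites
    * A running MongoDB instance (version 2.6 or later).
    * Spark 1.6.x.
    * Scala 2.10.x if using the mongo-spark-connector_2.10 package.
* Add the connector to `spark.jars.packages` in the configuration. `scala -version` reports 2.11, which suggests mongo-spark-connector_2.11. The 2.10 package still works because the prebuilt Spark 1.6 binaries are built with Scala 2.10. The connector must match Spark's Scala version, not the version of the separately installed `scala`.
    ```
    $ vim conf/spark-defaults.conf
    spark.jars.packages=org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
    ```

    * Check the Scala version
    ```
    jsl@jsl-smu:~$ scala -version
    Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
    ```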

* MongoDB Python API basics
    * Writing to MongoDB
        * Save a DataFrame `df` to MongoDB (the database and collection come from `spark.mongodb.output.uri`)
            ```
            df.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
            ```

    * Reading from MongoDB (a collection into a DataFrame)
        * The database and collection are set beforehand in `spark.mongodb.input.uri`.
        * The format is set to "com.mongodb.spark.sql.DefaultSource".
            ```
            sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
            ```

        * Setting the URIs

In [1]:
import os
import findspark

home=os.getenv("HOME")
spark_home=os.path.join(home,"Downloads/spark-1.6.0-bin-hadoop2.6")
findspark.init(spark_home)

In [2]:
import pyspark
conf = pyspark.SparkConf().setAppName("myAppName")
#conf.set("spark.mongodb.input.uri","mongodb://127.0.0.1/ds_rest_subwayPassengersDb.db_rest_subwayTable?readPreference=primaryPreferred")
#conf.set("spark.mongodb.output.uri","mongodb://127.0.0.1/ds_rest_subwayPassengersDb.db_rest_subwayTable")
# read from database ds_twitter, collection seoul
conf.set("spark.mongodb.input.uri","mongodb://127.0.0.1/ds_twitter.seoul?readPreference=primaryPreferred")
# DataFrame writes through the connector go to the same collection
conf.set("spark.mongodb.output.uri","mongodb://127.0.0.1/ds_twitter.seoul")
sc = pyspark.SparkContext(conf=conf)
#sc = pyspark.SparkContext()
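
Each of the two URIs above packs the host, database, collection, and read options into one string of the form `mongodb://host/database.collection?options`. The sketch below splits such a URI with `urllib.parse` to make that layout explicit. It is plain Python 3 and does not need Spark. `parse_mongo_uri` is a hypothetical helper, not part of the connector. It ignores credentials, multi-host lists, and other URI features.

```python
from urllib.parse import urlparse, parse_qs


def parse_mongo_uri(uri):
    """Split 'mongodb://host/db.collection?opts' into its parts (simplified sketch)."""
    parts = urlparse(uri)
    # the path looks like '/ds_twitter.seoul': database, then the collection after the first dot
    database, _, collection = parts.path.lstrip("/").partition(".")
    # parse_qs returns a list of values per option; keep the first one
    options = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return parts.netloc, database, collection, options


print(parse_mongo_uri("mongodb://127.0.0.1/ds_twitter.seoul?readPreference=primaryPreferred"))
# ('127.0.0.1', 'ds_twitter', 'seoul', {'readPreference': 'primaryPreferred'})
```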

In [3]:
print sc._conf.getAll()
print sc._conf.get("spark.jars.packages")
sc.setLogLevel("ERROR")
sqlContext = pyspark.sql.SQLContext(sc)

[(u'spark.app.name', u'myAppName'), (u'spark.submit.pyFiles', u'/home/jsl/.ivy2/jars/graphframes_graphframes-0.1.0-spark1.6.jar,/home/jsl/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.10-1.1.0.jar,/home/jsl/.ivy2/jars/com.databricks_spark-csv_2.10-1.3.0.jar,/home/jsl/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar,/home/jsl/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar,/home/jsl/.ivy2/jars/com.univocity_univocity-parsers-1.5.1.jar'), (u'spark.rdd.compress', u'True'), (u'spark.serializer.objectStreamReset', u'100'), (u'spark.master', u'local[*]'), (u'spark.mongodb.output.uri', u'mongodb://127.0.0.1/ds_twitter.seoul'), (u'spark.submit.deployMode', u'client'), (u'spark.mongodb.input.uri', u'mongodb://127.0.0.1/ds_twitter.seoul?readPreference=primaryPreferred'), (u'spark.jars', u'file:/home/jsl/.ivy2/jars/graphframes_graphframes-0.1.0-spark1.6.jar,file:/home/jsl/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.10-1.1.0.jar,file:/home/jsl/.ivy2/jars/com.databricks_spark-cs

* Start the MongoDB daemon (`--dbpath` is the directory holding the database files)
    ```
    mongod --dbpath ./data/
    ```

In [4]:
print "---------read-----------"
t1df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
t1df.printSchema()

---------read-----------
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- contributors: null (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: integer (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    

In [5]:
t1df.select('id').show()

+------------------+
|                id|
+------------------+
|801659584221888512|
|801659575388819456|
|801659572146434048|
|801659563451641860|
|801659515535912960|
|801659490496086016|
|801659477724315648|
|801659444731920384|
|801659433054965760|
|801659393553022976|
|801659383738269696|
|801659353195364353|
|801659346706776064|
|801659328868466688|
|801659316071645184|
|801659298040320000|
|801659295280627712|
|801659290083803136|
|801659262103494656|
|801659237176741888|
+------------------+
only showing top 20 rows



* Select the `id` and `text` columns of t1df into t1tdf

In [6]:
#df.registerTempTable("myTwitter")
#myTab = sqlContext.sql("SELECT CardSubwayStatisticsService.row.RIDE_PASGR_NUM FROM myTwitter")
t1df.registerTempTable("seoul")
t1tdf = sqlContext.sql("SELECT id,text FROM seoul")

print type(t1tdf)
t1tdf.show()

print t1tdf.first()
print t1tdf.head()

<class 'pyspark.sql.dataframe.DataFrame'>
+------------------+--------------------+
|                id|                text|
+------------------+--------------------+
|801659584221888512|RT @always_gd: #B...|
|801659575388819456|RT @InfiniteUpdat...|
|801659572146434048|RT @InfiniteUpdat...|
|801659563451641860|RT @PartOfJimin: ...|
|801659515535912960|RT @MHDEFB: มาแล้...|
|801659490496086016|Watching Weightli...|
|801659477724315648|RT @heochan_th: 1...|
|801659444731920384|RT @hancinema: [S...|
|801659433054965760|RT @5lyc47: Meanw...|
|801659393553022976|RT @_candyclover:...|
|801659383738269696|RT @FanLantana: 【...|
|801659353195364353|RT @MHDEFB: มาแล้...|
|801659346706776064|@shrnlp Seoul! Lo...|
|801659328868466688|RT @Legendofblues...|
|801659316071645184|RT @Legendofblues...|
|801659298040320000|RT @JiHanThailand...|
|801659295280627712|RT @simslover163:...|
|801659290083803136|RT @ygent_officia...|
|801659262103494656|RT @GFRIEND_TH: [...|
|801659237176741888|RT @GFRIEND_TH

* count rows
* slicing by rows or columns
* stopwords
* sample or select some rows

### M.1.2 pymongo

* Read from MongoDB (here with pymongo directly)
* Turn the result into a DataFrame?
* Tokenizer

In [10]:
from pymongo import MongoClient
_mclient = MongoClient()
_db=_mclient.ds_twitter
_table=_db.seoul
cursor=_table.find()

In [11]:
for i,e in enumerate(cursor):
    if(i%100==0):
        print i,e['text']
        #parsedData = e['text'].map(lambda line: array([float(x) for x in line.split(' ')]))

0 RT @peach_n_pie: ..Instagram blank_seoul..
BLANK with I.O.I gugudan เซจอง💗
BLANK 16F/W🍂 #MORE_BLANK_LESS_STRESS 
-1 https://t.co/Xv4qNDBxZy
100 RT @delightcy_: Yupp right now, OSH ssi that lives in Seoul 📞,hello
hello
pls intro urself simply
...
Ja! Spinspin, spinning board!! https:…
200 I won a game by 119 point at Seoul stage. Let’s play together![https://t.co/RWLzroqwS4] #Bowling_King
300 RT @always_gd: #BIGBANG10 THE #CONCERT
: 0.TO.10 FINAL IN SEOUL 💻📱🔫

티켓팅 성공해서 고척 스카이돔에서 만나요! 울 오빠 봐야죠🙏🏻 #티켓팅금손되기길 
#새해첫주부터보고싶데이 

✔️161119…
400 -2 °c in Seoul rn, seokjin is back wearing rIPPED JEANS HOW??? TEACH ME YOUR WAYS SEOKJIN https://t.co/yp0gvLfjoF
500 RT @KBSWorldTV: #BAP FANS! Watch Daehyun being one of the MCs for our new show BATTLE LIKES TOMORROW @7PM(Seoul, UTC+9) LIVE EXCLUSIVE ON K…
600 RT ygent_official: [BIGBANG10 THE CONCERT : 0.TO.10 FINAL IN SEOUL TEASER SPOT]
More info @ … https://t.co/hk9MqwRvty
700 THANKYOU FOR WELCOMING ME NICELY, SEOUL. #jin #161125 #incheon https://t.c

In [8]:
cursor

<pymongo.cursor.Cursor at 0x103cf4210>

In [5]:
from pyspark.ml.feature import Tokenizer
# Tokenizer lowercases the text and splits it on whitespace
tokenizer = Tokenizer(inputCol="text", outputCol="words")

### M.1.3 json

In [12]:
t2df= sqlContext.read.json("src/ds_twitter_seoul_3.json")

In [13]:
type(t2df)

pyspark.sql.dataframe.DataFrame

In [14]:
t2df.select('id','text').take(1)

[Row(id=801955891956236288, text=u'RT @peach_n_pie: ..Instagram blank_seoul..\nBLANK with I.O.I gugudan \u0e40\u0e0b\u0e08\u0e2d\u0e07\U0001f497\nBLANK 16F/W\U0001f342 #MORE_BLANK_LESS_STRESS \n-1 https://t.co/Xv4qNDBxZy')]

## M.2 Data preprocessing

In [17]:
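from pyspark.ml.feature import *
# split on non-word characters; don't name the variable `re`, which would shadow Python's re module
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
wordsDf = regexTokenizer.transform(t1tdf)
wordsDf.show()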

+------------------+--------------------+--------------------+
|                id|                text|               words|
+------------------+--------------------+--------------------+
|801955891956236288|RT @peach_n_pie: ...|[rt, peach_n_pie,...|
|801955872410697728|RT @usefulstooges...|[rt, usefulstooge...|
|801955852798267393|I won a game by 1...|[i, won, a, game,...|
|801955840051781633|pcy for Seoul fas...|[pcy, for, seoul,...|
|801955833424642048|RT @seoulstorys: ...|[rt, seoulstorys,...|
|801955813715804160|Yes, yes #coffee,...|[yes, yes, coffee...|
|801955812302127104|ｺﾚすごい♪
飲んだだけでみるみる...|[https, t, co, cd...|
|801955794631598080|RT @ygent_officia...|[rt, ygent_offici...|
|801955792853168128|RT @ygent_officia...|[rt, ygent_offici...|
|801955787476070404|RT @mrnkaloto: Ti...|[rt, mrnkaloto, t...|
|801955750666899456|RT @choconini_: S...|[rt, choconini_, ...|
|801955740197986304|RT @YGBlackPink: ...|[rt, ygblackpink,...|
|801955739140964352|RT @KoreanWaveINA...|[rt, koreanwav
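
With `pattern="\\W"`, `RegexTokenizer` works as follows under its defaults:

* It treats the pattern as the delimiter (`gaps=True`).
* It lowercases the text.
* It drops tokens shorter than `minTokenLength=1`, which removes empty strings.

Spark evaluates the pattern as a Java regular expression. In Java, `\W` matches everything outside `[a-zA-Z_0-9]`, so Japanese or Korean characters act as delimiters and vanish. That is why the Japanese tweet above keeps only `[https, t, co, ...]`. The plain-Python 3 sketch below mimics this behaviour using `re.ASCII`. `spark_like_tokenize` is a hypothetical name, not a Spark function.

```python
import re


def spark_like_tokenize(text, pattern=r"\W", min_token_length=1):
    """Approximate RegexTokenizer defaults: lowercase, split on the pattern, drop short tokens."""
    # re.ASCII makes \W match everything outside [a-zA-Z0-9_], like Java's default \W
    tokens = re.split(pattern, text.lower(), flags=re.ASCII)
    return [t for t in tokens if len(t) >= min_token_length]


print(spark_like_tokenize("RT @peach_n_pie: ..Instagram blank_seoul.."))
# ['rt', 'peach_n_pie', 'instagram', 'blank_seoul']
print(spark_like_tokenize("티켓팅 성공해서 고척 스카이돔에서 만나요!"))
# []
```

With this pattern, Korean stop words never reach `StopWordsRemover`. A whitespace pattern such as `\\s+` (the tokenizer's default) keeps Hangul tokens intact.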

In [18]:
from pyspark.ml.feature import StopWordsRemover
stop = StopWordsRemover(inputCol="words", outputCol="nostops")

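In [None]:
# Add custom Korean stop words (나 "I", 너 "you", 우리 "we") to the default English list
# and register the combined list on the remover; this has to run before stop.transform().
stopwords = list(stop.getStopWords()) + [u"나", u"너", u"우리"]
stop.setStopWords(stopwords)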

In [19]:
stopDf=stop.transform(wordsDf)
stopDf.show()

+------------------+--------------------+--------------------+--------------------+
|                id|                text|               words|             nostops|
+------------------+--------------------+--------------------+--------------------+
|801955891956236288|RT @peach_n_pie: ...|[rt, peach_n_pie,...|[rt, peach_n_pie,...|
|801955872410697728|RT @usefulstooges...|[rt, usefulstooge...|[rt, usefulstooge...|
|801955852798267393|I won a game by 1...|[i, won, a, game,...|[won, game, 106, ...|
|801955840051781633|pcy for Seoul fas...|[pcy, for, seoul,...|[pcy, seoul, fash...|
|801955833424642048|RT @seoulstorys: ...|[rt, seoulstorys,...|[rt, seoulstorys,...|
|801955813715804160|Yes, yes #coffee,...|[yes, yes, coffee...|[yes, yes, coffee...|
|801955812302127104|ｺﾚすごい♪
飲んだだけでみるみる...|[https, t, co, cd...|[https, t, cduyaa...|
|801955794631598080|RT @ygent_officia...|[rt, ygent_offici...|[rt, ygent_offici...|
|801955792853168128|RT @ygent_officia...|[rt, ygent_offici...|[rt, ygent_off
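
`StopWordsRemover` drops every token found in its stop-word list. By default it compares case-insensitively (`caseSensitive=False`). Below is a plain-Python 3 sketch of that filter. A tiny hypothetical stop-word list stands in for Spark's built-in English list.

```python
def remove_stopwords(tokens, stopwords, case_sensitive=False):
    """Drop tokens that appear in stopwords (sketch of StopWordsRemover's filtering)."""
    if case_sensitive:
        stop = set(stopwords)
        return [t for t in tokens if t not in stop]
    stop = {w.lower() for w in stopwords}
    return [t for t in tokens if t.lower() not in stop]


# hypothetical subset of an English stop-word list, for illustration only
tiny_stopwords = ["i", "a", "by", "for", "the"]
print(remove_stopwords(["i", "won", "a", "game", "by", "106"], tiny_stopwords))
# ['won', 'game', '106']
```

This mirrors the third row of the output above, where `[i, won, a, game, ...]` becomes `[won, game, 106, ...]`.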

In [20]:
from pyspark.ml.feature import CountVectorizer
# keep at most the 30 most frequent terms; minDF=1.0 means a term must appear in at least 1 document
cv = CountVectorizer(inputCol="words", outputCol="cv", vocabSize=30, minDF=1.0)
cvModel = cv.fit(wordsDf)
cvDf = cvModel.transform(wordsDf)

cvDf.select('text','words','cv').show()

+--------------------+--------------------+--------------------+
|                text|               words|                  cv|
+--------------------+--------------------+--------------------+
|RT @peach_n_pie: ...|[rt, peach_n_pie,...|(30,[1,2,3,4,21],...|
|RT @usefulstooges...|[rt, usefulstooge...|(30,[0,1,2,3,4],[...|
|I won a game by 1...|[i, won, a, game,...|(30,[0,2,3,4],[1....|
|pcy for Seoul fas...|[pcy, for, seoul,...|(30,[0,2,3,4,20],...|
|RT @seoulstorys: ...|[rt, seoulstorys,...|(30,[0,1,2,3,4,13...|
|Yes, yes #coffee,...|[yes, yes, coffee...|(30,[0,2,3,4,15],...|
|ｺﾚすごい♪
飲んだだけでみるみる...|[https, t, co, cd...|(30,[2,3,4],[2.0,...|
|RT @ygent_officia...|[rt, ygent_offici...|(30,[0,1,2,3,4,6,...|
|RT @ygent_officia...|[rt, ygent_offici...|(30,[0,1,2,3,4,6,...|
|RT @mrnkaloto: Ti...|[rt, mrnkaloto, t...|(30,[0,1,2,3,4],[...|
|RT @choconini_: S...|[rt, choconini_, ...|(30,[0,1,5,7,10,1...|
|RT @YGBlackPink: ...|[rt, ygblackpink,...|(30,[0,1,5,7,11,1...|
|RT @KoreanWaveINA...|[rt
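
Each `cv` value is a sparse vector printed as `(size, [indices], [counts])`. For example, `(30,[1,2,3,4,21],...)` means term ids 1, 2, 3, 4 and 21 of the 30-term vocabulary occur in that tweet. The plain-Python 3 sketch below reproduces the idea:

* The vocabulary keeps the `vocab_size` terms with the highest total count across the corpus, among terms that occur in at least `min_df` documents.
* Each document becomes its sorted term indices plus their counts.

Breaking ties between equal counts alphabetically is an assumption of this sketch and may differ from Spark's ordering.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size, min_df=1):
    """Keep the vocab_size most frequent terms that occur in at least min_df documents."""
    term_counts = Counter(t for doc in docs for t in doc)
    doc_freq = Counter(t for doc in docs for t in set(doc))
    candidates = [t for t in term_counts if doc_freq[t] >= min_df]
    # highest corpus count first; ties broken alphabetically (an assumption)
    candidates.sort(key=lambda t: (-term_counts[t], t))
    return candidates[:vocab_size]


def transform(doc, vocabulary):
    """Turn a token list into (size, indices, values), the sparse layout printed by show()."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    indices = sorted(counts)
    return len(vocabulary), indices, [float(counts[i]) for i in indices]


docs = [["rt", "seoul", "rt"], ["seoul", "coffee"]]
vocab = fit_vocabulary(docs, vocab_size=2)
print(vocab)                      # ['rt', 'seoul']
print(transform(docs[0], vocab))  # (2, [0, 1], [2.0, 1.0])
print(transform(docs[1], vocab))  # (2, [1], [1.0])
```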

## M.3 Machine Learning

* Libraries
    * Weka
    * scikit-learn

* The ML API is built on the DataFrame API. (The RDD-based `spark.mllib` API has been in maintenance mode since Spark 2.0 and was expected to be removed in Spark 3.0.)

Type | Discrete | Continuous | Input
----------|----------|----------|----------
Unsupervised | KMeans, LDA | SVD, PCA | 
Supervised | Naive Bayes, Decision Tree | Regression, Ensembles | LabeledPoint

Package | Description
-------|-------
mllib | RDD API
ml | DataFrame API, Pipelines. Recommended.

* Classification
    * trainDf (when labelCol and featuresCol are not set, these default column names are used)
        * label (DoubleType)
        * features (sparse or dense vectors)
    * trainRdd is an RDD of LabeledPoint
    * Word vectors may be either dense or sparse; both are `Vector` types.
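
A sparse vector stores only `(size, indices, values)`, while a dense vector stores every entry. Both describe the same vector, which is why either form works as features. The plain-Python 3 sketch below converts between the two layouts with hypothetical helpers and no Spark.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a full list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def dense_to_sparse(dense):
    """Keep only the non-zero entries of a dense list."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


print(sparse_to_dense(5, [1, 3], [2.0, 1.0]))      # [0.0, 2.0, 0.0, 1.0, 0.0]
print(dense_to_sparse([0.0, 2.0, 0.0, 1.0, 0.0]))  # (5, [1, 3], [2.0, 1.0])
```

In PySpark, `pyspark.mllib.linalg.Vectors.dense(...)` and `Vectors.sparse(size, indices, values)` build the two forms, and `SparseVector.toArray()` performs the expansion.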

 
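API | Estimator / method | Input
-----|-----|-----
ml | `LogisticRegression().fit(trainDf)` | DataFrame with label, features
ml | `DecisionTreeClassifier(labelCol="label", featuresCol="features")` | DataFrame with label, features
ml | `LinearRegression` | DataFrame with label, features
ml | `NaiveBayes(smoothing=1.0, modelType="multinomial")` | DataFrame with label, features
mllib | `SVMWithSGD.train(parsedData, iterations=100)` | RDD of LabeledPoint
mllib | `LogisticRegressionWithLBFGS.train(parsedData)` | RDD of LabeledPoint
mllib | `LinearRegressionWithSGD.train(parsedData)` | RDD of LabeledPoint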


## Exercise S-11: Spark MLlib Decision Tree

* Reference: https://www.codementor.io/spark/tutorial/spark-python-mllib-decision-trees

* Step 1: Data collection
* Step 2: Data preparation
* Step 3: Modeling
* Step 4: Prediction
* Step 5: Evaluation

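### Step 1: Data collection

* Download the data from the URL.
* Save it in the `data` directory.
* If the data has already been downloaded, read the existing file instead of downloading it again.

* Training data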

In [3]:
import os
import urllib

_url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz'
_trainFn=os.path.join(os.getcwd(),'data','kddcup.data.gz')
if(not os.path.exists(_trainFn)):
    print "%s data does not exist! retrieving.." % _trainFn
    # urlretrieve returns a (filename, headers) tuple, so don't overwrite _trainFn with it
    urllib.urlretrieve(_url, _trainFn)


In [4]:
_trainRdd = sc.textFile(_trainFn)
print _trainRdd.count()

4898431


In [16]:
_trainRdd.take(1)

[u'0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.']

* test data

In [5]:
_url2 = 'http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz'
_testFn=os.path.join(os.getcwd(),'data','corrected.gz')
if(not os.path.exists(_testFn)):
    print "%s data does not exist! retrieving.." % _testFn
    # download the test set from _url2 (not the training-set _url); keep _testFn as the path
    urllib.urlretrieve(_url2, _testFn)


In [6]:
_testRdd = sc.textFile(_testFn)
print _testRdd.count()

311029


In [19]:
_testRdd.take(1)

[u'0,udp,private,SF,105,146,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,254,1.00,0.01,0.00,0.00,0.00,0.00,0.00,0.00,normal.']

### Step 2: Data preparation

* 2-1 Split the CSV lines.
* 2-2 Inspect the variables so that each holds either a continuous or a nominal value.
    * Convert alphabetic values to nominal indices.
* 2-3 Build the training data.
    * 41 features, including protocols, services, flags
    * label: the last (42nd) column (attack = 0 if 'normal.', else 1)

Variable | protocols | services | flags | ... | attack
-----|-----|-----|-----|-----|-----
Column index (0-based) | 1 | 2 | 3 | ... | 41
Example value | tcp | http | SF | ... | normal.


* 2-1 Split the CSV lines.
    * Split each line on commas, giving a 2-D structure (rows of fields).

In [43]:
_train = _trainRdd.map(lambda x: x.split(","))
_test = _testRdd.map(lambda x: x.split(","))

In [92]:
print len(_train.first()), _train.first()
print len(_test.first()), _test.first()

42 [u'0', u'tcp', u'http', u'SF', u'215', u'45076', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'1', u'1', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'0', u'0', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']
42 [u'0', u'udp', u'private', u'SF', u'105', u'146', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'1', u'1', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'255', u'254', u'1.00', u'0.01', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']


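* 2-2 Inspect the variables so that each holds either a continuous or a nominal value.
    * The 2nd, 3rd and 4th columns (indices 1 to 3) contain alphabetic values. To get nominal values, collect the distinct values of each column as keys.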

In [44]:
protocols = _train.map(lambda x: x[1]).distinct().collect()
services = _train.map(lambda x: x[2]).distinct().collect()
flags = _train.map(lambda x: x[3]).distinct().collect()

In [45]:
print len(protocols), protocols
print len(services), services
print len(flags), flags

3 [u'udp', u'icmp', u'tcp']
70 [u'urp_i', u'http_443', u'Z39_50', u'smtp', u'domain', u'private', u'echo', u'time', u'shell', u'red_i', u'eco_i', u'sunrpc', u'ftp_data', u'urh_i', u'pm_dump', u'pop_3', u'pop_2', u'systat', u'ftp', u'uucp', u'whois', u'harvest', u'netbios_dgm', u'efs', u'remote_job', u'daytime', u'ntp_u', u'finger', u'ldap', u'netbios_ns', u'kshell', u'iso_tsap', u'ecr_i', u'nntp', u'http_2784', u'printer', u'domain_u', u'uucp_path', u'courier', u'exec', u'aol', u'netstat', u'telnet', u'gopher', u'rje', u'sql_net', u'link', u'ssh', u'netbios_ssn', u'csnet_ns', u'X11', u'IRC', u'tftp_u', u'login', u'supdup', u'name', u'nnsp', u'mtp', u'http', u'bgp', u'ctf', u'hostnames', u'klogin', u'vmnet', u'tim_i', u'discard', u'imap4', u'auth', u'other', u'http_8001']
11 [u'OTH', u'RSTR', u'S3', u'S2', u'S1', u'S0', u'RSTOS0', u'REJ', u'SH', u'RSTO', u'SF']


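* 2-3 Build the training data.
    * Build it in LabeledPoint form.
    * Features: convert to nominal values.
        * protocols are alphabetic; map them to nominal indices using the keys.
        * services are alphabetic; map them to nominal indices using the keys.
        * flags are alphabetic; map them to nominal indices using the keys.
        * The features can be a numpy array or a Python list.
    * Class
        * 0 if 'normal.'
        * 1 otherwise

* Try building the protocols feature
    * If the value is among the keys, use its position in the key list (index()).
    * The keys were built from the training data, so a test-data value may be missing from them. In that case, use the maximum value (len()); any otherwise unused value also works.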

* index() returns the position of an item in a list.

In [85]:
protocols.index('tcp')

2
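
`list.index()` scans the list on every call. The LabeledPoint code below calls it for three columns of each of the 4,898,431 training rows. A dictionary built once gives the same positions with constant-time lookups. The sketch below is plain Python 3. The `make_index`/`encode` helpers and the shortened `service_keys` list are hypothetical. The sketch also shows the fallback for values unseen in the training data.

```python
def make_index(keys):
    """Map each distinct value to its position, so lookups match keys.index(value)."""
    return {value: i for i, value in enumerate(keys)}


def encode(value, index):
    """Nominal index of value; values unseen in training get len(index), as described above."""
    return index.get(value, len(index))


protocol_keys = ["udp", "icmp", "tcp"]
flag_keys = ["OTH", "RSTR", "S3", "S2", "S1", "S0", "RSTOS0", "REJ", "SH", "RSTO", "SF"]
service_keys = ["urp_i", "http_443", "http"]  # hypothetical shortened list, not the real 70 services

protocol_idx = make_index(protocol_keys)
flag_idx = make_index(flag_keys)
service_idx = make_index(service_keys)

row = ["0", "tcp", "http", "SF", "215", "45076", "normal."]  # truncated row for illustration
encoded = row[:-1]
encoded[1] = encode(row[1], protocol_idx)
encoded[2] = encode(row[2], service_idx)
encoded[3] = encode(row[3], flag_idx)
attack_label = 0.0 if row[-1] == "normal." else 1.0
print(attack_label, [float(x) for x in encoded])
# 0.0 [0.0, 2.0, 2.0, 10.0, 215.0, 45076.0]
```

Inside Spark, `createLP` could reference such dictionaries in place of the lists, the same way it uses the lists now.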

* Try building a LabeledPoint for a single row

In [73]:
from pyspark.mllib.regression import LabeledPoint

line=[u'0', u'tcp', u'http', u'SF', u'215', u'45076', u'0', u'0', u'0', u'0',\
      u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0',\
      u'1', u'1', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00',\
      u'0', u'0', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00',\
      u'0.00', u'normal.']

feature=line[0:-1]
feature[1] = protocols.index(line[1]) if line[1] in protocols else len(protocols)
feature[2] = services.index(line[2]) if line[2] in services else len(services)
feature[3] = flags.index(line[3]) if line[3] in flags else len(flags)
attack = 0.0 if line[-1]=='normal.' else 1.0
LabeledPoint(attack, [float(x) for x in feature])

* Build the LabeledPoints.

In [79]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

def createLP(line):
    features=line[0:-1]
    features[1] = protocols.index(line[1]) if line[1] in protocols else len(protocols)
    features[2] = services.index(line[2]) if line[2] in services else len(services)
    features[3] = flags.index(line[3]) if line[3] in flags else len(flags)
    attack = 0.0 if line[-1]=='normal.' else 1.0
    lp=LabeledPoint(attack, [float(x) for x in features])
    return lp

trainRdd = _train.map(createLP)
testRdd = _test.map(createLP)

In [80]:
print trainRdd.first()
print testRdd.first()

(0.0,[0.0,2.0,58.0,10.0,215.0,45076.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])
(0.0,[0.0,0.0,5.0,10.0,105.0,146.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,254.0,1.0,0.01,0.0,0.0,0.0,0.0,0.0,0.0])


In [81]:
trainRdd.count()

4898431

In [82]:
testRdd.count()

311029

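### Step 3: Modeling

* Define the input parameters.

Parameter | Description
-------|-------
data | RDD of LabeledPoint
numClasses | Number of classes
categoricalFeaturesInfo | Map from each categorical (nominal) feature index to its number of categories; continuous features are left out of the map
impurity| "entropy" or "gini"
maxDepth | Maximum depth of the tree. Depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.
maxBins| Number of bins used for finding splits at each node. Must be at least the number of categories of every categorical feature (70 services here, hence 100).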
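
The two `impurity` options measure how mixed the labels at a node are. With class proportions p_k:

* Gini impurity is `1 - sum(p_k^2)`.
* Entropy is `-sum(p_k * log2(p_k))`.

The tree picks the split that most lowers the size-weighted impurity of the children. Below is a plain-Python 3 sketch of both measures and of that impurity decrease, computed from class counts; the helper names are hypothetical. Using a different logarithm base would only rescale entropy, not change which split wins.

```python
import math


def gini(counts):
    """Gini impurity: 1 - sum(p_k^2) over class proportions."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)


def entropy(counts):
    """Entropy in bits: -sum(p_k * log2(p_k)), skipping empty classes."""
    total = sum(counts)
    return -sum((c / total) * math.log2(c / total) for c in counts if c > 0)


def split_gain(parent, left, right, impurity=gini):
    """Impurity decrease when the parent's class counts are split into left and right."""
    n = sum(parent)
    weighted = sum(left) / n * impurity(left) + sum(right) / n * impurity(right)
    return impurity(parent) - weighted


print(gini([50, 50]), entropy([50, 50]))  # 0.5 1.0
print(split_gain([50, 50], [45, 5], [5, 45]))
```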


In [83]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
treeModel = DecisionTree.trainClassifier(trainRdd, numClasses=2, 
              categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
              impurity='gini', maxDepth=4, maxBins=100)

### Step 4: Prediction

In [86]:
predictions = treeModel.predict(testRdd.map(lambda p: p.features))
labels_and_preds = testRdd.map(lambda p: p.label).zip(predictions)

### Step 5: Evaluation

In [87]:
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(testRdd.count())

In [88]:
print test_accuracy

0.918795353488
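
Accuracy alone hides how the errors split between the two classes. The plain-Python 3 sketch below takes `(label, prediction)` pairs, like the ones `labels_and_preds` holds. It computes accuracy plus the four confusion-matrix counts, using toy data. On the RDD itself, `labels_and_preds.countByValue()` returns a dict with the count of each distinct pair.

```python
from collections import Counter


def confusion_counts(pairs, positive=1.0):
    """Count TP, FP, TN, FN for (label, prediction) pairs of a binary classifier."""
    c = Counter((label == positive, pred == positive) for label, pred in pairs)
    return {"TP": c[(True, True)], "FP": c[(False, True)],
            "TN": c[(False, False)], "FN": c[(True, False)]}


def accuracy(pairs):
    """Fraction of pairs whose prediction equals the label."""
    pairs = list(pairs)
    return sum(1 for label, pred in pairs if label == pred) / float(len(pairs))


pairs = [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0)]  # toy data, not the KDD results
print(accuracy(pairs))          # 0.75
print(confusion_counts(pairs))  # {'TP': 1, 'FP': 0, 'TN': 2, 'FN': 1}
```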


In [89]:
print treeModel.toDebugString()

DecisionTreeModel classifier of depth 4 with 29 nodes
  If (feature 22 <= 55.0)
   If (feature 3 in {2.0,3.0,4.0,7.0,9.0,10.0})
    If (feature 2 in {0.0,3.0,5.0,7.0,8.0,9.0,12.0,13.0,15.0,18.0,26.0,27.0,32.0,36.0,42.0,50.0,51.0,52.0,58.0,64.0,67.0,68.0})
     If (feature 34 <= 0.91)
      Predict: 0.0
     Else (feature 34 > 0.91)
      Predict: 1.0
    Else (feature 2 not in {0.0,3.0,5.0,7.0,8.0,9.0,12.0,13.0,15.0,18.0,26.0,27.0,32.0,36.0,42.0,50.0,51.0,52.0,58.0,64.0,67.0,68.0})
     If (feature 4 <= 22.0)
      Predict: 1.0
     Else (feature 4 > 22.0)
      Predict: 0.0
   Else (feature 3 not in {2.0,3.0,4.0,7.0,9.0,10.0})
    If (feature 33 <= 0.3)
     If (feature 5 <= 0.0)
      Predict: 1.0
     Else (feature 5 > 0.0)
      Predict: 0.0
    Else (feature 33 > 0.3)
     If (feature 22 <= 2.0)
      Predict: 0.0
     Else (feature 22 > 2.0)
      Predict: 1.0
  Else (feature 22 > 55.0)
   If (feature 5 <= 0.0)
    If (feature 11 <= 0.0)
     If (feature 2 in {0.0})
      Predict

## Exercise S-12: Spark MLlib Logistic regression


* Read the data

In [93]:
_fp=os.path.join(spark_home,"data/mllib/sample_svm_data.txt")
print os.path.isfile(_fp)

True


In [94]:
_f=open(_fp,'r')
_lines=_f.readlines()
print _lines[0]
_f.close()

1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0



In [97]:
_rdd=sc.textFile(_fp).map(lambda line: [float(x) for x in line.split(' ')])

In [98]:
_rdd.take(1)[0]

[1.0,
 0.0,
 2.52078447201548,
 0.0,
 0.0,
 0.0,
 2.004684436494304,
 2.000347299268466,
 0.0,
 2.228387042742021,
 2.228387042742023,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0]

In [79]:
from pyspark.mllib.regression import LabeledPoint

_trainRdd0=_rdd.map(lambda line:LabeledPoint(line[0], line[1:]))

In [102]:
_trainRdd0.take(1)

[LabeledPoint(1.0, [0.0,2.52078447202,0.0,0.0,0.0,2.00468443649,2.00034729927,0.0,2.22838704274,2.22838704274,0.0,0.0,0.0,0.0,0.0,0.0])]

In [103]:
_trainRdd=sc.textFile(_fp).map(lambda line: [float(x) for x in line.split(' ')]).map(lambda p:LabeledPoint(p[0], p[1:]))

In [104]:
_trainRdd.take(1)

[LabeledPoint(1.0, [0.0,2.52078447202,0.0,0.0,0.0,2.00468443649,2.00034729927,0.0,2.22838704274,2.22838704274,0.0,0.0,0.0,0.0,0.0,0.0])]

* Another way

In [107]:
def createLP(line):
    p = [float(x) for x in line.split(' ')]
    return LabeledPoint(p[0], p[1:])

_rdd=sc.textFile(_fp)
trainRdd = _rdd.map(createLP)

In [108]:
trainRdd.take(1)

[LabeledPoint(1.0, [0.0,2.52078447202,0.0,0.0,0.0,2.00468443649,2.00034729927,0.0,2.22838704274,2.22838704274,0.0,0.0,0.0,0.0,0.0,0.0])]

In [111]:
trainRdd.count()

322

In [110]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

model = LogisticRegressionWithLBFGS.train(trainRdd)

In [112]:
labelsAndPreds = trainRdd.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(trainRdd.count())
print("Training Error = " + str(trainErr))

Training Error = 0.366459627329
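
The training error above is the fraction of rows where the model's 0/1 prediction differs from the label. A binary `LogisticRegressionModel` predicts 1 when `sigmoid(w·x + b)` is greater than its threshold (0.5 by default), and 0 otherwise. `model.weights` and `model.intercept` hold the fitted w and b. Below is a plain-Python 3 sketch of that rule and of the error computation. The weights and rows are made up for illustration.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def predict(weights, intercept, x, threshold=0.5):
    """Binary logistic-regression prediction: 1 if sigmoid(w.x + b) > threshold, else 0."""
    margin = sum(w * xi for w, xi in zip(weights, x)) + intercept
    return 1 if sigmoid(margin) > threshold else 0


def training_error(weights, intercept, rows):
    """Fraction of (label, features) rows whose prediction differs from the label."""
    wrong = sum(1 for label, x in rows if predict(weights, intercept, x) != label)
    return wrong / float(len(rows))


w, b = [1.0, -1.0], 0.0  # hypothetical weights, not the fitted model's
rows = [(1, [2.0, 1.0]), (0, [0.0, 3.0]), (0, [1.0, 0.0])]
print(training_error(w, b, rows))  # one of three rows is misclassified
```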


* The model can be saved.
    * Given a directory path, `save()` writes the model data and its metadata into that directory.

In [114]:
model.save(sc, "myModelPath")
sameModel = LogisticRegressionModel.load(sc, "myModelPath")

## Exercise S-12: Spark MLlib LDA

https://github.com/hacertilbec/LDA-spark-python/blob/master/SparkLDA.py#L6

In [116]:
from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel


In [118]:
sqlContext = SQLContext(sc)
path = 'data/ds_spark_wiki.txt'

_rdd = sc.textFile(path).zipWithIndex().map(lambda (words,idd): Row(idd= idd, words = words.split(" ")))
df = sqlContext.createDataFrame(_rdd)

In [119]:
df.printSchema()

root
 |-- idd: long (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [120]:
cv = CountVectorizer(inputCol="words", outputCol="vectors")
model = cv.fit(df)
cvDf = model.transform(df)

In [121]:
cvDf.printSchema()

root
 |-- idd: long (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vectors: vector (nullable = true)



In [122]:
cvDf.count()

8

In [123]:
corpus_size = cvDf.count()  # number of documents (lines of the text file), not words
corpus = cvDf.select("idd", "vectors").map(lambda (x,y): [x,y]).cache()

In [124]:
ldaModel = LDA.train(corpus, k=3,maxIterations=100,optimizer='online')
topics = ldaModel.topicsMatrix()
vocabArray = model.vocabulary

wordNumbers = 10  # number of words per topic
topicIndices = sc.parallelize(ldaModel.describeTopics(maxTermsPerTopic = wordNumbers))

In [125]:
def topic_render(topic):  # map a topic's term ids back to the actual words
    terms = topic[0]      # describeTopics yields (termIndices, termWeights) per topic
    return [vocabArray[i] for i in terms]

topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()

for topic in range(len(topics_final)):
    print "Topic" + str(topic)
    for term in topics_final[topic]:
        print term
    print '\n'

Topic0
스파크는
클러스터
프레임워크이다.
소스
Originally
아파치
컴퓨팅
오픈
Berkeley's
California,


Topic1
Spark
an
Apache
provides
for
interface
with
the
programming
entire


Topic2
implicit
fault-tolerance.
data
and
parallelism
was
which
AMPLab,
at
provides
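
`topicsMatrix()` returns a vocabSize x k matrix in which column j holds the weight of every term in topic j. `describeTopics` sorts those weights for you. The plain-Python 3 sketch below ranks the terms of each column directly, to show what that amounts to. It uses a tiny made-up matrix and vocabulary.

```python
def top_terms(topics_matrix, vocabulary, n):
    """For each topic (column), return the n words with the largest weights."""
    k = len(topics_matrix[0])
    result = []
    for j in range(k):
        column = [row[j] for row in topics_matrix]
        ranked = sorted(range(len(column)), key=lambda i: column[i], reverse=True)
        result.append([vocabulary[i] for i in ranked[:n]])
    return result


vocab = ["spark", "cluster", "data"]  # hypothetical vocabulary
matrix = [[0.1, 0.5],                 # rows = terms, columns = topics
          [0.7, 0.2],
          [0.2, 0.3]]
print(top_terms(matrix, vocab, 2))  # [['cluster', 'data'], ['spark', 'data']]
```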




## Exercise S-13: Spark MLlib movie recommendation

* [spark recommendation](https://www.codementor.io/spark/tutorial/building-a-recommender-with-apache-spark-python-example-app-part1)

* [spark flask](https://www.codementor.io/spark/tutorial/building-a-web-service-with-apache-spark-flask-example-app-part2)

* Week 13: Spark recommendation (movies, music?)
    * Amazon similarity lookup http://blogs.gartner.com/martin-kihn/how-to-build-a-recommender-system-in-python/
    * https://github.com/grahamjenson/list_of_recommender_systems
        * content-based filtering
        * collaborative filtering

In [126]:
import os
import urllib

ml_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
ml_small_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

ml_fname=os.path.join(os.getcwd(),'data','ml-latest.zip')
if(not os.path.exists(ml_fname)):
    print "%s data does not exist! retrieving.." % ml_fname
    ml_f=urllib.urlretrieve(ml_url,ml_fname)

ml_small_fname=os.path.join(os.getcwd(),'data','ml-latest-small.zip')
if(not os.path.exists(ml_small_fname)):
    print "%s data does not exist! retrieving.." % ml_small_fname
    ml_small_f=urllib.urlretrieve(ml_small_url,ml_small_fname)

* unzip

In [128]:
import zipfile

zipfiles=[ml_fname,ml_small_fname]
for f in zipfiles:
    zf = zipfile.ZipFile(f)  # avoid shadowing the built-in zip()
    zf.extractall('data')
    zf.close()


* Inspect the extracted files

In [129]:
!ls data/ml-latest-small/

links.csv  movies.csv  ratings.csv  README.txt	tags.csv


In [130]:
!head data/ml-latest-small/links.csv

movieId,imdbId,tmdbId
1,0114709,862
2,0113497,8844
3,0113228,15602
4,0114885,31357
5,0113041,11862
6,0113277,949
7,0114319,11860
8,0112302,45325
9,0114576,9091


In [131]:
!head data/ml-latest-small/movies.csv

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action


In [132]:
!head data/ml-latest-small/ratings.csv

userId,movieId,rating,timestamp
1,16,4.0,1217897793
1,24,1.5,1217895807
1,32,4.0,1217896246
1,47,4.0,1217896556
1,50,4.0,1217896523
1,110,4.0,1217896150
1,150,3.0,1217895940
1,161,4.0,1217897864
1,165,3.0,1217897135


In [133]:
!head data/ml-latest-small/tags.csv

userId,movieId,tag,timestamp
12,16,20060407,1144396544
12,16,robert de niro,1144396554
12,16,scorcese,1144396564
17,64116,movie to see,1234720092
21,260,action,1428011080
21,260,politics,1428011080
21,260,science fiction,1428011080
21,296,dark humor,1428788132
21,296,drugs,1428788132


* RDD
    * Uses the `sc` defined earlier

In [134]:
small_ratings = os.path.join('data', 'ml-latest-small', 'ratings.csv')

small_ratings_rdd = sc.textFile(small_ratings)
small_ratings_rdd_header = small_ratings_rdd.take(1)[0]
print small_ratings_rdd_header

userId,movieId,rating,timestamp


In [135]:
small_ratings_data = small_ratings_rdd\
    .filter(lambda line: line!=small_ratings_rdd_header)\
    .map(lambda line: line.split(","))\
    .map(lambda tokens: (tokens[0],tokens[1],tokens[2]))\
    .cache()
small_ratings_data.take(3)

[(u'1', u'16', u'4.0'), (u'1', u'24', u'1.5'), (u'1', u'32', u'4.0')]

In [139]:
def csvRdd(csvpath):
    _rdd = sc.textFile(csvpath)
    _rdd_header = _rdd.take(1)[0]
    print "header: %s" % _rdd_header
    rdd = _rdd.\
        filter(lambda line: line!=_rdd_header) \
        .map(lambda line: line.split(",")) \
        .map(lambda tokens: (tokens[0],tokens[1],tokens[2])) \
        .cache()
    return rdd

In [140]:
#ratingspath = os.path.join(datapath, 'ml-latest-small', 'ratings.csv')
#ratings=csvRdd(ratingspath)
ratings=csvRdd(small_ratings)
ratings.take(3)

header: userId,movieId,rating,timestamp


[(u'1', u'16', u'4.0'), (u'1', u'24', u'1.5'), (u'1', u'32', u'4.0')]

In [141]:
moviespath = os.path.join('data', 'ml-latest-small', 'movies.csv')
movies=csvRdd(moviespath)
movies.take(3)

header: movieId,title,genres


[(u'1', u'Toy Story (1995)', u'Adventure|Animation|Children|Comedy|Fantasy'),
 (u'2', u'Jumanji (1995)', u'Adventure|Children|Fantasy'),
 (u'3', u'Grumpier Old Men (1995)', u'Comedy|Romance')]
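
`csvRdd` splits on every comma. MovieLens wraps titles that contain a comma in double quotes, such as `"American President, The (1995)"`, so those titles get cut in two. Python's `csv` module honors the quotes. The plain-Python 3 sketch below compares the two approaches on one line written in the `movies.csv` format for illustration. `parse_csv_line` is a hypothetical helper.

```python
import csv


def parse_csv_line(line):
    """Parse one CSV line, honoring double-quoted fields that contain commas."""
    return next(csv.reader([line]))


sample_line = '11,"American President, The (1995)",Comedy|Drama|Romance'
print(sample_line.split(","))       # naive split: 4 fields, title broken apart
print(parse_csv_line(sample_line))  # ['11', 'American President, The (1995)', 'Comedy|Drama|Romance']
```

In the Spark pipeline, this function could replace `line.split(",")` inside `csvRdd`. Under Python 2, however, the `csv` module does not accept unicode input, so the lines would first have to be encoded to UTF-8 bytes.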

In [142]:
from pyspark.mllib.recommendation import ALS
import math

_train, _validation, _test=ratings.randomSplit([6, 2, 2], seed=0L)
_validation_01 = _validation.map(lambda x: (x[0], x[1]))
_test_01 = _test.map(lambda x: (x[0], x[1]))

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(_train, rank, seed=seed, iterations=iterations,lambda_=regularization_parameter)
    predictions = model.predictAll(_validation_01).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = _validation\
        .map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))\
        .join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank


For rank 4 the RMSE is 0.922345252375
For rank 8 the RMSE is 0.930922837081
For rank 12 the RMSE is 0.925555162553
The best model was trained with rank 4
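
The RMSE printed for each rank is `sqrt(mean((rating - prediction)^2))`, taken over the validation pairs that `join` matched. The plain-Python 3 sketch below computes that formula. It then picks the rank with the lowest error, using the RMSE values printed above. The variable names are hypothetical and chosen so they don't overwrite the notebook's own variables.

```python
import math


def rmse(rating_pred_pairs):
    """Root-mean-square error over (actual, predicted) rating pairs."""
    pairs = list(rating_pred_pairs)
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / float(len(pairs)))


print(rmse([(4.0, 3.0), (2.0, 2.0)]))  # 0.7071067811865476

printed_ranks = [4, 8, 12]
printed_errors = [0.922345252375, 0.930922837081, 0.925555162553]  # values printed above
chosen_rank = min(zip(printed_ranks, printed_errors), key=lambda pair: pair[1])[0]
print(chosen_rank)  # 4
```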


In [143]:
predictions.take(3)

[((384, 1084), 3.6775826431871153),
 ((668, 1084), 3.204225935944695),
 ((220, 1084), 3.8450699802260537)]

In [144]:

model = ALS.train(_train, best_rank, seed=seed, iterations=iterations,lambda_=regularization_parameter)
predictions = model.predictAll(_test_01).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = _test\
    .map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))\
    .join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print 'For testing data the RMSE is %s' % (error)


For testing data the RMSE is 0.920783817111
