#介紹Spark MLlib的Data Type

#import Library

In [1]:
import numpy as np
from pyspark.mllib.linalg import Vectors

# Dense vector

In [9]:
# Build a DenseVector from a plain Python list; the ints are stored as
# floats (see the printed output below).
x = [1,2,3,4,5]
dense_x = Vectors.dense(x)
print("dense_x = " + str(dense_x))

dense_x = [1.0,2.0,3.0,4.0,5.0]


# Sparse vector

In [20]:
# Three equivalent ways to build the same SparseVector: size 5, with 1.0 at
# index 1 and 5.5 at index 3; every other entry is an implicit 0.0.
# The printed outputs below show that all three produce the same vector.

# 1) dict mapping index -> value
sparse_x = Vectors.sparse(5, {1: 1.0, 3: 5.5})
print("sparse_x = " + str(sparse_x))

# 2) list of (index, value) pairs
sparse_y = Vectors.sparse(5, [(1, 1.0), (3, 5.5)])
print("sparse_y = " + str(sparse_y))

# 3) separate lists of indices and values
sparse_z = Vectors.sparse(5, [1, 3], [1.0, 5.5])
print("sparse_z = " + str(sparse_z))

sparse_x = (5,[1,3],[1.0,5.5])
sparse_y = (5,[1,3],[1.0,5.5])
sparse_z = (5,[1,3],[1.0,5.5])


## 確認Sparse vector

In [27]:
def print_sparse(x):
    """Print every element of an MLlib vector, one value per line.

    Implicit zeros of a SparseVector are printed explicitly as 0.0, so
    every position of the vector appears in the output.

    Args:
        x: a pyspark.mllib vector (SparseVector or DenseVector) that
            exposes ``toArray()``.
    """
    # toArray() materialises the full dense array, so every position,
    # including a trailing implicit zero, is visited directly. Indexing x[i]
    # on a SparseVector could raise IndexError when the last position was an
    # omitted zero, and catching IndexError for that case could also hide
    # unrelated indexing errors.
    for value in x.toArray():
        print(value)

        
# Print all 5 entries of sparse_x, including the implicit zeros (output below).
print_sparse(sparse_x)

0.0
1.0
0.0
5.5
0.0


# Vector 方法

In [11]:
# Dot product of a vector with itself = sum of squares: 1+4+9+16+25 = 55.
dense_x.dot(dense_x)

55.0

In [15]:
# Dense and sparse vectors can be mixed in one operation (their sizes must
# match): only indices 1 and 3 are non-zero in sparse_x, so 2*1.0 + 4*5.5 = 24.0.
dense_x.dot(sparse_x)

24.0

In [30]:
# Squared Euclidean distance (not the distance itself):
# differences (1, 1, 3, -1.5, 5) -> 1 + 1 + 9 + 2.25 + 25 = 38.25.
dense_x.squared_distance(sparse_y)

38.25

# DenseVector wraps a numpy.ndarray (it is not an ndarray itself)

In [34]:
# The Python type is DenseVector, not numpy.ndarray (see output below).
type(dense_x)

pyspark.mllib.linalg.DenseVector

In [35]:
# DenseVector has no reduce(); the failed lookup reports the underlying
# numpy.ndarray, which has no reduce() either, hence the AttributeError below.
dense_x.reduce(lambda x, y : x + y)

AttributeError: 'numpy.ndarray' object has no attribute 'reduce'

In [37]:
# reduce() is an RDD method, so the vector's elements are first distributed
# into an RDD via the SparkContext.
# NOTE(review): `sc` is not defined in this excerpt; presumably the
# SparkContext provided by the pyspark shell/notebook — confirm.
sc.parallelize(dense_x).reduce(lambda x, y : x + y)

15.0

In [39]:
# A SparseVector is parallelized the same way; the total is 1.0 + 5.5 = 6.5
# because every other entry is zero.
sc.parallelize(sparse_x).reduce(lambda x, y : x + y)

6.5

In [43]:
# RDD.sum() gives the same total as the reduce(x + y) above.
sc.parallelize(dense_x).sum()

15.0

In [63]:
# Same shortcut for the sparse vector; matches its reduce(x + y) total.
sc.parallelize(sparse_x).sum()

6.5

# 將vector以Row為單位疊成data set

In [80]:
# Stack the three (identical) sparse vectors as the rows of a small dataset.
data = [sparse_x, sparse_y, sparse_z]

In [81]:
# Echo the list to show each row's SparseVector repr.
data

[SparseVector(5, {1: 1.0, 3: 5.5}),
 SparseVector(5, {1: 1.0, 3: 5.5}),
 SparseVector(5, {1: 1.0, 3: 5.5})]

In [82]:
# Column-wise statistics over an RDD of vectors (one vector per row);
# mean() returns a numpy array with one entry per column (see output below).
from pyspark.mllib.stat import Statistics
Statistics.colStats(sc.parallelize(data)).mean()

array([ 0. ,  1. ,  0. ,  5.5,  0. ])

# LabeledPoint

In [55]:
# 要注意 LabeledPoint 和 Vectors 來自不同的模組（pyspark.mllib.regression 與 pyspark.mllib.linalg）
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

### LabeledPoint(label, [feature1, feature2, feature3])

In [75]:
# Each LabeledPoint holds a label (here 0.0 or 1.0) followed by a list of
# three feature values.
data_label = [
     LabeledPoint(0.0, [0.0,1.0,1.0]),
     LabeledPoint(1.0, [1.0,1.0,2.0]),
     LabeledPoint(1.0, [2.0,3.0,2.0]),
     LabeledPoint(0.0, [3.0,2.0,5.0])
    ]

In [76]:
# Echo the labeled points.
data_label

[LabeledPoint(0.0, [0.0,1.0,1.0]),
 LabeledPoint(1.0, [1.0,1.0,2.0]),
 LabeledPoint(1.0, [2.0,3.0,2.0]),
 LabeledPoint(0.0, [3.0,2.0,5.0])]

# Read Data From txt File

In [115]:
# 讀取放在hdfs上的資料
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics


[u'1,2,3,3', u'1,2,3,3', u'1,2,3,3', u'1,2,3,3', u'1,2,3,3']

In [118]:
##Raw Data
#1,2,3,3
#1,2,3,3
#1,2,3,3
#1,2,3,3
#1,2,3,3
#1,2,3,3
#1,2,3,3

In [120]:
# Read the text file from HDFS; each RDD element is one line as a string
# (see the printed output below).
# NOTE(review): `sc` is assumed to be an existing SparkContext — confirm.
data = sc.textFile("hdfs://sp1:9000/data/demo.txt")
print(data.take(5))
print(type(data))

[u'1,2,3,3', u'1,2,3,3', u'1,2,3,3', u'1,2,3,3', u'1,2,3,3']
<class 'pyspark.rdd.RDD'>


In [121]:
def parse_data(textRDD, sep=','):
    """Parse an RDD of delimited text lines into an RDD of DenseVectors.

    Each non-blank line is split on ``sep``, every field is converted to
    float, and the resulting list is wrapped in ``Vectors.dense``.

    Args:
        textRDD: RDD of str lines, e.g. the result of ``sc.textFile(...)``.
        sep: field delimiter; defaults to ``','`` (the original format).

    Returns:
        RDD of DenseVector, one per non-blank input line. RDD transformations
        are lazy, so a malformed field (float() raises ValueError) only fails
        the job when an action such as ``take`` or ``Statistics.colStats``
        runs.
    """
    return (
        textRDD
        # A blank line (e.g. an empty line at the end of the file) would split
        # into [''] and make the float conversion fail, so drop it.
        .filter(lambda line: line.strip())
        .map(lambda line: Vectors.dense([float(field) for field in line.split(sep)]))
    )
    
# Parse every text line of `data` into a DenseVector.
dense_data = parse_data(data)

In [122]:
# Per-column mean of the parsed rows; every raw row is 1,2,3,3, so the mean
# equals that row (see output below).
Statistics.colStats(dense_data).mean()

array([ 1.,  2.,  3.,  3.])