## 스파크 기본설정

In [1]:
import os
import sys

#home=os.path.expanduser("~") # HOME이 설정되어 있지 않으면 expanduser('~')를 사용한다.
#osn.environ["PYSPARK_PYTHON"] = "/usr/bin/python"
os.environ["SPARK_HOME"]=os.path.join(os.path.expanduser("~"),r"C:\Users\user\spark-2.0.0-bin-hadoop2.7\spark-2.0.0-bin-hadoop2.7")
os.environ["PYLIB"]=os.path.join(os.environ["SPARK_HOME"],'python','lib')
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'py4j-0.10.1-src.zip'))
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'pyspark.zip'))

In [2]:
import pyspark
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config("spark.sql.warehouse.dir", r"C:\Users\user\MyStudySpace\2019-2\BigData_Spark\src")\
    .getOrCreate()

## vector

### Dense veotors

In [3]:
import numpy as np

dv= np.array([1.0, 2.1, 3])

In [4]:
type(dv)

numpy.ndarray

In [5]:
from pyspark.mllib.linalg import Vectors

dv1mllib=Vectors.dense([1.0, 2.1, 3])
print dv1mllib, type(dv1mllib)

[1.0,2.1,3.0] <class 'pyspark.mllib.linalg.DenseVector'>


In [6]:
from pyspark.ml.linalg import Vectors

dv1ml = Vectors.dense([1.0, 2.1, 3])
print dv1ml

[1.0,2.1,3.0]


**product, dot, norm**과 같은 벡터 연산을 할 수도 있다. 결과 값은 numpy와 동일

In [7]:
print dv1ml.dot(dv1ml)

14.41


In [8]:
np.dot(dv,dv)

14.41

In [9]:
print dv1ml*dv1ml

[1.0,4.41,9.0]


### Sparse vecotors

In [10]:
sv1 = Vectors.sparse(3, [1, 2], [1.0, 3.0])
print sv1.toArray()

[0. 1. 3.]


In [11]:
import numpy as np
import scipy.sparse as sps

row = np.array([0, 0, 1, 2, 2, 2])
col = np.array([0, 2, 2, 0, 1, 2])
data = np.array([1, 2, 3, 4, 5, 6])
mtx = sps.csc_matrix((data, (row, col)), shape=(3, 3))
print mtx.todense()   

[[1 0 2]
 [0 0 3]
 [4 5 6]]


### RowMatrix

In [12]:
p = [[1.0,2.0,3.0],[1.1,2.1,3.1],[1.2,2.2,3.3]]
my=spark.sparkContext.parallelize(p)

my.collect()

[[1.0, 2.0, 3.0], [1.1, 2.1, 3.1], [1.2, 2.2, 3.3]]

RowMatrix는 pyspark.mllib.linalg.distributed에서 제공되는 분산벡터

In [13]:
from pyspark.mllib.linalg.distributed import RowMatrix
rm=RowMatrix(my)

In [14]:
print type(rm)

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>


In [15]:
rm.rows.collect()

[DenseVector([1.0, 2.0, 3.0]),
 DenseVector([1.1, 2.1, 3.1]),
 DenseVector([1.2, 2.2, 3.3])]

## Labeled Point

분류 및 회귀분석에 사용되는 데이터 타잎
- 형태 : [Label,[f1,f2,......,fn]]  // label, features로 구성
    - [f1,f2,......,fn] : 학습 데이터
    - 학습한 후에는 Label 제거
    

In [16]:
from pyspark.mllib.regression import LabeledPoint

print LabeledPoint(1.0, [1.0, 2.0, 3.0])

(1.0,[1.0,2.0,3.0])


In [17]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

print LabeledPoint(1992, Vectors.sparse(10, {0: 3.0, 1:5.5, 2: 10.0}))

(1992.0,(10,[0,1,2],[3.0,5.5,10.0]))


서로 다른 패키지의 데이터타잎 mllib LabeledPoint와 ml Vectors를 혼용하면, 형변환 오류가 발생
- fromML(ml벡터)사용하여 mllib 입력받는 labeledpoint에 사용가능

In [18]:
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, dv1mllib)

LabeledPoint(1.0, [1.0,2.1,3.0])

In [19]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

LabeledPoint(1.0, Vectors.fromML(dv1ml))

LabeledPoint(1.0, [1.0,2.1,3.0])

### DataFrame에서 Labeled Point
- Python list에서 DataFrame 생성

In [20]:
p = [[1,[1.0,2.0,3.0]],[1,[1.1,2.1,3.1]],[0,[1.2,2.2,3.3]]]
trainDf=spark.createDataFrame(p)
trainDf.collect()

[Row(_1=1, _2=[1.0, 2.0, 3.0]),
 Row(_1=1, _2=[1.1, 2.1, 3.1]),
 Row(_1=0, _2=[1.2, 2.2, 3.3])]

In [21]:
from pyspark.mllib.regression import LabeledPoint
p = [LabeledPoint(1,[1.0,2.0,3.0]),
     LabeledPoint(1,[1.1,2.1,3.1]),
     LabeledPoint(0,[1.2,2.2,3.3])]
trainDf=spark.createDataFrame(p)
trainDf.collect()

[Row(features=DenseVector([1.0, 2.0, 3.0]), label=1.0),
 Row(features=DenseVector([1.1, 2.1, 3.1]), label=1.0),
 Row(features=DenseVector([1.2, 2.2, 3.3]), label=0.0)]

schema를 사용해서 DataFrame을 생성
- 'label'은 DoubleType
- 'features'는 VectorType

In [22]:
from pyspark.mllib.linalg import SparseVector, VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType
_rdd = spark.sparkContext.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

In [23]:
trainDf=_rdd.toDF(schema)
trainDf.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



### sparse에서 dense vector로 변환

In [24]:
from pyspark.sql.functions import udf
#from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.mllib.linalg import DenseVector, VectorUDT
#myudf=udf(lambda x: Vectors.dense(x), VectorUDT())
#myudf=udf(lambda x: Vectors.dense(x))
myudf=udf(lambda x: DenseVector(x.toArray()), VectorUDT())
_trainDf2=trainDf.withColumn('dvf',myudf(trainDf.features))

In [25]:
_trainDf2.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- dvf: vector (nullable = true)



In [26]:
_trainDf2.show()

+-----+--------------------+------------------+
|label|            features|               dvf|
+-----+--------------------+------------------+
|  0.0| (4,[1,3],[1.0,5.5])| [0.0,1.0,0.0,5.5]|
|  1.0|(4,[0,2],[-1.0,0.5])|[-1.0,0.0,0.5,0.0]|
+-----+--------------------+------------------+



## maxtrix

In [27]:
from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

## libsvm format

In [28]:
fsvm=os.path.join(os.environ["SPARK_HOME"],'data','mllib','sample_libsvm_data.txt')
dfsvm = spark.read.format("libsvm").load(fsvm)

In [29]:
type(dfsvm)

pyspark.sql.dataframe.DataFrame