In [1]:
import os
import sys

os.environ["PYLIB"]=os.path.join(os.environ["SPARK_HOME"],'python','lib')
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'py4j-0.10.1-src.zip'))
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'pyspark.zip'))

import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config("spark.sql.warehouse.dir", "C:/Users/G312")\
    .getOrCreate()

In [3]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

dv1mllib=Vectors.dense([1.0, 2.1, 3])
print dv1mllib, type(dv1mllib)

LabeledPoint(1.0, dv1mllib)

[1.0,2.1,3.0] <class 'pyspark.mllib.linalg.DenseVector'>


LabeledPoint(1.0, [1.0,2.1,3.0])

In [5]:
from pyspark.ml.linalg import Vectors

dv1ml=Vectors.dense([1.0, 2.1, 3])
print dv1ml

[1.0,2.1,3.0]


In [6]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
LabeledPoint(1.0, Vectors.fromML(dv1ml)) #ML타입을 Mllib로 변환하기 위해 fromML을 사용

LabeledPoint(1.0, [1.0,2.1,3.0])

지금부터 Labeled Point에서 DF를 생성한다.

In [8]:
p = [[1,[1.0,2.0,3.0]],[1,[1.1,2.1,3.1]],[0,[1.2,2.2,3.3]]]
trainDf=spark.createDataFrame(p)
trainDf.collect()
#DF로 List를 만든다.

[Row(_1=1, _2=[1.0, 2.0, 3.0]),
 Row(_1=1, _2=[1.1, 2.1, 3.1]),
 Row(_1=0, _2=[1.2, 2.2, 3.3])]

In [9]:
from pyspark.mllib.regression import LabeledPoint
p = [LabeledPoint(1,[1.0,2.0,3.0]),
     LabeledPoint(1,[1.1,2.1,3.1]),
     LabeledPoint(0,[1.2,2.2,3.3])]
trainDf=spark.createDataFrame(p)
trainDf.collect()
#Labeled Point로 DF를 만든다.

[Row(features=DenseVector([1.0, 2.0, 3.0]), label=1.0),
 Row(features=DenseVector([1.1, 2.1, 3.1]), label=1.0),
 Row(features=DenseVector([1.2, 2.2, 3.3]), label=0.0)]

In [10]:
from pyspark.mllib.linalg import Vectors

trainDf = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, 1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, 0.5]))], ["label", "features"])
trainDf.collect()

[Row(label=1.0, features=DenseVector([0.0, 1.1, 0.1])),
 Row(label=0.0, features=DenseVector([2.0, 1.0, 1.0])),
 Row(label=0.0, features=DenseVector([2.0, 1.3, 1.0])),
 Row(label=1.0, features=DenseVector([0.0, 1.2, 0.5]))]

In [11]:
from pyspark.mllib.linalg import SparseVector, VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType
_rdd = spark.sparkContext.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])


In [12]:
trainDf=_rdd.toDF(schema)
trainDf.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [13]:
from pyspark.sql.functions import udf
#from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.mllib.linalg import DenseVector, VectorUDT
#myudf=udf(lambda x: Vectors.dense(x), VectorUDT())
#myudf=udf(lambda x: Vectors.dense(x))
myudf=udf(lambda x: DenseVector(x.toArray()), VectorUDT())
_trainDf2=trainDf.withColumn('dvf',myudf(trainDf.features))

_trainDf2.printSchema()


root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- dvf: vector (nullable = true)



In [14]:
_trainDf2.show()

+-----+--------------------+------------------+
|label|            features|               dvf|
+-----+--------------------+------------------+
|  0.0| (4,[1,3],[1.0,5.5])| [0.0,1.0,0.0,5.5]|
|  1.0|(4,[0,2],[-1.0,0.5])|[-1.0,0.0,0.5,0.0]|
+-----+--------------------+------------------+



In [15]:
from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])


svm(Support Vector Machine) : Machine Learning 기술

In [16]:
fsvm=os.path.join(os.environ["SPARK_HOME"],'data','mllib','sample_libsvm_data.txt')
dfsvm = spark.read.format("libsvm").load(fsvm)

In [17]:
dfsvm.take(1)

[Row(label=0.0, features=SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

위의 데이터를 분석.
label : 0,
features : point : value
    
    SparseVector 형이라는 것을 알 수 있는 것이, point:value의 형은 0은 제외하고 반환하기 때문이다.