In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").config("spark.executor.instances", "1").config("spark.executor.cores", "1").config("spark.executor.memory", "2g").config("spark.driver.memory", "2g").getOrCreate()

In [2]:
df = spark.read.json("/home/lab13/data/simple-ml")
df.orderBy("value2").show()

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
|green| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|green|good|     1|14.386294994851129|
|green|good|    12|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|  red|good|    35|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



In [3]:
from pyspark.ml.feature import RFormula

supervised = RFormula(formula="lab ~ . +color:value1 + color:value2")
#선언된 formaula에 따라 데이터를 변환할 객체(모델) 생성
fittedRF = supervised.fit(df)   
 # 데이터 변환 객체를 이용하여 데이터 변환
preparedRF = fittedRF.transform(df) 
preparedRF.show()



+-----+----+------+------------------+--------------------+-----+
|color| lab|value1|            value2|            features|label|
+-----+----+------+------------------+--------------------+-----+
|green|good|     1|14.386294994851129|(10,[1,2,3,5,8],[...|  1.0|
| blue| bad|     8|14.386294994851129|(10,[2,3,6,9],[8....|  0.0|
| blue| bad|    12|14.386294994851129|(10,[2,3,6,9],[12...|  0.0|
|green|good|    15| 38.97187133755819|(10,[1,2,3,5,8],[...|  1.0|
|green|good|    12|14.386294994851129|(10,[1,2,3,5,8],[...|  1.0|
|green| bad|    16|14.386294994851129|(10,[1,2,3,5,8],[...|  0.0|
|  red|good|    35|14.386294994851129|(10,[0,2,3,4,7],[...|  1.0|
|  red| bad|     1| 38.97187133755819|(10,[0,2,3,4,7],[...|  0.0|
|  red| bad|     2|14.386294994851129|(10,[0,2,3,4,7],[...|  0.0|
|  red| bad|    16|14.386294994851129|(10,[0,2,3,4,7],[...|  0.0|
|  red|good|    45| 38.97187133755819|(10,[0,2,3,4,7],[...|  1.0|
|green|good|     1|14.386294994851129|(10,[1,2,3,5,8],[...|  1.0|
| blue| ba

In [4]:
preparedRF.show(20, False)

+-----+----+------+------------------+----------------------------------------------------------------------+-----+
|color|lab |value1|value2            |features                                                              |label|
+-----+----+------+------------------+----------------------------------------------------------------------+-----+
|green|good|1     |14.386294994851129|(10,[1,2,3,5,8],[1.0,1.0,14.386294994851129,1.0,14.386294994851129])  |1.0  |
|blue |bad |8     |14.386294994851129|(10,[2,3,6,9],[8.0,14.386294994851129,8.0,14.386294994851129])        |0.0  |
|blue |bad |12    |14.386294994851129|(10,[2,3,6,9],[12.0,14.386294994851129,12.0,14.386294994851129])      |0.0  |
|green|good|15    |38.97187133755819 |(10,[1,2,3,5,8],[1.0,15.0,38.97187133755819,15.0,38.97187133755819])  |1.0  |
|green|good|12    |14.386294994851129|(10,[1,2,3,5,8],[1.0,12.0,14.386294994851129,12.0,14.386294994851129])|1.0  |
|green|bad |16    |14.386294994851129|(10,[1,2,3,5,8],[1.0,16.0,14.38629

In [5]:
# 피처 엔지니어링이 수행된 데이터를 임의분할
# 간단한 테스트셋 생성
# 학습, 테스트 데이터 분리 (7:3)
train, test = preparedRF.randomSplit([0.7, 0.3])

In [6]:
# 추정자 사용해보기 
# 분류 알고리즘에 해당하는 로지스틱 회귀 알고리즘 객체 생성 
# 레이블 칼럼 = label, 특징 컬럼 = feature
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label",featuresCol="features")

#로지스틱 회귀 알고리즘의 파라미터들 설명 확인
print(lr.explainParams())

#로지스틱 회귀 알고리즘 적용(학습)하여 모델 생성
fittedLR = lr.fit(train)
fittedLR.transform(train).select("label", "prediction").show()

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [7]:
#모델로부터 학습 데이터의 예측값과 실제값 출력 

# 정확도 평가 후 하이퍼 파라미터 조정해서 학습 후 평가 -> 하이퍼 파라미터 조정해서 학습 후 평가를 반복 수행해야 하므로 ...
#Pipeline으로 캡슐화 (적용할 모델과  모델의 하이퍼파라미터와 평가자를 조합한 ParmMap을 설정) 하여 수행 


# 워크플로를 파이프라인 만들기 
# 홀드 아웃 테스트셋 생성
#pipeline으로 수행하기 위해 데이터를 임의 분할
train, test = df.randomSplit([0.7, 0.3])

In [8]:
# 특성 변환을 위한 RFormaul객체 생성
rForm = RFormula()
lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")


In [9]:
#pipeline객체 생성 (수행 단계 설정 - RFormaula와 LogisticRegression객체)
# RFormula 에 대한 다양한 값을 설정 
from pyspark.ml import Pipeline
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)



In [10]:
# 생성된 모델 학습 및 평가 
#RFormaula와 하이퍼파라미터를 12가지조합으로 학습시킬 ParamMap객체 생성
from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder().addGrid(rForm.formula, [
    "lab ~ . + color:value1",
    "lab ~ . + color:value1 + color:value2"]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).addGrid(lr.regParam, [0.1, 2.0]).build()

In [11]:
#이진분류 평가기 객체 생성
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC").setRawPredictionCol("prediction").setLabelCol("label")

In [12]:
#ParamMap, 추정자Pipeline, 평가기 , 학습 데이터 를 설정
from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit().setTrainRatio(0.75).setEstimatorParamMaps(params).setEstimator(pipeline).setEvaluator(evaluator)

In [13]:
#학습 시킴(->모델 생성)
tvsFitted = tvs.fit(train)

In [14]:
#평가하기  (테스트데이터에 모델 적용)
evaluator.evaluate(tvsFitted.transform(test)) 

0.9166666666666667

In [None]:
###################################################################################################################################

In [24]:
####StandardScaler(평균 0, 분산 1이 되도록 데이터 변환) 실습 #####
scaleDF = spark.read.parquet("/home/lab13/data/simple-ml-scaling/")
scaleDF.show()

from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  1|[3.0,10.1,3.0]|
+---+--------------+

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_55b2fb5026e7__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



In [33]:
########버켓팅으로 데이터 변환(경계값을 이용하여 버켓 생성) ###
contDF = spark.range(20).selectExpr("cast(id as double)")
contDF.show()

from pyspark.ml.feature import Bucketizer

bucketBorders = [-1.0 , 0.5, 5.0, 10.0, 15.0, 20.0 ]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("id")
bucketer.transform(contDF).show()


+----+
|  id|
+----+
| 0.0|
| 1.0|
| 2.0|
| 3.0|
| 4.0|
| 5.0|
| 6.0|
| 7.0|
| 8.0|
| 9.0|
|10.0|
|11.0|
|12.0|
|13.0|
|14.0|
|15.0|
|16.0|
|17.0|
|18.0|
|19.0|
+----+

+----+-------------------------------+
|  id|Bucketizer_3b7c8dc9030f__output|
+----+-------------------------------+
| 0.0|                            0.0|
| 1.0|                            1.0|
| 2.0|                            1.0|
| 3.0|                            1.0|
| 4.0|                            1.0|
| 5.0|                            2.0|
| 6.0|                            2.0|
| 7.0|                            2.0|
| 8.0|                            2.0|
| 9.0|                            2.0|
|10.0|                            3.0|
|11.0|                            3.0|
|12.0|                            3.0|
|13.0|                            3.0|
|14.0|                            3.0|
|15.0|                            4.0|
|16.0|                            4.0|
|17.0|                            4.0|
|18.0|      

In [34]:
from pyspark.ml.feature import StandardScaler
'''
자료형 double 에서 int로 바꿔주기
contDF = spark.range(20)
contDF.show()
'''
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()



+---+--------------+-----------------------------------+
| id|      features|StandardScaler_c1185a560ac6__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



In [35]:
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
minMax.fit(scaleDF).transform(scaleDF).show()


+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_ad94506207aa__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



In [36]:
from pyspark.ml.feature import MaxAbsScaler
maScaler = MaxAbsScaler(). setInputCol("features")
maScaler.fit(scaleDF).transform(scaleDF).show()


+---+--------------+---------------------------------+
| id|      features|MaxAbsScaler_d6ada2d416e2__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|             [0.33333333333333...|
|  1| [2.0,1.1,1.0]|             [0.66666666666666...|
|  0|[1.0,0.1,-1.0]|             [0.33333333333333...|
|  1| [2.0,1.1,1.0]|             [0.66666666666666...|
|  1|[3.0,10.1,3.0]|                    [1.0,1.0,1.0]|
+---+--------------+---------------------------------+



In [41]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
scaleVec = Vectors.dense(10.0, 15.0, 20.0)
scalingUp = ElementwiseProduct(). setScalingVec(scaleVec).setInputCol("features")
scalingUp.transform(scaleDF).show()

+---+--------------+---------------------------------------+
| id|      features|ElementwiseProduct_b36df450308a__output|
+---+--------------+---------------------------------------+
|  0|[1.0,0.1,-1.0]|                       [10.0,1.5,-20.0]|
|  1| [2.0,1.1,1.0]|                       [20.0,16.5,20.0]|
|  0|[1.0,0.1,-1.0]|                       [10.0,1.5,-20.0]|
|  1| [2.0,1.1,1.0]|                       [20.0,16.5,20.0]|
|  1|[3.0,10.1,3.0]|                      [30.0,151.5,60.0]|
+---+--------------+---------------------------------------+



In [39]:
from pyspark.ml.feature import Normalizer
manhattenDistance = Normalizer().setP(1).setInputCol("features")
manhattenDistance.transform(scaleDF).show()
uclideDistance = Normalizer().setP(2).setInputCol("features")
uclideDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|Normalizer_da76019113c1__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+

+---+--------------+-------------------------------+
| id|      features|Normalizer_39f1c474d94c__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.70534561585859...|
|  1| [2.0,1.1,1.0]|           [0.80257235390512...|
|  0|[1.0,0.1,-1.0]|           [0.70534561585859...|
|  1| [2.0,1.1,1.0]|           [0.80257235390512...|
|  1|[3.0,10.1,3.0]|           [0.27384986857909...|
+---+--------------+-------------------------------+



In [43]:
###########범주형 데이터 인덱싱화(변환) ###############
from pyspark.ml.feature import StringIndexer

simpleDF = spark.read.json("/home/lab13/data/simple-ml")

valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
valIndexer.fit(simpleDF).transform(simpleDF).show()


+-----+----+------+------------------+--------+
|color| lab|value1|            value2|valueInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|     6.0|
|  red| bad|     1| 38.97187133755819|     2.0|
|  red| bad|     2|14.386294994851129|     7.0|
|  red| bad|    16|14.386294994851129|     1.0|
|  red|good|    45| 38.97187133755819|     3.0|
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|  

In [46]:
###########(25.7) 텍스트 데이터 변환자 ###############
##텍스트 처리 (불용어 제거, ngram)#####################

##데이터셋 부터 불러오기 
sales = spark.read.format("csv") .option("header", "true")\
  .option("inferSchema", "true").load("/home/lab13/data/retail_data/all/*.csv").coalesce(5).where("Description IS NOT NULL")


In [47]:
# 25.7.1 텍스트 토큰화하기 
#tokenizer 클래스를 사용하는 방법 
from pyspark.ml.feature import Tokenizer

tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn.transform(sales.select("Description"))
tokenized.show(20,False)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER |[white, hanging, heart, t-light, holder]  |
|WHITE METAL LANTERN                |[white, metal, lantern]                   |
|CREAM CUPID HEARTS COAT HANGER     |[cream, cupid, hearts, coat, hanger]      |
|KNITTED UNION FLAG HOT WATER BOTTLE|[knitted, union, flag, hot, water, bottle]|
|RED WOOLLY HOTTIE WHITE HEART.     |[red, woolly, hottie, white, heart.]      |
|SET 7 BABUSHKA NESTING BOXES       |[set, 7, babushka, nesting, boxes]        |
|GLASS STAR FROSTED T-LIGHT HOLDER  |[glass, star, frosted, t-light, holder]   |
|HAND WARMER UNION JACK             |[hand, warmer, union, jack]               |
|HAND WARMER RED POLKA DOT          |[hand, warmer, red, polka, dot]           |
|ASSORTED COLOUR BIRD ORNAME

In [49]:
# 다른 tokenize 방법
# RegexTokenizer 사용 
from pyspark.ml.feature import RegexTokenizer 

rt = RegexTokenizer().setInputCol("Description").setOutputCol("DescOut").setPattern(" ").setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER |[white, hanging, heart, t-light, holder]  |
|WHITE METAL LANTERN                |[white, metal, lantern]                   |
|CREAM CUPID HEARTS COAT HANGER     |[cream, cupid, hearts, coat, hanger]      |
|KNITTED UNION FLAG HOT WATER BOTTLE|[knitted, union, flag, hot, water, bottle]|
|RED WOOLLY HOTTIE WHITE HEART.     |[red, woolly, hottie, white, heart.]      |
|SET 7 BABUSHKA NESTING BOXES       |[set, 7, babushka, nesting, boxes]        |
|GLASS STAR FROSTED T-LIGHT HOLDER  |[glass, star, frosted, t-light, holder]   |
|HAND WARMER UNION JACK             |[hand, warmer, union, jack]               |
|HAND WARMER RED POLKA DOT          |[hand, warmer, red, polka, dot]           |
|ASSORTED COLOUR BIRD ORNAME

In [50]:
# 25.7.2 일반적인 단어 제거하기 
from pyspark.ml.feature import StopWordsRemover
englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
stops = StopWordsRemover().setStopWords(englishStopWords)  .setInputCol("DescOut")
stops.transform(tokenized).show()


+--------------------+--------------------+-------------------------------------+
|         Description|             DescOut|StopWordsRemover_a8a921a2b58d__output|
+--------------------+--------------------+-------------------------------------+
|WHITE HANGING HEA...|[white, hanging, ...|                 [white, hanging, ...|
| WHITE METAL LANTERN|[white, metal, la...|                 [white, metal, la...|
|CREAM CUPID HEART...|[cream, cupid, he...|                 [cream, cupid, he...|
|KNITTED UNION FLA...|[knitted, union, ...|                 [knitted, union, ...|
|RED WOOLLY HOTTIE...|[red, woolly, hot...|                 [red, woolly, hot...|
|SET 7 BABUSHKA NE...|[set, 7, babushka...|                 [set, 7, babushka...|
|GLASS STAR FROSTE...|[glass, star, fro...|                 [glass, star, fro...|
|HAND WARMER UNION...|[hand, warmer, un...|                 [hand, warmer, un...|
|HAND WARMER RED P...|[hand, warmer, re...|                 [hand, warmer, re...|
|ASSORTED COLOUR

In [51]:
# 25.7.3 단어 조합 만들기 
from pyspark.ml.feature import NGram
unigram = NGram().setInputCol("DescOut").setN(1)
bigram = NGram().setInputCol("DescOut").setN(2)
unigram.transform(tokenized.select("DescOut")).show(10, False)
bigram.transform(tokenized.select("DescOut")).show(10, False)

+------------------------------------------+------------------------------------------+
|DescOut                                   |NGram_fe8cf7ab2363__output                |
+------------------------------------------+------------------------------------------+
|[white, hanging, heart, t-light, holder]  |[white, hanging, heart, t-light, holder]  |
|[white, metal, lantern]                   |[white, metal, lantern]                   |
|[cream, cupid, hearts, coat, hanger]      |[cream, cupid, hearts, coat, hanger]      |
|[knitted, union, flag, hot, water, bottle]|[knitted, union, flag, hot, water, bottle]|
|[red, woolly, hottie, white, heart.]      |[red, woolly, hottie, white, heart.]      |
|[set, 7, babushka, nesting, boxes]        |[set, 7, babushka, nesting, boxes]        |
|[glass, star, frosted, t-light, holder]   |[glass, star, frosted, t-light, holder]   |
|[hand, warmer, union, jack]               |[hand, warmer, union, jack]               |
|[hand, warmer, red, polka, dot]

In [58]:
sales.show()


+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [62]:
# 25.7.4 단어를 숫자로 변환하기 
'''
CountVectorizer는 토큰화된 데이터에서 작동하며, 다음 두 가지 작업을 수행합니다. 
1. 모델을 적합하는 프로세스 동안 모든 문서에서 단어 집합을 찾은 다음 문서별로 해당 단어의 
   출현 빈도를 계산합니다. 
2. 그런 다음 변환 과정에서 DataFrame 컬럼의 각 로우에서 주어진 단어의 발생 빈도를 계산하고 
   해당 로우에 포함된 용어를 벡터 형태로 출력합니다. 
'''
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer()\
    .setInputCol("DescOut")\
    .setOutputCol("countVec")\
    .setVocabSize(500)\
    .setMinTF(1)\
    .setMinDF(2)
    
fittedCV = cv.fit(tokenized)
fittedCV.transform(tokenized).show(10, False)

+-----------------------------------+------------------------------------------+---------------------------------------------+
|Description                        |DescOut                                   |countVec                                     |
+-----------------------------------+------------------------------------------+---------------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER |[white, hanging, heart, t-light, holder]  |(500,[4,15,19,21,23],[1.0,1.0,1.0,1.0,1.0])  |
|WHITE METAL LANTERN                |[white, metal, lantern]                   |(500,[14,15,208],[1.0,1.0,1.0])              |
|CREAM CUPID HEARTS COAT HANGER     |[cream, cupid, hearts, coat, hanger]      |(500,[57,98,222,280],[1.0,1.0,1.0,1.0])      |
|KNITTED UNION FLAG HOT WATER BOTTLE|[knitted, union, flag, hot, water, bottle]|(500,[34,39,40,89,256],[1.0,1.0,1.0,1.0,1.0])|
|RED WOOLLY HOTTIE WHITE HEART.     |[red, woolly, hottie, white, heart.]      |(500,[3,15],[1.0,1.0])         

In [66]:
# 다른방법: Tf-IDF 사용하기 
# 여러 문서에서 발생하는 용어보다 적은 문서에서 출현하는 용어가 더 많은 가중치가 부여
# ex/ the : 가중치가 매우 낮음, streaming : 가중치가 상대적으로 높게 부여 

#예시로 'red'라는 단어가 포함된 일부 문서확인 
tfIdfIn = tokenized\
    .where("array_contains(DescOut, 'red')")\
    .select("DescOut")\
    .limit(10)
tfIdfIn.show(10, False)

+------------------------------------+
|DescOut                             |
+------------------------------------+
|[red, woolly, hottie, white, heart.]|
|[hand, warmer, red, polka, dot]     |
|[red, coat, rack, paris, fashion]   |
|[alarm, clock, bakelike, red]       |
|[set/2, red, retrospot, tea, towels]|
|[red, toadstool, led, night, light] |
|[hand, warmer, red, polka, dot]     |
|[edwardian, parasol, red]           |
|[red, woolly, hottie, white, heart.]|
|[edwardian, parasol, red]           |
+------------------------------------+



In [67]:
# 상기 출력된 단어들을 해싱(hashing) 하여 TF-IDF에 입력 
# 해싱 : 임의의 길이를 가진 데이터를 고정된 길이의 데이터로 매핑 (검색 성능 개선)
from pyspark.ml.feature import HashingTF, IDF

tf = HashingTF()\
    .setInputCol("DescOut")\
    .setOutputCol("TFOut")\
    .setNumFeatures(10000)
idf = IDF()\
    .setInputCol("TFOut")\
    .setOutputCol("IDFOut")\
    .setMinDocFreq(2)

idf.fit(tf.transform(tfIdfIn)).transform(tf.transform(tfIdfIn)).show(10, False)

+------------------------------------+--------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+
|DescOut                             |TFOut                                                   |IDFOut                                                                                                              |
+------------------------------------+--------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+
|[red, woolly, hottie, white, heart.]|(10000,[4077,4291,6756,6872,7142],[1.0,1.0,1.0,1.0,1.0])|(10000,[4077,4291,6756,6872,7142],[1.2992829841302609,0.0,1.2992829841302609,1.2992829841302609,1.2992829841302609])|
|[hand, warmer, red, polka, dot]     |(10000,[3280,3423,4220,4291,8977],[1.0,1.0,1.0,1.0,1.0])|(10000,[3280,3423,4220,4291,8977],[1.2992829841302609

In [60]:
####### 25.7.5 word2Vec #####
# https://ratsgo.github.io/from%20frequency%20to%20semantics/2017/03/30/word2vec/
from pyspark.ml.feature import Word2Vec

#입력데이터: 각 로우는 문장 또는 문서의 단어주머니입니다. 
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark" .split(" "), ),
    ("I wish Java could use case classes" .split(" "), ),
    ("Logistic regression models are neat" .split(" "),)
    
], ["text"])

#단어를 백터에 매핑합니다.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
    text, vector = row 
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
    

Text: [Hi, I, heard, about, Spark] => 
Vector: [-0.08174108117818833,-0.021760249510407448,-0.020029566623270514]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.03161891416779586,-0.028586057147809436,-0.04831806330808571]

Text: [Logistic, regression, models, are, neat] => 
Vector: [0.015932495146989824,0.00997152216732502,0.014660616219043732]



In [None]:
F