# DATA MANIPULATION: FEATURES

## 8.1 Feature Extraction

In [1]:
# Create (or reuse, via getOrCreate) the SparkSession bound to `spark`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("Python Spark Feature Extraction")
    # NOTE(review): "spark.some.config.option" looks like a placeholder setting — confirm it is needed.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


25/09/07 10:30:10 WARN Utils: Your hostname, gogeon-uui-noteubug.local resolves to a loopback address: 127.0.0.1; using 172.29.56.7 instead (on interface en0)
25/09/07 10:30:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/07 10:30:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### TF-IDF

Spark에서는 term frequency vector를 생성하는 방법으로 크게 두 가지, 1) `HashingTF` (줄여서 HTF)와 2) `CountVectorizer` (CV)가 있다. 두 방법 모두 term frequency vector를 만든다는 점은 동일하나, 아래의 차이점이 있다.

1. `reversible`(CV) vs `irreversible` (HTF)  
HTF는 각 문장의 토큰들을 hash를 통해 index로 변환 후에 count를 한다. 해시의 특성상 원래의 입력값으로 되돌릴수 없다. 반대로 CV는 모델에 인덱스의 원본 단어를 저장하고 있으므로, reversible하다.

2. memory & computational overhead  
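CV는 모든 문장에서 사용되는 토큰들을 수집해 vocabulary를 만든 후에, 문장마다 vocabulary 크기의 TF 벡터를 return 한다. 전체적으로는 # documents X # tokens 크기의 TF 행렬이 되지만, 각 행은 non-zero 값만 담는 sparse vector로 저장된다. 1) vocabulary를 만들기 위해 모든 문장을 먼저 돌아야 하고, 2) 모든 토큰(vocabulary)을 모델에 저장해야 하므로 HTF보다 비효율적이다.
HTF는 다른 문장에 dependency 없이 각각의 token을 hash 하므로 시간/공간 효율적이다.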

3. hashing has dependency from vector size, hashing function, and documents.  
hashing은 다른 토큰이 같은 해시값으로 매핑되는 해시 충돌이 일어날수 있다. 따라서 해시의 버킷 사이즈나 해시 함수등에 영향을 받는다.
4. a source of the information loss  
CV에선 infrequent tokens은 제거하지만, HTF에선 해시 충돌등으로 인해 infrequent token도 다른 토큰으로 병합될수 있다. 사용 환경에 따라서 적절히 알고리즘을 선택할 필요가 있다.

In [2]:
# Build the two-row example corpus (columns: document, sentence) and print it.
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame(
    [(0, "Python python Spark Spark"), (1, "Python SQL")], ["document", "sentence"]
)
sentenceData.show()

                                                                                

+--------+--------------------+
|document|            sentence|
+--------+--------------------+
|       0|Python python Spa...|
|       1|          Python SQL|
+--------+--------------------+



1. CountVectorizer

In [10]:
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Tokenizer
import numpy as np

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="features")
idf = IDF(inputCol=vectorizer.getOutputCol(), outputCol="tfidf")
# Stages run sequentially: tokenizer -> vectorizer -> IDF
pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])

model = pipeline.fit(sentenceData)
# Sum the per-document TF vectors element-wise to get corpus-wide term counts.
# toArray() yields NumPy arrays, so `a + b` adds them position by position;
# tolist() then converts the totals to plain Python floats for Spark.
total_counts = (
    model.transform(sentenceData)
    .select("features")
    .rdd.map(lambda row: row["features"].toArray())
    .reduce(lambda a, b: a + b)
    .tolist()
)

vocabList = model.stages[1].vocabulary
d = {"vocabList": vocabList, "total_counts": total_counts}

# Term frequency
# Pair each term with its count using zip. Going through np.array(...).T on
# mixed str/float data coerces every value to a string, which made the
# total_counts column string-typed instead of numeric.
spark.createDataFrame(list(zip(*d.values())), list(d.keys())).show()

+---------+------------+
|vocabList|total_counts|
+---------+------------+
|   python|         3.0|
|    spark|         2.0|
|      sql|         1.0|
+---------+------------+



vectorizer와 idf 이후 결과는 메모리 효율성을 위해 아래와 같은 형식으로 데이터가 담긴다.
1. 전체 vocab 크기 (int)
2. non-zero값의 토큰 인덱스 (list of int)
3. 해당 인덱스의 실제 TF/TF-IDF 값 (list of float)

In [11]:
# Compute the tf-idf vector for each sentence
result = model.transform(sentenceData)
result.show(truncate=False)

+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|document|sentence                 |words                         |features           |tfidf                             |
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(3,[0,1],[2.0,2.0])|(3,[0,1],[0.0,0.8109302162163288])|
|1       |Python SQL               |[python, sql]                 |(3,[0,2],[1.0,1.0])|(3,[0,2],[0.0,0.4054651081081644])|
+--------+-------------------------+------------------------------+-------------------+----------------------------------+



In [15]:
# 원본 termidx를 term word로 변환
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf


def termsIdx2Term(vocabulary):
    """Build a UDF that maps an array of term indices to the matching terms.

    Args:
        vocabulary: Sequence of terms, looked up by integer position.

    Returns:
        The object produced by ``udf(...)`` for the lookup function, declared
        with an array-of-strings return type.
    """
    string_array = ArrayType(StringType())

    # NOTE(review): the inner callable deliberately keeps the same name as the
    # factory so the function object handed to `udf` is named as before.
    def termsIdx2Term(termIndices):
        # Indices are coerced with int() before indexing into the vocabulary.
        return list(map(lambda index: vocabulary[int(index)], termIndices))

    return udf(termsIdx2Term, string_array)


vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
vocabList

['python', 'spark', 'sql']

In [17]:
# Using the result above, build the full document x token matrix.

from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DoubleType, IntegerType, ArrayType


# Positions explicitly stored in the vector (read from its `indices` attribute).
indices_udf = udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))
# Full-length array form of the vector (one value per vocabulary slot).
tfidf_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))

(
    result.select("document", "sentence", "tfidf")
    # "indices" must be derived before "tfidf" is overwritten with its array form.
    .withColumn("indices", indices_udf(F.col("tfidf")))
    .withColumn("tfidf", tfidf_udf(F.col("tfidf")))
    # The earlier intermediate "Terms" = size(indices) column was dropped: it was
    # immediately overwritten by this column and never appeared in the output.
    .withColumn("Terms", termsIdx2Term(vocabList)(F.col("indices")))
    .show(truncate=False)
)


+--------+-------------------------+------------------------------+-------+---------------+
|document|sentence                 |tfidf                         |indices|Terms          |
+--------+-------------------------+------------------------------+-------+---------------+
|0       |Python python Spark Spark|[0.0, 0.8109302162163288, 0.0]|[0, 1] |[python, spark]|
|1       |Python SQL               |[0.0, 0.0, 0.4054651081081644]|[0, 2] |[python, sql]  |
+--------+-------------------------+------------------------------+-------+---------------+



2. HashingTF

In [28]:
# The default algorithm used for hashing is MurmurHash 3.
# Because of hash collisions, different words can end up in the same slot.
vectorizer = HashingTF(inputCol="words", outputCol="features", numFeatures=2000)

# Same pipeline shape as before, with HashingTF in place of the previous vectorizer.
pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])

model = pipeline.fit(sentenceData)
result = model.transform(sentenceData)
result.show(truncate=False)


+--------+-------------------------+------------------------------+----------------------------+-------------------------------------------+
|document|sentence                 |words                         |features                    |tfidf                                      |
+--------+-------------------------+------------------------------+----------------------------+-------------------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(2000,[1286,1709],[2.0,2.0])|(2000,[1286,1709],[0.8109302162163288,0.0])|
|1       |Python SQL               |[python, sql]                 |(2000,[52,1709],[1.0,1.0])  |(2000,[52,1709],[0.4054651081081644,0.0])  |
+--------+-------------------------+------------------------------+----------------------------+-------------------------------------------+



In [None]:
# Here "Terms" holds only the number of stored indices per document (F.size),
# not the words themselves.
result.select("document", "sentence", "tfidf").withColumn(
    "indices", indices_udf(F.col("tfidf"))
).withColumn("tfidf", tfidf_udf(F.col("tfidf"))).withColumn(
    "Terms", F.size(F.col("indices"))
).show(truncate=False)

# HashingTF cannot be converted back to the original words.
# The dimension of the tf-idf vector depends on the bucket size of the hash function.
# .withColumn("Terms", termsIdx2Term(vocabList)(F.col("indices"))).show(truncate=False)


+--------+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Word2Vec도 동일하게 패키지를 이용해 계산 가능하다.

In [3]:
# Tokenize each sentence, then fit Word2Vec on the tokens and show the
# resulting 3-dimensional "features" vector per document.
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
word2Vec = Word2Vec(
    inputCol=tokenizer.getOutputCol(), outputCol="features", vectorSize=3, minCount=0
)
pipeline = Pipeline(stages=[tokenizer, word2Vec])

model = pipeline.fit(sentenceData)
result = model.transform(sentenceData)
result.show(truncate=False)

25/09/07 10:30:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/09/07 10:30:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


+--------+-------------------------+------------------------------+----------------------------------------------------------------+
|document|sentence                 |words                         |features                                                        |
+--------+-------------------------+------------------------------+----------------------------------------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|[-0.09911318868398666,-0.06356196707929485,0.061470928601920605]|
|1       |Python SQL               |[python, sql]                 |[-0.09358901157975197,-0.03516584075987339,0.13219960406422615] |
+--------+-------------------------+------------------------------+----------------------------------------------------------------+



[FeatureHasher](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.FeatureHasher.html): 해싱을 이용해 각 피쳐(컬럼)를 벡터의 인덱스로 변환하고, 해당 인덱스에 값을 저장한다. 수치형 컬럼은 컬럼 값 자체가, 문자열/불리언 컬럼은 1.0이 저장된다.

In [15]:
# Hash four columns of mixed types (float, bool, numeric string, string)
# into a single "features" vector and print the result.
from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame(
    [
        (2.2, True, "1", "foo"),
        (3.3, False, "2", "bar"),
        (4.4, False, "3", "baz"),
        (5.5, True, "4", "foo"),
    ],
    ["real", "bool", "stringNum", "string"],
)


hasher = FeatureHasher(
    inputCols=["real", "bool", "stringNum", "string"], outputCol="features"
)

featurized = hasher.transform(dataset)
featurized.show(truncate=False)

+----+-----+---------+------+--------------------------------------------------------+
|real|bool |stringNum|string|features                                                |
+----+-----+---------+------+--------------------------------------------------------+
|2.2 |true |1        |foo   |(262144,[174475,247670,257907,262126],[2.2,1.0,1.0,1.0])|
|3.3 |false|2        |bar   |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.3])  |
|4.4 |false|3        |baz   |(262144,[22406,70644,174475,187923],[1.0,1.0,4.4,1.0])  |
|5.5 |true |4        |foo   |(262144,[101499,174475,247670,257907],[1.0,5.5,1.0,1.0])|
+----+-----+---------+------+--------------------------------------------------------+



[RFormula](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.RFormula.html): R-style의 선형 방정식을 작성하여 데이터를 feature와 label로 분리한다 

In [None]:
# Use an R-style formula to split the columns into a "features" vector
# (real + stringNum) and a "label" column (bool).
from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [
        (2.2, 1.0, "1", "foo"),
        (3.3, 0.0, "2", "bar"),
        (4.4, 0.0, "3", "baz"),
        (5.5, 1.0, "3", "foo"),
    ],
    ["real", "bool", "stringNum", "string"],
)

formula = RFormula(
    formula="bool ~ real + stringNum",  # stringNum is a string column, so it is represented as a one-hot vector
    featuresCol="features",
    labelCol="label",
    forceIndexLabel=True,
)

transformed = formula.fit(dataset).transform(dataset)
transformed.select("features", "label").show(truncate=False)


+-------------+-----+
|features     |label|
+-------------+-----+
|[2.2,0.0,1.0]|1.0  |
|[3.3,0.0,0.0]|0.0  |
|[4.4,1.0,0.0]|0.0  |
|[5.5,1.0,0.0]|1.0  |
+-------------+-----+



## Feature Transform

전체 Feature Transformation API는 [여기서](https://spark.apache.org/docs/latest/ml-features.html) 확인 가능하다.

In [None]:
# RegexTokenizer: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.RegexTokenizer.html

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceData = spark.createDataFrame(
    [(0, "Python python Spark Spark"), (1, "Python SQL")], ["id", "sentence"]
)

# 1) Plain Tokenizer.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
tokenized = tokenizer.transform(sentenceData)
tokenized.show(truncate=False)

# 2) RegexTokenizer where the pattern describes the tokens themselves.
regexTokenizer = RegexTokenizer(
    inputCol="sentence",
    outputCol="words",
    pattern="\\w+",  # \w is the regex class for a word character
    gaps=False,  # gaps: treat pattern matches as delimiters (True) or as tokens (False)
)

tokenized = regexTokenizer.transform(sentenceData)
tokenized.show(truncate=False)


# 3) RegexTokenizer where the pattern describes the gaps between tokens.
regexTokenizer = RegexTokenizer(
    inputCol="sentence",
    outputCol="words",
    pattern="\\W",  # \W is the regex class for a character that is not a word character
    gaps=True,  # gaps: treat pattern matches as delimiters (True) or as tokens (False); here non-word characters such as spaces act as the delimiters
)

tokenized = regexTokenizer.transform(sentenceData)
tokenized.show(truncate=False)


+---+-------------------------+------------------------------+
|id |sentence                 |words                         |
+---+-------------------------+------------------------------+
|0  |Python python Spark Spark|[python, python, spark, spark]|
|1  |Python SQL               |[python, sql]                 |
+---+-------------------------+------------------------------+

+---+-------------------------+------------------------------+
|id |sentence                 |words                         |
+---+-------------------------+------------------------------+
|0  |Python python Spark Spark|[python, python, spark, spark]|
|1  |Python SQL               |[python, sql]                 |
+---+-------------------------+------------------------------+

+---+-------------------------+------------------------------+
|id |sentence                 |words                         |
+---+-------------------------+------------------------------+
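|0  |Python python Spark Spark|[python, python, spark, spark]|
|1  |Python SQL               |[python, sql]                 |
+---+-------------------------+------------------------------+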

In [None]:
# Stop Words Remover

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame(
    [
        (0, ["I", "saw", "the", "red", "balloon"]),
        (1, ["Mary", "had", "a", "little", "lamb"]),
    ],
    ["id", "words"],
)

# Remove the registered stop words (e.g., I, the, a, had)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

result = remover.transform(sentenceData)
result.show(truncate=False)


+---+----------------------------+--------------------+
|id |words                       |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



In [57]:
# NGram

from pyspark.ml.feature import NGram

sentenceData = spark.createDataFrame(
    [
        (0, "I love Spark"),
        (1, "I love Python"),
        # NOTE(review): id 1 is repeated here; presumably 2 was intended — confirm.
        (1, "I love SQL"),
    ],
    ["id", "sentence"],
)

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

# N-gram (bi-gram) tokenization
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

pipeline = Pipeline(stages=[tokenizer, ngram])
pipeline.fit(sentenceData).transform(sentenceData).show(truncate=False)

+---+-------------+-----------------+---------------------+
|id |sentence     |words            |ngrams               |
+---+-------------+-----------------+---------------------+
|0  |I love Spark |[i, love, spark] |[i love, love spark] |
|1  |I love Python|[i, love, python]|[i love, love python]|
|1  |I love SQL   |[i, love, sql]   |[i love, love sql]   |
+---+-------------+-----------------+---------------------+



In [61]:
# label Converter
# Round trip: string label -> numeric index (StringIndexer) -> string label (IndexToString).

from pyspark.ml.feature import StringIndexer, IndexToString

df = spark.createDataFrame(
    [
        (0, "a"),
        (1, "b"),
        (2, "c"),
    ],
    ["id", "label"],
)

indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")

indexed = indexer.fit(df).transform(df)

converter = IndexToString(inputCol="indexedLabel", outputCol="originalLabel")

result = converter.transform(indexed)

result.show(truncate=False)

+---+-----+------------+-------------+
|id |label|indexedLabel|originalLabel|
+---+-----+------------+-------------+
|0  |a    |0.0         |a            |
|1  |b    |1.0         |b            |
|2  |c    |2.0         |c            |
+---+-----+------------+-------------+



In [None]:
# VectorIndexer
# Infers the type of each feature and indexes the feature vector accordingly.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import RFormula

df = spark.createDataFrame(
    [
        (0, 2.2, True, "1", "foo", "CA"),
        (1, 3.3, False, "2", "bar", "US"),
        (0, 4.4, False, "3", "baz", "CHN"),
        (1, 5.5, False, "4", "foo", "AUS"),
    ],
    ["label", "real", "bool", "stringNum", "string", "country"],
)

# RFormula first assembles all predictors into a single "features" vector.
formula = RFormula(
    formula="label ~ real + bool + stringNum + string + country",
    featuresCol="features",
    labelCol="label",
)

# Features with 3 or more categories are treated as continuous
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=2
)
pipeline = Pipeline(stages=[formula, featureIndexer])
model = pipeline.fit(df)

result = model.transform(df)
result.show()

+-----+----+-----+---------+------+-------+--------------------+--------------------+
|label|real| bool|stringNum|string|country|            features|     indexedFeatures|
+-----+----+-----+---------+------+-------+--------------------+--------------------+
|    0| 2.2| true|        1|   foo|     CA|(10,[0,1,2,5,8],[...|(10,[0,1,2,5,8],[...|
|    1| 3.3|false|        2|   bar|     US|(10,[0,3,6],[3.3,...|(10,[0,3,6],[3.3,...|
|    0| 4.4|false|        3|   baz|    CHN|(10,[0,4,9],[4.4,...|(10,[0,4,9],[4.4,...|
|    1| 5.5|false|        4|   foo|    AUS|(10,[0,5,7],[5.5,...|(10,[0,5,7],[5.5,...|
+-----+----+-----+---------+------+-------+--------------------+--------------------+



In [66]:
# VectorAssembler: concatenates the selected feature columns into a single vector.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

dataset = spark.createDataFrame(
    [
        (0, 18, 1.0, Vectors.dense(1.0, 0.0, 0.0)),
        (1, 20, 2.0, Vectors.dense(0.0, 1.0, 0.0)),
        (2, 25, 3.0, Vectors.dense(0.0, 0.0, 1.0)),
    ],
    ["id", "hour", "mobile", "userFeatures"],
)

# Scalar columns and the vector column are combined in the listed order.
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features",
)

output = assembler.transform(dataset)
output.select("features", "id").show(truncate=False)


+----------------------+---+
|features              |id |
+----------------------+---+
|[18.0,1.0,1.0,0.0,0.0]|0  |
|[20.0,2.0,0.0,1.0,0.0]|1  |
|[25.0,3.0,0.0,0.0,1.0]|2  |
+----------------------+---+



In [70]:
## Get dummy UDF
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, udf


def get_dummy_udf(df, indexCol, categoryCols, continuousCols, labelCol, dropLast=True):
    """Assemble one-hot encoded categorical and continuous columns into "features".

    Args:
        df: Input Spark DataFrame.
        indexCol: Name of an identifier column to keep in the output, or a
            falsy value to leave it out.
        categoryCols: Names of the categorical columns to index and one-hot
            encode.
        continuousCols: Names of the columns appended to the feature vector
            after the encoded categorical columns.
        labelCol: Name of the label column to keep in the output, or a falsy
            value to leave it out.
        dropLast: Forwarded to ``OneHotEncoder(dropLast=...)``.

    Returns:
        A DataFrame with the columns ``[indexCol,] "features" [, labelCol]``.
    """
    # Turn each categorical column into a category index.
    indexers = [
        StringIndexer(inputCol=c, outputCol=f"{c}_index").fit(df) for c in categoryCols
    ]
    # One-hot encode each indexed column.
    encoders = [
        OneHotEncoder(
            inputCol=indexer.getOutputCol(),
            outputCol=f"{indexer.getOutputCol()}_encoded",
            # Previously the parameter was accepted but never forwarded, so
            # passing dropLast=False had no effect.
            dropLast=dropLast,
        )
        for indexer in indexers
    ]

    # Concatenate the one-hot vectors, followed by the continuous columns.
    assembler = VectorAssembler(
        inputCols=[encoder.getOutputCol() for encoder in encoders]
        + list(continuousCols),
        outputCol="features",
    )

    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model = pipeline.fit(df)
    result = model.transform(df)

    # The label is cast to double only when an index column is kept as well;
    # this preserves the function's existing output types for every
    # combination of indexCol / labelCol.
    if indexCol and labelCol:
        result = result.withColumn(labelCol, col(labelCol).cast("double"))

    # Output column order: optional index, features, optional label.
    selected = [indexCol] if indexCol else []
    selected.append("features")
    if labelCol:
        selected.append(labelCol)
    return result.select(*selected)

In [72]:
# Run get_dummy_udf on a three-row frame with one categorical and one
# continuous column, keeping both the id and the label in the output.
df = spark.createDataFrame(
    [
        (0, "a", 1, 1),
        (1, "b", 2, 2),
        (2, "c", 3, 3),
    ],
    ["id", "category", "label", "continuous"],
)

get_dummy_udf(df, "id", ["category"], ["continuous"], "label").show(truncate=False)


+---+-------------+-----+
|id |features     |label|
+---+-------------+-----+
|0  |[1.0,0.0,1.0]|1.0  |
|1  |[0.0,1.0,2.0]|2.0  |
|2  |[0.0,0.0,3.0]|3.0  |
+---+-------------+-----+



## 이외의 feature engineering method

* Scaler (e.g., StandardScaler, MinMaxScaler, etc.)

* Dimensionality Reduction (e.g., PCA and T-SVD)

* Feature Selection (e.g., Lasso and Random Forest)

