In [0]:
# importar SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# crear objeto de sesión spark
spark=SparkSession.builder.appName("Feature engineering").getOrCreate()

In [0]:
column_names = ["emp_id", "salario"]
records = [(100, 120000), (200, 170000), (300, 150000)]
df = spark.createDataFrame(records, schema=column_names)
df.show()

+------+-------+
|emp_id|salario|
+------+-------+
|   100| 120000|
|   200| 170000|
|   300| 150000|
+------+-------+



In [0]:
df2 = df.withColumn("bonos", df.salario * 0.05)
df2.show()

+------+-------+------+
|emp_id|salario| bonos|
+------+-------+------+
|   100| 120000|6000.0|
|   200| 170000|8500.0|
|   300| 150000|7500.0|
+------+-------+------+



In [0]:
# Cargamo l función udf
from pyspark.sql.functions import udf

# Creamos una función 
@udf("integer")
def tripled(num):
    return 5*int(num)

# Agregamo una nueva variable DAtaFrame
df2 = df.withColumn('tripled_col', tripled(df.salario))
df2.show()

+------+-------+-----------+
|emp_id|salario|tripled_col|
+------+-------+-----------+
|   100| 120000|     600000|
|   200| 170000|     850000|
|   300| 150000|     750000|
+------+-------+-----------+



In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno","Nombre"]
data = [("1", "juan Jones"),
    ("2", "tracey aguilar"),
    ("3", "amy castellon")]
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)

+-----+--------------+
|Seqno|Nombre        |
+-----+--------------+
|1    |juan Jones    |
|2    |tracey aguilar|
|3    |amy castellon |
+-----+--------------+



In [0]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Convertiendo la función a UDF 
convertUDF = udf(lambda z: convertCase(z),StringType())

In [0]:
df.show()

+-----+--------------+
|Seqno|        Nombre|
+-----+--------------+
|    1|    juan Jones|
|    2|tracey aguilar|
|    3| amy castellon|
+-----+--------------+



In [0]:
df.select(col("Seqno"), \
    convertUDF(col("nombre")).alias("Nombre_Mayus") ) \
   .show(truncate=False)

+-----+---------------+
|Seqno|Nombre_Mayus   |
+-----+---------------+
|1    |Juan Jones     |
|2    |Tracey Aguilar |
|3    |Amy Castellon  |
+-----+---------------+



In [0]:
# spark: Una instancia en SparkSession
# creamos un DataFrame
df = spark.createDataFrame([
(1, 'CS', 'MS'),
(2, 'MATH', 'PHD'),
(3, 'MATH', 'MS'),
(4, 'CS', 'MS'),
(5, 'CS', 'PHD'),
(6, 'ECON', 'BS'), (7, 'ECON', 'BS'),], ['id', 'dept', 'education'])

In [0]:
df.show()

+---+----+---------+
| id|dept|education|
+---+----+---------+
|  1|  CS|       MS|
|  2|MATH|      PHD|
|  3|MATH|       MS|
|  4|  CS|       MS|
|  5|  CS|      PHD|
|  6|ECON|       BS|
|  7|ECON|       BS|
+---+----+---------+



In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

In [0]:
# Stage 1: transforma el `dept` columna a numerica
stage_1 = StringIndexer(inputCol= 'dept', outputCol= 'dept_index')
#
# Stage 2: transforma la `education` a columna numérica
stage_2 = StringIndexer(inputCol= 'education', outputCol= 'education_index')
#

In [0]:
# Aplicar la transformación
stage_1.fit(df).transform(df).show()

+---+----+---------+----------+
| id|dept|education|dept_index|
+---+----+---------+----------+
|  1|  CS|       MS|       0.0|
|  2|MATH|      PHD|       2.0|
|  3|MATH|       MS|       2.0|
|  4|  CS|       MS|       0.0|
|  5|  CS|      PHD|       0.0|
|  6|ECON|       BS|       1.0|
|  7|ECON|       BS|       1.0|
+---+----+---------+----------+



In [0]:
# Aplicar la transformación
stage_2.fit(df).transform(df).show()

+---+----+---------+---------------+
| id|dept|education|education_index|
+---+----+---------+---------------+
|  1|  CS|       MS|            0.0|
|  2|MATH|      PHD|            2.0|
|  3|MATH|       MS|            0.0|
|  4|  CS|       MS|            0.0|
|  5|  CS|      PHD|            2.0|
|  6|ECON|       BS|            1.0|
|  7|ECON|       BS|            1.0|
+---+----+---------+---------------+



In [0]:
# Etapa 3: codificar one-hot la columna numérica `education_index`
stage_3 = OneHotEncoder(inputCols=['education_index'],outputCols=['education_OHE'])

In [0]:
df.show()

+---+----+---------+
| id|dept|education|
+---+----+---------+
|  1|  CS|       MS|
|  2|MATH|      PHD|
|  3|MATH|       MS|
|  4|  CS|       MS|
|  5|  CS|      PHD|
|  6|ECON|       BS|
|  7|ECON|       BS|
+---+----+---------+



In [0]:
# configurar la canalización: pegar las etapas juntas
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3 ])# Entubar todas las etapas
# ajuste el modelo de tubería y transforme los datos como se define
pipeline_model = pipeline.fit(df)

In [0]:
# Se realiza la tranformación
final_df = pipeline_model.transform(df) # Se aplica nuestro datos
final_df.show(truncate=False)

+---+----+---------+----------+---------------+-------------+
|id |dept|education|dept_index|education_index|education_OHE|
+---+----+---------+----------+---------------+-------------+
|1  |CS  |MS       |0.0       |0.0            |(2,[0],[1.0])|
|2  |MATH|PHD      |2.0       |2.0            |(2,[],[])    |
|3  |MATH|MS       |2.0       |0.0            |(2,[0],[1.0])|
|4  |CS  |MS       |0.0       |0.0            |(2,[0],[1.0])|
|5  |CS  |PHD      |0.0       |2.0            |(2,[],[])    |
|6  |ECON|BS       |1.0       |1.0            |(2,[1],[1.0])|
|7  |ECON|BS       |1.0       |1.0            |(2,[1],[1.0])|
+---+----+---------+----------+---------------+-------------+



In [0]:
from pyspark.ml.feature import Binarizer
raw_df = spark.createDataFrame([
(1, 0.1),
(2, 0.2),
(3, 0.5),
(4, 0.8),
(5, 0.9),
(6, 1.1)
], ["id", "feature"])

raw_df.show()

+---+-------+
| id|feature|
+---+-------+
|  1|    0.1|
|  2|    0.2|
|  3|    0.5|
|  4|    0.8|
|  5|    0.9|
|  6|    1.1|
+---+-------+



In [0]:
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=0.5, inputCol="feature",outputCol="binarized_feature")

In [0]:
binarized_df = binarizer.transform(raw_df)
print("Salida del binarizador con Threshold = %f" % binarizer.getThreshold())

Salida del binarizador con Threshold = 0.500000


In [0]:
#binarized_df = binarizer.transform(raw_df)
binarized_df.show(truncate=False)

+---+-------+-----------------+
|id |feature|binarized_feature|
+---+-------+-----------------+
|1  |0.1    |0.0              |
|2  |0.2    |0.0              |
|3  |0.5    |0.0              |
|4  |0.8    |1.0              |
|5  |0.9    |1.0              |
|6  |1.1    |1.0              |
+---+-------+-----------------+



In [0]:
df = spark.createDataFrame([(1, 12.0, 5.0),(2, 7.0, 10.0),(3, 10.0, 12.0),(4, 5.0, float("nan")),(5, 6.0, None),
                            (6, float("nan"), float("nan")),(7, None, None)], ["id", "col1", "col2"])
df.show(truncate=False)

+---+----+----+
|id |col1|col2|
+---+----+----+
|1  |12.0|5.0 |
|2  |7.0 |10.0|
|3  |10.0|12.0|
|4  |5.0 |NaN |
|5  |6.0 |null|
|6  |NaN |NaN |
|7  |null|null|
+---+----+----+



In [0]:
# LLamamos a la función Imputer
from pyspark.ml.feature import Imputer
# Por defauld usa la estrategia de la media
imputer = Imputer(inputCols=["col1", "col2"],outputCols=["col1_out", "col2_out"])
# Realiza la imputación
model = imputer.fit(df)
transformed = model.transform(df)
# Muestra el resultado
transformed.show(truncate=False)

+---+----+----+--------+--------+
|id |col1|col2|col1_out|col2_out|
+---+----+----+--------+--------+
|1  |12.0|5.0 |12.0    |5.0     |
|2  |7.0 |10.0|7.0     |10.0    |
|3  |10.0|12.0|10.0    |12.0    |
|4  |5.0 |NaN |5.0     |9.0     |
|5  |6.0 |null|6.0     |9.0     |
|6  |NaN |NaN |8.0     |9.0     |
|7  |null|null|8.0     |9.0     |
+---+----+----+--------+--------+



In [0]:
# Estrategia con la mediana
imputer.setStrategy("median")
# Rellena con la mediana
model = imputer.fit(df)
transformed = model.transform(df)
# Muestra el resultado
transformed.show(truncate=False)

+---+----+----+--------+--------+
|id |col1|col2|col1_out|col2_out|
+---+----+----+--------+--------+
|1  |12.0|5.0 |12.0    |5.0     |
|2  |7.0 |10.0|7.0     |10.0    |
|3  |10.0|12.0|10.0    |12.0    |
|4  |5.0 |NaN |5.0     |10.0    |
|5  |6.0 |null|6.0     |10.0    |
|6  |NaN |NaN |7.0     |10.0    |
|7  |null|null|7.0     |10.0    |
+---+----+----+--------+--------+



In [0]:
from pyspark.ml.feature import Tokenizer

In [0]:
docs = [(1, "a Fox jumped over FOX"),(2, "RED of fox jumped")]
df = spark.createDataFrame(docs, ["id", "texto"])
df.show(truncate=False)

+---+---------------------+
|id |texto                |
+---+---------------------+
|1  |a Fox jumped over FOX|
|2  |RED of fox jumped    |
+---+---------------------+



In [0]:
# Le decimos cual es la columna o variable que desea Tokenizer y la salida
tokenizer = Tokenizer(inputCol="texto", outputCol="tokens")
# Se realiza el Tokenizer
tokenized = tokenizer.transform(df)
tokenized.show(truncate=False)

+---+---------------------+---------------------------+
|id |texto                |tokens                     |
+---+---------------------+---------------------------+
|1  |a Fox jumped over FOX|[a, fox, jumped, over, fox]|
|2  |RED of fox jumped    |[red, of, fox, jumped]     |
+---+---------------------+---------------------------+



In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

countTokens = udf(lambda words: len(words), IntegerType())
tokenized.select("texto", "tokens").withColumn("tokens_length",countTokens(col("tokens"))).show(truncate=False)

+---------------------+---------------------------+-------------+
|texto                |tokens                     |tokens_length|
+---------------------+---------------------------+-------------+
|a Fox jumped over FOX|[a, fox, jumped, over, fox]|5            |
|RED of fox jumped    |[red, of, fox, jumped]     |4            |
+---------------------+---------------------------+-------------+



In [0]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

regexTokenizer = RegexTokenizer(inputCol="texto", outputCol="tokens",
pattern="\\W", minTokenLength=3)
# Relizame la tranformación
regex_tokenized = regexTokenizer.transform(df)
regex_tokenized.select("texto", "tokens").withColumn("tokens_length", countTokens(col("tokens"))).show(truncate=False)

+---------------------+------------------------+-------------+
|texto                |tokens                  |tokens_length|
+---------------------+------------------------+-------------+
|a Fox jumped over FOX|[fox, jumped, over, fox]|4            |
|RED of fox jumped    |[red, fox, jumped]      |3            |
+---------------------+------------------------+-------------+



In [0]:
docs = [(1, "a Fox jumped, over, the fence?"),(2, "a RED, of fox?"),(3, "Curso Pyspark Bolivia")]
df = spark.createDataFrame(docs, ["id", "text"])
df.show(truncate=False)

+---+------------------------------+
|id |text                          |
+---+------------------------------+
|1  |a Fox jumped, over, the fence?|
|2  |a RED, of fox?                |
|3  |Curso Pyspark Bolivia         |
+---+------------------------------+



In [0]:
from pyspark.ml.feature import StopWordsRemover

tk = RegexTokenizer(pattern=r'(?:\p{Punct}|\s)+', inputCol="text",outputCol='text2')
sw = StopWordsRemover(inputCol='text2', outputCol='text3')
# Creamo nuestro Pipline
pipeline = Pipeline(stages=[tk, sw])
df4 = pipeline.fit(df).transform(df)
df4.show(truncate=False)

+---+------------------------------+----------------------------------+-------------------------+
|id |text                          |text2                             |text3                    |
+---+------------------------------+----------------------------------+-------------------------+
|1  |a Fox jumped, over, the fence?|[a, fox, jumped, over, the, fence]|[fox, jumped, fence]     |
|2  |a RED, of fox?                |[a, red, of, fox]                 |[red, fox]               |
|3  |Curso Pyspark Bolivia         |[curso, pyspark, bolivia]         |[curso, pyspark, bolivia]|
+---+------------------------------+----------------------------------+-------------------------+



In [0]:
features = [('alex', 1), ('jans', 3), ('ali', 6), ('bruno', 10)]
columns = ("nombre", "edad")
samples = spark.createDataFrame(features, columns)
samples.show()

+------+----+
|nombre|edad|
+------+----+
|  alex|   1|
|  jans|   3|
|   ali|   6|
| bruno|  10|
+------+----+



In [0]:
from pyspark.sql.functions import stddev, mean, col

(samples.select(mean("edad").alias("mean_edad"),
                stddev("edad").alias("stddev_edad")).crossJoin(samples).withColumn("edad_scaled",
                                                                                 (col("edad") - col("mean_edad")) / col("stddev_edad"))) .show(truncate=False)

+---------+------------------+------+----+-------------------+
|mean_edad|stddev_edad       |nombre|edad|edad_scaled        |
+---------+------------------+------+----+-------------------+
|5.0      |3.9157800414902435|alex  |1   |-1.0215078369104984|
|5.0      |3.9157800414902435|jans  |3   |-0.5107539184552492|
|5.0      |3.9157800414902435|ali   |6   |0.2553769592276246 |
|5.0      |3.9157800414902435|bruno |10  |1.276884796138123  |
+---------+------------------+------+----+-------------------+



In [0]:
mean_age, sttdev_age = samples.select(mean("edad"), stddev("edad")).first()
samples.withColumn("edad_scaled",(col("edad") - mean_age) / sttdev_age).show(truncate=False)

+------+----+-------------------+
|nombre|edad|edad_scaled        |
+------+----+-------------------+
|alex  |1   |-1.0215078369104984|
|jans  |3   |-0.5107539184552492|
|ali   |6   |0.2553769592276246 |
|bruno |10  |1.276884796138123  |
+------+----+-------------------+



In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
vecAssembler = VectorAssembler(inputCols=['edad'], outputCol="edad_vector")
samples2 = vecAssembler.transform(samples)
samples2.show()

+------+----+-----------+
|nombre|edad|edad_vector|
+------+----+-----------+
|  alex|   1|      [1.0]|
|  jans|   3|      [3.0]|
|   ali|   6|      [6.0]|
| bruno|  10|     [10.0]|
+------+----+-----------+



In [0]:
scaler = StandardScaler(inputCol="edad_vector", outputCol="edad_scaled",withStd=True, withMean=True)
scalerModel = scaler.fit(samples2)
scaledData = scalerModel.transform(samples2)
scaledData.show(truncate=False)

+------+----+-----------+---------------------+
|nombre|edad|edad_vector|edad_scaled          |
+------+----+-----------+---------------------+
|alex  |1   |[1.0]      |[-1.0215078369104984]|
|jans  |3   |[3.0]      |[-0.5107539184552492]|
|ali   |6   |[6.0]      |[0.2553769592276246] |
|bruno |10  |[10.0]     |[1.276884796138123]  |
+------+----+-----------+---------------------+



In [0]:
df = spark.createDataFrame([ (100, 77560, 45),(200, 41560, 23),(300, 30285, 20),
                            (400, 10345, 6),(500, 88000, 50)], 
                           ["user_id", "revenue","num_days"])
print("Antes de Scaling :")
df.show()

Antes de Scaling :
+-------+-------+--------+
|user_id|revenue|num_days|
+-------+-------+--------+
|    100|  77560|      45|
|    200|  41560|      23|
|    300|  30285|      20|
|    400|  10345|       6|
|    500|  88000|      50|
+-------+-------+--------+



In [0]:
from pyspark.ml.feature import MinMaxScaler # Para la normalización
from pyspark.ml.feature import VectorAssembler#  convierte las variables en vector
from pyspark.ml import Pipeline # Entuva los procesos
from pyspark.sql.functions import udf # Función definiada por el usuario
from pyspark.sql.types import DoubleType

# UDF para convertir el tipo de columna de vector a tipo doble
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

In [0]:
# Iterando sobre las columnas a escalar
for i in ["revenue","num_days"]:
    # Transformación VectorAssembler: conversión de columna a tipo vectorial
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
    # MinMaxScaler transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
    # Pipeline y VectorAssembler y MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])
    # Fitting pipeline on DataFrame
df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")
    

df.show(5)

+-------+-------+--------+---------------+
|user_id|revenue|num_days|num_days_Scaled|
+-------+-------+--------+---------------+
|    100|  77560|      45|          0.886|
|    200|  41560|      23|          0.386|
|    300|  30285|      20|          0.318|
|    400|  10345|       6|            0.0|
|    500|  88000|      50|            1.0|
+-------+-------+--------+---------------+



In [0]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
triplets = [(0, 1, 100), (1, 2, 200), (2, 5, 1000)]
df = spark.createDataFrame(triplets, ['x', 'y', 'z'])
df.show()

+---+---+----+
|  x|  y|   z|
+---+---+----+
|  0|  1| 100|
|  1|  2| 200|
|  2|  5|1000|
+---+---+----+



In [0]:
assembler = VectorAssembler(inputCols=["x"], outputCol="x_vector")
scaler = MinMaxScaler(inputCol="x_vector", outputCol="x_scaled")
pipeline = Pipeline(stages=[assembler, scaler])
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
scaledData.show(truncate=False)

+---+---+----+--------+--------+
|x  |y  |z   |x_vector|x_scaled|
+---+---+----+--------+--------+
|0  |1  |100 |[0.0]   |[0.0]   |
|1  |2  |200 |[1.0]   |[0.5]   |
|2  |5  |1000|[2.0]   |[1.0]   |
+---+---+----+--------+--------+



In [0]:
triplets = [(0, 1, 100), (1, 2, 200), (2, 5, 1000)]
df = spark.createDataFrame(triplets, ['x', 'y', 'z'])
df.show()

+---+---+----+
|  x|  y|   z|
+---+---+----+
|  0|  1| 100|
|  1|  2| 200|
|  2|  5|1000|
+---+---+----+



In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler

columns_to_scale = ["x", "y", "z"]
assemblers = [VectorAssembler(inputCols=[col],outputCol=col + "_vector") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vector",outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
scaledData.show(truncate=False)

+---+---+----+--------+--------+--------+--------+--------+--------------------+
|x  |y  |z   |x_vector|y_vector|z_vector|x_scaled|y_scaled|z_scaled            |
+---+---+----+--------+--------+--------+--------+--------+--------------------+
|0  |1  |100 |[0.0]   |[1.0]   |[100.0] |[0.0]   |[0.0]   |[0.0]               |
|1  |2  |200 |[1.0]   |[2.0]   |[200.0] |[0.5]   |[0.25]  |[0.1111111111111111]|
|2  |5  |1000|[2.0]   |[5.0]   |[1000.0]|[1.0]   |[1.0]   |[1.0]               |
+---+---+----+--------+--------+--------+--------+--------+--------------------+



In [0]:
from pyspark.sql import functions as f

names = {x + "_scaled": x for x in columns_to_scale}
scaledData = scaledData.select([f.col(c).alias(names[c]) for c in names.keys()])
scaledData.show()

+-----+------+--------------------+
|    x|     y|                   z|
+-----+------+--------------------+
|[0.0]| [0.0]|               [0.0]|
|[0.5]|[0.25]|[0.1111111111111111]|
|[1.0]| [1.0]|               [1.0]|
+-----+------+--------------------+



In [0]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

data = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
data.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]|
|  2|[4.0,10.0,2.0]|
+---+--------------+



In [0]:
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(data)
print("Normalized usando la norma L^1")
l1NormData.show()

Normalized usando la norma L^1
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+



In [0]:
# Normalize each Vector using $L^2$ norm.
normalizer = Normalizer(inputCol="features", outputCol="norma2", p=2.0)
l2NormData = normalizer.transform(data)
print("Normalizando usando la norma L^2")
l2NormData.show()

Normalizando usando la norma L^2
+---+--------------+--------------------+
| id|      features|              norma2|
+---+--------------+--------------------+
|  0|[1.0,0.5,-1.0]|[0.66666666666666...|
|  1| [2.0,1.0,1.0]|[0.81649658092772...|
|  2|[4.0,10.0,2.0]|[0.36514837167011...|
+---+--------------+--------------------+



In [0]:
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(data, {normalizer.p: float("inf")})
print("Normalizando usando la norma L^inf")
lInfNormData.show()

Normalizando usando la norma L^inf
+---+--------------+--------------+
| id|      features|        norma2|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+



In [0]:
from pyspark.ml.feature import Normalizer
# Creando un object de la class Normalizer
ManhattanDistance=Normalizer().setP(1).setInputCol("features").setOutputCol("Manhattan Distance")
EuclideanDistance=Normalizer().setP(2).setInputCol("features").setOutputCol("Euclidean Distance")

In [0]:
df = spark.createDataFrame(
    [(1111111,20151122045510, "Yin","gre"), (1111111,20151122045501, "Yin","gre"), (1111111,20151122045500, "Yln","gra")
     , (1111112,20151122065832, "Yun","ddd"), (1111113,20160101003221, "Yan","fdf"), (1111111,20160703045231, "Yin","gre"),
    (1111114,20150419134543, "Yin","fdf"), (1111115,20151123174302, "Yen","ddd"),(2111115, 20123192, "Yen","gre")],
    ["address", "date","name","food"])
df.show()

+-------+--------------+----+----+
|address|          date|name|food|
+-------+--------------+----+----+
|1111111|20151122045510| Yin| gre|
|1111111|20151122045501| Yin| gre|
|1111111|20151122045500| Yln| gra|
|1111112|20151122065832| Yun| ddd|
|1111113|20160101003221| Yan| fdf|
|1111111|20160703045231| Yin| gre|
|1111114|20150419134543| Yin| fdf|
|1111115|20151123174302| Yen| ddd|
|2111115|      20123192| Yen| gre|
+-------+--------------+----+----+



In [0]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="name", outputCol="name_index").fit(df)
df_ind = indexer.transform(df)
df_ind.show()

+-------+--------------+----+----+----------+
|address|          date|name|food|name_index|
+-------+--------------+----+----+----------+
|1111111|20151122045510| Yin| gre|       0.0|
|1111111|20151122045501| Yin| gre|       0.0|
|1111111|20151122045500| Yln| gra|       3.0|
|1111112|20151122065832| Yun| ddd|       4.0|
|1111113|20160101003221| Yan| fdf|       2.0|
|1111111|20160703045231| Yin| gre|       0.0|
|1111114|20150419134543| Yin| fdf|       0.0|
|1111115|20151123174302| Yen| ddd|       1.0|
|2111115|      20123192| Yen| gre|       1.0|
+-------+--------------+----+----+----------+



In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [ StringIndexer(inputCol=column, outputCol=column+"_index").fit(df)
            for column in list(set(df.columns)-set(['date'])) ]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)

df_indexed.show()

+-------+--------------+----+----+----------+----------+-------------+
|address|          date|name|food|food_index|name_index|address_index|
+-------+--------------+----+----+----------+----------+-------------+
|1111111|20151122045510| Yin| gre|       0.0|       0.0|          0.0|
|1111111|20151122045501| Yin| gre|       0.0|       0.0|          0.0|
|1111111|20151122045500| Yln| gra|       3.0|       3.0|          0.0|
|1111112|20151122065832| Yun| ddd|       1.0|       4.0|          1.0|
|1111113|20160101003221| Yan| fdf|       2.0|       2.0|          2.0|
|1111111|20160703045231| Yin| gre|       0.0|       0.0|          0.0|
|1111114|20150419134543| Yin| fdf|       2.0|       0.0|          3.0|
|1111115|20151123174302| Yen| ddd|       1.0|       1.0|          4.0|
|2111115|      20123192| Yen| gre|       0.0|       1.0|          5.0|
+-------+--------------+----+----+----------+----------+-------------+



In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),(1, 20, 1.0, Vectors.dense([0.0, 15.0, 0.2]), 5.0),
    (1, 15, 3.0, Vectors.dense([0.0, 11.0, 0.8]), 7.0), (0, 10, 5.0, Vectors.dense([0.3, 11.0, 0.6]), 4.0),
    (5, 3, 5.0, Vectors.dense([0.7, 19.0, 0.1]), 3.3)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])
dataset.show()

+---+----+------+--------------+-------+
| id|hour|mobile|  userFeatures|clicked|
+---+----+------+--------------+-------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|
|  1|  20|   1.0|[0.0,15.0,0.2]|    5.0|
|  1|  15|   3.0|[0.0,11.0,0.8]|    7.0|
|  0|  10|   5.0|[0.3,11.0,0.6]|    4.0|
|  5|   3|   5.0|[0.7,19.0,0.1]|    3.3|
+---+----+------+--------------+-------+



In [0]:
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="vector")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("vector", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|vector                 |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
|[20.0,1.0,0.0,15.0,0.2]|5.0    |
|[15.0,3.0,0.0,11.0,0.8]|7.0    |
|[10.0,5.0,0.3,11.0,0.6]|4.0    |
|[3.0,5.0,0.7,19.0,0.1] |3.3    |
+-----------------------+-------+



In [0]:
# Creamos el vector
assembler2 = VectorAssembler(inputCols=["hour", "mobile", "userFeatures"], outputCol="features").setParams(handleInvalid="skip")# omita los valore nulos
assembler2.transform(dataset).show()

+---+----+------+--------------+-------+--------------------+
| id|hour|mobile|  userFeatures|clicked|            features|
+---+----+------+--------------+-------+--------------------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|[18.0,1.0,0.0,10....|
|  1|  20|   1.0|[0.0,15.0,0.2]|    5.0|[20.0,1.0,0.0,15....|
|  1|  15|   3.0|[0.0,11.0,0.8]|    7.0|[15.0,3.0,0.0,11....|
|  0|  10|   5.0|[0.3,11.0,0.6]|    4.0|[10.0,5.0,0.3,11....|
|  5|   3|   5.0|[0.7,19.0,0.1]|    3.3|[3.0,5.0,0.7,19.0...|
+---+----+------+--------------+-------+--------------------+



In [0]:
data = [('A', -99.99), ('B', -0.5), ('C', -0.3),('D', 0.0), ('E', 0.7), ('F', 99.99)]

dataframe = spark.createDataFrame(data, ["id", "features"])
dataframe.show()     

+---+--------+
| id|features|
+---+--------+
|  A|  -99.99|
|  B|    -0.5|
|  C|    -0.3|
|  D|     0.0|
|  E|     0.7|
|  F|   99.99|
+---+--------+



In [0]:
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataframe)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()

Bucketizer output with 4 buckets
+---+--------+----------------+
| id|features|bucketedFeatures|
+---+--------+----------------+
|  A|  -99.99|             0.0|
|  B|    -0.5|             1.0|
|  C|    -0.3|             1.0|
|  D|     0.0|             2.0|
|  E|     0.7|             3.0|
|  F|   99.99|             3.0|
+---+--------+----------------+



In [0]:
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import QuantileDiscretizer

In [0]:
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
print(df.show())

+---+----+
| id|hour|
+---+----+
|  0|18.0|
|  1|19.0|
|  2| 8.0|
|  3| 5.0|
|  4| 2.2|
+---+----+

None


In [0]:
qds = QuantileDiscretizer(numBuckets=5, inputCol="hour",outputCol="buckets", relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
bucketizer.setHandleInvalid("skip").transform(df).show()

+---+----+-------+
| id|hour|buckets|
+---+----+-------+
|  0|18.0|    4.0|
|  1|19.0|    4.0|
|  2| 8.0|    3.0|
|  3| 5.0|    2.0|
|  4| 2.2|    1.0|
+---+----+-------+



In [0]:
data = [('gene1', 1.2), ('gene2', 3.4), ('gene1', 3.5), ('gene2', 12.6)]
df = spark.createDataFrame(data, ["gene", "value"])
df.show()

+-----+-----+
| gene|value|
+-----+-----+
|gene1|  1.2|
|gene2|  3.4|
|gene1|  3.5|
|gene2| 12.6|
+-----+-----+



In [0]:
from pyspark.sql.functions import log
df.withColumn("base-10", log(10.0, df.value)).withColumn("base-e", log(df.value)).show()

+-----+-----+------------------+------------------+
| gene|value|           base-10|            base-e|
+-----+-----+------------------+------------------+
|gene1|  1.2|0.0791812460476248|0.1823215567939546|
|gene2|  3.4| 0.531478917042255|1.2237754316221157|
|gene1|  3.5|0.5440680443502756| 1.252762968495368|
|gene2| 12.6|1.1003705451175627| 2.533696813957432|
+-----+-----+------------------+------------------+



In [0]:
from pyspark.sql.types import *

schema = StructType().add("id","integer").add("safety_level","string").add("engine_type","string")
schema
#StructType(list(StructField(id,IntegerType,True),StructField(safety_level,StringType,True),StructField(engine_type,StringType,True)))

data = [(1,'Very-Low','v4'),(2,'Very-Low','v6'),(3,'Low','v6'),(4,'Low','v6'),(5,'Medium','v4'),
        (6,'High','v6'),(7,'High','v6'),(8,'Very-High','v4'),(9,'Very-High','v6')]

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)

+---+------------+-----------+
|id |safety_level|engine_type|
+---+------------+-----------+
|1  |Very-Low    |v4         |
|2  |Very-Low    |v6         |
|3  |Low         |v6         |
|4  |Low         |v6         |
|5  |Medium      |v4         |
|6  |High        |v6         |
|7  |High        |v6         |
|8  |Very-High   |v4         |
|9  |Very-High   |v6         |
+---+------------+-----------+



In [0]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- safety_level: string (nullable = true)
 |-- engine_type: string (nullable = true)



In [0]:
from pyspark.ml.feature import StringIndexer

safety_level_indexer = StringIndexer(inputCol="safety_level",outputCol="safety_level_index")
df1 = safety_level_indexer.fit(df).transform(df)
df1.show()

+---+------------+-----------+------------------+
| id|safety_level|engine_type|safety_level_index|
+---+------------+-----------+------------------+
|  1|    Very-Low|         v4|               3.0|
|  2|    Very-Low|         v6|               3.0|
|  3|         Low|         v6|               1.0|
|  4|         Low|         v6|               1.0|
|  5|      Medium|         v4|               4.0|
|  6|        High|         v6|               0.0|
|  7|        High|         v6|               0.0|
|  8|   Very-High|         v4|               2.0|
|  9|   Very-High|         v6|               2.0|
+---+------------+-----------+------------------+



In [0]:
engine_type_indexer = StringIndexer(inputCol="engine_type",outputCol="engine_type_index")
df2 = engine_type_indexer.fit(df).transform(df)
df2.show()

+---+------------+-----------+-----------------+
| id|safety_level|engine_type|engine_type_index|
+---+------------+-----------+-----------------+
|  1|    Very-Low|         v4|              1.0|
|  2|    Very-Low|         v6|              0.0|
|  3|         Low|         v6|              0.0|
|  4|         Low|         v6|              0.0|
|  5|      Medium|         v4|              1.0|
|  6|        High|         v6|              0.0|
|  7|        High|         v6|              0.0|
|  8|   Very-High|         v4|              1.0|
|  9|   Very-High|         v6|              0.0|
+---+------------+-----------+-----------------+



In [0]:
from pyspark.ml.feature import OneHotEncoder
onehotencoder_safety_level = OneHotEncoder(inputCol="safety_level_index",outputCol="safety_level_vector")
#                                Ajusta     Transforma 
df11 = onehotencoder_safety_level.fit(df1).transform(df1)
df11.show(truncate=False)

+---+------------+-----------+------------------+-------------------+
|id |safety_level|engine_type|safety_level_index|safety_level_vector|
+---+------------+-----------+------------------+-------------------+
|1  |Very-Low    |v4         |3.0               |(4,[3],[1.0])      |
|2  |Very-Low    |v6         |3.0               |(4,[3],[1.0])      |
|3  |Low         |v6         |1.0               |(4,[1],[1.0])      |
|4  |Low         |v6         |1.0               |(4,[1],[1.0])      |
|5  |Medium      |v4         |4.0               |(4,[],[])          |
|6  |High        |v6         |0.0               |(4,[0],[1.0])      |
|7  |High        |v6         |0.0               |(4,[0],[1.0])      |
|8  |Very-High   |v4         |2.0               |(4,[2],[1.0])      |
|9  |Very-High   |v6         |2.0               |(4,[2],[1.0])      |
+---+------------+-----------+------------------+-------------------+



In [0]:
onehotencoder_engine_type = OneHotEncoder(inputCol="engine_type_index",outputCol="engine_type_vector")
df12 = onehotencoder_engine_type.fit(df2).transform(df2)
df12.show(truncate=False)

+---+------------+-----------+-----------------+------------------+
|id |safety_level|engine_type|engine_type_index|engine_type_vector|
+---+------------+-----------+-----------------+------------------+
|1  |Very-Low    |v4         |1.0              |(1,[],[])         |
|2  |Very-Low    |v6         |0.0              |(1,[0],[1.0])     |
|3  |Low         |v6         |0.0              |(1,[0],[1.0])     |
|4  |Low         |v6         |0.0              |(1,[0],[1.0])     |
|5  |Medium      |v4         |1.0              |(1,[],[])         |
|6  |High        |v6         |0.0              |(1,[0],[1.0])     |
|7  |High        |v6         |0.0              |(1,[0],[1.0])     |
|8  |Very-High   |v4         |1.0              |(1,[],[])         |
|9  |Very-High   |v6         |0.0              |(1,[0],[1.0])     |
+---+------------+-----------+-----------------+------------------+



In [0]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['id'])) ]

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)
df_indexed.show()

+---+------------+-----------+------------------+-----------------+
| id|safety_level|engine_type|safety_level_index|engine_type_index|
+---+------------+-----------+------------------+-----------------+
|  1|    Very-Low|         v4|               3.0|              1.0|
|  2|    Very-Low|         v6|               3.0|              0.0|
|  3|         Low|         v6|               1.0|              0.0|
|  4|         Low|         v6|               1.0|              0.0|
|  5|      Medium|         v4|               4.0|              1.0|
|  6|        High|         v6|               0.0|              0.0|
|  7|        High|         v6|               0.0|              0.0|
|  8|   Very-High|         v4|               2.0|              1.0|
|  9|   Very-High|         v6|               2.0|              0.0|
+---+------------+-----------+------------------+-----------------+



In [0]:
encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
                        outputCols=["{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers])

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=encoder.getOutputCols(),outputCol="features")

pipeline = Pipeline(stages=indexers + [encoder, assembler])

pipeline.fit(df).transform(df).show()

+---+------------+-----------+------------------+-----------------+--------------------------+-------------------------+-------------------+
| id|safety_level|engine_type|safety_level_index|engine_type_index|safety_level_index_encoded|engine_type_index_encoded|           features|
+---+------------+-----------+------------------+-----------------+--------------------------+-------------------------+-------------------+
|  1|    Very-Low|         v4|               3.0|              1.0|             (4,[3],[1.0])|                (1,[],[])|      (5,[3],[1.0])|
|  2|    Very-Low|         v6|               3.0|              0.0|             (4,[3],[1.0])|            (1,[0],[1.0])|(5,[3,4],[1.0,1.0])|
|  3|         Low|         v6|               1.0|              0.0|             (4,[1],[1.0])|            (1,[0],[1.0])|(5,[1,4],[1.0,1.0])|
|  4|         Low|         v6|               1.0|              0.0|             (4,[1],[1.0])|            (1,[0],[1.0])|(5,[1,4],[1.0,1.0])|
|  5|      Me

In [0]:
safety_level_indexer = StringIndexer(inputCol="safety_level",outputCol="safety_level_index")
engine_type_indexer = StringIndexer(inputCol="engine_type", outputCol="engine_type_index")
onehotencoder_safety_level = OneHotEncoder(inputCol="safety_level_index",outputCol="safety_level_vector")
onehotencoder_engine_type = OneHotEncoder(inputCol="engine_type_index",outputCol="engine_type_vector")

In [0]:
pipeline = Pipeline(stages=[safety_level_indexer,
                            engine_type_indexer, onehotencoder_safety_level, onehotencoder_engine_type])

df_transformed = pipeline.fit(df).transform(df)
df_transformed.show(truncate=False)

+---+------------+-----------+------------------+-----------------+-------------------+------------------+
|id |safety_level|engine_type|safety_level_index|engine_type_index|safety_level_vector|engine_type_vector|
+---+------------+-----------+------------------+-----------------+-------------------+------------------+
|1  |Very-Low    |v4         |3.0               |1.0              |(4,[3],[1.0])      |(1,[],[])         |
|2  |Very-Low    |v6         |3.0               |0.0              |(4,[3],[1.0])      |(1,[0],[1.0])     |
|3  |Low         |v6         |1.0               |0.0              |(4,[1],[1.0])      |(1,[0],[1.0])     |
|4  |Low         |v6         |1.0               |0.0              |(4,[1],[1.0])      |(1,[0],[1.0])     |
|5  |Medium      |v4         |4.0               |1.0              |(4,[],[])          |(1,[],[])         |
|6  |High        |v6         |0.0               |0.0              |(4,[0],[1.0])      |(1,[0],[1.0])     |
|7  |High        |v6         |0.0    

In [0]:
documents = spark.createDataFrame([("doc1", "Ada Ada Spark Spark Spark"),("doc2", "Ada SQL")],["id", "document"])
documents.show()

+----+--------------------+
|  id|            document|
+----+--------------------+
|doc1|Ada Ada Spark Spa...|
|doc2|             Ada SQL|
+----+--------------------+



In [0]:
TF(Ada, doc1) = 2
TF(Spark, doc1) = 3
TF(Ada, doc2) = 1
TF(SQL, doc2) = 1
DF(Ada, D) = 2
DF(Spark, D) = 1
DF(SQL, D) = 1

[0;36m  File [0;32m<command-3848327506660287>:1[0;36m[0m
[0;31m    TF(Ada, doc1) = 2[0m
[0m    ^[0m
[0;31mSyntaxError[0m[0;31m:[0m cannot assign to function call


In [0]:
IDF(Ada, D) = log ( (|D|+1) / (DF(t,D)+1) )
= log ( (2+1) / (DF(Ada, D)+1) )
= log ( 3 / (2+1)) = log(1)
= 0.00

IDF(Spark, D) = log ( (|D|+1) / (DF(t,D)+1) )
= log ( (2+1) / (DF(Spark, D)+1) )
= log ( 3 / (1+1) )
= log (1.5)
= 0.40546510811

TF-IDF(Ada, doc1, D) = TF(Ada, doc1) x IDF(Ada, D)
= 2 x 0.0
= 0.0

TF-IDF(Spark, doc1, D) = TF(Spark, doc1) x IDF(Spark, D)
= 3 x 0.40546510811
= 1.21639532433

[0;36m  File [0;32m<command-3848327506660288>:1[0;36m[0m
[0;31m    IDF(Ada, D) = log ( (|D|+1) / (DF(t,D)+1) )[0m
[0m    ^[0m
[0;31mSyntaxError[0m[0;31m:[0m cannot assign to function call


In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentences = spark.createDataFrame([ (0.0, "we heard about Spark and Java"),(0.0, "Does Java use case classes"),
                                   (1.0, "fox jumped over fence"),(1.0, "red fox jumped over")], ["label", "text"])

sentences.show(truncate=False)

+-----+-----------------------------+
|label|text                         |
+-----+-----------------------------+
|0.0  |we heard about Spark and Java|
|0.0  |Does Java use case classes   |
|1.0  |fox jumped over fence        |
|1.0  |red fox jumped over          |
+-----+-----------------------------+



In [0]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words_data = tokenizer.transform(sentences)
words_data.show(truncate=False)

+-----+-----------------------------+------------------------------------+
|label|text                         |words                               |
+-----+-----------------------------+------------------------------------+
|0.0  |we heard about Spark and Java|[we, heard, about, spark, and, java]|
|0.0  |Does Java use case classes   |[does, java, use, case, classes]    |
|1.0  |fox jumped over fence        |[fox, jumped, over, fence]          |
|1.0  |red fox jumped over          |[red, fox, jumped, over]            |
+-----+-----------------------------+------------------------------------+



In [0]:
hashingTF = HashingTF(inputCol="words", outputCol="raw_features",numFeatures=16)
featurized_data = hashingTF.transform(words_data)
featurized_data.select("label", "raw_features").show(truncate=False)

+-----+-----------------------------------------------+
|label|raw_features                                   |
+-----+-----------------------------------------------+
|0.0  |(16,[1,4,6,11,12,15],[1.0,1.0,1.0,1.0,1.0,1.0])|
|0.0  |(16,[2,6,11,13,15],[1.0,1.0,1.0,1.0,1.0])      |
|1.0  |(16,[0,1,6,8],[1.0,1.0,1.0,1.0])               |
|1.0  |(16,[1,4,6,8],[1.0,1.0,1.0,1.0])               |
+-----+-----------------------------------------------+



In [0]:
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)
rescaled_data.select("label", "features").show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                   |
+-----+---------------------------------------------------------------------------------------------------------------------------+
|0.0  |(16,[1,4,6,11,12,15],[0.22314355131420976,0.5108256237659907,0.0,0.5108256237659907,0.9162907318741551,0.5108256237659907])|
|0.0  |(16,[2,6,11,13,15],[0.9162907318741551,0.0,0.5108256237659907,0.9162907318741551,0.5108256237659907])                      |
|1.0  |(16,[0,1,6,8],[0.9162907318741551,0.22314355131420976,0.0,0.5108256237659907])                                             |
|1.0  |(16,[1,4,6,8],[0.22314355131420976,0.5108256237659907,0.0,0.5108256237659907])                                             |
+-----+---------------------------------------------------------------------

In [0]:
df = spark.createDataFrame([(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])], ["label", "raw"] )
df.show()

+-----+---------------+
|label|            raw|
+-----+---------------+
|    0|      [a, b, c]|
|    1|[a, b, b, c, a]|
+-----+---------------+



In [0]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer().setInputCol("raw").setOutputCol("features")
model = cv.fit(df)
transformed = model.transform(df)
transformed.show(truncate=False)

+-----+---------------+-------------------------+
|label|raw            |features                 |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+



In [0]:
hashing_TF = HashingTF(inputCol="raw", outputCol="features", numFeatures=128)
result = hashing_TF.transform(df)
result.show(truncate=False)

+-----+---------------+-------------------------------+
|label|raw            |features                       |
+-----+---------------+-------------------------------+
|0    |[a, b, c]      |(128,[40,99,117],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(128,[40,99,117],[1.0,2.0,2.0])|
+-----+---------------+-------------------------------+



In [0]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

documentDF.show()

+--------------------+
|                text|
+--------------------+
|[Hi, I, heard, ab...|
|[I, wish, Java, c...|
|[Logistic, regres...|
+--------------------+



In [0]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.012264367192983627,-0.06442034244537354,-0.007622340321540833]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [0.05160687722465289,0.025969027541577816,0.02736483487699713]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.06564115285873413,0.02060299552977085,-0.08455150425434113]



In [0]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



In [0]:
from pyspark.ml.feature import FeatureHasher
df = spark.createDataFrame([(2.1, True, "1", "fox"), (2.1, False, "2", "gray"), (3.3, False, "2", "red"),
                            (4.4, True, "4", "fox")], ["number", "boolean", "string_number", "string"])

df.show()

+------+-------+-------------+------+
|number|boolean|string_number|string|
+------+-------+-------------+------+
|   2.1|   true|            1|   fox|
|   2.1|  false|            2|  gray|
|   3.3|  false|            2|   red|
|   4.4|   true|            4|   fox|
+------+-------+-------------+------+



In [0]:
input_columns = ["number", "boolean", "string_number", "string"]
hasher = FeatureHasher(inputCols=input_columns, outputCol="features")
#hasher.setInputCols(input_columns)
featurized = hasher.transform(df)
featurized.show(truncate=False)

+------+-------+-------------+------+--------------------------------------------------------+
|number|boolean|string_number|string|features                                                |
+------+-------+-------------+------+--------------------------------------------------------+
|2.1   |true   |1            |fox   |(262144,[102440,112150,135239,185244],[1.0,1.0,2.1,1.0])|
|2.1   |false  |2            |gray  |(262144,[43117,93531,135239,210818],[1.0,1.0,2.1,1.0])  |
|3.3   |false  |2            |red   |(262144,[93531,110541,135239,210818],[1.0,1.0,3.3,1.0]) |
|4.4   |true   |4            |fox   |(262144,[75860,102440,135239,185244],[1.0,1.0,4.4,1.0]) |
+------+-------+-------------+------+--------------------------------------------------------+



In [0]:
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([(10, "d1", 27000),(20, "d1", 29000),
                                (40, "d2", 31000),(50, "d2", 39000)], 
                               ["id", "dept", "salary"])

df.show()

+---+----+------+
| id|dept|salary|
+---+----+------+
| 10|  d1| 27000|
| 20|  d1| 29000|
| 40|  d2| 31000|
| 50|  d2| 39000|
+---+----+------+



In [0]:
query = "SELECT dept, SUM(salary) AS sum_of_salary FROM __THIS__ GROUP BY dept"
sqlTrans = SQLTransformer(statement=query)
sqlTrans.transform(df).show()

+----+-------------+
|dept|sum_of_salary|
+----+-------------+
|  d1|        56000|
|  d2|        70000|
+----+-------------+

