In [9]:
# Standard library.
import os
# findspark is initialised with the Spark installation named by $SPARK_HOME.
# NOTE(review): presumably this has to run before the pyspark imports below so
# that they resolve against that installation — keep this ordering; confirm.
import findspark
findspark.init(os.getenv('SPARK_HOME'))
# Spark entry point and the text-feature transformers used in this notebook.
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer

In [2]:
# Create the notebook's Spark session, or reuse an already-running one,
# registered under the application name 'nlp'.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/28 15:31:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/28 15:31:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Toy corpus for the TF-IDF walkthrough: one (label, sentence) pair per row.
df = spark.createDataFrame(
    data=[
        (0, 'Hi I heard about Spark'),
        (0, 'I wish java could use case classes'),
        (1, 'Logistic Regression models are neat'),
    ],
    schema=['label', 'sentence'],
)
df.show()

                                                                                

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|    0|Hi I heard about ...|
|    0|I wish java could...|
|    1|Logistic Regressi...|
+-----+--------------------+



In [4]:
# Step 1 - tokenise: turn each 'sentence' string into a list of tokens stored
# in a new 'words' column (the output below shows the tokens lower-cased).
#
# NOTE: `df` is rebound to the transformed frame, so this cell is not
# re-runnable on its own; re-run the cell that builds `df` first.
tokenizer = Tokenizer(
    inputCol='sentence',
    outputCol='words',
)
df = tokenizer.transform(df)
df.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|
|    0|I wish java could...|[i, wish, java, c...|
|    1|Logistic Regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [5]:
# Step 2 - term frequencies: hash every token in 'words' into a sparse count
# vector stored in 'rawFeatures'. numFeatures is left at its default, which is
# why the vectors shown below have 262144 dimensions.
#
# NOTE: `df` is rebound again here; re-run the preceding cells before
# re-running this one.
hashing_tf = HashingTF(
    inputCol='words',
    outputCol='rawFeatures',
)
df = hashing_tf.transform(df)
df.show()

+-----+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|(262144,[18700,19...|
|    0|I wish java could...|[i, wish, java, c...|(262144,[19036,20...|
|    1|Logistic Regressi...|[logistic, regres...|(262144,[46243,58...|
+-----+--------------------+--------------------+--------------------+



In [6]:
# Step 3 - inverse document frequency: IDF is an estimator, so it is first
# fitted on the 'rawFeatures' column and the resulting model is then applied
# to write the rescaled vectors into 'features'.
#
# NOTE: `df` is rebound again here; re-run the preceding cells before
# re-running this one.
idf = IDF(
    inputCol='rawFeatures',
    outputCol='features',
)
idf_model = idf.fit(df)
df = idf_model.transform(df)
df.show()

22/08/28 15:31:29 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/08/28 15:31:29 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/08/28 15:31:29 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
+-----+--------------------+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|(262144,[18700,19...|(262144,[18700,19...|
|    0|I wish java could...|[i, wish, java, c...|(262144,[19036,20...|(262144,[19036,20...|
|    1|Logistic Regressi...|[logistic, regres...|(262144,[46243,58...|(262144,[46243,58...|
+-----+--------------------+--------------------+--------------------+--------------------+



In [7]:
# Keep only the label and the final feature vector; the intermediate
# 'sentence', 'words' and 'rawFeatures' columns are dropped from `df`.
df = df.select(['label', 'features'])
df.show()

22/08/28 15:31:50 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/08/28 15:31:50 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
22/08/28 15:31:50 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(262144,[18700,19...|
|    0|(262144,[19036,20...|
|    1|(262144,[46243,58...|
+-----+--------------------+



In [13]:
# Second example (CountVectorizer): a fresh two-row corpus whose 'words'
# column already holds token lists. This replaces the TF-IDF frame that was
# previously bound to `df`.
df = spark.createDataFrame(
    data=[
        (0, ['a', 'b', 'c']),
        (1, ['a', 'b', 'b', 'c', 'a']),
    ],
    schema=['label', 'words'],
)
df.show()

+-----+---------------+
|label|          words|
+-----+---------------+
|    0|      [a, b, c]|
|    1|[a, b, b, c, a]|
+-----+---------------+



In [14]:
# Configure the count-based vectoriser (an estimator; it is fitted in the
# next cell).
cv = CountVectorizer(
    inputCol='words',
    outputCol='features',
    vocabSize=3,   # keep at most 3 vocabulary terms
    minDF=2.0,     # a term must occur in at least 2 documents to be kept
)

In [15]:
# Fit the vocabulary on the corpus ...
model = cv.fit(dataset=df)
# ... then encode every row of that same corpus as a sparse term-count vector.
result = model.transform(dataset=df)

In [17]:
# Print the rows with truncation disabled so the full sparse vectors
# (size, indices, counts) are visible.
result.show(
    truncate=False,
)

+-----+---------------+-------------------------+
|label|words          |features                 |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+

