# Spark - Natural Language Processing (NLP)

**Imports**

In [73]:
import os

import findspark

# Point findspark at the Spark installation before importing pyspark.
# Prefer the SPARK_HOME environment variable so the notebook is not tied to one
# machine; fall back to the original local install path when it is unset/empty.
findspark.init(os.environ.get('SPARK_HOME') or '/home/sedat/spark-3.3.2-bin-hadoop3')

from pyspark.ml.feature import (
    CountVectorizer,
    HashingTF,
    IDF,
    NGram,
    RegexTokenizer,
    StopWordsRemover,
    Tokenizer,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

**Start spark session and create a dataframe**

In [3]:
# Obtain the Spark session for this notebook under the application name 'nlp'.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

In [13]:
# Three sample sentences keyed by id: two are space-separated, the last one is
# comma-separated with no spaces at all.
sen_df = spark.createDataFrame(
    [
        (0, 'Hi, I heard about Spark'),
        (1, 'I wish java could use case classes'),
        (2, 'Logistic,regression,models,are,neat'),
    ],
    schema=['id', 'sentences'],
)

In [14]:
# Preview the sample sentences; long strings appear truncated in the printed table.
sen_df.show()

+---+--------------------+
| id|           sentences|
+---+--------------------+
|  0|Hi, I heard about...|
|  1|I wish java could...|
|  2|Logistic,regressi...|
+---+--------------------+



**Tokenizer and regex tokenizer**

In [15]:
# Tokenizer that reads the 'sentences' column and writes its tokens to 'words'.
tokenizer = Tokenizer(inputCol='sentences', outputCol='words')

In [17]:
# Regex-driven tokenizer over the same input/output columns. The pattern '\\W'
# is the regex for one non-word character; presumably it acts as the delimiter,
# since the recorded output splits the comma-separated sentence into 5 tokens.
regex_tokenizer = RegexTokenizer(inputCol='sentences', outputCol='words', pattern='\\W')

In [27]:
# Spark UDF: maps a row's list of tokens to its length, typed as an integer column.
count_tokens = udf(
    f=lambda words: len(words),
    returnType=IntegerType(),
)

In [56]:
# Run the plain Tokenizer over the sample sentences and preview the new 'words' column.
tokenized = tokenizer.transform(sen_df)
tokenized.show()

+---+--------------------+--------------------+
| id|           sentences|               words|
+---+--------------------+--------------------+
|  0|Hi, I heard about...|[hi,, i, heard, a...|
|  1|I wish java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [28]:
# Add a per-row token count. Note the last row reports only 1 token: the
# comma-separated sentence contains no spaces, and the recorded output shows the
# plain Tokenizer keeping it as a single token (commas are not treated as separators).
tokenized.withColumn('tokens', count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|           sentences|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi, I heard about...|[hi,, i, heard, a...|     5|
|  1|I wish java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic,regress...|     1|
+---+--------------------+--------------------+------+



In [29]:
# Tokenize the same sentences with the regex tokenizer for comparison.
rg_tokenized = regex_tokenizer.transform(sen_df)

In [30]:
# Same token count on the regex-tokenized frame; in the recorded output the
# comma-separated sentence now yields 5 tokens instead of 1.
rg_tokenized.withColumn('tokens', count_tokens(col('words'))).show()

+---+--------------------+--------------------+------+
| id|           sentences|               words|tokens|
+---+--------------------+--------------------+------+
|  0|Hi, I heard about...|[hi, i, heard, ab...|     5|
|  1|I wish java could...|[i, wish, java, c...|     7|
|  2|Logistic,regressi...|[logistic, regres...|     5|
+---+--------------------+--------------------+------+



**Stop words**

In [36]:
# Two already-tokenized sentences; here the 'sentences' column holds word lists
# rather than raw strings.
sentence_dataframe = spark.createDataFrame(
    [
        (0, ['I', 'saw', 'the', 'green', 'horse']),
        (1, ['Marry', 'had', 'a', 'little', 'lamb']),
    ],
    schema=['id', 'sentences'],
)

In [37]:
# Preview the pre-tokenized input before stop-word removal.
sentence_dataframe.show()

+---+--------------------+
| id|           sentences|
+---+--------------------+
|  0|[I, saw, the, gre...|
|  1|[Marry, had, a, l...|
+---+--------------------+



In [38]:
# Stop-word remover reading word lists from 'sentences' and writing the kept
# words to 'filtered'. No stop-word list is passed, so the library default is used.
remover = StopWordsRemover(inputCol='sentences', outputCol='filtered')

In [40]:
# Apply the remover and display the result; in the recorded output words such
# as 'I' and 'the' are dropped from the first row.
remover.transform(sentence_dataframe).show()

+---+--------------------+--------------------+
| id|           sentences|            filtered|
+---+--------------------+--------------------+
|  0|[I, saw, the, gre...| [saw, green, horse]|
|  1|[Marry, had, a, l...|[Marry, little, l...|
+---+--------------------+--------------------+



**NGrams (n=2)**

In [43]:
# Word-list version of the three sample sentences, used as input for the n-gram demos.
word_dataframe = spark.createDataFrame(
    [
        (0, ['Hi', 'I', 'heard', 'about', 'Spark']),
        (1, ['I', 'wish', 'java', 'could', 'use', 'case', 'classes']),
        (2, ['Logistic', 'regression', 'models', 'are', 'neat']),
    ],
    schema=['id', 'words'],
)
word_dataframe.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|[Hi, I, heard, ab...|
|  1|[I, wish, java, c...|
|  2|[Logistic, regres...|
+---+--------------------+



In [50]:
# Build 2-grams from the 'words' column into 'ngrams_col' and preview them
# (the default display truncates the lists).
ngram = NGram(n=2, inputCol='words', outputCol='ngrams_col')
ngram.transform(word_dataframe).show()

+---+--------------------+--------------------+
| id|               words|          ngrams_col|
+---+--------------------+--------------------+
|  0|[Hi, I, heard, ab...|[Hi I, I heard, h...|
|  1|[I, wish, java, c...|[I wish, wish jav...|
|  2|[Logistic, regres...|[Logistic regress...|
+---+--------------------+--------------------+



In [51]:
# Show only the n-gram column without truncation; each entry is a run of
# consecutive words joined by a single space.
ngram.transform(word_dataframe).select('ngrams_col').show(truncate=False)

+------------------------------------------------------------------+
|ngrams_col                                                        |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish java, java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



**NGrams (n=3)**

In [54]:
# Same demo with 3-grams. NOTE: this rebinds the name `ngram`, so from this cell
# onward `ngram` refers to the n=3 transformer, not the n=2 one defined earlier.
ngram = NGram(n=3, inputCol='words', outputCol='ngrams_col')
ngram.transform(word_dataframe).select('ngrams_col').show(truncate=False)

+--------------------------------------------------------------------------------+
|ngrams_col                                                                      |
+--------------------------------------------------------------------------------+
|[Hi I heard, I heard about, heard about Spark]                                  |
|[I wish java, wish java could, java could use, could use case, use case classes]|
|[Logistic regression models, regression models are, models are neat]            |
+--------------------------------------------------------------------------------+



**Hashing TF and IDF**

In [57]:
# Re-display the whitespace-tokenized frame that feeds the TF-IDF steps
# (its third row still holds a single comma-joined token).
tokenized.show()

+---+--------------------+--------------------+
| id|           sentences|               words|
+---+--------------------+--------------------+
|  0|Hi, I heard about...|[hi,, i, heard, a...|
|  1|I wish java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [61]:
# Hash each row's 'words' into a term-frequency vector stored in 'raw_features'.
# No feature count is passed; the recorded output shows vectors of size 262144.
hashing_tf = HashingTF(inputCol='words', outputCol='raw_features')
# NOTE(review): the input is `tokenized` (plain Tokenizer), so the third row
# contributes a single term — consider whether `rg_tokenized` was intended.
featurized_data = hashing_tf.transform(tokenized)

In [62]:
# Preview the hashed term-frequency vectors (printed in truncated sparse form).
featurized_data.show()

+---+--------------------+--------------------+--------------------+
| id|           sentences|               words|        raw_features|
+---+--------------------+--------------------+--------------------+
|  0|Hi, I heard about...|[hi,, i, heard, a...|(262144,[18700,19...|
|  1|I wish java could...|[i, wish, java, c...|(262144,[19036,20...|
|  2|Logistic,regressi...|[logistic,regress...|(262144,[11534],[...|
+---+--------------------+--------------------+--------------------+



In [68]:
# IDF stage reading the raw term-frequency vectors and writing rescaled vectors to 'features'.
idf = IDF(inputCol='raw_features', outputCol='features')

In [69]:
# Fit the IDF stage on the hashed term-frequency data to obtain a model.
idf_model = idf.fit(featurized_data)

In [70]:
# Apply the fitted model to the same data, adding the 'features' column.
rescaled_data = idf_model.transform(featurized_data)

In [71]:
# Display the final frame with both raw and rescaled vectors. The recorded run
# also logged DAGScheduler warnings about a 4.0 MiB task binary — presumably a
# consequence of the 262144-dimensional vectors; TODO confirm.
rescaled_data.show()

23/03/30 12:51:13 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/03/30 12:51:13 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
+---+--------------------+--------------------+--------------------+--------------------+
| id|           sentences|               words|        raw_features|            features|
+---+--------------------+--------------------+--------------------+--------------------+
|  0|Hi, I heard about...|[hi,, i, heard, a...|(262144,[18700,19...|(262144,[18700,19...|
|  1|I wish java could...|[i, wish, java, c...|(262144,[19036,20...|(262144,[19036,20...|
|  2|Logistic,regressi...|[logistic,regress...|(262144,[11534],[...|(262144,[11534],[...|
+---+--------------------+--------------------+--------------------+--------------------+

