<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Topic-Modelling-with-PySpark-and-Spark-NLP" data-toc-modified-id="Topic-Modelling-with-PySpark-and-Spark-NLP-1">Topic Modelling with PySpark and Spark NLP</a></span></li><li><span><a href="#Env-필독" data-toc-modified-id="Env-필독-2">Env (Must Read)</a></span></li><li><span><a href="#Vectorization-with-PySpark" data-toc-modified-id="Vectorization-with-PySpark-3">Vectorization with PySpark</a></span></li><li><span><a href="#Topic-Modeling" data-toc-modified-id="Topic-Modeling-4">Topic Modeling</a></span></li></ul></div>

# Topic Modelling with PySpark and Spark NLP 

- [Medium article](https://medium.com/trustyou-engineering/topic-modelling-with-pyspark-and-spark-nlp-a99d063f1a6e)
- Korean models, and the full list of available models: [this link](https://nlp.johnsnowlabs.com/models)

<img src="./data/1.png">
<img src="./data/2.png">

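# Env (Must Read)
- Python 3.6 (3.8 does not work)
- PySpark 2.4.5 (3.x does not work)
- Launch PySpark with Spark NLP added as a package: `pyspark --packages com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.4`. The `--packages` option downloads the given Maven coordinates (`group:artifact:version`) and puts them on the Spark classpath. The `_2.11` suffix is the Scala version the artifact was built for, which matches the default Scala version of Spark 2.4.x builds.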
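A minimal sketch of a pre-flight check for the versions listed above, assuming the notes mean Python 3.6 and the PySpark 2.4 line (only 2.4.5 was actually used here). `env_problems` is a hypothetical helper, not part of PySpark or Spark NLP.

```python
import sys


def env_problems(python_version, pyspark_version):
    """Return a list of mismatches against the pinned environment (empty list = OK).

    Assumption: Python 3.6 and PySpark 2.4.x, per the notes above.
    """
    problems = []
    major, minor = python_version[0], python_version[1]
    if (major, minor) != (3, 6):
        problems.append(f"Python {major}.{minor} found, expected 3.6")
    if not pyspark_version.startswith("2.4."):
        problems.append(f"PySpark {pyspark_version} found, expected 2.4.x")
    return problems


if __name__ == "__main__":
    try:
        import pyspark
        issues = env_problems(sys.version_info, pyspark.__version__)
        print("\n".join(issues) if issues else "Environment matches the pinned versions")
    except ImportError:
        print("PySpark is not installed")
```

Running it at the top of the notebook fails fast with a readable message, instead of a JVM `NoClassDefFoundError` several cells later.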

In [1]:
%config Completer.use_jedi = False

# Import

In [6]:
import os
import json
# Spark 2.4 runs on Java 8; point JAVA_HOME at it before the JVM is started
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [7]:
import pyspark
import sparknlp
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from os import path

In [3]:
# Manual alternative to sparknlp.start(); the spark.jars.packages line is required (same effect as --packages)
# spark = SparkSession.builder \
#     .appName("Spark NLP") \
#     .master("local[4]") \
#     .config("spark.driver.memory", "16G") \
#     .config("spark.driver.maxResultSize", "0") \
#     .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.4") \
#     .getOrCreate()

# spark.sparkContext.getConf().getAll()

In [4]:
spark = sparknlp.start()  # SparkSession with the Spark NLP jar already on the classpath

In [8]:
data_path = './data'

review_path = path.join(data_path, 'Musical_instruments_reviews.csv')
data = spark.read.csv(review_path, header=True)
text_col = 'reviewText'
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())

# Pipeline

## NLP Pipeline

- HuggingFace pipeline: [link](https://dev.to/amananandrai/5-nlp-tasks-using-hugging-face-pipeline-5b98)
- HuggingFace **Tokenizer** Pipeline <br>
<img src="https://miro.medium.com/max/4800/1*wiw2Pm01JIydNqCAqjoLPw.png">

- The pipeline differs from task to task, depending in particular on how fine-grained the tokenization needs to be.
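To make "it depends on the task" concrete, here is a plain-Python sketch (no Spark) of chaining preprocessing stages. The stage functions are hypothetical stand-ins for a tokenizer, a normalizer and a stop-word filter. The whitespace tokenizer and the ASCII-only normalizer are deliberate simplifications.

```python
import re
from typing import Callable, Iterable, List

Stage = Callable[[List[str]], List[str]]


def tokenize(text: str) -> List[str]:
    # Whitespace split only; real tokenizers also separate punctuation.
    return text.split()


def normalize(tokens: List[str]) -> List[str]:
    # Lowercase, strip every character that is not a-z, drop tokens left empty.
    cleaned = (re.sub(r"[^a-z]", "", tok.lower()) for tok in tokens)
    return [tok for tok in cleaned if tok]


def stopword_filter(stopwords: Iterable[str]) -> Stage:
    # Build a stage that removes the given (lowercase) stop words.
    stop = set(stopwords)
    return lambda tokens: [tok for tok in tokens if tok not in stop]


def run_pipeline(text: str, stages: List[Stage]) -> List[str]:
    # Each stage consumes the previous stage's output.
    tokens = tokenize(text)
    for stage in stages:
        tokens = stage(tokens)
    return tokens


review = "The strings sound GREAT!"
print(run_pipeline(review, []))  # ['The', 'strings', 'sound', 'GREAT!']
print(run_pipeline(review, [normalize, stopword_filter({"the"})]))  # ['strings', 'sound', 'great']
```

A task such as sentiment analysis might keep "GREAT!" as-is, while topic modelling benefits from the more aggressive second pipeline.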

## Spark Pipeline

- Assembler, Indexer, Transformer, etc. <br>

<img src='https://miro.medium.com/max/1070/1*xswnCXe9y_sHL2lypPkpOw.png'>
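In Spark ML, each pipeline stage is either a Transformer (`transform`) or an Estimator (`fit` returns a fitted model, which is itself a Transformer), and `Pipeline.fit` runs the stages in order. The plain-Python sketch below mimics that contract on a list of strings. All classes here are hypothetical toys, not the `pyspark.ml` classes. For simplicity, it also transforms the data after the last stage, which Spark skips during `fit`.

```python
from typing import Dict, List


class Transformer:
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    def fit(self, rows) -> Transformer:
        raise NotImplementedError


class Lowercase(Transformer):
    # Stateless stage: nothing to learn, so it is a Transformer directly.
    def transform(self, rows: List[str]) -> List[str]:
        return [row.lower() for row in rows]


class VocabularyModel(Transformer):
    # Fitted stage: maps words to the indices learned during fit().
    def __init__(self, vocab: List[str]):
        self.index: Dict[str, int] = {word: i for i, word in enumerate(vocab)}

    def transform(self, rows: List[str]) -> List[List[int]]:
        return [[self.index[w] for w in row.split() if w in self.index] for row in rows]


class VocabularyEstimator(Estimator):
    # Stateful stage: must see the data once to learn a vocabulary.
    def fit(self, rows: List[str]) -> VocabularyModel:
        return VocabularyModel(sorted({w for row in rows for w in row.split()}))


class PipelineModel(Transformer):
    def __init__(self, stages: List[Transformer]):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows) -> PipelineModel:
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the data as it looks at this point in the chain.
            model = stage.fit(rows) if isinstance(stage, Estimator) else stage
            rows = model.transform(rows)
            fitted.append(model)
        return PipelineModel(fitted)


model = Pipeline([Lowercase(), VocabularyEstimator()]).fit(["Good strings", "good sound"])
print(model.transform(["Strings sound good"]))  # [[2, 1, 0]]
```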

## SparkNLP Pipeline

```python
from pyspark.ml import Pipeline
pipeline = Pipeline() \
     .setStages([documentAssembler,
                 tokenizer,
                 normalizer,
                 lemmatizer,
                 stopwords_cleaner,
                 pos_tagger,
                 chunker,
                 finisher])
```
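Every Spark NLP stage reads its input column(s) (`setInputCol` / `setInputCols`) and writes `setOutputCol`. A pipeline like the one above therefore only runs if each input column already exists, either in the DataFrame or as the output of an earlier stage. Below is a small plain-Python sketch of that wiring check. `missing_inputs` is a hypothetical helper, and it checks column names only, not annotator types.

```python
from typing import Iterable, List, Sequence, Tuple

# (stage name, input columns, output columns)
StageSpec = Tuple[str, Sequence[str], Sequence[str]]


def missing_inputs(source_cols: Iterable[str], stages: List[StageSpec]) -> List[Tuple[str, str]]:
    """Return (stage, column) pairs where a stage reads a column nobody produced yet."""
    available = set(source_cols)
    problems = []
    for name, inputs, outputs in stages:
        problems.extend((name, col) for col in inputs if col not in available)
        available.update(outputs)  # later stages can read this stage's output
    return problems


# Column wiring taken from the cells below, with the lemmatizer stage left out.
stages = [
    ("documentAssembler", ["reviewText"], ["document"]),
    ("tokenizer", ["document"], ["tokenized"]),
    ("normalizer", ["tokenized"], ["normalized"]),
    ("pos_tagger", ["document", "lemmatized"], ["pos"]),
]
print(missing_inputs(["reviewText"], stages))  # [('pos_tagger', 'lemmatized')]
```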

In [5]:
from sparknlp.base import DocumentAssembler
# Wraps the raw text column into Spark NLP 'document' annotations
documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol('document')

Py4JJavaError: An error occurred while calling None.com.johnsnowlabs.nlp.annotators.Tokenizer.
: java.lang.NoClassDefFoundError: org/apache/spark/ml/util/MLWritable$class
	at com.johnsnowlabs.nlp.AnnotatorApproach.<init>(AnnotatorApproach.scala:18)
	at com.johnsnowlabs.nlp.annotators.Tokenizer.<init>(Tokenizer.scala:41)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.ml.util.MLWritable$class
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
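	... 13 more

Note: a `NoClassDefFoundError` for a `...$class` name such as `MLWritable$class` usually indicates a Scala version mismatch. Scala 2.11 builds (like `spark-nlp_2.11`) reference these `$class` helper classes, which no longer exist in Scala 2.12 builds of Spark such as PySpark 3.x. This is consistent with the environment notes above, which pin PySpark 2.4.5.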


In [9]:
from sparknlp.annotator import Tokenizer
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

In [12]:
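from sparknlp.annotator import Normalizer, LemmatizerModel
normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)
# Lemmatizer (used by the pipeline but not defined elsewhere in this notebook);
# assumes the default pretrained English lemmatizer model
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')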

In [17]:
!pip install nltk



In [20]:
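from nltk.corpus import stopwords
# Full English list (download the corpus once first: import nltk; nltk.download('stopwords')):
# eng_stopwords = stopwords.words('english')
eng_stopwords = ['the']  # minimal placeholder list used in this run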

In [21]:
from sparknlp.annotator import StopWordsCleaner
# Removes stop words from the lemmas; the output name matches the Finisher's 'unigrams' input
stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)

In [22]:
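from sparknlp.annotator import PerceptronModel, Chunker
pos_tagger = PerceptronModel.pretrained('pos_anc') \
     .setInputCols(['document', 'lemmatized']) \
     .setOutputCol('pos')
# Chunker (used by the pipeline but not defined elsewhere): adjective+noun and noun+noun phrases
chunker = Chunker().setInputCols(['document', 'pos']).setOutputCol('ngrams') \
     .setRegexParsers(['<JJ>+<NN>', '<NN>+<NN>'])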

pos_anc download started this may take some time.
Approximate size to download 4.3 MB
[OK!]
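The pipeline's `chunker` stage builds n-gram phrases from POS tags. Spark NLP's `Chunker` matches regular expressions written over tags, e.g. `<JJ>+<NN>` for adjective-noun phrases. Below is a plain-Python approximation of that idea, assuming plain tag names with no regex metacharacters. `chunk` is hypothetical, and its output order may differ from Spark NLP's.

```python
import re
from typing import List, Tuple


def chunk(tagged: List[Tuple[str, str]], patterns: List[str]) -> List[str]:
    """Return the phrases whose POS-tag sequence matches one of the tag patterns."""
    words = [word for word, _ in tagged]
    # Encode the sentence as '<JJ><NN>...' and record where each token starts and ends.
    tag_string, starts, ends = "", {}, {}
    for i, (_, tag) in enumerate(tagged):
        starts[len(tag_string)] = i
        tag_string += f"<{tag}>"
        ends[len(tag_string)] = i
    phrases = []
    for pattern in patterns:
        # Wrap each <TAG> in a group so '<JJ>+' means "one or more JJ tags".
        regex = re.sub(r"<([^>]+)>", r"(?:<\1>)", pattern)
        for match in re.finditer(regex, tag_string):
            if match.start() == match.end():
                continue  # skip empty matches from optional-only patterns
            first, last = starts[match.start()], ends[match.end()]
            phrases.append(" ".join(words[first:last + 1]))
    return phrases


tagged = [("great", "JJ"), ("guitar", "NN"), ("strings", "NN"), ("sound", "VB")]
print(chunk(tagged, ["<JJ>+<NN>", "<NN>+<NN>"]))  # ['great guitar', 'guitar strings']
```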


In [23]:
from sparknlp.base import Finisher
# Turns annotations into plain string arrays named finished_unigrams and finished_ngrams
finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams'])

In [24]:
from pyspark.ml import Pipeline
pipeline = Pipeline() \
     .setStages([documentAssembler,
                 tokenizer,
                 normalizer,
                 lemmatizer,
                 stopwords_cleaner,
                 pos_tagger,
                 chunker,
                 finisher])

NameError: name 'documentAssembler' is not defined

In [None]:
processed_review = pipeline.fit(review_text).transform(review_text)

In [None]:
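from pyspark.sql.functions import concat
# Merge the unigram and n-gram token arrays into one 'final' column
# (concat on array columns requires Spark 2.4+)
processed_review = processed_review.withColumn('final',
     concat(F.col('finished_unigrams'),
            F.col('finished_ngrams')))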
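`Finisher` turns each annotation column into a plain array of strings named `finished_<inputCol>` (its default output naming). The cell above then concatenates the unigram and n-gram arrays into `final`. Below is a plain-Python sketch of that row-level transformation, assuming an annotation can be reduced to a dict with a `result` field. Real Spark NLP annotations also carry offsets and metadata.

```python
from typing import Dict, List


def finish_and_concat(row: Dict[str, List[dict]], input_cols: List[str]) -> Dict[str, List[str]]:
    """Mimic Finisher (keep each annotation's 'result') followed by concat of the outputs."""
    finished = {f"finished_{col}": [ann["result"] for ann in row[col]] for col in input_cols}
    # concat on array columns appends them in the given order.
    finished["final"] = [tok for col in input_cols for tok in finished[f"finished_{col}"]]
    return finished


row = {
    "unigrams": [{"result": "string"}, {"result": "sound"}],
    "ngrams": [{"result": "great guitar"}],
}
print(finish_and_concat(row, ["unigrams", "ngrams"])["final"])
# ['string', 'sound', 'great guitar']
```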

# Vectorization with PySpark

In [None]:
from pyspark.ml.feature import CountVectorizer
# Term-frequency vectors over the concatenated unigrams + n-grams ('final')
tfizer = CountVectorizer(inputCol='final',
                         outputCol='tf_features')
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)
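`CountVectorizer` learns a vocabulary from the token arrays and turns each document into a sparse vector of term counts. Spark orders the vocabulary by term frequency across the corpus. The plain-Python sketch below does the same, with ties broken alphabetically for determinism; Spark's tie order is not something to rely on. The `vocab_size` and `min_df` arguments loosely mirror Spark's `vocabSize` and `minDF`, with `min_df` treated as a document count only.

```python
from collections import Counter
from typing import Dict, List


def fit_vocabulary(docs: List[List[str]], vocab_size: int = 1 << 18, min_df: int = 1) -> List[str]:
    """Keep terms found in >= min_df documents, ordered by total count (descending)."""
    term_counts = Counter(tok for doc in docs for tok in doc)
    doc_freq = Counter(tok for doc in docs for tok in set(doc))
    kept = [t for t in term_counts if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-term_counts[t], t))  # alphabetical tie-break (an assumption)
    return kept[:vocab_size]


def to_sparse_counts(doc: List[str], vocab: List[str]) -> Dict[int, int]:
    """Sparse term-frequency vector as {term index: count}; unknown tokens are dropped."""
    index = {term: i for i, term in enumerate(vocab)}
    return dict(Counter(index[tok] for tok in doc if tok in index))


docs = [["string", "sound", "string"], ["sound", "tone"], ["string"]]
vocab = fit_vocabulary(docs)
print(vocab)  # ['string', 'sound', 'tone']
print([to_sparse_counts(d, vocab) for d in docs])  # [{0: 2, 1: 1}, {1: 1, 2: 1}, {0: 1}]
```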

In [None]:
from pyspark.ml.feature import IDF
idfizer = IDF(inputCol='tf_features', 
              outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)
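Spark's `IDF` weights each term by $\mathrm{idf}(t) = \log\frac{m + 1}{\mathrm{df}(t) + 1}$ (natural log), where $m$ is the number of documents and $\mathrm{df}(t)$ the number of documents containing $t$. It then multiplies that weight into the term counts. Below is a plain-Python sketch over sparse `{index: count}` vectors. The `minDocFreq` parameter is not modelled.

```python
import math
from typing import Dict, List

SparseVec = Dict[int, float]


def fit_idf(tf_vectors: List[SparseVec]) -> Dict[int, float]:
    """idf(t) = ln((m + 1) / (df(t) + 1)) for every term index seen in the corpus."""
    m = len(tf_vectors)
    doc_freq: Dict[int, int] = {}
    for vec in tf_vectors:
        for idx, count in vec.items():
            if count > 0:
                doc_freq[idx] = doc_freq.get(idx, 0) + 1
    return {idx: math.log((m + 1) / (df + 1)) for idx, df in doc_freq.items()}


def tf_idf(vec: SparseVec, idf: Dict[int, float]) -> SparseVec:
    """Scale each count by its idf; indices never seen during fitting get weight 0."""
    return {idx: count * idf.get(idx, 0.0) for idx, count in vec.items()}


tf = [{0: 2, 1: 1}, {1: 1, 2: 1}, {0: 1}]
idf = fit_idf(tf)
print({i: round(w, 3) for i, w in sorted(idf.items())})  # {0: 0.288, 1: 0.288, 2: 0.693}
```

One consequence of this smoothing is that a term present in every document gets $\log\frac{m+1}{m+1} = 0$, so it contributes nothing to the TF-IDF features passed to LDA.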

# Topic Modeling

In [None]:
from pyspark.ml.clustering import LDA
num_topics = 6
max_iter = 10
lda = LDA(k=num_topics, 
          maxIter=max_iter, 
          featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_result)
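For reference, LDA assumes the following generative story over $K$ topics (here $K$ = `num_topics` = 6), with $d$ indexing documents and $n$ the word positions in a document. In Spark's `LDA`, the Dirichlet parameters $\alpha$ and $\beta$ correspond to the `docConcentration` and `topicConcentration` parameters.

$$
\begin{aligned}
\phi_k &\sim \operatorname{Dirichlet}(\beta), \quad k = 1, \dots, K && \text{(word distribution of topic } k\text{)} \\
\theta_d &\sim \operatorname{Dirichlet}(\alpha) && \text{(topic mixture of document } d\text{)} \\
z_{d,n} &\sim \operatorname{Categorical}(\theta_d) && \text{(topic of the } n\text{-th word)} \\
w_{d,n} &\sim \operatorname{Categorical}(\phi_{z_{d,n}}) && \text{(the observed word)}
\end{aligned}
$$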

In [None]:
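from pyspark.sql import types as T
vocab = tf_model.vocabulary
def get_words(token_list):
    # Map term indices returned by describeTopics back to vocabulary words
    return [vocab[token_id] for token_id in token_list]
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))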

In [None]:
num_top_words = 7
# Parentheses let the method chain span several lines
topics = (lda_model
          .describeTopics(num_top_words)
          .withColumn('topicWords', udf_to_words(F.col('termIndices'))))
topics.select('topic', 'topicWords').show(truncate=100)
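`describeTopics(n)` returns, for each topic, the indices of its `n` highest-weighted terms (`termIndices`) and their weights (`termWeights`). The UDF above maps those indices back through `tf_model.vocabulary`. Below is a plain-Python sketch of the same lookup, assuming a made-up topic-term weight matrix and vocabulary.

```python
from typing import List


def top_words(topic_term: List[List[float]], vocab: List[str], n: int) -> List[List[str]]:
    """For each topic row, return the n vocabulary words with the largest weights."""
    result = []
    for weights in topic_term:
        # Indices sorted by descending weight, like describeTopics' termIndices.
        term_indices = sorted(range(len(weights)), key=lambda i: -weights[i])[:n]
        result.append([vocab[i] for i in term_indices])  # same mapping as get_words
    return result


vocab = ["string", "sound", "tone", "pedal"]
topic_term = [[0.5, 0.1, 0.3, 0.1], [0.05, 0.45, 0.15, 0.35]]
print(top_words(topic_term, vocab, 2))  # [['string', 'tone'], ['sound', 'pedal']]
```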