In [1]:
import pyspark
from pyspark.sql.functions import col,when,size,min, max,length
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master('local[*]')\
    .appName('topic_modelling')\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")\
    .getOrCreate()

In [3]:
df_parquet = spark.read.parquet("part-00000-a8a717e8-bf83-42e3-a0b7-084971053d99-c000.snappy.parquet")

In [4]:
# create a column and put in the detected language code
from langdetect import detect
getCode = lambda x:detect(x)
sent = "this is eng"
getCode(sent)

'en'

In [5]:
# create a column and put in the detected language code
from langdetect import detect
# app_name_result = df_parquet.select(col('description')).collect()
# # create udf function to add a new column
# import pyspark.sql.functions as F
# from pyspark.sql.types import *

# def detect_lang(value):
#   return detect(value)

# #convert to a UDF Function by passing in the function and return type of function
# udf_detect_lang = F.udf(detect_lang, StringType())
# df_with_code = df_parquet.withColumn('lang_code', udf_detect_lang(df_parquet.description))
# df_with_code.show(5)

# easier method to convert to pandas then apply then function
df_parquet_pd = df_parquet.toPandas()
df_parquet_pd['lang_code'] = df_parquet_pd['description'].apply(getCode)
df_parquet_pd


Unnamed: 0,app_name,bundle,description,genres,os_platform,lang_code
0,Black Box - Movie Listing,958957112,Black Box Movie Listing App is a collection li...,"[Entertainment, Magazines & Newspapers]",IOS,en
1,Knife Throwing Max,1491396285,you will have fun in this game\n100+ challengi...,"[Games_Sports, Games_Role Playing]",IOS,en
2,爆料公社,1387765782,全台最大社群「爆料公社」APP 上線啦，我們把社群解放了，不只有勁爆的爆料內容，還貼心的加上...,[Social Networking],IOS,zh-tw
3,Lidow - Photo Editor & Collage,894532288,layout/grid/collage、Square/no crop for instagr...,[Photo & Video],IOS,en
4,Vidstitch Frames for Instagram,712908978,"■ #Vidstitch\n■ Featured on 148apps, Stelapps,...","[Photo & Video, Social Networking]",IOS,en
...,...,...,...,...,...,...
860,Barber Hair Salon & Beard Makeover,com.hmg.haircutgames.beardsalon.barberhaircut,Barber hair makeover salon game is for those w...,"[ENTERTAINMENT, FAMILY_PRETEND]",ANDROID,en
861,DJ Remix Offline 2020,com.mpro.djremixoffline2020,Dj Remix Offline 2020 Mp3 merupakan aplikasi m...,[MUSIC_AND_AUDIO],ANDROID,id
862,Sahih Bukhari (English),com.bangladroid.sahihbukhari,We have made this application from the book of...,[BOOKS_AND_REFERENCE],ANDROID,en
863,Two Minute English,com.astrobix.twominuteenglish,Improve your spoken English rapidly with the h...,[EDUCATION],ANDROID,en


In [6]:
# load with only English data
df_parquet_en = df_parquet_pd.loc[df_parquet_pd['lang_code']=='en']
df_parquet_en.shape

(687, 6)

In [7]:
df_spark = spark.createDataFrame(df_parquet_en)

In [8]:
# Step 1. Text cleasing with punctuations
from pyspark.sql.functions import regexp_replace
REGEX = '[_,\\-.!?@#$%^&*+/\d]'
df_spark = df_spark.withColumn("description_clean",regexp_replace(df_spark.description,REGEX,' '))
df_spark.show()

+--------------------+--------------------+--------------------+--------------------+-----------+---------+--------------------+
|            app_name|              bundle|         description|              genres|os_platform|lang_code|   description_clean|
+--------------------+--------------------+--------------------+--------------------+-----------+---------+--------------------+
|Black Box - Movie...|           958957112|Black Box Movie L...|[Entertainment, M...|        IOS|       en|Black Box Movie L...|
|  Knife Throwing Max|          1491396285|you will have fun...|[Games_Sports, Ga...|        IOS|       en|you will have fun...|
|Lidow - Photo Edi...|           894532288|layout/grid/colla...|     [Photo & Video]|        IOS|       en|layout grid colla...|
|Vidstitch Frames ...|           712908978|■ #Vidstitch
■ Fe...|[Photo & Video, S...|        IOS|       en|■  Vidstitch
■ Fe...|
|Where's my Cat? -...|          1259238703|The cat is hiding...|[Entertainment, G...|        IOS|

In [9]:
# Step 2. Tokenization
# df_spark = df_spark.drop("description_token")
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol='description_clean',outputCol='description_token')
df_spark = tokenizer.transform(df_spark)
df_spark.show()

+--------------------+--------------------+--------------------+--------------------+-----------+---------+--------------------+--------------------+
|            app_name|              bundle|         description|              genres|os_platform|lang_code|   description_clean|   description_token|
+--------------------+--------------------+--------------------+--------------------+-----------+---------+--------------------+--------------------+
|Black Box - Movie...|           958957112|Black Box Movie L...|[Entertainment, M...|        IOS|       en|Black Box Movie L...|[black, box, movi...|
|  Knife Throwing Max|          1491396285|you will have fun...|[Games_Sports, Ga...|        IOS|       en|you will have fun...|[you, will, have,...|
|Lidow - Photo Edi...|           894532288|layout/grid/colla...|     [Photo & Video]|        IOS|       en|layout grid colla...|[layout, grid, co...|
|Vidstitch Frames ...|           712908978|■ #Vidstitch
■ Fe...|[Photo & Video, S...|        IOS|   

In [10]:
# Stemming
# from nltk.stem.porter import *
# # Instantiate stemmer object
# stemmer = PorterStemmer()
# stemF = lambda x:stemmer.stem(x)

import nltk
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer 
lemmatizer = WordNetLemmatizer() 

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\ChengXiaotian\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [11]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
import array as arr 

def lemm_function(list):
    list_clean = []
    for item in list:
        list_clean.append(lemmatizer.lemmatize(item))
        
    return list_clean
    
udf_lemm_function= F.udf(lemm_function, ArrayType(StringType()))

df_spark = df_spark.withColumn("description_lemm",udf_lemm_function(df_spark.description_token))

In [12]:
df_spark.select("description_lemm").show()

+--------------------+
|    description_lemm|
+--------------------+
|[black, box, movi...|
|[you, will, have,...|
|[layout, grid, co...|
|[■, , vidstitch, ...|
|[the, cat, is, hi...|
|[martial, art, ar...|
|[google, image, s...|
|[your, face, reve...|
|[💌, neon, pastel...|
|[⭕, arabic, voice...|
|[sometimes, in, a...|
|[easy, to, use, t...|
|[finding, a, tv, ...|
|[if, you, are, a,...|
|[play, new, , eas...|
|[click, , merge, ...|
|[date, me, get, y...|
|[the, app, is, a,...|
|[want, to, login,...|
|[information, , t...|
+--------------------+
only showing top 20 rows



In [15]:
# Step 3. Remove stopword
# df_spark = df_spark.drop("description_no_stop")
from pyspark.ml.feature import StopWordsRemover
import numpy as np

stopwords_list = StopWordsRemover.loadDefaultStopWords("english")
stopwords_customize_list = ["app","apps"]
stopwords_list = np.append(stopwords_list,stopwords_customize_list)

stopwords = StopWordsRemover(inputCol="description_lemm",outputCol="description_no_stop",stopWords=stopwords_list)
stopwords.getStopWords()
df_spark = stopwords.transform(df_spark)

In [16]:
df_pd_desc_final = df_spark.toPandas()

In [None]:
df_pd_desc_final

### Note: IDF vector must be trained with large corpus, otherwise lose the advance of IDF

In [17]:
# get the "description" column
joinF= lambda x:" ".join(x)
df_pd_desc_final["description_join"] = df_pd_desc_final["description_no_stop"].apply(joinF)

In [18]:
corpus_list = df_pd_desc_final["description_join"].tolist()

In [19]:
# Step 4. TF-IDF countvector, Transform documents to document-term matrix
from sklearn.feature_extraction.text import CountVectorizer
cv = CountVectorizer(max_df=0.95) #ignore terms that have a document frequency strictly higher than the given threshold
dtm = cv.fit_transform(corpus_list)

In [None]:
dtm.shape

In [20]:
feature_names = cv.get_feature_names()

In [23]:
feature_names

['aac',
 'aap',
 'aapake',
 'aapko',
 'aarti',
 'ab',
 'abacus',
 'abandon',
 'abc',
 'abdul',
 'abdullah',
 'ability',
 'ablar',
 'able',
 'ablution',
 'abnormal',
 'aboard',
 'aboki',
 'about',
 'aboutads',
 'aboutus',
 'above',
 'abraxas',
 'abreast',
 'abridged',
 'abroad',
 'absolute',
 'absolutely',
 'absorb',
 'absorbed',
 'absorbing',
 'abstract',
 'abstraction',
 'abu',
 'abundant',
 'abx',
 'ac',
 'academic',
 'academy',
 'acca',
 'accelerate',
 'acceleration',
 'accelerator',
 'accelerometer',
 'accent',
 'accept',
 'acceptance',
 'accepted',
 'access',
 'accessibility',
 'accessible',
 'accessing',
 'accessory',
 'accident',
 'accidental',
 'accidentally',
 'accidently',
 'acclaim',
 'accolade',
 'accompanied',
 'accompany',
 'accompanying',
 'accomplish',
 'accomplishment',
 'accordance',
 'according',
 'accordingly',
 'account',
 'accounting',
 'accounts',
 'accumulate',
 'accumulated',
 'accuracy',
 'accurate',
 'accurately',
 'ace',
 'ach',
 'acharya',
 'achieve',
 'ach

In [22]:
len(cv.vocabulary_)

10720

In [24]:
# Step 5. TfidfTransformer to compute the IDF
from sklearn.feature_extraction.text import TfidfTransformer
tfidfTransformer = TfidfTransformer(smooth_idf=True, use_idf=True)
tfidfTransformer.fit(dtm)

TfidfTransformer(norm='l2', smooth_idf=True, sublinear_tf=False, use_idf=True)

In [25]:
def get_keyword_list(doc):
    # generate tfidf vector for given doc
    tf_idf_vector = tfidfTransformer.transform(cv.transform([doc]))
    
    # Return a C00rdinate representation of this matrix
    coo_matrix = tf_idf_vector.tocoo()
    # sorted_items = sort_coo(coo_matrix)

    # get Top-n index list
    index_sort = np.argsort(coo_matrix.data)[-topn:]
    
    keyword_list = []
    # extract keywords
    for idx in index_sort[::-1]: 
        keyword_list.append(feature_names[coo_matrix.col[idx]])
    
    return keyword_list

In [26]:
# Step 6. Get consolidated keyword list for entire dataset

# get topn = 10, keywords
topn = 10
df_pd_desc_final['description_keyword_list']=df_pd_desc_final['description_join'].apply(get_keyword_list)

In [27]:
df_pd_desc_final['description_keyword_list']

0      [movie, subscription, news, list, account, tv,...
1      [knife, apple, score, progressive, blade, swor...
2      [photo, lidow, subscription, instagram, leak, ...
3      [vidstitch, video, instagram, combine, squeeze...
4      [cat, hiding, item, place, unlikely, find, som...
                             ...                        
682    [mining, term, commit, alphabetical, determine...
683    [hair, barber, salon, shop, cutting, kid, game...
684    [volume, prayer, bukhari, ahadeeth, al, book, ...
685    [lesson, english, phrasal, confusing, twominen...
686    [galaxy, theme, plus, note, pro, smartphones, ...
Name: description_keyword_list, Length: 687, dtype: object

In [28]:
df_desc_final_spark = spark.createDataFrame(df_pd_desc_final)

In [29]:
for i in range(topn):
    df_desc_final_spark = df_desc_final_spark.withColumn("keyword-"+str(i+1),df_desc_final_spark['description_keyword_list'][i])

In [30]:
df_desc_final_spark.columns

['app_name',
 'bundle',
 'description',
 'genres',
 'os_platform',
 'lang_code',
 'description_clean',
 'description_token',
 'description_lemm',
 'description_no_stop',
 'description_join',
 'description_keyword_list',
 'keyword-1',
 'keyword-2',
 'keyword-3',
 'keyword-4',
 'keyword-5',
 'keyword-6',
 'keyword-7',
 'keyword-8',
 'keyword-9',
 'keyword-10']

In [31]:
# save the final keyword spreadsheet to local
df_desc_final = df_desc_final_spark.toPandas()
df_desc_final.to_csv('desc_keyword.csv')

In [None]:
# # BLOCK for testing individual sentence
# # get the document that we want to extract from
# test_idx = 3
# doc_test = df_pd_desc_final['description_join'][test_idx]

# # generate tfidf vector for given doc
# tf_idf_vector = tfidfTransformer.transform(cv.transform([doc_test]))

In [None]:
# import numpy as np
# # Return a C00rdinate representation of this matrix
# coo_matrix = tf_idf_vector.tocoo()
# # sorted_items = sort_coo(coo_matrix)

# # get Top-n list
# index_sort = np.argsort(coo_matrix.data)[-10:]

# # extract keywords, topn
# print("Top-n keywords in the given document:\n")
# for idx in index_sort[::-1]: 

#     print(feature_names[coo_matrix.col[idx]],":",coo_matrix.data[idx])

# # keywords = extract_topn_from_vector(feature_names,sorted_items,10)

# # # print result
# # print("Top-n keywords in the given document:\n")
# # for k in keywords:
# #     print(k,":",keywords[k])

## -- END --

In [None]:
# doc_test

In [None]:
# doc_origin = df_pd_desc_final['description'][test_idx]
# doc_origin

In [None]:
# df_pd_desc_final['app_name'][test_idx]

In [None]:
# df_pd_desc_final['genres'][test_idx]

In [None]:
# dm = tf_idf_vector.todense()

In [None]:
# def sort_coo(coo_matrix):
#     tuples = zip(coo_matrix.col,coo_matrix.data)
#     return sorted(tuples,key=lambda x:x[1],reverse=True)

# def extract_topn_from_vector(feature_names,sorted_items,topn):
#     """get the feature names according to the tf-idf score of top-n items"""
    
#     #use only top-n itmes
#     sorted_items = sorted_items[:topn]
    
#     score_vals = []
#     features_vals = []
    
#     # word index and corresponding tf-idf score
#     for index,score in sorted_items:
        
#         #feature name and score
#         score_vals.append(round(score,3))
#         features_vals.append(feature_names[index])
        
#     # create a tuples of feature,score
#     results = {}
#     for index in range(len(features_vals)):
#         results[features_vals[index]] = score_vals[index]
        
#     return results