In [4]:
from connectPyspark import *
from pyspark.ml.feature import NGram
import pyspark.sql.functions as F

In [5]:
a = read_avro("output/22__READY_NGRAM")
a = a.coalesce(54)

a.show()

# 🆂🅰🅼🅿🅻🅴

+--------------------+--------------------+--------------------+--------+-----------+--------------------+
|            original|               token|                 tag|language|chinese_tag|    language_pattern|
+--------------------+--------------------+--------------------+--------+-----------+--------------------+
|taste nice better...|[taste, nice, bet...|[NOUN, ADJ, ADJ, ...| english|       null|[en, en, en, en, ...|
|barang sampai den...|[barang, sampai, ...|[NOUN, ADP, ADP, ...| english|       null|[ms, ms, ms, ms, ...|
|to seller thanks ...|[to, seller, than...|[ADP, NOUN, NOUN,...| english|       null|[en, en, en, en, ...|
|thank you seller ...|[thank, you, sell...|[VERB, PRON, NOUN...| english|       null|[en, en, en, en, ...|
|wow fast delivery...|[wow, fast, deliv...|[PROPN, PROPN, NO...| english|       null|[en, en, en, en, ...|
|good product qual...|[good, product, q...|[PROPN, PROPN, PR...| english|       null|[en, en, en, en, ...|
|unrelated photo a...|[unrelated, pho

In [6]:
a= a.withColumn("sentence_id", F.monotonically_increasing_id())

In [7]:
a.show()

# 🆂🅰🅼🅿🅻🅴

+--------------------+--------------------+--------------------+--------+-----------+--------------------+-----------+
|            original|               token|                 tag|language|chinese_tag|    language_pattern|sentence_id|
+--------------------+--------------------+--------------------+--------+-----------+--------------------+-----------+
|taste nice better...|[taste, nice, bet...|[NOUN, ADJ, ADJ, ...| english|       null|[en, en, en, en, ...|          0|
|barang sampai den...|[barang, sampai, ...|[NOUN, ADP, ADP, ...| english|       null|[ms, ms, ms, ms, ...|          1|
|to seller thanks ...|[to, seller, than...|[ADP, NOUN, NOUN,...| english|       null|[en, en, en, en, ...|          2|
|thank you seller ...|[thank, you, sell...|[VERB, PRON, NOUN...| english|       null|[en, en, en, en, ...|          3|
|wow fast delivery...|[wow, fast, deliv...|[PROPN, PROPN, NO...| english|       null|[en, en, en, en, ...|          4|
|good product qual...|[good, product, q...|[PROP

### Create bi-gram

In [14]:
tokenNgram =NGram(n=2, inputCol="token", outputCol="token_gram")
tagNgram =NGram(n=2, inputCol="tag", outputCol="tag_gram")
langNgram =NGram(n=2, inputCol="language_pattern", outputCol="lang_gram")

In [15]:
n2 = tokenNgram.transform(a)
n2 = tagNgram.transform(n2)
n2 = langNgram.transform(n2)


In [16]:
n2.show()

+--------------------+--------------------+--------------------+--------+-----------+--------------------+-----------+--------------------+--------------------+--------------------+
|            original|               token|                 tag|language|chinese_tag|    language_pattern|sentence_id|          token_gram|            tag_gram|           lang_gram|
+--------------------+--------------------+--------------------+--------+-----------+--------------------+-----------+--------------------+--------------------+--------------------+
|taste nice better...|[taste, nice, bet...|[NOUN, ADJ, ADJ, ...| english|       null|[en, en, en, en, ...|          0|[taste nice, nice...|[NOUN ADJ, ADJ AD...|[en en, en en, en...|
|barang sampai den...|[barang, sampai, ...|[NOUN, ADP, ADP, ...| english|       null|[ms, ms, ms, ms, ...|          1|[barang sampai, s...|[NOUN ADP, ADP AD...|[ms ms, ms ms, ms...|
|to seller thanks ...|[to, seller, than...|[ADP, NOUN, NOUN,...| english|       null|[en, 

In [11]:
n2

DataFrame[original: string, token: array<string>, tag: array<string>, language: string, chinese_tag: array<string>, language_pattern: array<string>, sentence_id: bigint, token_gram: array<string>, tag_gram: array<string>, lang_gram: array<string>]

In [17]:
from pyspark.sql.functions import arrays_zip, col, explode
from pyspark.sql.window import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as F
t = n2.withColumn("tmp", arrays_zip("token_gram", "tag_gram", "lang_gram"))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern","token", F.posexplode(col("tmp")))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern", "pos",col("col.token_gram"), col("col.tag_gram"),  col("col.lang_gram"))

t.show(100)

+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+-----------------+-----------+---------+
|sentence_id|            original|                 tag|language|               token|chinese_tag|    language_pattern|pos|       token_gram|   tag_gram|lang_gram|
+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+-----------------+-----------+---------+
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  0|       taste nice|   NOUN ADJ|    en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  1|      nice better|    ADJ ADJ|    en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  2|      better than|   ADJ VERB|    en en|
|          0|taste nic

In [18]:
shutil.rmtree("ngramV2/N2_GRAM")
write_avro(t, "ngramV2/N2_GRAM")

                                                                                

### Create tri-gram

In [22]:
tokenNgram =NGram(n=3, inputCol="token", outputCol="token_gram")
tagNgram =NGram(n=3, inputCol="tag", outputCol="tag_gram")
langNgram =NGram(n=3, inputCol="language_pattern", outputCol="lang_gram")

In [23]:
n = tokenNgram.transform(a)
n = tagNgram.transform(n)
n = langNgram.transform(n)

In [24]:
from pyspark.sql.functions import arrays_zip, col, explode
from pyspark.sql.window import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as F
t = n.withColumn("tmp", arrays_zip("token_gram", "tag_gram", "lang_gram"))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern","token", F.posexplode(col("tmp")))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern", "pos",col("col.token_gram"), col("col.tag_gram"),  col("col.lang_gram"))

t.show(100)

+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+-----------------+---------+
|sentence_id|            original|                 tag|language|               token|chinese_tag|    language_pattern|pos|          token_gram|         tag_gram|lang_gram|
+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+-----------------+---------+
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  0|   taste nice better|     NOUN ADJ ADJ| en en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  1|    nice better than|     ADJ ADJ VERB| en en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  2|   better than mag

In [25]:
shutil.rmtree("ngramV2/N3_GRAM")
write_avro(t, "ngramV2/N3_GRAM")

                                                                                

### Create 4-gram

In [28]:
tokenNgram =NGram(n=4, inputCol="token", outputCol="token_gram")
tagNgram =NGram(n=4, inputCol="tag", outputCol="tag_gram")
langNgram =NGram(n=4, inputCol="language_pattern", outputCol="lang_gram")

In [29]:
n = tokenNgram.transform(a)
n = tagNgram.transform(n)
n = langNgram.transform(n)

In [30]:
from pyspark.sql.functions import arrays_zip, col, explode
from pyspark.sql.window import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as F
t = n.withColumn("tmp", arrays_zip("token_gram", "tag_gram", "lang_gram"))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern","token", F.posexplode(col("tmp")))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern", "pos",col("col.token_gram"), col("col.tag_gram"),  col("col.lang_gram"))

t.show(100)

+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+--------------------+-----------+
|sentence_id|            original|                 tag|language|               token|chinese_tag|    language_pattern|pos|          token_gram|            tag_gram|  lang_gram|
+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+--------------------+-----------+
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  0|taste nice better...|   NOUN ADJ ADJ VERB|en en en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  1|nice better than ...|   ADJ ADJ VERB NOUN|en en en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, .

In [31]:
shutil.rmtree("ngramV2/N4_GRAM")
write_avro(t, "ngramV2/N4_GRAM")

                                                                                

### Create 5-gram

In [32]:
tokenNgram =NGram(n=5, inputCol="token", outputCol="token_gram")
tagNgram =NGram(n=5, inputCol="tag", outputCol="tag_gram")
langNgram =NGram(n=5, inputCol="language_pattern", outputCol="lang_gram")

In [33]:
n = tokenNgram.transform(a)
n = tagNgram.transform(n)
n = langNgram.transform(n)

In [34]:
from pyspark.sql.functions import arrays_zip, col, explode
from pyspark.sql.window import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as F
t = n.withColumn("tmp", arrays_zip("token_gram", "tag_gram", "lang_gram"))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern","token", F.posexplode(col("tmp")))\
.select("sentence_id", "original", "tag","language", "token", "chinese_tag", "language_pattern", "pos",col("col.token_gram"), col("col.tag_gram"),  col("col.lang_gram"))

t.show(100)

+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+--------------------+--------------+
|sentence_id|            original|                 tag|language|               token|chinese_tag|    language_pattern|pos|          token_gram|            tag_gram|     lang_gram|
+-----------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+--------------------+--------------+
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  0|taste nice better...|NOUN ADJ ADJ VERB...|en en en en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en, en, en, en, ...|  1|nice better than ...|ADJ ADJ VERB NOUN...|en en en en en|
|          0|taste nice better...|[NOUN, ADJ, ADJ, ...| english|[taste, nice, bet...|       null|[en

In [35]:
shutil.rmtree("ngramV2/N5_GRAM")
write_avro(t, "ngramV2/N5_GRAM")

                                                                                

### 23 ASSIGN GRAM TYPE & save the data

In [47]:
g2 = read_avro("ngramV2/N2_GRAM")
g3 = read_avro("ngramV2/N3_GRAM")
g4 = read_avro("ngramV2/N4_GRAM")
g5 = read_avro("ngramV2/N5_GRAM")

In [48]:
g2 = g2.withColumn("gram_type", F.lit("2"))
g3 = g3.withColumn("gram_type", F.lit("3"))
g4 = g4.withColumn("gram_type", F.lit("4"))
g5 = g5.withColumn("gram_type", F.lit("5"))

In [49]:
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
merge = unionAll(g2, g3, g4, g5)

In [39]:
shutil.rmtree("output/23_ALL__NGRAM.avro")
write_avro(merge, "output/23_ALL__NGRAM.avro")

                                                                                

### 24 RETAIN, KEYWORDS N-GRAM

In [50]:
import pyspark.sql.types as T

In [51]:
engKey = ["seller","recommend","nice"]
chiKey = ["好","赞","不错","快","喜欢","谢","值","包装","卖家"]
bmKey = ["berbaloi","barang","cantik","murah"]

keywords = set(engKey).union(set(chiKey)).union(set(bmKey))

def contains_keywords(text):
    for x in text.split(" "):
        if x in keywords:
            return True
        else:
            return False

keyDeter = udf(lambda x : contains_keywords(x), T.BooleanType())

In [52]:
df = read_avro("output/23_ALL__NGRAM.avro")
withKey = df.withColumn("containsKey", keyDeter(F.col("token_gram")))

In [None]:
write_avro(withKey, "output/24_WITH_KEY_GRAM")

[Stage 15:>                                                       (0 + 20) / 27]

In [107]:
withKey = read_avro("output/24_WITH_KEY_GRAM").filter(F.col("containsKey") == True)

In [108]:
write_avro(withKey.drop("containKey"), "output/24_WITH_KEY")

                                                                                

In [110]:
read_avro("output/24_WITH_KEY").filter(F.col("token_gram").contains("好")).show()



+------------+---------------------------------------+--------------------+--------+-----------------------------+--------------------+--------------------+---+----------+--------+---------+---------+-----------+
| sentence_id|                               original|                 tag|language|                        token|          normal_tag|    language_pattern|pos|token_gram|tag_gram|lang_gram|gram_type|containsKey|
+------------+---------------------------------------+--------------------+--------+-----------------------------+--------------------+--------------------+---+----------+--------+---------+---------+-----------+
|171798691842|    很好 赞 赞赞赞赞赞赞赞赞赞赞赞赞...|[d, a, v, v, v, v...| chinese|  [很, 好, 赞, 赞赞, 赞赞,...|[ADV, ADJ, VERB, ...|[zh, zh, zh, zh, ...|  1|     好 赞|     a v|    zh zh|        2|       true|
|171798691847|                   good quality prod...|[eng, eng, eng, e...| chinese|         [good, quality, p...|[X, X, X, X, X, X...|[en, en, en, en, ...| 11|     好 还|   

                                                                                

### 25 Determine at middle

In [59]:
import pyspark.sql.functions as f
import pyspark.sql.types as T


engKey = ["seller","recommend","nice"]
chiKey = ["好","赞","不错","快","喜欢","谢","值","包装","卖家"]
bmKey = ["berbaloi","barang","cantik","murah"]

keywords = set(engKey).union(set(chiKey)).union(set(bmKey))

def is_middle(gramType, token_gram):
    token_gram = token_gram.split(" ")
    if gramType == "2" or gramType == "4":
        return False
    elif gramType == "3":
        if token_gram[1] in keywords:
            return True
        else:
            return False
    elif gramType == "5":
        if token_gram[2] in keywords:
            return True
        else:
            return False
    else:
        return False
    
middleUDF = f.udf(lambda x,y : is_middle(x,y), T.BooleanType() )

In [60]:
df = read_avro("output/24_WITH_KEY")

In [61]:
df = df.withColumn("middle_key", middleUDF(df.gram_type, df.token_gram))

In [63]:
shutil.rmtree("output/25_CENTER_DETERMINED")
write_avro(df, "output/25_CENTER_DETERMINED")

                                                                                

In [66]:
df = read_avro( "output/25_CENTER_DETERMINED")

In [67]:
df.filter(df.middle_key ==  True).show()

+------------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+-----------------+---------+---------+-----------+----------+
| sentence_id|            original|                 tag|language|               token|chinese_tag|    language_pattern|pos|          token_gram|         tag_gram|lang_gram|gram_type|containsKey|middle_key|
+------------+--------------------+--------------------+--------+--------------------+-----------+--------------------+---+--------------------+-----------------+---------+---------+-----------+----------+
|128849018909|berbaloi murah ce...|[ADJ, ADJ, ADJ, N...| english|[berbaloi, murah,...|       null|[ms, ms, ms, en, ...|  0|berbaloi murah cepat|      ADJ ADJ ADJ| ms ms ms|        3|       true|      true|
|128849018924|brg dh dpt thank ...|[NOUN, ADV, VERB,...| english|[brg, dh, dpt, th...|       null|[en, en, en, en, ...|  5| seller nice packing|   PROPN ADJ NOUN| en en en|    