In [1]:
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import os

os.environ["PYSPARK_PYTHON"]="/home/pc/g5_env_tf/bin/python39"
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

spark = SparkSession.builder.master("local[*]")\
            .appName("ms_pos_tagging")\
            .config('spark.executor.memory', '20g')\
            .config('spark.driver.maxResultSize', '10g')\
            .config('spark.driver.memory', '20g')\
            .config('spark.ui.showConsoleProgress', False)\
            .getOrCreate()

sparkContext = spark.sparkContext

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/07 23:52:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/07 23:52:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/10/07 23:52:01 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/10/07 23:52:01 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
22/10/07 23:52:01 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [2]:
file_path = 'hdfs://g5.bigtop.it:8020/user/root/filtered_ms_wiki.parquet/part-00000-aa4a2315-60e8-4435-bd76-cf008453f11e-c000.snappy.parquet'

In [3]:
df1 = spark.read.option("header",True).parquet(file_path)
df1.show()

+--------------------+----------+----------+--------------------+--------------------+--------------------+--------------------+
|      token_sentence|sent_count|word_count|              2grams|              3grams|              4grams|              5grams|
+--------------------+----------+----------+--------------------+--------------------+--------------------+--------------------+
|[medan, selera, p...|         1|         8|[pula terletak, t...|[selera pula terl...|[medan selera pul...|[medan selera pul...|
|[terutung, payung...|         1|        17|[yang terletak, t...|[gampong yang ter...|[sebuah gampong y...|[merupakan sebuah...|
|[empangan, bakun,...|         1|         7|[juga terletak, t...|[bakun juga terle...|[empangan bakun j...|[empangan bakun j...|
|[lapangan, terban...|         1|        31|[maputo terletak,...|[terbang maputo t...|[lapangan terbang...|[lapangan terbang...|
|[daerah, preov, w...|        91|        11|      [daerah preov]|[daerah preov wil...|[daerah pre

In [4]:
from pyspark.sql.functions import col

gram2 = df1.select(col("2grams")).withColumnRenamed("2grams","n-grams")
gram3 = df1.select(col("3grams")).filter(df1.word_count > 2).withColumnRenamed("3grams","n-grams")
gram4 = df1.select(col("4grams")).filter(df1.word_count > 3).withColumnRenamed("4grams","n-grams")
gram5 = df1.select(col("5grams")).filter(df1.word_count > 4).withColumnRenamed("5grams","n-grams")

In [5]:
n_gram = gram5.union(gram4).union(gram3).union(gram2).persist(StorageLevel.MEMORY_ONLY)

In [6]:
n_gram.show()

+--------------------+
|             n-grams|
+--------------------+
|[medan selera pul...|
|[merupakan sebuah...|
|[empangan bakun j...|
|[lapangan terbang...|
|[daerah preov wil...|
|[chepa kota bharu...|
|[anarjan bostanab...|
|[gkeli bozyk meru...|
|[kronburg merupak...|
|[sabah mencapai k...|
|[merupakan sebuah...|
|[merupakan sebuah...|
|[merupakan sebuah...|
|[pada april denga...|
|[daerah topoany w...|
|[sebuah gemeente ...|
|[merupakan sebuah...|
|[sebuah sekolah m...|
|[ovack lleburgaz ...|
|[merupakan sebuah...|
+--------------------+
only showing top 20 rows



In [7]:
n_gram.printSchema()

root
 |-- n-grams: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [8]:
from pyspark.sql.functions import explode
ngrams = n_gram.select(explode(col('n-grams'))).withColumnRenamed("col","ngrams")

In [9]:
ngrams.show()

+--------------------+
|              ngrams|
+--------------------+
|medan selera pula...|
|selera pula terle...|
|pula terletak di ...|
|terletak di tingk...|
|merupakan sebuah ...|
|sebuah gampong ya...|
|gampong yang terl...|
|yang terletak di ...|
|terletak di kecam...|
|terletak di kecam...|
|di kecamatan bamb...|
|kecamatan bambel ...|
|bambel kabupaten ...|
|kabupaten aceh te...|
|empangan bakun ju...|
|bakun juga terlet...|
|juga terletak di ...|
|lapangan terbang ...|
|terbang maputo te...|
|maputo terletak p...|
+--------------------+
only showing top 20 rows



In [10]:
def f(x): return x   
def exchangePosition(x, y):
    return y, x

In [11]:
ngrams2 = ngrams.rdd.map(lambda x: (1,x))\
.flatMapValues(f)\
.map(lambda x: exchangePosition(x[0], x[1]))\
.reduceByKey(lambda a,b: a+b)\
.persist(StorageLevel.MEMORY_ONLY)

In [12]:
ngrams2.count()

1588337

In [13]:
column = ['ngram', 'gram_count']
ngrams2 = ngrams2.toDF(column)

22/10/07 23:52:19 WARN BlockManager: Task 240 already completed, not releasing lock for rdd_42_0


In [14]:
ngrams2.show()

+--------------------+----------+
|               ngram|gram_count|
+--------------------+----------+
|april dengan harg...|       111|
|lleburgaz merupak...|         2|
|kocapnar elmal me...|         1|
|padarincang kabup...|        13|
|di kawasan hauten...|        55|
|kabupaten lima pu...|        28|
|terletak di kecam...|        18|
|malaysia yang men...|         1|
|terbesar terletak...|         1|
|negara negeri dae...|        15|
|oleh pembesarpemb...|         1|
|census town dalam...|        28|
|daerah derbent mi...|         1|
|bandar yang terle...|        87|
|vercelli di piedm...|        87|
|daerah alt empord...|        70|
|universiti malays...|         1|
|terletak di wilay...|        70|
|bank islam malays...|         1|
|terletak di sepan...|      2150|
+--------------------+----------+
only showing top 20 rows



In [15]:
ngram2 = ngrams2.drop('gram_count')

In [16]:
ngram2.show()

+--------------------+
|               ngram|
+--------------------+
|april dengan harg...|
|lleburgaz merupak...|
|kocapnar elmal me...|
|padarincang kabup...|
|di kawasan hauten...|
|kabupaten lima pu...|
|terletak di kecam...|
|malaysia yang men...|
|terbesar terletak...|
|negara negeri dae...|
|oleh pembesarpemb...|
|census town dalam...|
|daerah derbent mi...|
|bandar yang terle...|
|vercelli di piedm...|
|daerah alt empord...|
|universiti malays...|
|terletak di wilay...|
|bank islam malays...|
|terletak di sepan...|
+--------------------+
only showing top 20 rows



In [17]:
import pyspark.sql.functions as f
ngram2 = ngram2.withColumn('word_count', f.size(f.split(f.col('ngram'), ' '))).persist(StorageLevel.MEMORY_ONLY)
ngram2.count()

1588337

In [18]:
ngram2.show(10, False)

+--------------------------------------------+----------+
|ngram                                       |word_count|
+--------------------------------------------+----------+
|april dengan harga malaysia barat           |5         |
|lleburgaz merupakan sebuah kawasan yang     |5         |
|kocapnar elmal merupakan sebuah kawasan     |5         |
|padarincang kabupaten serang provinsi banten|5         |
|di kawasan hautenormandie di utara          |5         |
|kabupaten lima puluh kota sumatera          |5         |
|terletak di kecamatan bukit tusam           |5         |
|malaysia yang menguruskan dan mengendalikan |5         |
|terbesar terletak di jalan kapten           |5         |
|negara negeri daerah daerah jelebu          |5         |
+--------------------------------------------+----------+
only showing top 10 rows



In [19]:
keywords =  ['terletak', 'kawasan', 'malaysia',' tentera', 'daerah',  'kabupaten']

def keyword_position(text, n_gram):

    keyword = []
    index_of_keyword =[]
    
    tempList = list(text.split(" "))
    
    for x in keywords:
        i = 0
        for y in tempList:
            if x == y:
                keyword.append(x)
                index_of_keyword.append(i)
            i = i + 1
    
    if keyword == []:
        return
    
    if n_gram == 3 or n_gram == 2:
        return text
    
    if n_gram == 5:
        if 2 in index_of_keyword:
            return text
    
    if n_gram == 4:
        if 1 in index_of_keyword or 2 in index_of_keyword:
            return text

    return

In [20]:
from pyspark.sql import Row

row = Row("ngram")
ngram3 = ngram2.rdd.map(lambda x: (keyword_position(x[0], x[1]))).map(row).toDF().dropna(how='any').persist(StorageLevel.MEMORY_ONLY)

In [21]:
ngram3.count()

671809

In [22]:
import malaya
import logging

def ms_pos_tag(text):
    # logging.basicConfig(level=logging.CRITICAL)
    results = malaya.pos.transformer('alxlnet', True).predict(text)
    
    empList = []

    for x, y in results:
        empList.append(y)
    return empList

  from .autonotebook import tqdm as notebook_tqdm
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))


In [23]:
sampling_ngram = ngram3.limit(10000)

In [24]:
sampling_ngram.show()

+--------------------+
|               ngram|
+--------------------+
|negara negeri dae...|
|oleh pembesarpemb...|
|bandar yang terle...|
|bank islam malays...|
|artikular yang te...|
|bermaksud pegawai...|
|united yang terle...|
|berhampiran denga...|
|dari nama kawasan...|
|terletak dalam da...|
|ditubuhkan di mal...|
|terletak di daera...|
|bertindak sebagai...|
|terletak di daera...|
|kini pemerintah k...|
|islam seperti mal...|
|pejabat polis dae...|
|kampung sekati te...|
|perbandaran dalam...|
|terletak di daera...|
+--------------------+
only showing top 20 rows



In [25]:
sampling_ngram.rdd.getNumPartitions()

1

In [26]:
sampling_ngram = sampling_ngram.repartition(48)

In [27]:
sampling_ngram.count()

10000

In [28]:
pos_tag = sampling_ngram.rdd.map(lambda x: (x[0], ms_pos_tag(x[0]))).persist(StorageLevel.MEMORY_ONLY)

In [29]:
pos_tag.count()

  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('|'.join(pipeline)))
  self.tok = re.compile(r'({})'.format('

10000

In [30]:
columns = ['sentence', 'pos_tag']

df_pos_tag = pos_tag.toDF(columns)

In [31]:
df_pos_tag.show()

+--------------------+--------------------+
|            sentence|             pos_tag|
+--------------------+--------------------+
|yang terletak di ...|[PRON, VERB, ADP,...|
|      seven terletak|          [PROPN, X]|
|fpso kikeh terlet...|[PROPN, PROPN, VE...|
|di sinema malaysi...|[ADP, PROPN, PROP...|
|   filmstdi terletak|        [NOUN, VERB]|
|di wilayah kabupa...|[ADP, NOUN, NOUN,...|
|       sncf terletak|        [NOUN, VERB]|
|tabung harapan ma...|[NOUN, NOUN, PROP...|
|teguh mingguan ma...|[NOUN, NOUN, PROP...|
|ditubuhkan dan ka...| [VERB, CCONJ, NOUN]|
|iaitu di kawasan ...|[CCONJ, ADP, NOUN...|
|dan kawasan yang ...|[CCONJ, NOUN, PRO...|
|diraja di kawasan...|[PROPN, ADP, NOUN...|
|kl monorail daera...|[PROPN, PROPN, NO...|
|melibatkan malays...| [VERB, PROPN, NOUN]|
|dari kawasan yang...|[ADP, NOUN, PRON,...|
|penggabungan ini ...|[NOUN, DET, NOUN,...|
|clever girl malay...|[PROPN, PROPN, PR...|
|mcorp dalam daera...|[PROPN, ADP, NOUN...|
|oleh kawasan pana...|[CCONJ, NO

In [32]:
import pyspark.sql.functions as f
df_final = df_pos_tag.withColumn('n-gram', f.size(f.split(f.col('sentence'), ' ')))

In [33]:
df_final.show()

+--------------------+--------------------+------+
|            sentence|             pos_tag|n-gram|
+--------------------+--------------------+------+
|yang terletak di ...|[PRON, VERB, ADP,...|     4|
|      seven terletak|          [PROPN, X]|     2|
|fpso kikeh terlet...|[PROPN, PROPN, VE...|     5|
|di sinema malaysi...|[ADP, PROPN, PROP...|     4|
|   filmstdi terletak|        [NOUN, VERB]|     2|
|di wilayah kabupa...|[ADP, NOUN, NOUN,...|     4|
|       sncf terletak|        [NOUN, VERB]|     2|
|tabung harapan ma...|[NOUN, NOUN, PROP...|     5|
|teguh mingguan ma...|[NOUN, NOUN, PROP...|     5|
|ditubuhkan dan ka...| [VERB, CCONJ, NOUN]|     3|
|iaitu di kawasan ...|[CCONJ, ADP, NOUN...|     5|
|dan kawasan yang ...|[CCONJ, NOUN, PRO...|     4|
|diraja di kawasan...|[PROPN, ADP, NOUN...|     5|
|kl monorail daera...|[PROPN, PROPN, NO...|     4|
|melibatkan malays...| [VERB, PROPN, NOUN]|     3|
|dari kawasan yang...|[ADP, NOUN, PRON,...|     4|
|penggabungan ini ...|[NOUN, DE

In [34]:
from pyspark.sql.functions import col, concat_ws
df_final = df_final.withColumn('pos_tag', concat_ws(', ', col('pos_tag')))

In [35]:
df_final.show()

+--------------------+--------------------+------+
|            sentence|             pos_tag|n-gram|
+--------------------+--------------------+------+
|yang terletak di ...|PRON, VERB, ADP, ...|     4|
|      seven terletak|            PROPN, X|     2|
|fpso kikeh terlet...|PROPN, PROPN, VER...|     5|
|di sinema malaysi...|ADP, PROPN, PROPN...|     4|
|   filmstdi terletak|          NOUN, VERB|     2|
|di wilayah kabupa...|ADP, NOUN, NOUN, NUM|     4|
|       sncf terletak|          NOUN, VERB|     2|
|tabung harapan ma...|NOUN, NOUN, PROPN...|     5|
|teguh mingguan ma...|NOUN, NOUN, PROPN...|     5|
|ditubuhkan dan ka...|   VERB, CCONJ, NOUN|     3|
|iaitu di kawasan ...|CCONJ, ADP, NOUN,...|     5|
|dan kawasan yang ...|CCONJ, NOUN, PRON...|     4|
|diraja di kawasan...|PROPN, ADP, NOUN,...|     5|
|kl monorail daera...|PROPN, PROPN, NOU...|     4|
|melibatkan malays...|   VERB, PROPN, NOUN|     3|
|dari kawasan yang...|ADP, NOUN, PRON, ...|     4|
|penggabungan ini ...|NOUN, DET

In [36]:
df_final = df_final.coalesce(1)

df_final.write.csv('ms_wiki_final',header = 'true')

In [37]:
df_final.write.parquet('hdfs://g5.bigtop.it:8020/user/root/ms_wiki_final')