In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import *

import pandas as pd

import csv
import re
from collections import Counter
from itertools import islice
from os import chdir
from urllib.parse import urlsplit, urlunsplit

In [None]:
# Spark session sized for one large machine (256 cores, 1TB memory).
# No master is set here, so Spark uses its default one; when the kernel is a plain
# Python process that is presumably local mode using every core (the UI cell reports
# the resulting default parallelism) — confirm if launched via spark-submit.
# On a machine with less memory, lower spark.executor.memory / spark.driver.memory.
# Raising spark.default.parallelism / spark.sql.shuffle.partitions (not set below,
# so Spark's defaults apply) also reduces the memory each task needs.
# NOTE(review): in local mode the executor presumably runs inside the driver JVM,
# so spark.driver.memory is likely the setting that matters — confirm.
# spark.local.dir is a hard-coded scratch directory for Spark's on-disk data.

spark = (
    SparkSession.builder
    .config("spark.executor.memory", "1000g")
    .config("spark.driver.memory", "1000g")
    .config("spark.local.dir", "/mnt/vol1/tmp")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# Spark UI url, rewritten to point at localhost (e.g. for SSH port forwarding)
# while keeping the scheme, port and path that Spark reports.

ui_url_parts = urlsplit(sc.uiWebUrl)
# Rebuild the netloc from the parsed port rather than regex-replacing everything
# before the first ':' — that corrupted bracketed IPv6 hosts ("[::1]:4040" became
# "localhost::1]:4040"). Any userinfo is dropped, as before.
ui_local_netloc = "localhost" if ui_url_parts.port is None else f"localhost:{ui_url_parts.port}"
sparkUrl = urlunsplit(ui_url_parts._replace(netloc=ui_local_netloc))

print(sparkUrl, sc.defaultParallelism)

http://localhost:4040 256


In [None]:
# Load the corpus; the first line of the csv holds the column names.
# NOTE(review): Spark's csv reader defaults to multiLine=false and '\' as the escape
# character. If the corpus was written with quoted multi-line fields or RFC 4180
# '""' escaping (e.g. by pandas), records may be split or mis-parsed — confirm
# against the file before trusting the counts.

df = spark.read.csv("../corpus.csv", header=True)

# How many input partitions Spark created for the file.
df.rdd.getNumPartitions()

15

In [None]:
# Data validation (left disabled: each count() is a separate pass over the corpus).
# print("Missing title", df.filter(df.title.isNull()).count(), "Missing text", df.filter(df.text.isNull()).count())

# Keep minimal data in memory: only the text column is used downstream.
# Rows whose text is null are dropped here because RegexTokenizer's transform does
# not accept null input — a single null row would make the whole job fail.
df = df.select(df.text).dropna(subset=["text"])

In [None]:
# Split every text into single characters: with gaps=False the pattern "." describes
# the tokens themselves, so each matched character becomes one token.
# NOTE(review): without a DOTALL flag "." presumably does not match line terminators,
# so those characters are dropped rather than emitted — confirm this is intended.
regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="char",
    pattern=".",
    gaps=False,
    minTokenLength=1,
    toLowercase=False,  # keep characters exactly as written
)
char_df = regexTokenizer.transform(df).select("char")

# Cached because gen_ngram re-reads it once per n-gram length.
char_df.cache()

DataFrame[char: array<string>]

In [11]:
def gen_ngram(n, source_df=None):
    """Count every character n-gram of length ``n``.

    Args:
        n: n-gram length passed to ``NGram``.
        source_df: DataFrame with an array<string> column ``"char"`` holding one
            character per element. Defaults to the notebook-level ``char_df`` so
            existing calls ``gen_ngram(n)`` behave as before.

    Returns:
        DataFrame[ngrams: string, count: bigint] with one row per distinct n-gram.
        NGram joins the characters of each n-gram with a single space.
    """
    if source_df is None:
        source_df = char_df
    ngram_gen = NGram(n=n, inputCol="char", outputCol="ngrams_list")
    # One row per n-gram occurrence, then one row per distinct n-gram with its count.
    ngram_df = ngram_gen.transform(source_df).select(explode('ngrams_list').alias('ngrams'))
    return ngram_df.groupBy('ngrams').count()

In [15]:
# Generate n-gram counts for every length from 1 to ngram_max_len and stack them
# into a single DataFrame.

ngram_result_df = None
ngram_max_len = 2

for n in range(1, ngram_max_len + 1):
    print(f"Generating {n}-ngram...")
    ngram_n_df = gen_ngram(n)
    # Identity check: `is None` instead of `== None`, so the test doesn't depend on
    # how DataFrame implements equality.
    if ngram_result_df is None:
        ngram_result_df = ngram_n_df
    else:
        ngram_result_df = ngram_result_df.unionByName(ngram_n_df)

# Most frequent first: the collected rows, and the dicts built from them in later
# cells, keep this order.
ngram_result_df = ngram_result_df.orderBy(col("count").desc())
ngram_result_df.cache()

Generating 1-ngram...
Generating 2-ngram...


DataFrame[ngrams: string, count: bigint]

In [16]:
# Start the Spark calculation: count() is the first action on ngram_result_df, so it
# runs the tokenize / n-gram / sort pipeline and fills the cache requested above.
ngram_result_df.count()

1077646

In [19]:
# Collect the result into Python for processing. This pulls every row of the sorted
# n-gram table (1077646 rows in the recorded run) onto the driver as a list of Rows.
ngram_result_rows = ngram_result_df.collect()

In [46]:
# Convert rows into a Counter keyed by the n-gram string (characters concatenated).
# The empty-string key accumulates the total count of all kept single characters,
# so it serves as the "prefix" count for unigrams when probabilities are computed.

ngram_result_dict = Counter()

for row in ngram_result_rows:
    # NGram joins its one-character tokens with spaces ("香 港"), so taking every
    # other character recovers the plain n-gram ("香港").
    ngram_chars = row["ngrams"][::2]
    # Skip n-grams containing a space or any code point below U+0100 (ASCII and
    # Latin-1 — broader than ASCII alone); what remains is mostly CJK text.
    if ' ' in ngram_chars or any(ord(ch) < 256 for ch in ngram_chars):
        continue
    # Read the column by key: on a Row, `row.count` is tuple.count, not the column.
    ngram_count = row["count"]
    ngram_result_dict[ngram_chars] = ngram_count
    if len(ngram_chars) == 1:
        ngram_result_dict[''] += ngram_count

In [48]:
# Calculate conditional frequencies from counts: each n-gram's count divided by the
# count of its (n-1)-character prefix. For a unigram the prefix is '', whose entry
# holds the total count of all kept characters.

ngram_prob_dict = {}

for ngram_str, ngram_count in ngram_result_dict.items():
    if not ngram_str:
        continue
    # A missing prefix reads as 0 from the Counter without being inserted, so this
    # lookup does not modify the dict being iterated.
    parent_freq = ngram_result_dict[ngram_str[:-1]]
    if parent_freq:
        ngram_prob_dict[ngram_str] = ngram_count / parent_freq

In [50]:
# Dump the conditional frequencies into a csv file with header "ngram,freq".

output_path = "wiki_bigram.csv"
# Write UTF-8 explicitly: the platform default encoding (e.g. cp1252 on Windows)
# cannot represent CJK n-grams. newline="" hands line endings to csv.writer, and
# lineterminator="\n" ends each row with a single "\n".
with open(output_path, "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f, lineterminator="\n")
    writer.writerow(["ngram", "freq"])
    # csv.writer quotes any n-gram containing ',', '"' or a line break, which plain
    # string concatenation would have written as a malformed row.
    writer.writerows(ngram_prob_dict.items())

In [55]:
# Show every n-gram starting with a given character, with its conditional frequency.
# Iterating ngram_prob_dict (rather than ngram_result_dict) guarantees each printed
# key has a probability, so n-grams skipped during the probability step cannot raise
# KeyError. The order is unchanged: both dicts follow the count-sorted row order.
query_prefix = '香'
for ngram_str, ngram_prob in ngram_prob_dict.items():
    if ngram_str.startswith(query_prefix):
        print(ngram_str, ngram_prob)

香 0.003079112950439033
香港 0.9359114900527812
香山 0.003207470564352416
香料 0.00292326431181486
香， 0.002679658952496955
香蕉 0.002050345107592367
香雪 0.002050345107592367
香味 0.0017661388550548112
香江 0.0016240357287860333
香草 0.0015022330491270808
香（ 0.0014210312626877792
香、 0.001319529029638652
香。 0.0011977263499796996
香嘅 0.0011977263499796996
香淳 0.0010556232237109216
香水 0.0010556232237109216
香洲 0.0010556232237109216
香園 0.00097442143727162
香》 0.0009338205440519692
香蘭 0.0009338205440519692
香口 0.0008729192042224929
香凝 0.0008120178643930166
香火 0.0007511165245635404
香科 0.0006699147381242388
香島 0.0006293138449045879
香粉 0.000588712951684937
香鯨 0.0005075111652456354
香爐 0.0005075111652456354
香」 0.0004669102720259846
香川 0.0004669102720259846
香同 0.0004669102720259846
香樓 0.0004669102720259846
香檳 0.0004669102720259846
香琴 0.00044660982541615913
香氣 0.00044660982541615913
香煙 0.00044660982541615913
香海 0.0004263093788063337
香油 0.0004060089321965083
香腸 0.0003654080389768575
香木 0.0003654080389768575
香菜 0.0003654