In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import *

import pandas as pd

from collections import Counter
from itertools import islice
from os import chdir
import re
from urllib.parse import urlsplit, urlunsplit

In [2]:
# Spark session sized for a very large host (256 cores, 1TB of memory), running
# in local mode and claiming all the available resources.
# On a machine with less memory, lower spark.executor.memory and
# spark.driver.memory, and raise spark.default.parallelism and
# spark.sql.shuffle.partitions to reduce the memory demand.

spark = (
    SparkSession.builder
    .config("spark.executor.memory", "1000g")
    .config("spark.driver.memory", "1000g")
    .config("spark.local.dir", "/mnt/vol1/tmp")
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Spark UI url, rewritten so that it points at localhost.
# Only the host portion of the netloc (everything before the first ':') is
# replaced; the rest of the URL, including the port, is kept as reported.

ui_url = urlsplit(sc.uiWebUrl)
sparkUrlParts = [
    ui_url.scheme,
    re.sub('^[^:]*', 'localhost', ui_url.netloc),
    ui_url.path,
    ui_url.query,
    ui_url.fragment,
]
sparkUrl = urlunsplit(sparkUrlParts)

print(sparkUrl, sc.defaultParallelism)

http://localhost:4041 256


In [5]:
# Load every CSV under the corpus directory, treating the first line of each
# file as the header row.

df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("../corpus")
)

# Show how many partitions the input was split into.
df.rdd.getNumPartitions()

253

In [6]:
# Data validation (disabled; uncomment to count rows with a null title or text)
# print("Missing title", df.filter(df.title.isNull()).count(), "Missing text", df.filter(df.text.isNull()).count())

# Only the text column is used from here on, so drop the others to keep
# minimal data in memory.
df = df.select(df["text"])

In [7]:
# Tokenize the corpus into single characters.
# With gaps=False the pattern describes the tokens themselves (not the
# separators), so "." yields one token per matched character; case is preserved.

regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="char",
    pattern=".",
    gaps=False,
    minTokenLength=1,
    toLowercase=False,
)
char_df = regexTokenizer.transform(df).select("char")

# Mark the tokenized frame for caching; gen_ngram reads it once per n-gram length.
char_df.cache()
# char_df.show()

DataFrame[char: array<string>]

In [8]:
def gen_ngram(n):
    """Count the occurrences of every n-gram of length ``n`` in the corpus.

    Reads the notebook-level ``char_df`` (token arrays in column ``char``)
    instead of taking the data as an argument.

    Args:
        n: Number of consecutive tokens that make up one n-gram.

    Returns:
        A DataFrame with one row per distinct n-gram: ``ngrams`` holds the
        n-gram string produced by ``NGram`` and ``count`` its occurrences.
    """
    windowed_df = NGram(n=n, inputCol="char", outputCol="ngrams_list").transform(char_df)
    # One output row per n-gram occurrence, then aggregate identical n-grams.
    exploded_df = windowed_df.select(explode('ngrams_list').alias('ngrams'))
    return exploded_df.groupBy('ngrams').count()

In [9]:
# Generate n-gram counts for every length from 1 to ngram_max_len and stack the
# per-length results into a single DataFrame.

ngram_result_df = None
ngram_max_len = 2

for n in range(1, ngram_max_len + 1):
    print(f"Generating {n}-ngram...")
    ngram_n_df = gen_ngram(n)
    # Identity test against the None singleton; `== None` would be routed
    # through whatever equality the left-hand object defines.
    if ngram_result_df is None:
        ngram_result_df = ngram_n_df
    else:
        ngram_result_df = ngram_result_df.unionByName(ngram_n_df)

# Most frequent n-grams first; mark the combined result for caching because it
# is used by more than one later step (count and collect).
ngram_result_df = ngram_result_df.orderBy(col("count").desc())
ngram_result_df.cache()

Generating 1-ngram...
Generating 2-ngram...


DataFrame[ngrams: string, count: bigint]

In [10]:
# Start the Spark calculation: requesting the row count forces the n-gram
# pipeline defined above to actually run.
ngram_result_df.count()

5544299

In [11]:
# Collect the result into Pythonland for processing.
# Every row of the result is pulled into this Python process at once.
ngram_result_rows = ngram_result_df.collect()

In [12]:
# Fold the collected rows into a Counter keyed by the n-gram text.
# The empty-string key '' accumulates the total count of all kept single chars.

ngram_result_dict = Counter()

for row in ngram_result_rows:
    # NGram joins tokens with a space and every token is a single char, so the
    # even positions are exactly the original characters.
    ngrams_str = row.ngrams[::2]
    # Despite its name this flags any code point below 256 (ASCII and Latin-1).
    # A space (code point 32) is covered too, so no separate space check is needed.
    has_ascii = any(ord(c) < 256 for c in ngrams_str)
    if has_ascii:
        continue
    occurrences = row['count']
    ngram_result_dict[ngrams_str] = occurrences
    # Single chars also feed the grand total stored under ''.
    if len(ngrams_str) == 1:
        ngram_result_dict[''] += occurrences

In [13]:
# Calculate frequency from count.
# Each n-gram's count is divided by the count of its prefix (the n-gram minus
# its last char); for single chars the prefix is '', i.e. the single-char total.

ngram_prob_dict = {}

for ngram_str, ngram_count in ngram_result_dict.items():
    if not ngram_str:
        # '' only stores the single-char total; it is not an n-gram itself.
        continue
    # Counter returns 0 for a missing prefix without inserting it.
    parent_freq = ngram_result_dict[ngram_str[:-1]]
    if parent_freq != 0:
        ngram_prob_dict[ngram_str] = ngram_count / parent_freq

In [14]:
# Dump the result into a csv file.
# The n-grams kept upstream contain only code points >= 256, so the file is
# written as UTF-8 explicitly rather than in the platform's default encoding,
# which may be unable to represent them.

output_path = "apple_bigram.csv"
with open(output_path, "w", encoding="utf-8") as f:
    f.write("ngram,freq\n")
    for k, freq in ngram_prob_dict.items():
        f.write(k + "," + str(freq) + "\n")