In [34]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('CorpusLoader').master('local[4]').config('spark.driver.memory', '8g').getOrCreate()

In [35]:
import os
from pyspark.sql.functions import split, element_at, explode, map_values, array_min, broadcast, map_from_entries, arrays_zip, array_contains, monotonically_increasing_id, array_distinct, transform, arrays_zip, size, slice, collect_list, first, map_from_arrays
from pyspark.sql.types import LongType, ArrayType, IntegerType, MapType

class CorpusLoader:

    def __init__(self, root_path, spark):
        self.__root_path = root_path
        self.__spark = spark

    def load(self):
        self.__array_df = self.__load_or_create_parquet('array.parquet', self.__create_array_df)
        self.__token_df = self.__load_or_create_parquet('token.parquet', self.__create_token_df)
        self.__contains_df = self.__load_or_create_parquet('contains.parquet', self.__create_contains_df)
        self.__data_df = self.__load_or_create_parquet('data.parquet', self.__create_data_df)

    def __load_or_create_parquet(self, name, create_function):
        parquet_path = os.path.join(os.path.join(self.__root_path, 'parquets'), name)
        
        if not os.path.exists(parquet_path):
            print(f'File "{name}" not found. \n\t -- Creating "{name}" ...')
            
            df = create_function()
            df.write.parquet(parquet_path)

            print('\t -- Done.')

        print(f'Loading "{name}" ...')
        return self.__spark.read.parquet(parquet_path)

    def __create_token_df(self):
        one_gram_path = os.path.join(self.__root_path, '1')

        one_gram_df = spark.read.csv(one_gram_path, sep='\n', quote="").withColumnRenamed('_c0', 'Input')
        token_df = one_gram_df \
                .select(split('Input', '\t').alias('SplitInput')) \
                .select(element_at('SplitInput', 1).alias('Tokens')) \
                .select(explode(split('Tokens', ' ')).alias('Token')) \
                .orderBy('Token') \
                .withColumn('TokenId', monotonically_increasing_id()) 
        
        return token_df

    def __create_array_df(self):
        n_gram_directories = [os.path.join(self.__root_path, x) for x in os.listdir(self.__root_path) if x.isdigit()]
        
        input_df = None

        for path in n_gram_directories:
            new_input_df = spark.read.csv(path, sep='\n', quote="").withColumnRenamed('_c0', 'Input')
            
            if input_df is None:
                input_df = new_input_df
            else:
                input_df = input_df.union(new_input_df)

        split_df = input_df \
                    .select(split('Input', '\t').alias('SplitInput')) \
                    .select(element_at('SplitInput', 1).alias('Tokens'),
                            slice('SplitInput', 2, size('SplitInput')).alias('Data')) \
                    .select(split('Tokens', ' ').alias('Tokens'), 'Data')

        array_df = split_df.select('Tokens', transform('Data', lambda d: split(d, ',')).alias('Data')) \
                    .select('Tokens', transform('Data', lambda x: x[0].cast(IntegerType())).alias('Years'),
                            transform('Data', lambda x: x[1].cast(LongType())).alias('Frequency'),
                            transform('Data', lambda x: x[2].cast(LongType())).alias('BookFrequency')) \
                    .withColumn('NgramId', monotonically_increasing_id())

        return array_df

    def __create_contains_df(self):
        n_gram_df = self.__array_df

        n_gram_to_token_id_df = n_gram_df.select('NgramId', 'Tokens') \
                .select(explode('Tokens').alias('Token'), 'NgramId') \
                .join(self.__token_df, on='Token') \
                .groupBy('NgramId').agg(collect_list('TokenId').alias('TokenIds'))
        print(n_gram_to_token_id_df.count())

        contains_df = n_gram_to_token_id_df.select('NgramId', 'TokenIds') \
            .withColumn('IndexArray', transform('TokenIds', lambda x, i: i)) \
            .select('NgramId', arrays_zip('IndexArray', 'TokenIds').alias('TokenIds')) \
            .select('NgramId', explode('TokenIds').alias('TokenId')) \
            .select('NgramId', 'TokenId.IndexArray', 'TokenId.TokenIds') \
            .withColumnsRenamed({'IndexArray': 'Position', 'TokenIds': 'TokenId'}) \
            .orderBy('NgramId')
        print(contains_df.count())

        return contains_df

    ## This horrific arrays to list of structs to map construct is required, because map_from_arrays zeroes everything out.
    def __create_data_df(self):
        data_df = self.__array_df.select('NgramId', 'Years', 'Frequency', 'BookFrequency')
        data_df = data_df.withColumn('FrequencyStructs', arrays_zip('Years', 'Frequency'))
        data_df = data_df.withColumn('BookFrequencyStructs', arrays_zip('Years', 'BookFrequency'))
        data_df = data_df.withColumn('FrequencyMap', map_from_entries('FrequencyStructs'))
        data_df = data_df.withColumn('BookFrequencyMap', map_from_entries('BookFrequencyStructs'))
        data_df = data_df.select('NgramId', 'FrequencyMap', 'BookFrequencyMap')

        data_df.printSchema()
        
        return data_df.withColumnsRenamed({'FrequencyMap': 'Frequency', 'BookFrequencyMap': 'BookFrequency'})

In [36]:
cl = CorpusLoader('C:/Users/bincl/BA-Thesis/Dataset/parquets_corpus/', spark)

cl.load()

Loading "array.parquet" ...
Loading "token.parquet" ...
Loading "contains.parquet" ...
Loading "data.parquet" ...


In [37]:
token_array_df = cl._CorpusLoader__contains_df.orderBy('NgramId', 'Position').groupBy('NgramId').agg(collect_list('TokenId').alias('Tokens'))
df = token_array_df.withColumnRenamed('Tokens', 'Ngram').withColumn('Length', size('Ngram')).where('Length > 1')
#df.cache()

df = df.withColumn('LeftChildTokenIds', slice(df.Ngram, 1, df.Length - 1))
df = df.withColumn('RightChildTokenIds', slice(df.Ngram, 2,df.Length - 1))

result = df.join(token_array_df.withColumnRenamed('NgramId', 'LeftChildNgramId'), on=df.LeftChildTokenIds == token_array_df.Tokens).withColumnRenamed('Tokens', 'LCTokens')

table = result.join(token_array_df.withColumnRenamed('NgramId', 'RightChildNgramId'), on=df.RightChildTokenIds == token_array_df.Tokens).withColumnRenamed('Tokens', 'RCTokens').cache()

table.show()

+-------------+----------+------+-----------------+------------------+----------------+--------+-----------------+--------+
|      NgramId|     Ngram|Length|LeftChildTokenIds|RightChildTokenIds|LeftChildNgramId|LCTokens|RightChildNgramId|RCTokens|
+-------------+----------+------+-----------------+------------------+----------------+--------+-----------------+--------+
|1932735304174|  [26, 26]|     2|             [26]|              [26]|            3879|    [26]|             3879|    [26]|
|1924145471799|  [29, 26]|     2|             [29]|              [26]|            3705|    [29]|             3879|    [26]|
| 352187426737| [474, 26]|     2|            [474]|              [26]|            3785|   [474]|             3879|    [26]|
| 214748505745| [964, 26]|     2|            [964]|              [26]|            3860|   [964]|             3879|    [26]|
| 128849131373|[1677, 26]|     2|           [1677]|              [26]|            3688|  [1677]|             3879|    [26]|
| 120259

In [38]:
result = table.select("NgramId","LeftChildNgramId","RightChildNgramId")

result.show()

+-------------+----------------+-----------------+
|      NgramId|LeftChildNgramId|RightChildNgramId|
+-------------+----------------+-----------------+
|1932735304174|            3879|             3879|
|1924145471799|            3705|             3879|
| 352187426737|            3785|             3879|
| 214748505745|            3860|             3879|
| 128849131373|            3688|             3879|
| 120259226401|            3703|             3879|
|1365799641803|            3741|             3879|
|1425929268117|             464|             3879|
| 506806310159|             197|             3879|
|1090921827076|            1696|             3879|
|1108101572635|            1125|             3879|
|1400159351595|            1677|             3879|
|1400159406945|             685|             3879|
|1408749285871|            1765|             3879|
|1949915339020|            1452|             3879|
|1005022493869|             266|             3879|
| 326417541462|            1500

In [39]:
cl._CorpusLoader__data_df.printSchema()

root
 |-- NgramId: long (nullable = true)
 |-- Frequency: map (nullable = true)
 |    |-- key: integer
 |    |-- value: long (valueContainsNull = true)
 |-- BookFrequency: map (nullable = true)
 |    |-- key: integer
 |    |-- value: long (valueContainsNull = true)



In [40]:
result = result

In [41]:
from pyspark.sql.functions import col
#1000 limit is 3m 30
result = result.join(cl._CorpusLoader__data_df, on=("NgramId")).withColumnRenamed("Frequency","Frequency_N")
result = result.join(cl._CorpusLoader__data_df.alias("dataL"),(col("LeftChildNgramId") == col("dataL.NgramId"))).withColumnRenamed("Frequency","Frequency_L")
ngram_table = result.join(cl._CorpusLoader__data_df.alias("dataR"),(col("LeftChildNgramId") == col("dataR.NgramId"))).withColumnRenamed("Frequency","Frequency_R")
ngram_table= ngram_table.select("Frequency_N","Frequency_L","Frequency_R")
ngram_table.show()

+--------------------+--------------------+--------------------+
|         Frequency_N|         Frequency_L|         Frequency_R|
+--------------------+--------------------+--------------------+
|{1845 -> 1, 1964 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1732 -> 1, 1780 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1890 -> 1, 1894 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1857 -> 2, 1858 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1818 -> 1, 1863 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1860 -> 2, 1862 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1904 -> 3, 1905 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1857 -> 2, 1862 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1812 -> 7, 1818 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1851 -> 1, 1852 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1702 -> 1, 1720 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1765 -> 1, 1831 ...|{1522 -> 3, 1533 ...|{1522 -> 3, 1533 ...|
|{1836 -> 1, 1837 ...|{15