> Place for problem statement

> Recap of readme.md

> Code
>> Data preparation/processing 

>> Forecast

> Results

> Demo

In [1]:
import findspark

# Run findspark's initialisation before pyspark is imported (same order as before).
findspark.init()

import pyspark

# Last expression of the cell: its value is displayed solely to check whether the path is correct.
findspark.find()

'/usr/local/spark/spark-2.4.3-bin-without-hadoop'

In [2]:
from pyspark.sql import SparkSession
from pyspark import SQLContext

import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import BucketedRandomProjectionLSH

In [3]:
import os
import fasttext
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [4]:
# Number of embedding components (used to name the v0..v{n-1} columns);
# matches the "300d" in the model file name below.
emb_length = 300

# The fastText binary is looked up under ./fasttext relative to the current working directory.
dirpath = os.getcwd()
model_bin_path = f'{dirpath}/fasttext/crawl-300d-2M-subword.bin'

In [5]:
# Build (or reuse) the 'scholar' Spark session with YARN as the master.
spark = (
    SparkSession.builder
    .master('yarn')
    .appName('scholar')
    .getOrCreate()
)

# Context handles derived from the session.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# enables cartesian (!) join
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [6]:
def pivot_keyterms(df):
    """Normalize keyterms and count their occurrences per year.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must provide `year` (single integer) and `entities` (list of keyterm strings).

    Returns
    -------
    pyspark.sql.DataFrame
        One row per normalized keyterm (column `entities`) and one count column
        per year, sorted by keyterm.
    """
    # keep only papers for periods (years) that fully passed
    # make separate row for each keyterm
    df = (
        df.select('year', 'entities')
        .filter("year < 2019")
        .withColumn('entities', F.explode('entities'))
    )

    entity = F.col('entities')

    # turn all keyterms to lowercase
    df = df.withColumn('entities', F.lower(entity))

    # cut number formatting: 10,000 -> 10000 and 1,000,000 -> 1000000
    # Lookarounds leave the neighbouring digits unconsumed, so every separator of a
    # multi-group number is removed (a consuming pattern skips each second separator).
    df = df.withColumn('entities', F.regexp_replace(entity, r'(?<=\d)[, ](?=\d{3})', ''))

    # treat comma separated values as different keyterms (even though they were posed as a unit)
    df = df.withColumn('entities', F.explode(F.split('entities', '[,]')))

    # substitute underscore with a space as a means of separation
    # get rid of excessive spaces
    df = df.withColumn('entities', F.regexp_replace(entity, '_', ' '))
    df = df.withColumn('entities', F.trim(entity))
    df = df.withColumn('entities', F.regexp_replace(entity, r'\s+', ' '))

    # keep only keyterms that contain alpha-numeric values and spaces
    # \w also includes underscore, but we handled it earlier
    df = df.filter(~entity.rlike(r'[^\w\s]'))

    # retrieve counts for each keyterm for each year
    df = (
        df.groupby('entities')
        .pivot('year')
        .count()
        .sort('entities')
    )

    return df

In [7]:
def get_tokens_frequency(df):
    """Turn per-year keyterm counts into scaled per-year frequencies.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Column `entities` plus one count column per year (every column other than
        `entities` is treated as a year column).

    Returns
    -------
    pyspark.sql.DataFrame
        Same columns as the input; each year column holds
        100000 * count / (sum of that year's counts over the retained keyterms).
        Keyterms shorter than 3 characters, or whose largest yearly count
        is not above 10, are dropped.
    """
    # keep only tokens starting from 3 characters in length
    df = df.filter('LENGTH(entities) > 2')

    # gather column names linked to years
    col_years = [col_name for col_name in df.columns if col_name != 'entities']

    # Find peak usage of token across the years
    # https://stackoverflow.com/questions/40874657/pyspark-compute-row-maximum-of-the-subset-of-columns-and-add-to-an-exisiting-da
    # missing counts are replaced by -inf so they never win the maximum
    minf = F.lit(float("-inf"))
    df = df.withColumn("year_max", F.greatest(*[F.coalesce(F.col(year), minf) for year in col_years]))

    # forget about tokens that have never been really used
    df = df.filter("year_max > 10").drop('year_max')

    # find total number of "valid" tokens used on each year (single-row frame)
    totals = df.groupby().sum(*col_years)

    # Attach the totals to every row. An explicit crossJoin is used so this does not
    # depend on the session-level "spark.sql.crossJoin.enabled" setting being active.
    df = df.crossJoin(totals)

    # retrieve token frequency (times common coefficient) for each year
    # coefficient is to make sure we do not hit limitations of float precision too hard
    for year in col_years:
        total_col = f'sum({year})'
        df = df.withColumn(year, 100000.0*F.col(year) / F.col(total_col)).drop(total_col)

    return df

In [8]:
# stop the current Spark session (the next cell builds a new one with a different master)
spark.stop()

In [9]:
# Build (or reuse) the 'scholar' Spark session with the 'local[2]' master.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('scholar')
    .getOrCreate()
)

# Context handles derived from the session.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [10]:
def get_embeddings(pd_df):
    """Attach fastText embeddings to every keyterm of a pandas frame.

    Parameters
    ----------
    pd_df : pandas.DataFrame
        Must contain an `entities` column with the keyterm strings.

    Returns
    -------
    pandas.DataFrame
        A new frame with the input columns, an `embeddings` column (list of
        components per row) and one column per component named `v0` ..
        `v{emb_length - 1}`. The input frame itself is left unmodified.
    """
    # work on a copy so the caller's frame does not silently gain a column
    pd_df = pd_df.copy()

    # retrieve embedding for each keyterm; the model is released right after use
    model = fasttext.load_model(model_bin_path)
    pd_df['embeddings'] = pd_df['entities'].apply(model.__getitem__)
    del model

    # Spread the embedding over one column per component. The component frame reuses
    # pd_df's index: with a fresh 0..n-1 index the concat below would align by label
    # and misplace rows (NaN-filled) whenever pd_df does not carry a default index.
    emb_components = pd.DataFrame(
        pd_df['embeddings'].tolist(),
        columns=[f'v{i}' for i in range(emb_length)],
        index=pd_df.index,
    )
    pd_df['embeddings'] = pd_df['embeddings'].apply(list)

    # append embedding components as further columns to the data frame
    return pd.concat([pd_df, emb_components], axis=1)

In [11]:
def make_normed_vector(x):
    """Return `x` as a dense vector scaled to unit L2 norm.

    Parameters
    ----------
    x : sequence of numbers
        Vector components.

    Returns
    -------
    pyspark.ml.linalg.DenseVector
        `x / ||x||` as float64. A vector whose norm is zero is returned
        unscaled instead of being divided by zero (which would yield NaN
        components).
    """
    x_np = np.array(x, dtype=np.float64)
    norm = np.linalg.norm(x_np)
    # guard against 0/0 -> NaN for all-zero input
    if norm > 0:
        x_np = x_np / norm
    return Vectors.dense(x_np)

In [12]:
def fit_lsh_model(df):
    """Fit a BucketedRandomProjectionLSH model on unit-normalized embeddings.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain `entities` and `embeddings` columns.

    Returns
    -------
    tuple
        `(model, prepared_df)`: the fitted LSH model and the frame it was fitted
        on, holding `entities`, `embeddings` (as vectors) and `normed_embeddings`.
    """
    as_dense = F.udf(lambda values: Vectors.dense(values), VectorUDT())
    as_unit_dense = F.udf(make_normed_vector, VectorUDT())

    # derive the unit-length copy before 'embeddings' itself is converted to a vector column
    prepared = (
        df.select('entities', 'embeddings')
        .withColumn('normed_embeddings', as_unit_dense('embeddings'))
        .withColumn('embeddings', as_dense('embeddings'))
    )

    # The method is designed for Euclidean distances, yet it serves cosine distances too:
    # for normalized vectors the Euclidean distance equals sqrt(2)*sqrt(cosine_distance),
    # and sqrt is a monotone transformation.
    lsh = BucketedRandomProjectionLSH(
        inputCol="normed_embeddings",
        outputCol="hashes",
        seed=42,
        bucketLength=12.0,
        numHashTables=20,
    )
    lsh_model = lsh.fit(prepared)

    return lsh_model, prepared

In [13]:
# shut down the active Spark session
spark.stop()