## 1. Setup
Importing & Device Setup 

In [1]:
from multiprocessing import Pool
import sqlite3 as sql
import pandas as pd
import numpy as np
import logging
import time
import random
import re
import os
import sys
# Move the working directory up one level; presumably so that relative paths
# such as the Data folder resolve from the project root - confirm.
os.chdir("..")
# Show the resulting working directory.
os.getcwd()

'd:\\INFO323\\TokenizedToast'

## Loading In Datasets 

In [2]:
# Relative path of the SQLite database file used by get_query.
# Built with os.path.join so the separator is right on every platform and the
# literal no longer contains the invalid escape sequence "\e".
db = os.path.join('Data', 'enwiki-20170820.db')

In [3]:
def get_query(select, db=db):
    '''
    Run a SQL statement against a SQLite database and return its rows.

    1. Connects to SQLite database (db)
    2. Executes select statement
    3. Return results and column names

    Parameters:
        select: SQL text to execute; it must produce a result set, because
            the column names are read from the cursor description.
        db: path of the SQLite database file (defaults to the module-level db).

    Returns:
        (rows, col_names): rows is the list of tuples from fetchall() and
        col_names is the list of lower-cased column names.

    Input: 'select * from analytics limit 2'
    Output: ([(1, 2, 3)], ['col_1', 'col_2', 'col_3'])
    '''
    conn = sql.connect(db)
    try:
        # `with conn` only commits or rolls back the transaction; it does not
        # close the connection, so the close happens explicitly in `finally`.
        with conn:
            c = conn.cursor()
            c.execute(select)
            col_names = [str(name[0]).lower() for name in c.description]
            # Fetch while the connection is still open.
            rows = c.fetchall()
    finally:
        conn.close()
    return rows, col_names

In [4]:
def tokenize(text, lower=True):
    '''
    Split text into alphabetic tokens.

    1. Strips apostrophes
    2. Searches for all alpha tokens (exception for underscore)
    3. Return list of tokens

    Parameters:
        text: string to tokenize.
        lower: when True (default) the text is lower-cased before matching;
            when False each token keeps its original casing.

    Returns:
        List of tokens made of ASCII letters and underscores; digits,
        punctuation and whitespace act as separators.

    Input: 'The 3 dogs jumped over Scott's tent!'
    Output: ['the', 'dogs', 'jumped', 'over', 'scotts', 'tent']
    '''
    # Drop apostrophes first so possessives and contractions stay one token.
    text = text.replace("'", "")
    if lower:
        text = text.lower()
    # One pattern serves both modes. The trailing `+` is essential: without it
    # every letter would come back as its own one-character token.
    return re.findall(r'[A-Za-z_]+', text)

In [5]:
def get_article(article_id):
    '''
    Fetch one article and return its cleaned text.

    1. Construct select statement
    2. Retrieve all section_texts associated with article_id
    3. Join section_texts into a single string (article_text)
    4. Tokenize article_text
    5. Return the tokens joined by single spaces

    Parameters:
        article_id: integer id of the article (anything int() accepts).

    Returns:
        One string holding the article's tokens separated by spaces; an empty
        string when no section text is found for article_id.

    Raises:
        ValueError / TypeError: if article_id cannot be converted to int.

    Input: 100
    Output: 'the austroasiatic languages in ...'
    '''
    # Coerce to int so only a number can ever be spliced into the SQL text.
    select = "SELECT section_text FROM articles WHERE article_id = " + str(int(article_id))

    rows, _ = get_query(select)

    # Each row is a 1-tuple; skip NULL section_text values, which would make
    # str.join raise TypeError.
    sections = [row[0] for row in rows if row[0] is not None]
    article_text = '\n'.join(sections)

    return ' '.join(tokenize(article_text))

In [6]:
def get_bulk_articles(article_ids):
    '''
    Fetch several articles in one call.

    Input: iterable of article ids
    Output: list of (article_id, article) tuples in input order, where
    article is the value get_article returns for that id
    '''
    return [(current_id, get_article(current_id)) for current_id in article_ids]

In [7]:
# Collect every distinct article id stored in the articles table.
select = '''select distinct article_id from articles'''
# get_query returns (rows, column_names); each row is a 1-tuple, so keep only
# its first element to obtain a flat list of ids.
article_ids = [row[0] for row in get_query(select)[0]]

In [None]:
# Upper bound on how many articles go into the corpus.
num_articles = 500000
# Sample from the ids actually present in the database (article_ids) rather
# than from a hard-coded numeric range: every sampled id is then guaranteed to
# exist, and the min() keeps random.sample from raising ValueError when the
# table holds fewer than num_articles ids.
random_article_ids = random.sample(article_ids, min(num_articles, len(article_ids)))
# corpus is a list of (article_id, article_text) tuples.
corpus = get_bulk_articles(random_article_ids)


#### Preprocessing Text

## Spark Setup 

In [None]:
import findspark
findspark.init()

# Spark Packages
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF, Word2Vec
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import lower, regexp_replace, trim
from pyspark.sql.functions import split
from pyspark.ml.feature import StopWordsRemover

from sparknlp.base import DocumentAssembler
from sparknlp import annotator
from pyspark.ml import Pipeline


# Bert Tokenizer
# from transformers import BertTokenizer, BertModel # Hugging Face Package
# import torch # PyTorch
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) the Spark session for this notebook, with the driver
# memory set to 28G.
spark = (
    SparkSession.builder
    .appName("TokenizedToast")
    .config("spark.driver.memory", "28G")
    .getOrCreate()
)

In [None]:

# Default English stop-word list; passed as stopWords to the StopWordsRemover
# stages further down.
stopwords = StopWordsRemover.loadDefaultStopWords("english")

In [None]:
# Value of the SPARK_HOME environment variable; raises KeyError if it is unset.
spark_path = os.environ['SPARK_HOME']

In [None]:
# Prepend Spark's bin directory and the bundled PySpark / py4j sources to the
# import path. Every entry is inserted at index 0, so the last one listed ends
# up first in sys.path.
for _suffix in (
    "/bin",
    "/python/pyspark/",
    "/python/lib/pyspark.zip",
    "/python/lib/py4j-0.10.7-src.zip",
):
    sys.path.insert(0, spark_path + _suffix)

# Point both the PySpark driver and worker settings at the interpreter that is
# running this notebook.
os.environ.update({
    'PYSPARK_DRIVER_PYTHON_OPTS': "notebook",
    'PYSPARK_DRIVER_PYTHON': sys.executable,
    'PYSPARK_PYTHON': sys.executable,
})

## Tokenization & Vectorization Methods
1. TF-IDF
2. Word2Vec
3. Doc2Vec
4. BERT Tokenization (not yet implemented; its imports are commented out above)

#### Initializing Classes

In [None]:
# Number of topics (k) passed to each LDA estimator below.
k_val = 2

In [None]:
# Build the base DataFrame from the (id, text) tuples in corpus and drop rows
# that contain nulls. DataFrame transformations return new DataFrames rather
# than modifying in place, so main_df can simply alias this result; no explicit
# copy is needed for later cells to start from it.
main_df = spark.createDataFrame(corpus, ['id', 'text']).dropna()
df = main_df

##### TFIDF 

In [None]:
# Every stage reads 'text' and writes 'temp'; after each stage the old 'text'
# column is dropped and 'temp' is renamed to 'text', so the next stage can use
# the same column names.
tokenizer = Tokenizer(inputCol='text', outputCol='temp')
stopwords_remover = StopWordsRemover(inputCol='text', outputCol='temp', stopWords=stopwords)
count_vectorizer = CountVectorizer(inputCol='text', outputCol='temp')
idf = IDF(inputCol='text', outputCol='temp')
# LDA estimator with k_val topics; it is fitted in the next cell.
lda = LDA(k=k_val, maxIter=5, featuresCol='text')

# 1. Tokenize the raw text.
df = tokenizer.transform(main_df).drop('text').withColumnRenamed('temp', 'text')

# 2. Remove stop words.
df = stopwords_remover.transform(df).drop('text').withColumnRenamed('temp', 'text')

# 3. Fit the count vectorizer and turn each token list into a count vector.
model_cv = count_vectorizer.fit(df)
df = model_cv.transform(df).drop('text').withColumnRenamed('temp', 'text')

# 4. Fit IDF and re-weight the count vectors.
model_idf = idf.fit(df)
df = model_idf.transform(df).drop('text').withColumnRenamed('temp', 'text')

In [None]:
# Fit the LDA estimator on the TF-IDF features held in df's 'text' column.
model_tfidf = lda.fit(df)

##### Word2Vec

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors

# Same column convention as the other pipelines: each stage writes 'temp',
# which then replaces 'text'.

# 1. Tokenize the raw text, starting again from the untouched main_df.
tokenizer = Tokenizer(inputCol='text', outputCol='temp')
df = tokenizer.transform(main_df).drop('text').withColumnRenamed('temp', 'text')

# 2. Remove stop words.
stopwords_remover = StopWordsRemover(inputCol='text', outputCol='temp', stopWords=stopwords)
df = stopwords_remover.transform(df).drop('text').withColumnRenamed('temp', 'text')

# 3. Fit Word2Vec (100-dimensional vectors, window of 5, words seen at least
#    twice) and replace each token list with the model's output vector.
word2vec = Word2Vec(vectorSize=100, windowSize=5, minCount=2, inputCol='text', outputCol='temp')
model_w2v = word2vec.fit(df)
df = model_w2v.transform(df).drop('text').withColumnRenamed('temp', 'text')

# LDA estimator with k_val topics; it is fitted in the next cell.
# NOTE(review): Word2Vec features can be negative, whereas LDA is normally fed
# non-negative term counts - confirm this combination is intended.
lda = LDA(k=k_val, maxIter=5, featuresCol='text')

In [None]:
# Fit the LDA estimator on the Word2Vec features held in df's 'text' column.
model_word = lda.fit(df)

##### Doc2Vec

In [None]:
# Document-level embedding: one dense vector per document, derived from
# Word2Vec. Same column convention as the other pipelines: each stage writes
# 'temp', which then replaces 'text'.

# 1. Tokenize the raw text, starting again from the untouched main_df.
tokenizer = Tokenizer(inputCol='text', outputCol='temp')
df = tokenizer.transform(main_df)
df = df.drop('text').withColumnRenamed('temp', 'text')

# 2. Remove stop words.
stopwords_remover = StopWordsRemover(inputCol='text', outputCol='temp', stopWords=stopwords)
df = stopwords_remover.transform(df)
df = df.drop('text').withColumnRenamed('temp', 'text')

# 3. Fit Word2Vec and transform the documents.
doc2vec = Word2Vec(vectorSize=100, windowSize=5, minCount=2, inputCol='text', outputCol='temp')
model_w2v = doc2vec.fit(df)
df = model_w2v.transform(df)
# Word2VecModel.transform already emits ONE vector per row: the average of the
# vectors of the words in that document. No extra averaging step is needed;
# applying np.mean(..., axis=0) to that single 1-D vector would collapse it to
# a single number and leave a 1-element "document vector". The transform
# output is therefore used directly as the document vector.
df = df.drop('text').withColumnRenamed('temp', 'text')

# LDA estimator with k_val topics; it is fitted in the next cell.
# NOTE(review): these features can be negative, whereas LDA is normally fed
# non-negative term counts - confirm this combination is intended.
lda = LDA(k=k_val, maxIter=5, featuresCol='text')

In [None]:
# Fit the LDA estimator on the document vectors held in df's 'text' column.
model_doc = lda.fit(df)

## Topic Modelling Methods

In [None]:
# Stop the Spark session held in `spark`.
spark.stop()