In [1]:
import pickle
import os
import pandas as pd
import pyspark as ps
import numpy as np
from pyspark import SparkContext
from pyspark.sql.functions import col, lower, regexp_replace, split, lit
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec, Word2VecModel
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from nltk.stem.porter import *

In [2]:
# Memory settings must reach the JVM when it is launched. SparkContext.setSystemProperty
# first starts the JVM gateway and only then sets the property, so a driver heap size set
# that way is ignored. Passing the values through the builder before getOrCreate() lets
# PySpark hand them to the JVM launch instead.
# NOTE: this only takes effect in a fresh kernel (no SparkContext / JVM already running).
spark = (
    ps.sql.SparkSession.builder
    .master("local[4]")
    .appName("word2vec")
    .config("spark.driver.memory", "3g")
    .config("spark.executor.memory", "3g")
    .getOrCreate()
)

In [3]:
# Feature stages shared by tokenize_df(): "text" -> "vector" -> "vector_no_stopw".
tokenizer = Tokenizer(inputCol="text", outputCol="vector")
remover = StopWordsRemover(inputCol="vector", outputCol="vector_no_stopw")
stopwords = remover.getStopWords()

stemmer = PorterStemmer()
# Keep the lambda: it defers the lookup of `stem`, which is only defined further
# down in this cell. Passing `stem` directly here would raise NameError.
stemmer_udf = udf(lambda tokens: stem(tokens), ArrayType(StringType()))

def word_to_index(df, fraction=0.1, seed=42):
    """Sample a raw text DataFrame and turn each line into stemmed tokens.

    Despite the name, no word -> index mapping happens here; this only
    produces the token column that sentence_to_index() consumes later.

    Args:
        df: DataFrame with a string column named ``value`` (the column
            ``spark.read.text`` produces).
        fraction: expected fraction of rows kept (sampling without replacement).
        seed: random seed for the sample, so reruns select the same rows.

    Returns:
        DataFrame with a single column, ``vector_stemmed``.
    """
    sampled = df.sample(withReplacement=False, fraction=fraction, seed=seed)
    text_df = sampled.selectExpr("value as text")
    return tokenize_df(text_df)

def tokenize_df(df):
    """Clean, tokenize, remove stop words from and stem the ``text`` column.

    Relies on the module-level ``tokenizer``, ``remover`` and ``stemmer_udf``
    stages and on clean_text().

    Returns:
        DataFrame holding only the ``vector_stemmed`` column.
    """
    cleaned = df.select(clean_text(col("text")).alias("text"))
    tokens = tokenizer.transform(cleaned).select("vector")
    filtered = remover.transform(tokens).select("vector_no_stopw")
    stemmed = filtered.withColumn("vector_stemmed", stemmer_udf("vector_no_stopw"))
    return stemmed.select("vector_stemmed")

    
def clean_text(c):
    """Build a Spark Column expression that normalises raw text before tokenizing.

    Steps, applied in order to the input Column ``c``:
      1. lowercase;
      2. drop a leading ``"rt "`` (presumably a retweet marker);
      3. drop ``http://`` / ``https://`` URLs up to the next whitespace;
      4. drop every character that is not an ASCII letter, digit or whitespace;
      5. drop digits.

    Splitting into tokens is left to the ``Tokenizer`` stage in tokenize_df().

    Returns:
        A Column expression; nothing is evaluated until it is used in a query.
    """
    c = lower(c)
    c = regexp_replace(c, r"^rt ", "")
    # Raw strings: "\S" and "\:" inside plain literals are invalid Python escape
    # sequences (SyntaxWarning on Python 3.12+, slated to become an error).
    c = regexp_replace(c, r"https?://\S+", "")
    c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")
    c = regexp_replace(c, r"[0-9]", "")
    return c


def stem(in_vec):
    """Porter-stem every token and keep only stems longer than two characters.

    Uses the module-level ``stemmer``; token order is preserved.
    """
    stems = (stemmer.stem(token) for token in in_vec)
    return [s for s in stems if len(s) > 2]

In [5]:
# Directory holding the en_US corpora. Override it with the LANGUAGES_DATA_DIR
# environment variable instead of editing the hard-coded default.
# os.path.join(..., "") guarantees a trailing separator, because later cells
# build file paths as `path + filename`.
path = os.path.join(
    os.environ.get("LANGUAGES_DATA_DIR", '/Users/andradea/Documents/languages/en_US/'),
    "",
)

news = spark.read.text(os.path.join(path, 'en_US.news.txt'))
# Sampled, stemmed tokens; the class label (news = 1) is attached later as y_news.
news = word_to_index(news)

news.show(2)

+--------------------+
|      vector_stemmed|
+--------------------+
|[charlevoix, detr...|
|[mhta, presid, ce...|
+--------------------+
only showing top 2 rows



In [6]:
blogs_raw = spark.read.text(path + 'en_US.blogs.txt')
# Same sampling/tokenization as the news corpus; the class label (blogs = 0)
# is attached later as y_blogs.
blogs = word_to_index(blogs_raw)

blogs.show(2)

+--------------------+
|      vector_stemmed|
+--------------------+
|              [bear]|
|[winter, time, sl...|
+--------------------+
only showing top 2 rows



In [7]:
# Combined news + blogs token frame (columns matched by position).
# NOTE(review): not referenced by the cells shown here; X/Y are built from
# `news` and `blogs` separately. Confirm whether it is still needed.
df = news.union(other=blogs)

In [8]:
# Mapping from (stemmed) word to numeric index, used by sentence_to_index().
# Presumably built from the same stemmed vocabulary; confirm against its producer.
# NOTE(review): pickle.load can execute arbitrary code. Only load this file if
# it comes from a trusted source.
with open(os.path.join(path, 'word_index.pkl'), 'rb') as f:
    word_index = pickle.load(f)

In [9]:
# Peek at the stemmed tokens of the first sampled news line.
row = news.take(1)
row[0]["vector_stemmed"]

['charlevoix', 'detroit']

In [10]:
def sentence_to_index(sentence, word_index, max_length = 100, dtype=float):
    """Encode a token sequence as a fixed-length array of vocabulary indices.

    Args:
        sentence: iterable of tokens (e.g. the ``vector_stemmed`` list of a row).
        word_index: dict mapping token -> index.
        max_length: length of the output. Tokens past it are ignored, and
            shorter sentences are right-padded with 0.
        dtype: dtype of the returned array. The default ``float`` (float64)
            matches the previous behaviour; pass an integer dtype to store the
            indices as ints.

    Returns:
        1-D numpy array of shape ``(max_length,)``. Position ``i`` holds
        ``word_index[token_i]``, or 0 when the token is not in ``word_index``,
        so 0 is shared between padding and unknown words.
    """
    indexed = np.zeros(max_length, dtype=dtype)
    # zip with range stops after max_length tokens instead of walking the whole sentence.
    for i, word in zip(range(max_length), sentence):
        # Direct dict membership; `in word_index.keys()` is redundant on Python 3
        # and builds a list (linear scan) on every call on Python 2.
        if word in word_index:
            indexed[i] = word_index[word]
    return indexed

In [11]:
# Encode every sampled news sentence (label 1) as a fixed-length index vector.
news_to_index = np.array(
    [sentence_to_index(r.vector_stemmed, word_index) for r in news.collect()]
)
y_news = np.full(len(news_to_index), 1)

In [12]:
# Encode every sampled blog sentence (label 0) as a fixed-length index vector.
blog_rows = blogs.collect()
blogs_to_index = np.array(
    [sentence_to_index(r["vector_stemmed"], word_index) for r in blog_rows]
)
y_blogs = np.full(len(blogs_to_index), 0)

In [13]:
# (number of sampled news sentences, max_length)
news_to_index.shape

(101193, 100)

In [15]:
# Features stacked row-wise and labels end-to-end, news first and blogs second,
# so row k of X lines up with Y[k].
X, Y = np.vstack((news_to_index, blogs_to_index)), np.hstack((y_news, y_blogs))

In [18]:
# Persist the (features, labels) pair next to the other corpus files.
# NOTE: HIGHEST_PROTOCOL depends on the Python version writing the file; the
# reader needs the same or a newer Python (and numpy) to unpickle it.
with open(os.path.join(path, 'sentences.pkl'), 'wb') as f:
    pickle.dump((X, Y), f, pickle.HIGHEST_PROTOCOL)