In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that is already
# running. A parenthesised chain replaces the backslash continuations.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting
# copied from an example - confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("Python Spark data cleaning and engineering")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
from sklearn.datasets import fetch_20newsgroups

# Load the "train" and "test" subsets of the 20 Newsgroups corpus, in that
# order, and bind each to its own name.
newsgroup_train, newsgroup_test = (
    fetch_20newsgroups(subset=subset_name) for subset_name in ("train", "test")
)

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
from nltk.stem import WordNetLemmatizer, PorterStemmer
import nltk
# Fetch the "wordnet" resource through NLTK's downloader
# (presumably the data WordNetLemmatizer needs - confirm).
nltk.download("wordnet")

In [None]:
# Fetch the "averaged_perceptron_tagger" resource through NLTK's downloader
# (presumably the model nltk.pos_tag loads - confirm).
nltk.download("averaged_perceptron_tagger")

In [None]:
# Show which container type fetch_20newsgroups returned.
type(newsgroup_train)

In [None]:
import pandas as pd
# First attempt at a pandas frame from the training texts and their labels.
# NOTE(review): with a list of two lists and no transpose, each inner list
# becomes one ROW (texts, then targets) rather than one column; `df` is
# rebuilt with one row per post in a later cell, so this cell looks like a
# leftover experiment.
df = pd.DataFrame([newsgroup_train.data, newsgroup_train.target.tolist()])

In [None]:
# Preview the first rows of df.
df.head()

In [None]:
# One row per training post: the raw text and its integer class label.
# Building from a dict of columns (instead of a list of lists followed by
# .T) lets pandas infer a dtype per column, so `target` stays an integer
# column rather than being forced to object dtype.
df = pd.DataFrame({
    "text": newsgroup_train.data,
    "target": newsgroup_train.target,
})

# Lookup table of newsgroup names; its default 0..n-1 index lines up with
# the integer labels in `target`.
targets = pd.DataFrame({"title": newsgroup_train.target_names})

# Attach the human-readable newsgroup name to every post by matching the
# label column against the lookup table's index.
ngout = pd.merge(df, targets, left_on="target", right_index=True)

In [None]:
# Display the text/target frame.
df

In [None]:
# Display the newsgroup-name lookup table.
targets

In [None]:
# Preview the merged frame (text, target, title).
ngout.head()

In [None]:
# Convert the merged pandas frame into a Spark DataFrame; no schema is
# passed, so Spark infers the column types from the data.
sdf = spark.createDataFrame(ngout)

In [None]:
from pyspark.sql.functions import split
from pyspark.sql.functions import monotonically_increasing_id, col

In [None]:
# Split every post on blank lines ("\n\n"), keep the original columns plus
# the pieces at index 1 and 2 of the split (index 0 is not selected), and add
# a generated `id` column. The two extracted columns keep Spark's default
# names `text_sep[1]` / `text_sep[2]`, which the SQL query further down
# refers to.
sdf = (
    sdf
    .withColumn("text_sep", split(sdf.text, "\n\n"))
    .select(
        "text",
        "target",
        "title",
        col("text_sep").getItem(1),
        col("text_sep").getItem(2),
    )
    .withColumn("id", monotonically_increasing_id())
)

In [None]:
# Print the first 5 rows of the Spark DataFrame.
sdf.show(5)

In [None]:
# Print the column names and types of sdf.
sdf.printSchema()

In [None]:
# Name under which the DataFrame is exposed to Spark SQL.
temp_table_name = "newsgroup"

# Register (or overwrite) a temporary view so sdf can be queried via spark.sql.
sdf.createOrReplaceTempView(temp_table_name)

In [None]:
from pyspark.sql.types import FloatType
import re

def clean_text(in_string):
    """Strip e-mail addresses, extra whitespace and some punctuation from text.

    The cleaning runs in three passes:

    1. remove every whitespace-delimited token containing ``@`` (plus one
       trailing whitespace character, if present);
    2. collapse each run of whitespace (including newlines) into one space;
    3. delete the characters ``'``, ``>``, ``:`` and ``-``.

    Args:
        in_string: The text to clean, or ``None``.

    Returns:
        The cleaned string, or ``None`` when ``in_string`` is ``None`` so that
        a NULL value reaching this function as a Spark UDF yields NULL
        instead of raising ``TypeError``.
    """
    if in_string is None:
        return None
    # Raw strings: the original literals relied on invalid escape sequences
    # such as "\S" and "\>", which newer Python versions warn about.
    remove_email = re.sub(r'\S*@\S*\s?', '', in_string)
    remove_nl = re.sub(r'\s+', ' ', remove_email)
    # A character class removes the same four characters as the original
    # alternation of escaped literals.
    remove_othr = re.sub(r"['>:\-]", "", remove_nl)
    return remove_othr

# Expose clean_text to Spark SQL under the name `clean`. No return type is
# passed, so the registration's default return type applies.
spark.udf.register("clean", clean_text)

In [None]:
# Rebuild sdf with a cleaned `text` column: the two extracted split pieces are
# combined by the CASE expression and passed through the `clean` UDF.
# NOTE(review): the WHERE clause keeps only rows where `text_sep[2]` is not
# null and `text_sep[1]` <> '', so the first two CASE branches can never match
# a returned row - only the CONCAT branch is reachable. Confirm whether the
# filter or the CASE reflects the intent.
# NOTE(review): the view name is hard-coded here instead of reusing
# temp_table_name.
sdf=spark.sql("select clean(CASE when`text_sep[2]` is null then `text_sep[1]` when `text_sep[1]`='' then `text_sep[2]` else CONCAT(`text_sep[1]`, ' ', `text_sep[2]`) END) as text, target, title, id FROM newsgroup where `text_sep[2]` is not null and `text_sep[1]` <> ''")

In [None]:
# Print the first rows of the cleaned DataFrame.
sdf.show()

In [None]:
# Number of rows that survived the SQL filter.
sdf.count()

In [None]:
from pyspark.sql.functions import col, length
# Inspect the rows whose cleaned text is shorter than 100 characters.
sdf.where(length(col("text")) < 100).show()

In [None]:
# Drop the short posts. The inspection cell treats "shorter than 100
# characters" as short, so the complement is `>= 100`; the previous `> 100`
# also discarded posts of exactly 100 characters that were never shown as
# candidates for removal.
sdf = sdf.where(length(col("text")) >= 100)

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
# Split the cleaned text into tokens: the pattern is passed as "\\W+",
# everything is lower-cased, and tokens shorter than 4 characters are
# discarded.
tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="tokens",
    pattern="\\W+",
    minTokenLength=4,
    toLowercase=True,
)
tokenized = tokenizer.transform(sdf)

In [None]:
# Print the first rows, now including the `tokens` column.
tokenized.show()

In [None]:
# Filter stop words out of `tokens` into a new `spfiltered` column. No
# stopWords list is passed, so the remover's default list applies.
spremover = StopWordsRemover(
    inputCol="tokens",
    outputCol="spfiltered",
)
spremoved = spremover.transform(tokenized)

# Compare the token lists before and after filtering.
spremoved.select(["tokens", "spfiltered"]).show()

In [None]:
# Module-level stemmer and lemmatiser instances, used by word_tokenize.
porter = PorterStemmer()
lemma = WordNetLemmatizer()
def word_tokenize(text):
    """Normalise a sequence of tokens by lemmatising or stemming each one.

    For every token the WordNet lemma is computed once. The lemma is kept
    when it ends in ``e`` or ``ion`` or when the token is shorter than four
    characters; otherwise the Porter stem of the token is used instead.

    The previous version first ran ``nltk.pos_tag`` over the tokens but only
    ever read the word from each ``(word, tag)`` pair, so the tags were
    computed and thrown away; that call is dropped and the tokens are
    iterated directly. The lemma is also no longer computed twice per token.
    If part-of-speech-aware lemmatisation is wanted later, the tag has to be
    passed to ``lemmatize`` explicitly.

    Args:
        text: Iterable of token strings (despite the name, not a raw string).

    Returns:
        list[str]: One normalised token per input token, in the same order.
    """
    final = []
    for token in text:
        lemmatized = lemma.lemmatize(token)
        if lemmatized.endswith(('e', 'ion')) or len(token) < 4:
            final.append(lemmatized)
        else:
            final.append(porter.stem(token))
    return final

In [None]:
# Print the schema of the stop-word-filtered DataFrame.
spremoved.printSchema()

In [None]:
# Normalise the filtered tokens of every post, carrying the label, newsgroup
# name and id along. Fields are read by name rather than by position
# (previously indices 1, 2, 3 and 5), so the mapping no longer depends on
# the column order of `spremoved`.
stemmed = spremoved.rdd.map(
    lambda row: (
        row["target"],
        row["title"],
        row["id"],
        word_tokenize(row["spfiltered"]),
    )
)

In [None]:
# Peek at a few processed records. take(5) only brings five records back to
# the driver, whereas collect() materialised the entire RDD in driver memory
# and dumped all of it into the cell output.
stemmed.take(5)

In [None]:
# Turn the RDD of 4-tuples back into a DataFrame, naming the tuple fields in
# order; the column types are inferred from the data.
news_df = stemmed.toDF(schema=['target', 'title', 'id', 'word'])

In [None]:
# Print the first rows of the normalised-token DataFrame.
news_df.show()

In [None]:
# Extra words to drop from the normalised `word` lists.
# NOTE(review): this filter runs after word_tokenize has stemmed/lemmatised
# the tokens, so entries written in their surface form may never match a
# normalised token - verify against the actual contents of `word`.
spwordlist = [
    "article", "write", "entry", "date", "udel", "said",
    "tell", "think", "know", "just", "isnt", "line",
    "like", "does", "going", "make", "thanks", "also",
]

spremover1 = StopWordsRemover(
    inputCol="word",
    outputCol="word_new",
    stopWords=spwordlist,
)
# Rebinds news_df to the frame that additionally carries `word_new`.
news_df = spremover1.transform(news_df)



In [None]:
# Compare the token lists before and after the custom stop-word filter.
news_df.select("word","word_new").show()

In [None]:
# Replace the `word_new` array column with one row per array element.
# `explode` is not imported by name in this notebook; it is presumably
# supplied by the earlier `from pyspark.sql.functions import *`.
df_explode = news_df.withColumn("word_new", explode("word_new"))

In [None]:
# Print the first rows of the exploded (one token per row) DataFrame.
df_explode.show()

In [None]:
# Print news_df again; it is not reassigned by the explode step, which wrote
# its result to df_explode.
news_df.show()

In [None]:
from pyspark.ml.feature import CountVectorizer
# Count-vectorise the filtered token lists into `rawFeatures`, configured
# with vocabSize=10000 and minDF=5.
cv = CountVectorizer(
    inputCol="word_new",
    outputCol="rawFeatures",
    vocabSize=10000,
    minDF=5,
)
# Fit on news_df, then add the count vectors to the same frame.
cvmodel = cv.fit(news_df)
featurized_data = cvmodel.transform(news_df)

In [None]:
# Print the first rows, now including the `rawFeatures` vectors.
featurized_data.show()

In [None]:
# Vocabulary held by the fitted CountVectorizer model.
vocab = cvmodel.vocabulary
# Displays the full vocabulary as the cell output.
vocab

In [None]:
from pyspark.ml.feature import IDF
# Apply IDF to the raw count vectors, writing the result to `features`.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)
# Fit on the vectorised frame, then add the rescaled vectors to it.
idfModel = idf.fit(featurized_data)
rescaledData = idfModel.transform(featurized_data)

In [None]:
# Print the first rows, now including the `features` column.
rescaledData.show()

In [None]:
# Keep only the document id and its feature vector, and mark the result for
# caching (it is the input to the LDA fit below).
corpus = rescaledData.select("id", "features").cache()
# NOTE(review): `display` is not imported in this notebook; it is a built-in
# in IPython and in Databricks, and the two render a Spark DataFrame
# differently - confirm the target environment.
display(corpus)

In [None]:
# Print the first rows of the (id, features) corpus.
corpus.show()

In [None]:
from pyspark.ml.clustering import LDA
# Fit a 20-topic LDA model with the EM optimiser for up to 50 iterations.
# No featuresCol is passed, so LDA reads its default input column, which is
# presumably the `features` column of `corpus` - confirm.
# A fixed seed is supplied so that repeated runs on the same data produce
# comparable topics; without it the random initialisation changes from run
# to run.
lda = LDA(k=20, maxIter=50, optimizer="em", seed=42)
model = lda.fit(corpus)