# Text processing

## Init spark

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, Row
from pyspark.sql import SparkSession

sc = SparkContext(conf=SparkConf())
spark = SparkSession(sparkContext=sc)

## Dummy data

In [2]:


books_rdd_list = []
directory = r'data/'
for filename in os.listdir(directory):
    txt = open("data/"+filename).read()
    row = Row(id=filename.replace('.txt',''), text=txt)
    # row = Row(texts=txt)
    books_rdd_list.append(row)
rdd = sc.parallelize(books_rdd_list)
df = rdd.flatMap(lambda x: (x,)).toDF() 

df.show()

+-----+--------------------+
|   id|                text|
+-----+--------------------+
|10010|The Project Guten...|
|10019|Project Gutenberg...|
| 1002|The Project Guten...|
|10020|The Project Guten...|
|10037|Project Gutenberg...|
|10042|The Project Guten...|
|10046|The Project Guten...|
|10061|The Project Guten...|
|10074|The Project Guten...|
|10077|The Project Guten...|
|10088|The Project Guten...|
|10090|The Project Guten...|
|10098|The Project Guten...|
|10109|The Project Guten...|
|10110|The Project Guten...|
|10111|The Project Guten...|
|10119|The Project Guten...|
|10130|The Project Guten...|
|10144|The Project Guten...|
|10149|The Project Guten...|
+-----+--------------------+
only showing top 20 rows



In [3]:
%%time
import re
import unidecode
from pyspark.sql.types import *
from pyspark.sql.functions import udf

def clean(text):
    text = unidecode.unidecode(text) # remove accents
    text = text.lower()
    text = text.replace(r'\n','') # remove newline sign
    text = re.sub(r'\d+', '', text) # remove digits 
    text = re.sub(r'[.]?-[.]?', '', text) # concatenate divided words
    text = re.sub(r'[\W]+',' ', text) # replace non-alphanum with space  
    text = re.sub(' +', ' ', text) # replace multiple spaces with single space 
    return text

user_def_fun = udf(f=clean, returnType=StringType())

df = df.withColumn("cleaned", user_def_fun("text"))
df.show(2)
df = df.drop('text')

+-----+--------------------+--------------------+
|   id|                text|             cleaned|
+-----+--------------------+--------------------+
|10010|The Project Guten...|the project guten...|
|10019|Project Gutenberg...|project gutenberg...|
+-----+--------------------+--------------------+
only showing top 2 rows

Wall time: 9.41 s


## Tokenization

In [4]:
%%time
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="cleaned", outputCol="tokens")
df = tokenizer.transform(df)
df = df.drop('cleaned')

Wall time: 147 ms


## Stopwords Removal

In [5]:
%%time
from pyspark.ml.feature import StopWordsRemover

stopword_removal = StopWordsRemover(inputCol='tokens', 
                                    outputCol='refined_tokens')
df = stopword_removal.transform(df)
df = df.drop('tokens')

Wall time: 466 ms


## Stemming vs lemmatization

### SnowballStemmer

In [6]:
%%time
from nltk.stem.snowball import SnowballStemmer

def list_stemmer(words):
    stemmer = SnowballStemmer(language='english')
    return [stemmer.stem(word) for word in words]

stemming = udf(list_stemmer, returnType=ArrayType(StringType()))

df = df.withColumn("stem", stemming("refined_tokens"))
# comment below line if you want to check also WordNetLemmatizer
df = df.drop("refined_tokens")

Wall time: 314 ms


### WordNetLemmatizer

In [7]:
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Dell\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [9]:
%%time
from nltk.stem import WordNetLemmatizer

def list_lemmatizer(words):
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(word) for word in words]

lemmatization = udf(list_lemmatizer, returnType=ArrayType(StringType()))

df = df.withColumn("lem", lemmatization("refined_tokens"))

AnalysisException: "cannot resolve '`refined_tokens`' given input columns: [id, stem];;\n'Project [id#0, stem#42, list_lemmatizer('refined_tokens) AS lem#51]\n+- Project [id#0, stem#42]\n   +- Project [id#0, refined_tokens#35, list_stemmer(refined_tokens#35) AS stem#42]\n      +- Project [id#0, refined_tokens#35]\n         +- Project [id#0, tokens#29, UDF(tokens#29) AS refined_tokens#35]\n            +- Project [id#0, tokens#29]\n               +- Project [id#0, cleaned#12, UDF(cleaned#12) AS tokens#29]\n                  +- Project [id#0, cleaned#12]\n                     +- Project [id#0, text#1, clean(text#1) AS cleaned#12]\n                        +- LogicalRDD [id#0, text#1], false\n"

## TF-IDF
[Spark documentation](https://spark.apache.org/docs/latest/mllib-feature-extraction.html)

![title](fig/IDF.png)

![title](fig/TFIDF.png)

In [None]:
%%time
from pyspark.ml.feature import HashingTF, IDF

hashing_vec = HashingTF(numFeatures=262144,
                        inputCol='stem',
                        outputCol='tf_features')

hashing_df = hashing_vec.transform(df)
tf_idf_vec = IDF(inputCol='tf_features',
               outputCol='tf_idf_features')

tf_idf_df = tf_idf_vec.fit(hashing_df).transform(hashing_df)
tf_idf_df.select(['stem','tf_idf_features']).show(2)
tf_idf_df = tf_idf_df.drop(*['stem','tf_features']) 

In [None]:
tf_idf_df.show()

# Reduction of dimensions
## PCA

### Explained variance for k principal components  

In [None]:
%%time
from pyspark.ml.feature import PCA 
from pyspark.ml.linalg import Vectors

k = 10
pca = PCA(k=k, inputCol='tf_idf_features', outputCol="pca")
model = pca.fit(tf_idf_df)
exp_var = model.explainedVariance.values
plt.bar(['PCT_' + str(i+1) for i in range(exp_var.shape[0])], exp_var)
plt.show()

### Dimensionality reduction

In [None]:
%%time

pca = PCA(k=2, inputCol='tf_idf_features', outputCol="pca")
model = pca.fit(tf_idf_df)
transformed = model.transform(tf_idf_df)
transformed.show(2) 
transformed = transformed.drop('tf_idf_features') 

### split vectors in to two columns

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import *

two_columns_rdd = transformed.rdd.map(lambda x: Row(title=str(x[0]),
                                                    PCA_1=float(x[1][0]), 
                                                    PCA_2=float(x[1][1])))

schema = StructType([StructField("title", StringType(), True),
                     StructField("PCA_1", FloatType(), True),
                     StructField("PCA_2", FloatType(), True)])

pca_df = spark.createDataFrame(two_columns_rdd, schema=schema)
pca_df.show(2)

# Visualization

In [None]:
plot_df = pca_df.toPandas()
x =plot_df['PCA_1'].values
y = plot_df['PCA_2'].values
titles = plot_df['title'].values

In [None]:
fig, ax = plt.subplots(figsize=(8,6))

ax.set_xmargin(0.2)
ax.set_ymargin(0.2)
ax.scatter(x,y, zorder=3)

# Display titles
for i, title in enumerate(titles):
    ax.annotate(title, (x[i]+0.03, y[i]))
    
ax.set(xlabel='PCA 1',
       ylabel='PCA 2', 
       title='PCA')
ax.grid()

plt.show()

# Cluster analysis

## Selecting the number of clusters with silhouette analysis on KMeans clustering

In [None]:
from silhouette_analysis import *

silhouette_plot(plot_df.values, range_n_clusters=[2])

## Clusterization

In [None]:
from sklearn.cluster import KMeans

clustering_model = KMeans(n_clusters=3,
                          max_iter=100,
                          n_jobs=-1)

labels = clustering_model.fit_predict(plot_df)

In [None]:
color_map = {0: 'red', 
             1: 'green', 
             2: 'blue'}
    
titles = ['title_1','title_2','title_3']

fig, ax = plt.subplots(figsize=(8,6))

ax.set_xmargin(0.2)
ax.set_ymargin(0.2)

for i, _ in enumerate(x):
    # Add colors
    color = color_map[labels[i]]
    ax.scatter(x[i], y[i], c=color, zorder=3)
    # Display titles
    ax.annotate(titles[i], (x[i]+0.03, y[i]))
    
ax.set(xlabel='PCA 1',
       ylabel='PCA 2', 
       title='PCA')
ax.grid()

plt.show()