## Exercise 3 Pipeline API and ML workflows

In [179]:
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, IDF, CountVectorizer, VectorAssembler, Word2Vec
from pyspark.sql.types import StructType, StringType, ArrayType, IntegerType
from pyspark.sql import DataFrame

import random
import pandas as pd
import math

from pyspark.sql.functions import mean, col, udf, regexp_replace, lower, size, monotonically_increasing_id
import pyspark.sql.functions as f
from pyspark.sql.functions import *

from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

In [170]:
spark = SparkSession \
    .builder \
    .appName("Pipeline API and spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

### Read and sample tweets

In [171]:
schema = StructType().add("tweets",StringType(),True)


df = spark.read.schema(schema).csv("tweets.json", header=True)
df = df.sample(withReplacement=False, fraction=0.1)

In [107]:
#df.collect()[3]
#df.select("tweets").collect()[0]

In [81]:
df.toPandas()

Unnamed: 0,tweets
0,"""To come in and play against England my first ..."
1,@TunnockCup It's absolutely class. They could ...
2,I can\u2019t quite believe it but we made it t...
3,#Amakhosi4Life We are Kaizer Chiefs @KaizerChi...
4,If you are a 5 star recruit in football you sh...
5,@jumbleofideas @paldhous @PeterHotez I searche...
6,True loneliness is a solitary comet flying thr...
7,"""don't buy wine with my money and then give it..."
8,How might we invest in equitable collaborative...
9,\u2b55 Admission Alert \u2b55 &gt;&gt;&gt;&gt...


### Data Preprocessing without pipeline

In [82]:
# Remove irrelevant characters, numbers, links and punctuation marks
regex = "\\ud83c\\udc00-\\ud83c\\udfff]|[\\ud83d\\udc00-\\ud83d\\udfff]|[\\u2600-\\u27ff]|[^#]|[^@*$]|[^https?:\/\/.*[\r\n]*]"


df_clean = df.select(regexp_replace('tweets', r'[0-9]{5,}', '').alias('tweets'))
#df_clean = df_clean.select('tweets', (lower(regexp_replace('tweets', "[\s+[a-zA-Z]\s+][\^[a-zA-Z]\s+]", "")).alias('tweets2')))
df_clean = df_clean.select("tweets", f.translate(f.col("tweets"), regex, "").alias("text"))

# Tokenizing
tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
df_tokens = tokenizer.transform(df_clean).select('tokens')

# Remove stop words
remover = StopWordsRemover(inputCol='tokens', outputCol='words')
df_words = remover.transform(df_tokens).select('words')

# Remove words of length less than 1
len_udf = udf(lambda tokens: [token for token in tokens if len(token) > 1], ArrayType(StringType()))

df_final = df_words.withColumn("words", len_udf("words")).select('words')

#df_tokens.collect()[3]

# Computing tf-idf
cv = CountVectorizer(inputCol="words", outputCol="tf_features")
cvModel = cv.fit(df_final)
tfDf = cvModel.transform(df_final)

idf = IDF(inputCol="tf_features", outputCol="tf-idf_features")
idfModel = idf.fit(tfDf)
tfidfDf = idfModel.transform(tfDf)

tfidfDf.collect()

[Row(words=['"to', 'ome', 'lay', 'englan', 'ir', 'game,', 'ro"', 'billy', 'gilmor', 'give', 'reaion', 'imreive', 'eb', "solan'", 'raw', 'wi', 'englan', 'eng', 'sco', 'engsco', 'euro', 'taranarmy', 'treelion', 'oqprsnq'], tf_features=SparseVector(190, {8: 1.0, 9: 1.0, 10: 2.0, 27: 1.0, 35: 1.0, 36: 1.0, 47: 1.0, 53: 1.0, 56: 1.0, 61: 1.0, 63: 1.0, 67: 1.0, 70: 1.0, 79: 1.0, 102: 1.0, 103: 1.0, 106: 1.0, 112: 1.0, 125: 1.0, 140: 1.0, 150: 1.0, 156: 1.0, 168: 1.0}), tf-idf_features=SparseVector(190, {8: 1.3863, 9: 1.674, 10: 4.1589, 27: 1.674, 35: 2.0794, 36: 2.0794, 47: 2.0794, 53: 2.0794, 56: 2.0794, 61: 2.0794, 63: 2.0794, 67: 2.0794, 70: 2.0794, 79: 2.0794, 102: 2.0794, 103: 2.0794, 106: 2.0794, 112: 2.0794, 125: 2.0794, 140: 2.0794, 150: 2.0794, 156: 2.0794, 168: 2.0794})),
 Row(words=['tnnokc', "i'", 'abolely', 'la', 'tey', 'ol', 'ave', 'enjoye', 'ooball', 'like', 'normal', 'eole', 'ala'], tf_features=SparseVector(190, {3: 1.0, 5: 1.0, 11: 1.0, 12: 1.0, 15: 1.0, 19: 1.0, 25: 1.0, 72

### Data Preprocessing through a pipeline

In [172]:
# Custom Transformer

class CustomCleaner(Transformer):
    """
    A custom Transformer which removes punctuations, undefined chars, numerals, links
    """

    def __init__(self, regex=""):
        self.regex ="\\ud83c\\udc00-\\ud83c\\udfff]|[\\ud83d\\udc00-\\ud83d\\udfff]|[\\u2600-\\u27ff]|[^#]|[^@*$]|[^https?:\/\/.*[\r\n]*]"

    def _transform(self, df: DataFrame) -> DataFrame:
        df_clean = df.select(regexp_replace('tweets', r'[0-9]{5,}', '').alias('tweets'))
        df_clean = df_clean.select("tweets", f.translate(f.col("tweets"), regex, "").alias('tweets2'))
        
        return df_clean

In [174]:
class CustomTokenizer(Transformer):
    """
    A custom Tokenizer
    """

    def __init__(self):
        pass
  

    def _transform(self, df: DataFrame) -> DataFrame:
        tokenize = udf(lambda tokens: word_tokenize(tokens), ArrayType(StringType()))
        df = df.withColumn("words", tokenize("tweets2"))
    
        return df
        
        

In [175]:
class CustomStopwordRemover(Transformer):
    """
    A custom stopword remover
    """

    def __init__(self):
        pass
  

    def _transform(self, df: DataFrame) -> DataFrame:
        remover = udf(lambda tokens: [token for token in tokens if token not in [stopwords]], ArrayType(StringType()))
        df = df.withColumn("tokens", remover("words"))
        
        return df
        

In [176]:
# Custom Transformer

class CustomFilter(Transformer):
    """
    A custom Transformer which removes words having length less than 1
    """

    def __init__(self):
        pass
    
    def _transform(self, df: DataFrame) -> DataFrame:
        len_udf = udf(lambda tokens: [token for token in tokens if len(token) > 1], ArrayType(StringType()))
        df = df.withColumn("filtered", len_udf("tokens"))
        
        return df

In [191]:
# Custom Transformer for tf

class CustomTF(Transformer):
    """
    A custom Transformer which computes TF-IDF
    The code has been inspired from here: https://towardsdatascience.com/tf-idf-calculation-using-map-reduce-algorithm-in-pyspark-e89b5758e64c
    """

    def __init__(self):
        pass
    
    def _transform(self, df: DataFrame) -> DataFrame:
        #df = df.withColumn("tweet_id", monotonically_increasing_id())

        all_df = df.select('filtered').collect()

        allll = [(i,all_df[i].filtered) for i in range(0,10)]


        lines = sc.parallelize(allll)

        map1 = lines.flatMap(lambda x: [((x[0],i),1) for i in x[1]])

        reduce = map1.reduceByKey(lambda x,y:x+y)

        tf = reduce.map(lambda x: (x[0][1],(x[0][0],x[1])))

        map3 = reduce.map(lambda x: (x[0][1],(x[0][0],x[1],1)))

        map4 = map3.map(lambda x:(x[0],x[1][2]))

        reduce2 = map4.reduceByKey(lambda x,y:x+y)

        idf = reduce2.map(lambda x: (x[0],math.log10(len(allll)/x[1])))


        rdd = tf.join(idf)



        rdd = rdd.map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1],x[1][0][1]*x[1][1]))).sortByKey()
        rdd = rdd.map(lambda x: (x[0],x[1][0],x[1][1],x[1][2],x[1][3]))
        tf = rdd.toDF(["DocumentId","Token","TF","IDF","TF-IDF"])

        
        return tf

In [188]:

cleaner = CustomCleaner()
tokenizer = CustomTokenizer()
remover = CustomStopwordRemover()
final = CustomFilter()
tfidf = CustomTF()



pipeline = Pipeline(stages=[cleaner, tokenizer, remover, final, tfidf])

# Send data through the pipeline
model = pipeline.fit(df)
results = model.transform(df)

results.show()





+----------+---------+---+------------------+------------------+
|DocumentId|    Token| TF|               IDF|            TF-IDF|
+----------+---------+---+------------------+------------------+
|         0| JameToe1|  1|               1.0|               1.0|
|         0|       we|  1|0.6989700043360189|0.6989700043360189|
|         0|      win|  1|               1.0|               1.0|
|         0|   ooball|  1|0.3010299956639812|0.3010299956639812|
|         0|       Di|  1|               1.0|               1.0|
|         1|     noie|  1|               1.0|               1.0|
|         1|    Raial|  1|               1.0|               1.0|
|         1|    Brexi|  1|               1.0|               1.0|
|         1|       An|  1|               1.0|               1.0|
|         1|     oern|  1|               1.0|               1.0|
|         1|   ooball|  1|0.3010299956639812|0.3010299956639812|
|         1|     orer|  1|               1.0|               1.0|
|         1|       in|  2

## Exercise 4 -- Word2Vec

In [190]:
# Tokenizing
tokenizer = Tokenizer(inputCol='tweets', outputCol='tokens')
df_tokens = tokenizer.transform(df).select('tokens')

# Remove stop words
remover = StopWordsRemover(inputCol='tokens', outputCol='words')
df_words = remover.transform(df_tokens).select('words')

word2Vec = Word2Vec(vectorSize=10, minCount=0, inputCol="words", outputCol="result")
model = word2Vec.fit(df_words)

result = model.transform(df_words)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [@jamestudhope1, win, football?, \ud83d\ude2f] => 
Vector: [0.010278148984070867,-0.014843338401988149,-0.021595153375528753,0.012239520438015461,-0.0008178940042853355,-0.02115490846335888,0.033132049953565,-0.01352766321360832,-0.001545988954603672,-0.0016770113725215197]

Text: [@sazmeister88, notice, least, 75%, brexit, trolls, bio, arsenal, tottenham, number, southern, english, football, teams., , mentality., racial, hatred,, football, nationalism, =, brexit, supporter.] => 
Vector: [0.0013939971509187117,-0.0007995152238594449,-0.004296151420061031,0.004122104738717494,-0.01219565887004137,3.765797768921956e-05,-0.010953001455282387,-0.00827415520325303,-0.01415190027024516,8.47697247872534e-05]

Text: [@tasaflteam, comments, clearly, show, conflict, interest,, club, presidents, involved, process., narrow, minded,, uneducated, comments, reek, self, interests, fear., decision, needs, made, facts,, merit, future, tas, football.] => 
Vector: [-0.0024091002792724445,-0.00475077