# 1. **Define dependencies and constraints**

To download tweets from Twitter, one must first create an account and apply for **developer privileges**. The application grants basic access to the [Twitter API](https://developer.twitter.com/en/docs/twitter-api), which is not enough because it only allows downloading tweets from the last 7 days. Therefore, I applied for the [Premium plan](https://developer.twitter.com/en/support/twitter-api/premium), which allows downloading 25k tweets per month and gives access to the _full archive_ and _30 days_ search APIs, although with a limited number of requests per month.

In [91]:
COLAB_DIR = "/content/"

RANDOM_SEED = 42

# File with Twitter project credentials
CREDENTIALS = COLAB_DIR+'credentials.yaml'
CREDENTIALS_KEY = 'search_tweets_30_day_dev'

# CSV file where the downloaded tweets will be saved
DATASET = COLAB_DIR+'dataset.csv'
DATASET_ANNOTATED = COLAB_DIR+'dataset_annotated.csv'
SENTIPOLIC = COLAB_DIR+'sentipolic.csv'

# result of data processing
DATASET_PROCESSED = COLAB_DIR+"result_processed"

In [92]:
!python --version

Python 3.7.13


### install libraries

In [93]:
!apt-get install libenchant1c2a
!pip install pyenchant
!apt-get install hunspell-it

Reading package lists... Done
Building dependency tree       
Reading state information... Done
libenchant1c2a is already the newest version (1.6.0-11.1).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 42 not upgraded.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Reading package lists... Done
Building dependency tree       
Reading state information... Done
hunspell-it is already the newest version (1:6.0.3-3).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 42 not upgraded.


In [94]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

^C


In [95]:
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-8u312-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)


In [96]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'

In [97]:
!pip install pyspark==3.1.2
!pip install spark-nlp==3.4.4

Traceback (most recent call last):
  File "/usr/local/bin/pip3", line 5, in <module>
    from pip._internal.cli.main import main
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/cli/main.py", line 9, in <module>
    from pip._internal.cli.autocompletion import autocomplete
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/cli/autocompletion.py", line 10, in <module>
    from pip._internal.cli.main_parser import create_main_parser
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/cli/main_parser.py", line 8, in <module>
    from pip._internal.cli import cmdoptions
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/cli/cmdoptions.py", line 23, in <module>
    from pip._internal.cli.parser import ConfigOptionParser
  File "/usr/local/lib/python3.7/dist-packages/pip/_internal/cli/parser.py", line 12, in <module>
    from pip._internal.configuration import Configuration, ConfigurationError
  File "/usr/local/lib/python3.7/dist-packages/pip/_intern

In [98]:
!pip install keras-tqdm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install sparktorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### PySpark configurations

In [None]:
# pyspark packages
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import DataFrame
from pyspark.ml.feature import VectorAssembler, SQLTransformer, Normalizer
from pyspark.sql.functions import udf, col, lower, trim, regexp_replace, transform

import sparknlp

In [None]:
import findspark
findspark.init()

In [None]:
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config('spark.executor.memory', '8G')\
    .config("spark.driver.memory","100G")\
    .config("spark.driver.cores", "5")\
    .config("spark.scheduler.mode", "FAIR")\
    .config("spark.driver.maxResultSize", "10G") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.4")\
    .getOrCreate()

print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

In [None]:
sc = spark.sparkContext
type(sc)

In [None]:
! cd ~/.ivy2/cache/com.johnsnowlabs.nlp/spark-nlp_2.12/jars && ls -lt

## Download Files from GitHub

In [None]:
!wget https://github.com/deborahdore/italian-sarcastic-tweet-classification/raw/main/dataset/dataset.csv
!wget https://github.com/deborahdore/italian-sarcastic-tweet-classification/raw/main/dataset/other/sentipolic.csv
!wget https://raw.githubusercontent.com/deborahdore/italian-sarcastic-tweet-classification/main/credentials/credentials.yaml
!wget https://raw.githubusercontent.com/deborahdore/italian-sarcastic-tweet-classification/main/dataset/dataset_annotated.csv

In [None]:
# italian dictionary for lemmatization
!wget https://raw.githubusercontent.com/michmech/lemmatization-lists/master/lemmatization-it.txt

# 2. **Retrieve Tweet**


> In the following, some code cells are annotated with *%%script false* to prevent their execution. These cells download tweets from Twitter. I have used up all the requests at my disposal, so calling the Twitter API will produce an error. Please don't run them, otherwise the saved output of the cells will be lost.



In [None]:
# useful imports
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
import requests
import json
import yaml
import csv
import pdb
import pandas as pd

- First, we must retrieve and validate the credentials needed to access the Twitter API. I've stored the bearer token in a YAML file: *credentials.yaml*
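As a minimal sketch, the loaded credentials can be checked before any request is made. It assumes the entry stored under the chosen key is a mapping with the two fields that the later cells read, `endpoint` and `bearer_token`. The helper name `validate_credentials` and the sample values are hypothetical.

```python
def validate_credentials(entry):
    """Return the entry unchanged if it has the fields used later, else raise."""
    required = ("endpoint", "bearer_token")
    if not isinstance(entry, dict):
        raise ValueError("credentials entry must be a mapping")
    # a field counts as missing when it is absent or empty
    missing = [name for name in required if not entry.get(name)]
    if missing:
        raise ValueError(f"missing credential fields: {', '.join(missing)}")
    return entry


# Placeholder values for illustration only, not real credentials
sample = {"endpoint": "https://example.invalid/search", "bearer_token": "PLACEHOLDER"}
checked = validate_credentials(sample)
```

Failing early with a clear message avoids spending one of the limited API requests on a call that cannot be authenticated.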





In [None]:
def handle_credentials(credentials, key):
  with open(credentials, "r") as stream:
    try:
        credentials = yaml.safe_load(stream)
        return credentials[key]
    except yaml.YAMLError as exc:
        print(exc)

In [None]:
credentials = handle_credentials(CREDENTIALS, CREDENTIALS_KEY)
endpoint = credentials['endpoint'] # we will use this endpoint to search for the tweet
print(endpoint)

- Second, we must create the headers for the request

In [None]:
def handle_headers(credentials:dict):
  headers = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {credentials["bearer_token"]}'
  }
  return headers

In [None]:
headers = handle_headers(credentials)
headers

- Another parameter of the request is the query. The query determines which tweets are returned in the response. In our case, we have two types of queries: one that searches for sarcastic tweets and one that returns non-sarcastic tweets.

For the sarcastic query, I've chosen some keywords that, in my opinion, are used to express sarcasm and/or irony (sarcasm is a sub-type of irony):


1. sarcasmo (with or without #)
2. ironia (with or without #)
3. "*ridiamo per non piangere*"
4. #coincidenze (.. io non credo) is mostly used to express sarcasm
5. "*qualquadra non cosa*"

Many studies also suggest that sarcasm can be found in tweets related to politics. Therefore, these seem like very good starting points:
1. monti, draghi, berlusconi (well-known Italian prime ministers)
2. governo
3. premier


For non-sarcastic tweets, I've excluded all the possible words that may refer to sarcasm.

The list of operators used can be found in the [Twitter API documentation](https://developer.twitter.com/en/docs/twitter-api/enterprise/rules-and-filtering/operators-by-product).
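The two query shapes described above can be assembled from keyword lists, which keeps the long strings easier to maintain. This is only a sketch: the helper `build_query` is hypothetical, and it uses only the operators that appear in this notebook (`OR`, quoted phrases, `-` negation, `lang:` and `-has:media`).

```python
def build_query(keywords, lang="it", exclude=False):
    """Build a search query string from a list of keywords.

    Multi-word keywords are quoted so that they match as an exact phrase.
    exclude=False joins the terms with OR inside parentheses;
    exclude=True negates every term with a leading '-'.
    """
    terms = [f'"{k}"' if " " in k else k for k in keywords]
    if exclude:
        body = " ".join(f"-{t}" for t in terms)
    else:
        body = "(" + " OR ".join(terms) + ")"
    return f"{body} lang:{lang} -has:media"


include_query = build_query(["#sarcasmo", "sarcasmo", "ridiamo per non piangere"])
exclude_query = build_query(["sarcasmo", "ironia"], exclude=True)
print(include_query)
print(exclude_query)
```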

In [None]:
sarcasm_query = '(#sarcasmo OR sarcasmo OR #ironia OR ironia OR "ridiamo per non piangere" \
                  OR #coincidenze OR "qualquadra non cosa" OR draghi OR monti OR berlusconi \
                  OR governo OR premier) lang:it -has:media'

non_sarcasm_query = '-"ridiamo per non piangere" -sarcasmo -ironia -"qualquadra non cosa" lang:it -has:media'

- Now we can define the function that will handle the request and the DataFrame where the tweets will be stored.


> Other parameters that we need in order to process the request are:
- *max_result_per_page*: the maximum number of tweets per call
- *next_token*: a token that, if passed to the request, returns the next page of results
- *max_num_of_request*: a parameter I've defined that stops the download once the desired number of calls has been reached. This is necessary because the requests at our disposal are not unlimited, so we must be careful about how many we make
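The interplay between *next_token* and *max_num_of_request* can be sketched without touching the API. The loop below is an iterative variant of the same idea, not the function used in this notebook; `paginate` and the in-memory `FAKE_PAGES` stand-in are hypothetical.

```python
def paginate(fetch_page, max_num_of_request):
    """Collect results page by page until the token runs out or the budget is spent.

    fetch_page(next_token) must return a dict with a "results" list and,
    when more pages exist, a "next" token.
    """
    collected, next_token = [], None
    for _ in range(max_num_of_request):
        page = fetch_page(next_token)
        collected.extend(page.get("results", []))
        next_token = page.get("next")
        if next_token is None:
            break
    return collected


# Hypothetical stand-in for the API: three pages linked by tokens
FAKE_PAGES = {
    None: {"results": [1, 2], "next": "a"},
    "a": {"results": [3, 4], "next": "b"},
    "b": {"results": [5]},
}

all_items = paginate(FAKE_PAGES.get, max_num_of_request=10)
capped_items = paginate(FAKE_PAGES.get, max_num_of_request=2)
```

With a budget of 2 the loop stops after the second page even though a further token is available, which is exactly the protection needed against exhausting the monthly quota.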




In [None]:
def handle_request(endpoint, headers, query, max_result_per_page, next_token = None):
  
  if next_token is not None:
    payload = json.dumps({
      "maxResults": max_result_per_page,
      "query": query,
      "next": next_token
    })
  else:
    payload = json.dumps({
      "maxResults": max_result_per_page,
      "query": query,
    })
  
  response = requests.post(endpoint, headers=headers, data=payload)

  return response.text

In [None]:
def extract_tweet(response, label):
  tweets = []
  json_response = json.loads(response)
  
  if 'results' in json_response:
    results = json_response["results"]

    for tweet in results:
      # is tweet a retweet?
      if 'retweeted_status' in tweet:
        if tweet['retweeted_status']['truncated']:
          text = tweet['retweeted_status']['extended_tweet']['full_text']
        else:
          text = tweet['retweeted_status']['text']
      else:
        if tweet['truncated']:
          text = tweet['extended_tweet']['full_text']
        else:
          text = tweet['text']
        
      text = text.replace('"', "'")
      data = Tweet(tweet["id"], f"{text}", label)
      
      tweets.append(data)

  else:
    print("Request went wrong")
    print(response)

  return tweets

In [None]:
def download_tweet(endpoint, 
                   headers, 
                   query, 
                   label,
                   max_result_per_page,
                   tweet_list,
                   next_token = None, 
                   max_num_of_request = 20):

  if max_num_of_request <= 0:
    return tweet_list

  response = handle_request(endpoint, headers, query, max_result_per_page, next_token)

  tweet_list.extend(extract_tweet(response, label))

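  # read the pagination token, if any; a missing token or an unparsable body ends the download
  try:
      next_token = json.loads(response).get('next')
  except (ValueError, AttributeError):
      next_token = None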

  if next_token is not None:
      return download_tweet(endpoint, headers, query, label, max_result_per_page,
                   tweet_list, next_token, max_num_of_request - 1)
  else:
      return tweet_list

In [None]:
# define tweet
Tweet = Row("id", "text", "sarcastic")

In [None]:
tweets = []

In [None]:
%%script false

# download sarcastic tweet
tweets = download_tweet(endpoint, 
                   headers, 
                   sarcasm_query, 
                   "Yes",
                   100,
                   [],
                   next_token = None, 
                   max_num_of_request = 40)

In [None]:
%%script false

# download non-sarcastic tweet
tweets.extend(
    download_tweet(endpoint, 
                   headers, 
                   non_sarcasm_query, 
                   "No",
                   100,
                   [],
                   next_token = None, 
                   max_num_of_request = 40))

In [None]:
%%script false
# create DataFrame
df = spark.createDataFrame(tweets)

In [None]:
%%script false
df.show(10, truncate=False)

In [None]:
%%script false

# create file
if not os.path.exists(DATASET):
  os.mknod(DATASET)

# save tweets
df.toPandas().to_csv(DATASET, header=True, index=False) 

# 3. **Annotate Tweet**

In [None]:
# python widgets
from ipywidgets import Button
import asyncio
from IPython.display import display, clear_output
import ipywidgets as widgets
from ipywidgets import HBox, Layout
import time as t

When we download tweets using a hashtag, we cannot be 100% sure that the labels assigned are correct. We must review at least the majority of the tweets to check that they are labelled correctly. Here is a little tool to help us with that.

In [None]:
Tweet = Row("id", "text", "sarcastic")

schema = StructType([StructField("id", StringType(), True)\
                   ,StructField("text", StringType(), True)\
                   ,StructField("sarcastic", StringType(), True)])

df = spark.createDataFrame(pd.read_csv(DATASET), schema=schema)
df.show(10)

In [None]:
def count_label(df, numeric=False):
  label_yes = 1 if numeric else "Yes"
  label_no = 0 if numeric else "No"
  return df.groupBy("sarcastic").agg(
      count(when(col("sarcastic") == label_yes, 1)),
      count(when(col("sarcastic") == label_no, 1)))

In [None]:
# count tweet
print(f'Total number of tweet retrieved {df.count()}')

In [None]:
# we want first to drop duplicates

print("Count before drop:")
count_label(df).show()

count_before_drop = df.count()
df = df.dropDuplicates(["text"])
print(f"Distinct count: {str(df.count())} \n")

print("Count after drop:")
count_label(df).show()

In [None]:
print(f'dropped {count_before_drop-df.count()} rows')
print(f'total count: {df.count()}')

In [None]:
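# visually
# look the counts up by label: the row order returned by groupBy is not guaranteed
counts = {row["sarcastic"]: row["count"] for row in df.groupBy("sarcastic").count().collect()}

labels = ['sarcastic', 'non sarcastic']
colors = sns.color_palette('pastel')[0:5]

plt.pie([counts.get("Yes", 0), counts.get("No", 0)], labels = labels, colors = colors, autopct='%.0f%%')
plt.show()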

In [None]:
tweets_annotated = []

In [None]:
def wait_for_change(widget1, widget2): 
    future = asyncio.Future()
    def getvalue(change):
        future.set_result(change.description)
        widget1.on_click(getvalue, remove=True)
        widget2.on_click(getvalue, remove=True) 
    widget1.on_click(getvalue)
    widget2.on_click(getvalue)
    return future

async def f(df):
  df_pandas = df.toPandas()
  for index, row in df_pandas.iterrows():
    print(f'Is this tweet sarcastic? \n {row.text} \n', flush=True)

    x = await wait_for_change(sarcastic,non_sarcastic)
    
    if x == "Yes":
      print("Tagged ", row.id, "with sarcastic \n")
      data = Tweet(row.id, row.text, "Yes")
      tweets_annotated.append(data)
    else:
      print("Tagged ", row.id, "with non-sarcastic \n")
      data = Tweet(row.id, row.text, "No")      
      tweets_annotated.append(data)

    clear_output()
    display(HBox([sarcastic,non_sarcastic]))

Before going forward, we want to ask ourselves: *How can we know if a tweet is sarcastic or not?*

*In Harry Potter and the Half Blood Prince, there is a scene where Harry is leaving the Weasley house and Mrs. Weasley says: “Promise me you will look after yourself…stay out of trouble….” Harry responds: “I always do Mrs. Weasley. I like a quiet life, you know me.” Anyone familiar with Harry Potter knows that his life is far from quiet, and so he must not really mean what he is saying. In fact, Harry is being sarcastic.*

[source](https://kids.frontiersin.org/articles/10.3389/frym.2018.00056)

Sarcasm is the use of words that say the opposite of what you really mean, often as a joke and with a tone of voice that shows this. It is often used to mock or criticise someone, to express disapproval, or as a defence mechanism.

For example:
> *Noi invece ce la caviamo con un grado in meno ai termosifoni d'inverno e spegnendo i condizionatori d'estate. Non è fantastico? (#Draghi è un cialtrone sesquipedale, nel caso aveste ancora qualche dubbio)*

Here we can imagine the sarcastic tone of the writer. He's obviously criticising the Italian prime minister, Mario Draghi, who said during an interview that we must make sacrifices, such as turning the radiators down by one degree, to cope with the possibility of no longer receiving gas from Russia. Obviously, this won't be enough. *Isn't this great?*

Sometimes it's difficult even for a human to recognise sarcasm, so I don't expect the following dataset to be 100% free from bias.

In [None]:
# tool used for annotation: it displays each tweet and the user has to click "Yes" 
# if the tweet was sarcastic, "No" otherwise

sarcastic=Button(description="Yes", button_style='info', layout=Layout(width='150px', height='50px'))
non_sarcastic=Button(description="No", button_style='info', layout=Layout(width='150px', height='50px'))

asyncio.create_task(f(df))
t.sleep(2)
display(HBox([sarcastic,non_sarcastic]))

In [None]:
%%script false
print(tweets_annotated)

In [None]:
%%script false
df_annotated = spark.createDataFrame(tweets_annotated)
df_annotated.tail(5)

In [None]:
%%script false
if not os.path.exists(DATASET_ANNOTATED):
  os.mknod(DATASET_ANNOTATED)

# save tweets
df_annotated.toPandas().to_csv(DATASET_ANNOTATED, header=True, index=False) 

# 4. **Extend Dataset**

In [None]:
schema = StructType([StructField("id", StringType(), True)\
                   ,StructField("text", StringType(), True)\
                   ,StructField("sarcastic", StringType(), True)])

df_annotated = spark.createDataFrame(pd.read_csv(DATASET_ANNOTATED), schema=schema)

In [None]:
print(f"Annotated tweets: {df_annotated.count()}")

As the counts below show, we lost several *tweets*.
First of all, many tweets labelled as sarcastic were not actually sarcastic. I've also dropped every tweet that contained only one word, that wasn't actually in Italian, or that made no sense.

In [None]:
count_label(df_annotated).show()

However, we can integrate some external datasets, such as [SENTIPOLC](http://www.di.unito.it/~tutreeb/sentipolc-evalita16/index.html) from the EVALITA 2016 challenge, which contains several Italian tweets that are already classified.

In [None]:
df_sentipolic = spark.createDataFrame(pd.read_csv(SENTIPOLIC))

In [None]:
df_sentipolic.show(10)

In [None]:
# we will extract only the ironic tweets, since we already have plenty of non-ironic ones
df_sentipolic = df_sentipolic.filter(col("iro")==1)

In [None]:
print(f"Ironic tweet retrieved: {df_sentipolic.count()}")

In [None]:
# drop columns that we don't need
df_sentipolic = df_sentipolic.drop(*('subj', 'opos', 'oneg', 'lpos', 'lneg', 'top'))

# rename columns
df_sentipolic = df_sentipolic.withColumnRenamed("idTwitter", "id")\
                              .withColumnRenamed("iro", "sarcastic")

# change order
df_sentipolic = df_sentipolic.select("id", "text", "sarcastic")

In [None]:
df_sentipolic.show(10)

In [None]:
# now we want to join the two dataset. However we must use the same label for both.
# Therefore if the tweet is sarcastic, the label will be 1, 0 otherwise.


df_annotated = df_annotated.withColumn("sarcastic", 
                                         when(df_annotated.sarcastic == "Yes", 1)
                                         .when(df_annotated.sarcastic == "No", 0)                                    
                                         .otherwise(df_annotated.sarcastic))

In [None]:
df_annotated.show()

In [None]:
# concatenate DataFrames

df_complete = df_annotated.union(df_sentipolic)
df_complete.show(5)

In [None]:
print(f'Now we have a total of {df_complete.count()} tweets')

In [None]:
count_label(df_complete, numeric=True).show()

The dataset is still unbalanced, but better than before.

# 5. **Data Processing**

First we want to clean the tweets: remove hashtags, links, emoji, extra whitespace, and mentions.
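Before applying the steps one by one on the Spark DataFrame, it can help to see the whole chain on a single string. The sketch below uses Python's `re` module rather than Spark's `regexp_replace`, and the function name `clean_tweet` and the sample tweet are made up for illustration. The retweet marker is matched in lowercase because the text is lowercased first, and the ASCII-only letter pattern also strips accented letters.

```python
import re


def clean_tweet(text):
    """Apply the cleaning steps of this section to one string."""
    text = text.lower()
    text = re.sub(r"http\S+", "", text)       # links
    text = re.sub(r"@\w+", "", text)          # mentions
    text = text.replace("#", "")              # hashtag symbol, keeping the word
    text = re.sub(r"\brt\b", "", text)        # retweet marker
    text = re.sub(r"[^a-z\s]", "", text)      # punctuation, digits, emoji, accented letters
    return re.sub(r"\s+", " ", text).strip()  # newlines and repeated spaces


example = "RT @user: Che #ironia! 😂 https://t.co/abc123\nDavvero..."
cleaned = clean_tweet(example)
print(cleaned)
```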

### Convert to lowercase

In [None]:
df_lowercase = df_complete.withColumn('text', lower(col('text')))
df_lowercase.show(5)

### Remove Links

In [None]:
df_links = df_lowercase.withColumn('text', regexp_replace('text', r'http\S+', ''))
df_links.show(5)

### Remove mentions

In [None]:
df_mentions = df_links.withColumn('text', regexp_replace('text', r'@\w+', ''))
df_mentions.show(5)

### Remove hashtag, keeping the word

In [None]:
df_hashtag = df_mentions.withColumn('text', regexp_replace('text', '#', ''))
df_hashtag.show(5)

### Remove RT symbol

In [None]:
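# the text is already lowercase at this point, so match the retweet marker as the standalone word "rt"
df_RT = df_hashtag.withColumn('text', regexp_replace('text', r'\brt\b', ''))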
df_RT.show(5)

### Remove punctuation

In [None]:
# note: this ASCII-only pattern also strips digits, emoji and accented letters (e.g. "è", "à")
df_punctuation = df_RT.withColumn('text', regexp_replace('text', '[^a-zA-Z\\s]', ''))
df_punctuation.show(5)

### Remove new line symbol

In [None]:
df_new_line = df_punctuation.withColumn('text', regexp_replace('text', '\n', ''))
df_new_line.show(5)

### Remove emoji

In [None]:
df_emoij = df_new_line.withColumn('text', regexp_replace('text', "[^\x00-\x7F]+" , ''))
df_emoij.show(5)

### Remove Digits

In [None]:
df_digit = df_emoij.withColumn('text', regexp_replace('text', r'[0-9]{5,}', ''))
df_digit.show(5)

### Spell Checker

In [None]:
import enchant
from enchant.checker import SpellChecker

When annotating the tweets, I noticed that many of them contained spelling errors. It is advisable to correct those tweets before training the model.

In [None]:
broker = enchant.Broker()
broker.describe()
broker.list_languages()

In [None]:
def spell_checker(text):
  checker = SpellChecker("it_IT", text)
  for err in checker:
    suggestions = err.suggest()
    if suggestions:
      # replace the misspelled word with the top suggestion
      err.replace(suggestions[0])
  return checker.get_text()

In [None]:
udf_spell_checker = udf(lambda x: spell_checker(x), StringType())
df_spell = df_digit.withColumn('text', udf_spell_checker(col('text')))

df_spell.cache()

df_spell.show(5)

### Remove excess whitespace

In [None]:
print("a. Trimming")
df_trimming = df_spell.withColumn('text', trim(col('text')))
df_trimming.show(5, truncate=False)

print("b. Filter out extra whitespaces")
df_cleaned = df_trimming.withColumn('text', regexp_replace(col("text"), " +", " "))

df_cleaned.show(5, truncate=False)

## Result

In [None]:
df = df_cleaned.select([col('text'), col('sarcastic')])

df.cache()
df.show(5, truncate=False)

df_spell.unpersist()

# 6. **Feature Engineering**

In [None]:
# libraries for feature engineering
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.annotator import Tokenizer
from pyspark.ml.functions import vector_to_array

In [None]:
print("Starting feature engineering, constructing pipeline..")

## Document assembler
Each annotator in Spark NLP takes specific sorts of columns and produces new columns of a different type. We have the following types in Spark NLP: document, token, chunk, pos, word embeddings, date, entity, sentiment, named entity, dependency, labeled dependency.

To implement the solution in Spark NLP, we must first transform raw data into Document type. DocumentAssembler() is a special transformer that builds the initial annotation of type Document that annotators can utilize later on.

In [None]:
document_assembler = DocumentAssembler()\
                        .setInputCol('text')\
                        .setOutputCol('document')\
                        .setCleanupMode("shrink")

## Sentence Detector
Finds sentence bounds in raw text.

In [None]:
sentence_detector = SentenceDetector()\
                      .setInputCols('document')\
                      .setOutputCol('sentence')

## Tokenizer
Tokenization is the process of breaking raw text into smaller pieces: it divides the raw text into words, known as tokens. These tokens help in understanding the context and in building the NLP model, since the meaning of a text can be analysed through its sequence of words.

In [None]:
tokenizer = sparknlp.annotator.Tokenizer()\
                    .setInputCols(["sentence"])\
                    .setOutputCol("token")

## Lemmatizer
Lemmatization is a technique for reducing words to their normalized form. It uses a dictionary to map the different forms of a word back to its base form. So, using this method, we may reduce non-trivial inflections like "is," "was," and "were" down to the root "be."
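The dictionary lookup behind lemmatization can be illustrated in a few lines. This is only a sketch of the idea, not how Spark NLP implements it. The table `LEMMA_TABLE` is a hypothetical toy mapping built from the English example above, whereas the notebook loads an Italian dictionary from a file.

```python
# Toy lookup table (hypothetical): inflected form -> lemma
LEMMA_TABLE = {"is": "be", "was": "be", "were": "be", "cats": "cat"}


def lemmatize(tokens, table=LEMMA_TABLE):
    """Replace each token by its lemma; unknown tokens pass through unchanged."""
    return [table.get(tok, tok) for tok in tokens]


lemmas = lemmatize(["cats", "were", "here"])
print(lemmas)
```

Tokens missing from the dictionary are kept as they are, so the quality of the result depends directly on the coverage of the dictionary.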

In [None]:
lemmatizer = Lemmatizer()\
     .setInputCols(['token'])\
     .setOutputCol('lemma')\
     .setDictionary("lemmatization-it.txt", "->", "\t")

## Stopwords cleaner
Removes stopwords, which are not useful for our goal, from the text.

In [None]:
stopwords_cleaner = StopWordsCleaner.pretrained("stopwords_it", "it")\
     .setInputCols(['lemma'])\
     .setOutputCol('clean_lemma')

## Finisher

In [None]:
finisher = Finisher()\
                  .setInputCols("clean_lemma")\
                  .setOutputCols("pipeline_result")

## Fitting pipeline

In [None]:
nlpPipeline = Pipeline(stages=[document_assembler,
                               sentence_detector,
                               tokenizer,
                               lemmatizer,
                               stopwords_cleaner,
                               finisher
                               ])

In [None]:
%%time
df_fitted = nlpPipeline.fit(df).transform(df)

In [None]:
df_fitted.cache()
df_fitted.show()

In [None]:
df.unpersist()

## Word Embedding 
Word Embedding is a method that involves representing a word with a vector.
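Spark's `Word2Vec` model turns a whole document into a single vector by averaging the vectors of its words. The sketch below illustrates that idea with hand-written numbers. `TOY_VECTORS` and `document_vector` are hypothetical, the values are not learned, and ignoring unknown tokens in the average is a simplifying assumption of this example.

```python
# Toy 3-dimensional vectors (hypothetical values, not learned)
TOY_VECTORS = {
    "governo": [1.0, 0.0, 2.0],
    "ironia": [3.0, 2.0, 0.0],
}


def document_vector(tokens, vectors=TOY_VECTORS, size=3):
    """Average the vectors of the known tokens; all zeros if none is known."""
    known = [vectors[t] for t in tokens if t in vectors]
    if not known:
        return [0.0] * size
    # zip(*known) groups the values dimension by dimension
    return [sum(dim) / len(known) for dim in zip(*known)]


doc_vec = document_vector(["governo", "ironia", "boh"])
empty_vec = document_vector(["boh"])
```

A document with no known word ends up as an all-zero vector, which is why rows whose feature vector has no non-zero entries are filtered out later in this section.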

In [None]:
from pyspark.ml.feature import Word2Vec

In [None]:
embeddings = Word2Vec() \
            .setInputCol("pipeline_result") \
            .setOutputCol("embeddings")

In [None]:
word2vec_model = embeddings.fit(df_fitted)

In [None]:
vocabulary_size = word2vec_model.getVectors().count()
print(f'Vocabulary size is {vocabulary_size}')

In [None]:
df_target = word2vec_model.transform(df_fitted)

In [None]:
vector_size = len(df_target.take(1)[0]['embeddings'])
print(df_target.take(1)[0]['embeddings'])
print(f'Size of each vector embedded is {vector_size}')

In [None]:
df_target.cache()
df_target.show(5)
df_target = df_target.select([col('sarcastic').alias('label'), col('embeddings').alias('features')])


df_fitted.unpersist()

In [None]:
df_target = df_target.filter(~col('label').contains('NaN'))

In [None]:
@udf("long")
def num_nonzeros(v):
    return v.numNonzeros()

df_target = df_target.where(num_nonzeros("features") > 0)

In [None]:
df_target = df_target.withColumn("label", col("label").cast('Float'))

In [None]:
df_target = df_target.withColumn("f", vector_to_array("features", "float64"))\
            .select(["label"] + [col("f")[i] for i in range(vector_size)])

In [None]:
df_target.write.option("header", True).csv(DATASET_PROCESSED)

# 7. **Training the model**

In [94]:
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras_tqdm import TQDMNotebookCallback
from keras.layers.core import Dense, Dropout, Activation
from keras import optimizers, regularizers
from keras.models import Sequential
from keras.layers import Embedding, LSTM
from keras.callbacks import ModelCheckpoint, EarlyStopping

In [114]:
from sparktorch import SparkTorch, serialize_torch_obj
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
df_processed = spark.read.option("header", True).option("inferSchema",True).csv(DATASET_PROCESSED)

In [None]:
# change col types for training
# note: casting to LongType truncates the fractional part of the float embedding values
columns_name = df_processed.columns

for col_name in columns_name:
  df_processed = df_processed.withColumn(col_name, col(col_name).cast(LongType()))

In [100]:
train, test = df_processed.randomSplit([0.7, 0.3], 1234)

In [None]:
# preparing pipeline

vector_assembler = VectorAssembler(inputCols=columns_name[1:], outputCol='features')

## Simple NN

In [None]:
class TextClassificationModel(nn.Module):

    def __init__(self, vocab_size, embed_dim, num_class):
        super(TextClassificationModel, self).__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=False)
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        return self.fc(embedded)

num_class = 2
emsize = 100
# named `network` because that is the name passed to serialize_torch_obj; kept on the default (CPU) device
network = TextClassificationModel(vocabulary_size, emsize, num_class)

In [None]:
torch_obj = serialize_torch_obj(
    model=network,
    criterion=nn.CrossEntropyLoss(),
    optimizer=torch.optim.Adam,
    lr=0.001
)

In [None]:
spark_model = SparkTorch(
    inputCol='features',
    labelCol='label',
    predictionCol='predictions',
    torchObj=torch_obj,
    iters=25,
    miniBatch=32,
    verbose=2
)

In [None]:
p = Pipeline(stages=[vector_assembler, spark_model]).fit(train)

In [None]:
predictions = p.transform(test).persist()
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="predictions")
# the evaluator's default metric is the area under the ROC curve, not accuracy
auc = evaluator.evaluate(predictions)
print("Area Under the ROC curve = %g" % auc)
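To read the metric printed above, it helps to recall what the area under the ROC curve measures: the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it by comparing all pairs. The function `roc_auc` and the sample scores are made up for illustration, and this is not how Spark computes the metric internally.

```python
def roc_auc(labels, scores):
    """Probability that a random positive outranks a random negative (ties count 0.5)."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in positives
        for n in negatives
    )
    return wins / (len(positives) * len(negatives))


auc_good = roc_auc([1, 1, 0, 0], [0.9, 0.8, 0.3, 0.1])
auc_const = roc_auc([1, 1, 0, 0], [0.5, 0.5, 0.5, 0.5])
```

A model that gives every tweet the same score obtains 0.5, so a value close to 0.5 means the predictions carry almost no ranking information.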

## LSTM

In [150]:
import torch.nn as nn

class LSTM_classifier(nn.Module):
    
    #define all the layers used in model
    def __init__(self):
        
        super().__init__()
        vocab_size = 2046
        embedding_dim = 100
        num_hidden_nodes = 32
        output_dim = 1
        n_layers = 2
        hidden_dim = 2
        bidirectional = True
        dropout = 0.2
        
        #embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        #lstm layer
        self.lstm = nn.LSTM(embedding_dim, 
                           hidden_dim, 
                           num_layers=n_layers, 
                           bidirectional=bidirectional, 
                           dropout=dropout,
                           batch_first=True)
        
        #dense layer
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        
        #activation function
        self.act = nn.Sigmoid()
        
    def forward(self, text, text_lengths):
        
        #text = [batch size,sent_length]
        embedded = self.embedding(text)
        #embedded = [batch size, sent_len, emb dim]
      
        #packed sequence
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths,batch_first=True)
        
        packed_output, (hidden, cell) = self.lstm(packed_embedded)
        #hidden = [num layers * num directions, batch size, hid dim]
        #cell = [num layers * num directions, batch size, hid dim]
        
        #concat the final forward and backward hidden state
        hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
                
        #hidden = [batch size, hid dim * num directions]
        dense_outputs=self.fc(hidden)

        #Final activation function
        outputs=self.act(dense_outputs)
        
        return outputs

In [153]:
torch_obj = serialize_torch_obj(
    model=LSTM_classifier(),  # instance of the class defined above
    criterion=nn.CrossEntropyLoss(),
    optimizer=torch.optim.Adam,
    lr=0.0001
)

In [154]:
spark_model = SparkTorch(
    inputCol='features',
    labelCol='label',
    predictionCol='predictions',
    torchObj=torch_obj,
    iters=2,
    miniBatch=32,
    verbose=2
)

In [161]:
p = Pipeline(stages=[vector_assembler, spark_model]).fit(train)

KeyboardInterrupt: ignored

In [160]:
predictions = p.transform(test).persist()
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="predictions")
# the evaluator's default metric is the area under the ROC curve, not accuracy
auc = evaluator.evaluate(predictions)
print("Area Under the ROC curve = %g" % auc)

Train accuracy = 0.5
