## **Task B**

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
!pip install sentence-transformers -q

[K     |████████████████████████████████| 81kB 4.5MB/s 
[K     |████████████████████████████████| 2.1MB 8.5MB/s 
[K     |████████████████████████████████| 1.2MB 35.7MB/s 
[K     |████████████████████████████████| 901kB 43.3MB/s 
[K     |████████████████████████████████| 3.3MB 47.7MB/s 
[?25h  Building wheel for sentence-transformers (setup.py) ... [?25l[?25hdone


In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext(appName="YourTest", master="local[*]")

In [4]:
!wget https://ndownloader.figshare.com/files/16188500 -q
!tar -xvf 16188500
!unzip -qn rumoureval2019/rumoureval-2019-training-data.zip

rumoureval2019/
rumoureval2019/final-eval-key.json
rumoureval2019/LICENSE
rumoureval2019/home_scorer_macro.py
rumoureval2019/README
rumoureval2019/rumoureval-2019-training-data.zip
rumoureval2019/rumoureval-2019-test-data.zip


In [5]:
#### IMPORT ####

from pyspark import SparkContext
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import udf, col, expr, explode, struct, regexp_replace, collect_list, lit
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import * 
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.linalg import VectorUDT, Vectors
from functools import partial
from sentence_transformers import SentenceTransformer, util
import re


In [6]:
spark = SparkSession.builder.appName("YourTest").getOrCreate()

In [7]:
#### DOWNLOAD SOURCE TWEETS && REPLY TWEETS ###

path = "./rumoureval-2019-training-data/twitter-english/*/*/source-tweet/*.json"
source_tweets_df = spark.read.json(path)
path = "./rumoureval-2019-training-data/twitter-english/*/*/replies/*.json"
reply_tweets_df = spark.read.json(path)


In [8]:
#### DOWNLOAD TRUE LABELS ###

schema = StructType([StructField("subtaskaenglish", MapType(StringType(), StringType())),StructField("subtaskbenglish", MapType(StringType(), StringType()))])
dev_key = spark.read.schema(schema).option("multiline", "true").json('rumoureval-2019-training-data/dev-key.json')
train_key = spark.read.schema(schema).option("multiline", "true").json('rumoureval-2019-training-data/train-key.json')

# TRUE LABELS FOR TASK B

dev_key_taskB = dev_key.select(explode(col("subtaskbenglish")))
train_key_taskB = train_key.select(explode(col("subtaskbenglish")))

In [9]:
### DATA CLEANING  ##

def clean(df):

    def replace_url(text):
        return re.sub(r'https?:\/\/.*[\r\n]*', 'url_url_url', text, flags=re.MULTILINE)

    replace_url_udf = udf(replace_url, StringType())

    df = df.withColumn('cleaned_text', replace_url_udf(col('text')))

    ### REMOVE @
    
    df = df.withColumn('cleaned_text', regexp_replace(col('cleaned_text'), r'(@([A-Za-z0-9]+))', ''))

    return df

In [10]:
words_dict = dict(
      has_belief_words = set("assume believe apparent per-haps suspect think thought consider".split()),
      has_report_words = set("evidence source official footage capture assert told claim according".split()),
      has_doubt_words = set("wonder allege unsure guess speculate doubt".split()),
      has_knowledge = set("confirm definitely admit".split()),
      has_denial_words = set("refuse reject rebuff dismiss contradict oppose".split()),
      has_curse_words = set("lol rofl lmfao yeah stfu aha wtf shit".split()),
      has_question_words = set("when which what who how whom why whose".split()),
      has_other_words = set("irresponsible careless liar false witness untrue neglect integrity murder fake".split())
)

In [11]:
## FEATURE EXTRACTION ##

def extract_features(df):
    hasqmark = udf(lambda x: int('?' in x), IntegerType())
    df = df.withColumn('hasqmark', hasqmark(col('cleaned_text')))

    hasmark = udf(lambda x: int('!' in x), IntegerType())
    df = df.withColumn('hasmark', hasmark(col('cleaned_text')))

    hasperiod = udf(lambda x: int('.' in x), IntegerType())
    df = df.withColumn('hasperiod', hasperiod(col('cleaned_text')))

    df = df.withColumn('hashtags_count', expr('size(entities.hashtags)'))

    df = df.withColumn('mentions_count', expr('size(entities.user_mentions)'))

    df = df.withColumn('hasurls', expr('cast(size(entities.urls) >= 1 AS int)'))

    df = df.withColumn('hasmedia', expr('cast(size(entities.media) >= 1 AS int)'))

    df = df.withColumn('friends_count', expr('user.friends_count'))

    df = df.withColumn('followers_count', expr('user.followers_count'))

    ratiocapital = udf(lambda x: sum(map(str.isupper, x))/(len(x)+1), FloatType())
    df = df.withColumn('ratiocapital', ratiocapital(col('cleaned_text')))

    charlen = udf(lambda x: len(x), IntegerType())
    df = df.withColumn('charlen', charlen(col('cleaned_text')))

    df = df.withColumn('issource', expr('CAST((in_reply_to_status_id IS NULL) AS INT)'))

    ## TOKENIZATION ##

    tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
    temp_df = tokenizer.transform(df)

    #TODO remove stop words?

    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    df = remover.transform(temp_df)

    hashingTF = HashingTF(inputCol="filtered", outputCol="rawhashtf", numFeatures=100)
    df = hashingTF.transform(df)

    idf = IDF(inputCol="rawhashtf", outputCol="hashtf")
    idfModel = idf.fit(df)
    df = idfModel.transform(df)

    ## TOKEN FEATURE EXTRACTION ##
    wordlen = udf(lambda words: len(words), IntegerType())
    df = df.withColumn('wordlen', wordlen(col('words')))

    def contains(y, x):
      return int(bool(len(y.intersection(set(x)))))

    for name, ys in words_dict.items():
        df = df.withColumn(name, udf(partial(contains, ys), IntegerType())(col('words')))

    #TODO negation words etc.
    negationwords = ['not', 'no', 'nobody', 'nothing', 'none', 'never',
              'neither', 'nor', 'nowhere', 'hardly', 'scarcely',
              'barely', 'don', 'isn', 'wasn', 'shouldn', 'wouldn',
              'couldn', 'doesn']
    def negacount(words):
      c = 0
      for negationword in negationwords:
        if negationword in words:
          c += 1
      return c
    negationcount = udf(negacount, IntegerType())
    df = df.withColumn('hasnegation', negationcount(col('words')))

    @udf('float')
    def count_upper(x):
        a = x.split()
        return sum(map(str.isupper, a))/(len(a) + 1)

    df = df.withColumn('allcapsratio', count_upper(col('cleaned_text')))

    ## SENTENCE VECTORS  ##
    ## BERT ##
    model = SentenceTransformer('stsb-distilbert-base').eval()

    temp = (df.select(['id','cleaned_text']).collect())

    j = model.encode([x.cleaned_text for x in temp], convert_to_tensor=True)

    bert_rows = [Row(id=temp[i]['id'], glove=Vectors.dense(j[i])) for i in range(len(j))]

    bert_schema = StructType([
      StructField("id", LongType(), True),
      StructField("bert_vector", VectorUDT(), True)
    ])

    d_df = spark.createDataFrame(bert_rows, bert_schema)

    df  = df.join(d_df, on='id')

    return df

In [12]:
source_preprocessed = extract_features(clean(source_tweets_df))
reply_preprocessed = extract_features(clean(reply_tweets_df))
print(source_preprocessed.count())
print(reply_preprocessed.count())

HBox(children=(FloatProgress(value=0.0, max=244715968.0), HTML(value='')))


325
5243


In [13]:
@udf('float')
def cosine_udf(x,y):
  return max(0,float(1 - x.dot(y)/(x.norm(2)*y.norm(2))))

temp = source_preprocessed.selectExpr('id AS in_reply_to_status_id', 'bert_vector AS bert_vector_source')
reply_preprocessed = reply_preprocessed.join(temp, 'in_reply_to_status_id')

reply_preprocessed = reply_preprocessed.withColumn('wrt_source_bert', cosine_udf(col('bert_vector'), col('bert_vector_source')))


In [14]:
all_features = """hasmark hasqmark hasperiod hashtags_count mentions_count hasurls hasmedia
ratiocapital charlen issource wordlen hasnegation allcapsratio hashtf
favorite_count friends_count followers_count""".split() + list(words_dict.keys())

source_features = ['avg_charlen','avg_has_belief_words','avg_hashtags_count','avg_has_doubt_words','avg_has_denial_words','avg_has_other_words',
          'avg_has_curse_words','avg_hasqmark','avg_has_knowledge','avg_has_report_words',
          'avg_hasurls','avg_mentions_count','avg_has_question_words','avg_hasmark','avg_hasnegation','avg_wordlen','avg_hasmedia','avg_ratiocapital','avg_allcapsratio','avg_wrt_source_bert']

temp = reply_preprocessed.groupBy('in_reply_to_status_id').agg({'mentions_count':'avg', 'hasqmark':'avg', 'hasmedia':'avg', 'hasurls':'avg',
                             'charlen':'avg', 'wordlen':'avg', 'hashtags_count':'avg', 'has_question_words':'avg', 
                             'hasnegation':'avg', 'has_knowledge':'avg', 'hasmark':'avg', 'has_report_words':'avg',
                             'has_curse_words':'avg', 'has_belief_words':'avg', 'has_doubt_words':'avg', 'has_denial_words':'avg',
                             'has_other_words':'avg', 'ratiocapital':'avg', 'allcapsratio':'avg', 'wrt_source_bert':'avg'})

In [15]:
def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df
temp = rename_cols(temp)

In [16]:
source_all_features = source_preprocessed.join(temp, source_preprocessed.id == temp.in_reply_to_status_id)

train_all_taskb = train_key_taskB.withColumnRenamed('key', 'id').withColumnRenamed('value', 'label').join(
    source_all_features.select(['id'] + all_features + source_features), 'id'
)

dev_taskb = dev_key_taskB.withColumnRenamed('key', 'id').withColumnRenamed('value', 'label').join(
    source_all_features.select(['id'] + all_features + source_features), 'id'
)

In [17]:
print(train_key_taskB.count())
print(train_all_taskb.count())
print(dev_key_taskB.count())
print(dev_taskb.count())

327
296
38
28


In [18]:
# ML TEST

from sklearn.metrics import classification_report, confusion_matrix
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

inputCols = """hasmark hasqmark hasperiod hashtags_count mentions_count hasurls
hasmedia ratiocapital charlen issource wordlen hasnegation allcapsratio hashtf
favorite_count friends_count followers_count""".split() + list(words_dict.keys()) + source_features

assembler = VectorAssembler(inputCols=inputCols,outputCol="features")
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=False)
indexer = StringIndexer(inputCol="label", outputCol="label_index")
pipeline = Pipeline(stages=[assembler, scaler, indexer])

processor = pipeline.fit(train_all_taskb)

temp = processor.transform(train_all_taskb)
train_all_taskb_features_df = temp.select(['features', 'label_index'])

temp = processor.transform(dev_taskb)
dev_taskb_features_df = temp.select(['features', 'label_index'])

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator, TrainValidationSplitModel
lr = LogisticRegression(labelCol='label_index', maxIter=10)
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.01, 0.001]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
paramGrid = paramGrid.build()

In [20]:
evaluator = MulticlassClassificationEvaluator(metricName="f1", labelCol='label_index')
tvs = CrossValidator(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           numFolds=3)

In [21]:
# train model
# trainer = LogisticRegression(maxIter=10, regParam=0.001, labelCol='label_index')
# model = trainer.fit(train_all_taskb_features_df, )
model = tvs.fit(train_all_taskb_features_df)

In [None]:
# # compute f1 on the dev set
# result = model.transform(dev_taskb_features_df)
# predictionAndLabels = result.select("prediction", "label_index")
# evaluator = MulticlassClassificationEvaluator(metricName="f1", labelCol='label_index')
# print("Test set f1 = " + str(evaluator.evaluate(predictionAndLabels)))

Test set f1 = 0.36860282574568287


In [22]:
result = model.transform(dev_taskb_features_df)
y_true = result.select(['label_index']).collect()
y_pred = result.select(['prediction']).collect()
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.30      0.75      0.43         8
         1.0       0.50      0.25      0.33         8
         2.0       0.50      0.17      0.25        12

    accuracy                           0.36        28
   macro avg       0.43      0.39      0.34        28
weighted avg       0.44      0.36      0.32        28



In [27]:
best_params = {param[0].name: param[1] for param in model.bestModel.extractParamMap().items()}
best_lr = LogisticRegression(**best_params)
best_model = best_lr.fit(train_all_taskb_features_df)

In [29]:
result = best_model.transform(dev_taskb_features_df)
y_true = result.select(['label_index']).collect()
y_pred = result.select(['prediction']).collect()
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.30      0.75      0.43         8
         1.0       0.50      0.25      0.33         8
         2.0       0.50      0.17      0.25        12

    accuracy                           0.36        28
   macro avg       0.43      0.39      0.34        28
weighted avg       0.44      0.36      0.32        28



In [None]:
all_pipeline = Pipeline(stages=[processor, best_model])
!rm -rf subtaskB_model/
all_pipeline.write().overwrite().save('subtaskB_model')

In [None]:
!zip -r subtaskB_model subtaskB_model/