## Requirements

In [2]:
# !pip install pyspark==3.1.1 pyarrow tldextract nltk
# !pip install plotly
# !pip install chart_studio

## Imports

In [1]:
# Make sure that Spark uses the same Python distribution as this kernel (for the driver and the
# Python workers), to avoid serialization issues due to packages missing on the Spark side.
# These are set here, before the SparkSession is created further down in the notebook.
# NOTE(review): these are absolute, machine-specific paths. The next cell checks the two Python
# paths against sys.executable, but JAVA_HOME is not checked — adjust it for your machine.
%env PYSPARK_PYTHON=/opt/conda/bin/python
%env PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python
# Java runtime used to launch the Spark JVM (a Java 8 JRE, judging by the path).
%env JAVA_HOME=/usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre

env: PYSPARK_PYTHON=/opt/conda/bin/python
env: PYSPARK_DRIVER_PYTHON=/opt/conda/bin/python
env: JAVA_HOME=/usr/lib/jvm/adoptopenjdk-8-hotspot-amd64/jre


In [2]:
import sys
import os

# Spark's driver and Python workers must use this kernel's interpreter; otherwise packages
# installed here (e.g. tldextract, used inside mapInPandas below) may be missing on the Spark side.
for _pyspark_var in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    # .get() so that an unset variable fails with the explanatory message below
    # instead of a bare KeyError.
    assert os.environ.get(_pyspark_var) == sys.executable, \
        ('Please make sure that %s environment variable is set to %s' % (_pyspark_var, sys.executable))

In [4]:
# Data handling and visualization
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import plotly as pl
import plotly.express as px
import plotly.graph_objects as go

import chart_studio
import chart_studio.plotly as py
# Never commit Chart Studio credentials: read them from the environment instead.
# NOTE(review): the API key that used to be hardcoded here is exposed in the notebook's history
# and should be revoked/rotated in the Chart Studio account settings.
# If CHART_STUDIO_API_KEY is not set, this cell does not (re)write any credentials.
_chart_studio_api_key = os.environ.get('CHART_STUDIO_API_KEY')
if _chart_studio_api_key:
    chart_studio.tools.set_credentials_file(
        username=os.environ.get('CHART_STUDIO_USERNAME', 'k_beans'),
        api_key=_chart_studio_api_key,
    )

# PySpark
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, BooleanType, ArrayType, StringType, DoubleType

# Helpers
import os
import json

# External libraries
import tldextract

import nltk
# Download the stopword list into the project's data directory. That directory is not on NLTK's
# default search path, so register it too; otherwise `stopwords.words(...)` raises LookupError
# on a machine where the corpus only exists under ./data/.
NLTK_DATA_DIR = './data/'
nltk.download('stopwords', NLTK_DATA_DIR)
if NLTK_DATA_DIR not in nltk.data.path:
    nltk.data.path.append(NLTK_DATA_DIR)
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to ./data/...
[nltk_data]   Package stopwords is already up-to-date!


## Load Spark

In [6]:
# Settings for a single-machine (local[*]) Spark session, passed to SparkConf.setAll as-is.
spark_settings = {
    'spark.driver.memory': '16G',
    'spark.driver.maxResultSize': '16G',
    # NOTE(review): in local mode the executors presumably run inside the driver JVM,
    # so this setting may have no effect — confirm.
    'spark.executor.memory': '24G',
    'spark.sql.execution.arrow.pyspark.enabled': True,
    # Upper bound on rows per Arrow batch, i.e. per pandas DataFrame handed to mapInPandas.
    'spark.sql.execution.arrow.maxRecordsPerBatch': 10000,
    'spark.local.dir': '/tmp',
}
conf = pyspark.SparkConf().setMaster('local[*]').setAll(list(spark_settings.items()))

spark = (
    SparkSession.builder
    .config(conf=conf)
    .config('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.2')
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel('ERROR')
spark

## Load data

In [7]:
# All input/output locations, relative to the notebook's working directory.
DATA_DIR    = 'data/'
QB_DATA_DIR = DATA_DIR + 'quotebank_data/'

# '%s' is filled with a year from YEARS.
QB_DATA_JSON_BZ2_PATH = QB_DATA_DIR + 'json/quotes-%s.json.bz2'
QB_DATA_PROC_PATH     = QB_DATA_DIR + 'processed/quotes-%s.parquet'
# Kept relative to QB_DATA_DIR like the other paths (appending an absolute path such as
# '/home/bigdata/...' to QB_DATA_DIR yields 'data/quotebank_data//home/bigdata/...').
# Set the QB_DATA_EXTENDED_PATH environment variable to store it elsewhere, e.g. on a larger disk.
# NOTE(review): output of earlier runs lives at the old concatenated path — move it or rerun the write.
QB_DATA_EXTENDED_PATH = os.environ.get('QB_DATA_EXTENDED_PATH',
                                       QB_DATA_DIR + 'quotebank_extended.parquet')

PROFANITY_PARQUET_PATH        = DATA_DIR + 'profanity_expanded.parquet'
PROFANITY_SCORES_PARQUET_PATH = DATA_DIR + 'profanity_scores.parquet'
SENTIMENTS_PARQUET_PATH       = DATA_DIR + 'sentiments.parquet'
EMPATH_PARQUET_PATH           = DATA_DIR + 'empath_ultimate.parquet'

SPK_ATTR_PATH             = DATA_DIR + 'speaker_attributes.parquet'
SPK_ATTR_WITH_LABELS_PATH = DATA_DIR + 'speaker_attributes_with_labels.parquet'

LABELS_PATH = DATA_DIR + 'wikidata_labels_descriptions_quotebank.csv.bz2'

YEARS = ['2015', '2016', '2017', '2018', '2019', '2020']

In [8]:
def _qb_field(name, data_type):
    """Return the Spark JSON description of a nullable Quotebank field without metadata."""
    return {'name': name, 'type': data_type, 'nullable': True, 'metadata': {}}


def _qb_array(element_type):
    """Return the Spark JSON description of an array type whose elements may be null."""
    return {'type': 'array', 'elementType': element_type, 'containsNull': True}


# Explicit schema for the raw Quotebank JSON dumps, so Spark reads them with fixed types
# instead of inferring a schema from the data.
QB_JSON_SCHEMA = StructType.fromJson({
    'type': 'struct',
    'fields': [
        _qb_field('date', 'string'),
        _qb_field('numOccurrences', 'long'),
        _qb_field('phase', 'string'),
        _qb_field('probas', _qb_array(_qb_array('string'))),  # list of [speaker, probability] pairs
        _qb_field('qids', _qb_array('string')),
        _qb_field('quotation', 'string'),
        _qb_field('quoteID', 'string'),
        _qb_field('speaker', 'string'),
        _qb_field('urls', _qb_array('string')),
    ],
})


In [9]:
def url_list_to_tlds(urls):
    """Map a list of URLs to their domain names (best effort).

    Despite the name, this returns the ``domain`` attribute of ``tldextract.extract``
    (e.g. ``'example'`` for ``'http://www.example.com'``), not the top-level domain.

    Args:
        urls: sequence of URL strings, or None. Inside ``mapInPandas`` this is presumably a
            numpy array coming from Arrow, which is why emptiness is tested with ``len``
            rather than truthiness (truthiness of a numpy array is ambiguous).

    Returns:
        A list with one domain per URL, or None when ``urls`` is None/empty or when any
        URL could not be processed.
    """
    try:
        if urls is None or len(urls) == 0:
            return None
        # A non-empty input always yields a non-empty list, so no extra emptiness check is needed.
        return [tldextract.extract(url).domain for url in urls]
    except Exception:
        # Best effort: a single bad entry discards the whole list. Catching Exception rather
        # than using a bare `except:` lets KeyboardInterrupt / SystemExit propagate.
        return None

def fill_columns(iterator):
    """Compute the derived speaker/URL columns for every pandas batch (for ``mapInPandas``).

    Each batch is modified in place and the same object is yielded back:

    - ``speaker_qid``: first element of ``qids``, or None when missing/empty.
    - ``speaker_prob``: second element of the first ``probas`` pair, as float, or 0.0
      when missing/empty.
    - ``tlds``: ``url_list_to_tlds`` applied to ``urls``.

    NOTE(review): ``qids[0]`` is taken positionally; confirm that it identifies the same
    speaker as the first ``probas`` pair.
    """
    def first_qid(qids):
        if qids is None or len(qids) == 0:
            return None
        return qids[0]

    def first_probability(probas):
        if probas is None or len(probas) == 0:
            return 0.0
        return float(probas[0][1])

    for batch in iterator:
        batch['speaker_qid'] = batch['qids'].apply(first_qid)
        batch['speaker_prob'] = batch['probas'].apply(first_probability)
        batch['tlds'] = batch['urls'].apply(url_list_to_tlds)
        yield batch
        
def prepare_data(df):
    """Add ``speaker_qid``, ``speaker_prob`` and ``tlds`` columns to a Quotebank DataFrame.

    ``mapInPandas`` needs the output schema up front, so typed placeholder columns are added
    first (``tlds`` copies ``urls`` to get an array-of-strings type), and ``fill_columns``
    then fills them batch by batch.

    Args:
        df: Quotebank DataFrame with at least ``qids``, ``probas`` and ``urls`` columns.

    Returns:
        A DataFrame with the columns of ``df`` plus the three derived ones.
    """
    with_placeholders = (
        df
        .withColumn('speaker_qid', f.lit(None).cast(StringType()))
        .withColumn('speaker_prob', f.lit(None).cast(DoubleType()))
        .withColumn('tlds', f.col('urls'))
    )
    return with_placeholders.mapInPandas(fill_columns, schema=with_placeholders.schema)

def prepare_data_for_years(years):
    """Convert each year's raw Quotebank dump into an enriched parquet file.

    For every year the bz2 JSON dump is read with ``QB_JSON_SCHEMA``, ``numOccurrences`` is
    renamed to ``occurrences``, the derived columns are added by ``prepare_data``, and the
    result overwrites ``QB_DATA_PROC_PATH % year``.

    Args:
        years: iterable of year strings used to fill the ``%s`` in the path templates.
    """
    for year in years:
        raw_quotes = spark.read.json(QB_DATA_JSON_BZ2_PATH % year, schema=QB_JSON_SCHEMA)
        renamed = raw_quotes.withColumnRenamed('numOccurrences', 'occurrences')
        prepare_data(renamed).write.mode('overwrite').parquet(QB_DATA_PROC_PATH % year)
        

In [None]:
# Converting the raw .json.bz2 dumps is the most expensive step of this notebook, so only
# (re)build the years whose parquet output is missing or incomplete. A successful Spark write
# leaves a _SUCCESS marker in the output directory; delete a year's directory to force a rebuild.
years_to_prepare = [
    year for year in YEARS
    if not os.path.exists(os.path.join(QB_DATA_PROC_PATH % year, '_SUCCESS'))
]
prepare_data_for_years(years_to_prepare)

In [10]:
# Stack the per-year parquet files written by prepare_data_for_years into one DataFrame.
# All years are produced by the same prepare_data code, so a positional union is used.
yearly_quotes = [spark.read.parquet(QB_DATA_PROC_PATH % year) for year in YEARS]
qb_all = yearly_quotes[0]
for quotes_of_year in yearly_quotes[1:]:
    qb_all = qb_all.union(quotes_of_year)

# Per-quote annotation tables (joined on quoteID below).
profanity        = spark.read.parquet(PROFANITY_PARQUET_PATH)
profanity_scores = spark.read.parquet(PROFANITY_SCORES_PARQUET_PATH)
empath           = spark.read.parquet(EMPATH_PARQUET_PATH)
sentiments       = spark.read.parquet(SENTIMENTS_PARQUET_PATH)

                                                                                

In [11]:
# How the censored flag relates to the profanity flag.
(
    profanity
    .groupBy('censored', 'profanity')
    .count()
    .show()
)

                                                                                

+--------+---------+---------+
|censored|profanity|    count|
+--------+---------+---------+
|     0.0|        0|113818503|
|     0.0|        1|   780678|
|     1.0|        1|   365490|
+--------+---------+---------+



In [12]:
# Binary flags: 1 when the positive/negative emotion score is above zero, else 0.
# Assumes scores are non-negative. Using `> 0 -> 1, otherwise 0` makes a null score count as
# "no emotion"; with `== 0 -> 0, otherwise 1`, `null == 0` evaluates to null, so null rows
# fell through to otherwise(1) and were counted as having an emotion.
empath_ext = empath \
    .withColumn('has_pos_emotion', f.when(f.col('positive_emotion') > 0, 1).otherwise(0)) \
    .withColumn('has_neg_emotion', f.when(f.col('negative_emotion') > 0, 1).otherwise(0))

In [13]:
# How often positive and negative emotion terms co-occur in a quote.
(
    empath_ext
    .groupBy('has_pos_emotion', 'has_neg_emotion')
    .count()
    .show()
)

                                                                                

+---------------+---------------+--------+
|has_pos_emotion|has_neg_emotion|   count|
+---------------+---------------+--------+
|              1|              0|12505906|
|              1|              1| 3271197|
|              0|              0|89357914|
|              0|              1|10449240|
+---------------+---------------+--------+



In [14]:
# Distribution of sentiment labels.
(
    sentiments
    .groupBy('sentiment')
    .count()
    .show()
)



+---------+--------+
|sentiment|   count|
+---------+--------+
|        0|   68959|
|        1|50350768|
|       -1|64544944|
+---------+--------+



                                                                                

In [11]:
# Agreement between the sentiment label and the Empath emotion flags.
sentiment_emotion_keys = ['sentiment', 'has_pos_emotion', 'has_neg_emotion']
(
    sentiments
    .join(empath_ext, on='quoteID')
    .groupBy(*sentiment_emotion_keys)
    .count()
    .orderBy(*sentiment_emotion_keys)
    .show()
)



+---------+---------------+---------------+--------+
|sentiment|has_pos_emotion|has_neg_emotion|   count|
+---------+---------------+---------------+--------+
|       -1|              0|              0|50496242|
|       -1|              0|              1| 5806235|
|       -1|              1|              0| 6547476|
|       -1|              1|              1| 1694991|
|        0|              0|              0|   64394|
|        0|              0|              1|    1985|
|        0|              1|              0|    2064|
|        0|              1|              1|     516|
|        1|              0|              0|38267137|
|        1|              0|              1| 4591720|
|        1|              1|              0| 5927459|
|        1|              1|              1| 1564452|
+---------+---------------+---------------+--------+



                                                                                

In [12]:
# Sentiment predictions with confidence below this threshold are relabelled as neutral (0).
MIN_SENTIMENT_CONFIDENCE = 0.6

confident_sentiments = sentiments.withColumn(
    'filtered_sentiment',
    f.when(f.col('confidence') < MIN_SENTIMENT_CONFIDENCE, 0).otherwise(f.col('sentiment')),
)
filtered_emotion_keys = ['filtered_sentiment', 'has_pos_emotion', 'has_neg_emotion']
(
    confident_sentiments
    .join(empath_ext, on='quoteID')
    .groupBy(*filtered_emotion_keys)
    .count()
    .orderBy(*filtered_emotion_keys)
    .show()
)



+------------------+---------------+---------------+--------+
|filtered_sentiment|has_pos_emotion|has_neg_emotion|   count|
+------------------+---------------+---------------+--------+
|                -1|              0|              0| 4847163|
|                -1|              0|              1|  315645|
|                -1|              1|              0|  294105|
|                -1|              1|              1|   42795|
|                 0|              0|              0|81762980|
|                 0|              0|              1| 9910340|
|                 0|              1|              0|11838230|
|                 0|              1|              1| 3156397|
|                 1|              0|              0| 2217630|
|                 1|              0|              1|  173955|
|                 1|              1|              0|  344664|
|                 1|              1|              1|   60767|
+------------------+---------------+---------------+--------+



                                                                                

In [13]:
# Row count of every table. Tables with fewer rows than quotebank will make the inner
# joins below drop quotes.
tables_to_count = {
    'quotebank': qb_all,
    'empath': empath,
    'profanity': profanity,
    'profanity_scores': profanity_scores,
    'sentiments': sentiments,
}
for table_name, table in tables_to_count.items():
    print(f'# records in {table_name:<16}', table.count())


                                                                                

# records in quotebank       115584257
# records in empath          115584257
# records in profanity       114964671
# records in prfanity_scores 114964671
# records in sentiments      114964671


---

#### Extend Quotebank data with additional columns and save the intermediate DataFrame for faster processing

In [14]:
# Profanity scores, reduced to the join key plus the score under a descriptive name.
profanity_score_table = profanity_scores \
    .withColumnRenamed('scores', 'profanity_score') \
    .select('quoteID', 'profanity_score')

qb_extended_columns = [
    'quoteID', 'date', 'occurrences', 'quotation', 'speaker', 'speaker_qid',
    'speaker_prob', 'tlds', 'profanity', 'profanity_score', 'censored',
    'sentiment', 'confidence', 'positive_emotion', 'negative_emotion',
]

# NOTE: these are inner joins on quoteID. According to the record counts printed above,
# profanity/sentiments have fewer rows than quotebank (114,964,671 vs 115,584,257), so at least
# ~620k quotes are silently dropped from the extended dataset.
qb = qb_all
for annotation_table in (profanity, profanity_score_table, empath, sentiments):
    qb = qb.join(annotation_table, on='quoteID')
qb = qb.select(*qb_extended_columns)

In [None]:
# Persist the joined dataset so later analysis can start from it instead of redoing the joins.
qb.write.mode('overwrite').parquet(QB_DATA_EXTENDED_PATH)

                                                                                