<a href="https://colab.research.google.com/github/bartek-wojcik/usa_election_2020_sentiment_analysis/blob/master/sentiment_analysis_dl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


#### Colab Setup

In [None]:
# Install java
import os
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed -q pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed -q spark-nlp==2.6.5


openjdk version "1.8.0_275"
OpenJDK Runtime Environment (build 1.8.0_275-8u275-b01-0ubuntu1~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)


#### Dataset (mount Google Drive, where the CSV files live)

In [None]:
from google.colab import drive
# Mount Google Drive so the datasets under MyDrive/Datasets are readable.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Sentiment Analysis Pipeline



#### 1. Import dependencies

In [None]:
#Imports
import sys
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import array_contains
from pyspark.ml import Pipeline, PipelineModel
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

#### 2. Start the SparkSession

In [None]:
import sparknlp
from pyspark.sql import SQLContext

# Every tuning knob for the local Spark session, in one place.
spark_settings = {
    "spark.driver.memory": "8G",
    # NOTE(review): in local mode tasks run inside the driver JVM, so this
    # setting likely has no effect — confirm.
    "spark.executor.memory": "8G",
    # "0" = no limit on results collected to the driver; presumably so the
    # toPandas() export isn't capped.
    "spark.driver.maxResultSize": "0",
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5",
    "spark.kryoserializer.buffer.max": "1000M",
}

builder = SparkSession.builder.appName("Spark NLP").master("local[4]")
for conf_key, conf_value in spark_settings.items():
    builder = builder.config(conf_key, conf_value)
session = builder.getOrCreate()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", session.version)

sc = session.sparkContext
# NOTE(review): `sqlContext` is not referenced by the cells that follow —
# candidate for removal.
sqlContext = SQLContext(sc)

Spark NLP version:  2.6.5
Apache Spark version:  2.4.4


#### 3. Load data

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Which candidate's hashtag dataset to score; also used in the export filename.
candidate = 'donaldtrump'
DATASETS_DIR = '/content/drive/MyDrive/Datasets'

# `sep` alone sets the column separator; its alias `delimiter` was also set
# before, which meant editing one of them could be silently ignored.
# multiLine lets a quoted field span several lines, and escape='"' treats a
# doubled quote inside a quoted field as a literal quote.
data = session.read\
        .option("sep", ",")\
        .option("multiLine", "true")\
        .option("header", "true")\
        .option("escape", '"')\
        .csv(f'{DATASETS_DIR}/hashtag_{candidate}.csv')

data.show()

+-------------------+--------------------+--------------------+-----+-------------+-------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+------------------+-------------------+-----------------+--------------------+-------------+--------------------+----------+--------------------+
|         created_at|            tweet_id|               tweet|likes|retweet_count|             source|             user_id|           user_name|user_screen_name|    user_description|     user_join_date|user_followers_count|       user_location|               lat|               long|             city|             country|    continent|               state|state_code|        collected_at|
+-------------------+--------------------+--------------------+-----+-------------+-------------------+--------------------+--------------------+----------------+--------------------+-------------------+---------------

#### 4. Clean data


In [None]:
def prepare_df(df):
    """Filter incomplete rows and normalise tweet text for sentiment scoring.

    Steps, in order:
      * drop the ``user_description`` column;
      * keep only rows where both ``tweet`` and ``collected_at`` are non-null;
      * clean ``tweet``:
          - strip URLs and @-mentions;
          - turn every whitespace run (including newlines) into one space;
          - delete characters outside ``[a-zA-Z0-9 -]``;
          - collapse the space runs those deletions leave;
          - trim leading and trailing spaces;
      * rename ``tweet`` to ``text`` (presumably the input column the
        pretrained Spark NLP pipeline reads — TODO confirm).

    Args:
        df: Spark DataFrame with at least ``tweet`` and ``collected_at``
            columns.

    Returns:
        A new Spark DataFrame with the cleaned ``text`` column; ``df`` itself
        is not modified.
    """
    # (Java regex, replacement) pairs, applied to `tweet` in this order.
    # Raw strings keep the backslashes intact for the JVM regex engine.
    # Note: a JavaScript-style literal such as '/\s\s+/g' would require
    # literal '/' characters and never match here.
    cleaning_rules = [
        (r'http\S+', ''),          # URLs
        (r'@\w+', ''),             # @-mentions, incl. handles starting with a digit or "_"
        (r'\s+', ' '),             # newlines/tabs -> space, so the next step can't glue words together
        (r'[^a-zA-Z0-9 -]', ''),   # remaining punctuation, emoji, non-ASCII
        (r' {2,}', ' '),           # collapse gaps left by the removals above
        (r'^ +| +$', ''),          # trim
    ]

    df = df.drop('user_description')
    df = df.filter(df.tweet.isNotNull() & df.collected_at.isNotNull())
    for pattern, replacement in cleaning_rules:
        df = df.withColumn('tweet', regexp_replace('tweet', pattern, replacement))
    return df.withColumnRenamed('tweet', 'text')

# prepare_df renames `tweet` to `text`, so an already-cleaned frame can't be
# cleaned again. Only clean while the raw column is present; this keeps the
# cell safe to re-run.
if 'tweet' in data.columns:
    data = prepare_df(data)

data.show()


+-------------------+--------------------+--------------------+-----+-------------+-------------------+--------------------+--------------------+----------------+-------------------+--------------------+--------------------+------------------+-------------------+-----------------+--------------------+-------------+--------------------+----------+--------------------+
|         created_at|            tweet_id|                text|likes|retweet_count|             source|             user_id|           user_name|user_screen_name|     user_join_date|user_followers_count|       user_location|               lat|               long|             city|             country|    continent|               state|state_code|        collected_at|
+-------------------+--------------------+--------------------+-----+-------------+-------------------+--------------------+--------------------+----------------+-------------------+--------------------+--------------------+------------------+-----------------

#### 5. Load the pretrained sentiment pipeline and score the tweets


In [None]:
# Pretrained Spark NLP sentiment pipeline (large download on first use). The
# expression below lists its stages: a document stage, a Universal Sentence
# Encoder, and a SentimentDL model.
SENTIMENT_PIPELINE_NAME = 'analyze_sentimentdl_use_twitter'
dl_pipeline = PretrainedPipeline(SENTIMENT_PIPELINE_NAME, lang='en')
dl_pipeline.model.stages

analyze_sentimentdl_use_twitter download started this may take some time.
Approx size to download 928.3 MB
[OK!]


[document_4e525b9c8a4a,
 UNIVERSAL_SENTENCE_ENCODER_4de71669b7ec,
 SentimentDLModel_bf15d1ab889b]

In [None]:
%%time
dl_pipeline.annotate("I like to play football")

CPU times: user 20.1 ms, sys: 5.96 ms, total: 26 ms
Wall time: 640 ms


{'document': ['I like to play football'],
 'sentence_embeddings': ['I like to play football'],
 'sentiment': ['positive']}

In [None]:
def dl_analyze(df, pipeline=None):
    """Run a sentiment pipeline over ``df``.

    Args:
        df: Spark DataFrame holding the column the pipeline reads (here, the
            cleaned ``text`` column produced by ``prepare_df``).
        pipeline: any object with a ``transform(df)`` method, such as a
            ``PretrainedPipeline``. Defaults to the notebook's ``dl_pipeline``.

    Returns:
        Whatever ``pipeline.transform(df)`` returns. For a pretrained
        pipeline, presumably ``df`` plus its annotation columns — TODO confirm.
    """
    if pipeline is None:
        # Looked up at call time, so the global only needs to exist by then.
        pipeline = dl_pipeline
    return pipeline.transform(df)
 
# Attach the pipeline's annotation columns to the cleaned tweets. Spark builds
# this plan lazily; the heavy work presumably happens when the export cell
# collects the rows.
dl_result = dl_analyze(df=data)

In [None]:
import pyspark.sql.functions as F

def results(df, source_col='sentiment', target_col='sentiment_res'):
    """Add a column holding the label of the first annotation in ``source_col``.

    Args:
        df: Spark DataFrame whose ``source_col`` is an array of annotation
            structs, each with a ``result`` field.
        source_col: annotation column to read (default ``'sentiment'``).
        target_col: name of the new label column (default ``'sentiment_res'``).

    Returns:
        ``df`` with ``target_col`` added. Rows with an empty annotation array
        should get null, since Spark 2.4 returns null for an out-of-range
        array index — TODO confirm.
    """
    first_annotation = df[source_col].getItem(0)
    return df.withColumn(target_col, first_annotation.getField('result'))

In [None]:
%%time

results(dl_result).withColumn('sentiment_res', regexp_replace('sentiment_res', 'na', 'neutral'))\
    .select(['created_at', 'tweet_id', 'text', 'likes', 'retweet_count', 'source', 'user_join_date', 'user_followers_count', 'user_location', 'lat', 'long', 'city', 'country', 'continent', 'state', 'state_code', 'collected_at', 'sentiment_res'])\
    .toPandas().to_csv(f'/content/drive/MyDrive/Datasets/{candidate}_sentiment_dl.csv', header=True, index=False)

CPU times: user 25.3 s, sys: 1.54 s, total: 26.8 s
Wall time: 48min 6s
