In [None]:
# Install java
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
!java -version

# Install pyspark
!pip install --ignore-installed -q pyspark==2.4.4

# Install Sparknlp
!pip install --ignore-installed spark-nlp

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.18.04)
OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.18.04, mixed mode, sharing)
[K     |████████████████████████████████| 215.7MB 63kB/s 
[K     |████████████████████████████████| 204kB 20.9MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
Collecting spark-nlp
[?25l  Downloading https://files.pythonhosted.org/packages/1b/d9/44fd438e15fa9a02c0e3b3ca9eaffc509fc626592f7a03ce05d8f156d448/spark_nlp-2.7.5-py2.py3-none-any.whl (139kB)
[K     |████████████████████████████████| 143kB 5.3MB/s 
[?25hInstalling collected packages: spark-nlp
Successfully installed spark-nlp-2.7.5


In [None]:
import pandas as pd
import numpy as np
import os
# Point Spark at the Java 8 JDK installed by the setup cell; this has to be set
# before sparknlp.start() launches the JVM.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Put $JAVA_HOME/bin first on PATH, but only if it is not already first, so
# re-running this cell does not keep growing PATH with duplicate entries.
_java_bin = os.path.join(os.environ["JAVA_HOME"], "bin")
if os.environ["PATH"].split(os.pathsep)[0] != _java_bin:
    os.environ["PATH"] = _java_bin + os.pathsep + os.environ["PATH"]
import json
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# NOTE(review): these star imports supply DocumentAssembler, LightPipeline and
# the annotators (UniversalSentenceEncoder, SentimentDLModel, SentenceDetector,
# Tokenizer, YakeModel, StopWordsCleaner) used in later cells; explicit imports
# would make those dependencies visible.
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

In [None]:
# Prompt for the tweet CSV through the Colab upload widget. `uploaded` maps each
# saved file name to the file's raw bytes (read with io.BytesIO in the next cell).
from google.colab import files
uploaded = files.upload()

Saving YearAgo.csv to YearAgo.csv


In [None]:
import io

# `uploaded` (from the upload cell) maps each saved file name to its bytes.
# The upload log reports the name each file was saved under ("Saving X to Y"),
# and Colab may save a re-upload under a different name (e.g.
# "YearAgo (1).csv"). Fall back to the single uploaded file rather than
# failing with a KeyError on the hard-coded name.
UPLOAD_NAME = 'YearAgo.csv'
csv_bytes = uploaded.get(UPLOAD_NAME)
if csv_bytes is None:
    if len(uploaded) != 1:
        raise ValueError(
            "expected %r or exactly one uploaded file, got %r"
            % (UPLOAD_NAME, sorted(uploaded))
        )
    csv_bytes = next(iter(uploaded.values()))
tweet_df = pd.read_csv(io.BytesIO(csv_bytes))

In [None]:
# Drop the first column by position (presumably an unnamed index column written
# by an earlier `to_csv` — TODO confirm against YearAgo.csv).
# NOTE(review): not idempotent — every re-run of this cell drops one more
# column; re-run the read_csv cell first.
tweet_df = tweet_df.iloc[:,1:]

In [None]:
# Tweet bodies as a pandas Series; the bare `.shape` below is the cell output.
text_list = tweet_df.loc[:, 'text']
text_list.shape

(1995,)

In [None]:
# Unique tweet texts for keyword extraction. drop_duplicates keeps the first
# occurrence in original order. list(set(...)) yields an order that changes
# between Python sessions (string hash randomisation), which made downstream
# row order non-reproducible.
text_list2 = text_list.drop_duplicates().tolist()
len(text_list2)

1995

In [None]:
# Start the Spark session used for Spark NLP; every later createDataFrame / fit /
# transform call goes through this `spark` handle. JAVA_HOME must already point
# at Java 8 (set in the imports cell) before this runs.
spark = sparknlp.start()

In [None]:
# Name of the pretrained SentimentDL model fetched by SentimentDLModel.pretrained
# in the pipeline cell.
MODEL_NAME = 'sentimentdl_use_twitter'

In [None]:
# Sentiment pipeline: raw `text` -> document -> Universal Sentence Encoder
# embedding -> SentimentDL label. Both pretrained models are downloaded on
# first use (see the download log under this cell).
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

use = (
    UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

sentimentdl = (
    SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("sentiment")
)

nlpPipeline = Pipeline(stages=[documentAssembler, use, sentimentdl])


tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.4 MB
[OK!]


In [None]:
# Every stage is a transformer or a pretrained model, so fitting on a one-row
# empty frame just yields a PipelineModel; nothing is trained here.
empty_df = spark.createDataFrame([('',)], ["text"])
pipelineModel = nlpPipeline.fit(empty_df)

# Score every tweet (text_list, i.e. including duplicate texts).
df = spark.createDataFrame(text_list.to_frame(name="text"))
result = pipelineModel.transform(df)

In [None]:
# One output row per (document text, sentiment label) pair. arrays_zip's struct
# fields are addressed positionally as "0" and "1" here.
# NOTE(review): newer Spark versions may name these fields after the input
# columns instead — re-check if pyspark is upgraded past 2.4.
(
    result
    .select(
        F.explode(F.arrays_zip('document.result', 'sentiment.result')).alias("cols")
    )
    .select(
        F.expr("cols['0']").alias("document"),
        F.expr("cols['1']").alias("sentiment"),
    )
    .show(truncate=False)
)

+-----------------------------------------------------------------------------------------------------------------------------------+---------+
|document                                                                                                                           |sentiment|
+-----------------------------------------------------------------------------------------------------------------------------------+---------+
| PBCJ says the PM s press briefing is now scheduled for 6pm today  COVID19 JaCovid19 Jamaica                                       |positive |
| Pretty special day for me today  Remember  Flatten the curve   Well it s one year since    I published that  our first            |positive |
| In other    news Schools across England are reopening again  today   No school mask requirements  No school air quality           |negative |
|    NCDC s Director of Surveillance   I recieved our 1st COVID19 vaccine shot todaySoon  all  staff   he                           |neg

In [None]:
# Display the DataFrame repr, which lists every column and its annotation schema.
result

DataFrame[text: string, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, sentence_embeddings: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, sentiment: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>]

In [None]:
# Collect the whole annotated result to the driver as a pandas DataFrame.
# NOTE(review): this includes the sentence_embeddings column (an embeddings
# array per annotation) and nested annotation structs, which is heavy and
# serialises poorly to CSV. Consider selecting only 'text' and
# 'sentiment.result' before exporting.
result_df = result.toPandas()

In [None]:
# Write the sentiment results to a local CSV, then send it to the browser.
from google.colab import files

sentiment_csv = 'YearAgo_sentiment.csv'
result_df.to_csv(sentiment_csv)
files.download(sentiment_csv)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# Input and output locations — edit to wherever your files should live.
# NOTE(review): neither constant is referenced by the cells shown here.
INPUT_FILE_PATH, OUTPUT_FILE_PATH = "inputs", "outputs"

In [None]:
# Keyphrase pipeline: text -> document -> sentences -> tokens -> YAKE keywords.

# Wrap the raw `text` column in the document annotation the later stages read.
document_assembler = (
    DocumentAssembler()
    .setInputCol('text')
    .setOutputCol('document')
)

# Split each document into sentences.
sentence_detector = (
    SentenceDetector()
    .setInputCols(['document'])
    .setOutputCol('sentences')
)

# Split sentences into individual tokens (words); the listed punctuation is
# separated from adjacent words.
tokenizer = (
    Tokenizer()
    .setInputCols(['sentences'])
    .setOutputCol('tokens')
    .setContextChars(['(', ')', '?', '!', '.', ','])
)

# YAKE keyphrase extraction. MinNGrams/MaxNGrams bound the length of candidate
# keyphrases and NKeywords sets how many candidates are kept per document; the
# stop-word list is StopWordsCleaner's default one.
keywords = (
    YakeModel()
    .setInputCols('tokens')
    .setOutputCol('keywords')
    .setMinNGrams(2)
    .setMaxNGrams(5)
    .setNKeywords(100)
    .setStopWords(StopWordsCleaner().getStopWords())
)

# Chain the stages and fit on a one-row empty frame so the result is a
# PipelineModel that can transform new inputs.
pipeline = Pipeline(stages=[
    document_assembler,
    sentence_detector,
    tokenizer,
    keywords,
])
empty_df = spark.createDataFrame([("",)], ['text'])
pipeline_model = pipeline.fit(empty_df)

# LightPipeline is faster than Pipeline for small datasets
light_pipeline = LightPipeline(pipeline_model)

In [None]:
def adjusted_score(row, pow=2.5):
    """Re-rank a YAKE keyphrase so that multi-word phrases are not penalised.

    Longer phrases naturally receive worse raw scores from the model, so the
    word count raised to ``pow`` is divided by the raw score (offset by 0.1 to
    guard against dividing by a zero score). Larger results are better; raise
    ``pow`` to reward longer phrases more strongly.

    Args:
        row: keyword annotation with a ``result`` string and a
            ``metadata['score']`` value accepted by ``float()``.
        pow: exponent applied to the word count.

    Returns:
        The adjusted score as a float.
    """
    n_words = row.result.count(' ') + 1  # words = spaces + 1
    raw_score = float(row.metadata['score'])
    return n_words ** pow / (raw_score + 0.1)

def get_top_ranges(phrases, input_text):
    """Merge overlapping phrase spans into contiguous ranges of ``input_text``.

    Begin and end offsets are sorted independently. A new range starts wherever
    the i-th smallest end lies strictly before the (i+1)-th smallest begin,
    which yields the union of all spans. Ends are inclusive. Spans that merely
    touch (end + 1 == next begin) stay separate.

    Args:
        phrases: iterable of dicts with integer ``'begin'`` and ``'end'`` keys.
        input_text: the text the offsets index into.

    Returns:
        List of ``{'begin', 'end', 'phrase'}`` dicts ordered by position, where
        ``phrase`` is ``input_text[begin:end + 1]``. Empty if ``phrases`` is
        empty (previously this raised IndexError).
    """
    phrases = list(phrases)
    if not phrases:
        return []

    starts = sorted(p['begin'] for p in phrases)
    ends = sorted(p['end'] for p in phrases)

    merged = [[starts[0], None]]
    for prev_end, next_start in zip(ends, starts[1:]):
        if prev_end < next_start:  # gap: close the current range, open a new one
            merged[-1][1] = prev_end
            merged.append([next_start, None])
    merged[-1][1] = ends[-1]

    return [
        {'begin': begin, 'end': end, 'phrase': input_text[begin:end + 1]}
        for begin, end in merged
    ]

def remove_duplicates(phrases):
    """Remove entries whose ``'phrase'`` text repeats an earlier entry.

    The first occurrence of each phrase is kept and relative order is
    preserved. The previous version deleted items while advancing its index,
    which skipped elements and left duplicates behind (e.g. three equal phrases
    became two).

    Args:
        phrases: list of dicts, each with a hashable ``'phrase'`` value.

    Returns:
        The same list object, updated in place to hold only unique phrases.
    """
    seen = set()
    unique = []
    for item in phrases:
        key = item['phrase']
        if key not in seen:
            seen.add(key)
            unique.append(item)
    phrases[:] = unique  # keep the original in-place contract
    return phrases

def get_output_lists(df_row):
    """Return one row's top key phrases in two forms.

    Args:
        df_row: a result row exposing ``text`` (the original string) and
            ``keywords`` (annotations with ``begin``, ``end``, ``result`` and
            ``metadata['score']``).

    Returns:
        A ``(ranges, phrases)`` tuple of at most five dicts each:

        - ``ranges``: the 20 best-scoring phrases, with overlapping spans
          combined into longer phrases (``begin``/``end``/``phrase``), ordered
          by position in the text. Best for highlighting key phrases in text.
        - ``phrases``: the 10 best-scoring phrases with repeated text removed
          (``begin``/``end``/``phrase``/``score``), ordered by adjusted score.
          Best for summarizing a document.

        Both lists are empty when the row has no keywords.
    """
    keyphrases = sorted(
        (
            {
                'begin': kw.begin,
                'end': kw.end,
                'phrase': kw.result,
                'score': adjusted_score(kw),
            }
            for kw in (df_row.keywords or [])
        ),
        key=lambda p: p['score'],
        reverse=True,
    )
    if not keyphrases:
        # Nothing to rank; also avoids range-merging an empty list.
        return [], []

    return (
        get_top_ranges(keyphrases[:20], df_row.text)[:5],
        remove_duplicates(keyphrases[:10])[:5],
    )

In [None]:
# Spark DataFrame (single `text` column) of the de-duplicated tweet texts.
df2 = spark.createDataFrame(pd.DataFrame(text_list2, columns=['text']))

In [None]:
# Run the keyword pipeline over the de-duplicated tweets.
# NOTE(review): for a Spark DataFrame input, LightPipeline.transform appears to
# just delegate to the wrapped PipelineModel, so this is no faster than
# pipeline_model.transform(df2). LightPipeline's speed-up applies to
# annotate()/fullAnnotate() on plain Python strings — confirm in the Spark NLP docs.
result2 = light_pipeline.transform(df2)

In [None]:
# Keep only the tweet text and its YAKE keyword annotations.
result2_2 = result2.select('text', 'keywords')

In [None]:
# Preview the first 20 rows with long cells truncated (Spark's show() defaults).
result2_2.show(n=20, truncate=True)

+--------------------+--------------------+
|                text|            keywords|
+--------------------+--------------------+
| Joining us for t...|[[keyword, 1, 10,...|
|LargestVaccineDri...|[[keyword, 29, 40...|
|A year today WHO ...|[[keyword, 2, 11,...|
| The world s 1 yr...|[[keyword, 15, 25...|
|There s still tim...|[[keyword, 8, 17,...|
|A year ago today ...|[[keyword, 2, 9, ...|
| today announced ...|[[keyword, 1, 15,...|
|Nearly 17 lakh va...|[[keyword, 10, 21...|
|It s been exactly...|[[keyword, 10, 20...|
| One year ago tod...|[[keyword, 1, 8, ...|
|Today marks one y...|[[keyword, 0, 10,...|
|Are you 56 59  Di...|[[keyword, 104, 1...|
|In case you misse...|[[keyword, 31, 41...|
|One year ago toda...|[[keyword, 0, 7, ...|
|HCPs  Attend toda...|[[keyword, 0, 11,...|
| One year ago tod...|[[keyword, 1, 8, ...|
|It s great to see...|[[keyword, 22, 40...|
| 44 new positive ...|[[keyword, 4, 15,...|
|Check out the new...|[[keyword, 14, 24...|
|My parents just g...|[[keyword,

In [None]:
# Collect the (text, keywords) frame to the driver as a pandas DataFrame.
# NOTE(review): `ya` is an opaque name — something like `keywords_df` would be
# clearer, but later cells may depend on this name.
ya = result2_2.toPandas()

In [None]:
# Export the keyword results. `result2_2` is a Spark DataFrame, which has no
# `to_csv` method (calling it raises AttributeError), so write its pandas copy
# `ya` from the previous cell instead. A separate file name also avoids
# overwriting the sentiment export 'YearAgo_sentiment.csv' written earlier.
ya.to_csv('YearAgo_keywords.csv')