In [None]:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
import sparknlp
spark = sparknlp.start() 
# sparknlp.start(gpu=True) >> for training on GPU
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from langdetect import detect
from pyspark.sql.functions import col, lit, concat_ws

from sklearn.metrics import classification_report
import requests
from warcio import ArchiveIterator
from bs4 import BeautifulSoup
import time
import pandas as pd
import re
import yfinance as yf
import boto3
import botocore
import random
import sys 
#PARAMETERS

numcrawlsforrun = 1
batch_size_max = sys.maxsize -1
num_records_percrawl = 150 #number of recors to attempt to extract from each crawl
ticker = 'SPY'
#read in financewordlist.csv into the list
wordlist = pd.read_csv('./sentdat/topics.csv', header=None)[0].tolist()
wordlist.extend(yf.Ticker(ticker).info['longName'].split())


#start spark sesssion
spark = SparkSession.builder.appName("sentimentanalysis")\
.config("spark.jars.packages","com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.3")\
.getOrCreate()
# .config("spark.driver.memory","8G")\
# .config("spark.driver.maxResultSize", "2G")\
# .config("spark.jars", "file:///home/ubuntu/sparknlp.jar")\
# .config("spark.driver.extraClassPath", "file:///home/ubuntu/sparknlp.jar")\
# .config("spark.executor.extraClassPath", "file:///home/ubuntu/sparknlp.jar")\

###GETTING WARC FILE NAMES FROM S3, GRABBING A RANDOM SAMPLE OF THEM
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('commoncrawl')
warcs = []
for object in my_bucket.objects.filter(Prefix='crawl-data/CC-NEWS/'):
    if object.key.endswith('.warc.gz'):
        warcs.append(object.key)

#choose 100 random warcs
randomwarcs = random.sample(warcs, numcrawlsforrun)

for index, warc in enumerate(randomwarcs):
    randomwarcs[index] = 'https://data.commoncrawl.org/' + warc

#function to convert time from commoncrawl format to y-m-d
def convert_header_date(date):
    return time.strftime('%Y-%m-%d', time.strptime(date, '%Y-%m-%dT%H:%M:%SZ'))


#obtaining stock data from yahoo finance from 2019 to current date.
currentdate = time.strftime("%Y-%m-%d")
stockdata = yf.download(ticker, start='2010-01-01', end=currentdate)['Adj Close']

#creating scehma to store text and prices
data = StructType([\
  StructField("text", StringType(), True),
    StructField("price", StringType(), True),  
]
)



In [9]:
def contains_stock(plaintext, stklist=wordlist): 
    try:
        for stk in stklist:
            if plaintext.find(stk) != -1:
                return True
        return False
    except:
        raise Exception("issue with wordlist")

In [10]:
# creating the main rdd to store the data
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), data)
list_of_rows_batch = []
rows_batch_len = 0
recordsfetched = 0
failures = 0

for warc_url in randomwarcs:
    response = requests.get(warc_url, stream=True)
    if response.ok!=True:
        raise Exception("Error downloading WARC file")
    records = ArchiveIterator(response.raw, arc2warc=True)
    #what this should do is write each record's plaintexxt to a csv file
    for record in records:
        if record.rec_type == 'response':
            try: 
                html = record.content_stream().read() .decode('utf-8')
                plaintext = BeautifulSoup(html, 'lxml').get_text()
                plaintext = re.sub(r'\s+', ' ', plaintext)
                plaintext = re.sub(r'[^a-zA-Z0-9\s]', '', plaintext).lower()

                #obtains plaintext from the html
                if detect(plaintext) == 'en' and len(plaintext) > 150 and contains_stock(plaintext) == False:  #TODO add classifier here later to ensure its a financial article
                    date = record.rec_headers.get_header('WARC-Date')
                    date = convert_header_date(date)
                    # append the plaintext and price to the batch
                    if date in stockdata.index:
                        list_of_rows_batch.append({'text':plaintext, 'price':float(stockdata[date])})
                        recordsfetched += 1
                        rows_batch_len += 1
                    else:
                        print('date not in stockdata',date)
                        #likely a weekend or holiday, so we will just skip the entire warc
                        break
                else:
                    recordsfetched += 1                          
            except:
                recordsfetched += 1  # because if the entire warc file is not in english or wrong date, we still want to move on to the next one
                failures += 1
                #print("attempt record: ", record.rec_headers.get_header('WARC-Target-URI'), " failed")
                pass

        if rows_batch_len >= batch_size_max: 
            batchdf = spark.createDataFrame(list_of_rows_batch, data)
            print("union started")
            df = df.union(batchdf)
            print("union done")
            print(df.count())
            rows_batch_len = 0
            list_of_rows_batch = []
        if recordsfetched >= num_records_percrawl:
            recordsfetched = 0
            print("warc done")
            break

    #finishing up for the last batch in it wasn't full and num batches wasnt maxed out.
if rows_batch_len > 0:
    print(rows_batch_len)
    batchdf = spark.createDataFrame(list_of_rows_batch, data)
    df = df.union(batchdf)
    print("size of data: ", df.count())
    rows_batch_len = 0
print("done")
print("failures: ", failures)
   

warc done
38


[Stage 6:>                                                        (0 + 12) / 12]

size of data:  38
done
failures:  7


                                                                                

In [7]:
import math
ratio_com_yfin = 1
math.floor(df.count()*ratio_com_yfin)

0

In [19]:
#NOW THE DATA IS IN THE SPARK DATAFRAME. wE ARE TRAINING CLASSIFICATION MODEL ON FINANCIAL NEWS DATA and commoncrawl data
#which has been sorted by keywords to ensure it is not financial news.
#credit to the financial news articles at https://www.kaggle.com/datasets/jeet2016/us-financial-news-articles
#read in all the json files into a dataframe from 2018_01_112b52537b67659ad3609a234388c50a

articles = spark.read.json('./data/2018_01_112b52537b67659ad3609a234388c50a/')


                                                                                

In [20]:

articles = articles.withColumn('price', lit(0))
articles = articles.withColumn('financial', lit(1))
cols = articles.columns
for item in ['text', 'price', 'financial']:
    cols.remove(item)
articles = articles.drop(*cols)


In [21]:
#write articles to a parquet file
articles.write.parquet('./articlespar.parquet')

                                                                                

In [22]:
articles = spark.read.parquet('./articlespar.parquet')


                                                                                

57802

In [13]:


df = df.withColumn('financial', lit(0))
df = df.union(articles)

#split the data into training and testing
train, test = df.randomSplit([0.8, 0.2], seed=3204123)

In [14]:


# preprocess the text data
document_assembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

tokenizer = Tokenizer()\
    .setInputCols(["document"])\
    .setOutputCol("token")

normalizer = Normalizer()\
    .setInputCols(["token"])\
    .setOutputCol("normalized")\
    .setLowercase(True)

stopwords_cleaner = StopWordsCleaner()\
    .setInputCols("normalized")\
    .setOutputCol("cleanTokens")\
    .setCaseSensitive(False)

lemma = LemmatizerModel.pretrained("lemma_antbnc")\
    .setInputCols(["cleanTokens"])\
    .setOutputCol("lemma")

word_embeddings = BertEmbeddings\
    .pretrained('bert_base_cased', 'en') \
    .setInputCols(["document",'lemma'])\
    .setOutputCol("embeddings")\

# word_embeddings = AlbertEmbeddings.pretrained('albert_base_uncased', 'en') \ #lighter eight bert embeddings
#     .setInputCols(["document",'lemma'])\
#     .setOutputCol("embeddings")\

# https://nlp.johnsnowlabs.com/docs/en/transformers#bertsentenceembeddings? better for sentence embeddings for later models, this one words is better
#https://nlp.johnsnowlabs.com/docs/en/transformers#debertaembeddings
#lots of transfoemrs to choose from for later tasks, for this one lightweight bert might be the best
embeddingsSentence = SentenceEmbeddings()\
    .setInputCols(["document", "embeddings"])\
    .setOutputCol("sentence_embeddings")\
    .setPoolingStrategy("AVERAGE")

classifierdl = ClassifierDLApproach()\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("financial_model_pred")\
    .setLabelColumn("financial")\
    .setMaxEpochs(5)\
        .setEnableOutputLogs(True)\
    .setLr(0.001)\

DLpipeline = Pipeline(
    stages = [
        document_assembler,
        tokenizer,
        normalizer,
        stopwords_cleaner,
        lemma,
        word_embeddings,
        embeddingsSentence,
        classifierdl
    ])


lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ / ]Download done! Loading the resource.
[OK!]
bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[ | ]bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[ / ]Download done! Loading the resource.
[ | ]

2022-12-10 21:03:31.746316: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


[OK!]


In [15]:
DLpipelineModel = DLpipeline.fit(train)
DLpipelineModel.save("dl_model")
# model = PipelineModel.load("dl_model")
print("done training")
test_predict = DLpipelineModel.transform(test)



results = test_predict.select('text','price', 'financial','financial_model_pred.result')
results = results.withColumn('result', results['result'].getItem(0).cast('float'))

results = results.withColumn('result', results['result'].cast('float'))
print("done predicting, here are results on the test set")
print(classification_report(results.select('financial').collect(), results.select('result').collect()), 'green')

#https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/transformers/HuggingFace%20in%20Spark%20NLP%20-%20BERT.ipynb
#https://towardsdatascience.com/text-classification-in-spark-nlp-with-bert-and-universal-sentence-encoders-e644d618ca32
#can get bert from there, then create a piepline that uses the bert model to get embeddings, then use the embeddings to train a classifier
#then we conver this to a script, upload to emr and get a large scale model.

2022-12-10 21:05:40.291373: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: /tmp/a0d89cc9ddf7_classifier_dl7804348254998463337
2022-12-10 21:05:40.690361: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:107] Reading meta graph with tags { serve }
2022-12-10 21:05:40.690901: I external/org_tensorflow/tensorflow/cc/saved_model/reader.cc:148] Reading SavedModel debug info (if present) from: /tmp/a0d89cc9ddf7_classifier_dl7804348254998463337
2022-12-10 21:05:42.023701: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle.
2022-12-10 21:05:43.255010: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:212] Running initialization op on SavedModel bundle at path: /tmp/a0d89cc9ddf7_classifier_dl7804348254998463337
2022-12-10 21:05:43.503337: I external/org_tensorflow/tensorflow/cc/saved_model/loader.cc:301] SavedModel load for tags { serve }; Status: success: OK. Took 3212453 microsecon

Training started - epochs: 5 - learning_rate: 0.001 - batch_size: 64 - training_examples: 112 - classes: 2
Epoch 1/5 - 0.44s - loss: 1.7652658 - acc: 1.0 - batches: 2
Epoch 2/5 - 0.02s - loss: 1.3402622 - acc: 1.0 - batches: 2
Epoch 3/5 - 0.02s - loss: 1.0547364 - acc: 1.0 - batches: 2
Epoch 4/5 - 0.02s - loss: 0.87432295 - acc: 1.0 - batches: 2
Epoch 5/5 - 0.02s - loss: 0.80400145 - acc: 1.0 - batches: 2
done training
done predicting, here are results on the test set




              precision    recall  f1-score   support

           0       1.00      1.00      1.00        10
           1       1.00      1.00      1.00        16

    accuracy                           1.00        26
   macro avg       1.00      1.00      1.00        26
weighted avg       1.00      1.00      1.00        26
 green


                                                                                

In [53]:
#NEED TO PROCESS THE BAG OF BIG WORDS, dont need to run this every time
#http://mpqa.cs.pitt.edu/lexicons/subj_lexicon/ from this source for academic sentiment analysis
#copy sentiment-big.tff to a txt file
#then use this to create a csv file

sentimentprocess = pd.read_csv('./sentdat/sentiment-big.txt', sep=' ', header=None)
#remove all columns except 5 and 0
sentimentprocess = sentimentprocess[[2,5]]

#strip type= from column 0
sentimentprocess[2] = sentimentprocess[2].str.replace('word1=', '')
#strip priorpolarity= from column 5
sentimentprocess[5] = sentimentprocess[5].str.replace('priorpolarity=', '')
sentimentprocess.head(10)
#save to csv
sentimentprocess.to_csv('./sentdat/sentiment-big.csv', index=False)

In [54]:


print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

#model = NerDLModel.pretrained('ner_dl')

# doc_df=documentAssembler.transform(df)
# doc_df.show()
#has been transformed into a text, vector column
#https://nlp.johnsnowlabs.com/2022/09/06/finclf_bert_sentiment_en.html

# sort data by language? use different models for different languages?
#train own model for english with amazon reviews?

document_assembler = DocumentAssembler() \
    .setInputCol('text') \
    .setOutputCol('document')

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(['sentence']) \
    .setOutputCol('token')

# normalizer = Normalizer() \
#     .setInputCols(['token']) \
#     .setOutputCol('normalized') \

lemmatizer = Lemmatizer()\
    .setInputCols(['token'])\
    .setOutputCol('lemma')\
  .setDictionary("./sentdat/lemmas_small.txt", key_delimiter="->", value_delimiter="\t")
#! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/lemma-corpus-small/lemmas_small.txt -P /tmp
SentimentDetector = sentiment.SentimentDetector() \
    .setInputCols(['lemma', 'sentence'])\
    .setOutputCol('sentiment_score')\
    .setDictionary('./sentdat/sentiment-big.csv', ',')\

pipeline = Pipeline(stages=[
    document_assembler, 
    sentence_detector,
    tokenizer,
    lemmatizer,
    SentimentDetector
])







Spark NLP version 4.2.3
Apache Spark version: 3.3.1


In [None]:
! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/sentiment-corpus/default-sentiment-dict.txt -P ./sentdat 


In [None]:
! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/lemma-corpus-small/lemmas_small.txt -P ./sentdat

In [55]:
newdf = pipeline.fit(df).transform(df)
#getting a new df with sentiment score

In [56]:


newdf = newdf.withColumn("sentiment_score", concat_ws(",", "sentiment_score.result"))
newdf.show(2)


+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+---------------+
|                text|             price|            document|            sentence|               token|               lemma|sentiment_score|
+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+---------------+
| Chicago Med Seas...|394.69000244140625|[{document, 0, 14...|[{document, 1, 64...|[{token, 1, 7, Ch...|[{token, 1, 7, Ch...|       positive|
| Roberto Guilherm...|394.69000244140625|[{document, 0, 60...|[{document, 1, 13...|[{token, 1, 7, Ro...|[{token, 1, 7, Ro...|       positive|
+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+---------------+
only showing top 2 rows



                                                                                

In [57]:
print("positives", newdf.filter(col('sentiment_score') == 'positive').count())
print("negatives", newdf.filter(col('sentiment_score') == 'negative').count())


                                                                                

positives 77




negatives 23


                                                                                

In [None]:
spark.stop()