### Importing pyspark library adn other necessary for the project

In [49]:
import pyspark
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType, LongType
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as f
from pyspark.conf import SparkConf

In [50]:
%pip install transformers

Note: you may need to restart the kernel to use updated packages.


In [51]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import pipeline

In [52]:
%pip install --upgrade numpy

Note: you may need to restart the kernel to use updated packages.


In [53]:
%pip install torch

Note: you may need to restart the kernel to use updated packages.


In [54]:
import torch
import torch.nn.functional as F

In [55]:
%pip install boto3

Note: you may need to restart the kernel to use updated packages.


In [56]:
import boto3
import configparser
import json
import datetime
from pyspark.sql.functions import *
from datetime import datetime
import re

In [57]:
config_obj = configparser.ConfigParser()
config_obj.read('config.ini')
# db_param = config_obj["postgresql"]
aws_user = config_obj["aws"]

In [58]:
s3 = boto3.client(
     service_name='s3',
     region_name=aws_user['region'],
     aws_access_key_id=aws_user['acc_key'],
     aws_secret_access_key=aws_user['secret_acc_key']
     )

In [59]:
obj = s3.get_object(Bucket='mylosh', Key=F'tweet/elon.json')
j = json.loads(obj['Body'].read().decode())

#### Creating a session, structure type and data frame

In [60]:
spark = (SparkSession
        .builder
        .appName('mylo')
        .getOrCreate())

In [61]:
schema_tweet = StructType([
StructField("author_id", IntegerType(),True),
StructField("username", StringType(),True),
StructField("author_followers", IntegerType(),True),
StructField("author_tweets", IntegerType(),True),
StructField("author_description", StringType(),True),
StructField("author_location", StringType(),True),
StructField("tweet_id", LongType(),True),
StructField("text", StringType(),True),
StructField("created_at", StringType(),True),
StructField("retweets", IntegerType(),True),
StructField("replies", IntegerType(),True),
StructField("likes", IntegerType(),True),
StructField("quote_count", IntegerType(),True),
StructField("conversation_id", LongType(),True),
StructField("attach_list", 
            StructType(
            [
                StructField("media_keys", StringType(), True)
            ]
                        )
            ,True),
                StructField("referenced_tweets_list", 
            StructType([
            StructField("data",
            StructType([
                StructField("type", StringType(), True),
                StructField("id", StringType(), True)
                ])
            , True)
            ])
            ,True)
])

In [62]:
df = spark.createDataFrame(j, schema=schema_tweet)

In [63]:
df.schema

StructType([StructField('author_id', IntegerType(), True), StructField('username', StringType(), True), StructField('author_followers', IntegerType(), True), StructField('author_tweets', IntegerType(), True), StructField('author_description', StringType(), True), StructField('author_location', StringType(), True), StructField('tweet_id', LongType(), True), StructField('text', StringType(), True), StructField('created_at', StringType(), True), StructField('retweets', IntegerType(), True), StructField('replies', IntegerType(), True), StructField('likes', IntegerType(), True), StructField('quote_count', IntegerType(), True), StructField('conversation_id', LongType(), True), StructField('attach_list', StructType([StructField('media_keys', StringType(), True)]), True), StructField('referenced_tweets_list', StructType([StructField('data', StructType([StructField('type', StringType(), True), StructField('id', StringType(), True)]), True)]), True)])

#### Removing garbage from text

In [64]:
my_punctuation = '!"$%&\'()*+,-./:;<=>?[\\]^_`{|}~@'

In [65]:
def remove_punctuation(tweet):
    t = re.sub(r'^https?:\/\/.*[\r\n]*', '', tweet)
    return t

In [66]:
def remove_links(tweet):
    tweet = re.sub(r'http\S+', '', tweet) 
    return tweet

In [67]:
def remove_users(tweet):
    tweet = re.sub('(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    tweet = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    return tweet

In [68]:
def remove_hashtag(tweet):
    tweet = re.sub('(#[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    return tweet

In [69]:
remove_links=udf(remove_links, StringType())
remove_punctuation=udf(remove_punctuation, StringType())
remove_users = udf(remove_users, StringType())
remove_hashtag = udf(remove_hashtag, StringType())

In [70]:
df_clean_text = df.withColumn('cleaned_tweet_text', remove_links(df['text']))

In [71]:
df_clean_text = df_clean_text.withColumn('cleaned_tweet_text', remove_users(df_clean_text['cleaned_tweet_text']))

In [72]:
df_clean_text = df_clean_text.withColumn('cleaned_tweet_text', remove_punctuation(df_clean_text['cleaned_tweet_text']))

In [73]:
df_clean_text = df_clean_text.withColumn('cleaned_tweet_text', remove_hashtag(df_clean_text['cleaned_tweet_text']))

#### Inserting sentiment analysis in the data

In [74]:
model_name = 'distilbert-base-uncased-finetuned-sst-2-english'

In [75]:
model = AutoModelForSequenceClassification.from_pretrained(model_name)

In [76]:
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [77]:
classifier = pipeline('sentiment-analysis', model=model, tokenizer=tokenizer)

In [78]:
# new model - cardiffnlp/twitter-roberta-base-sentiment-latest

In [79]:
model_name_berta = 'cardiffnlp/twitter-roberta-base-sentiment-latest'

In [80]:
model_berta = AutoModelForSequenceClassification.from_pretrained(model_name_berta)

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [81]:
tokenizer_berta = AutoTokenizer.from_pretrained(model_name_berta)

In [82]:
classifier_berta = pipeline('sentiment-analysis', model=model_berta, tokenizer=tokenizer_berta)

In [83]:
def berta_classifier_sentiment_lable(text):
    text = classifier_berta(text)
    return text[0]['label']

In [84]:
def berta_classifier_sentiment_score(text):
    text = classifier_berta(text)
    return text[0]['score']

In [85]:
berta_classifier_sentiment_lable = udf(berta_classifier_sentiment_lable, StringType())

In [86]:
berta_classifier_sentiment_score = udf(berta_classifier_sentiment_score, FloatType())

In [87]:
df_sentiment_label = df_clean_text.withColumn('berta_sent_analysis_label', berta_classifier_sentiment_lable(df_clean_text['cleaned_tweet_text']))

Py4JJavaError: An error occurred while calling o13.broadcast.
: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1(TorrentBroadcast.scala:316)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1$adapted(TorrentBroadcast.scala:316)
	at org.apache.spark.broadcast.TorrentBroadcast$$$Lambda$1710/0x0000000801705bb8.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1862)
	at java.base/java.io.ObjectOutputStream.write(ObjectOutputStream.java:714)
	at org.apache.spark.util.Utils$.$anonfun$copyStream$1(Utils.scala:373)
	at org.apache.spark.util.Utils$$$Lambda$1066/0x0000000801257250.apply$mcJ$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.util.Utils$.copyStream(Utils.scala:380)
	at org.apache.spark.api.python.PythonBroadcast.$anonfun$writeObject$1(PythonRDD.scala:757)
	at org.apache.spark.api.python.PythonBroadcast$$Lambda$1715/0x000000080170f1d8.apply$mcJ$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1470)
	at org.apache.spark.api.python.PythonBroadcast.writeObject(PythonRDD.scala:753)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
	at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$4(TorrentBroadcast.scala:321)


In [None]:
# df_new = df_new.withColumn('berta_sent_analysis_score', berta_classifier_sentiment_score(df_new['cleaned_tweet_text']))

In [None]:
df_sentiment_label.columns

In [None]:
df_sentiment_label.select(df_sentiment_label['tweet_id'], df_sentiment_label['cleaned_tweet_text'], df_sentiment_label['berta_sent_analysis_label']).show(5, False)

#### Transformation of the date column

In [None]:
df_date_manipul = df_sentiment_label.withColumn('created_at', (df_sentiment_label['created_at']/1000).cast(LongType()))

In [None]:
df_date_manipul = df_date_manipul.withColumn('created_at_date', f.from_utc_timestamp(df_date_manipul['created_at'].cast(TimestampType()), 'PST'))

In [None]:
df_date_manipul.select(df_date_manipul['tweet_id'], df_date_manipul['created_at'], df_date_manipul['created_at_date'], df_date_manipul['berta_sent_analysis_label']).show(5, False)

In [None]:
df_final = df_date_manipul.drop('created_at')

#### Final data frame for inserting in the mongo DB

In [None]:
df_final.columns