In [8]:
!pip install keras



In [1]:
import pandas as pd

df = pd.read_csv('bitcoin_tweets.csv')
df.shape

(151012, 4)

In [9]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, Embedding, LayerNormalization, MultiHeadAttention, \
    TimeDistributed, concatenate

2023-04-16 22:01:23.637265: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [10]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, Embedding, LayerNormalization, MultiHeadAttention, \
    TimeDistributed, concatenate


# Transformer Encoder
class TransformerEncoder(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = Embedding(input_vocab_size, d_model)
        self.positional_encoding = self.positional_encoding(maximum_position_encoding, d_model)
        self.dropout = Dropout(rate)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.positional_encoding[:, :seq_len, :] + x
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(position, tf.range(d_model)[tf.newaxis, :], d_model)
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def get_angles(self, position, i, d_model):
        angle_rates = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angle_rates[:, tf.newaxis]


class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()

        self.mha = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2


# Transformer Decoder
class TransformerDecoder(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = Embedding(target_vocab_size, d_model)
        self.position
        self.positional_encoding = self.positional_encoding(maximum_position_encoding, d_model)
        self.dropout = Dropout(rate)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.positional_encoding[:, :seq_len, :] + x
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
            attention_weights[f'decoder_layer{i + 1}_block1'] = block1
            attention_weights[f'decoder_layer{i + 1}_block2'] = block2

        return x, attention_weights

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(position, tf.range(d_model)[tf.newaxis, :], d_model)
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def get_angles(self, position, i, d_model):
        angle_rates = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angle_rates[:, tf.newaxis]


class DecoderLayer(tf.keras.layers.Layer):
    def init(self, d_model, num_heads, dff, rate=0.1):
        super().init()
        self.mha1 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.mha2 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)

        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        attn1, block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2, block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3, block1, block2


class BERT(tf.keras.Model):
    def init(self, num_layers, d_model, num_heads, vocab_size, rate):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = TokenAndPositionEmbedding(d_model=d_model, maximum_position_encoding=512,
                                                   vocab_size=vocab_size)

        self.encoder_layers = [EncoderLayer(d_model=d_model, num_heads=num_heads, dff=4 * d_model) for _ in
                               range(num_layers)]

        self.dropout = Dropout(rate)

    def call(self, x, training):
        seq_len = tf.shape(x)[1]
        mask = self.create_padding_mask(x)

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.encoder_layers[i](x, mask, training)

        return x

    def create_padding_mask(self, seq):
        seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
        return seq[:, tf.newaxis, tf.newaxis, :]


class EncoderLayer(tf.keras.layers.Layer):
    def init(self, d_model, num_heads, dff, rate=0.1):
        super().init()

        self.mha = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, mask, training):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2


class MultiHeadAttention(tf.keras.layers.Layer):
    def init(self, num_heads, key_dim, dropout_rate=0.1):
        super().init()
        self.num_heads = num_heads
        self.key_dim = key_dim
        self.dropout_rate = dropout_rate
        assert key_dim % self.num_heads == 0

        self.depth = self.key_dim // self.num_heads

        self.wq = Dense(key_dim)
        self.wk = Dense(key_dim)
        self.wv = Dense(key_dim)

        self.dense = Dense(key_dim)

        self.dropout = Dropout(dropout_rate)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def scaled_dot_product_attention(self, q, k, v, mask):
        matmul_qk = tf.matmul(q, k, transpose_b=True)
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        output = tf.matmul(attention_weights, v)

        return output, attention_weights

    def call(self, query, key, value, mask=None):
        batch_size = tf.shape(query)[0]

        q = self.wq(query)
        k = self.wk(key)
        v = self.wv(value)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = scaled_dot_product_attention(self, q, k, v, mask)

        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.key_dim))

        output = self.dense(concat_attention)
        output = self.dropout(output, training=self.training)

        return output, attention_weights


class Decoder(tf.keras.layers.Layer):
    def init(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super().init()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = TokenAndPositionEmbedding(d_model=d_model, maximum_position_encoding=maximum_position_encoding,
                                                   vocab_size=target_vocab_size)

        self.decoder_layers = [DecoderLayer(d_model=d_model, num_heads=num_heads, dff=dff) for _ in range(num_layers)]

        self.dropout = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.decoder_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)

            attention_weights[f'decoder_layer{i + 1}_block1'] = block1
            attention_weights[f'decoder_layer{i + 1}_block2'] = block2

        return x, attention_weights


class DecoderLayer(tf.keras.layers.Layer):
    def init(self, d_model, num_heads, dff, rate=0.1):
        super().init()

        self.mha1 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.mha2 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)

        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        attn1, block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2, block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3, block1, block2


class TokenAndPositionEmbedding(tf.keras.layers.Layer):
    def __init__(self, d_model, maximum_position_encoding, vocab_size):
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.maximum_position_encoding = maximum_position_encoding

        self.token_embedding = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=d_model)
        self.position_embedding = self.add_weight(shape=(maximum_position_encoding, d_model),
                                                  initializer='random_normal', trainable=True)

    def call(self, inputs):
        token_embed = self.token_embedding(inputs)
        positions = tf.range(start=0, limit=tf.shape(inputs)[1])  # Generate positions for each token
        positions = self.position_embedding(positions)  # Lookup the position embeddings for each position
        return token_embed + positions


class BertEncoder(tf.keras.layers.Layer):
    def init(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super().init()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = TokenAndPositionEmbedding(d_model=d_model, maximum_position_encoding=maximum_position_encoding,
                                                   vocab_size=input_vocab_size)

        self.bert_layers = [BertLayer(d_model=d_model, num_heads=num_heads, dff=dff) for _ in range(num_layers)]

        self.dropout = Dropout(rate)

    def call(self, x, training, mask):
        attention_weights = {}

        x = self.embedding(x)
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block = self.bert_layers[i](x, training, mask)

            attention_weights[f'bert_layer{i + 1}'] = block

        return x, attention_weights


class BertLayer(tf.keras.layers.Layer):
    def init(self, d_model, num_heads, dff, rate=0.1):
        super().init()

        self.mha1 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.mha2 = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)

        self.ffn = tf.keras.Sequential([
            Dense(dff, activation='relu'),
            Dense(d_model)
        ])

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, x, training, mask):
        attn1, block1 = self.mha1(x, x, x, mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2, block2 = self.mha2(out1, out1, out1, mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)

        return out3, block1, block2


class MultiHeadAttention(tf.keras.layers.Layer):
    def init(self, num_heads, key_dim):
        super().init()

        self.num_heads = num_heads

        self.key_dim = key_dim

        assert self.key_dim % self.num_heads == 0, "Key dimension must be divisible by number of heads"
        self.depth = self.key_dim // self.num_heads

        self.query_dense = Dense(units=self.key_dim)
        self.key_dense = Dense(units=self.key_dim)
        self.value_dense = Dense(units=self.key_dim)

        self.final_dense = Dense(units=self.key_dim)

    def split_heads(self, inputs, batch_size):
        inputs = tf.reshape(inputs, shape=(batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(inputs, perm=[0, 2, 1, 3])

    def call(self, query, key, value, mask):
        batch_size = tf.shape(query)[0]

        query = self.query_dense(query)
        key = self
        key = self.key_dense(key)
        value = self.value_dense(value)

        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        scaled_attention_logits = tf.matmul(query, key, transpose_b=True)
        scaled_attention_logits /= tf.math.sqrt(tf.cast(self.depth, tf.float32))

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        output = tf.matmul(attention_weights, value)

        output = tf.transpose(output, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(output, shape=(batch_size, -1, self.key_dim))
        output = self.final_dense(concat_attention)

        return output, attention_weights

In [5]:
!pip install nbformat



In [6]:
import pandas as pd
from transformers import pipeline

# Load the CSV file into a Pandas DataFrame
df = pd.read_csv('bitcoin_tweets.csv')

# Extract the 'Tweet' column from the DataFrame
tweets = df['Tweet'].tolist()

# Load the pre-trained sentiment analysis model
nlp = pipeline('sentiment-analysis', model='bert-base-uncased', tokenizer='bert-base-uncased')

# Predict the sentiment of each tweet and save the results to a new DataFrame
sentiment_results = nlp(tweets)
sentiment_df = pd.DataFrame(sentiment_results)

# Extract the subjectivity and polarity values from the 'score' column
subjectivity = sentiment_df['score'].apply(lambda x: 'subjective' if x > 0.5 else 'objective')
polarity = sentiment_df['label'].apply(lambda x: x.lower())

# Add the subjectivity and polarity columns to the original DataFrame
df['subjectivity'] = subjectivity
df['polarity'] = polarity

# Save the updated DataFrame to a new CSV file
df.to_csv('bitcoin_sentiments.csv', index=False)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForSequenceClassification: ['cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at

Downloading:   0%|          | 0.00/466k [00:00<?, ?B/s]

KeyboardInterrupt: 

In [3]:
import pandas as pd
import torch
import transformers
from transformers import BertTokenizer, BertForSequenceClassification, AdamW

# load the dataset
df = pd.read_csv('bitcoin_tweets.csv')
tweets = df['Tweet'].tolist()

# load the tokenizer and model
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# encode the inputs
inputs = tokenizer(tweets, padding=True, truncation=True, return_tensors='pt')

# split the dataset into training and validation sets
train_size = int(0.8 * len(df))
val_size = len(df) - train_size
train_inputs, val_inputs = inputs[:train_size], inputs[train_size:]

# define the training parameters
batch_size = 16
num_epochs = 5
optimizer = AdamW(model.parameters(), lr=2e-5)

# train the model
train_dataset = torch.utils.data.TensorDataset(train_inputs['input_ids'], train_inputs['attention_mask'])
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_dataset = torch.utils.data.TensorDataset(val_inputs['input_ids'], val_inputs['attention_mask'])
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(num_epochs):
    # training loop
    model.train()
    total_loss = 0
    for batch in train_loader:
        batch = tuple(t.to(device) for t in batch)
        inputs = {'input_ids': batch[0], 'attention_mask': batch[1]}
        optimizer.zero_grad()
        outputs = model(**inputs)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_train_loss = total_loss / len(train_loader)

    # validation loop
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            batch = tuple(t.to(device) for t in batch)
            inputs = {'input_ids': batch[0], 'attention_mask': batch[1]}
            outputs = model(**inputs)
            loss = outputs.loss
            total_loss += loss.item()
        avg_val_loss = total_loss / len(val_loader)

    print(f'Epoch {epoch + 1}: Train Loss = {avg_train_loss:.4f}, Val Loss = {avg_val_loss:.4f}')

# save the trained model
model.save_pretrained('bitcoin_sentiment_model')


Downloading:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForSequenceClassification: ['cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at

KeyError: 'Indexing with integers (to access backend Encoding for a given batch index) is not available when using Python based tokenizers'

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModelForSequenceClassification.from_pretrained('bitcoin_sentiment_model')

# define a function to predict the sentiment of a single tweet
def predict_sentiment(tweet):
    # encode the tweet
    encoded_tweet = tokenizer(tweet, padding=True, truncation=True, return_tensors='pt')

    # feed the encoded tweet into the model and get the output logits
    output = model(**encoded_tweet)
    logits = output.logits

    # calculate the subjectivity and polarity scores from the logits
    subjectivity = torch.softmax(logits[0], dim=0)[0].item()
    polarity = torch.softmax(logits[0], dim=0)[1].item()

    return subjectivity, polarity

# example usage
tweet = "Bitcoin is the future of money!"
subjectivity, polarity = predict_sentiment(tweet)
print(f'Tweet: {tweet}')
print(f'Subjectivity: {subjectivity:.2f}')
print(f'Polarity: {polarity:.2f}')


In [11]:
!pip install torch

Collecting torch
  Downloading torch-2.0.0-cp310-none-macosx_10_9_x86_64.whl (139.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.8/139.8 MB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting sympy
  Downloading sympy-1.11.1-py3-none-any.whl (6.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.5/6.5 MB[0m [31m26.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting networkx
  Downloading networkx-3.1-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m21.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting filelock
  Downloading filelock-3.11.0-py3-none-any.whl (10.0 kB)
Collecting mpmath>=0.19
  Downloading mpmath-1.3.0-py3-none-any.whl (536 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m536.2/536.2 kB[0m [31m13.7 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hInstalling collected packages: mpm

In [2]:
import pandas as pd

def read_csv_file(file_path: str) -> pd.DataFrame:
    """Reads the input CSV file and returns a pandas DataFrame with the `Tweet` column."""
    df = pd.read_csv(file_path)
    return df[['Tweet']]

In [3]:
import torch
from torch.utils.data import Dataset

class SentimentDataset(Dataset):
    """PyTorch dataset for loading preprocessed tweet data and labels."""

    def __init__(self, data: pd.DataFrame, labels: torch.Tensor):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        tweet = self.data.iloc[index]
        label = self.labels[index]
        return tweet, label

In [4]:
from torch import nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class SentimentTransformer(nn.Module):
    """PyTorch module for sentiment analysis using the Transformer architecture."""

    def __init__(self, vocab_size: int, embedding_dim: int, hidden_dim: int, num_layers: int, num_heads: int, dropout: float):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_encoder = nn.Sequential(
            nn.Linear(embedding_dim, hidden_dim),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(hidden_dim, embedding_dim),
            nn.Dropout(dropout)
        )
        encoder_layers = TransformerEncoderLayer(embedding_dim, num_heads, hidden_dim, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers)
        self.classifier = nn.Linear(embedding_dim, 2)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Performs forward pass of the model.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_len).

        Returns:
            torch.Tensor: Output tensor of shape (batch_size, 2).
        """
        x = self.embedding(x)
        x = self.pos_encoder(x)
        x = x.permute(1, 0, 2)
        output = self.transformer_encoder(x)
        output = output.mean(dim=0)
        output = self.classifier(output)
        return output

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm

def train(model: nn.Module, train_loader: DataLoader, valid_loader: DataLoader,
          epochs: int, lr: float, device: torch.device) -> nn.Module:
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    best_valid_loss = float('inf')
    best_model = None

    for epoch in range(1, epochs+1):
        model.train()
        train_loss = 0

        for batch in tqdm(train_loader, desc=f"Training Epoch {epoch}", leave=False):
            input_ids, labels = batch
            input_ids = input_ids.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            output = model(input_ids)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        valid_loss, valid_acc = evaluate(model, valid_loader, criterion, device)

        print(f"Epoch: {epoch}, Training Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}, Validation Accuracy: {valid_acc:.4f}")

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            best_model = model.state_dict()

    model.load_state_dict(best_model)

    return model

def evaluate(model: nn.Module, data_loader: DataLoader, criterion: nn.Module, device: torch.device) -> tuple:
    model.eval()
    loss = 0
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for batch in data_loader:
            input_ids, labels = batch
            input_ids = input_ids.to(device)
            labels = labels.to(device)

            output = model(input_ids)
            loss += criterion(output, labels).item()

            pred = output.argmax(dim=1)
            total_correct += (pred == labels).sum().item()
            total_samples += len(labels)

    loss /= len(data_loader)
    acc = total_correct / total_samples

    return loss, acc


In [None]:
import torch
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer
from typing import Tuple

# Step 1: Read CSV file and preprocess tweets
tweets_df = read_csv_file('tweets.csv')
preprocessed_tweets = preprocess_tweets(tweets_df['Tweet'])

# Step 2: Tokenize tweets and labels
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
tokenized_tweets, labels = tokenize_tweets_labels(preprocessed_tweets, tweets_df['Label'], tokenizer)

# Step 3: Create PyTorch dataset and dataloaders
dataset = SentimentDataset(tokenized_tweets, labels)
train_data, valid_data = train_valid_split(dataset)
train_loader, valid_loader = create_data_loaders(train_data, valid_data, batch_size=16)

# Step 4: Define the model
model = SentimentTransformer(
    vocab_size=len(tokenizer),
    embedding_dim=128,
    hidden_dim=256,
    num_layers=2,
    num_heads=8,
    dropout=0.2
)

# Step 5: Train the model
best_model = train(
    model=model,
    train_data=train_loader.dataset,
    valid_data=valid_loader.dataset,
    epochs=10,
    batch_size=16,
    lr=1e-4,
    output_dir='./models'
)

# Step 6: Evaluate the model
test_data = SentimentDataset(tokenized_tweets, labels)
test_loader = DataLoader(test_data, batch_size=16, shuffle=False)
test_loss, test_acc = evaluate(best_model, test_loader, criterion=nn.CrossEntropyLoss(), device=torch.device('cuda'))

print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")


In [None]:
import torch.optim as optim

def train(model, train_loader, valid_loader, epochs, lr):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        train_loss = total_loss / len(train_loader)
        valid_loss, valid_acc = evaluate(model, valid_loader, criterion, device)

        print('Epoch [{}/{}], Train Loss: {:.4f}, Valid Loss: {:.4f}, Valid Acc: {:.2f}%'
              .format(epoch+1, epochs, train_loss, valid_loss, valid_acc*100))

def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            total_loss += loss.item()
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    valid_loss = total_loss / len(dataloader)
    valid_acc = correct / total

    return valid_loss, valid_acc

In [5]:
from torch import nn, optim
from torch.utils.data import DataLoader
from tempfile import TemporaryDirectory
import os
import shutil
from tqdm import tqdm

def train(model: SentimentTransformer, train_data: torch.utils.data.Dataset,
          valid_data: torch.utils.data.Dataset, epochs: int,
          batch_size: int, lr: float, output_dir: str) -> SentimentTransformer:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    best_valid_loss = float('inf')
    best_model = None

    for epoch in range(1, epochs+1):
        model.train()
        train_loss = 0

        for batch in train_loader:
            src = batch['input_ids'].to(device)
            tgt = batch['target'].to(device)
            src_mask = generate_square_subsequent_mask(src.shape[0]).to(device)

            optimizer.zero_grad()
            output = model(src, src_mask)

            loss = criterion(output.view(-1, output.shape[-1]), tgt.view(-1))
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_data)

        valid_loss = evaluate(model, valid_loader, criterion, device)

        print(f"Epoch: {epoch}, Training Loss: {train_loss:.4f}, Validation Loss: {valid_loss:.4f}")

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            best_model = model.state_dict()

            # Save the best model
            with TemporaryDirectory() as tmpdir:
                tmp_model_file = os.path.join(tmpdir, 'model.pt')
                torch.save(model.state_dict(), tmp_model_file)
                shutil.copyfile(tmp_model_file, os.path.join(output_dir, 'model.pt'))

    # Load the best model and return it
    best_model = SentimentTransformer(len(TEXT.vocab), output_dim, TEXT.vocab.stoi[TEXT.pad_token]).to(device)
    best_model.load_state_dict(best_model.state_dict())

    return best_model

def generate_square_subsequent_mask(sz: int) -> Tensor:
    """Generates an upper-triangular matrix of ``-inf``, with zeros on ``diag``."""
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)


In [4]:



class TransformerSentimentAnalyzer(nn.Module):

    def __init__(self, input_dim: int, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Linear(input_dim, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, 2)

        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor) -> Tuple[Tensor, Tensor]:
        """
        Arguments:
            src: Tensor, shape ``[seq_len, batch_size, input_dim]``
            src_mask: Tensor, shape ``[seq_len, seq_len]``

        Returns:
            A tuple containing two Tensors: the subjectivity and polarity scores
            of shape ``[batch_size, 1]``.
        """
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        output = self.decoder(output[-1])  # use only the last output token
        return output[:, 0], output[:, 1]  # return subjectivity and polarity


class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


    def get_sentiment(tweet: str, model: nn.Module) -> Tuple[float, float]:
        # Convert the tweet to a PyTorch tensor
        tensor = torch.tensor(tweet).unsqueeze(1)

        # Generate the mask (no masking necessary for this task)
        mask = generate_square_subsequent_mask(tensor.size(0))

        # Run the tweet through the model and return the subjectivity and polarity scores
        with torch.no_grad():
            subj, pol = model(tensor, mask)
        return subj.item(), pol.item
class BitcoinTweetsDataset(dataset.Dataset):

    def __init__(self, path: str):
        self.df = pd.read_csv(path)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index: int) -> Tuple[str, int, int]:
        tweet = self.df.iloc[index]['Tweet']
        subj, pol = get_subjectivity_polarity(tweet)
        return tweet, subj, pol

class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

    def get_subjectivity_polarity(tweet: str) -> Tuple[int, int]:
    # Code to compute subjectivity and polarity of the tweet
    # ...
        return subj, pol


文件路径： /Users/emilyziyixiao/DataspellProjects/AdvDsFinance/project/划水part.ipynb


In [7]:

!python -m nbformat / Users / emilyziyixiao / DataspellProjects / AdvDsFinance / project / 划水part.ipynb / Users / emilyziyixiao / DataspellProjects / AdvDsFinance / project / 划水part.ipynb

/Users/emilyziyixiao/opt/anaconda3/envs/310_new/bin/python: No module named nbformat.__main__; 'nbformat' is a package and cannot be directly executed
