In [None]:
# from google.colab import drive

# drive.mount('/content/drive')

In [None]:
!pip install pyspark

In [None]:
import pyspark

In [None]:
# Display the installed PySpark version as the cell output.
pyspark.__version__

In [None]:
from pyspark.sql import SparkSession

import re

from pyspark.sql.functions import col

from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, Trainer, TrainingArguments

from torch.utils.data import Dataset, DataLoader

import torch

import pandas as pd

import numpy as np

In [None]:
# Start (or reuse) the Spark session that backs the data-loading cells below

spark = (
    SparkSession.builder
    .appName("Deep_Learning_Sentiment_Analysis")
    .getOrCreate()
)

In [None]:
# Split CSV data (handle improperly formatted rows)

def split_csv(line):
    """Split one raw CSV line into at most four fields.

    Only the first three commas are treated as separators, so commas that
    appear later in the line stay part of the last field instead of
    producing extra columns.

    Args:
        line: A single line of text from the CSV file.

    Returns:
        A list of at most four strings. Lines with fewer than three commas
        yield fewer than four fields.
    """
    # maxsplit=3 keeps everything after the third comma in one field, so the
    # returned row is never ragged.
    return line.split(",", 3)



# Cleaning text

def clean_text(text):
    """Normalise a piece of text for tokenisation.

    Steps:
      1. Coerce the input to ``str`` and lowercase it.
      2. Replace every ``http://`` / ``https://`` URL with the ``[URL]`` token.
      3. In the remaining text, drop every character that is not a letter,
         digit, whitespace or one of ``. , ! ? :``.
      4. Collapse runs of whitespace into single spaces and trim the ends.

    The function is idempotent: cleaning already-cleaned text returns it
    unchanged, which matters because this notebook cleans text both when
    loading the data and again inside the dataset / inference code.

    Args:
        text: Any object; it is converted with ``str()`` first.

    Returns:
        The cleaned, lowercased string.
    """
    text = str(text).lower()

    # Split on URLs *and* on an already-inserted placeholder (lowercased by
    # the line above) so a second pass does not destroy the token.
    segments = re.split(r'http[s]?://\S+|\[url\]', text)

    # Filter characters only in the non-URL segments; filtering after the
    # substitution would strip the brackets from the "[URL]" token itself.
    segments = [re.sub(r'[^a-zA-Z0-9\s.,!?:]', '', segment) for segment in segments]
    text = '[URL]'.join(segments)

    # Collapse whitespace last: removing characters above can leave gaps
    # such as "a  b" that an earlier collapse would not catch.
    return re.sub(r'\s+', ' ', text).strip()

In [None]:
# Load the raw tweet file line by line and parse each line into a
# (numeric label, cleaned tweet text) pair.

input_path = "/kaggle/input/tweets-2/tweets.csv"

rdd = (
    spark.sparkContext
    .textFile(input_path)
    .map(split_csv)
    # Field 1 holds the label and field 3 the tweet text.
    .map(lambda fields: (float(fields[1]), clean_text(fields[3])))
)

In [None]:
# Sanity check: print the first 10 parsed (label, cleaned tweet) tuples.
print(rdd.take(10))

In [None]:
# Convert the RDD of (label, tweet) tuples into a Spark DataFrame with named columns

input_dataframe = rdd.toDF(["label", "tweet"])

In [None]:
# Show the first 10 rows; truncate=False prints the full tweet text

input_dataframe.show(10, truncate=False)

In [None]:
# Collect the Spark DataFrame into a pandas DataFrame for the PyTorch side.
# NOTE(review): toPandas() materialises every row on the driver — confirm the
# dataset fits in memory.
data = input_dataframe.toPandas()

In [None]:
# Preview the first 10 rows of the pandas copy.
print(data.head(10))

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 25% of the rows for evaluation; the fixed random_state keeps the
# split reproducible across runs.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    data['tweet'],
    data['label'],
    test_size=0.25,
    random_state=42,
)

In [None]:
# Preview the first 10 training texts produced by the split.
print(train_texts.head(10))

In [None]:
# Load the tokenizer from the same checkpoint name that is used for the model below.
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")

In [None]:
class TweetDataset(Dataset):
    """Torch dataset that tokenises tweets on the fly for sequence classification.

    Args:
        texts: pandas Series of tweet strings (accessed positionally via ``.iloc``).
        labels: pandas Series of numeric labels. Any value greater than 0 is
            mapped to class 1, everything else to class 0.
        tokenizer: Callable tokenizer; it is invoked with ``truncation``,
            ``padding``, ``max_length`` and ``return_tensors`` keyword
            arguments and must return a mapping containing ``input_ids`` and
            ``attention_mask`` tensors with a leading batch dimension of 1.
        max_length: Sequence length every example is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        # Binarise once up front: label > 0 -> 1, otherwise 0.
        self.labels = torch.tensor((labels > 0).astype(int).values, dtype=torch.long)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of examples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenised example at position ``idx``.

        Returns:
            Dict with ``input_ids`` and ``attention_mask`` (1-D tensors) and
            ``labels`` (0-d long tensor).
        """
        text = clean_text(self.texts.iloc[idx])
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt')
        return {
            # squeeze(0) removes only the batch axis; a bare squeeze() would
            # also drop the sequence axis when max_length == 1.
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': label
        }

In [None]:
# Every example is padded/truncated to this many tokens.
max_length = 128

# Wrap the train and held-out splits in the on-the-fly tokenising dataset.
train_dataset = TweetDataset(
    texts=train_texts,
    labels=train_labels,
    tokenizer=tokenizer,
    max_length=max_length,
)

test_dataset = TweetDataset(
    texts=test_texts,
    labels=test_labels,
    tokenizer=tokenizer,
    max_length=max_length,
)

In [None]:
# Load the sequence-classification model from the same checkpoint name as the tokenizer.
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")

In [None]:
# Hyperparameters for fine-tuning.
training_args = TrainingArguments(
    output_dir="./results",
    learning_rate=2e-5,
    per_device_train_batch_size=64,  # Increased batch size
    per_device_eval_batch_size=64,
    num_train_epochs=3,
    weight_decay=0.01,
    # Evaluate and checkpoint on the same 500-step cadence so that
    # load_best_model_at_end can pair each evaluation with a checkpoint.
    evaluation_strategy="steps",
    eval_steps=500,
    save_steps=500,
    load_best_model_at_end=True,
    # Mixed precision needs a CUDA device; enabling it unconditionally makes
    # this cell fail on a CPU-only runtime.
    fp16=torch.cuda.is_available(),
    dataloader_num_workers=4,
    logging_steps=100
)

In [None]:
# class CustomTrainer(Trainer):
#     def __init__(self, *args, **kwargs):
#         super().__init__(*args, **kwargs)
    
#     def compute_loss(self, model, inputs, return_outputs=False):
#         labels = inputs.get("labels")
#         outputs = model(**{k: v for k, v in inputs.items() if k != "labels"})
#         logits = outputs.logits
#         loss_fct = torch.nn.CrossEntropyLoss()
#         loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
#         return (loss, outputs) if return_outputs else loss

In [None]:
# Wire the model, hyperparameters and both datasets into a Hugging Face Trainer.
# NOTE(review): no compute_metrics callback is supplied, so evaluation
# presumably reports loss/runtime figures only — confirm whether an accuracy
# metric should be added here.
trainer = Trainer(

    model=model,

    args=training_args,

    train_dataset=train_dataset,

    # The held-out split doubles as the evaluation set during training.
    eval_dataset=test_dataset,

    tokenizer=tokenizer

)

In [None]:
# Run fine-tuning with the arguments configured above.
trainer.train()

In [None]:
# Evaluate on the Trainer's eval_dataset (the held-out split) and print the resulting metrics.
metrics = trainer.evaluate()
print(metrics)

In [None]:
#Inference predictions
def get_predictions(dataset, model, tokenizer, max_len=128, batch_size=64):
    """Predict a binary label for every text in ``dataset``.

    Texts are cleaned with ``clean_text``, tokenised and pushed through the
    model in mini-batches rather than one forward pass per text.

    Args:
        dataset: Object exposing an iterable ``texts`` attribute.
        model: Classification model; must provide ``eval()``, ``device`` and
            return an object with a ``logits`` tensor of shape
            (batch, num_classes) when called.
        tokenizer: Callable accepting a list of strings plus ``max_length``,
            ``padding``, ``truncation`` and ``return_tensors`` keyword
            arguments, returning a mapping of tensors.
        max_len: Token length each text is padded/truncated to.
        batch_size: Number of texts per forward pass (must be >= 1).

    Returns:
        List of floats, ``1.0`` where the softmax probability of class 1
        exceeds 0.5 and ``0.0`` otherwise, in the order of ``dataset.texts``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    model.eval()
    texts = [clean_text(text) for text in dataset.texts]
    predictions = []

    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        encoding = tokenizer(
            batch,
            max_length=max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        # Inference only: skip gradient tracking to save memory and time.
        with torch.no_grad():
            outputs = model(**{k: v.to(model.device) for k, v in encoding.items()})

        # Column 1 of the softmax output is the class-1 probability.
        positive_probs = torch.softmax(outputs.logits, dim=1)[:, 1].tolist()
        predictions.extend(1.0 if prob > 0.5 else 0.0 for prob in positive_probs)

    return predictions

In [None]:
# Predict a 0.0/1.0 label for every text in the evaluation set
# (the true labels are read separately from test_dataset.labels)
predictions = get_predictions(test_dataset, model, tokenizer)

In [None]:
def display_predictions(predictions, labels, num=10):
    """Print a two-column table comparing predictions with labels.

    Args:
        predictions: Sequence of predicted values.
        labels: Sequence of true values, aligned with ``predictions``.
        num: Number of leading (prediction, label) pairs to print.
    """
    header = f"{'Prediction':<12} {'Label':<12}"
    rule = "=" * 25
    print(header)
    print(rule)

    # Pair the two sequences up, then keep only the leading `num` rows.
    rows = list(zip(predictions, labels))
    for predicted, actual in rows[:num]:
        print(f"{predicted:<12} {actual:<12}")

# Show the first 10 predictions next to the binarised labels stored on the test dataset.
display_predictions(predictions, test_dataset.labels)

In [None]:
# Calculate accuracy manually

def calculate_accuracy(predictions, labels):
    """Return the fraction of predictions that equal their label.

    Args:
        predictions: Sized sequence of predicted values.
        labels: Sized sequence of true values, aligned with ``predictions``.

    Returns:
        ``correct / len(labels)`` as a float, or ``0.0`` when both inputs
        are empty.

    Raises:
        ValueError: If the two sequences differ in length; zip() would
            otherwise truncate silently and report a misleading score.
    """
    total = len(labels)
    if len(predictions) != total:
        raise ValueError(
            f"predictions and labels differ in length: {len(predictions)} != {total}"
        )

    # Nothing to score: avoid dividing by zero.
    if total == 0:
        return 0.0

    correct = sum(1 for p, l in zip(predictions, labels) if p == l)
    return correct / total

In [None]:
# Score the predictions against the binarised labels stored on the test dataset.
accuracy = calculate_accuracy(predictions, test_dataset.labels)

In [None]:
# Report accuracy rounded to two decimal places.
print(f"Accuracy: {accuracy:.2f}")

In [None]:
# !pip install transformers huggingface_hub

In [None]:
# !huggingface-cli

In [None]:
import os
from getpass import getpass

from huggingface_hub import login

# Never store the access token in the notebook itself: read it from the
# HF_TOKEN environment variable, or prompt for it without echoing.
huggingface_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face token: ")

login(token=huggingface_token)

In [None]:
# Publish the fine-tuned weights and the tokenizer to the same Hub repository.
hub_repo_id = "Smrfhdl/distilbert_smrfhdl"
model.push_to_hub(hub_repo_id)
tokenizer.push_to_hub(hub_repo_id)