In [None]:
!pip install pandas numpy scikit-learn transformers datasets pyspark wandb


Collecting datasets
  Downloading datasets-3.0.2-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.0.2-py3-none-any.whl (472 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m472.7/472.7 kB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading multiprocess-0.70.16-py310-none-any.whl (134 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading xx

In [None]:
import pandas as pd
import numpy as np
import logging
from datetime import datetime
from google.colab import drive
from pyspark.sql import SparkSession
from transformers import pipeline, AutoTokenizer, AutoModelForMaskedLM, Trainer, TrainingArguments
from sklearn.metrics import mean_squared_error
from datasets import Dataset
import warnings

# Suppress non-essential warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

# Mount Google Drive for persistent storage
drive.mount('/content/drive', force_remount=True)

# Set up logging
logging.basicConfig(filename='/content/drive/MyDrive/training_log.log', level=logging.INFO)

def log_message(message):
    logging.info(f"{datetime.now()}: {message}")
    print(message)  # Also print to console for real-time monitoring

# Initialize PySpark session
spark = SparkSession.builder.appName("AirQualityProcessing").getOrCreate()

log_message("Loading dataset...")

# Load the dataset with pandas
full_dataset_path = '/content/drive/MyDrive/AirQualityUCI.csv'  # Replace with your actual path
full_dataset = pd.read_csv(full_dataset_path, sep=';', decimal=',')

# Convert pandas DataFrame to Spark DataFrame for processing
spark_df = spark.createDataFrame(full_dataset)

# Drop rows with empty columns to clean data
spark_df = spark_df.dropna(how="all", subset=["RH", "Date", "Time", "CO(GT)"])

# Convert back to Pandas DataFrame for Hugging Face compatibility
full_dataset = spark_df.toPandas()
log_message("Dataset loaded and cleaned.")

# Extract 'RH' column for ground truth values
full_rh_values = full_dataset['RH'].copy()
dataset = full_dataset.copy()
dataset['RH'] = dataset['RH'].astype(str)

# Randomly mask 20% of 'RH' values
log_message("Masking 20% of RH values...")
missing_indices = np.random.choice(dataset.index, size=int(0.2 * len(dataset)), replace=False)
dataset.loc[missing_indices, 'RH'] = '[MASK]'

# Prepare formatted text for model input with adjusted prompt
dataset_text = dataset.apply(lambda row: f"The RH value for Date {row['Date']} at Time {row['Time']} with CO(GT) {row['CO(GT)']} is approximately [MASK].", axis=1)

# Load tokenizer and model
log_message("Loading model and tokenizer...")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", clean_up_tokenization_spaces=True)
model = AutoModelForMaskedLM.from_pretrained("bert-base-uncased")

# Define a function to tokenize input and set up labels for masked language modeling
def prepare_input_for_model(text):
    encoding = tokenizer(
        text,
        padding="max_length",
        max_length=128,
        truncation=True,
        return_tensors="pt"
    )
    encoding["labels"] = encoding["input_ids"].clone()
    return encoding

# Prepare dataset for training
log_message("Preparing dataset for training...")
encoded_data = [prepare_input_for_model(text) for text in dataset_text]
encoded_dataset = Dataset.from_dict({
    "input_ids": [d['input_ids'][0] for d in encoded_data],
    "attention_mask": [d['attention_mask'][0] for d in encoded_data],
    "labels": [d['labels'][0] for d in encoded_data]
})

# Define training arguments with checkpoint saving
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/checkpoints",
    save_steps=500,
    save_total_limit=3,
    logging_dir="/content/drive/MyDrive/logs",
    logging_steps=100,
    per_device_train_batch_size=8,  # Set this lower if running out of GPU memory
    num_train_epochs=3,  # Increased epochs for better learning
    report_to="none"
)

# Initialize Trainer for model training
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=encoded_dataset
)

# Log the start of training
log_message("Training started.")

# Try resuming from last checkpoint if interrupted
try:
    trainer.train(resume_from_checkpoint=False)  # Set to False to avoid checkpoint issues
except Exception as e:
    log_message(f"Training interrupted. Error: {e}")

log_message("Training complete.")

# Set up pipeline to fill in missing RH values with GPU if available
fill_pipeline = pipeline("fill-mask", model=model, tokenizer=tokenizer, device=0)
log_message("Pipeline ready for inference.")

# Set batch size for inference based on available GPU RAM
batch_size = 16  # Adjust batch size to prevent GPU overload

# Calculate fallback prediction (mean RH) for use when `[MASK]` is predicted
fallback_rh_value = full_rh_values.mean()

# Create batched input texts for missing RH values with adjusted prompt
batched_inputs = [
    f"The RH value for Date {row['Date']} at Time {row['Time']} with CO(GT) {row['CO(GT)']} is approximately [MASK]."
    for idx, row in dataset.iloc[missing_indices].iterrows()
]

# Initialize lists to store predictions and actual values for MSE calculation
predicted_rh_values = []
true_rh_values = []

# Process inputs in batches to maximize GPU efficiency
log_message("Starting batch inference with adjusted prompt...")
for i in range(0, len(batched_inputs), batch_size):
    batch = batched_inputs[i:i + batch_size]
    predictions = fill_pipeline(batch)

    for j, prediction in enumerate(predictions):
        try:
            # Extract and log the predicted token
            predicted_token = prediction[0]['token_str'].replace(",", "").strip()
            log_message(f"Predicted token for batch {i // batch_size + 1}, item {j + 1}: {predicted_token}")

            # Convert the predicted token to a float, use fallback if it's still `[MASK]`
            if predicted_token == '[MASK]':
                predicted_value = fallback_rh_value
                log_message(f"Fallback used for batch {i // batch_size + 1}, item {j + 1}")
            else:
                predicted_value = float(predicted_token)

            predicted_rh_values.append(predicted_value)
            true_rh_values.append(float(full_rh_values.iloc[missing_indices[i + j]]))
        except (ValueError, IndexError) as e:
            # Log if there's an issue converting the prediction
            log_message(f"Error in prediction conversion for batch {i // batch_size + 1}, item {j + 1}: {e}")
            predicted_rh_values.append(fallback_rh_value)
            continue

    log_message(f"Batch {i // batch_size + 1} of {len(batched_inputs) // batch_size + 1} processed.")

# Filter out NaN values in true_rh_values and predicted_rh_values before calculating MSE
true_rh_values_filtered = [val for val in true_rh_values if not np.isnan(val)]
predicted_rh_values_filtered = [predicted_rh_values[i] for i in range(len(predicted_rh_values)) if not np.isnan(true_rh_values[i])]

# Check if we have valid predictions to calculate MSE
if true_rh_values_filtered and predicted_rh_values_filtered:
    mse = mean_squared_error(true_rh_values_filtered, predicted_rh_values_filtered)
    log_message(f"Mean Squared Error (MSE) of the predictions: {mse}")
else:
    log_message("No valid predictions were available to calculate MSE.")

# Stop the Spark session
spark.stop()
log_message("Process complete.")


Mounted at /content/drive
Loading dataset...
Dataset loaded and cleaned.
Masking 20% of RH values...
Loading model and tokenizer...


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Preparing dataset for training...
Training started.


Step,Training Loss
100,0.8466
200,0.0004
300,0.0002
400,0.0002
500,0.0001
600,0.0001
700,0.0001
800,0.0001
900,0.0
1000,0.0013


Training complete.
Pipeline ready for inference.
Starting batch inference with adjusted prompt...
Predicted token for batch 1, item 1: [MASK]
Fallback used for batch 1, item 1
Predicted token for batch 1, item 2: [MASK]
Fallback used for batch 1, item 2
Predicted token for batch 1, item 3: [MASK]
Fallback used for batch 1, item 3
Predicted token for batch 1, item 4: [MASK]
Fallback used for batch 1, item 4
Predicted token for batch 1, item 5: [MASK]
Fallback used for batch 1, item 5
Predicted token for batch 1, item 6: [MASK]
Fallback used for batch 1, item 6
Predicted token for batch 1, item 7: [MASK]
Fallback used for batch 1, item 7
Predicted token for batch 1, item 8: [MASK]
Fallback used for batch 1, item 8
Predicted token for batch 1, item 9: [MASK]
Fallback used for batch 1, item 9
Predicted token for batch 1, item 10: [MASK]
Fallback used for batch 1, item 10
Predicted token for batch 1, item 11: [MASK]
Fallback used for batch 1, item 11
Predicted token for batch 1, item 12: [M

You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


Predicted token for batch 10, item 1: [MASK]
Fallback used for batch 10, item 1
Predicted token for batch 10, item 2: [MASK]
Fallback used for batch 10, item 2
Predicted token for batch 10, item 3: [MASK]
Fallback used for batch 10, item 3
Predicted token for batch 10, item 4: [MASK]
Fallback used for batch 10, item 4
Predicted token for batch 10, item 5: [MASK]
Fallback used for batch 10, item 5
Predicted token for batch 10, item 6: [MASK]
Fallback used for batch 10, item 6
Predicted token for batch 10, item 7: [MASK]
Fallback used for batch 10, item 7
Predicted token for batch 10, item 8: [MASK]
Fallback used for batch 10, item 8
Predicted token for batch 10, item 9: [MASK]
Fallback used for batch 10, item 9
Predicted token for batch 10, item 10: [MASK]
Fallback used for batch 10, item 10
Predicted token for batch 10, item 11: [MASK]
Fallback used for batch 10, item 11
Predicted token for batch 10, item 12: [MASK]
Fallback used for batch 10, item 12
Predicted token for batch 10, item

In [None]:
!nvidia-smi


Wed Oct 30 20:41:37 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   77C    P0              33W /  70W |   2931MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    