In [13]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from tqdm import tqdm
import shutil
import warnings
import time
import os
import torch

warnings.filterwarnings('ignore')

In [2]:
def timing(start):
    print(f'Elapsed time: {time.time() - start:.2f} s')
# start = time.time()

# Start Session

In [3]:
start = time.time()

spark = SparkSession.builder.appName('SparkCPU').config("spark.driver.memory", "15g").getOrCreate()

timing(start)

23/08/14 10:51:30 WARN Utils: Your hostname, bdai-desktop resolves to a loopback address: 127.0.1.1; using 165.132.118.199 instead (on interface enp0s31f6)
23/08/14 10:51:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/14 10:51:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Elapsed time: 3.03 s


# 1. Load Dataset

In [4]:
train_image_path = "/home/bdai/spark_work/spark-warehouse/covid_train_binary"
test_image_path = "/home/bdai/spark_work/spark-warehouse/covid_test_binary"
cache_path = "file:///home/bdai/spark_work/petastorm"

In [5]:
start = time.time()

train_df = spark.read.parquet(train_image_path, engine='fastparquet')
df_test = spark.read.parquet(test_image_path, engine='fastparquet')

df_train, df_val = train_df.randomSplit([0.8, 0.2], seed=12345)

timing(start)

Elapsed time: 2.74 s


# 2. Image preprocessing

In [6]:
import torchvision.transforms as transforms
import io
import numpy as np
import pandas as pd
from PIL import Image

from petastorm.spark import SparkDatasetConverter, make_spark_converter
from petastorm import TransformSpec

image_shape = (3, 224, 224)

## 1) Cache the Spark DataFrame using Petastorm Spark converter

In [7]:
# Set a cache directory on DBFS FUSE for intermediate data.
start = time.time()

spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, cache_path)

converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)
converter_test = make_spark_converter(df_test)

print(f"train: {len(converter_train)}, val: {len(converter_val)}, test : {len(converter_test)}")

timing(start)

Converting floating-point columns to float32
23/08/14 10:51:57 WARN InternalParquetRecordWriter: Too much memory used: Store {
 [class] optional binary class (STRING) {
  r:0 bytes
  d:0 bytes
   data: FallbackValuesWriter{
   data: initial: DictionaryValuesWriter{
   data: initial: dict:24
   data: initial: values:120
   data: initial:}

   data: fallback: PLAIN CapacityByteArrayOutputStream 0 slabs, 0 bytes
   data:}

   pages: ColumnChunkPageWriter ConcatenatingByteArrayCollector 0 slabs, 0 bytes
   total: 360/144
 }
 [content] optional binary content {
  r:0 bytes
  d:0 bytes
   data: FallbackValuesWriter{
   data: initial: DictionaryValuesWriter{
   data: initial: dict:0
   data: initial: values:0
   data: initial:}

   data: fallback: PLAIN CapacityByteArrayOutputStream 60 slabs, 114,587,997 bytes
   data:}

   pages: ColumnChunkPageWriter ConcatenatingByteArrayCollector 0 slabs, 0 bytes
   total: 114,587,997/114,587,997
 }
 [file_name] optional binary file_name (STRING) {
  r:0 

train: 23931, val: 6055, test : 400
Elapsed time: 96.79 s


## 2) Preprocess images
Before feeding the dataset into the model, we need to decode the raw image bytes and apply standard ImageNet transforms. We recommend not doing this transformation on the Spark DataFrame since that will substantially increase the size of the intermediate files and might harm the performance. Instead, we recommend doing this transformation in a TransformSpec function in petastorm.

In [57]:
time_list = []

In [58]:
def preprocess(content):
    image = Image.open(io.BytesIO(content)).resize([image_shape[1],image_shape[2]])
    transformers = [transforms.Lambda(lambda image: image.convert('RGB'))]
    transformers.extend([transforms.ToTensor()])
    trans = transforms.Compose(transformers)
    image_arr = trans(image)
    return image_arr.numpy()
    

def transform_row(pd_batch):
  """
  The input and output of this function must be pandas dataframes.
  """
  start = time.time()
  pd_batch['features'] = pd_batch['content'].map(lambda x: preprocess(x))
  pd_batch['label'] = pd_batch['label'].map(lambda x: int(x))
  pd_batch = pd_batch.drop(labels=['content'], axis=1)
  end = time.time()
  transform_time = end - start
  time_list.append(transform_time)
  return pd_batch[['features', 'label']]

def get_transform_spec():
  # Note that the output shape of the `TransformSpec` is not automatically known by petastorm, 
  # so we need to specify the shape for new columns in `edit_fields` and specify the order of 
  # the output columns in `selected_fields`.
  return TransformSpec(transform_row, 
                       edit_fields=[('features', np.float32, image_shape, False)], 
                       selected_fields=['features', 'label'])

## 3) Examining execution time for dataloading and transorming a batch    

In [56]:
start = time.time()
steps_per_epoch = len(converter_train) // 64
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(), batch_size=64) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)
    for step in tqdm(range(steps_per_epoch)):
        pd_batch = next(train_dataloader_iter)
        inputs, labels = pd_batch['features'].to(device), pd_batch['label'].to(device)
timing(start)

  0%|                                                   | 0/373 [00:00<?, ?it/s]Worker 2 terminated: unexpected exception:
Traceback (most recent call last):
  File "/home/bdai/anaconda3/envs/spark_env/lib/python3.9/site-packages/petastorm/workers_pool/thread_pool.py", line 62, in run
    self._worker_impl.process(*args, **kargs)
  File "/home/bdai/anaconda3/envs/spark_env/lib/python3.9/site-packages/petastorm/arrow_reader_worker.py", line 163, in process
    all_cols = self._local_cache.get(cache_key,
  File "/home/bdai/anaconda3/envs/spark_env/lib/python3.9/site-packages/petastorm/cache.py", line 39, in get
    return fill_cache_func()
  File "/home/bdai/anaconda3/envs/spark_env/lib/python3.9/site-packages/petastorm/arrow_reader_worker.py", line 164, in <lambda>
    lambda: self._load_rows(parquet_file, piece, shuffle_row_drop_partition))
  File "/home/bdai/anaconda3/envs/spark_env/lib/python3.9/site-packages/petastorm/arrow_reader_worker.py", line 196, in _load_rows
    transformed_

NameError: name 'time_list' is not defined

In [61]:
total_time = 0
total_trans_time = 0
time_start = time.time()
start = time.time()
steps_per_epoch = len(converter_train) // 64
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(), batch_size=64) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)
    for step in tqdm(range(steps_per_epoch)):
        pd_batch = next(train_dataloader_iter)
        inputs, labels = pd_batch['features'], pd_batch['label']
        time_end = time.time()
        cur_time = time_end - time_start
        total_time += cur_time
        total_trans_time += trans_time.sum().item()
        time_start = time.time()

100%|█████████████████████████████████████████| 373/373 [01:29<00:00,  4.19it/s]


In [68]:
268

373

# 3. Train Model

In [None]:
import numpy as np
import torch


import torchvision

## 1) Get the model ResNet from torchvision

In [None]:
def get_model(lr=0.001):
  # Load a ResNet50 model from torchvision
  model = torchvision.models.resnet50(pretrained=True)
  # Freeze parameters in the feature extraction layers
  for param in model.parameters():
    param.requires_grad = False
    
  # Add a new classifier layer for transfer learning
  num_ftrs = model.fc.in_features
  # Parameters of newly constructed modules have requires_grad=True by default
  model.fc = torch.nn.Sequential(torch.nn.Linear(num_ftrs, 1), torch.nn.Sigmoid())
  
  return model

## 2) Define the train and evaluate function for the model

In [109]:
def train_one_epoch(model, criterion, optimizer, scheduler, 
                    train_dataloader_iter, steps_per_epoch, epoch, 
                    device):
    # statistics
    running_loss = 0.0
    running_corrects = 0
    total_loading = 0
    total_training = 0 
    total_stats = 0
    
    start = time.time()
    model.train()  # Set model to training mode
    # Iterate over the data for one epoch.
    load_start = time.time()
    for step in tqdm(range(steps_per_epoch)):
        pd_batch = next(train_dataloader_iter)
        inputs, labels = pd_batch['features'].to(device), pd_batch['label'].to(device).reshape(-1,1).float()
        load_end = time.time()
        current_load_time = load_end-load_start
        total_loading += current_load_time
        # Track history in training
        train_start = time.time()
        with torch.set_grad_enabled(True):
            # zero the parameter gradients
            optimizer.zero_grad()
            
            # forward
            outputs = model(inputs)
            preds = outputs > 0.5
            loss = criterion(outputs, labels)
            
            # backward + optimize
            loss.backward()
            optimizer.step()
        train_end = time.time()
        current_train_time = train_end-train_start
        total_training += current_train_time
        # stats_start = time.time()
        # # statistics
        # running_loss += loss.item()
        # running_corrects += torch.sum(preds == labels.data)
        
        # stats_end = time.time()
        # current_stats_time = stats_end - stats_start
        # total_stats += current_stats_time
        load_start = time.time()

    scheduler.step()
    end = time.time()
    total_time = end-start
    
    # epoch_loss = running_loss / (steps_per_epoch * BATCH_SIZE)
    # epoch_acc = running_corrects.double() / (steps_per_epoch * BATCH_SIZE)
    print("Total time per epoch : {:.2f} / Dataloading time : {:.2f}  / Training time : {:.2f}"\
          .format(total_time, total_loading, total_training))
    # print("Total time per epoch : {:.2f} / Dataloading time : {:.2f}  / Training time : {:.2f} / Writing stats : {.2f}"\
    #       .format(total_time, total_loading, total_training, total_stats))
    # print('Train Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
    return epoch_loss, epoch_acc

def evaluate(model, criterion, val_dataloader_iter, validation_steps, device, 
             metric_agg_fn=None):
  model.eval()  # Set model to evaluate mode

  # statistics
  running_loss = 0.0
  running_corrects = 0

  # Iterate over all the validation data.
  for step in range(validation_steps):
    pd_batch = next(val_dataloader_iter)
    inputs, labels = pd_batch['features'].to(device), pd_batch['label'].to(device).reshape(-1,1).float()

    # Do not track history in evaluation to save memory
    with torch.set_grad_enabled(False):
      # forward
      outputs = model(inputs)
      preds = outputs > 0.5
      loss = criterion(outputs, labels)

    # statistics
    running_loss += loss.item()
    running_corrects += torch.sum(preds == labels.data)
  
  # The losses are averaged across observations for each minibatch.
  epoch_loss = running_loss / validation_steps
  epoch_acc = running_corrects.double() / (validation_steps * BATCH_SIZE)
  
  # metric_agg_fn is used in the distributed training to aggregate the metrics on all workers
  if metric_agg_fn is not None:
    epoch_loss = metric_agg_fn(epoch_loss, 'avg_loss')
    epoch_acc = metric_agg_fn(epoch_acc, 'avg_acc')

  print('Validation Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
  return epoch_loss, epoch_acc

## 3) Train and evaluate the model on the local machine
Use converter.make_torch_dataloader(...) to create the dataloader.

In [110]:
from tqdm import tqdm
# hyperparameters
NUM_EPOCHS = 2
BATCH_SIZE = 64

In [111]:
def train_and_evaluate(lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    model = get_model(lr=lr)
    model = model.to(device)
    
    criterion = torch.nn.BCELoss()
    
    # Only parameters of final layer are being optimized.
    optimizer = torch.optim.SGD(model.fc.parameters(), lr=lr, momentum=0.9)
    
    # Decay LR by a factor of 0.1 every 7 epochs
    exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
    
    with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(), 
                                             batch_size=BATCH_SIZE) as train_dataloader, \
       converter_val.make_torch_dataloader(transform_spec=get_transform_spec(), 
                                           batch_size=BATCH_SIZE) as val_dataloader:
    
        train_dataloader_iter = iter(train_dataloader)
        steps_per_epoch = len(converter_train) // BATCH_SIZE
        
        val_dataloader_iter = iter(val_dataloader)
        validation_steps = max(1, len(converter_val) // BATCH_SIZE)
        
        for epoch in range(NUM_EPOCHS):
          print('Epoch {}/{}'.format(epoch + 1, NUM_EPOCHS))
          print('-' * 10)
        
          train_loss, train_acc = train_one_epoch(model, criterion, optimizer, exp_lr_scheduler, 
                                                  train_dataloader_iter, steps_per_epoch, epoch, 
                                                  device)
          val_loss, val_acc = evaluate(model, criterion, val_dataloader_iter, validation_steps, device)
    return val_loss

In [None]:
start = time.time()

loss = train_and_evaluate(1e-3)

timing(start)

Epoch 1/2
----------


100%|█████████████████████████████████████████| 373/373 [02:06<00:00,  2.96it/s]


Total time per epoch : 126.05 / Dataloading time : 122.43  / Training time : 3.62
Validation Loss: 3.1816 Acc: 0.5401
Epoch 2/2
----------


 96%|███████████████████████████████████████▎ | 358/373 [01:58<00:04,  3.06it/s]

In [12]:
spark.stop()

In [73]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lr = 1e-3
model = get_model(lr=lr)
model = model.to(device)

criterion = torch.nn.BCELoss()

# Only parameters of final layer are being optimized.
optimizer = torch.optim.SGD(model.fc.parameters(), lr=lr, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

model.train()  # Set model to training mode

# statistics
running_loss = torch.zeros(1).to(device)
running_corrects = 0
total_fir = 0
total_sec = 0
with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(), 
                                         batch_size=BATCH_SIZE) as train_dataloader, \
    converter_val.make_torch_dataloader(transform_spec=get_transform_spec(), 
                                       batch_size=BATCH_SIZE) as val_dataloader:

    train_dataloader_iter = iter(train_dataloader)
    steps_per_epoch = len(converter_train) // BATCH_SIZE
    
    val_dataloader_iter = iter(val_dataloader)
    validation_steps = max(1, len(converter_val) // BATCH_SIZE)

    # Iterate over the data for one epoch.
    for step in range(steps_per_epoch):
        pd_batch = next(train_dataloader_iter)
        inputs, labels = pd_batch['features'].to(device), pd_batch['label'].to(device).reshape(-1,1).float()
        
    
        # Track history in training
        with torch.set_grad_enabled(True):
            # zero the parameter gradients
            optimizer.zero_grad()
            
            # forward
            outputs = model(inputs)
            preds = outputs > 0.5
            loss = criterion(outputs, labels)
            
            # backward + optimize
            loss.backward()
            optimizer.step()
        
        # statistics
        fir_start = time.time()
        running_loss += loss
        fir_end = time.time()
        fir = fir_end - fir_start
        sec_start = time.time()
        running_corrects += torch.sum(preds == labels.data)
        sec_end = time.time()
        sec = sec_end - sec_start
        total_fir += fir
        total_sec += sec
        print(total_fir, total_sec)
    scheduler.step()
    
    epoch_loss = running_loss.item() / (steps_per_epoch * BATCH_SIZE)
    epoch_acc = running_corrects.double() / (steps_per_epoch * BATCH_SIZE)
    
    print('Train Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))

0.00047206878662109375 6.198883056640625e-05
0.0005159378051757812 0.00021338462829589844
0.0005242824554443359 0.00026106834411621094
0.0005552768707275391 0.00032067298889160156
0.0005648136138916016 0.0004048347473144531
0.0005803108215332031 0.0004627704620361328
0.0005886554718017578 0.0005068778991699219
0.0005970001220703125 0.0005524158477783203
0.0006055831909179688 0.0005993843078613281
0.0006163120269775391 0.0006570816040039062
0.0006499290466308594 0.0008254051208496094
0.0006580352783203125 0.0008697509765625
0.0006663799285888672 0.0009145736694335938
0.0006756782531738281 0.0009696483612060547
0.0006933212280273438 0.0010309219360351562
0.0007028579711914062 0.0011048316955566406
0.0007107257843017578 0.0011475086212158203
0.0007188320159912109 0.0011904239654541016
0.0007381439208984375 0.0012364387512207031
0.0007460117340087891 0.001279592514038086
0.0007541179656982422 0.0013232231140136719
0.0007624626159667969 0.0013668537139892578
0.0007710456848144531 0.00141096

In [72]:
temp = torch.zeros(1).to(device)
temp + loss

tensor([0.4415], device='cuda:0', grad_fn=<AddBackward0>)