# Tutorial in chapter 09 - petastorm-pyspark-pytorch

### 1. load parquet data into pytorch loader

file path: `notebooks/images_data/silver/augmented`

In [50]:
# spark
from pyspark.sql.functions import lit
from pyspark.sql.types import BinaryType,StringType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.functions 
from pyspark.sql.types import *

#petastorm

from petastorm.spark import SparkDatasetConverter, make_spark_converter
from petastorm import TransformSpec 
    
    
import io
import numpy as np
from PIL import Image
from functools import partial 


# train images with pytorch
#from torchvision import transforms
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
import torch
import torch.optim as optim
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F


# import mlflow
import mlflow
import mlflow.pytorch





In [14]:
# start Spark session:

spark = SparkSession \
    .builder \
    .appName("Distributed Pytorch training") \
    .config("spark.memory.offHeap.enabled",True) \
    .config("spark.memory.offHeap.size","30g")\
    .getOrCreate()

In [5]:
from petastorm.spark import SparkDatasetConverter, make_spark_converter

In [6]:
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'petastorm_cache')

In [6]:
data_path = "images_data/silver/augmented"
mlflow_model_dir_path = "/"

# Enable MLFlow tracking

In [10]:
import pytorch_lightning as pl
import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST

try:
    from torchmetrics.functional import accuracy
except ImportError:
    from pytorch_lightning.metrics.functional import accuracy

In [11]:
#Enable MLFlow tracking
mlflow.set_experiment(mlflow_model_dir_path)
# requires pytorch_lightning
mlflow.pytorch.autolog()



## params

In [12]:
IMG_SHAPE = (224, 224, 3)
BATCH_SIZE = 5
#The number of **epochs** is a hyperparameter that defines the number times that the learning algorithm will work through the entire training dataset. One epoch means that each sample in the training dataset has had an opportunity to update the internal model parameters.
SAMPLE_SIZE = 50
NUM_EPOCHS = 1
NUM_EXECUTERS = 1


## 2. Load preprocessed data

In [15]:
# Read the training data stored in parquet, limiting the dataset for the example
df_parquet = spark.read.parquet(data_path)
df = df_parquet.select(col("content"), col("label_index").cast(LongType())).limit(SAMPLE_SIZE)
  
num_classes = df.select("label_index").distinct().count()


In [16]:
num_classes =4

## 3. Split to train and test

In [17]:
df_train , df_val = df.randomSplit([0.6,0.4], seed=12345)  

## 4. Cache the Spark DataFrame using Petastorm Spark Converter

In [18]:
tmp_path = "file:/home/jovyan/petastorm_cache/"

# Set a cache directory on DBFS FUSE for intermediate data
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,tmp_path)

# TIP: Use a low value for parquet_row_group_bytes. The detafault of 32 MiB can be too high for larger datasets. Using 1MB instead.
#train
converter_train = make_spark_converter(df_train, parquet_row_group_size_bytes=32000000)
#test
converter_val = make_spark_converter(df_val, parquet_row_group_size_bytes=32000000)


  self._filesystem = pyarrow.localfs


### Petastorm prepreocess
used during materlizing spark dataframe with petastorm and bridging to TensorFlow

In [41]:
import torchvision, torch
from torchvision import datasets, models, transforms


def preprocess(grayscale_image):
  """
  Preprocess an image file bytes for MobileNetV2 (ImageNet).
  """
  image = Image.open(io.BytesIO(grayscale_image)).resize([224, 224])
  image_array = np.array(image) 

  #image_array = keras.preprocessing.image.img_to_array(image)
  return image_array

def transform_row(pd_batch):
  """
  The input and output of this function are pandas dataframes.
  """
  pd_batch['features'] = pd_batch['content'].map(lambda x: preprocess(x))
  pd_batch = pd_batch.drop(labels=['content'], axis=1)
  return pd_batch

# The output shape of the `TransformSpec` is not automatically known by petastorm, 
# so you need to specify the shape for new columns in `edit_fields` and specify the order of 
# the output columns in `selected_fields`.
transform_spec_fn = TransformSpec(
  func=transform_row, 
  edit_fields=[('features', np.uint8 , IMG_SHAPE, False)], 
  selected_fields=['features', 'label_index']
)

## 5. Get the model MobileNetV2
#### Get the model MobileNetV2 from torch hub
and only retraint it's final layer to fit our needs.

In [19]:
    model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)

Downloading: "https://github.com/pytorch/vision/archive/v0.10.0.zip" to /home/jovyan/.cache/torch/hub/v0.10.0.zip
Downloading: "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" to /home/jovyan/.cache/torch/hub/checkpoints/mobilenet_v2-b0353104.pth


  0%|          | 0.00/13.6M [00:00<?, ?B/s]

In [51]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):  # pylint: disable=arguments-differ
        x = x.view((-1, 1, 28, 28))
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


## 6. set the train function

In [52]:
def train(data_loader, steps=100, lr=0.0005, momentum=0.5):
    model = Net()
    model.train()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    loss_hist = []
    for batch_idx, batch in enumerate(data_loader):
        if batch_idx > steps:
            break
        data, target = Variable(batch['features']), Variable(batch['label_index'])
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            logging.info('[%d/%d]\tLoss: %.6f', batch_idx, steps, loss.data.item())
            loss_hist.append(loss.data.item())
    return model

In [46]:
def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    test_len = 0
    with torch.no_grad():
        for batch in test_loader:
            data, target = batch['features'], batch['label']
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            test_len += data.shape[0]

    test_loss /= test_len
    accuracy = correct / test_len

    logging.info('Test set: Average loss: %.4f, Accuracy: %d/%d (%.0f%%)',
                 test_loss, correct, test_len, 100. * accuracy)
    return accuracy

In [37]:
 def train_and_evaluate(_=None):
    with converter_train.make_torch_dataloader(transform_spec=transform_spec_fn) as loader:
            model = train(loader)
    
    with converter_test.make_torch_dataloader(transform_spec=transform_spec_fn,num_epochs=1) as loader:
            accuracy = test(model, loader)
            return accuracy

In [53]:
 accuracy = train_and_evaluate()

RuntimeError: expected scalar type Byte but found Float