# Load data for Machine Learning and Deep Learning

This notebook covers loading data specifically for machine learning (ML) and deep learning (DL) applications.

## Petastorm

Petastorm is an open source data access library developed at **Uber ATG**. It enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format, as well as from datasets that are already loaded as Apache Spark DataFrames. Petastorm supports popular Python-based machine learning (ML) frameworks such as TensorFlow, PyTorch, and PySpark. It can also be used from pure Python code.

**Installation**
```
pip install petastorm
```
There are several extra dependencies that are defined by the petastorm package that are not installed automatically. The extras are: tf, tf_gpu, torch, opencv, docs, test.

For example, to trigger installation of the GPU version of TensorFlow and OpenCV, use the following pip command:
```
pip install petastorm[opencv,tf_gpu]
```

The Petastorm Spark converter API simplifies data conversion from Spark to TensorFlow or PyTorch.

In [1]:
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName('Loading Data').getOrCreate()

In [2]:
from petastorm.spark import SparkDatasetConverter, make_spark_converter

spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///home/jovyan/cache')


In [3]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

In [4]:
# The schema defines what the dataset looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])

`HelloWorldSchema` is an instance of a `Unischema` object. `Unischema` can render the types of its fields into different framework-specific formats, such as Spark `StructType`, TensorFlow `tf.DType`, and NumPy `numpy.dtype`.

For each field of the `Unischema`, you specify a type, a shape, a codec instance, and whether the field is nullable.
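
To make the role of the shape and the nullable flag more concrete, here is a minimal pure-Python sketch. It is not Petastorm's own validation code: `FieldSpec` and `value_conforms` are hypothetical names introduced for illustration, and the sketch assumes that a `None` entry in a shape acts as a wildcard for a dimension of any size, as in the `array_4d` field of `HelloWorldSchema`.

```python
from typing import NamedTuple, Optional, Tuple


class FieldSpec(NamedTuple):
    """Hypothetical stand-in for the shape and nullable parts of a schema field."""
    name: str
    shape: Tuple[Optional[int], ...]
    nullable: bool


def value_conforms(spec: FieldSpec, value_shape: Optional[Tuple[int, ...]]) -> bool:
    """Check a value's shape against a field spec.

    value_shape is the array's .shape tuple, or None when the value is missing.
    """
    if value_shape is None:
        # A missing value is acceptable only for nullable fields.
        return spec.nullable
    if len(value_shape) != len(spec.shape):
        return False
    # A None dimension in the spec matches any size (assumed wildcard semantics).
    return all(want is None or want == got
               for want, got in zip(spec.shape, value_shape))


array_4d = FieldSpec('array_4d', (None, 128, 30, None), nullable=False)

print(value_conforms(array_4d, (4, 128, 30, 3)))  # True: fixed dimensions match
print(value_conforms(array_4d, (4, 64, 30, 3)))   # False: second dimension must be 128
print(value_conforms(array_4d, None))             # False: the field is not nullable
```

A scalar field such as `id` has the empty shape `()`, which under this sketch matches only a value whose shape is also `()`.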


In [5]:
def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}

In [6]:
def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    rows_count = 10
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)

- We wrap the Spark dataset generation code with the `materialize_dataset` context manager. The context manager configures the row group size at the beginning and writes out Petastorm-specific metadata at the end.
- The row-generating code is expected to return a Python dictionary indexed by field name. We use the `row_generator` function for that.
- `dict_to_spark_row` converts the dictionary into a `pyspark.Row` object while ensuring compliance with `HelloWorldSchema` (shape, type, and the is-nullable condition are tested).
- Once we have a `pyspark.DataFrame`, we write it out to Parquet storage. The Parquet schema is automatically derived from `HelloWorldSchema`.

In [10]:
generate_petastorm_dataset()



## Plain Python API
The `petastorm.reader.Reader` class is the main entry point for user code that accesses the data from an ML framework such as TensorFlow or PyTorch. The reader has multiple features, such as:

- Selective column readout
- Multiple parallelism strategies: thread, process, single-threaded (for debug)
- N-grams readout support
- Row filtering (row predicates)
- Shuffling
- Partitioning for multi-GPU training
- Local caching
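
Partitioning for multi-GPU training means that each worker reads a disjoint subset of the dataset's row groups; `make_reader` exposes this through its `cur_shard` and `shard_count` arguments. The sketch below illustrates the idea in plain Python with a round-robin assignment. `shard_row_groups` is a hypothetical helper, and the modulo rule is an assumption made for illustration, not a statement of how Petastorm assigns row groups internally.

```python
from typing import List


def shard_row_groups(row_group_count: int, cur_shard: int, shard_count: int) -> List[int]:
    """Return the row-group indexes that one shard would read (round-robin sketch)."""
    if not 0 <= cur_shard < shard_count:
        raise ValueError('cur_shard must be in [0, shard_count)')
    # Row group i goes to shard i % shard_count, so shards never overlap.
    return [i for i in range(row_group_count) if i % shard_count == cur_shard]


# Ten row groups split across three workers.
for shard in range(3):
    print(shard, shard_row_groups(10, shard, 3))
# 0 [0, 3, 6, 9]
# 1 [1, 4, 7]
# 2 [2, 5, 8]
```

With two GPUs, for instance, one worker would pass `cur_shard=0, shard_count=2` to `make_reader` and the other `cur_shard=1, shard_count=2`.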

In [12]:
from petastorm import make_reader

with make_reader('file:///tmp/hello_world_dataset') as reader:
    for row in reader:
        print(row)
        break

HelloWorldSchema_view(id=2, image1=array([[[ 99,  56, 115],
        [141,  88,  32],
        [253,   6,  80],
        ...,
        [133, 248,  32],
        [130, 216, 171],
        [162,  70, 249]],

[output truncated]


## Tensorflow API
To hook the reader up to a TensorFlow graph, you can use the `tf_tensors` function. In TensorFlow 2.x, `tf.Session` is no longer available at the top level (calling it raises `AttributeError: module 'tensorflow' has no attribute 'Session'`), so the cell below builds the tensors inside an explicit graph and runs them with `tf.compat.v1.Session`.

In [14]:
from petastorm.tf_utils import tf_tensors
import tensorflow as tf

with make_reader('file:///tmp/hello_world_dataset') as reader:
    # tf_tensors creates graph-mode ops, so build them inside an explicit graph
    with tf.Graph().as_default():
        row_tensors = tf_tensors(reader)
        # tf.compat.v1.Session is the graph-mode session API under TensorFlow 2.x
        with tf.compat.v1.Session() as session:
            for _ in range(3):
                print(session.run(row_tensors))
