# Loading large datasets

**Learning Objectives**
  - Understand the difference between loading data entirely in memory and loading it in batches from disk
  - Practice loading a `.csv` file from disk in batches using the `tf.data` module
 
## Introduction

In the previous notebook, we read the whole taxifare `.csv` files into memory, specifically into a Pandas DataFrame, before invoking `tf.data.Dataset.from_tensor_slices` from the `tf.data` API. We could get away with this because it was a small sample of the dataset, but on the full taxifare dataset this wouldn't be feasible, because the entire dataset would have to fit in memory at once.

In this notebook we demonstrate how to read `.csv` files directly from disk, one batch at a time, using `tf.data.TextLineDataset`.
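To make the contrast concrete, here is a minimal pure-Python sketch that uses only the standard `csv` module (not `tf.data`). The hypothetical `load_all` helper mirrors the in-memory approach of the previous notebook: every row ends up in a single list. The hypothetical `iter_batches` helper streams the file and holds at most one batch of rows in memory at a time, which is the same idea `tf.data.TextLineDataset` builds on, although TensorFlow's implementation is different.

```python
import csv
from typing import Iterator, List


def load_all(csv_path: str) -> List[List[str]]:
    """Read every data row into memory at once (header skipped)."""
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        return list(reader)  # the whole file now lives in RAM


def iter_batches(csv_path: str, batch_size: int) -> Iterator[List[List[str]]]:
    """Yield lists of at most `batch_size` rows, reading the file lazily."""
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) == batch_size:
                yield batch
                batch = []  # start a fresh batch; the old one can be freed
        if batch:  # emit the final, possibly smaller, batch
            yield batch
```

For a file with three data rows and `batch_size=2`, `iter_batches` yields a batch of two rows followed by a batch of one, and never holds more than two rows at a time.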

Run the following cell, and restart the kernel if needed.

In [1]:
# Ensure that TensorFlow 1.13.1 is installed.
!pip3 freeze | grep tensorflow==1.13.1 || pip3 install tensorflow==1.13.1


In [2]:
import tensorflow as tf
import shutil
print(tf.__version__)



1.13.1


In [4]:
tf.enable_eager_execution()

## Input function reading from CSV

We define `read_dataset()`, which, given a csv file path, returns a `tf.data.Dataset` in which each element is a `(features, label)` pair in the format the Estimator API requires:
- features: A Python dictionary. Each key is a feature column name, and its value is the tensor containing the data for that feature
- label: A tensor containing the labels

We then invoke the `read_dataset()` function from within `train_input_fn()` and `eval_input_fn()`. The remaining code is as before.

#### **Exercise 1**

In the next cell, implement a `parse_row` function that takes as input a csv row (as a string) 
and returns a tuple (features, labels) as described above.

First, use the [tf.decode_csv function](https://www.tensorflow.org/api_docs/python/tf/io/decode_csv) to parse the row into a list of `fields`, one tensor per column. Next, create a dictionary that maps each column name to its field. Lastly, define the label and remove it from the `features` dictionary you created; Python's `dict.pop` method does both in one step.

The column names and the default values you'll need for these operations are given by global variables `CSV_COLUMN_NAMES`
and `CSV_DEFAULTS`. The labels are stored in the first column.
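The three steps can be modeled in plain Python to show what each one produces. This is a simplified, hypothetical stand-in (`decode_row` and `parse_row_py` are not TensorFlow functions): like `tf.decode_csv`, it returns one value per column, converts each field to the type of that column's default, and substitutes the default when a field is empty. The real op works on string tensors and supports further options, such as quoted fields.

```python
from typing import Dict, List, Tuple, Union

Number = Union[int, float]

CSV_COLUMN_NAMES = ["fare_amount", "dayofweek", "hourofday",
                    "pickuplon", "pickuplat", "dropofflon", "dropofflat"]
CSV_DEFAULTS = [[0.0], [1], [0], [-74.0], [40.0], [-74.0], [40.7]]


def decode_row(row: str, defaults: List[List[Number]]) -> List[Number]:
    """Hypothetical pure-Python model of tf.decode_csv: one typed value per column."""
    values = row.split(",")
    if len(values) != len(defaults):
        raise ValueError("expected %d fields, got %d" % (len(defaults), len(values)))
    decoded = []
    for raw, (default,) in zip(values, defaults):
        if raw == "":
            decoded.append(default)  # missing field: fall back to the column default
        else:
            decoded.append(type(default)(raw))  # cast to the default's type (int or float)
    return decoded


def parse_row_py(row: str) -> Tuple[Dict[str, Number], Number]:
    fields = decode_row(row, CSV_DEFAULTS)
    features = dict(zip(CSV_COLUMN_NAMES, fields))  # column name -> value
    label = features.pop("fare_amount")  # take the label out of the features
    return features, label
```

`parse_row_py("0.0,1,0,-74.0,40.0,-74.0,40.7")` returns a features dictionary without a `fare_amount` key, together with the label `0.0`.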

In [7]:
CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]

def parse_row(row):
    # Split the csv string into one tensor per column, typed by CSV_DEFAULTS
    fields = tf.decode_csv(row, record_defaults = CSV_DEFAULTS)
    # Map each column name to its tensor
    features = dict(zip(CSV_COLUMN_NAMES, fields))
    # Remove the label (first column) from the features and return it separately
    labels = features.pop("fare_amount")
    return features, labels

Run the following test to make sure your implementation is correct:

In [8]:
a_row = "0.0,1,0,-74.0,40.0,-74.0,40.7"
features, labels = parse_row(a_row)

assert labels.numpy() == 0.0
assert features["pickuplon"].numpy() == -74.0
print("You rock!")

You rock!


#### **Exercise 2**

Use the function `parse_row` you implemented in the previous exercise to 
implement a `read_dataset` function that
- takes as input the path to a csv file
- returns a `tf.data.Dataset` object containing the features, labels

Assume that the `.csv` file has a header row, which your `read_dataset` should skip. Have a look at the [tf.data.TextLineDataset documentation](https://www.tensorflow.org/api_docs/python/tf/data/TextLineDataset) to see what arguments to pass when initializing the dataset pipeline. Then use the `parse_row` function created above to parse each line of the `.csv` file.
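The `TextLineDataset(...).skip(1).map(parse_row)` pattern is lazy: no line is read or parsed until the pipeline is iterated. A rough plain-Python analogue, assuming a hypothetical `read_dataset_py` helper built from a generator, `itertools.islice` and `map`, behaves the same way on a small scale; `tf.data` additionally runs inside the TensorFlow runtime and can, for example, parallelize the `map` step.

```python
import itertools
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")


def read_lines_lazily(csv_path: str) -> Iterator[str]:
    """Yield one line at a time, without its trailing newline."""
    with open(csv_path) as f:
        for line in f:
            yield line.rstrip("\n")


def read_dataset_py(csv_path: str, parse: Callable[[str], T]) -> Iterator[T]:
    lines = read_lines_lazily(csv_path)            # roughly TextLineDataset(csv_path)
    data_lines = itertools.islice(lines, 1, None)  # roughly .skip(1): drop the header
    return map(parse, data_lines)                  # roughly .map(parse_row), applied on demand
```

Nothing is read from disk when `read_dataset_py` returns; lines are pulled one by one only as the caller iterates.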

In [9]:
def read_dataset(csv_path):
    # Stream the file line by line and drop the header row
    dataset = tf.data.TextLineDataset(csv_path).skip(1)
    # Convert each line into a (features, label) pair
    dataset = dataset.map(parse_row)
    return dataset

### Tests

Let's create a test dataset to test our function.

In [10]:
%%writefile test.csv
fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat
28,1,0,-73.0,41.0,-74.0,20.7
12.3,1,0,-72.0,44.0,-75.0,40.6
10,1,0,-71.0,41.0,-71.0,42.9

Writing test.csv


You should be able to iterate over what's returned by `read_dataset`. We'll print the `dropofflat` and `fare_amount` for each entry in `./test.csv`

In [11]:
for feature, label in read_dataset("./test.csv"):
    print("dropofflat:", feature["dropofflat"].numpy())
    print("fare_amount:", label.numpy())

dropofflat: 20.7
fare_amount: 28.0
dropofflat: 40.6
fare_amount: 12.3
dropofflat: 42.9
fare_amount: 10.0


Run the following test cell to make sure your function works properly:

In [12]:
dataset = read_dataset("./test.csv")
dataset_iterator = dataset.make_one_shot_iterator()
features, labels = dataset_iterator.get_next()

assert features['dayofweek'].numpy() == 1
assert labels.numpy() == 28
print("You rock!")

You rock!


#### **Exercise 3**

In the code cell below, implement a `train_input_fn` function that
- takes as input a path to a csv file along with a `batch_size`
- returns a dataset object that shuffles the rows and returns them in batches of `batch_size`

**Hint:** Reuse the `read_dataset` function you implemented above. 

Once you've initialized the `dataset`, be sure to add a step to `shuffle`, `repeat` and `batch` to your pipeline.
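`Dataset.shuffle` does not shuffle the whole file at once; it keeps a buffer of `buffer_size` elements, emits a randomly chosen buffered element, and refills that slot with the next input element. The sketch below is a simplified pure-Python model of that behavior (the helper names are hypothetical), paired with a `batch` helper. It shows why a buffer much smaller than the dataset only shuffles rows locally: an element can be emitted fewer than `buffer_size` positions earlier than where it appeared in the input, which is worth keeping in mind when choosing a value such as `buffer_size = 1000`.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def shuffle_with_buffer(items: Iterable[T], buffer_size: int,
                        rng: random.Random) -> Iterator[T]:
    """Buffered shuffle: a simplified model of what Dataset.shuffle does."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    buffer: List[T] = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # fill the buffer before emitting anything
            continue
        idx = rng.randrange(buffer_size)  # pick a random buffered element...
        yield buffer[idx]
        buffer[idx] = item  # ...and refill its slot with the next input element
    rng.shuffle(buffer)  # input exhausted: drain the rest in random order
    yield from buffer


def batch(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group consecutive items; the final batch may be smaller."""
    current: List[T] = []
    for item in items:
        current.append(item)
        if len(current) == batch_size:
            yield current
            current = []
    if current:
        yield current
```

With `buffer_size=1` the order is unchanged, and with a buffer at least as large as the data the result is a full random permutation.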

In [23]:
def train_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    # Shuffle within a 1000-row buffer, group rows into batches,
    # and repeat indefinitely (training length is controlled by `steps`)
    dataset = dataset.shuffle(buffer_size = 1000).batch(batch_size = batch_size).repeat(count = None)
    return dataset

#### **Exercise 4**

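Next, implement an `eval_input_fn` similar to the `train_input_fn` you implemented above. The differences are that this function does not need to shuffle the rows, and that it should pass over the data only once instead of repeating indefinitely, so that evaluation terminates.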
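With `repeat(count = 1)` the evaluation dataset is exhausted after one pass, so `model.evaluate` stops on its own after ceil(rows / batch_size) batches. The order of `batch` and `repeat` also matters, and `train_input_fn` batches before repeating. The pure-Python sketch below models both orders; the helper names are hypothetical and only mimic the documented behavior of `tf.data`.

```python
import itertools
from typing import Iterable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batch(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group consecutive items into lists; the final list may be shorter."""
    it = iter(items)
    while True:
        chunk = list(itertools.islice(it, batch_size))
        if not chunk:
            return
        yield chunk


def batch_then_repeat(rows: Sequence[T], batch_size: int, epochs: int) -> List[List[T]]:
    # Like .batch(batch_size).repeat(epochs): each epoch is batched separately,
    # so every epoch can end with its own partial batch.
    return [b for _ in range(epochs) for b in batch(rows, batch_size)]


def repeat_then_batch(rows: Sequence[T], batch_size: int, epochs: int) -> List[List[T]]:
    # Like .repeat(epochs).batch(batch_size): epochs are concatenated first,
    # so a batch may straddle an epoch boundary.
    repeated = itertools.chain.from_iterable(itertools.repeat(rows, epochs))
    return list(batch(repeated, batch_size))
```

For three rows and a batch size of 2, one pass gives batches of sizes 2 and 1; two passes give 2, 1, 2, 1 when batching first, but 2, 2, 2 when repeating first.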

In [16]:
def eval_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    # No shuffling for evaluation; a single pass (count = 1) so evaluation stops at the end of the file
    dataset = dataset.batch(batch_size = batch_size).repeat(count = 1)
    return dataset

## Create feature columns

The features of our models are the following:

In [17]:
FEATURE_NAMES = CSV_COLUMN_NAMES[1:] # all but first column
print(FEATURE_NAMES)

['dayofweek', 'hourofday', 'pickuplon', 'pickuplat', 'dropofflon', 'dropofflat']


#### **Exercise 5**

In the cell below, create a variable called `feature_cols` which contains a list of the appropriate `tf.feature_column` to be passed to a `tf.estimator`.

In [18]:
# One numeric (float32) feature column per input feature
feature_cols = [tf.feature_column.numeric_column(key = name) for name in FEATURE_NAMES]
print(feature_cols)

[NumericColumn(key='dayofweek', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), NumericColumn(key='hourofday', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), NumericColumn(key='pickuplon', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), NumericColumn(key='pickuplat', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), NumericColumn(key='dropofflon', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), NumericColumn(key='dropofflat', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None)]


### Choose Estimator 

#### **Exercise 6**

In the cell below, create an instance of a `tf.estimator.DNNRegressor` such that
- it has two layers of 10 units each
- it uses the features defined in the previous exercise
- it saves the trained model into the directory `./taxi_trained`
- it has a random seed set to 1 for replicability and debugging

Have a look at [the documentation for TensorFlow's DNNRegressor](https://www.tensorflow.org/api_docs/python/tf/estimator/DNNRegressor) for a refresher on its constructor arguments.

**Hint:** Remember, the random seed is set by passing a `tf.estimator.RunConfig` object to the `config` parameter of the estimator.

In [19]:
OUTDIR = "taxi_trained"

model = tf.estimator.DNNRegressor(hidden_units = [10, 10], 
                                  feature_columns = feature_cols, 
                                  model_dir = OUTDIR,
                                  config = tf.estimator.RunConfig(tf_random_seed = 1))  # fixed seed for replicability

INFO:tensorflow:Using config: {'_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f12346ab160>, '_log_step_count_steps': 100, '_eval_distribute': None, '_task_type': 'worker', '_global_id_in_cluster': 0, '_tf_random_seed': 1, '_save_summary_steps': 100, '_keep_checkpoint_every_n_hours': 10000, '_keep_checkpoint_max': 5, '_save_checkpoints_secs': 600, '_experimental_distribute': None, '_model_dir': 'taxi_trained', '_service': None, '_task_id': 0, '_num_worker_replicas': 1, '_num_ps_replicas': 0, '_evaluation_master': '', '_master': '', '_train_distribute': None, '_is_chief': True, '_save_checkpoints_steps': None, '_device_fn': None, '_protocol': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
}


## Train

Next, we'll train the model.

#### **Exercise 7**

Complete the code in the cell below to train the `DNNRegressor` model you instantiated above on our data. Have a look at [the documentation for the `train` method of the `DNNRegressor`](https://www.tensorflow.org/api_docs/python/tf/estimator/DNNRegressor#train) to see what arguments you should pass. You'll use the `train_input_fn` you created above and the `./taxi-train.csv` dataset, which must be present in the notebook's working directory; otherwise the input pipeline fails with a `NotFoundError`, as in the output below.

If you train your model for 500 steps, how many epochs of the dataset does this represent?
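The question reduces to arithmetic: each training step consumes one batch, so 500 steps with the default `batch_size = 128` read 500 × 128 = 64,000 rows. The number of rows in `./taxi-train.csv` is not stated here, so the sketch below takes it as a parameter and includes a hypothetical `count_data_rows` helper that counts it by streaming the file. Because `train_input_fn` batches before repeating, each epoch ends with a possibly smaller batch, so the exact figure can be slightly lower than the simple ratio.

```python
import math


def epochs_covered(steps: int, batch_size: int, num_rows: int) -> float:
    """Back-of-the-envelope estimate: each step consumes batch_size rows."""
    return steps * batch_size / num_rows


def epochs_covered_exact(steps: int, batch_size: int, num_rows: int) -> float:
    """Assumes .batch(...).repeat(...) order, so each epoch takes ceil(n / b) steps."""
    steps_per_epoch = math.ceil(num_rows / batch_size)
    return steps / steps_per_epoch


def count_data_rows(csv_path: str) -> int:
    """Count lines lazily, minus one header line (assumes no blank or multi-line rows)."""
    with open(csv_path) as f:
        return sum(1 for _ in f) - 1
```

For a hypothetical file with 10,000 data rows, `epochs_covered(500, 128, 10000)` returns 6.4, while `epochs_covered_exact(500, 128, 10000)` returns 500 / 79, about 6.33.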

In [24]:
%%time
tf.logging.set_verbosity(tf.logging.INFO) # so loss is printed during training
shutil.rmtree(path = OUTDIR, ignore_errors = True) # start fresh each time

model.train(
    input_fn = lambda: train_input_fn(csv_path = "./taxi-train.csv"),
    steps = 500
)

INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 0 into taxi_trained/model.ckpt.


NotFoundError: ./taxi-train.csv; No such file or directory
	 [[node IteratorGetNext (defined at /usr/local/envs/py3env/lib/python3.5/site-packages/tensorflow_estimator/python/estimator/util.py:110) ]]



## Evaluate

Lastly, we'll evaluate our model.

#### **Exercise 8**

In the cell below, evaluate the model using its `.evaluate` method and the `eval_input_fn` function you implemented above on the `./taxi-valid.csv` dataset. Capture the result of running evaluation on the evaluation set in a variable called `metrics`. Then, extract the `average_loss` from the dictionary returned by `model.evaluate` and stored in `metrics`. This is the mean squared error per example; its square root is the RMSE.
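A quick way to convince yourself of the square-root step: assuming the `DNNRegressor` default squared-error loss, `average_loss` is the mean of per-example squared errors, which is expressed in squared label units. The hypothetical pure-Python helpers below compute both quantities on made-up numbers.

```python
import math
from typing import Sequence


def mean_squared_error(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Mean of per-example squared errors (what average_loss reports under squared loss)."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    return sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels)


def rmse(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """Root mean squared error: back in the same units as the label."""
    return math.sqrt(mean_squared_error(labels, predictions))
```

If every prediction is off by 2, the mean squared error is 4 and the RMSE is 2. In the notebook, the equivalent of `rmse` is `metrics["average_loss"] ** 0.5`.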

In [None]:
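metrics = model.evaluate(input_fn = lambda: eval_input_fn(csv_path = "./taxi-valid.csv"))
# average_loss is the mean squared error, so take its square root to get the RMSE
print("RMSE on dataset = {}".format(metrics["average_loss"] ** 0.5))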

## Challenge exercise

Create a neural network that is capable of finding the volume of a cylinder given the radius of its base (r) and its height (h). Assume that the radius and height of the cylinder are both in the range 0.5 to 2.0. Unlike in the challenge exercise for b_estimator.ipynb, assume that your measurements of r, h and V are all rounded off to the nearest 0.1. Simulate the necessary training dataset. This time, you will need a lot more data to get a good predictor.

Hint (highlight to see):
<p style='color:white'>
Create random values for r and h and compute V. Then, round off r, h and V (i.e., the volume is computed from the true values of r and h; it's only your measurement that is rounded off). Your dataset will consist of the rounded values of r, h and V. Do this for both the training and evaluation datasets.
</p>

Now modify the "noise" so that instead of just rounding off the value, there is up to a 10% error (uniformly distributed) in the measurement followed by rounding off.
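One way to simulate the data, sketched with Python's standard `random` module (the `simulate_cylinders` name, sample counts and seeds are illustrative assumptions): draw the true r and h uniformly from [0.5, 2.0], compute V = πr²h from those true values, optionally apply an independent multiplicative error drawn uniformly from ±`max_rel_error` to each measurement, and finally round to one decimal place. Perturbing r, h and V independently is one reading of the exercise, not the only one.

```python
import math
import random
from typing import List, Tuple


def simulate_cylinders(n: int, rng: random.Random,
                       max_rel_error: float = 0.0) -> List[Tuple[float, float, float]]:
    """Return n rows of measured (r, h, V); V is computed from the TRUE r and h."""
    rows = []
    for _ in range(n):
        r = rng.uniform(0.5, 2.0)
        h = rng.uniform(0.5, 2.0)
        v = math.pi * r * r * h  # exact volume from the true dimensions
        measured = []
        for x in (r, h, v):
            # Uniform relative error in [-max_rel_error, +max_rel_error] (0 = rounding only)
            noisy = x * (1.0 + rng.uniform(-max_rel_error, max_rel_error))
            measured.append(round(noisy, 1))  # measurement rounded to one decimal place
        rows.append((measured[0], measured[1], measured[2]))
    return rows
```

For instance, `simulate_cylinders(100000, random.Random(1), max_rel_error=0.1)` could serve as a training set and a second call with a different seed as the evaluation set. Writing the rows to a `.csv` file with a header line would let you reuse `read_dataset` after adjusting `CSV_COLUMN_NAMES` and `CSV_DEFAULTS`.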

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License