# Writing Data
Copyright (c) Microsoft Corporation. All rights reserved.<br>
Licensed under the MIT License.

Data can be written out at any point in a Dataflow. Each write is added as a step to the resulting Dataflow and is executed every time the Dataflow is executed. Because there is no limit on the number of write steps in a pipeline, it is easy to write out intermediate results for troubleshooting or for other pipelines to pick up.

Note that executing each write results in a full pull of the data in the Dataflow. For example, a Dataflow with three write steps will read and process every record in the dataset three times.
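To make this cost concrete, the following plain-Python sketch models a pipeline whose source is pulled in full once per write step. It is a toy illustration under stated assumptions, not DataPrep's implementation; `CountingSource` and `run_pipeline` are hypothetical names introduced only for this example.

```python
class CountingSource:
    """Toy data source that counts how many records have been read from it."""

    def __init__(self, records):
        self._records = list(records)
        self.records_read = 0

    def read(self):
        # Each call streams the full dataset again, like a fresh pull.
        for record in self._records:
            self.records_read += 1
            yield record


def run_pipeline(source, write_targets):
    """Perform one full pull of the source for each (hypothetical) write step."""
    for target in write_targets:
        target.extend(source.read())


source = CountingSource(range(1000))
outputs = [[], [], []]  # stand-ins for three write steps
run_pipeline(source, outputs)
total_reads = source.records_read  # 1000 records x 3 write steps
```

In this model, the number of records read grows linearly with the number of write steps, which is worth keeping in mind before adding many intermediate writes to a large pipeline.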

## Writing to Files

Data can be written to files in any of our supported locations (Local File System, Azure Blob Storage, and Azure Data Lake Storage). To parallelize the write, the data is written to multiple partition files. A sentinel file named SUCCESS is also written once the write has completed. This makes it possible to detect that an intermediate write has finished without waiting for the whole pipeline to complete.
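A consumer of an intermediate write could check for the sentinel file before reading the partition files. The sketch below uses only the Python standard library and assumes a local output folder; `write_completed` and `list_partition_files` are hypothetical helper names, and the `part-*` pattern is taken from the read calls later in this notebook.

```python
from pathlib import Path


def write_completed(output_dir, sentinel_name="SUCCESS"):
    """Return True once the sentinel file exists in the output folder."""
    return (Path(output_dir) / sentinel_name).is_file()


def list_partition_files(output_dir, pattern="part-*"):
    """Return the partition files in the output folder, sorted by name."""
    return sorted(Path(output_dir).glob(pattern))
```

A downstream job might poll `write_completed('./test_out/')` and only then process the result of `list_partition_files('./test_out/')`. The sentinel name is a parameter so it can be adjusted if the file on disk is named differently.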

> When running a Dataflow in Spark, attempting to execute a write to an existing folder will fail. Ensure the folder is empty, or use a different target location for each execution.
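One way to follow this advice is to derive a fresh target folder for every execution. The following is a minimal sketch for local paths using only the standard library; `unique_output_dir` and `is_missing_or_empty` are hypothetical helpers, not part of DataPrep, and the naming scheme is just one possible choice.

```python
import uuid
from datetime import datetime, timezone
from pathlib import Path


def unique_output_dir(base_dir, prefix="run"):
    """Build a fresh folder path under base_dir (the folder is not created)."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # A short random suffix avoids collisions between runs started in the same second.
    return Path(base_dir) / f"{prefix}-{stamp}-{uuid.uuid4().hex[:8]}"


def is_missing_or_empty(path):
    """Return True if path does not exist or is a folder with no entries."""
    path = Path(path)
    return not path.exists() or (path.is_dir() and not any(path.iterdir()))
```

The returned path can be converted with `str()` and used as the target location of a write. Checking `is_missing_or_empty` beforehand gives an early, explicit failure instead of a failed write.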

The following file formats are currently supported:
- Delimited Files (CSV, TSV, etc.)
- Parquet Files

In [1]:
import azureml.dataprep as dprep

# We'll start by loading data into a Dataflow. We will re-use this with different formats.
t = dprep.smart_read_file('./data/fixed_width_file.txt')
t = t.to_number('Column3')
t.head(10)

| | Column1 | Column2 | Column3 | Column4 | Column5 | Column6 | Column7 | Column8 | Column9 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 10000.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | NO | ENRS | azureml.dataprep.native.DataPrepError("'Micros... | azureml.dataprep.native.DataPrepError("'Micros... | azureml.dataprep.native.DataPrepError("'Micros... |
| 1 | 10003.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | NO | ENSO | azureml.dataprep.native.DataPrepError("'Micros... | azureml.dataprep.native.DataPrepError("'Micros... | azureml.dataprep.native.DataPrepError("'Micros... |
| 2 | 10010.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | JN | ENJA | 70933 | -8667 | 90 |
| 3 | 10013.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | NO | | azureml.dataprep.native.DataPrepError("'Micros... | azureml.dataprep.native.DataPrepError("'Micros... | azureml.dataprep.native.DataPrepError("'Micros... |
| 4 | 10014.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | NO | ENSO | 59783 | 5350 | 500 |
| 5 | 10015.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | NO | ENBL | 61383 | 5867 | 3270 |
| 6 | 10016.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | NO | | 64850 | 11233 | 140 |
| 7 | 10017.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | NO | ENFR | 59933 | 2417 | 480 |
| 8 | 10020.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | SV | | 80050 | 16250 | 80 |
| 9 | 10030.0 | 99999.0 | azureml.dataprep.native.DataPrepError("'Micros... | NO | SV | | 77000 | 15500 | 120 |


### Delimited Files

In [2]:
# The line below creates a new Dataflow with a write step, but the actual write has not been
# executed yet. When the Dataflow is run, the write will take place.
write_t = t.write_to_csv(directory_path=dprep.LocalFileOutput('./test_out/'))

# We now execute the Dataflow, which executes the write operation.
write_t.run_local()

written_files = dprep.read_csv('./test_out/part-*')
written_files.head(10)

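| | Column1 | Column2 | Column3 | Column4 | Column5 | Column6 | Column7 | Column8 | Column9 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 10000 | 99999 | ERROR | NO | NO | ENRS | ERROR | ERROR | ERROR |
| 1 | 10003 | 99999 | ERROR | NO | NO | ENSO | ERROR | ERROR | ERROR |
| 2 | 10010 | 99999 | ERROR | NO | JN | ENJA | 70933 | -8667 | 90 |
| 3 | 10013 | 99999 | ERROR | NO | NO | | ERROR | ERROR | ERROR |
| 4 | 10014 | 99999 | ERROR | NO | NO | ENSO | 59783 | 5350 | 500 |
| 5 | 10015 | 99999 | ERROR | NO | NO | ENBL | 61383 | 5867 | 3270 |
| 6 | 10016 | 99999 | ERROR | NO | NO | | 64850 | 11233 | 140 |
| 7 | 10017 | 99999 | ERROR | NO | NO | ENFR | 59933 | 2417 | 480 |
| 8 | 10020 | 99999 | ERROR | NO | SV | | 80050 | 16250 | 80 |
| 9 | 10030 | 99999 | ERROR | NO | SV | | 77000 | 15500 | 120 |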


The data we wrote out contains several errors in the numeric columns, caused by values that could not be parsed as numbers. When written to CSV, these errors are replaced with the string "ERROR" by default. The replacement string can be set through the error parameter of the write call. Similarly, the na parameter sets the string used to represent null values.
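Because errors end up in the files as an ordinary string, it can be useful to count how many values were replaced before handing the output to another pipeline. The sketch below does this with the standard `csv` module. `count_placeholders` is a hypothetical helper, and it assumes that every partition file starts with a header row, which should be verified against the files actually written.

```python
import csv
from collections import Counter
from pathlib import Path


def count_placeholders(output_dir, placeholder="ERROR", pattern="part-*"):
    """Count, per column, how many cells equal the placeholder string."""
    counts = Counter()
    for part in sorted(Path(output_dir).glob(pattern)):
        with part.open(newline="") as handle:
            # Assumption: each partition file begins with a header row.
            for row in csv.DictReader(handle):
                for column, value in row.items():
                    if value == placeholder:
                        counts[column] += 1
    return counts
```

Calling `count_placeholders('./test_out/')` after the write above would return a `Counter` keyed by column name. Pass `placeholder='BadData'` when a custom error string is used.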

In [3]:
write_t = t.write_to_csv(directory_path=dprep.LocalFileOutput('./test_out/'), 
                         error='BadData',
                         na='NA')
write_t.run_local()
written_files = dprep.read_csv('./test_out/part-*')
written_files.head(10)

| | Column1 | Column2 | Column3 | Column4 | Column5 | Column6 | Column7 | Column8 | Column9 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 10000 | 99999 | BadData | NO | NO | ENRS | BadData | BadData | BadData |
| 1 | 10003 | 99999 | BadData | NO | NO | ENSO | BadData | BadData | BadData |
| 2 | 10010 | 99999 | BadData | NO | JN | ENJA | 70933 | -8667 | 90 |
| 3 | 10013 | 99999 | BadData | NO | NO | | BadData | BadData | BadData |
| 4 | 10014 | 99999 | BadData | NO | NO | ENSO | 59783 | 5350 | 500 |
| 5 | 10015 | 99999 | BadData | NO | NO | ENBL | 61383 | 5867 | 3270 |
| 6 | 10016 | 99999 | BadData | NO | NO | | 64850 | 11233 | 140 |
| 7 | 10017 | 99999 | BadData | NO | NO | ENFR | 59933 | 2417 | 480 |
| 8 | 10020 | 99999 | BadData | NO | SV | | 80050 | 16250 | 80 |
| 9 | 10030 | 99999 | BadData | NO | SV | | 77000 | 15500 | 120 |


### Parquet Files

In [4]:
# As with write_to_csv above, write_to_parquet returns a new Dataflow with a write-to-Parquet step
# that has not been executed yet.
write_parquet_t = t.write_to_parquet(directory_path=dprep.LocalFileOutput('./test_parquet_out/'),
                                     error='MiscreantData')

# We now execute the Dataflow, which executes the write operation.
write_parquet_t.run_local()

written_parquet_files = dprep.read_parquet_file('./test_parquet_out/part-*')
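# DataPrep relies on pyarrow for reading Parquet files, and pyarrow doesn't currently support Python 3.7.
import sys
if sys.version_info < (3, 7):
    # An expression inside an if block is not auto-displayed by the notebook, so print it explicitly.
    print(written_parquet_files.head(10))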