## Writing Spark dataframes into files

* Writing files using direct APIs such as `csv`, `json`, etc. under `df.write`, where `df.write` is of type Spark's `DataFrameWriter`.
* Writing files using `format` and `save` under `df.write`.
* Specifying options as arguments as well as using functions such as `option` and `options`.
* Supported file formats:
    * `csv`
    * `text`
    * `json`
    * `parquet`
    * `orc`
* Other common file formats:
    * `xml`
    * `avro`
* Important file formats for certification - `csv`, `text`, `json`, `parquet`
* Writing into compressed files

In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
import datetime

In [2]:
userName = 'CodeInDNA'
spark = SparkSession. \
        builder. \
        appName(f'{userName} - JoinSparkDF'). \
        getOrCreate()

In [3]:
courses = [
    {
        'course_id': 1,
        'course_title': 'Mastering Python',
        'course_published_dt': datetime.date(2021, 1, 14),
        'is_active': True,
        'last_updated_ts': datetime.datetime(2021, 2, 18, 16, 57, 25)
    },
    {
        'course_id': 2,
        'course_title': 'Data Engineering Essentials',
        'course_published_dt': datetime.date(2021, 2, 10),
        'is_active': True,
        'last_updated_ts': datetime.datetime(2021, 3, 5, 12, 7, 33)
    },
    {
        'course_id': 3,
        'course_title': 'Mastering PySpark',
        'course_published_dt': datetime.date(2021, 1, 7),
        'is_active': True,
        'last_updated_ts': datetime.datetime(2021, 4, 6, 10, 5, 42)
    },
    {
        'course_id': 4,
        'course_title': 'AWS Essentials',
        'course_published_dt': datetime.date(2021, 3, 19),
        'is_active': False,
        'last_updated_ts': datetime.datetime(2021, 4, 10, 2, 25, 36)
    },
    {
        'course_id': 5,
        'course_title': 'Docker 101',
        'course_published_dt': datetime.date(2021, 2, 28),
        'is_active': True,
        'last_updated_ts': datetime.datetime(2021, 3, 21, 7, 18, 52)
    },
]

In [4]:
courses_df = spark.createDataFrame([Row(**course) for course in courses])

courses_df.show()

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering PySpark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



In [5]:
type(courses_df.write)

pyspark.sql.readwriter.DataFrameWriter

In [6]:
courses_df.write.json('../data/courses', mode='overwrite')

In [7]:
courses_df.write.format('json').save('../data/courses', mode='overwrite')

#### Steps to follow to write Spark dataframes into files

* Make sure to analyse the schema of the dataframe.
* Make sure you have write permissions on the target location.
* Decide whether to overwrite, append, ignore, or throw an exception if the target folder already exists.
* Decide whether you would like to compress the files or not.
* Make sure you understand whether the files will be compressed or not by default.
* Use the appropriate APIs with the right arguments based on the requirements.

In [8]:
courses_df.dtypes

[('course_id', 'bigint'),
 ('course_title', 'string'),
 ('course_published_dt', 'date'),
 ('is_active', 'boolean'),
 ('last_updated_ts', 'timestamp')]

In [9]:
courses_df.write.csv('../data/courses_csv', mode='overwrite')

In [10]:
courses_df.write.format('csv').save('../data/courses_csv', mode='overwrite')

In [11]:
spark.read.text('../data/courses_csv').show(truncate=False)

+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|4,AWS Essentials,2021-03-19,false,2021-04-10T02:25:36.000+05:30            |
|5,Docker 101,2021-02-28,true,2021-03-21T07:18:52.000+05:30                 |
|2,Data Engineering Essentials,2021-02-10,true,2021-03-05T12:07:33.000+05:30|
|3,Mastering PySpark,2021-01-07,true,2021-04-06T10:05:42.000+05:30          |
|1,Mastering Python,2021-01-14,true,2021-02-18T16:57:25.000+05:30           |
+---------------------------------------------------------------------------+



In [12]:
# Coalesce: Create 1 file instead of many small files
courses_df.coalesce(1).write.csv('../data/courses_csv', mode='overwrite', header=True)

In [13]:
spark.read.text('../data/courses_csv/').show(truncate=False)

+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|course_id,course_title,course_published_dt,is_active,last_updated_ts       |
|1,Mastering Python,2021-01-14,true,2021-02-18T16:57:25.000+05:30           |
|2,Data Engineering Essentials,2021-02-10,true,2021-03-05T12:07:33.000+05:30|
|3,Mastering PySpark,2021-01-07,true,2021-04-06T10:05:42.000+05:30          |
|4,AWS Essentials,2021-03-19,false,2021-04-10T02:25:36.000+05:30            |
|5,Docker 101,2021-02-28,true,2021-03-21T07:18:52.000+05:30                 |
+---------------------------------------------------------------------------+



In [14]:
spark.read.csv('../data/courses_csv/', header=True).show()

+---------+--------------------+-------------------+---------+--------------------+
|course_id|        course_title|course_published_dt|is_active|     last_updated_ts|
+---------+--------------------+-------------------+---------+--------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18T16:57:...|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05T12:07:...|
|        3|   Mastering PySpark|         2021-01-07|     true|2021-04-06T10:05:...|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10T02:25:...|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21T07:18:...|
+---------+--------------------+-------------------+---------+--------------------+



In [15]:
help(courses_df.write.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None) method of pyspark.sql.readwriter.DataFrameWriter instance
    Saves the content of the :class:`DataFrame` in CSV format at the specified path.
    
    :param path: the path in any Hadoop supported file system
    :param mode: specifies the behavior of the save operation when data already exists.
    
        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.
    

In [16]:
# Compressions: bzip2, gzip, snappy, lz4, deflate
courses_df.coalesce(1).write.csv('../data/courses_csv', header=True, compression='gzip', mode='overwrite')

In [17]:
spark.read.csv('../data/courses_csv/', header=True).show(truncate=False)

+---------+---------------------------+-------------------+---------+-----------------------------+
|course_id|course_title               |course_published_dt|is_active|last_updated_ts              |
+---------+---------------------------+-------------------+---------+-----------------------------+
|1        |Mastering Python           |2021-01-14         |true     |2021-02-18T16:57:25.000+05:30|
|2        |Data Engineering Essentials|2021-02-10         |true     |2021-03-05T12:07:33.000+05:30|
|3        |Mastering PySpark          |2021-01-07         |true     |2021-04-06T10:05:42.000+05:30|
|4        |AWS Essentials             |2021-03-19         |false    |2021-04-10T02:25:36.000+05:30|
|5        |Docker 101                 |2021-02-28         |true     |2021-03-21T07:18:52.000+05:30|
+---------+---------------------------+-------------------+---------+-----------------------------+



In [18]:
# Read the data and write it back with '|' as the separator
input_dir = '../data/courses_csv/'

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(input_dir))

for file in list_status:
    if file.getPath().getName() != '_SUCCESS':
        df = spark.read.csv(str(file.getPath()), header=True)
        df.show()
        df.coalesce(1).write.csv('../data/courses_csv_pipe/', header=True, mode='overwrite', sep='|')

+---------+--------------------+-------------------+---------+--------------------+
|course_id|        course_title|course_published_dt|is_active|     last_updated_ts|
+---------+--------------------+-------------------+---------+--------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18T16:57:...|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05T12:07:...|
|        3|   Mastering PySpark|         2021-01-07|     true|2021-04-06T10:05:...|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10T02:25:...|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21T07:18:...|
+---------+--------------------+-------------------+---------+--------------------+



In [19]:
# Sep '|'
courses_df.coalesce(1).write.csv('../data/courses_csv/', sep='|', header=True, mode='overwrite')

In [20]:
spark.read.csv('../data/courses_csv/', sep='|', header=True).show(truncate=False)

+---------+---------------------------+-------------------+---------+-----------------------------+
|course_id|course_title               |course_published_dt|is_active|last_updated_ts              |
+---------+---------------------------+-------------------+---------+-----------------------------+
|1        |Mastering Python           |2021-01-14         |true     |2021-02-18T16:57:25.000+05:30|
|2        |Data Engineering Essentials|2021-02-10         |true     |2021-03-05T12:07:33.000+05:30|
|3        |Mastering PySpark          |2021-01-07         |true     |2021-04-06T10:05:42.000+05:30|
|4        |AWS Essentials             |2021-03-19         |false    |2021-04-10T02:25:36.000+05:30|
|5        |Docker 101                 |2021-02-28         |true     |2021-03-21T07:18:52.000+05:30|
+---------+---------------------------+-------------------+---------+-----------------------------+



In [21]:
spark.read.csv('../data/courses_csv/', header=True).show(truncate=False)

+---------------------------------------------------------------------------+
|course_id|course_title|course_published_dt|is_active|last_updated_ts       |
+---------------------------------------------------------------------------+
|1|Mastering Python|2021-01-14|true|2021-02-18T16:57:25.000+05:30           |
|2|Data Engineering Essentials|2021-02-10|true|2021-03-05T12:07:33.000+05:30|
|3|Mastering PySpark|2021-01-07|true|2021-04-06T10:05:42.000+05:30          |
|4|AWS Essentials|2021-03-19|false|2021-04-10T02:25:36.000+05:30            |
|5|Docker 101|2021-02-28|true|2021-03-21T07:18:52.000+05:30                 |
+---------------------------------------------------------------------------+



In [22]:
# The files use '|' as the separator, so sep='|' must be passed when reading; the default separator is ','
spark.read.csv('../data/courses_csv/', header=True).printSchema()

root
 |-- course_id|course_title|course_published_dt|is_active|last_updated_ts: string (nullable = true)



In [23]:
spark.read.csv('../data/courses_csv/', header=True, sep='|').printSchema()

root
 |-- course_id: string (nullable = true)
 |-- course_title: string (nullable = true)
 |-- course_published_dt: string (nullable = true)
 |-- is_active: string (nullable = true)
 |-- last_updated_ts: string (nullable = true)



#### option and options

In [24]:
schema = """order_id INT, order_date TIMESTAMP, order_customer_id INT, order_status STRING"""

In [25]:
orders = spark.read.csv('../data/orders.csv', schema=schema)

orders.dtypes

[('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

In [26]:
help(orders.write.option)

Help on method option in module pyspark.sql.readwriter:

option(key, value) method of pyspark.sql.readwriter.DataFrameWriter instance
    Adds an output option for the underlying data source.
    
    You can set the following option(s) for writing files:
        * ``timeZone``: sets the string that indicates a time zone ID to be used to format
            timestamps in the JSON/CSV datasources or partition values. The following
            formats of `timeZone` are supported:
    
            * Region-based zone ID: It should have the form 'area/city', such as 'America/Los_Angeles'.
            * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
    
            Other short names like 'CST' are not recommended to use because they can be
            ambiguous. If it isn't set, the current value of the SQL config
            ``spark.sql.session.timeZone`` is 

In [27]:
help(orders.write.options)

Help on method options in module pyspark.sql.readwriter:

options(**options) method of pyspark.sql.readwriter.DataFrameWriter instance
    Adds output options for the underlying data source.
    
    You can set the following option(s) for writing files:
        * ``timeZone``: sets the string that indicates a time zone ID to be used to format
            timestamps in the JSON/CSV datasources or partition values. The following
            formats of `timeZone` are supported:
    
            * Region-based zone ID: It should have the form 'area/city', such as 'America/Los_Angeles'.
            * Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00' or '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.
    
            Other short names like 'CST' are not recommended to use because they can be
            ambiguous. If it isn't set, the current value of the SQL config
            ``spark.sql.session.timeZone`` is u

In [28]:
(orders.
coalesce(1).
write.
option('sep', '|').
option('header', True).
option('compression', 'gzip').
csv('../data/orders_csv_pipe', mode='overwrite'))

In [29]:
spark.read.csv('../data/orders_csv_pipe/', header=True, sep='|').dtypes

[('order_id', 'string'),
 ('order_date', 'string'),
 ('order_customer_id', 'string'),
 ('order_status', 'string')]

In [30]:
(orders.
coalesce(1).
write.
mode('overwrite').
options(sep='|', header=True, compression='gzip').
csv('../data/orders_csv_pipe'))

In [31]:
# options with dictionary type
options = {'sep': '|', 'compression':'gzip', 'header':True}

(orders.
coalesce(1).
write.
mode('overwrite').
options(**options).
csv('../data/orders_csv_pipe'))

#### JSON

In [33]:
(courses_df.
coalesce(1).
write.
json('../data/courses_json', mode='overwrite'))

In [35]:
(courses_df.
coalesce(1).
write.
format('json')
.save('../data/courses_json', mode='overwrite'))

In [38]:
spark.read.json('../data/courses_json/').show()

+---------+-------------------+--------------------+---------+--------------------+
|course_id|course_published_dt|        course_title|is_active|     last_updated_ts|
+---------+-------------------+--------------------+---------+--------------------+
|        1|         2021-01-14|    Mastering Python|     true|2021-02-18T16:57:...|
|        2|         2021-02-10|Data Engineering ...|     true|2021-03-05T12:07:...|
|        3|         2021-01-07|   Mastering PySpark|     true|2021-04-06T10:05:...|
|        4|         2021-03-19|      AWS Essentials|    false|2021-04-10T02:25:...|
|        5|         2021-02-28|          Docker 101|     true|2021-03-21T07:18:...|
+---------+-------------------+--------------------+---------+--------------------+



In [39]:
# With compression
(courses_df.
coalesce(1).
write.
json('../data/courses_json', mode='overwrite', compression='gzip'))

In [40]:
spark.read.json('../data/courses_json/').show(2)

+---------+-------------------+--------------------+---------+--------------------+
|course_id|course_published_dt|        course_title|is_active|     last_updated_ts|
+---------+-------------------+--------------------+---------+--------------------+
|        1|         2021-01-14|    Mastering Python|     true|2021-02-18T16:57:...|
|        2|         2021-02-10|Data Engineering ...|     true|2021-03-05T12:07:...|
+---------+-------------------+--------------------+---------+--------------------+
only showing top 2 rows



#### Parquet

In [42]:
help(courses_df.write.parquet)

Help on method parquet in module pyspark.sql.readwriter:

parquet(path, mode=None, partitionBy=None, compression=None) method of pyspark.sql.readwriter.DataFrameWriter instance
    Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
    
    :param path: the path in any Hadoop supported file system
    :param mode: specifies the behavior of the save operation when data already exists.
    
        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already exists.
    :param partitionBy: names of partitioning columns
    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, uncompressed, snappy, gzip,
     

In [44]:
(courses_df.
coalesce(1).
write.
parquet('../data/courses_parquet', mode='overwrite'))

In [45]:
spark.read.parquet('../data/courses_parquet').show(2)

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
+---------+--------------------+-------------------+---------+-------------------+
only showing top 2 rows



In [46]:
spark.read.parquet('../data/courses_parquet').dtypes

[('course_id', 'bigint'),
 ('course_title', 'string'),
 ('course_published_dt', 'date'),
 ('is_active', 'boolean'),
 ('last_updated_ts', 'timestamp')]

In [47]:
(courses_df.
coalesce(1).
write.
format('parquet').
save('../data/courses_parquet', mode='overwrite'))

In [48]:
spark.conf.get('spark.sql.parquet.compression.codec')

'snappy'

In [50]:
(courses_df.
coalesce(1).
write.
parquet('../data/courses_parquet_no_comp', mode='overwrite', compression='none'))

In [51]:
(courses_df.
coalesce(1).
write.
parquet('../data/courses_parquet_gzip', mode='overwrite', compression='gzip'))

In [52]:
spark.conf.set('spark.sql.parquet.compression.codec', 'none')

In [53]:
spark.conf.get('spark.sql.parquet.compression.codec')

'none'

In [54]:
(courses_df.
coalesce(1).
write.
parquet('../data/courses_parquet_no_comp', mode='overwrite'))

#### Modes

In [55]:
help(courses_df.write.mode)

Help on method mode in module pyspark.sql.readwriter:

mode(saveMode) method of pyspark.sql.readwriter.DataFrameWriter instance
    Specifies the behavior when data or table already exists.
    
    Options include:
    
    * `append`: Append contents of this :class:`DataFrame` to existing data.
    * `overwrite`: Overwrite existing data.
    * `error` or `errorifexists`: Throw an exception if data already exists.
    * `ignore`: Silently ignore this operation if data already exists.
    
    >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
    
    .. versionadded:: 1.4



Mode can be specified in different ways while writing a dataframe into files. `file_format` can be any valid out-of-the-box format such as `text`, `csv`, `json`, `parquet`, `orc`.
* `courses_df.write.mode(savemode).file_format(path_to_folder)`
* `courses_df.write.file_format(path_to_folder, mode=savemode)` (not available for `text`, whose method takes no `mode` argument)
* `courses_df.write.mode(savemode).format(file_format).save(path_to_folder)`
* `courses_df.write.format(file_format).save(path_to_folder, mode=savemode)`

* Understand the default behaviour
    * Fails if the folder exists.
    * Otherwise creates the folder and then adds files to it.

In [56]:
# Throws an error if folder already exists
courses_df.write.parquet('../data/courses_parquet/')

AnalysisException: path file:/E:/Practice/PySpark/data/courses_parquet already exists.;

In [58]:
spark.conf.set('spark.sql.parquet.compression.codec', 'snappy')

In [59]:
spark.read.parquet('../data/courses_parquet/').count()

5

In [60]:
courses_df.coalesce(1).write.mode('append').parquet('../data/courses_parquet/')

In [61]:
spark.read.parquet('../data/courses_parquet/').count()

10