In [1]:
from pyspark.sql import SparkSession
# Create the Spark session used by every cell below, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('instance')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/23 13:27:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/23 13:27:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Overview of writing files from Spark Data Frames
* writing files using direct APIs such as `csv`, `json`, etc., where `df.write` is of type `DataFrameWriter`
* writing files using `format` and `save` under `df.write`
* specifying options as arguments as well as using `option` or `options`
* supported file formats: `text`, `csv`, `json`, `parquet`, `orc`, etc.
* other common file formats: `xml`, `avro`
* for certification the following formats are important: `csv`, `json`, `parquet`
* writing into compressed files

In [3]:
import datetime
from pyspark.sql import Row

# Sample course catalogue; every dict holds the values of one Data Frame row.
courses = [
    dict(
        course_id=1,
        course_title='Mastering Python',
        course_published_dt=datetime.date(2021, 1, 14),
        is_active=True,
        last_updated_ts=datetime.datetime(2021, 2, 18, 16, 57, 25),
    ),
    dict(
        course_id=2,
        course_title='Data Engineering Essentials',
        course_published_dt=datetime.date(2021, 2, 10),
        is_active=True,
        last_updated_ts=datetime.datetime(2021, 3, 5, 12, 7, 33),
    ),
    dict(
        course_id=3,
        course_title='Mastering Pyspark',
        course_published_dt=datetime.date(2021, 1, 7),
        is_active=True,
        last_updated_ts=datetime.datetime(2021, 4, 6, 10, 5, 42),
    ),
    dict(
        course_id=4,
        course_title='AWS Essentials',
        course_published_dt=datetime.date(2021, 3, 19),
        is_active=False,
        last_updated_ts=datetime.datetime(2021, 4, 10, 2, 25, 36),
    ),
    dict(
        course_id=5,
        course_title='Docker 101',
        course_published_dt=datetime.date(2021, 2, 28),
        is_active=True,
        last_updated_ts=datetime.datetime(2021, 3, 21, 7, 18, 52),
    ),
]

# Wrapping each dict in a Row keeps the columns in declaration order; passing
# `courses` straight to `spark.createDataFrame` works as well.
course_rows = [Row(**course) for course in courses]
courses_df = spark.createDataFrame(course_rows)
courses_df.printSchema()
courses_df.show()

root
 |-- course_id: long (nullable = true)
 |-- course_title: string (nullable = true)
 |-- course_published_dt: date (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



In [4]:
# Show the class of the object returned by the Data Frame's `write` attribute.
type(courses_df.write)

pyspark.sql.readwriter.DataFrameWriter

In [9]:
output_dir = '/Users/adhoc/git/retail_db/json/courses'

# Approach 1: the format-specific writer method.
# (`courses_df.write.json(output_dir)` is the same call without an explicit save mode.)
courses_df.write.mode('overwrite').json(output_dir)

# Approach 2: generic `format` + `save`; rewrites the same directory.
courses_df.write.format('json').mode('overwrite').save(output_dir)

## Steps to follow while writing Spark Data Frames into files
* make sure to analyze the schema of the Data Frame
* make sure you have write permissions on the target location
* understand whether you want to `overwrite`, `append`, `ignore`, or throw an exception (`error` / `errorifexists`) in case the target location exists
* decide whether you want to compress the data or not
* make sure you understand whether the data will be compressed by default
* use the appropriate API along with the right arguments based on the requirements

## Writing Spark Data Frames into CSV files
There are multiple approaches:
* Approach 1: `df.write.csv(path_to_folder)`
* Approach 2: `df.write.format('csv').save(path_to_folder)`
* The column names from the schema can be added as a header to each of the files by setting `header=True`
* You can save files using a customized delimiter by setting the `sep` option
* You can compress the data while writing a Data Frame into CSV files

## Specifying header while writing to CSV files from Spark Data Frame

In [18]:
output_path = '/Users/adhoc/git/retail_db/csv'

# Collapse to one partition and write CSV with the column names as the first line.
(
    courses_df
    .coalesce(1)
    .write
    .format('csv')
    .mode('overwrite')
    .option('header', True)
    .save(output_path)
)

In [19]:
# Read the written CSV back as plain text to inspect the raw lines, header included.
spark.read.text(output_path).show(truncate=False)

+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|course_id,course_title,course_published_dt,is_active,last_updated_ts       |
|1,Mastering Python,2021-01-14,true,2021-02-18T16:57:25.000+01:00           |
|2,Data Engineering Essentials,2021-02-10,true,2021-03-05T12:07:33.000+01:00|
|3,Mastering Pyspark,2021-01-07,true,2021-04-06T10:05:42.000+02:00          |
|4,AWS Essentials,2021-03-19,false,2021-04-10T02:25:36.000+02:00            |
|5,Docker 101,2021-02-28,true,2021-03-21T07:18:52.000+01:00                 |
+---------------------------------------------------------------------------+



In [23]:
# Read the CSV once (header only, no schema inference) and reuse the Data Frame
# for both the preview and the dtype listing instead of reading the path twice.
courses_csv_raw = spark.read.csv(output_path, header=True)
courses_csv_raw.show(truncate=False)
courses_csv_raw.dtypes

+---------+---------------------------+-------------------+---------+-----------------------------+
|course_id|course_title               |course_published_dt|is_active|last_updated_ts              |
+---------+---------------------------+-------------------+---------+-----------------------------+
|1        |Mastering Python           |2021-01-14         |true     |2021-02-18T16:57:25.000+01:00|
|2        |Data Engineering Essentials|2021-02-10         |true     |2021-03-05T12:07:33.000+01:00|
|3        |Mastering Pyspark          |2021-01-07         |true     |2021-04-06T10:05:42.000+02:00|
|4        |AWS Essentials             |2021-03-19         |false    |2021-04-10T02:25:36.000+02:00|
|5        |Docker 101                 |2021-02-28         |true     |2021-03-21T07:18:52.000+01:00|
+---------+---------------------------+-------------------+---------+-----------------------------+



[('course_id', 'string'),
 ('course_title', 'string'),
 ('course_published_dt', 'string'),
 ('is_active', 'string'),
 ('last_updated_ts', 'string')]

In [25]:
# Read once with schema inference and reuse the result; the original read the
# path (and inferred the schema) separately for `show` and `dtypes`.
courses_csv_typed = spark.read.csv(output_path, header=True, inferSchema=True)
courses_csv_typed.show(truncate=False)
courses_csv_typed.dtypes

+---------+---------------------------+-------------------+---------+-------------------+
|course_id|course_title               |course_published_dt|is_active|last_updated_ts    |
+---------+---------------------------+-------------------+---------+-------------------+
|1        |Mastering Python           |2021-01-14         |true     |2021-02-18 16:57:25|
|2        |Data Engineering Essentials|2021-02-10         |true     |2021-03-05 12:07:33|
|3        |Mastering Pyspark          |2021-01-07         |true     |2021-04-06 10:05:42|
|4        |AWS Essentials             |2021-03-19         |false    |2021-04-10 02:25:36|
|5        |Docker 101                 |2021-02-28         |true     |2021-03-21 07:18:52|
+---------+---------------------------+-------------------+---------+-------------------+



[('course_id', 'int'),
 ('course_title', 'string'),
 ('course_published_dt', 'date'),
 ('is_active', 'boolean'),
 ('last_updated_ts', 'timestamp')]

In [26]:
# List the column names of the CSV read with `header=True` and schema inference.
spark.read.csv(output_path, header=True, inferSchema=True).columns

['course_id',
 'course_title',
 'course_published_dt',
 'is_active',
 'last_updated_ts']

## Using compression while writing files from Spark Data Frame

In [28]:
# Write the courses as a single gzip-compressed CSV part file with a header line.
(
    courses_df
    .coalesce(1)
    .write
    .format('csv')
    .mode('overwrite')
    .options(header=True, compression='gzip')
    .save(output_path)
)

In [31]:
# Read the gzip-compressed output once and reuse it for the preview and the
# dtypes, rather than reading and inferring the schema twice.
courses_csv_gzip = spark.read.csv(output_path, header=True, inferSchema=True)
courses_csv_gzip.show()
courses_csv_gzip.dtypes

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



[('course_id', 'int'),
 ('course_title', 'string'),
 ('course_published_dt', 'date'),
 ('is_active', 'boolean'),
 ('last_updated_ts', 'timestamp')]

In [32]:
# Same write as the gzip variant, this time checking the snappy codec for CSV.
(
    courses_df
    .coalesce(1)
    .write
    .format('csv')
    .mode('overwrite')
    .options(header=True, compression='snappy')
    .save(output_path)
)

                                                                                

In [33]:
# Read the snappy-compressed output once and reuse it for the preview and the
# dtypes, rather than reading and inferring the schema twice.
courses_csv_snappy = spark.read.csv(output_path, header=True, inferSchema=True)
courses_csv_snappy.show()
courses_csv_snappy.dtypes

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



[('course_id', 'int'),
 ('course_title', 'string'),
 ('course_published_dt', 'date'),
 ('is_active', 'boolean'),
 ('last_updated_ts', 'timestamp')]

## Specifying delimiter while writing Spark Data Frame into files

In [38]:
input_path = '/Users/adhoc/git/retail_db/orders'

# Load the orders CSV without a header or schema, then preview rows and column types.
orders = spark.read.format('csv').load(input_path)
orders.show()
orders.dtypes

+---+--------------------+-----+---------------+
|_c0|                 _c1|  _c2|            _c3|
+---+--------------------+-----+---------------+
|  1|2013-07-25 00:00:...|11599|         CLOSED|
|  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|12111|       COMPLETE|
|  4|2013-07-25 00:00:...| 8827|         CLOSED|
|  5|2013-07-25 00:00:...|11318|       COMPLETE|
|  6|2013-07-25 00:00:...| 7130|       COMPLETE|
|  7|2013-07-25 00:00:...| 4530|       COMPLETE|
|  8|2013-07-25 00:00:...| 2911|     PROCESSING|
|  9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|
| 11|2013-07-25 00:00:...|  918| PAYMENT_REVIEW|
| 12|2013-07-25 00:00:...| 1837|         CLOSED|
| 13|2013-07-25 00:00:...| 9149|PENDING_PAYMENT|
| 14|2013-07-25 00:00:...| 9842|     PROCESSING|
| 15|2013-07-25 00:00:...| 2568|       COMPLETE|
| 16|2013-07-25 00:00:...| 7276|PENDING_PAYMENT|
| 17|2013-07-25 00:00:...| 2667|       COMPLETE|
| 18|2013-07-25 00:0

[('_c0', 'string'), ('_c1', 'string'), ('_c2', 'string'), ('_c3', 'string')]

In [41]:
output_path = '/Users/adhoc/git/retail_db/csv/pipe/orders'

# Write the orders as one CSV part file using `|` as the field separator.
(
    orders
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('sep', '|')
    .csv(output_path)
)

In [46]:
# Without `sep`, the default separator is used on the pipe-delimited files.
spark.read.csv(output_path).show(truncate=False)

# With the matching separator: read once and reuse the Data Frame for the
# preview and the dtypes (the original issued this read twice).
orders_pipe = spark.read.csv(output_path, sep='|')
orders_pipe.show(truncate=False)
orders_pipe.dtypes

+---------------------------------------------+
|_c0                                          |
+---------------------------------------------+
|1|2013-07-25 00:00:00.0|11599|CLOSED         |
|2|2013-07-25 00:00:00.0|256|PENDING_PAYMENT  |
|3|2013-07-25 00:00:00.0|12111|COMPLETE       |
|4|2013-07-25 00:00:00.0|8827|CLOSED          |
|5|2013-07-25 00:00:00.0|11318|COMPLETE       |
|6|2013-07-25 00:00:00.0|7130|COMPLETE        |
|7|2013-07-25 00:00:00.0|4530|COMPLETE        |
|8|2013-07-25 00:00:00.0|2911|PROCESSING      |
|9|2013-07-25 00:00:00.0|5657|PENDING_PAYMENT |
|10|2013-07-25 00:00:00.0|5648|PENDING_PAYMENT|
|11|2013-07-25 00:00:00.0|918|PAYMENT_REVIEW  |
|12|2013-07-25 00:00:00.0|1837|CLOSED         |
|13|2013-07-25 00:00:00.0|9149|PENDING_PAYMENT|
|14|2013-07-25 00:00:00.0|9842|PROCESSING     |
|15|2013-07-25 00:00:00.0|2568|COMPLETE       |
|16|2013-07-25 00:00:00.0|7276|PENDING_PAYMENT|
|17|2013-07-25 00:00:00.0|2667|COMPLETE       |
|18|2013-07-25 00:00:00.0|1205|CLOSED   

[('_c0', 'string'), ('_c1', 'string'), ('_c2', 'string'), ('_c3', 'string')]

In [52]:
input_path = '/Users/adhoc/git/retail_db/csv/pipe/orders'
schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

# Schema applied but `sep` left at its default on pipe-delimited data.
spark.read.csv(input_path, schema=schema).show(truncate=False)

# Schema plus the matching separator: read once and reuse the Data Frame for
# the preview and the dtypes (the original issued this read twice).
orders_pipe_typed = spark.read.csv(input_path, sep='|', schema=schema)
orders_pipe_typed.show()
orders_pipe_typed.dtypes

+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL        |
|NULL    |NULL      |NULL             |NULL   

[('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

## Using options while writing Spark Data Frames into files

In [57]:
input_path = '/Users/adhoc/git/retail_db/orders'
output_path = '/Users/adhoc/git/retail_db/csv/pipe/orders'

schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

# Re-read the source orders with an explicit schema so the columns are named and typed.
orders = spark.read.schema(schema).csv(input_path)
orders.show()
orders.dtypes

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

[('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

In [67]:
# `mode` is not a writer option: set it with the `mode()` method or with the
# `mode=` argument of `csv`. Per the original note, a `mode` passed through
# `option` is ignored.

# Variant 1: every setting supplied through chained `option` calls.
(
    orders
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('sep', '|')
    .option('compression', 'gzip')
    .option('header', True)
    .csv(output_path)
)

# Variant 2: the same settings as keyword arguments of `csv`; overwrites the
# output of variant 1. The chained `option('header', True)` that used to sit
# here only duplicated `header=True` and has been dropped.
(
    orders
    .coalesce(1)
    .write
    .mode('overwrite')
    .csv(output_path, sep='|', compression='gzip', header=True)
)

In [68]:
# Read the written orders back once and reuse the result; the original read
# the path (and inferred the schema) separately for `show` and `dtypes`.
orders_from_option = spark.read.csv(output_path, sep='|', header=True, inferSchema=True)
orders_from_option.show()
orders_from_option.dtypes

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

[('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

In [69]:
# Pass all writer settings in a single `options(...)` call instead of chained `option` calls.
(
    orders
    .coalesce(1)
    .write
    .mode('overwrite')
    .options(sep='|', compression='gzip', header=True)
    .csv(output_path)
)

In [70]:
# Read the written orders back once and reuse the result; the original read
# the path (and inferred the schema) separately for `show` and `dtypes`.
orders_from_options = spark.read.csv(output_path, sep='|', header=True, inferSchema=True)
orders_from_options.show()
orders_from_options.dtypes

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

[('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

In [77]:
# Writer settings gathered in a dict and unpacked into `options(...)`.
# `inferSchema` is a read-side CSV option and has no effect on a write, so it
# is no longer listed here; the read-back in the next cell passes it explicitly.
options = {
    'sep': '|',
    'header': True,
    'compression': 'snappy'
}

orders.coalesce(1).write.mode('overwrite').options(**options).csv(output_path)

In [78]:
# Read the written orders back once and reuse the result; the original read
# the path (and inferred the schema) separately for `show` and `dtypes`.
orders_from_options_dict = spark.read.csv(output_path, sep='|', header=True, inferSchema=True)
orders_from_options_dict.show()
orders_from_options_dict.dtypes

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

[('order_id', 'int'),
 ('order_date', 'timestamp'),
 ('order_customer_id', 'int'),
 ('order_status', 'string')]

## Write JSON files from Spark Data Frame

In [79]:
import datetime
from pyspark.sql import Row

# `courses` and `courses_df` are already built by the cell near the top of this
# notebook that first creates the sample course data. This cell used to repeat
# that definition verbatim; reuse the existing Data Frame instead so the sample
# data lives in exactly one place.
courses_df.printSchema()
courses_df.show()

root
 |-- course_id: long (nullable = true)
 |-- course_title: string (nullable = true)
 |-- course_published_dt: date (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



In [80]:
json_path = '/Users/adhoc/git/retail_db/json/courses'

# Write the courses as a single JSON part file, replacing any existing output.
(
    courses_df
    .coalesce(1)
    .write
    .mode('overwrite')
    .json(json_path)
)

In [83]:
# Read the JSON output once and reuse it for the preview, the dtypes and the
# list of backing files; the original read the same path three times.
courses_json = spark.read.json(json_path)
courses_json.show()
courses_json.dtypes, courses_json.inputFiles()

+---------+-------------------+--------------------+---------+--------------------+
|course_id|course_published_dt|        course_title|is_active|     last_updated_ts|
+---------+-------------------+--------------------+---------+--------------------+
|        1|         2021-01-14|    Mastering Python|     true|2021-02-18T16:57:...|
|        2|         2021-02-10|Data Engineering ...|     true|2021-03-05T12:07:...|
|        3|         2021-01-07|   Mastering Pyspark|     true|2021-04-06T10:05:...|
|        4|         2021-03-19|      AWS Essentials|    false|2021-04-10T02:25:...|
|        5|         2021-02-28|          Docker 101|     true|2021-03-21T07:18:...|
+---------+-------------------+--------------------+---------+--------------------+



([('course_id', 'bigint'),
  ('course_published_dt', 'string'),
  ('course_title', 'string'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'string')],
 ['file:///Users/adhoc/git/retail_db/json/courses/part-00000-41eec3cf-3911-4850-964d-4a5aba5f810a-c000.json'])

In [85]:
# Same JSON write expressed through the generic `format` + `save` API.
(
    courses_df
    .coalesce(1)
    .write
    .format('json')
    .mode('overwrite')
    .save(json_path)
)

In [86]:
# Read the JSON output once and reuse it for the preview, the dtypes and the
# list of backing files; the original read the same path three times.
courses_json_saved = spark.read.json(json_path)
courses_json_saved.show()
courses_json_saved.dtypes, courses_json_saved.inputFiles()

+---------+-------------------+--------------------+---------+--------------------+
|course_id|course_published_dt|        course_title|is_active|     last_updated_ts|
+---------+-------------------+--------------------+---------+--------------------+
|        1|         2021-01-14|    Mastering Python|     true|2021-02-18T16:57:...|
|        2|         2021-02-10|Data Engineering ...|     true|2021-03-05T12:07:...|
|        3|         2021-01-07|   Mastering Pyspark|     true|2021-04-06T10:05:...|
|        4|         2021-03-19|      AWS Essentials|    false|2021-04-10T02:25:...|
|        5|         2021-02-28|          Docker 101|     true|2021-03-21T07:18:...|
+---------+-------------------+--------------------+---------+--------------------+



([('course_id', 'bigint'),
  ('course_published_dt', 'string'),
  ('course_title', 'string'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'string')],
 ['file:///Users/adhoc/git/retail_db/json/courses/part-00000-1d122581-e16a-4d94-a6e5-0c87f5d8bf5e-c000.json'])

## Compression while writing JSON files from Spark Data Frames

In [90]:
# Write the courses as a single gzip-compressed JSON part file.
(
    courses_df
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('compression', 'gzip')
    .json(json_path)
)

In [91]:
# Read the gzip-compressed JSON once and reuse it for the preview, the dtypes
# and the list of backing files; the original read the same path three times.
courses_json_gzip = spark.read.json(json_path)
courses_json_gzip.show()
courses_json_gzip.dtypes, courses_json_gzip.inputFiles()

+---------+-------------------+--------------------+---------+--------------------+
|course_id|course_published_dt|        course_title|is_active|     last_updated_ts|
+---------+-------------------+--------------------+---------+--------------------+
|        1|         2021-01-14|    Mastering Python|     true|2021-02-18T16:57:...|
|        2|         2021-02-10|Data Engineering ...|     true|2021-03-05T12:07:...|
|        3|         2021-01-07|   Mastering Pyspark|     true|2021-04-06T10:05:...|
|        4|         2021-03-19|      AWS Essentials|    false|2021-04-10T02:25:...|
|        5|         2021-02-28|          Docker 101|     true|2021-03-21T07:18:...|
+---------+-------------------+--------------------+---------+--------------------+



([('course_id', 'bigint'),
  ('course_published_dt', 'string'),
  ('course_title', 'string'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'string')],
 ['file:///Users/adhoc/git/retail_db/json/courses/part-00000-8bc8af52-0262-4ebb-8c25-054edff0ffe1-c000.json.gz'])

In [94]:
# Write snappy-compressed JSON through the generic `format` + `save` API.
(
    courses_df
    .coalesce(1)
    .write
    .format('json')
    .mode('overwrite')
    .option('compression', 'snappy')
    .save(json_path)
)

In [95]:
# Read the snappy-compressed JSON once and reuse it for the preview, the dtypes
# and the list of backing files; the original read the same path three times.
courses_json_snappy = spark.read.json(json_path)
courses_json_snappy.show()
courses_json_snappy.dtypes, courses_json_snappy.inputFiles()

+---------+-------------------+--------------------+---------+--------------------+
|course_id|course_published_dt|        course_title|is_active|     last_updated_ts|
+---------+-------------------+--------------------+---------+--------------------+
|        1|         2021-01-14|    Mastering Python|     true|2021-02-18T16:57:...|
|        2|         2021-02-10|Data Engineering ...|     true|2021-03-05T12:07:...|
|        3|         2021-01-07|   Mastering Pyspark|     true|2021-04-06T10:05:...|
|        4|         2021-03-19|      AWS Essentials|    false|2021-04-10T02:25:...|
|        5|         2021-02-28|          Docker 101|     true|2021-03-21T07:18:...|
+---------+-------------------+--------------------+---------+--------------------+



([('course_id', 'bigint'),
  ('course_published_dt', 'string'),
  ('course_title', 'string'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'string')],
 ['file:///Users/adhoc/git/retail_db/json/courses/part-00000-936399bb-c7e8-460b-94fb-e996670e5138-c000.json.snappy'])

## Writing Spark Data Frames into parquet files

In [100]:
# Parquet compression codecs: lzo, lz4, snappy (default), gzip, zstd, brotli, uncompressed
parquet_path = '/Users/adhoc/git/retail_db/parquet/courses'

# No codec is passed here, so the write uses whatever default is configured.
(
    courses_df
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet(parquet_path)
)

In [101]:
# Read the Parquet output once and reuse it for the preview, the dtypes and the
# list of backing files; the original read the same path three times.
courses_parquet = spark.read.parquet(parquet_path)
courses_parquet.show()
courses_parquet.dtypes, courses_parquet.inputFiles()

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



([('course_id', 'bigint'),
  ('course_title', 'string'),
  ('course_published_dt', 'date'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'timestamp')],
 ['file:///Users/adhoc/git/retail_db/parquet/courses/part-00000-8e22f48f-0680-437c-a72a-1adfd5274dec-c000.snappy.parquet'])

In [103]:
# Same Parquet write expressed through the generic `format` + `save` API.
(
    courses_df
    .coalesce(1)
    .write
    .format('parquet')
    .mode('overwrite')
    .save(parquet_path)
)

In [104]:
# Read the Parquet output once and reuse it for the preview, the dtypes and the
# list of backing files; the original read the same path three times.
courses_parquet_saved = spark.read.parquet(parquet_path)
courses_parquet_saved.show()
courses_parquet_saved.dtypes, courses_parquet_saved.inputFiles()

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



([('course_id', 'bigint'),
  ('course_title', 'string'),
  ('course_published_dt', 'date'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'timestamp')],
 ['file:///Users/adhoc/git/retail_db/parquet/courses/part-00000-fd60c8e3-4286-4e7e-876e-390a045492e0-c000.snappy.parquet'])

## Compression while writing Spark Data Frames into parquet files

In [105]:
# Look up the session's current value of the Parquet compression codec setting.
spark.conf.get('spark.sql.parquet.compression.codec')

'snappy'

In [116]:
# Write Parquet with compression explicitly turned off for this write only.
(
    courses_df
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('compression', 'none')
    .parquet(parquet_path)
)

In [117]:
# Read the uncompressed Parquet output once and reuse it for the preview, the
# dtypes and the list of backing files; the original read the path three times.
courses_parquet_plain = spark.read.parquet(parquet_path)
courses_parquet_plain.show()
courses_parquet_plain.dtypes, courses_parquet_plain.inputFiles()

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



([('course_id', 'bigint'),
  ('course_title', 'string'),
  ('course_published_dt', 'date'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'timestamp')],
 ['file:///Users/adhoc/git/retail_db/parquet/courses/part-00000-168a32ca-03b7-423b-9c97-ab816594b557-c000.parquet'])

In [118]:
# Same write through `format('parquet')` + `save`, this time requesting gzip;
# `mode` and `compression` are passed as keyword arguments to `save`.
courses_df.coalesce(1).write.format('parquet').save(parquet_path, mode='overwrite', compression='gzip')

In [123]:
# NOTE(review): this cell's execution count (123) is later than the uncompressed
# overwrite in cell 122 further down, and the recorded part-file name matches
# that cell's read-back, so the saved output does not reflect the gzip write
# above. Re-run the notebook top to bottom to see the gzip part file.
#
# Read the directory once and reuse the DataFrame instead of re-reading it per call.
courses_readback_df = spark.read.parquet(parquet_path)
courses_readback_df.show()
courses_readback_df.dtypes, courses_readback_df.inputFiles()

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



([('course_id', 'bigint'),
  ('course_title', 'string'),
  ('course_published_dt', 'date'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'timestamp')],
 ['file:///Users/adhoc/git/retail_db/parquet/courses/part-00000-2c4e4cc1-4f83-4918-9479-44caad90d3d2-c000.parquet'])

In [121]:
# Change the session-level codec so that later parquet writes need no explicit
# `compression` argument.
# NOTE(review): 'None' here is a codec-name string, not Python's None; the
# earlier per-write example spells it 'none' — confirm the mixed case is intended.
spark.conf.set('spark.sql.parquet.compression.codec', 'None')
# Echo the setting back to confirm what the session now holds.
spark.conf.get('spark.sql.parquet.compression.codec')

'None'

In [122]:
# No `compression` argument here, so the codec is left to the session setting.
courses_df.coalesce(1).write.format('parquet').save(parquet_path, mode='overwrite')

In [124]:
# Read the written directory back once and reuse the DataFrame for every check,
# instead of re-reading the same path for each call.
courses_readback_df = spark.read.parquet(parquet_path)
courses_readback_df.show()
# The part-file name in `inputFiles()` shows which codec suffix (if any) was used.
courses_readback_df.dtypes, courses_readback_df.inputFiles()

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



([('course_id', 'bigint'),
  ('course_title', 'string'),
  ('course_published_dt', 'date'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'timestamp')],
 ['file:///Users/adhoc/git/retail_db/parquet/courses/part-00000-2c4e4cc1-4f83-4918-9479-44caad90d3d2-c000.parquet'])

In [125]:
# Restore the session codec to 'snappy', the value reported before it was changed,
# so the following sections are not affected by the experiment above.
spark.conf.set('spark.sql.parquet.compression.codec', 'snappy')
# Echo the setting back to confirm the restore.
spark.conf.get('spark.sql.parquet.compression.codec')

'snappy'

## Different modes for writing parquet files from Spark Data Frames

In [130]:
# parquet modes: append, overwrite, error (default), ignore
# Four equivalent ways to pass the save mode. Each call overwrites the output of
# the previous one, so only the files from the last write remain.
# 1. `mode()` on the writer + direct `parquet` API
courses_df.coalesce(1).write.mode('overwrite').parquet(parquet_path)
# 2. `mode` keyword argument + direct `parquet` API
courses_df.coalesce(1).write.parquet(parquet_path, mode='overwrite')
# 3. `mode()` on the writer + `format`/`save`
courses_df.coalesce(1).write.mode('overwrite').format('parquet').save(parquet_path)
# 4. `mode` keyword argument + `format`/`save`
courses_df.coalesce(1).write.format('parquet').save(parquet_path, mode='overwrite')

In [134]:
# Read the written directory back once and reuse the DataFrame for every check,
# instead of re-reading the same path for each call.
courses_readback_df = spark.read.parquet(parquet_path)
courses_readback_df.show()
# Column types, backing part files, and the row count (confirms that the
# repeated overwrites did not duplicate rows).
(
    courses_readback_df.dtypes,
    courses_readback_df.inputFiles(),
    courses_readback_df.count(),
)

+---------+--------------------+-------------------+---------+-------------------+
|course_id|        course_title|course_published_dt|is_active|    last_updated_ts|
+---------+--------------------+-------------------+---------+-------------------+
|        1|    Mastering Python|         2021-01-14|     true|2021-02-18 16:57:25|
|        2|Data Engineering ...|         2021-02-10|     true|2021-03-05 12:07:33|
|        3|   Mastering Pyspark|         2021-01-07|     true|2021-04-06 10:05:42|
|        4|      AWS Essentials|         2021-03-19|    false|2021-04-10 02:25:36|
|        5|          Docker 101|         2021-02-28|     true|2021-03-21 07:18:52|
+---------+--------------------+-------------------+---------+-------------------+



([('course_id', 'bigint'),
  ('course_title', 'string'),
  ('course_published_dt', 'date'),
  ('is_active', 'boolean'),
  ('last_updated_ts', 'timestamp')],
 ['file:///Users/adhoc/git/retail_db/parquet/courses/part-00000-e13cf42d-e65a-4bbd-badf-5a2126a2295e-c000.snappy.parquet'],
 5)

## Coalesce and repartitioning of Spark Data Frames
* `coalesce` and `repartition` are functions defined on top of `DataFrame`
* `coalesce` is typically used to **reduce number of partitions** to deal with as part of downstream processes
* `repartition` is typically used to reshuffle the **data to a higher or lower number** of partitions to deal with as part of downstream processes
----------
* `repartition` incurs **shuffling** and it takes time as the data has to be shuffled to the new number of partitions
* you can repartition the data frame on the specified columns
* `coalesce` does not incur shuffling
* `coalesce` is used quite often before writing the data to a smaller number of files

In [136]:
# Current number of partitions behind `courses_df`; this is the baseline for the
# coalesce / repartition comparisons that follow.
courses_df.rdd.getNumPartitions()

10

In [140]:
# coalesce does not shuffle, so it cannot raise the number of partitions:
# a target above the current partition count is ignored
tuple(
    courses_df.coalesce(target).rdd.getNumPartitions()
    for target in (5, 15)
)

(5, 10)

In [153]:
# repartition incurs shuffling
# number of unique values for the repartitioning columns matters
tuple(
    courses_df.repartition(*repartition_args).rdd.getNumPartitions()
    for repartition_args in (
        (5,),
        (15,),
        (15, 'course_published_dt'),
    )
)

(5, 15, 15)