## Spark Read APIs

* Spark provides several APIs to read data from files of different formats.

* All of them are exposed under `spark.read`:

    * `text` - To read text files as a single column of data, one record per line by default, or one record per whole file with the `wholetext` option.
    
    * `csv` - To read delimited text files. The default delimiter is a comma, but we can use other delimiters as well.
    
    * `json` - To read data from JSON files
    
    * `orc` - To read data from ORC files
    
    * `parquet` - To read data from Parquet files

* We can also read data from other file formats by plugging in the corresponding data source and using `spark.read.format`. In that case we also need to call `load` to pass the `path` of the file.

* We can also pass `option` / `options` depending on the file format.

    * `inferSchema` - To infer the data types of the columns from the data (CSV).
    
    * `header` - To take the column names from the header line of CSV files.

* `schema` can be used to specify the schema explicitly. It is a method of the reader rather than an option.
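
As a minimal sketch of the points above, the following reads a small CSV file with a header line, once with chained `option` calls and once with `options` plus `format` and `load`. The temporary file, its contents, and the app name are assumptions made for illustration; in the notebook `spark` already exists, so `getOrCreate()` should simply return that session.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one exists, otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("read-options-sketch").getOrCreate()

// Hypothetical sample file with a header line, written to a temporary directory.
val dir = Files.createTempDirectory("orders_header")
val path = dir.resolve("orders.csv")
Files.write(path, "order_id,order_status\n1,CLOSED\n2,COMPLETE\n".getBytes("UTF-8"))

// `option` sets one key at a time.
val viaOption = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(path.toString)

// `options` takes a Map; `format` followed by `load` names the source explicitly.
val viaOptions = spark.read
  .format("csv")
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .load(path.toString)

viaOption.printSchema
viaOptions.printSchema
```

Both reads should produce the same schema, with column names taken from the header line.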

## Preview Data & Schema

* The `show` method can be used to preview the data. By default it shows the first `20` records and truncates values longer than 20 characters.

* We can pass the number of records and set `truncate` to `false` to see the full values while previewing the data.

* `printSchema` can be used to print the column names and data types.

* `describe` can be used to get summary statistics (count, mean, stddev, min, max) for the columns.
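
The preview methods above can be tried on a tiny in-memory DataFrame before pointing them at real files. This is only a sketch: the sample rows and column names are made up, and the session is created with `getOrCreate()` so that it reuses the notebook's `spark` when one exists.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("preview-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data; the first status value is longer than 20 characters.
val sample = Seq(
  (1, "a status value well over twenty characters"),
  (2, "COMPLETE"),
  (3, "CLOSED")
).toDF("order_id", "order_status")

sample.show()                     // default: up to 20 rows, long values cut off
sample.show(2, truncate = false)  // first 2 rows, full values
sample.printSchema()

// describe returns a DataFrame with a `summary` column plus one column per input column.
val stats = sample.describe()
stats.show(truncate = false)
```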

### Read CSV Files

In [1]:
val orders = spark.read.csv("data/retail_db/orders/part-00000.csv")

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.138:4043
SparkContext available as 'sc' (version = 3.3.0, master = local[*], app id = local-1670441216901)
SparkSession available as 'spark'


orders: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more fields]


In [2]:
orders.show(truncate=false)

+---+---------------------+-----+---------------+
|_c0|_c1                  |_c2  |_c3            |
+---+---------------------+-----+---------------+
|1  |2013-07-25 00:00:00.0|11599|CLOSED         |
|2  |2013-07-25 00:00:00.0|256  |PENDING_PAYMENT|
|3  |2013-07-25 00:00:00.0|12111|COMPLETE       |
|4  |2013-07-25 00:00:00.0|8827 |CLOSED         |
|5  |2013-07-25 00:00:00.0|11318|COMPLETE       |
|6  |2013-07-25 00:00:00.0|7130 |COMPLETE       |
|7  |2013-07-25 00:00:00.0|4530 |COMPLETE       |
|8  |2013-07-25 00:00:00.0|2911 |PROCESSING     |
|9  |2013-07-25 00:00:00.0|5657 |PENDING_PAYMENT|
|10 |2013-07-25 00:00:00.0|5648 |PENDING_PAYMENT|
|11 |2013-07-25 00:00:00.0|918  |PAYMENT_REVIEW |
|12 |2013-07-25 00:00:00.0|1837 |CLOSED         |
|13 |2013-07-25 00:00:00.0|9149 |PENDING_PAYMENT|
|14 |2013-07-25 00:00:00.0|9842 |PROCESSING     |
|15 |2013-07-25 00:00:00.0|2568 |COMPLETE       |
|16 |2013-07-25 00:00:00.0|7276 |PENDING_PAYMENT|
|17 |2013-07-25 00:00:00.0|2667 |COMPLETE       |


In [3]:
orders.show(5, truncate=false)

+---+---------------------+-----+---------------+
|_c0|_c1                  |_c2  |_c3            |
+---+---------------------+-----+---------------+
|1  |2013-07-25 00:00:00.0|11599|CLOSED         |
|2  |2013-07-25 00:00:00.0|256  |PENDING_PAYMENT|
|3  |2013-07-25 00:00:00.0|12111|COMPLETE       |
|4  |2013-07-25 00:00:00.0|8827 |CLOSED         |
|5  |2013-07-25 00:00:00.0|11318|COMPLETE       |
+---+---------------------+-----+---------------+
only showing top 5 rows



In [4]:
orders.printSchema

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [5]:
orders.describe().show(truncate=false)

+-------+------------------+---------------------+-----------------+---------------+
|summary|_c0               |_c1                  |_c2              |_c3            |
+-------+------------------+---------------------+-----------------+---------------+
|count  |68883             |68883                |68883            |68883          |
|mean   |34442.0           |null                 |6216.571098819738|null           |
|stddev |19884.953633337947|null                 |3586.205241263963|null           |
|min    |1                 |2013-07-25 00:00:00.0|1                |CANCELED       |
|max    |9999              |2014-07-24 00:00:00.0|9999             |SUSPECTED_FRAUD|
+-------+------------------+---------------------+-----------------+---------------+

Because every column was read as a string, `min` and `max` are compared lexicographically here, which is why `max` of `_c0` and `_c2` is `9999` rather than the largest numeric value.



#### **inferSchema**

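`inferSchema` is an option of the CSV reader. When it is set to `true`, Spark scans the data and assigns an appropriate data type to each column instead of reading every column as a string. This requires an extra pass over the data.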

In [6]:
val orders = spark.read.option("inferSchema", "true")
                       .csv("data/retail_db/orders/part-00000.csv")

orders: org.apache.spark.sql.DataFrame = [_c0: int, _c1: timestamp ... 2 more fields]


In [7]:
orders.show(5, truncate=false)

+---+-------------------+-----+---------------+
|_c0|_c1                |_c2  |_c3            |
+---+-------------------+-----+---------------+
|1  |2013-07-25 00:00:00|11599|CLOSED         |
|2  |2013-07-25 00:00:00|256  |PENDING_PAYMENT|
|3  |2013-07-25 00:00:00|12111|COMPLETE       |
|4  |2013-07-25 00:00:00|8827 |CLOSED         |
|5  |2013-07-25 00:00:00|11318|COMPLETE       |
+---+-------------------+-----+---------------+
only showing top 5 rows



In [8]:
orders.printSchema

root
 |-- _c0: integer (nullable = true)
 |-- _c1: timestamp (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)



#### **Custom Schema**

We can set the column names and data types as per our requirements using the `schema` method. The schema can be provided as a string of comma-separated `column_name DATA_TYPE` pairs.

In [9]:
val orders = spark.read.schema("""order_id INT, 
                                  order_date TIMESTAMP,
                                  order_customer_id INT,
                                  order_status STRING""")
                       .csv("data/retail_db/orders/part-00000.csv")

orders: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [10]:
orders.show(5, truncate=false)

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [11]:
orders.printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



#### **Pass Delimiter**

We can pass the delimiter as an option while reading a CSV file, using `sep`. The example below passes the default, a comma, explicitly.

In [12]:
val orders = spark.read.schema("""order_id INT,
                                  order_date TIMESTAMP,
                                  order_customer_id INT,
                                  order_status STRING""")
                       .option("sep", ",")
                       .csv("data/retail_db/orders/part-00000.csv")

orders: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [13]:
orders.show(5, truncate=false)

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [14]:
orders.printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
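
Since the cells above pass the default comma, the effect of `sep` is easier to see with a different delimiter. The sketch below assumes a hypothetical pipe-delimited file written to a temporary directory, and uses a shortened two-column schema to keep it small.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("sep-sketch").getOrCreate()

// Hypothetical pipe-delimited sample file.
val dir = Files.createTempDirectory("orders_pipe")
val path = dir.resolve("orders.txt")
Files.write(path, "1|CLOSED\n2|PENDING_PAYMENT\n".getBytes("UTF-8"))

// With the default separator the lines contain no comma, so each line is one column.
val unsplit = spark.read.csv(path.toString)
unsplit.printSchema

// With sep set to "|" the two fields are split and typed by the schema.
val piped = spark.read
  .schema("order_id INT, order_status STRING")
  .option("sep", "|")
  .csv(path.toString)
piped.show(truncate = false)
```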



#### **Load Data with Format**

We can use the `format` method to name the data source and `load` to read the data. This is equivalent to calling the format-specific method such as `csv`.

In [15]:
val orders = spark.read.schema("""order_id INT,
                                  order_date TIMESTAMP,
                                  order_customer_id INT,
                                  order_status STRING""")
                       .option("sep", ",")
                       .format("csv")
                       .load("data/retail_db/orders/part-00000.csv")

orders: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [16]:
orders.show(5, truncate=false)

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [17]:
orders.printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



### Read JSON Objects

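* While reading JSON data from text files, the schema can be inferred directly from the data, because each JSON object contains both the column names and the values. By default `spark.read.json` expects one JSON object per line; the sample below is spread over several lines only for readability.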

```json
{
    "order_id" : 1,
    "order_date" : "2013-07-25 00:00:00.0",
    "order_customer_id" : 12345,
    "order_status" : "COMPLETE"
}
```

In [18]:
val orders = spark.read.json("data/retail_db_json/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674")

orders: org.apache.spark.sql.DataFrame = [order_customer_id: bigint, order_date: string ... 2 more fields]


In [19]:
orders.show(5, truncate=false)

+-----------------+---------------------+--------+---------------+
|order_customer_id|order_date           |order_id|order_status   |
+-----------------+---------------------+--------+---------------+
|11599            |2013-07-25 00:00:00.0|1       |CLOSED         |
|256              |2013-07-25 00:00:00.0|2       |PENDING_PAYMENT|
|12111            |2013-07-25 00:00:00.0|3       |COMPLETE       |
|8827             |2013-07-25 00:00:00.0|4       |CLOSED         |
|11318            |2013-07-25 00:00:00.0|5       |COMPLETE       |
+-----------------+---------------------+--------+---------------+
only showing top 5 rows



In [20]:
orders.printSchema

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)
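
The layout of the JSON file matters for the read. As a rough sketch with hypothetical temporary files, the first read below uses one object per line (the default expectation), and the second uses the `multiLine` option for a single object spread over several lines.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("json-sketch").getOrCreate()
val dir = Files.createTempDirectory("orders_json")

// One JSON object per line: the layout spark.read.json expects by default.
val linesPath = dir.resolve("orders.jsonl")
Files.write(linesPath,
  ("""{"order_id": 1, "order_status": "CLOSED"}""" + "\n" +
   """{"order_id": 2, "order_status": "COMPLETE"}""" + "\n").getBytes("UTF-8"))
val fromLines = spark.read.json(linesPath.toString)

// One object spread over several lines needs the multiLine option.
val prettyPath = dir.resolve("order.json")
Files.write(prettyPath,
  "{\n  \"order_id\": 1,\n  \"order_status\": \"CLOSED\"\n}\n".getBytes("UTF-8"))
val fromPretty = spark.read.option("multiLine", "true").json(prettyPath.toString)

fromLines.printSchema
fromPretty.printSchema
```

In both cases the integer field is inferred as `long`, as in the schema printed above.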



#### **Custom Schema**

In [21]:
val orders = spark.read.schema("""order_id INT,
                                  order_date TIMESTAMP,
                                  order_customer_id INT,
                                  order_status STRING""")
                       .json("data/retail_db_json/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674")

orders: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [22]:
orders.show(5, truncate=false)

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [23]:
orders.printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



#### **inferSchema**

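We should not specify `inferSchema` when reading JSON: it is an option of the CSV reader, and the JSON reader infers the schema from the data by default.

Options should be specified carefully, since Spark silently ignores option names it does not recognize instead of raising an error. In the examples below, `inferSchema` has no effect on the JSON read and the schema comes from the `schema` call.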

In [24]:
val orders = spark.read.option("inferSchema", "false")
                       .schema("""order_id INT,
                                  order_date TIMESTAMP,
                                  order_customer_id INT,
                                  order_status STRING""")
                       .json("data/retail_db_json/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674")

orders: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [25]:
orders.show(5, truncate=false)

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [26]:
orders.printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [27]:
val orders = spark.read.option("inferSchema", "false")
                       .schema("""order_id INT,
                                  order_date TIMESTAMP,
                                  order_customer_id INT,
                                  order_status STRING""")
                       .format("json")
                       .load("data/retail_db_json/orders/part-r-00000-990f5773-9005-49ba-b670-631286032674")

orders: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [28]:
orders.show(5, truncate=false)

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



In [29]:
orders.printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)
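
The point about options being accepted without errors can be illustrated with a CSV read where the effect is visible. In this sketch the file is a hypothetical temporary one and `inferSchemas` is a deliberate misspelling; the read should still succeed, but the column types stay as strings.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("option-typo-sketch").getOrCreate()

// Hypothetical sample file without a header line.
val dir = Files.createTempDirectory("orders_typo")
val path = dir.resolve("orders.csv")
Files.write(path, "1,CLOSED\n2,COMPLETE\n".getBytes("UTF-8"))

// Misspelled option name: no error is raised, the option is simply not applied.
val misspelled = spark.read.option("inferSchemas", "true").csv(path.toString)
misspelled.printSchema  // _c0 stays a string

// Correct option name: _c0 is inferred as an integer.
val correct = spark.read.option("inferSchema", "true").csv(path.toString)
correct.printSchema
```

Checking `printSchema` after every read is a cheap way to catch this kind of mistake.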

