Questions:
1. What is JSON data, and how do we read it in Spark?
2. What if every line has 3 keys but one line has 4?
3. What are multiline and line-delimited JSON?
4. Which one is read faster: multiline or line-delimited?
5. How do we convert nested JSON into a Spark DataFrame?
6. What happens if the JSON file is corrupted or invalid?

JSON -> JavaScript Object Notation\
Data is stored as key-value pairs.

Structured Data (Tabular):
----------------------
| id | Name    | age |
----------------------
| 1  | Deep    | 26  |
----------------------
| 2  | Dibakar | 26  |
----------------------

Semistructured Data (JSON):

[
  {"id": 1, "Name": "Deep", "age": 26},
  {"id": 2, "Name": "Dibakar", "age": 26}
]

The difference between structured and semistructured data shows up when we add a new field/attribute for one record, say Deep. With structured data, we have to add a new column and assign a value to every record (Null for the records that do not have that field/attribute). With semistructured data, we simply add the extra attribute to the record that has it and leave the other records untouched.

Example:

Structured Data (Tabular):
-----------------------------------------
| id | Name    | age | Salary | Nominee |
-----------------------------------------
| 1  | Deep    | 26  | 20000  | Null    |
-----------------------------------------
| 2  | Dibakar | 26  | Null   | abc     |
-----------------------------------------

Semistructured Data (JSON):

[
  {"id": 1, "Name": "Deep", "age": 26, "Salary": 20000},
  {"id": 2, "Name": "Dibakar", "age": 26, "Nominee": "abc"}
]

From the above examples, we can see that JSON gives us more flexibility.
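To make the comparison concrete, here is a minimal plain-Python sketch (standard `json` module only, not Spark) that turns the two JSON records above into the tabular form. The variable names `raw`, `columns` and `rows` are only for illustration. A key that is missing from a record becomes `None`, which plays the role of Null in the table.

```python
import json

# The two records from the example above, each with a different optional attribute.
raw = '''[
  {"id": 1, "Name": "Deep", "age": 26, "Salary": 20000},
  {"id": 2, "Name": "Dibakar", "age": 26, "Nominee": "abc"}
]'''

records = json.loads(raw)

# Collect every key seen in any record, keeping first-seen order.
columns = []
for record in records:
    for key in record:
        if key not in columns:
            columns.append(key)

# Build tabular rows: a key missing from a record becomes None (Null).
rows = [[record.get(col) for col in columns] for record in records]

print(columns)
for row in rows:
    print(row)
```

The tabular form needs one column per key that appears anywhere, while each JSON record only carries the keys it actually has.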


<pre>
File uploaded to /FileStore/tables/line_delimited_json.json
File uploaded to /FileStore/tables/single_file_json_with_extra_fields.json
File uploaded to /FileStore/tables/corrupted_json.json
File uploaded to /FileStore/tables/Multi_line_incorrect.json
File uploaded to /FileStore/tables/Multi_line_correct.json
File uploaded to /FileStore/tables/file5.json
</pre>

In [0]:
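# Note: the JSON reader infers the schema on its own when no schema is supplied;
# "inferschema" is a CSV reader option and most likely has no effect here.
spark.read.format("json")\
        .option("inferschema", "true")\
        .option("mode", "PERMISSIVE")\
        .load("/FileStore/tables/line_delimited_json.json")\
        .show()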

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



- By default, Spark expects line-delimited JSON. It treats each line as one complete record, so it can split the file by line and parse the records independently.
- For multiline JSON, we need to set the multiline option to true. A record can span several lines, so Spark has no reliable way to tell where one record starts and ends without parsing. It therefore reads the whole file as a single object and then works out the individual records.
- Because of this, reading a multiline JSON file is slower than reading a line-delimited JSON file.
- For a multiline JSON file to be read correctly, the records must be wrapped in a list (a list of dictionaries). If they are not in a list, Spark reads only the very first record and ignores the rest.

Examples:
- Line delimited:
<pre>
{"id": 1, "Name": "Deep", "age": 26, "Salary": 20000}
{"id": 2, "Name": "Dibakar", "age": 26, "Nominee": "abc"}
</pre>

- Multiline:
<pre>
[
  {
    "id": 1,
    "Name": "Deep",
    "age": 26,
    "Salary": 20000
  },
  {
    "id": 2,
    "Name": "Dibakar",
    "age": 26,
    "Nominee": "abc"
  }
]
</pre>
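The difference between the two layouts can be sketched in plain Python with the standard `json` module. This is only an illustration of the idea, not how Spark is implemented, and the helper names `parse_line_delimited` and `parse_multiline` are made up for this example.

```python
import json

# Line-delimited: one complete JSON object per line, no commas between lines.
line_delimited = (
    '{"id": 1, "Name": "Deep", "age": 26, "Salary": 20000}\n'
    '{"id": 2, "Name": "Dibakar", "age": 26, "Nominee": "abc"}\n'
)

# Multiline: records span several lines and are wrapped in a list.
multiline = '''[
  {
    "id": 1,
    "Name": "Deep"
  },
  {
    "id": 2,
    "Name": "Dibakar"
  }
]'''


def parse_line_delimited(text):
    # Every non-empty line is a full record, so each line can be
    # parsed on its own, independently of the others.
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def parse_multiline(text):
    # Record boundaries are unknown until the document is parsed,
    # so the whole text has to be read as one unit.
    parsed = json.loads(text)
    return parsed if isinstance(parsed, list) else [parsed]


line_records = parse_line_delimited(line_delimited)
multi_records = parse_multiline(multiline)
print(len(line_records), len(multi_records))
```

Because each line stands alone in the first layout, the lines can be handed to different workers. The second layout has to be parsed as a whole before any record is known, which is the reason it tends to be slower.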

In [0]:
spark.read.format("json")\
        .option("inferschema", "true")\
        .option("mode", "PERMISSIVE")\
        .load("/FileStore/tables/single_file_json_with_extra_fields.json")\
        .show()
# Only the last record has the extra field, gender, so null is populated for the records that do not have it.

+---+------+--------+------+
|age|gender|    name|salary|
+---+------+--------+------+
| 20|  null|  Manish| 20000|
| 25|  null|  Nikita| 21000|
| 16|  null|  Pritam| 22000|
| 35|  null|Prantosh| 25000|
| 67|     M|  Vikash| 40000|
+---+------+--------+------+



In [0]:
spark.read.format("json")\
        .option("inferschema", "true")\
        .option("mode", "PERMISSIVE")\
        .option("multiline", "true")\
        .load("/FileStore/tables/Multi_line_correct.json")\
        .show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



In [0]:
spark.read.format("json")\
        .option("inferschema", "true")\
        .option("mode", "PERMISSIVE")\
        .option("multiline", "true")\
        .load("/FileStore/tables/Multi_line_incorrect.json")\
        .show()
# The records were not wrapped in a list, so Spark read only the very first record.

+---+------+------+
|age|  name|salary|
+---+------+------+
| 20|Manish| 20000|
+---+------+------+
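A similar effect can be reproduced in plain Python. The sketch below assumes the incorrect file looks like two multi-line objects separated by a comma without surrounding brackets (the actual file content is not shown here, so this layout is an assumption). Python's `json.JSONDecoder.raw_decode` parses the first complete JSON value and reports where it stopped. Spark uses its own parser, so treat this only as an analogy.

```python
import json

# Assumed layout: two objects separated by a comma, NOT wrapped in [ ].
not_in_list = '''{
  "name": "Manish", "age": 20, "salary": 20000
},
{
  "name": "Nikita", "age": 25, "salary": 21000
}'''

decoder = json.JSONDecoder()

# raw_decode returns the first JSON value and the index where it ended.
first_record, end = decoder.raw_decode(not_in_list)
leftover = not_in_list[end:].strip()

print(first_record)   # only the first object was parsed
print(leftover[:1])   # the rest starts with the stray comma

# Wrapping the same text in a list makes it one valid document with two records.
wrapped = json.loads("[" + not_in_list + "]")
print(len(wrapped))
```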



In [0]:
spark.read.format("json")\
        .option("inferschema", "true")\
        .option("mode", "PERMISSIVE")\
        .load("/FileStore/tables/corrupted_json.json")\
        .show(truncate=False)
# When reading a corrupted JSON file in PERMISSIVE mode, the corrupted record is stored as raw text in the _corrupt_record column, while the valid records are parsed into the DF as usual.
# In case of a completely invalid JSON file, the read gives us an error.
# In case of a corrupted record, it does not give us an error. It just stores the record in the _corrupt_record field and sets the other columns to null.

+----------------------------------------+----+--------+------+
|_corrupt_record                         |age |name    |salary|
+----------------------------------------+----+--------+------+
|null                                    |20  |Manish  |20000 |
|null                                    |25  |Nikita  |21000 |
|null                                    |16  |Pritam  |22000 |
|null                                    |35  |Prantosh|25000 |
|{"name":"Vikash","age":67,"salary":40000|null|null    |null  |
+----------------------------------------+----+--------+------+
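The behaviour in the output above can be modelled with a few lines of plain Python. This is a rough sketch, not Spark's code: `read_json_lines` is a hypothetical helper, and the three mode names mirror the values Spark accepts for the mode option (PERMISSIVE, DROPMALFORMED, FAILFAST).

```python
import json

lines = [
    '{"name":"Manish","age":20,"salary":20000}',
    '{"name":"Vikash","age":67,"salary":40000',  # missing closing brace
]


def read_json_lines(lines, mode="PERMISSIVE"):
    rows = []
    for line in lines:
        try:
            row = json.loads(line)
            row["_corrupt_record"] = None
        except json.JSONDecodeError:
            if mode == "FAILFAST":
                raise                      # stop at the first bad record
            if mode == "DROPMALFORMED":
                continue                   # silently skip the bad record
            # PERMISSIVE: keep the row, null the parsed fields,
            # and store the raw text of the bad record.
            row = {"name": None, "age": None, "salary": None,
                   "_corrupt_record": line}
        rows.append(row)
    return rows


for row in read_json_lines(lines):
    print(row)
```

With the default mode, the second row keeps the broken text in `_corrupt_record` and has `None` everywhere else, which matches the last row of the table above.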



In [0]:
# Reading nested JSON
spark.read.format("json")\
        .option("inferschema", "true")\
        .option("mode", "PERMISSIVE")\
        .load("/FileStore/tables/file5.json")\
        .printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i
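To get a feel for how such a schema tree relates to the data, here is a small plain-Python sketch that walks a nested object and lists each field with a Spark-like type name. The helper `schema_lines` and the tiny `sample` record are made up for illustration (the sample only mimics the shape of the file), and the output format is simplified compared to printSchema().

```python
def schema_lines(value, name, depth=0):
    """Return indented 'name: type' lines describing a nested Python value."""
    indent = "  " * depth
    if isinstance(value, dict):
        lines = [f"{indent}{name}: struct"]
        for key, child in value.items():
            lines.extend(schema_lines(child, key, depth + 1))
        return lines
    if isinstance(value, list):
        lines = [f"{indent}{name}: array"]
        if value:  # describe the element type from the first element only
            lines.extend(schema_lines(value[0], "element", depth + 1))
        return lines
    if isinstance(value, bool):
        return [f"{indent}{name}: boolean"]
    if isinstance(value, int):
        return [f"{indent}{name}: long"]
    if isinstance(value, float):
        return [f"{indent}{name}: double"]
    if value is None:
        return [f"{indent}{name}: null"]
    return [f"{indent}{name}: string"]


# Hand-made sample that only mimics the shape of the restaurant data.
sample = {
    "results_found": 6835,
    "restaurants": [
        {"restaurant": {"R": {"res_id": 17066603},
                        "name": "The Coop",
                        "establishment_types": []}}
    ],
}

schema = schema_lines(sample, "root")
print("\n".join(schema))
```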

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# To work with nested JSON, we need to flatten it using explode and other methods.
resturant_json_data = spark.read.format("json")\
        .option("inferschema", "true")\
        .option("multiline", "true")\
        .option("mode", "PERMISSIVE")\
        .load("/FileStore/tables/file5.json")

In [0]:
resturant_json_data.show()

+----+-------+--------------------+-------------+-------------+-------------+------+
|code|message|         restaurants|results_found|results_shown|results_start|status|
+----+-------+--------------------+-------------+-------------+-------------+------+
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17066603}, b9...|         6835|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17093124}, b9...|         8680|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17580142}, b9...|          943|           20|            1|  null|
|null|   null|                  []|            0|            0|  

In [0]:
# The JSON is nested on multiple levels, so we need to flatten it.
resturant_json_data.printSchema() # The schema tells us which columns we need to flatten
# We need to flatten the fields of type array and struct.
# For an array, we use explode to flatten (un-nest) it. explode produces one row per array element; here each element is a struct.
# For a struct, we can access its fields using the dot (.) operator.

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i

In [0]:
resturant_json_data.select("*", explode("restaurants").alias("new_restaurants")).printSchema()
# restaurants is of type array.
# To extract anything inside restaurants, we first need to flatten (un-nest) it.
# We cannot explode anything inside restaurants until restaurants itself is exploded.
# But we can explode other array fields that are at the same level as restaurants.
# Exploding an array yields one row per element; here each element is of struct type.
# struct and array are container types, not leaf columns. The leaf columns are nested inside them.

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i
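What explode does to the rows can be sketched in plain Python. This is a simplified model, not the Spark implementation: the function `explode` below is a hypothetical stand-in that works on a list of dicts, and the two sample rows are hand-made to mimic the shape of the data.

```python
def explode(rows, array_col, new_col):
    """Return one output row per element of row[array_col]."""
    out = []
    for row in rows:
        elements = row.get(array_col)
        if not elements:          # None or [] produces no output rows
            continue
        for element in elements:
            new_row = dict(row)   # keep all original columns, like select("*", ...)
            new_row[new_col] = element
            out.append(new_row)
    return out


# Hand-made rows mimicking the DataFrame: one empty array, one with two structs.
rows = [
    {"results_found": 0, "restaurants": []},
    {"results_found": 6835, "restaurants": [
        {"restaurant": {"R": {"res_id": 17066603}}},
        {"restaurant": {"R": {"res_id": 17059541}}},
    ]},
]

exploded = explode(rows, "restaurants", "new_restaurants")
for row in exploded:
    print(row["results_found"], row["new_restaurants"])
```

The row with the empty array disappears, and the row with two elements becomes two rows, each carrying a single struct in new_restaurants.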

In [0]:
# Now we have new_restaurants field, that is already exploded.
# So, we can drop the restaurants field as we do not need it anymore.
resturant_json_data.select("*", explode("restaurants").alias("new_restaurants"))\
    .drop("restaurants").printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- results_found: long (nullable = true)
 |-- results_shown: long (nullable = true)
 |-- results_start: string (nullable = true)
 |-- status: string (nullable = true)
 |-- new_restaurants: struct (nullable = true)
 |    |-- restaurant: struct (nullable = true)
 |    |    |-- R: struct (nullable = true)
 |    |    |    |-- res_id: long (nullable = true)
 |    |    |-- apikey: string (nullable = true)
 |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |-- cuisines: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- deeplink: string (nullable = true)
 |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- events_url: string (nullable = true)
 |    |    |-- featured_image: string (nullable = true)
 |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |-- has_tab

In [0]:
# Reading res_id column, nested inside struct
resturant_json_data.select("*", explode("restaurants").alias("new_restaurants"))\
    .drop("restaurants")\
        .select("new_restaurants.restaurant.R.res_id").show()

+--------+
|  res_id|
+--------+
|17066603|
|17059541|
|17064405|
|17057797|
|17057591|
|17064266|
|17060516|
|17060320|
|17059060|
|17059012|
|17060869|
|17061231|
|17058534|
|17057925|
|17064031|
|17061237|
|17061253|
|17061296|
|17061205|
|17057397|
+--------+
only showing top 20 rows
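The dotted path used in the select above can be read as "step into one struct after another". A minimal plain-Python sketch of that idea follows; `get_path` is a hypothetical helper and the sample rows are hand-made. A null struct along the way gives null for the leaf value.

```python
def get_path(row, dotted_path):
    """Follow a dotted path such as 'a.b.c' through nested dicts."""
    value = row
    for part in dotted_path.split("."):
        if value is None:      # a null struct yields null for everything below it
            return None
        value = value[part]
    return value


row = {"new_restaurants": {"restaurant": {"R": {"res_id": 17066603},
                                          "name": "The Coop"}}}
row_with_null_struct = {"new_restaurants": {"restaurant": None}}

print(get_path(row, "new_restaurants.restaurant.R.res_id"))
print(get_path(row, "new_restaurants.restaurant.name"))
print(get_path(row_with_null_struct, "new_restaurants.restaurant.R.res_id"))
```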



In [0]:
resturant_json_data.select("*", explode("restaurants").alias("new_restaurants"))\
    .drop("restaurants")\
        .select("*", "new_restaurants.restaurant.R.res_id").printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- results_found: long (nullable = true)
 |-- results_shown: long (nullable = true)
 |-- results_start: string (nullable = true)
 |-- status: string (nullable = true)
 |-- new_restaurants: struct (nullable = true)
 |    |-- restaurant: struct (nullable = true)
 |    |    |-- R: struct (nullable = true)
 |    |    |    |-- res_id: long (nullable = true)
 |    |    |-- apikey: string (nullable = true)
 |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |-- cuisines: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- deeplink: string (nullable = true)
 |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- events_url: string (nullable = true)
 |    |    |-- featured_image: string (nullable = true)
 |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |-- has_tab

In [0]:
# Query the below fields within restaurants array:
# res_id (R), element (establishment_types), name
resturant_json_data.select("*", explode("restaurants").alias("new_restaurants"))\
    .drop("restaurants")\
        .select(
            "*",
            "new_restaurants.restaurant.R.res_id",
            explode("new_restaurants.restaurant.establishment_types").alias("establishment_types_new"),
            "new_restaurants.restaurant.name"
        ).drop("new_restaurants").printSchema()
# We can drop multiple fields at once by passing their names to drop(), separated by commas (,).

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- results_found: long (nullable = true)
 |-- results_shown: long (nullable = true)
 |-- results_start: string (nullable = true)
 |-- status: string (nullable = true)
 |-- res_id: long (nullable = true)
 |-- establishment_types_new: string (nullable = true)
 |-- name: string (nullable = true)



In [0]:
# By default, explode removes (discards) any record whose array is null or empty.
# To keep such records (with null in the exploded column), use explode_outer instead of explode.
resturant_json_data.select("*", explode("restaurants").alias("new_restaurants"))\
    .drop("restaurants")\
        .select(
            "new_restaurants.restaurant.R.res_id",
            explode_outer("new_restaurants.restaurant.establishment_types").alias("establishment_types_new"),
            "new_restaurants.restaurant.name"
        ).drop("new_restaurants").show(truncate=False)

+--------+-----------------------+------------------------------------+
|res_id  |establishment_types_new|name                                |
+--------+-----------------------+------------------------------------+
|17066603|null                   |The Coop                            |
|17059541|null                   |Maggiano's Little Italy             |
|17064405|null                   |Tako Cheena by Pom Pom              |
|17057797|null                   |Bosphorous Turkish Cuisine          |
|17057591|null                   |Bahama Breeze Island Grille         |
|17064266|null                   |Hawkers Asian Street Fare           |
|17060516|null                   |Seasons 52 Fresh Grill              |
|17060320|null                   |Raglan Road Irish Pub and Restaurant|
|17059060|null                   |Hillstone                           |
|17059012|null                   |Hollerbach's Willow Tree Café       |
|17060869|null                   |Texas de Brazil               
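The difference between explode and explode_outer can be modelled in plain Python as below. Both functions are simplified stand-ins written for this note, not the Spark functions themselves. The first sample row assumes that The Coop has an empty establishment_types array (the output above only shows null after explode_outer), and the second row is entirely hypothetical.

```python
def explode(rows, array_col, new_col):
    out = []
    for row in rows:
        for element in row.get(array_col) or []:   # null/empty -> row is dropped
            new_row = {k: v for k, v in row.items() if k != array_col}
            new_row[new_col] = element
            out.append(new_row)
    return out


def explode_outer(rows, array_col, new_col):
    out = []
    for row in rows:
        # null/empty -> keep the row once, with None in the new column
        for element in row.get(array_col) or [None]:
            new_row = {k: v for k, v in row.items() if k != array_col}
            new_row[new_col] = element
            out.append(new_row)
    return out


rows = [
    # Assumption: empty array for this restaurant.
    {"res_id": 17066603, "name": "The Coop", "establishment_types": []},
    # Hypothetical row, only here to show a non-empty array.
    {"res_id": 1, "name": "Hypothetical Cafe",
     "establishment_types": ["Cafe", "Bakery"]},
]

inner = explode(rows, "establishment_types", "establishment_types_new")
outer = explode_outer(rows, "establishment_types", "establishment_types_new")
print(len(inner), len(outer))
```

With explode, The Coop is lost entirely. With explode_outer, it survives as a single row whose establishment_types_new is None.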

---