## Working With Nested Data

This notebook allow to show how work with nested data (json files) on Databricks.

Additionaly, you can see this code at [this Databricks link](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5025747602310753/3735810169305329/2315352051373759/latest.html).

The purpose of this notebook is to:

Download and read a JSON file from [URL]("https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json?accessType=DOWNLOAD").

The idea is to organize the information from the data into a PySpark dataframe and finally determine what is the most popular first letters baby names to start with in each year.

#### For this, the plan is:

- Download the data.
- Identify the data structure.
- Organize the data into a PySpark dataframe.
- Perform the requested query.

#### Through this, we will learn how to:

- Retrieve information from nested data.
- Utilize functions like explode, col, and row_number.
- Use window function to apply row_number.

In [None]:
# Constants
url_source = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.json?accessType=DOWNLOAD"
local_file = '/tmp/local_archive_baby_names.json'

In [None]:
# import libraries
import urllib.request
import json
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [None]:
# Download the data from url:
urllib.request.urlretrieve(url_source, local_file)

('/tmp/local_archive_baby_names.json',
 <http.client.HTTPMessage at 0x7f6008fb8160>)

In [None]:
# Let see the file content

with open(local_file, 'r') as f:
    data = json.load(f)

print(json.dumps(data, indent=2))

{
  "meta": {
    "view": {
      "id": "jxy9-yhdk",
      "name": "Baby Names: Beginning 2007",
      "assetType": "dataset",
      "attribution": "New York State Department of Health",
      "attributionLink": "http://www.health.ny.gov/statistics/vital_statistics/",
      "averageRating": 0,
      "category": "Health",
      "createdAt": 1356724562,
      "description": "New York State Baby Names are aggregated and displayed by the year, county, or borough where the mother resided as stated on a New York State or New York City (NYC) birth certificate. The frequency of the baby name is listed if there are 5 or more of the same baby name in a county outside of NYC or 10 or more of the same baby name in a NYC borough.",
      "displayType": "table",
      "downloadCount": 119425,
      "hideFromCatalog": false,
      "hideFromDataJson": false,
      "indexUpdatedAt": 1527713966,
      "newBackend": true,
      "numberOfComments": 0,
      "oid": 28927042,
      "provenance": "official",

In [None]:
# Read with pyspark
df = spark.read.json('file:'+local_file, multiLine=True)
df.printSchema()
# In the schema below, it can be seen that the data is within the 'data' array and the columns names are in 'meta.view.columns.name'

root
 |-- data: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- meta: struct (nullable = true)
 |    |-- view: struct (nullable = true)
 |    |    |-- approvals: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- reviewedAt: long (nullable = true)
 |    |    |    |    |-- reviewedAutomatically: boolean (nullable = true)
 |    |    |    |    |-- state: string (nullable = true)
 |    |    |    |    |-- submissionDetails: struct (nullable = true)
 |    |    |    |    |    |-- permissionType: string (nullable = true)
 |    |    |    |    |-- submissionId: long (nullable = true)
 |    |    |    |    |-- submissionObject: string (nullable = true)
 |    |    |    |    |-- submissionOutcome: string (nullable = true)
 |    |    |    |    |-- submissionOutcomeApplication: struct (nullable = true)
 |    |    |    |    |    |-- failureCount: long (nullable = true

In [None]:
# We can view the array data separated using the 'explode' function:
data_df = df.select(F.explode('data'))
data_df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------+
|col                                                                                                                                  |
+-------------------------------------------------------------------------------------------------------------------------------------+
|[row-emfw_sfk5_5wtx, 00000000-0000-0000-3154-4394D27F2559, 0, 1682529128, null, 1682529128, null, { }, 2007, ZOEY, KINGS, F, 11]     |
|[row-nnzi~h48n.2mpf, 00000000-0000-0000-3DF6-C10C58270E48, 0, 1682529128, null, 1682529128, null, { }, 2007, ZOEY, SUFFOLK, F, 6]    |
|[row-8jmi-hg3w_uppp, 00000000-0000-0000-7C56-D892C3D84DE1, 0, 1682529128, null, 1682529128, null, { }, 2007, ZOEY, MONROE, F, 6]     |
|[row-bp7y-cty8-m6gp, 00000000-0000-0000-41FE-82EEAC28D5D1, 0, 1682529128, null, 1682529128, null, { }, 2007, ZOEY, ERIE, F, 9]       |
|[row-k26x~vr43~psrg, 00000000-0000-0000-1835-2E

In [None]:
# To access the data, you need to access it by index in the following way:
df.select(F.explode('data')).select(F.col('col')[0], F.col('col')[1]).show(truncate=False)

+------------------+------------------------------------+
|col[0]            |col[1]                              |
+------------------+------------------------------------+
|row-emfw_sfk5_5wtx|00000000-0000-0000-3154-4394D27F2559|
|row-nnzi~h48n.2mpf|00000000-0000-0000-3DF6-C10C58270E48|
|row-8jmi-hg3w_uppp|00000000-0000-0000-7C56-D892C3D84DE1|
|row-bp7y-cty8-m6gp|00000000-0000-0000-41FE-82EEAC28D5D1|
|row-k26x~vr43~psrg|00000000-0000-0000-1835-2E9BD12C2A3E|
|row-8cvx_w265_mg48|00000000-0000-0000-A4C6-EE82A36B5BAD|
|row-wknm.93pt~87w7|00000000-0000-0000-9D37-4BB5279B1F5D|
|row-c75h-74q9.zkaw|00000000-0000-0000-4C13-DF627DDA43E7|
|row-n5tg_veg7-dxt9|00000000-0000-0000-6020-F05111ADBBCB|
|row-pe29.gtsr_ta54|00000000-0000-0000-7E73-823615A4553C|
|row-ve9k~rqcx_xxar|00000000-0000-0000-0755-B7F094946349|
|row-m6kc.4n27~if9t|00000000-0000-0000-DF06-A9FE6416F3F3|
|row-h8a2~jf8z_ycfb|00000000-0000-0000-686F-63DA4AB06D32|
|row-2i6u.ru75~ueb5|00000000-0000-0000-DAF8-AF850AB30355|
|row-yt9x-r4ws

In [None]:
# Let's retrieve the column names
df.select(F.explode('meta.view.columns.name')).show(truncate=False)

+------------+
|col         |
+------------+
|sid         |
|id          |
|position    |
|created_at  |
|created_meta|
|updated_at  |
|updated_meta|
|meta        |
|Year        |
|First Name  |
|County      |
|Sex         |
|Count       |
+------------+



In [None]:
columns_name = [str(name.col).replace(' ', '_').lower() for name in df.select(F.explode('meta.view.columns.name')).collect()]
columns_name

['sid',
 'id',
 'position',
 'created_at',
 'created_meta',
 'updated_at',
 'updated_meta',
 'meta',
 'year',
 'first_name',
 'county',
 'sex',
 'count']

In [None]:
# Ahora ponemos los nombres de los campos que están en el campo data
final_df = data_df.select([F.col('col')[i].alias(col_name) for i, col_name in enumerate(columns_name)])
final_df.show()

+------------------+--------------------+--------+----------+------------+----------+------------+----+----+----------+-----------+---+-----+
|               sid|                  id|position|created_at|created_meta|updated_at|updated_meta|meta|year|first_name|     county|sex|count|
+------------------+--------------------+--------+----------+------------+----------+------------+----+----+----------+-----------+---+-----+
|row-emfw_sfk5_5wtx|00000000-0000-000...|       0|1682529128|        null|1682529128|        null| { }|2007|      ZOEY|      KINGS|  F|   11|
|row-nnzi~h48n.2mpf|00000000-0000-000...|       0|1682529128|        null|1682529128|        null| { }|2007|      ZOEY|    SUFFOLK|  F|    6|
|row-8jmi-hg3w_uppp|00000000-0000-000...|       0|1682529128|        null|1682529128|        null| { }|2007|      ZOEY|     MONROE|  F|    6|
|row-bp7y-cty8-m6gp|00000000-0000-000...|       0|1682529128|        null|1682529128|        null| { }|2007|      ZOEY|       ERIE|  F|    9|
|row-k

In [None]:


# For that, we will make use of the row_number() function.

# First, we set up the window function that will be used when using row_number():

order_window = Window.partitionBy('year').orderBy(F.desc('count'))

In [None]:
# We calculate the row_number() column as row_:
final_df.select('year', F.substring('first_name', 1, 1).alias('letter'), 'count').groupBy('year', 'letter').count().withColumn('row_', F.row_number().over(order_window)).show()

+----+------+-----+----+
|year|letter|count|row_|
+----+------+-----+----+
|2007|     A|  913|   1|
|2007|     J|  814|   2|
|2007|     M|  565|   3|
|2007|     C|  453|   4|
|2007|     S|  414|   5|
|2007|     E|  400|   6|
|2007|     L|  346|   7|
|2007|     D|  272|   8|
|2007|     K|  272|   9|
|2007|     N|  252|  10|
|2007|     B|  236|  11|
|2007|     G|  234|  12|
|2007|     R|  203|  13|
|2007|     T|  179|  14|
|2007|     H|  166|  15|
|2007|     I|  140|  16|
|2007|     P|   99|  17|
|2007|     O|   96|  18|
|2007|     V|   93|  19|
|2007|     F|   63|  20|
+----+------+-----+----+
only showing top 20 rows



In [None]:
# Finally, we filter:

favorite_first_letter_df = final_df.select('year', F.substring('first_name', 1, 1).alias('letter'), 'count').groupBy('year', 'letter').count().withColumn('row_', F.row_number().over(order_window)).filter('row_ == 1').select('year', 'letter', 'count')
favorite_first_letter_df.show()

+----+------+-----+
|year|letter|count|
+----+------+-----+
|2007|     A|  913|
|2008|     A|  936|
|2009|     A|  900|
|2010|     A|  884|
|2011|     A|  915|
|2012|     A|  880|
|2013|     A|  895|
|2014|     A| 1245|
|2015|     A|  918|
|2016|     A|  935|
|2017|     A|  883|
|2018|     A|  875|
|2019|     A|  868|
|2020|     A|  796|
+----+------+-----+

