In [0]:
import json
import os
import sys
# Folder that holds the demo JSON files (relative to the notebook's working directory).
directory = '03/demos/coffee_sales/'

# Print a short report for every *.json file in the folder: its name, the Python
# type the document decodes to, and its contents (one list item or one
# key/value pair per line). os.listdir() returns names in arbitrary order, so
# the listing is sorted to make the output reproducible between runs.
for filename in sorted(os.listdir(directory)):
    if not filename.endswith('.json'):
        continue

    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(os.path.join(directory, filename), encoding='utf-8') as f:
        data = json.load(f)

    # The file is already closed here; only the decoded object is needed below.
    print('File:', filename)
    print('Type:', type(data))

    if isinstance(data, list):
        print('List Items:')
        for item in data:
            print(item)
    elif isinstance(data, dict):
        print('Dictionary Items:')
        for key, value in data.items():
            print(key, ':', value)
    else:
        # Scalars (string, number, bool, null) are printed as-is.
        print('Data:')
        print(data)

    print('-------------------------')

# Load the coffee-sales document. The encoding is given explicitly because JSON
# is UTF-8 and the platform default may differ.
with open('03/demos/coffee_sales/coffee_sales.json', 'r', encoding='utf-8') as f:
    file_data = json.load(f)

# Round-trip demo: Python object -> JSON text -> Python object.
json_string = json.dumps(file_data)
json_data = json.loads(json_string)

# The document is expected to have top-level 'locations' and 'sales' entries,
# each a sequence of objects ('name' per location; 'date' and 'sales' per sale).
locations_data = json_data['locations']
sales_data = json_data['sales']

# Pull each attribute out into its own flat list.
locations_names = [location['name'] for location in locations_data]
sales_dates = [sale['date'] for sale in sales_data]
sales_sales = [sale['sales'] for sale in sales_data]

coffee_data = {
    'locations_names': locations_names,
    'sales_dates': sales_dates,
    'sales_sales': sales_sales,
}

# Show every intermediate object together with its Python type.
print('\nfile_data:',type(file_data),'\n')
display(file_data)
print('\njson_string:',type(json_string),'\n')
display(json_string)
print('\njson_data:',type(json_data),'\n')
display(json_data)
# Compares the object loaded from disk with the one rebuilt from the JSON string.
print('\nfile_data same as json_data? ',file_data==json_data,'\n')
print('\ncoffee_data:',type(coffee_data),'\n')
display(coffee_data)


In [0]:
# Folder that holds json_data.json. The trailing slash is required because the
# file path below is built by plain string concatenation (directory + file name).
directory = '/Volumes/workspace/default/managed_volume/ManishKumar/'

from pyspark.sql.functions import *
from pyspark.sql.types import *

import pandas as pd
import json

# Custom way to convert a multi-line (pretty-printed) JSON file into single-line JSON.
#
# 1. Read the entire file as ONE row in a single column 'value' (wholetext=True).
#    The 'text' source has no 'multiline' or 'mode' option; without 'wholetext'
#    it yields one row per line, and collect_list() does not guarantee that those
#    rows are put back together in file order.
# 2. Remove carriage returns, newlines and tabs, then every run of two spaces
#    (indentation).
#    NOTE: the replacement runs over the whole text, so two consecutive spaces
#    inside a JSON string value are removed as well.
# 3. Aggregate with collect_list() + concat_ws() so the result is always exactly
#    one row with one string column, 'json_contents'.
payload = (
    spark.read.format('text')
    .option('wholetext', True)
    .load(directory + 'json_data.json')
    .withColumn(
        'value',
        regexp_replace(regexp_replace('value', r'[\r\n\t]', ''), '  ', ''),
    )
    .select(concat_ws('', collect_list('value')).alias('json_contents'))
)
display(payload)

# 4a. collect() turns the Spark DataFrame "payload" into a Python list of Row
#     objects, "list_of_RowObjects". Same data, different representation.
# 4b. Index [0] takes the first Row from that list.
# 4c. row['column'] extracts the actual string held in that column.
list_of_RowObjects = payload.collect()
row = list_of_RowObjects[0]
json_contents_string = row['json_contents']

# Build the view through the DataFrame API instead of pasting the JSON text
# into a quoted SQL literal. Inside a SQL literal a single quote in the data
# ends the string early, and backslash escapes are (by default) processed by
# the SQL parser before from_json ever sees them. lit() passes the text through
# untouched.
json_schema = schema_of_json(lit(json_contents_string))
(
    spark.createDataFrame([(json_contents_string,)], ['json_contents'])
    .select(from_json('json_contents', json_schema).alias('json_data'))
    # Expand the parsed struct so each top-level JSON field becomes a column.
    .select('json_data.*')
    .createOrReplaceTempView('vw_json')
)

spark.sql("""select * from vw_json""").display()
# Copy only the 1st record from the results and paste it into the next snippet.
# NOTE(review): this query shows the parsed columns; the one-line JSON text
# itself is held in json_contents_string - confirm which one is meant.

# Sample JSON string used for schema inference.
json_string = '{"user_id":"0001","first_name":"Akshay","listings": [{"listing_id":"847254","place": {"Area":"Naupada","City":"Thane"},"description":"apartment","services":[{"service_id":"BG111","service_type":"CookingGas","service_provider":"BharatGas"},{"service_id":"MV111","service_type":"Electricity","service_provider":"Mahavitaran"}]},{"listing_id":"435543","place": {"Area":"ShivajiNagar","City":"Pune"},"description":"vila","services":[{"service_id":"HG111","service_type":"CookingGas","service_provider":"HidustanGas"},{"service_id":"RL111","service_type":"Electricity","service_provider":"Reliance"}]}]}'

# Recreate vw_json with a single struct column, json_data, whose schema is
# inferred from the sample itself. The DataFrame API is used rather than an
# f-string SQL literal so that any JSON pasted above still works when it
# contains single quotes or backslash escapes.
(
    spark.createDataFrame([(json_string,)], ['json_string'])
    .select(from_json('json_string', schema_of_json(lit(json_string))).alias('json_data'))
    .createOrReplaceTempView('vw_json')
)

spark.sql("""select * from vw_json""").display()

# Flatten the nested json_data column of vw_json into one row per
# (user, listing, service). The query is three nested SELECTs; read it from the
# inside out:
#   t1    - takes user_id and first_name out of json_data and explodes
#           json_data.listings (one row per listing).
#   t2    - takes listing_id, description, place.Area and place.City out of each
#           listing and explodes listings.services (one row per service).
#   outer - takes service_id, service_provider and service_type out of each
#           service.
# At every level `* except (...)` carries forward all columns except the
# struct/array that was just unpacked. The result is ordered by user_id,
# listing_id, service_id.
# NOTE(review): `select * except (...)` is presumably Databricks SQL syntax -
# confirm before running this on another Spark distribution.
spark.sql("""
    select  * except (services)
            ,services.service_id as service_id
            ,services.service_provider as service_provider
            ,services.service_type as service_type
    from    (
            select  * except (listings)
                    ,listings.listing_id as listing_id
                    ,listings.description as description
                    ,listings.place.Area as Area
                    ,listings.place.City as City
                    ,explode(listings.services) as services
            from    (
                        select  * except (json_data) 
                                ,json_data.user_id as user_id
                                ,json_data.first_name as first_name
                                ,explode(json_data.listings) as listings
                        from  vw_json
                    )t1
            )t2
    order by user_id, listing_id, service_id
    """).display()
