In [0]:

# Write four sample company records to the volume as ONE JSON document.
# The records are wrapped in a top-level array because the file is read with
# the `multiline` option, under which Spark parses a single JSON document per
# file: back-to-back objects would yield only the first company.
# The trailing True overwrites the file if it already exists.
dbutils.fs.put("/Volumes/workspace/default/tmp_customer/sample.json","""[
{
  "name":"MSFT","location":"Redmond", "satellites": ["Bay Area", "Shanghai"],
  "goods": {
    "trade":true, "customers":["government", "distributer", "retail"],
    "orders":[
        {"orderId":1,"orderTotal":123.34,"shipped":{"orderItems":[{"itemName":"Laptop","itemQty":20},{"itemName":"Charger","itemQty":2}]}},
        {"orderId":2,"orderTotal":323.34,"shipped":{"orderItems":[{"itemName":"Mice","itemQty":2},{"itemName":"Keyboard","itemQty":1}]}}
    ]}},
{"name":"Company1","location":"Seattle", "satellites": ["New York"],
  "goods":{"trade":false, "customers":["store1", "store2"],
  "orders":[
      {"orderId":4,"orderTotal":123.34,"shipped":{"orderItems":[{"itemName":"Laptop","itemQty":20},{"itemName":"Charger","itemQty":3}]}},
      {"orderId":5,"orderTotal":343.24,"shipped":{"orderItems":[{"itemName":"Chair","itemQty":4},{"itemName":"Lamp","itemQty":2}]}}
    ]}},
{"name": "Company2", "location": "Bellevue",
  "goods": {"trade": true, "customers":["Bank"], "orders": [{"orderId": 4, "orderTotal": 123.34}]}},
{"name": "Company3", "location": "Kirkland"}
]""",True)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, BooleanType, LongType, DoubleType



# Load the sample file as a single (pretty-printed) JSON document rather than
# JSON Lines. In multiline mode Spark parses one document per file, so the
# file must hold a single object or a top-level array of objects.
# The JSON reader always infers the schema; `inferSchema` is a CSV-reader
# option and is not needed here.
df_json = (
    spark.read
    .option("multiline", True)
    .json("/Volumes/workspace/default/tmp_customer/sample.json")
)

display(df_json)

In [0]:
# Print the inferred (still nested) schema of the loaded sample.
df_json.printSchema()

In [0]:
def child_struct(nested_df):
    """Promote every struct field of ``nested_df`` to a top-level column.

    Structs are expanded recursively. Each resulting column is named after its
    field path joined with underscores (``goods.trade`` -> ``goods_trade``).
    Columns that are not structs (scalars, arrays, maps) are selected unchanged;
    arrays are NOT exploded here.

    Column order: at each level the non-struct fields come first, in schema
    order. Sibling structs are then expanded last-to-first, because pending
    structs are kept on a stack.

    Note: path-derived names are not checked for collisions (e.g. a column
    ``a_b`` next to a struct ``a`` with a field ``b``).

    Args:
        nested_df: DataFrame whose schema may contain nested structs.

    Returns:
        A new DataFrame with one column per non-struct field reachable through
        structs only.
    """
    # Imported locally so the function works regardless of which notebook
    # cells have been executed before it is called.
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType

    def quote(name):
        # Backtick-quote a path segment so names containing dots or other
        # special characters are not parsed as nested field access.
        return "`" + name.replace("`", "``") + "`"

    # Stack of (field path, struct type) pairs still to be expanded. Walking
    # the schema avoids building an intermediate DataFrame per struct level.
    pending = [((), nested_df.schema)]
    flat_columns = []

    while pending:
        parents, struct_type = pending.pop()
        nested = []
        for field in struct_type.fields:
            path = parents + (field.name,)
            if isinstance(field.dataType, StructType):
                nested.append((path, field.dataType))
            else:
                reference = ".".join(quote(part) for part in path)
                flat_columns.append(col(reference).alias("_".join(path)))
        # Pushed in schema order, so the last struct is popped (expanded) first.
        pending.extend(nested)

    return nested_df.select(flat_columns)


In [0]:
from pyspark.sql.functions import * 

def master_array(df):
    """Fully flatten ``df`` by exploding arrays and expanding structs.

    Repeats two steps until no top-level array column remains:
      1. explode every top-level array column in place (``explode_outer``, so
         rows whose array is null or empty are kept with a null value);
      2. expand all structs into top-level columns via ``child_struct``.

    Each array column is exploded independently, so sibling arrays in the same
    row multiply: the output holds one row per combination of their elements.

    Args:
        df: DataFrame with arbitrarily nested arrays and structs.

    Returns:
        A DataFrame with no top-level array columns and no struct columns.
    """
    # Imported locally so the function does not depend on a wildcard import
    # executed in another cell.
    from pyspark.sql.functions import col, explode_outer

    def array_columns(frame):
        # Names of the top-level columns whose type is an array.
        return [name for name, dtype in frame.dtypes if dtype.startswith("array")]

    array_cols = array_columns(df)
    if not array_cols:
        # No top-level arrays: flatten structs first, otherwise struct columns
        # (and any arrays nested inside them) would be returned untouched.
        df = child_struct(df)
        array_cols = array_columns(df)

    while array_cols:
        for name in array_cols:
            # Backtick-quote so a column name containing dots is not parsed as
            # nested field access.
            quoted = "`" + name.replace("`", "``") + "`"
            df = df.withColumn(name, explode_outer(col(quoted)))
        # Exploding array<struct> columns leaves structs behind; flatten them,
        # which may surface further arrays for the next pass.
        df = child_struct(df)
        array_cols = array_columns(df)
    return df

In [0]:
# Flatten the loaded sample, then show the flattened rows and their schema.
df_output = master_array(df_json)
display(df_output)
df_output.printSchema()


In [0]:
# In-memory sample: two customer records. Each has nested `name` and `address`
# objects and an `orders` list whose entries carry their own `items` list
# (two levels of array nesting).
json_data = [
    {
        "customer_id": 101,
        "status": "active",
        "name": {"first": "John", "last": "Doe"},
        "address": {"street": "123 Elm St", "city": "Springfield", "zip": "62704"},
        "orders": [
            {
                "order_id": "A001",
                "amount": 250,
                "items": [
                    {"product": "Laptop", "qty": 1},
                    {"product": "Mouse", "qty": 2}
                ]
            },
            {
                "order_id": "A002",
                "amount": 150,
                "items": [
                    {"product": "Keyboard", "qty": 1}
                ]
            }
        ]
    },
    {
        "customer_id": 102,
        "status": "active",
        "name": {"first": "John1", "last": "Doe1"},
        "address": {"street": "123 Elm St", "city": "Springfield", "zip": "62704"},
        "orders": [
            {
                "order_id": "A003",
                "amount": 250,
                "items": [
                    {"product": "Laptop", "qty": 1},
                    {"product": "Mouse", "qty": 2}
                ]
            },
            {
                "order_id": "A004",
                "amount": 150,
                "items": [
                    {"product": "Keyboard", "qty": 1}
                ]
            }
        ]
    }
]

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType


# Explicit schema for the in-memory customer records, built field by field.
# Every field is nullable. `orders` is an array of structs, and each order
# carries its own `items` array of structs.
schema = (
    StructType()
    .add("customer_id", IntegerType(), True)
    .add("status", StringType(), True)
    .add(
        "name",
        StructType()
        .add("first", StringType(), True)
        .add("last", StringType(), True),
        True,
    )
    .add(
        "address",
        StructType()
        .add("street", StringType(), True)
        .add("city", StringType(), True)
        .add("zip", StringType(), True),
        True,
    )
    .add(
        "orders",
        ArrayType(
            StructType()
            .add("order_id", StringType(), True)
            .add("amount", IntegerType(), True)
            .add(
                "items",
                ArrayType(
                    StructType()
                    .add("product", StringType(), True)
                    .add("qty", IntegerType(), True)
                ),
                True,
            )
        ),
        True,
    )
)

# Build the DataFrame from the Python records using the schema above.
df = spark.createDataFrame(json_data, schema=schema)

In [0]:
# Preview the rows and print the nested schema of the customer DataFrame.
df.show()

df.printSchema()

In [0]:
# Flatten the customer DataFrame, then show the result and its schema.
df_result = master_array(df)

display(df_result)
df_result.printSchema()

In [0]:
# Read the JSON file from the volume as a multi-line document, then show its
# rows and the inferred schema.
complex_json = (
    spark.read
    .option("multiline", "true")
    .json("/Volumes/workspace/default/stream_vol/json/input/complex_json.json")
)

display(complex_json)

complex_json.printSchema()

In [0]:
# Flatten the DataFrame read from complex_json.json, then print its rows and schema.
result_df = master_array(complex_json)

result_df.show()

result_df.printSchema()

In [0]:

# Example labelled "type 1": write a single pretty-printed JSON document to the
# volume. It holds one donut record with an array nested inside a struct
# (`batters.batter`) and a top-level array (`topping`). The trailing True
# overwrites the file if it already exists.

dbutils.fs.put("/Volumes/workspace/default/tmp_customer/sample3.json","""{
	"id": "0001",
	"type": "donut",
	"name": "Cake",
	"ppu": 0.55,
	"batters":
		{
			"batter":
				[
					{ "id": "1001", "type": "Regular" },
					{ "id": "1002", "type": "Chocolate" },
					{ "id": "1003", "type": "Blueberry" },
					{ "id": "1004", "type": "Devil's Food" }
				]
		},
	"topping":
		[
			{ "id": "5001", "type": "None" },
			{ "id": "5002", "type": "Glazed" },
			{ "id": "5005", "type": "Sugar" },
			{ "id": "5007", "type": "Powdered Sugar" },
			{ "id": "5006", "type": "Chocolate with Sprinkles" },
			{ "id": "5003", "type": "Chocolate" },
			{ "id": "5004", "type": "Maple" }
		]
}""", True)

In [0]:
# Confirm that sample3.json exists on the volume before it is read below.
dbutils.fs.ls("/Volumes/workspace/default/tmp_customer/sample3.json")

In [0]:
# Load the donut sample as a single multi-line JSON document. The JSON reader
# always infers the schema; `inferSchema` is a CSV-reader option and is not
# needed here.
df_json = (
    spark.read
    .option("multiline", True)
    .json("/Volumes/workspace/default/tmp_customer/sample3.json")
)

display(df_json)

In [0]:
# Flatten the most recently loaded `df_json` and display the result.
result_df = master_array(df_json)

display(result_df)
# type 2 
