In [0]:
# List the mounted dataset directory to confirm the input JSON files are present.
# NOTE(review): the *_flattened.parquet entries are the outputs written by the
# write cells further down, presumably left over from an earlier run.
dbutils.fs.ls("/mnt/dataset")

[FileInfo(path='dbfs:/mnt/dataset/example_2.json', name='example_2.json', size=947, modificationTime=1753689852000),
 FileInfo(path='dbfs:/mnt/dataset/example_2_flattened.parquet/', name='example_2_flattened.parquet/', size=0, modificationTime=1753697811000),
 FileInfo(path='dbfs:/mnt/dataset/example_3.json', name='example_3.json', size=1090, modificationTime=1753691329000),
 FileInfo(path='dbfs:/mnt/dataset/example_3_flattened.parquet/', name='example_3_flattened.parquet/', size=0, modificationTime=1753697812000),
 FileInfo(path='dbfs:/mnt/dataset/yellow_tripdata_2018-01.parquet', name='yellow_tripdata_2018-01.parquet', size=123668517, modificationTime=1753644089000)]

## Function to flatten nested JSON DataFrames

In [0]:
from pyspark.sql.functions import col

from pyspark.sql.functions import col, explode_outer

def flatten_df(df):
    """Flatten every struct and array-of-struct column of a Spark DataFrame.

    Works in passes until no nesting of those kinds remains:

    * each struct column ``s`` is replaced by one top-level column per child
      field, named ``s_<child>``; the new columns are appended after the
      surviving columns, in schema order;
    * each ``array<struct<...>>`` column is exploded in place with
      ``explode_outer`` (rows whose array is null or empty are kept, with a
      null element), so the next pass can flatten the resulting struct.

    Arrays of non-struct elements (e.g. ``array<string>``) and all other
    column types are left untouched.

    Caveat: when a row has several array-of-struct columns, exploding them one
    after another yields the cross product of their elements.

    Args:
        df: A ``pyspark.sql.DataFrame``.

    Returns:
        A new DataFrame without struct or array-of-struct columns.

    Raises:
        ValueError: if flattening would produce a column name that is already
            in use (compared case-insensitively, like Spark's default
            resolver), e.g. an existing ``a_b`` column next to struct ``a``
            with child ``b``. Without this check the data would be clobbered.
    """

    def _ref(name):
        # Backtick-quote the name so dots, spaces, etc. are taken literally
        # instead of being parsed as a nested-field path; an embedded backtick
        # is escaped by doubling it.
        return col("`" + name.replace("`", "``") + "`")

    def _is_struct(data_type):
        # Every pyspark DataType exposes typeName(), e.g. "struct", "array".
        return data_type.typeName() == "struct"

    while True:
        fields = df.schema.fields
        has_struct = any(_is_struct(f.dataType) for f in fields)
        explode_names = [
            f.name
            for f in fields
            if f.dataType.typeName() == "array" and _is_struct(f.dataType.elementType)
        ]

        # Finished once no struct or array-of-struct columns remain.
        if not has_struct and not explode_names:
            return df

        if has_struct:
            # Build the whole projection from the schema and apply it with one
            # select(): a withColumn() per child grows the query plan on every
            # call, and reading child names from the schema avoids running an
            # extra select just to list them.
            projection = [
                (f.name, _ref(f.name)) for f in fields if not _is_struct(f.dataType)
            ]
            for f in fields:
                if not _is_struct(f.dataType):
                    continue
                for child in f.dataType.fields:
                    new_name = f"{f.name}_{child.name}"
                    # getField takes the child name literally, so dotted names work.
                    projection.append(
                        (new_name, _ref(f.name).getField(child.name).alias(new_name))
                    )

            seen = set()
            clashes = set()
            for name, _ in projection:
                key = name.lower()
                if key in seen:
                    clashes.add(name)
                seen.add(key)
            if clashes:
                raise ValueError(
                    "flatten_df: flattening would create duplicate column name(s) "
                    f"{sorted(clashes)}; rename the conflicting fields first"
                )

            df = df.select([column for _, column in projection])

        # Explode in place: withColumn() keeps the column's position and treats
        # its first argument as a literal name.
        for name in explode_names:
            df = df.withColumn(name, explode_outer(_ref(name)))


## Example 1: nested structs (`example_2.json`)

In [0]:
# Load the Example 1 document. multiLine is needed because a single JSON object
# spans several lines. No "inferSchema" option: that is a CSV option, and the
# JSON source always infers the schema when none is supplied.
df = (
    spark.read.format("json")
    .option("multiLine", True)
    .load("/mnt/dataset/example_2.json")
)

In [0]:
# Inspect the inferred (nested) schema before flattening.
df.printSchema()

root
 |-- quiz: struct (nullable = true)
 |    |-- maths: struct (nullable = true)
 |    |    |-- q1: struct (nullable = true)
 |    |    |    |-- answer: string (nullable = true)
 |    |    |    |-- options: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- question: string (nullable = true)
 |    |    |-- q2: struct (nullable = true)
 |    |    |    |-- answer: string (nullable = true)
 |    |    |    |-- options: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- question: string (nullable = true)
 |    |-- sport: struct (nullable = true)
 |    |    |-- q1: struct (nullable = true)
 |    |    |    |-- answer: string (nullable = true)
 |    |    |    |-- options: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- question: string (nullable = true)



In [0]:
# Flatten Example 1: each nested struct level becomes part of a "_"-joined
# top-level column name, while array<string> fields (the "options") stay arrays.
flat_json = flatten_df(df)
flat_json.printSchema()
flat_json.display()

root
 |-- quiz_maths_q1_answer: string (nullable = true)
 |-- quiz_maths_q1_options: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- quiz_maths_q1_question: string (nullable = true)
 |-- quiz_maths_q2_answer: string (nullable = true)
 |-- quiz_maths_q2_options: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- quiz_maths_q2_question: string (nullable = true)
 |-- quiz_sport_q1_answer: string (nullable = true)
 |-- quiz_sport_q1_options: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- quiz_sport_q1_question: string (nullable = true)



quiz_maths_q1_answer,quiz_maths_q1_options,quiz_maths_q1_question,quiz_maths_q2_answer,quiz_maths_q2_options,quiz_maths_q2_question,quiz_sport_q1_answer,quiz_sport_q1_options,quiz_sport_q1_question
12,"List(10, 11, 12, 13)",5 + 7 = ?,4,"List(1, 2, 3, 4)",12 - 8 = ?,Huston Rocket,"List(New York Bulls, Los Angeles Kings, Golden State Warriros, Huston Rocket)",Which one is correct team name in NBA?


In [0]:
# Persist the flattened Example 1 result as Parquet, replacing any previous output.
flat_json.write.parquet(
    "/mnt/dataset/example_2_flattened.parquet",
    mode="overwrite",
)

## Example 2: structs and arrays of structs (`example_3.json`)

In [0]:
# Load the Example 2 document (multi-line JSON records). No "inferSchema" option:
# that is a CSV option, and the JSON source always infers the schema when none
# is supplied.
df = (
    spark.read.format("json")
    .option("multiLine", True)
    .load("/mnt/dataset/example_3.json")
)

In [0]:
# Inspect the inferred schema: a mix of structs, an array of structs, and an array of strings.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- orderStatus: string (nullable = true)
 |-- paymentDetails: struct (nullable = true)
 |    |-- amount: double (nullable = true)
 |    |-- method: string (nullable = true)
 |    |-- transactionId: string (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- productId: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- timestamp: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- userId: string (nullable = true)



In [0]:
# Flatten Example 2. The array-of-struct "products" column is exploded, so each
# order appears once per product (2 orders -> 4 rows in the output below);
# "tags" (array<string>) is kept as an array.
flat_json = flatten_df(df)
flat_json.printSchema()
flat_json.display()

root
 |-- id: string (nullable = true)
 |-- orderStatus: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- timestamp: string (nullable = true)
 |-- paymentDetails_amount: double (nullable = true)
 |-- paymentDetails_method: string (nullable = true)
 |-- paymentDetails_transactionId: string (nullable = true)
 |-- user_email: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_userId: string (nullable = true)
 |-- products_name: string (nullable = true)
 |-- products_price: double (nullable = true)
 |-- products_productId: string (nullable = true)
 |-- products_quantity: long (nullable = true)



id,orderStatus,tags,timestamp,paymentDetails_amount,paymentDetails_method,paymentDetails_transactionId,user_email,user_name,user_userId,products_name,products_price,products_productId,products_quantity
1,completed,"List(electronics, online_sale)",2023-01-15T10:00:00Z,1250.0,credit_card,TXN12345,alice.s@example.com,Alice Smith,user_A,Laptop,1200.0,P101,1
1,completed,"List(electronics, online_sale)",2023-01-15T10:00:00Z,1250.0,credit_card,TXN12345,alice.s@example.com,Alice Smith,user_A,Mouse,25.0,P102,2
2,pending,List(peripherals),2023-01-15T11:30:00Z,375.0,paypal,TXN67890,bob.j@example.com,Bob Johnson,user_B,Keyboard,75.0,P201,1
2,pending,List(peripherals),2023-01-15T11:30:00Z,375.0,paypal,TXN67890,bob.j@example.com,Bob Johnson,user_B,Monitor,300.0,P202,1


In [0]:
# Persist the flattened Example 2 result as Parquet, replacing any previous output.
flat_json.write.parquet(
    "/mnt/dataset/example_3_flattened.parquet",
    mode="overwrite",
)

## Loading Saved Parquet Files

In [0]:
# Read back the flattened Example 1 output. Parquet files carry their own
# schema, so CSV-style options such as "header"/"inferSchema" do not apply.
spark.read.format("parquet").load("/mnt/dataset/example_2_flattened.parquet").display()

quiz_maths_q1_answer,quiz_maths_q1_options,quiz_maths_q1_question,quiz_maths_q2_answer,quiz_maths_q2_options,quiz_maths_q2_question,quiz_sport_q1_answer,quiz_sport_q1_options,quiz_sport_q1_question
12,"List(10, 11, 12, 13)",5 + 7 = ?,4,"List(1, 2, 3, 4)",12 - 8 = ?,Huston Rocket,"List(New York Bulls, Los Angeles Kings, Golden State Warriros, Huston Rocket)",Which one is correct team name in NBA?


In [0]:
# Read back the flattened Example 2 output. Parquet files carry their own
# schema, so CSV-style options such as "header"/"inferSchema" do not apply.
spark.read.format("parquet").load("/mnt/dataset/example_3_flattened.parquet").display()

id,orderStatus,tags,timestamp,paymentDetails_amount,paymentDetails_method,paymentDetails_transactionId,user_email,user_name,user_userId,products_name,products_price,products_productId,products_quantity
1,completed,"List(electronics, online_sale)",2023-01-15T10:00:00Z,1250.0,credit_card,TXN12345,alice.s@example.com,Alice Smith,user_A,Laptop,1200.0,P101,1
1,completed,"List(electronics, online_sale)",2023-01-15T10:00:00Z,1250.0,credit_card,TXN12345,alice.s@example.com,Alice Smith,user_A,Mouse,25.0,P102,2
2,pending,List(peripherals),2023-01-15T11:30:00Z,375.0,paypal,TXN67890,bob.j@example.com,Bob Johnson,user_B,Keyboard,75.0,P201,1
2,pending,List(peripherals),2023-01-15T11:30:00Z,375.0,paypal,TXN67890,bob.j@example.com,Bob Johnson,user_B,Monitor,300.0,P202,1
