# Reference: https://medium.com/@thomaspt748/how-to-flatten-json-files-dynamically-using-apache-pyspark-c6b1b5fd4777

In [1]:
from typing import Final, Dict, Tuple

from pyspark.sql.session import SparkSession
from pyspark.sql import DataFrame as SDF
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, ArrayType

StatementMeta(, 323c3e99-65c5-4d79-9911-d10d8c951d75, 3, Finished, Available)

In [2]:
def rename_dataframe_cols(df: SDF, col_names: Dict[str, str]) -> SDF:
    """
    Rename the top-level columns of a DataFrame.

    :param df: DataFrame whose columns are renamed.
    :param col_names: Mapping of current column name -> new column name.
        Columns missing from the mapping keep their current name.
    :return: A new DataFrame with the same columns, in the same order,
        renamed according to ``col_names``.
    """
    # toDF() renames by position, so existing names are never parsed as
    # column expressions. A col(name) lookup would treat 'a.b' as a
    # struct-field access, choke on backticks, and fail on duplicated names.
    return df.toDF(*[col_names.get(name, name) for name in df.columns])

def update_column_names(df: SDF, index: int) -> SDF:
    """
    Append a ``*<index>`` level suffix to every top-level column name.

    :param df: DataFrame to rename.
    :param index: Level number written after the ``*`` separator.
    :return: A new DataFrame whose columns are renamed ``<name>*<index>``,
        keeping their original order.
    """
    suffixed_names = {name: f"{name}*{index}" for name in df.columns}
    return rename_dataframe_cols(df, suffixed_names)


StatementMeta(, 323c3e99-65c5-4d79-9911-d10d8c951d75, 4, Finished, Available)

In [3]:
def _next_level(column_name: str) -> int:
    """Return the level number after the last ``*`` in ``column_name``, plus one."""
    # Flattened names look like '<path>*<level>'; the text after the last '*'
    # is the level of that column. Raises ValueError if it is not an integer.
    return int(column_name.rpartition("*")[2]) + 1


def _quote_identifier(column_name: str) -> str:
    """Backtick-quote a column name so Spark does not parse '.' or '`' inside it."""
    return "`" + column_name.replace("`", "``") + "`"


def flatten_json(df_arg: SDF, index: int = 1) -> SDF:
    """
    Flatten all struct and array columns of a Spark DataFrame.

    Repeatedly takes the first top-level column (in schema order) whose type is
    ``ArrayType`` or ``StructType`` and:

    * array  -> replaces it in place with ``explode_outer`` of itself (one row
      per element; a null or empty array still yields one row, with null).
      Exploding several sibling arrays therefore multiplies the row count.
    * struct -> replaces it with one column per child field, appended at the
      end and named ``<parent>-><child>*<parent level + 1>``.

    This continues until no such column is left. Other column types (e.g. maps)
    are left unchanged.

    :param df_arg: DataFrame to flatten.
    :param index: ``1`` (the default) first suffixes every column name with
        ``*1`` (see ``update_column_names``). Any other value means the caller
        guarantees that every column name already ends in ``*<level>``.
    :return: The flattened DataFrame.
    :raises ValueError: if a struct column's name does not end in ``*<int>``.
    """
    # Tag all top-level columns with level 1 on the initial call.
    df = update_column_names(df_arg, index) if index == 1 else df_arg

    # Iterate rather than recurse once per flattened column, so wide or deeply
    # nested schemas cannot hit Python's recursion limit.
    while True:
        field = next(
            (f for f in df.schema.fields
             if isinstance(f.dataType, (ArrayType, StructType))),
            None,
        )
        if field is None:
            return df

        name = field.name
        if isinstance(field.dataType, ArrayType):
            # withColumn() with an existing name replaces that column in place.
            df = df.withColumn(name, explode_outer(col(_quote_identifier(name))))
        else:
            level = _next_level(name)
            parent = col(_quote_identifier(name))
            # Alias every child straight to its final, unique name, so the
            # projection never holds a raw child name that could clash with
            # the parent column or with a sibling column.
            children = [
                parent.getField(child).alias(f"{name}->{child}*{level}")
                for child in field.dataType.fieldNames()
            ]
            df = df.select("*", *children).drop(name)

StatementMeta(, 323c3e99-65c5-4d79-9911-d10d8c951d75, 5, Finished, Available)

In [None]:
if __name__ == "__main__":
    # Build a local SparkSession using all local cores; getOrCreate() returns
    # the already-active session instead if one exists.
    spark = (
        SparkSession.builder
        .appName("FlatJson")
        .master("local[*]")
        .getOrCreate()
    )

In [4]:
# Sample input: a JSON array of three "donut" records. Each record has scalar
# fields (id, type, name, ppu), a struct "batters" that wraps an array of
# {id, type} structs ("batter"), and a top-level array of {id, type} structs
# ("topping"), so both the struct and the array paths of flatten_json are used.
sample_json = """
[
	{
		"id": "0001",
		"type": "donut",
		"name": "Cake",
		"ppu": 0.55,
		"batters":
			{
				"batter":
					[
						{ "id": "1001", "type": "Regular" },
						{ "id": "1002", "type": "Chocolate" },
						{ "id": "1003", "type": "Blueberry" },
						{ "id": "1004", "type": "Devil's Food" }
					]
			},
		"topping":
			[
				{ "id": "5001", "type": "None" },
				{ "id": "5002", "type": "Glazed" },
				{ "id": "5005", "type": "Sugar" },
				{ "id": "5007", "type": "Powdered Sugar" },
				{ "id": "5006", "type": "Chocolate with Sprinkles" },
				{ "id": "5003", "type": "Chocolate" },
				{ "id": "5004", "type": "Maple" }
			]
	},
	{
		"id": "0002",
		"type": "donut",
		"name": "Raised",
		"ppu": 0.55,
		"batters":
			{
				"batter":
					[
						{ "id": "1001", "type": "Regular" }
					]
			},
		"topping":
			[
				{ "id": "5001", "type": "None" },
				{ "id": "5002", "type": "Glazed" },
				{ "id": "5005", "type": "Sugar" },
				{ "id": "5003", "type": "Chocolate" },
				{ "id": "5004", "type": "Maple" }
			]
	},
	{
		"id": "0003",
		"type": "donut",
		"name": "Old Fashioned",
		"ppu": 0.55,
		"batters":
			{
				"batter":
					[
						{ "id": "1001", "type": "Regular" },
						{ "id": "1002", "type": "Chocolate" }
					]
			},
		"topping":
			[
				{ "id": "5001", "type": "None" },
				{ "id": "5002", "type": "Glazed" },
				{ "id": "5003", "type": "Chocolate" },
				{ "id": "5004", "type": "Maple" }
			]
	}
]
"""

StatementMeta(, 323c3e99-65c5-4d79-9911-d10d8c951d75, 6, Finished, Available)

In [5]:
# Load the in-memory sample document into a DataFrame, display it, then
# flatten it and display the result.
sc = spark.sparkContext
tsRDD = sc.parallelize([sample_json])  # single RDD element: the whole JSON text

df2 = (
    spark.read
    .option("multiline", "true")
    .json(tsRDD)
)
df2.show(n=10, truncate=False)

df3 = flatten_json(df2)
df3.show()

StatementMeta(, 323c3e99-65c5-4d79-9911-d10d8c951d75, 7, Finished, Available)

+-------------------------------------------------------------------------------+----+-------------+----+-----------------------------------------------------------------------------------------------------------------------------------------+-----+
|batters                                                                        |id  |name         |ppu |topping                                                                                                                                  |type |
+-------------------------------------------------------------------------------+----+-------------+----+-----------------------------------------------------------------------------------------------------------------------------------------+-----+
|{[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}|0001|Cake         |0.55|[{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}]|donut|
