In [1]:
import os
import findspark

# Locate the Spark installation before any pyspark import runs.
# Prefer the SPARK_HOME environment variable so the notebook works on machines
# with a different install location; fall back to the original hard-coded
# Windows path when the variable is not set.
findspark.init(os.environ.get("SPARK_HOME", r'C:\spark\spark-3.4.3-bin-hadoop3'))

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Obtain (or reuse) the SparkSession used throughout the notebook.
spark = (
    SparkSession.builder
    .appName("Flatten JSON Structures")
    .getOrCreate()
)

# Two sample records; "info" and "address" are nested structs that each
# contain an array, which is what the flattening step below works on.
data = [
    {
        "id": 1,
        "name": "Alice",
        "info": {"age": 30, "emails": ["alice@example.com", "alice@yahoo.com"]},
        "address": {"city": "naperville", "zip": [605, 705]},
    },
    {
        "id": 2,
        "name": "Bob",
        "info": {"age": 25, "emails": ["bob@example.com"]},
        "address": {"city": "naperville", "zip": [605, 705]},
    },
]

# Explicit schema mirroring the records above; every field is nullable.
info_struct = StructType([
    StructField("age", IntegerType(), True),
    StructField("emails", ArrayType(StringType()), True),
])
address_struct = StructType([
    StructField("city", StringType(), True),
    StructField("zip", ArrayType(IntegerType()), True),
])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("info", info_struct, True),
    StructField("address", address_struct, True),
])

# Build the nested DataFrame and print it.
df = spark.createDataFrame(data, schema)
df.show()

# Read two resource settings from the active Spark configuration.
# SparkConf.get returns None for a key that was never set explicitly.
spark_conf = spark.sparkContext.getConf()
driver_cores = spark_conf.get("spark.driver.cores")
num_executors = spark_conf.get("spark.executor.instances")
print(driver_cores, num_executors)




+---+-----+--------------------+--------------------+
| id| name|                info|             address|
+---+-----+--------------------+--------------------+
|  1|Alice|{30, [alice@examp...|{naperville, [605...|
|  2|  Bob|{25, [bob@example...|{naperville, [605...|
+---+-----+--------------------+--------------------+

None None


In [2]:
def _complex_fields(df):
    """Return ``{column name: data type}`` for each top-level array or struct column of ``df``."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    }


def flatten(df, verbose=True):
    """Flatten every nested struct and array column of a Spark DataFrame.

    The schema is rescanned after each step until no top-level column is a
    struct or an array:

    * a struct column ``parent`` is replaced by one column per child field,
      named ``parent_child``;
    * an array column is replaced in place by ``explode_outer`` of itself, so
      each element becomes its own row. Arrays are exploded one after another,
      so a row holding several arrays yields one row per combination of their
      elements.

    Other column types (maps included) are left untouched.

    Args:
        df: The DataFrame to flatten. It is not modified; a new DataFrame is
            returned.
        verbose: When True (the default, matching the original behaviour),
            print a trace of each step and ``show()`` the intermediate
            DataFrame after every struct expansion. Note that ``show()`` runs
            a Spark job each time; pass False to flatten without any output
            or extra jobs.

    Returns:
        A DataFrame whose schema has no struct or array columns. A DataFrame
        that is already flat is returned unchanged.
    """
    complex_fields = _complex_fields(df)
    if verbose:
        print("Complex Fields:", complex_fields)
        # Guarded: looking up the first key unconditionally raised IndexError
        # when the input had no struct/array columns at all.
        if complex_fields:
            print(next(iter(complex_fields)))

    while complex_fields:
        # Always handle the first remaining complex column, then rescan.
        col_name = next(iter(complex_fields))
        data_type = complex_fields[col_name]
        if verbose:
            print("Processing :" + col_name + " Type : " + str(type(data_type)))

        if isinstance(data_type, StructType):
            # Promote each child field to a top-level "<parent>_<child>" column.
            child_names = [child.name for child in data_type]
            if verbose:
                print(child_names)
            expanded = [
                col(col_name + '.' + child).alias(col_name + '_' + child)
                for child in child_names
            ]
            if verbose:
                print("expanded---->", expanded)
            df = df.select("*", *expanded).drop(col_name)
            if verbose:
                print('df--->', df)
                df.show()
        elif isinstance(data_type, ArrayType):
            # One output row per array element; explode_outer keeps rows whose
            # array is null or empty (the column becomes null for them).
            df = df.withColumn(col_name, explode_outer(col_name))

        # Expanding a struct or exploding an array can surface new complex
        # columns (e.g. an array inside a struct), so scan the schema again.
        complex_fields = _complex_fields(df)
    return df
   
# Flatten the nested sample DataFrame built in the first cell and print the result.
df_flatten = flatten(df)
df_flatten.show()
# Alternative rich rendering, left disabled: display(df_flatten)

Complex Fields: {'info': StructType([StructField('age', IntegerType(), True), StructField('emails', ArrayType(StringType(), True), True)]), 'address': StructType([StructField('city', StringType(), True), StructField('zip', ArrayType(IntegerType(), True), True)])}
info
Processing :info Type : <class 'pyspark.sql.types.StructType'>
['age', 'emails']
expanded----> [Column<'info.age AS info_age'>, Column<'info.emails AS info_emails'>]
df---> DataFrame[id: int, name: string, address: struct<city:string,zip:array<int>>, info_age: int, info_emails: array<string>]
+---+-----+--------------------+--------+--------------------+
| id| name|             address|info_age|         info_emails|
+---+-----+--------------------+--------+--------------------+
|  1|Alice|{naperville, [605...|      30|[alice@example.co...|
|  2|  Bob|{naperville, [605...|      25|   [bob@example.com]|
+---+-----+--------------------+--------+--------------------+

Processing :address Type : <class 'pyspark.sql.types.Struc

In [3]:
# Re-read the driver-core and executor-count settings from the Spark
# configuration; each value is None when the key was never set explicitly.
spark_conf = spark.sparkContext.getConf()
driver_cores, num_executors = (
    spark_conf.get(key)
    for key in ("spark.driver.cores", "spark.executor.instances")
)
print(driver_cores, num_executors)

None None
