PySpark code to flatten any DataFrame with a complex nested schema, whether it was loaded from JSON, CSV, a SQL table, or another source.

For example, let's look at nested JSON:

*   Flattens all nested items. This JSON:

    ```json
    {
      "human": {
        "name": {
          "first_name": "Jay Lohokare"
        }
      }
    }
    ```

    is converted to a DataFrame with the column `human-name-first_name`.
    The connector `-` can be changed through the `connector` value in `relationalize`.


*   Explodes arrays. This JSON:

    ```json
    {
      "array": ["one", "two", "three"]
    }
    ```

    is converted to a DataFrame with the column `array` and 3 rows, one per element.
<br><br>
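The two transformations above can be sketched in plain Python on a single parsed JSON record, without Spark, to make the expected output concrete. This is an illustration only, assuming each record is a `dict` as returned by `json.loads`; `flatten_record`, `explode_record` and `CONNECTOR` are hypothetical names, and lists of objects (arrays of structs) are not flattened in this sketch. Several list values give their cross product, mirroring how exploding one array column after another multiplies rows.

```python
from itertools import product

CONNECTOR = '-'  # hypothetical: plays the role of `connector` in relationalize


def flatten_record(record, prefix=None):
    """Flatten nested dicts into a single level, joining keys with CONNECTOR."""
    flat = {}
    for key, value in record.items():
        name = prefix + CONNECTOR + key if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, prefix=name))
        else:
            flat[name] = value
    return flat


def explode_record(flat):
    """Return one row per list element; several lists give their cross product.

    Like explode_outer, an empty list still yields one row holding None.
    """
    keys = list(flat)
    choices = []
    for key in keys:
        value = flat[key]
        if isinstance(value, list):
            choices.append(value if value else [None])
        else:
            choices.append([value])  # scalars are repeated on every row
    return [dict(zip(keys, combo)) for combo in product(*choices)]


# The two examples above
print(explode_record(flatten_record({"human": {"name": {"first_name": "Jay Lohokare"}}})))
# [{'human-name-first_name': 'Jay Lohokare'}]
print(explode_record(flatten_record({"array": ["one", "two", "three"]})))
# [{'array': 'one'}, {'array': 'two'}, {'array': 'three'}]
```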

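The function can handle any level of nested objects (structs).

The function can <b>NOT</b> fully handle arrays within arrays: only the outermost array level is exploded, so the inner arrays remain as array values. This is just to keep the code dynamic and generic. To handle arrays within arrays, modify the `if isinstance(dtype, ArrayType)` check in the `for` loop of the `flattenSchema` function.

Each array column (including every field extracted from an array of objects) is exploded separately, so a record with two arrays of m and n elements becomes m × n rows.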
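One possible way to follow that advice, sketched in plain Python rather than PySpark: replace the single `if` with a `while` loop so every array level is looked through, and keep the nesting depth so the column can be exploded that many times. The toy encoding (a one-element `list` standing for `ArrayType`) and the helper names `element_type_and_depth` and `explode_n` are assumptions for illustration, not part of the original code or of PySpark.

```python
def element_type_and_depth(dtype):
    """Look through every array level (a `while` instead of a single `if`).

    Toy encoding, not a Spark API: a one-element list stands for ArrayType.
    """
    depth = 0
    while isinstance(dtype, list):
        dtype = dtype[0]
        depth += 1
    return dtype, depth


def explode_n(value, depth):
    """Explode a nested-list value `depth` times, one row per innermost element.

    Mirrors applying explode_outer once per array level: an empty or missing
    list becomes a single None row.
    """
    rows = [value]
    for _ in range(depth):
        rows = [item for row in rows for item in (row if row else [None])]
    return rows


elem, depth = element_type_and_depth([["long"]])  # toy array<array<long>>
print(elem, depth)  # long 2
print(explode_n([[1, 2], [3], []], depth))  # [1, 2, 3, None]
```

In PySpark terms, knowing `depth` would mean calling `explode_outer` on that column `depth` times instead of once; alternatively, `pyspark.sql.functions.flatten` (Spark 2.4+) merges one level of an array of arrays before a single explode.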


In [0]:
#All imports go here
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType

In [0]:
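# Read the data.
# This example reads JSON, but the DataFrame can be loaded from any input format Spark supports.
# spark.read.json expects one JSON object per line by default; for pretty-printed
# (multi-line) JSON such as the examples above, use spark.read.json(path, multiLine=True).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dataFrame = spark.read.json("PATH-TO-JSONs")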

In [0]:
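def relationalize(df, connector='-'):
    """Flatten every struct column of `df` and explode every array column.

    Nested fields become top-level columns named by joining the field path
    with `connector`, e.g. human.name.first_name -> human-name-first_name.
    """
    from pyspark.sql.functions import col, explode_outer  # explode_outer needs Spark 2.3+
    from pyspark.sql.types import StructType, ArrayType

    def flattenSchema(schema, prefix=None):
        """Return the dotted path of every leaf field in `schema`."""
        fields = []

        for field in schema.fields:
            # Wrap the key in backticks when it contains '.', so Spark does not
            # mistake a dot inside a JSON key for a nested-field separator
            part = '`' + field.name + '`' if '.' in field.name else field.name
            name = prefix + '.' + part if prefix else part
            dtype = field.dataType

            # Look through one level of array to reach its element type
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType

            if isinstance(dtype, StructType):
                fields += flattenSchema(dtype, prefix=name)
            else:
                fields.append(name)
        return fields

    # Select every leaf path as its own column in a single projection.
    # Struct columns and arrays of structs are not carried over: only their
    # leaf fields are selected, under names with '.' replaced by `connector`.
    newDf = df.select([
        col(path).alias(path.replace('`', '').replace('.', connector))
        for path in flattenSchema(df.schema)
    ])

    # Explode each array column into one row per element; explode_outer keeps
    # rows whose array is null or empty (the value becomes null)
    for field in newDf.schema.fields:
        if isinstance(field.dataType, ArrayType):
            newDf = newDf.withColumn(field.name, explode_outer(col('`' + field.name + '`')))

    return newDf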
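To preview which columns `relationalize` will produce without starting Spark, the path logic of `flattenSchema` can be mirrored in plain Python. This is a hypothetical sketch: the toy schema encoding (a `dict` for `StructType`, a one-element `list` for `ArrayType`, a string for a leaf type) and the names `flatten_paths` and `output_name` are assumptions, not part of PySpark.

```python
def flatten_paths(schema, prefix=None):
    """Mirror of flattenSchema: the dotted path of every leaf field.

    Toy encoding, not a Spark API: a dict stands for StructType, a
    one-element list for ArrayType, and a string for a leaf type.
    """
    paths = []
    for name, dtype in schema.items():
        # Quote keys containing '.', as flattenSchema does with backticks
        part = '`' + name + '`' if '.' in name else name
        path = prefix + '.' + part if prefix else part
        # Look through one level of array, like the ArrayType check
        if isinstance(dtype, list):
            dtype = dtype[0]
        if isinstance(dtype, dict):
            paths += flatten_paths(dtype, prefix=path)
        else:
            paths.append(path)
    return paths


def output_name(path, connector='-'):
    """Column name derived from a path: drop backticks, replace dots with connector."""
    return path.replace('`', '').replace('.', connector)


example = {
    "human": {"name": {"first_name": "string"}},
    "tags": ["string"],
    "v.1": "long",
    "items": [{"sku": "string"}],
}
for path in flatten_paths(example):
    print(path, '->', output_name(path))
```

For this toy schema the paths are `human.name.first_name`, `tags`, `` `v.1` `` and `items.sku`; of the resulting columns, `tags` and `items-sku` hold arrays and are the ones that would then be exploded.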

In [0]:
# Get the flattened DataFrame and preview it
flattenedDf = relationalize(dataFrame)
flattenedDf.show()

In [0]:
# Write the data back.
# This example writes JSON, but DataFrames can be stored in many other data formats.
flattenedDf.write.json("PATH-TO-WRITE-JSONs")
