In [0]:
# All imports go here
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType  

# Function to flatten a nested schema
def flatten(df, connector='-', show=True):
    """Return ``df`` with the leaf fields of its nested structs as top-level columns.

    The schema is walked recursively.  Structs, and arrays whose elements are
    structs (one array level only), are descended into; every other field is a
    leaf.  Each leaf becomes a column named after its dotted path with the dots
    replaced by ``connector`` (``Scene.GPSLatitude`` -> ``Scene-GPSLatitude``).

    Column order of the result: the top-level columns that are not struct
    containers keep their original order, followed by the extracted leaves in
    schema-traversal order.  Top-level struct and array-of-struct columns are
    left out, since their content is represented by the extracted leaves.

    Args:
        df: The DataFrame to flatten.
        connector: Separator that replaces the dots of a field path in the
            new column names.  Dots inside a field name are replaced as well,
            and backticks are removed, because the new name is derived from
            the quoted path.
        show: When true (the default), call ``show()`` on the flattened
            DataFrame before returning it.

    Returns:
        The flattened DataFrame.
    """

    def isStructContainer(dtype):
        # True for a struct, or an array whose elements are structs.
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        return isinstance(dtype, StructType)

    # Recursive function collecting the (quoted) path of every leaf field
    def flattenSchema(schema, prefix=None):
        fields = []

        for field in schema.fields:
            # A field name containing a dot is wrapped in backticks so the dot
            # is not read as a struct-field accessor.
            segment = '`' + field.name + '`' if '.' in field.name else field.name
            name = prefix + '.' + segment if prefix else segment

            if isStructContainer(field.dataType):
                dtype = field.dataType
                # For an array of structs, descend into the element struct
                if isinstance(dtype, ArrayType):
                    dtype = dtype.elementType
                fields += flattenSchema(dtype, prefix=name)
            else:
                fields.append(name)
        return fields

    # Ordered mapping: output column name -> column expression.  Assigning to
    # an existing key keeps that key's position, which matches how withColumn
    # replaces a same-named column in place.
    selected = {}

    # Top-level columns that are not struct containers are carried over as-is
    for field in df.schema.fields:
        if not isStructContainer(field.dataType):
            quoted = '`' + field.name.replace('`', '``') + '`'
            selected[field.name] = col(quoted)

    # Every leaf path gets its flattened name
    for col_name in flattenSchema(df.schema):
        flat_name = col_name.replace('`', '').replace('.', connector)
        selected[flat_name] = col(col_name)

    # Build the result with a single projection; adding the columns one
    # withColumn call at a time would add a projection to the plan per column.
    newDf = df.select([expression.alias(name) for name, expression in selected.items()])

    if show:
        newDf.show()  # Show the flattened DataFrame
    return newDf

# Get the Spark session for this notebook, registering the app name "Flatten_JSON"
session_builder = pyspark.sql.SparkSession.builder.appName("Flatten_JSON")
spark = session_builder.getOrCreate()

# Read the source JSON document with the multiLine reader option enabled
json_reader = spark.read.format("json").option("multiLine", True)
df = json_reader.load("dbfs:/FileStore/shared_uploads/chaitu.wynbit@outlook.com/exampleEliteJSON-1.json")

# Turn the nested schema into flat columns (flatten() also prints the result)
flattened_df = flatten(df)


+--------------------+------------+--------------------+--------------------+-------------------------+----------------+--------------+------+--------------------+-----------------------+----------------+------------+---------------+------+--------+---------------+--------------+--------------------+---------------+---------------------+--------------------+-------------------+---------------------------+------------------------+----------------+---------------------------------+----------------------------+---------------------------------------------+-----------------------------+------------------------------------------+----------------------------------------------------------------+-------------------------------------------------------------------+-----------------------------------------------------------------------+---------------------------+-----------------+-----------------+------------------+-----------------------------------+--------------------------------------+-----

In [0]:
# Display the column names of the flattened DataFrame.
flattened_df.columns

['AgencyID',
 'AgencyNumber',
 'AuditEnabled',
 'CreatedByAuthor',
 'FormHierarchyCollectionID',
 'IncidentOriginID',
 'IncidentTypeID',
 'Locked',
 'OID',
 'PatientCareReportNumber',
 'SoftwareCreator',
 'SoftwareName',
 'SoftwareVersion',
 'Status',
 'Validity',
 '__lake_clientId',
 '__lake_dataset',
 '__lake_lastModified',
 '__lake_sourceId',
 '__lake_sourcePipeline',
 '__lake_sourceversion',
 '__lake_timestamp',
 '__lake_universal_identifier',
 '__lake_universal_integer',
 '__lake_yearMonth',
 'FinishedIncident-FinishedDateTime',
 'FinishedIncident-PerformerID',
 'IncidentIncidentNumberModValue-IncidentNumber',
 'IncidentTime-DispatchNotified',
 'IncidentTime-IncidentTimePSAPModValue-PSAP',
 'Scene-Exposures-ExposureProtectiveEquipments-ProtectiveEquipment',
 'Scene-Exposures-ExposureSuspectedExposureModValue-SuspectedExposure',
 'Scene-Exposures-ExposureTypes-ExposureTypeExposureTypeModValue-NotValue',
 'Scene-Exposures-LicensureID',
 'Scene-GPSLatitude',
 'Scene-GPSLocation',
 'S

In [0]:
# Display the number of rows in the flattened DataFrame.
flattened_df.count()

1

In [0]:
# Report every column whose value in the first row is a list.
# The first row is fetched once for all columns, rather than running a
# separate select(...).first() query for each column.
first_row = flattened_df.first()

# first() returns None when the DataFrame has no rows; nothing is reported then.
if first_row is not None:
    # A Row is a tuple, so its values line up positionally with .columns.
    for column, first_value in zip(flattened_df.columns, first_row):
        if isinstance(first_value, list):
            print("column name",column)
            print("value of the columns",first_value)

column name Scene-Exposures-ExposureProtectiveEquipments-ProtectiveEquipment
value of the columns [['PersonalProtectiveEquipmentUsage_EyeProtection', 'PersonalProtectiveEquipmentUsage_Gloves', 'PersonalProtectiveEquipmentUsage_MaskN95']]
column name Scene-Exposures-ExposureSuspectedExposureModValue-SuspectedExposure
value of the columns ['EMSWorkRelatedHarmModifier_No']
column name Scene-Exposures-ExposureTypes-ExposureTypeExposureTypeModValue-NotValue
value of the columns [['TypeOfWorkRelatedHarmModifier_NotValue_NotApplicable']]
column name Scene-Exposures-LicensureID
value of the columns ['1122334455']
column name Scene-Response-CrewMembers-CrewMemberAgencyPerformerIDModValue-LicensureID
value of the columns ['1122334455', '214']
column name Scene-Response-CrewMembers-CrewMemberAgencyPerformerIDModValue-PerformerID
value of the columns ['ed8ad2d5-f0a4-447f-b3ef-850e40ec9f7b', '2724f3bd-c30d-4d7f-8b55-685eca6b2ae2']
column name Scene-Response-CrewMembers-CrewMemberCrewMemberLevelModV

In [0]:
from pyspark.sql import functions as F

GCS_TOTAL_COLUMN = 'Scene-Response-Patient-Vitals-VitalsGCSTotalModValue-GCSTotal'


def _contains_any_in_range(column_name, low, high):
    """Build a condition that holds when the array column contains any integer in [low, high].

    Args:
        column_name: Name of the array column to inspect.
        low: Smallest value of the band (inclusive).
        high: Largest value of the band (inclusive).

    Returns:
        A Column that ORs one ``array_contains`` test per integer in the band.
    """
    condition = F.array_contains(F.col(column_name), low)
    for score in range(low + 1, high + 1):
        condition = condition | F.array_contains(F.col(column_name), score)
    return condition


# Replace the array of GCS totals with one severity label per row.
# Bands are tested from most to least severe, so a row holding scores from
# several bands gets the most severe label.  Every score inside a band is
# tested (3-8, 9-12, 13-14), not only the two endpoints of the band.
# Rows matching no band get 'normal'; presumably this also covers a missing
# GCS array - confirm that this is the intended label for missing data.
# NOTE: the array column is overwritten by a string label, so running this
# cell twice on the same flattened_df applies array_contains to a string
# column and is expected to fail; re-create flattened_df before re-running.
flattened_df = flattened_df.withColumn(
    GCS_TOTAL_COLUMN,
    F.when(_contains_any_in_range(GCS_TOTAL_COLUMN, 3, 8), 'severe')
    .when(_contains_any_in_range(GCS_TOTAL_COLUMN, 9, 12), 'moderate')
    .when(_contains_any_in_range(GCS_TOTAL_COLUMN, 13, 14), 'mild')
    .otherwise('normal')
)


In [0]:
# Write the flattened data as Parquet.  The default save mode raises when the
# target path already exists, which breaks a second run of the notebook, so
# the output of a previous run is overwritten explicitly.
flattened_df.write.mode("overwrite").parquet("Flattened_json.parquet")

In [0]:
# mv Flattened_json.parquet /Workspace/Users/chaitu.wynbit@outlook.com/data