In [2]:
import pandas as pd
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col, arrays_zip
import os
from pyspark.sql.functions import pandas_udf, explode

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Input file on DBFS and its format tag ("ndjson" = newline-delimited JSON,
# i.e. one JSON record per line).
file_location, file_type = "/FileStore/tables/Patient_1.ndjson", "ndjson"

In [1]:
# The listing result is discarded; the call is presumably there to fail early
# if the file is missing. spark.read.json then loads the newline-delimited
# JSON file (one record per line) into a DataFrame.
dbutils.fs.ls("dbfs:{}".format(file_location))
df = spark.read.json("dbfs:{}".format(file_location))

NameError: name 'dbutils' is not defined

In [0]:
def flatten_structs(nested_df):
    """Expand struct columns into one column per leaf field.

    Struct columns are expanded recursively until only non-struct columns
    remain. Each resulting column is named by joining its field path with
    underscores (e.g. field ``b`` inside struct ``a`` becomes ``a_b``), so the
    parent names are kept for organization. Non-struct columns, including
    arrays, are carried over as-is apart from that path-based name.

    Args:
        nested_df: Spark DataFrame that may contain StructType columns.

    Returns:
        A new DataFrame with no top-level StructType columns.
    """
    def quoted(path):
        # Backtick-quote each segment so field names containing dots, spaces or
        # backticks are read literally instead of as nested-field access.
        return ".".join("`" + name.replace("`", "``") + "`" for name in path)

    # Walk the schema directly rather than materializing a projected DataFrame
    # per struct. Within each struct, leaf fields are emitted in schema order;
    # nested structs are pushed on the stack and visited afterwards
    # (last-to-first, depth-first).
    stack = [((), nested_df.schema)]
    columns = []
    while stack:
        parents, struct_type = stack.pop()
        for field in struct_type.fields:
            path = parents + (field.name,)
            if isinstance(field.dataType, StructType):
                stack.append((path, field.dataType))
            else:
                columns.append(col(quoted(path)).alias("_".join(path)))
    return nested_df.select(columns)

In [0]:
def explode_arrays(df):
    """Explode every top-level array column of ``df`` into one row per element.

    All array columns are zipped together with ``arrays_zip`` and exploded
    once, so the i-th elements of the different arrays land on the same output
    row (they are paired by position, not cross-joined). A row therefore
    yields as many rows as its longest array; shorter or missing (null) arrays
    contribute nulls at the positions they lack. A row whose arrays are all
    empty or null is kept as a single row with nulls (``explode_outer``).
    Non-array columns are copied onto every produced row.

    Args:
        df: Spark DataFrame, typically the output of ``flatten_structs``.

    Returns:
        A DataFrame where each former array column holds a single element
        (which may itself be a struct or an array), with non-array columns
        first and former array columns after them. Nulls in those columns are
        replaced with ``''`` where the column is a string column. If ``df``
        has no array columns it is returned unchanged.
    """
    # Local import: only this function needs these helpers.
    from pyspark.sql.functions import array, coalesce

    array_fields = [field for field in df.schema.fields
                    if isinstance(field.dataType, ArrayType)]
    if not array_fields:
        # Nothing to explode; avoid zipping an empty list of arrays.
        return df
    array_cols = [field.name for field in array_fields]
    flat_cols = [field.name for field in df.schema.fields
                 if not isinstance(field.dataType, ArrayType)]

    # arrays_zip yields null when ANY input array is null, which would drop the
    # elements of the other arrays in that row. Substitute an empty array of
    # the same type so a missing array only contributes nulls.
    # NOTE(review): relies on Spark 3 semantics, where array() is array<null>
    # and can be cast to any array type.
    zip_inputs = [coalesce(col(field.name), array().cast(field.dataType))
                  for field in array_fields]

    # Pick a helper column name that cannot shadow an existing column.
    zipped = "_zipped"
    while zipped in df.columns:
        zipped = "_" + zipped

    exploded_df = (
        df.withColumn(zipped, explode_outer(arrays_zip(*zip_inputs)))
          .select(*(flat_cols + [zipped + ".*"]))
          # The zipped struct's field names depend on the Spark version and on
          # how its inputs are expressed; restore the original names by
          # position.
          .toDF(*(flat_cols + array_cols))
          .fillna('', subset=array_cols)
    )
    return exploded_df

In [0]:
def flatten_df(dfflat):
    """Fully flatten a Spark DataFrame.

    Each pass expands struct columns into parent-prefixed columns
    (``flatten_structs``) and then explodes array columns into extra rows
    (``explode_arrays``). Passes repeat while any top-level StructType or
    ArrayType column remains; other column types do not keep the loop running.

    Args:
        dfflat: Spark DataFrame to flatten.

    Returns:
        The flattened DataFrame.
    """
    def still_nested(frame):
        # True while some top-level column is exactly a StructType or ArrayType.
        return any(type(field.dataType) in (StructType, ArrayType)
                   for field in frame.schema.fields)

    while still_nested(dfflat):
        dfflat = explode_arrays(flatten_structs(dfflat))
    return dfflat

In [0]:
def generate_ddl(df, tblname, column_type="nvarchar(50)"):
    """Build a T-SQL ``CREATE TABLE`` statement for the columns of ``df``.

    The result has the form::

        CREATE TABLE [table_name] (
            [field1] nvarchar(50),
            [field2] nvarchar(50),
            ...
            [fieldN] nvarchar(50));

    Every column gets the same type. A uniform string type avoids mapping
    Spark types to SQL types, but values longer than 50 characters will not
    fit the default; pass a wider ``column_type`` (e.g. ``"nvarchar(max)"``)
    if needed.

    Args:
        df: A (flattened) Spark DataFrame; only ``df.columns`` is read.
        tblname: Name of the table to create.
        column_type: SQL type used for every column. Defaults to
            ``"nvarchar(50)"``.

    Returns:
        The DDL statement as a string, ending with ``");"`` and a blank line.

    Raises:
        ValueError: If ``df`` has no columns (a table needs at least one).
    """
    def bracket(identifier):
        # In T-SQL bracket-delimited identifiers a literal "]" is written "]]".
        return "[" + identifier.replace("]", "]]") + "]"

    # Read the columns of the DataFrame passed in, not a notebook global.
    columns = list(df.columns)
    if not columns:
        raise ValueError("cannot generate DDL for a DataFrame with no columns")

    column_defs = ", \n".join("\t" + bracket(name) + " " + column_type
                              for name in columns)
    return "CREATE TABLE " + bracket(tblname) + " ( \n" + column_defs + "); \n\n"

In [3]:
# Fully flatten the loaded DataFrame: structs become prefixed columns and
# array elements become extra rows.
dfflat = flatten_df(dfflat=df)

NameError: name 'flatten_df' is not defined

In [0]:
# print() (rather than a bare expression) so the tabs and newlines in the
# DDL string render literally instead of as a quoted repr.
ddl = generate_ddl(df=dfflat, tblname="patient")
print(ddl)