In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType

# Spark session used by the `read` helper defined later in this notebook.
spark = SparkSession.builder.getOrCreate()

In [5]:
"""Read json from dbfs or local."""

from json import load
from os import path


def abs_to_rel(filepath: str) -> str:
    """Strip leading ``/`` characters so the path can be joined under another root.

    Args:
        filepath: Path string; may be absolute, relative, or empty.

    Returns:
        ``filepath`` without its leading slash(es). Paths that do not start
        with ``/`` (including the empty string) are returned unchanged.

    Examples:
        ``'/home'`` -> ``'home'``, ``'home'`` -> ``'home'``, ``''`` -> ``''``.
    """
    # lstrip (instead of indexing filepath[0]) is safe on '' and also covers
    # '//home', which would still be absolute after removing a single slash.
    return filepath.lstrip('/')


def local_file_api_path(filepath: str) -> str:
    """Prefer the ``dbfs``-prefixed variant of a path when it exists.

    The path is made relative with ``abs_to_rel`` and joined under ``dbfs``.
    If that candidate exists on the local filesystem it is returned,
    otherwise the original ``filepath`` is returned untouched.

    NOTE(review): ``'dbfs'`` is joined as a relative directory, so the lookup
    depends on the current working directory -- confirm this is intended
    rather than the absolute ``/dbfs`` root.
    """
    candidate = path.join('dbfs', abs_to_rel(filepath))
    if path.exists(candidate):
        return candidate
    return filepath


def load_json(filepath: str) -> dict:
    """Parse a JSON file, resolving its location with ``local_file_api_path``.

    Args:
        filepath: Path of the JSON file; it is passed through
            ``local_file_api_path`` before being opened.

    Returns:
        The decoded JSON document (annotated as ``dict``; ``json.load``
        returns whatever the file's top-level value is).

    Raises:
        OSError: If the resolved path cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8: without `encoding`, open() falls back to the
    # platform locale, which can mangle or reject non-ASCII JSON content.
    with open(local_file_api_path(filepath), 'r', encoding='utf-8') as f:
        return load(f)


# `glom` is imported here because this cell uses it before the later cell
# that imports it; without this a fresh-kernel "Run All" raises NameError.
from glom import glom

# Preview the reader options declared in the houses metadata file.
meta = load_json('../mnt/houses.json')
glom(meta, 'input.options', default={})

{'basePath': '../mnt/houses/', 'header': True, 'sep': ','}

In [6]:
from glom import glom

def read(metajsonpath: str) -> DataFrame:
    """Load the dataset described by a metadata JSON file into a DataFrame.

    The metadata file is loaded with ``load_json`` and these keys are used:

    * ``input.path``    -- location handed to the Spark reader. No default is
      given to ``glom`` for this key, so the lookup fails when it is absent.
    * ``input.format``  -- data source format; ``'text'`` when absent.
    * ``input.schema``  -- optional schema, converted with
      ``StructType.fromJson`` when truthy and passed through as-is otherwise.
    * ``input.options`` -- optional mapping forwarded to the reader as
      keyword arguments; ``{}`` when absent.

    Args:
        metajsonpath: Path of the metadata JSON file.

    Returns:
        The DataFrame produced by ``spark.read.load``.
    """
    meta = load_json(metajsonpath)
    # Local names avoid shadowing the builtin `format` and the `path` module
    # imported from `os` earlier in the notebook.
    source_format = glom(meta, 'input.format', default='text')
    schema_json = glom(meta, 'input.schema', default=None)
    source_path = glom(meta, 'input.path')
    reader_options = glom(meta, 'input.options', default={})
    return spark.read.load(
        path=source_path,
        format=source_format,
        # A falsy schema (None / empty) is forwarded unchanged rather than
        # being converted to a StructType.
        schema=StructType.fromJson(schema_json) if schema_json else schema_json,
        **reader_options
    )


# Load each dataset through its metadata file and print the resulting schema.
df_houses = read('../mnt/houses.json')
df_houses.printSchema()

df_friends = read('../mnt/friends.json')
df_friends.printSchema()

root
 |-- id: long (nullable = true)
 |-- city: string (nullable = true)

root
 |-- firstname: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lastname: string (nullable = true)

