In [10]:
# Provision a local Spark runtime for this notebook: Java, the Spark 3.1.1 distribution
# and findspark. The hard-coded paths below presumably target a Colab-style /content
# working directory — TODO confirm before running elsewhere.
!apt-get update # Refresh the apt package index.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unpack the tgz archive.
!pip install -q findspark # Install findspark. Adds PySpark to the system path at runtime.

# Set environment variables pointing at the Java install and the unpacked Spark directory.
# They are set before findspark.init() is called below.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# List the working directory (shows whether the Spark archive was downloaded and unpacked).
!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session running locally on all available cores ("local[*]")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Bare expression: the notebook renders the session object as the cell output.
spark

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [Connected to cloud.r-                                                                                                    Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [2 InRelease 3,626 B/30% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [Connecting to ppa.lau                                                                                                    Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
0% [3 InRelease 12.7 kB/119 kB 11%] [Connecting to security.ubuntu.com (185.125.190.36)] [Connecting                                                                                                    Get:4 https://developer.download.nvidia.com/compute

In [33]:
!pip install -q pip install fastjsonschema

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Start (or reuse) the Spark session
spark = SparkSession.builder.appName("json_to_df").getOrCreate()

# Schema of the "meta" object
meta_schema = (
    StructType()
    .add("field1", IntegerType(), True)
    .add("field2", StringType(), True)
)

# Schema of one element of the "data" array
# NOTE(review): "data2" is declared non-nullable, yet the sample below contains an
# element without "data2" — confirm this mismatch is intentional.
data_element_schema = (
    StructType()
    .add("data1", StringType(), True)
    .add("data2", IntegerType(), False)
)

data_schema = ArrayType(data_element_schema)

# Full document: a "meta" struct plus a "data" array
complete_schema = (
    StructType()
    .add("meta", meta_schema, True)
    .add("data", data_schema, True)
)

# Sample JSON document
json_data = '{"meta": {"field1": 1, "field2": "a"}, "data": [{"data1": "a", "data2": 1}, {"data1": "a", "data2": 1}, {"data1": "a", "data2": 1},{"data1": "a"}]}'

# Wrap the JSON string in a single-element RDD
rdd = spark.sparkContext.parallelize([json_data])

# Parse the RDD into a DataFrame using the explicit schema
df = spark.read.schema(complete_schema).json(rdd)

# Display the DataFrame and its schema
df.show()
df.printSchema()


+------+--------------------+
|  meta|                data|
+------+--------------------+
|{1, a}|[{a, 1}, {a, 1}, ...|
+------+--------------------+

root
 |-- meta: struct (nullable = true)
 |    |-- field1: integer (nullable = true)
 |    |-- field2: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- data1: string (nullable = true)
 |    |    |-- data2: integer (nullable = false)



In [181]:
import fastjsonschema
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Start (or reuse) the Spark session
spark = SparkSession.builder.appName("json_to_df").getOrCreate()

# Already-parsed sample document; its first "data" element has no "data1"
json_data = {"meta": {"field1": 1, "field2": "a"}, "data": [{"data2": 1}, {"data1": "a", "data2": 1}, {"data1": "a", "data2": 1},{"data1": "a", "data2": 1}]}
# Raw payloads: "bad" lacks "data2" in its last element, "god" is complete,
# "ybad" additionally lacks "meta.field1"
json_str_bad = '{"meta": {"field1": 1, "field2": "a"}, "data": [{"data1": "a", "data2": 1}, {"data1": "a", "data2": 1}, {"data1": "a", "data2": 1},{"data1": "a"}]}'
json_str_god = '{"meta": {"field1": 1, "field2": "a"}, "data": [{"data1": "a", "data2": 1}, {"data1": "a", "data2": 1}, {"data1": "a", "data2": 1},{"data1": "a", "data2": 1}]}'
json_str_ybad = '{"meta": {"field2": "a"}, "data": [{"data1": "a", "data2": 1}, {"data1": "a", "data2": 1}, {"data1": "a", "data2": 1},{"data1": "a"}]}'

# Six records that share key 1 and topic "a"; only the payload differs
data = [
    (1, "a", payload)
    for payload in (
        json_str_god,
        json_str_bad,
        json_str_god,
        json_str_god,
        json_str_ybad,
        json_str_god,
    )
]

# Record layout: integer key, string topic, raw JSON string value
meta_schema = (
    StructType()
    .add("key", IntegerType(), True)
    .add("topic", StringType(), True)
    .add("value", StringType(), True)
)

df = spark.createDataFrame(data, meta_schema)

df.show()
df.printSchema()


# from pyspark.sql.functions import get_json_object

# df = df.select("key",
#                get_json_object(df.value, "$.data").alias("data"))
# df.show()
# df.printSchema()


# JSON Schema (draft-04) for a payload: a required "meta" object and a required
# "data" array of objects; every listed property is mandatory.
point_schema = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "type": "object",
    "properties": {
        "meta": {
            "type": "object",
            "properties": {
                "field1": {"type": "integer"},
                "field2": {"type": "string"},
            },
            "required": ["field1", "field2"],
        },
        "data": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "data1": {"type": "string"},
                    "data2": {"type": "integer"},
                },
                "required": ["data1", "data2"],
            },
        },
    },
    "required": ["meta", "data"],
}
# Compile the schema into a validator callable.
point_validator = fastjsonschema.compile(point_schema)

# Validate the parsed sample; on failure print the exception and its attributes.
try:
    point_validator(json_data)
except fastjsonschema.JsonSchemaException as err:
    print(dir(err))
    print(f"Data failed validation: {err}")
    print(err.name)
    failing_path = err.path
    print(failing_path, type(failing_path))
    print(err.rule)
    print(err.value)

+---+-----+--------------------+
|key|topic|               value|
+---+-----+--------------------+
|  1|    a|{"meta": {"field1...|
|  1|    a|{"meta": {"field1...|
|  1|    a|{"meta": {"field1...|
|  1|    a|{"meta": {"field1...|
|  1|    a|{"meta": {"field2...|
|  1|    a|{"meta": {"field1...|
+---+-----+--------------------+

root
 |-- key: integer (nullable = true)
 |-- topic: string (nullable = true)
 |-- value: string (nullable = true)

['__cause__', '__class__', '__context__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__suppress_context__', '__traceback__', '__weakref__', 'args', 'definition', 'message', 'name', 'path', 'rule', 'rule_definition', 'value', 'with_traceback']
Data failed validati

In [109]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, explode
import json

class SchemaConverter:
    """
    Convert a standard JSON-schema (json-schema.org) into a pyspark.sql.types.StructType.

    Supported types are: "string", "number" (double), "float", "integer" (IntegerType),
    "boolean", "object" and "array".
    The root of the JSON-schema is required to be:
     - type 'object' and to have a field 'properties' having the contents of the schema
     - or a field 'definitions' and '$ref'-field with reference to a main structure inside 'definitions'
    In order to use such a reference in the schema the corresponding field name should be '$ref'.
    The value is the address inside the schema document: the segments after the leading '#'
    are looked up one by one starting from the schema root.
    So a reference will look like: "$ref": "#/definitions/path/to/struct"

    Every generated field is added with StructType.add's default nullability; the
    JSON-schema 'required' lists are not consulted.

    The input JSON-schema itself is not validated; unsupported constructs raise Exception.
    The resulting schema is available as ``schema_for_spark``.
    """

    # Supported simple types
    SimpleTypeMap = {
        "string": StringType(),
        "number": DoubleType(),
        "float": FloatType(),
        "integer": IntegerType(),
        "boolean": BooleanType()
    }

    def __init__(self, js: dict):
        """
        :param js: JSON-schema as a dict; must contain either '$ref' or 'properties' at the root
        :raises Exception: if the root has neither '$ref' nor 'properties'
        """
        # Original standard JSON-schema (json-schema.org)
        self.json_schema = js
        # Main element to build schema
        if "$ref" in self.json_schema:
            self._main_element = self._get_ref_object(self.json_schema.get("$ref"))
        elif "properties" in self.json_schema:
            self._main_element = self.json_schema
        else:
            raise Exception("Format of JSON-Schema is not recognized by JsonToSparkSchemaConverter")
        # Converting schema
        self._build_schema_from_json()

    def _get_ref_object(self, ref: str) -> dict:
        """
        Get object via $ref
        :param ref: internal reference to element, f.e. '#/definitions/path_to_element'
        :return: element via reference
        :raises Exception: if any segment of the reference cannot be found
        """
        path = ref.split('/')
        result = self.json_schema
        # path[0] is the part before the first '/' (normally '#'), so the walk starts at index 1.
        for key in path[1:]:
            # Fail with a readable message instead of an AttributeError on None further down.
            if not isinstance(result, dict) or key not in result:
                raise Exception(f"Reference '{ref}' cannot be resolved by JsonToSparkSchemaConverter")
            result = result[key]
        return result

    def _get_simple_field(self, _type: str) -> DataType:
        """
        Returns the Spark data type for a simple JSON-schema type
        (None when the type is not in SimpleTypeMap).
        """
        return self.SimpleTypeMap.get(_type)

    def _get_oneof_field(self, oneof_list: list) -> DataType:
        """
        Returns the Spark data type for 'oneOf' constructions.
        F.e. {"oneOf":[{"type":"null"},{"type":"integer"}]}

        The first alternative that is a supported simple type, an array, an object or a
        '$ref' wins; other alternatives (such as "null") are skipped.
        :raises Exception: if no alternative can be converted
        """
        for item in oneof_list:
            if "type" in item:
                item_type = item.get("type")
                if isinstance(item_type, str) and item_type in self.SimpleTypeMap:
                    return self._get_simple_field(item_type)
                elif item_type == "array":
                    return self._get_array(item.get("items"))
                elif item_type == "object":
                    return self._get_object(item.get("properties"))
            elif "$ref" in item:
                obj = self._get_ref_object(item.get("$ref"))
                return self._get_object(obj.get("properties"))
        raise Exception('oneOf field is incorrect')

    def _get_array(self, _items: dict) -> ArrayType:
        """
        Returns ArrayType for 'array'.
        F.e. {"type":"array","items":{"type":"string"}}
        :param _items: content of the 'items'-field
        :raises Exception: if 'items' is missing or its element type is not supported
        """
        if _items is None:
            # An array without 'items' used to fail with a TypeError on None.
            raise Exception('Array type is not supported by JsonToSparkSchemaConverter')
        if "type" in _items:
            item_type = _items.get("type")
            if isinstance(item_type, str) and item_type in self.SimpleTypeMap:
                return ArrayType(self.SimpleTypeMap.get(item_type))
            elif item_type == "array":
                return ArrayType(self._get_array(_items.get("items")))
            elif item_type == "object":
                return ArrayType(self._get_object(_items.get("properties")))
            else:
                raise Exception("Array element's type is not supported by JsonToSparkSchemaConverter")
        elif "$ref" in _items:
            obj = self._get_ref_object(_items.get("$ref"))
            array_type = self._get_object(obj.get("properties"))
            return ArrayType(array_type)
        else:
            raise Exception('Array type is not supported by JsonToSparkSchemaConverter')

    def _get_object(self, _properties: dict) -> StructType:
        """
        Returns StructType for 'object'
        :param _properties: content of the 'properties'-field
        :raises Exception: if 'properties' is missing or a property cannot be converted
        """
        if _properties is None:
            # An object without 'properties' used to fail with an AttributeError on None.
            raise Exception("Object without 'properties' is not supported by JsonToSparkSchemaConverter")
        result = StructType()
        for field_name, field_info in _properties.items():
            if "type" in field_info:
                _type = field_info.get("type")
                if isinstance(_type, str) and _type in self.SimpleTypeMap:
                    result = result.add(field_name, self._get_simple_field(_type))
                elif _type == "array":
                    result = result.add(field_name, self._get_array(field_info.get("items")))
                elif _type == "object":
                    result = result.add(field_name, self._get_object(field_info.get("properties")))
                else:
                    # Previously such a property was silently left out of the struct.
                    raise Exception(
                        f"Type '{_type}' of field '{field_name}' is not supported by JsonToSparkSchemaConverter"
                    )
            elif "oneOf" in field_info:
                result = result.add(field_name, self._get_oneof_field(field_info.get("oneOf")))
            elif "$ref" in field_info:
                # Property given as a reference: resolve it the same way arrays and oneOf do.
                obj = self._get_ref_object(field_info.get("$ref"))
                result = result.add(field_name, self._get_object(obj.get("properties")))
            else:
                raise Exception('Type is not supported by JsonToSparkSchemaConverter')
        return result

    def _build_schema_from_json(self):
        """
        Main builder for schema: stores the converted root object in ``schema_for_spark``.
        """
        self.schema_for_spark = self._get_object(self._main_element.get("properties"))

# Build the Spark schema from point_schema (defined in an earlier cell of this notebook).
converter = SchemaConverter(point_schema)
# print(converter.schema_for_spark)
# `schema` is the StructType used as the return type of the UDFs defined below.
schema = converter.schema_for_spark


@udf(schema)
def json_to_dict_list(json_str):
    """
    Parse a JSON string into a dict matching the struct `schema`.

    :param json_str: raw JSON text (may be None for a null column value)
    :return: the parsed dict, or None (a null struct) when the input is null,
             is not valid JSON, or is valid JSON but not an object
    """
    try:
        parsed = json.loads(json_str)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a null input. None maps to a null struct, whereas the
        # previous fallback `[]` has the wrong number of fields for the struct type.
        return None
    # Only a JSON object can be mapped onto the struct's named fields.
    return parsed if isinstance(parsed, dict) else None

@udf(StringType())
def validate_schema(json_str):
    """
    Validate a JSON string against the module-level `point_validator`.

    :param json_str: raw JSON text (may be None for a null column value)
    :return: None when the document is valid, otherwise a message starting with
             "Data failed validation: " followed by the error text
    """
    try:
        # Reuse the validator compiled at module level; compiling the schema here
        # repeated the same compilation for every row.
        point_validator(json.loads(json_str))
        return None
    except (json.JSONDecodeError, TypeError, fastjsonschema.JsonSchemaException) as e:
        # TypeError covers a null input passed to json.loads.
        return f"Data failed validation: {e}"


# Compile the JSON-schema validator
point_validator = fastjsonschema.compile(point_schema)

# Apply the parsing UDF: adds the struct column "value_list" built from the raw JSON
df_ = df.withColumn("value_list", json_to_dict_list(df.value))
df_.show()
df_.printSchema()

# Add the validation result for each raw payload
df__ = df_.withColumn("schema_validate", validate_schema(df_.value))
df__.show(truncate=False)
df__.printSchema()

# Keep only the parsed "meta" struct, the validation result and the "data" array
a = df__.select(
    "value_list.meta",
    "schema_validate",
    "value_list.data",
)
a.printSchema()

# One output row per element of the "data" array
exploded_df = a.select(
    "meta.field1",
    "schema_validate",
    explode(a["data"]).alias("data_item"),
)
exploded_df.show()


+---+-----+--------------------+--------------------+
|key|topic|               value|          value_list|
+---+-----+--------------------+--------------------+
|  1|    a|{"meta": {"field1...|{{1, a}, [{a, 1},...|
|  1|    a|{"meta": {"field1...|{{1, a}, [{a, 1},...|
|  1|    a|{"meta": {"field1...|{{1, a}, [{a, 1},...|
|  1|    a|{"meta": {"field1...|{{1, a}, [{a, 1},...|
|  1|    a|{"meta": {"field1...|{{1, a}, [{a, 1},...|
+---+-----+--------------------+--------------------+

root
 |-- key: integer (nullable = true)
 |-- topic: string (nullable = true)
 |-- value: string (nullable = true)
 |-- value_list: struct (nullable = true)
 |    |-- meta: struct (nullable = true)
 |    |    |-- field1: integer (nullable = true)
 |    |    |-- field2: string (nullable = true)
 |    |-- data: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data1: string (nullable = true)
 |    |    |    |-- data2: integer (nullable = true)

+---+-----+--------

In [182]:
from pyspark.sql.functions import col, explode

# Assumes 'point_schema' and 'df' have already been defined by earlier cells
point_validator = fastjsonschema.compile(point_schema)

@udf(schema)
def json_to_dict_list(json_str):
    """
    Parse a JSON string into a dict matching the struct `schema`.

    :param json_str: raw JSON text (may be None for a null column value)
    :return: the parsed dict, or None (a null struct) when the input is null,
             is not valid JSON, or is valid JSON but not an object
    """
    try:
        parsed = json.loads(json_str)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a null input. None maps to a null struct, whereas the
        # previous fallback `[]` has the wrong number of fields for the struct type.
        return None
    # Only a JSON object can be mapped onto the struct's named fields.
    return parsed if isinstance(parsed, dict) else None

@udf(StringType())
def validate_schema(json_str):
    """
    Validate a JSON string against the module-level `point_validator`.

    :param json_str: raw JSON text (may be None for a null column value)
    :return: None when the document is valid; the dotted path of the failing element
             (e.g. "data.data.3") when schema validation fails; "invalid_json" when
             the input is null or cannot be parsed as JSON
    """
    try:
        document = json.loads(json_str)
    except (json.JSONDecodeError, TypeError):
        # Parsing errors carry no `path` attribute, so they cannot share the handler
        # below; TypeError covers a null input.
        return "invalid_json"
    try:
        point_validator(document)
    except fastjsonschema.JsonSchemaException as e:
        return '.'.join(str(part) for part in e.path)
    return None


# Parse each payload, validate it, then emit one row per element of the "data" array
parsed_df = df.withColumn("value_list", json_to_dict_list(col("value")))
checked_df = parsed_df.withColumn("schema_validate", validate_schema(col("value")))
exploded_df = checked_df.select(
    "value_list.meta.field1",
    "schema_validate",
    explode("value_list.data").alias("data_item"),
)

# Show the final result (optional, for debugging)
exploded_df.show()

+----------------------+---------------+---------+
|value_list.meta.field1|schema_validate|data_item|
+----------------------+---------------+---------+
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|    data.data.3|   {a, 1}|
|                     1|    data.data.3|   {a, 1}|
|                     1|    data.data.3|   {a, 1}|
|                     1|    data.data.3|{a, null}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                     1|           null|   {a, 1}|
|                  null|      d

In [183]:
from pyspark.sql.functions import col, explode, from_json, monotonically_increasing_id
from pyspark.sql.types import StringType

# Use from_json to turn the JSON string into a struct column; "index" tags each source row
json_df = (
    df
    .withColumn("index", monotonically_increasing_id())
    .withColumn("json_struct", from_json(col("value"), schema))
)

# Lift the parsed fields to top level and validate each payload once per source row.
# "is_valid" holds validate_schema's return value (null when the payload passes).
extracted_df = json_df.select(
    col("index"),
    col("json_struct.*"),
    col("value"),
    validate_schema(col("value")).alias('is_valid')
)
extracted_df.show()
extracted_df.printSchema()

# Explode on the 'data' array, carrying the already-computed validation result along
# so the validation UDF is not requested again for every exploded element.
exploded_df = extracted_df.select(
    col("index"),
    col("meta.field1"),
    explode(col("data")).alias("data_item"),
    col("value"),
    col("is_valid"),
)

# Same columns as before: index, field1, data_item, schema_validate
validated_df = exploded_df.withColumnRenamed("is_valid", "schema_validate").drop("value")

# Show the result (row count limited for debugging)
validated_df.select("*", "data_item.data2").show(truncate=False, n=20)

validated_df.printSchema()


+----------+---------+--------------------+--------------------+-----------+
|     index|     meta|                data|               value|   is_valid|
+----------+---------+--------------------+--------------------+-----------+
|         0|   {1, a}|[{a, 1}, {a, 1}, ...|{"meta": {"field1...|       null|
|         1|   {1, a}|[{a, 1}, {a, 1}, ...|{"meta": {"field1...|data.data.3|
|         2|   {1, a}|[{a, 1}, {a, 1}, ...|{"meta": {"field1...|       null|
|8589934592|   {1, a}|[{a, 1}, {a, 1}, ...|{"meta": {"field1...|       null|
|8589934593|{null, a}|[{a, 1}, {a, 1}, ...|{"meta": {"field2...|  data.meta|
|8589934594|   {1, a}|[{a, 1}, {a, 1}, ...|{"meta": {"field1...|       null|
+----------+---------+--------------------+--------------------+-----------+

root
 |-- index: long (nullable = false)
 |-- meta: struct (nullable = true)
 |    |-- field1: integer (nullable = true)
 |    |-- field2: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (con