<a href="https://colab.research.google.com/github/MathMachado/Data_Engineering_With_PySpark/blob/main/Reading_JSON.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Reading JSON data

## Example 1: Reading simple JSON data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Obtain the Spark session used by this example
spark = (
    SparkSession.builder
    .appName("Processing Nested JSON Data")
    .getOrCreate()
)

# Sample document: one person with a nested address struct and an array of orders
json_data = '''
{
  "id": 1,
  "nome": "Alice",
  "idade": 30,
  "endereco": {
    "rua": "Rua das Flores",
    "cidade": "Cidade Feliz",
    "pais": "Brasil"
  },
  "pedidos": [
    {
      "id_pedido": 101,
      "produto": "Camiseta",
      "quantidade": 2
    },
    {
      "id_pedido": 102,
      "produto": "Calça",
      "quantidade": 1
    }
  ]
}
'''

# Wrap the JSON string in a single-element RDD and let Spark infer the schema
json_rdd = spark.sparkContext.parallelize([json_data])
df = spark.read.json(json_rdd)

# Step 1: keep the top-level fields, lift the address fields out of the
# "endereco" struct, and explode "pedidos" so each order gets its own row
person_columns = [col(field) for field in ("id", "nome", "idade")]
address_columns = [
    col(f"endereco.{field}").alias(field)
    for field in ("rua", "cidade", "pais")
]
df_select = df.select(
    *person_columns,
    *address_columns,
    explode(col("pedidos")).alias("pedido"),
)

# Step 2: pull the individual fields out of the exploded "pedido" struct
flat_columns = [
    col(field)
    for field in ("id", "nome", "idade", "rua", "cidade", "pais")
]
df_result = df_select.select(
    *flat_columns,
    col("pedido.id_pedido").alias("id_pedido"),
    col("pedido.produto"),
    col("pedido.quantidade"),
)

# Display the flattened rows without truncating long values
df_result.show(truncate=False)

## Example 2: Reading more complex JSON data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Obtain the Spark session used by this example
spark = (
    SparkSession.builder
    .appName("Processing Nested JSON Data")
    .getOrCreate()
)

# Sample document: a customer whose orders each contain an array of products
json_data = '''
{
  "id": 1,
  "name": "John",
  "age": 35,
  "address": {
    "street": "123 Main St",
    "city": "Big City",
    "country": "USA"
  },
  "orders": [
    {
      "order_id": 101,
      "products": [
        {
          "product_id": 1,
          "name": "Laptop",
          "quantity": 1
        },
        {
          "product_id": 2,
          "name": "Phone",
          "quantity": 2
        }
      ]
    },
    {
      "order_id": 102,
      "products": [
        {
          "product_id": 3,
          "name": "Tablet",
          "quantity": 1
        }
      ]
    }
  ]
}
'''

# Wrap the JSON string in a single-element RDD and let Spark infer the schema
json_rdd = spark.sparkContext.parallelize([json_data])
df = spark.read.json(json_rdd)

# Names of the already-flat columns that every later step carries forward
customer_fields = ["id", "name", "age", "street", "city", "country"]

# Level 1: flatten the address struct and explode "orders" (one row per order)
df_select = df.select(
    col("id"),
    col("name"),
    col("age"),
    *[
        col("address." + part).alias(part)
        for part in ("street", "city", "country")
    ],
    explode(col("orders")).alias("order"),
)

# Level 2: keep the order id and explode the order's "products" array
# (one row per product within each order)
df_result = df_select.select(
    *[col(field) for field in customer_fields],
    col("order.order_id").alias("order_id"),
    explode(col("order.products")).alias("product"),
)

# Level 3: pull the individual fields out of the exploded "product" struct
df_final = df_result.select(
    *[col(field) for field in customer_fields + ["order_id"]],
    col("product.product_id").alias("product_id"),
    col("product.name").alias("product_name"),
    col("product.quantity"),
)

# Display the fully flattened rows without truncating long values
df_final.show(truncate=False)

## Example 3: An even more complex example

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Create Spark session
spark = SparkSession.builder \
    .appName("Processing More Complex Nested JSON Data") \
    .getOrCreate()

# JSON data as a string: a company with an array of employees, each of whom
# has an array of skills and an array of projects; every project in turn has
# an array of tasks.
json_data = '''
{
  "company": "Tech Corp",
  "employees": [
    {
      "id": 1,
      "name": "Alice",
      "department": "Engineering",
      "skills": ["Python", "Java", "SQL"],
      "projects": [
        {
          "id": 101,
          "name": "Project A",
          "status": "In Progress",
          "tasks": [
            {"id": 1, "description": "Implement feature X"},
            {"id": 2, "description": "Fix bug in module Y"}
          ]
        },
        {
          "id": 102,
          "name": "Project B",
          "status": "Completed",
          "tasks": [
            {"id": 1, "description": "Refactor codebase"},
            {"id": 2, "description": "Write unit tests"}
          ]
        }
      ]
    },
    {
      "id": 2,
      "name": "Bob",
      "department": "Marketing",
      "skills": ["Social Media", "Content Creation"],
      "projects": [
        {
          "id": 201,
          "name": "Campaign X",
          "status": "Planned",
          "tasks": [
            {"id": 1, "description": "Create ad copy"},
            {"id": 2, "description": "Design promotional graphics"}
          ]
        }
      ]
    }
  ]
}
'''

# Create DataFrame from JSON data
df = spark.read.json(spark.sparkContext.parallelize([json_data]))

# Explode employees array: one row per employee struct
df_exploded = df.select(explode(col("employees")).alias("employee"))

# Spark permits only ONE generator (explode) per select clause, so every
# nested array is exploded in its own select step. Exploding "skills" and
# "projects.tasks" in the same select raises an AnalysisException, and
# "employee.projects.<field>" would yield arrays because "projects" is itself
# an array that has not been exploded yet.

# Step 1: flatten the employee struct and explode "skills"
# (one row per employee/skill); carry the projects array along for step 2
df_skills = df_exploded.select(
    col("employee.id").alias("employee_id"),
    col("employee.name").alias("employee_name"),
    col("employee.department").alias("employee_department"),
    explode(col("employee.skills")).alias("employee_skill"),
    col("employee.projects").alias("projects")
)

# Step 2: explode "projects" (one row per employee/skill/project)
df_projects = df_skills.select(
    col("employee_id"),
    col("employee_name"),
    col("employee_department"),
    col("employee_skill"),
    explode(col("projects")).alias("project")
)

# Step 3: flatten the project struct and explode its "tasks" array
# (one row per employee/skill/project/task combination)
df_result = df_projects.select(
    col("employee_id"),
    col("employee_name"),
    col("employee_department"),
    col("employee_skill"),
    col("project.id").alias("project_id"),
    col("project.name").alias("project_name"),
    col("project.status").alias("project_status"),
    explode(col("project.tasks")).alias("task")
)

# Select fields from the exploded task struct
df_final = df_result.select(
    col("employee_id"),
    col("employee_name"),
    col("employee_department"),
    col("employee_skill"),
    col("project_id"),
    col("project_name"),
    col("project_status"),
    col("task.id").alias("task_id"),
    col("task.description").alias("task_description")
)

# Show result
df_final.show(truncate=False)