### Reading JSON data with an inferred schema

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Attach to the cluster master at spark-master:7077 (or reuse a session that
# already exists in this kernel) and request 512 MB of memory per executor.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("read-json-data")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

# Only log messages at ERROR level are printed from here on, which keeps the
# cell outputs readable.
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/17 23:02:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the Nobel prize file into a DataFrame without supplying a schema, so
# Spark infers one from the data. multiLine is switched on so that a single
# JSON record may span more than one line of the file.
df = (
    spark.read
    .option("multiLine", "true")
    .json("../data/nobel_prizes.json")
)

                                                                                

In [3]:
# Print the schema Spark inferred for the file as a tree of column names and types
df.printSchema()

root
 |-- category: string (nullable = true)
 |-- laureates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- firstname: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- motivation: string (nullable = true)
 |    |    |-- share: string (nullable = true)
 |    |    |-- surname: string (nullable = true)
 |-- overallMotivation: string (nullable = true)
 |-- year: string (nullable = true)



In [4]:
# Display contents of DataFrame
# (called without arguments, show() abbreviates long cell values such as `laureates`)
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+----------+--------------------+--------------------+----+
|  category|           laureates|   overallMotivation|year|
+----------+--------------------+--------------------+----+
| chemistry|[{Carolyn, 1015, ...|                null|2022|
| economics|[{Ben, 1021, "for...|                null|2022|
|literature|[{Annie, 1017, "f...|                null|2022|
|     peace|[{Ales, 1018, "Th...|                null|2022|
|   physics|[{Alain, 1012, "f...|                null|2022|
|  medicine|[{Svante, 1011, "...|                null|2022|
| chemistry|[{Benjamin, 1002,...|                null|2021|
| economics|[{David, 1007, "f...|                null|2021|
|literature|[{Abdulrazak, 100...|                null|2021|
|     peace|[{Maria, 1005, "f...|                null|2021|
|   physics|[{Syukuro, 999, "...|"for groundbreaki...|2021|
|  medicine|[{David, 997, "fo...|                null|2021|
| chemistry|[{Emmanuelle, 991...|                null|2020|
| economics|[{Paul, 995, "for...|       

                                                                                

In [5]:
# Turn the nested laureates array into a flat table with one row per laureate.
df_flattened = (
    df
    # explode() emits one output row for every element of the array column
    .withColumn("laureates", explode("laureates"))
    # Dot notation reaches into the STRUCT that each exploded element holds
    .select(
        "category",
        "year",
        "overallMotivation",
        "laureates.id",
        "laureates.firstname",
        "laureates.surname",
        "laureates.share",
        "laureates.motivation",
    )
)

# truncate=False prints every value in full instead of abbreviating it
df_flattened.show(truncate=False)

+----------+----+-----------------+----+--------------------------+-----------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|category  |year|overallMotivation|id  |firstname                 |surname    |share|motivation                                                                                                                                                                                                                                                                                                                                                                              |
+----------+----+-----------------+----+--------------------------+-------

In [6]:
# ArrayType is imported by name here; previously it was only reachable through
# the wildcard import from pyspark.sql.functions at the top of the notebook.
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

# Explicit schema for the Nobel prize file, so Spark does not have to infer it.
json_schema = StructType(
    [StructField('category', StringType(), True),
     StructField('laureates', ArrayType(StructType(
         [StructField('firstname', StringType(), True),
          StructField('id', StringType(), True),
          StructField('motivation', StringType(), True),
          StructField('share', StringType(), True),
          StructField('surname', StringType(), True)
          ]), True), True),
     StructField('overallMotivation', StringType(), True),
     # NOTE(review): schema inference on this same file reported `year` as a
     # string; if the values are quoted in the JSON they may not parse as
     # integers and the records would be flagged as malformed -- confirm.
     StructField('year', IntegerType(), True),
     # The corrupt-record column has to be declared in the schema: Spark keeps
     # the raw text of malformed records only when a string field with the name
     # given to columnNameOfCorruptRecord exists, and drops that text otherwise.
     StructField('corrupt_record', StringType(), True)])

# PERMISSIVE mode keeps malformed records instead of failing the read; their
# raw text is stored in the `corrupt_record` column declared above.
json_df_with_schema = (
    spark.read.format("json")
    .schema(json_schema)
    .option("multiLine", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "corrupt_record")
    .load("../data/nobel_prizes.json"))

### `get_json_object()` and `json_tuple()` functions

In [7]:
from pyspark.sql.functions import get_json_object
from pyspark.sql.types import StringType

# Sample data: an id plus a JSON document stored as plain text in json_data
df = spark.createDataFrame(
    [
        (1, '{"name": "Alice", "age": 25}'),
        (2, '{"name": "Bob", "age": 30}'),
    ],
    schema=["id", "json_data"],
)

# Pull a single field out of the JSON text using the path "$.name"
name_df = df.select(
    get_json_object(df["json_data"], "$.name").alias("name")
)

# Add a second column holding the extracted value cast to a string
name_str_df = name_df.withColumn(
    "name_str",
    name_df["name"].cast(StringType()),
)

name_str_df.show()

[Stage 4:>                                                          (0 + 1) / 1]

+-----+--------+
| name|name_str|
+-----+--------+
|Alice|   Alice|
|  Bob|     Bob|
+-----+--------+



                                                                                

In [8]:
from pyspark.sql.functions import json_tuple

# Sample data: an id plus a JSON document stored as plain text in json_data
df = spark.createDataFrame(
    [
        (1, '{"name": "Alice", "age": 25}'),
        (2, '{"name": "Bob", "age": 30}'),
    ],
    schema=["id", "json_data"],
)

# json_tuple() extracts several top-level fields in one call; the multi-name
# alias labels the resulting columns in the same order as the requested fields
name_age_df = df.select(
    json_tuple(df["json_data"], "name", "age").alias("name", "age")
)

name_age_df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|  Bob| 30|
+-----+---+



### `flatten()` and `collect_list()` functions

In [16]:
from pyspark.sql.functions import flatten, collect_list

# Two rows, each holding an array of integer pairs (an array of arrays)
df = spark.createDataFrame(
    [
        (1, [[1, 2], [3, 4], [5, 6]]),
        (2, [[7, 8], [9, 10], [11, 12]]),
    ],
    schema=["id", "data"],
)

# No grouping key is given, so collect_list() gathers the `data` value of
# every row into one array on a single output row. The order in which the
# rows are collected is not guaranteed.
collect_df = df.agg(collect_list("data").alias("data"))
collect_df.show(truncate=False)

+-------------------------------------------------------+
|data                                                   |
+-------------------------------------------------------+
|[[[7, 8], [9, 10], [11, 12]], [[1, 2], [3, 4], [5, 6]]]|
+-------------------------------------------------------+



In [17]:
# flatten() removes one level of nesting: the per-row arrays gathered by
# collect_list() are concatenated into a single array of pairs
flattened_df = collect_df.select(
    flatten(collect_df["data"]).alias("merged_data")
)
flattened_df.show(truncate=False)

+---------------------------------------------------+
|merged_data                                        |
+---------------------------------------------------+
|[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]]|
+---------------------------------------------------+



In [78]:
from pyspark.sql.functions import explode, flatten, collect_list

# Each row carries a three-level nested array: a list of lists of integer pairs
df = spark.createDataFrame(
    [
        (1, [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]),
        (2, [[[9, 10], [11, 12]], [[13, 14], [15, 16]]]),
    ],
    schema=["id", "data"],
)

# Step 1: explode the outermost array, giving one row per second-level array
exploded_df = df.select("id", explode("data").alias("inner_data"))

# Step 2: gather the exploded arrays back together, one collected list per id
grouped_df = (
    exploded_df
    .groupBy("id")
    .agg(collect_list("inner_data").alias("merged_data"))
)

# Step 3: flatten() strips one level of nesting from each collected list;
# only the flattened column is kept, so `id` is not part of the result
flattened_df = grouped_df.select(
    flatten(grouped_df["merged_data"]).alias("final_data")
)

flattened_df.show(truncate=False)


+---------------------------------------+
|final_data                             |
+---------------------------------------+
|[[1, 2], [3, 4], [5, 6], [7, 8]]       |
|[[9, 10], [11, 12], [13, 14], [15, 16]]|
+---------------------------------------+



In [10]:
# Stop the Spark session (and its underlying SparkContext) as the last step of the notebook
spark.stop()