# Multidimensional data frames: Using PySpark with JSON data
This chapter covers
- Drawing parallels between JSON documents and Python data structures
- Ingesting JSON data within a data frame
- Representing hierarchical data in a data frame through complex column types
- Reducing duplication and reliance on auxiliary tables with a document/hierarchical data model
- Creating and unpacking data from complex data types

## Start a Spark session

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

# change the account name to your own email account name
account='sli'

# define the root path of the data used with DataAnalysisWithPythonAndPySpark
data_path='/net/clusterhn/home/'+account+'/isa460/data/'

# check whether a Spark session is already active; if it is, stop it

try:
    if spark:
        spark.stop()
except NameError:
    # no session has been created yet in this notebook
    pass    

spark = (SparkSession.builder.appName("Multidimensional Data Frame")
        .config("spark.port.maxRetries", "100")
        .getOrCreate())

# configure the log level (the default is WARN)
spark.sparkContext.setLogLevel('ERROR')

![json data as a limited python dictionary](https://raw.githubusercontent.com/Suhong88/ISA460_Fall2023/main/images/Figure%206.1.png)

In [None]:
# load a JSON document as a dictionary in Python
import json                        
 
sample_json = """{
  "id": 143,
  "name": "Silicon Valley",
  "type": "Scripted",
  "language": "English",
  "genres": [
    "Comedy"
  ],
  "network": {
    "id": 8,
    "name": "HBO",
    "country": {
      "name": "United States",
      "code": "US",
      "timezone": "America/New_York"
    }
  }
}"""

document=json.loads(sample_json)
print(document)

print(type(document))
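After parsing, the document behaves like any nested Python structure. A JSON object becomes a `dict`, an array becomes a `list`, a string becomes a `str` and an integer becomes an `int`. The minimal sketch below walks down the hierarchy one key at a time. It uses a trimmed copy of the sample document, stored under new names so that the notebook's `document` is not overwritten. The `get_path` helper is hypothetical and only shows that a dot-separated path is just a sequence of dictionary lookups.

```python
import json

# A trimmed copy of the sample document above (same keys and values)
trimmed_json = """{
  "id": 143,
  "name": "Silicon Valley",
  "genres": ["Comedy"],
  "network": {"id": 8, "name": "HBO",
              "country": {"name": "United States", "code": "US"}}
}"""

trimmed_doc = json.loads(trimmed_json)

# JSON object -> dict, JSON array -> list
print(type(trimmed_doc["network"]).__name__)  # dict
print(type(trimmed_doc["genres"]).__name__)   # list

# Walk down the hierarchy one key at a time
country_name = trimmed_doc["network"]["country"]["name"]
print(country_name)  # United States


def get_path(doc, path):
    """Hypothetical helper: follow a dot-separated path such as
    'network.country.code' through nested dictionaries."""
    value = doc
    for key in path.split("."):
        value = value[key]
    return value


print(get_path(trimmed_doc, "network.country.code"))  # US
```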

## Reading JSON data in PySpark

In [None]:
shows=spark.read.json(data_path+'shows/shows-silicon-valley.json')

In [None]:
shows.count()

In [None]:
shows.printSchema()

In [None]:
# read multiple json files

three_shows=spark.read.json(data_path+'shows/shows-*.json', multiLine=True)
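By default, `spark.read.json` expects JSON Lines, where every line holds one complete document. With `multiLine=True`, Spark parses each file as a whole, so a single document can span several lines. The plain-Python sketch below (no Spark involved, hypothetical sample strings) shows why the option matters: a parser that works line by line cannot read a document split across lines, but the same text parses fine as a whole.

```python
import json

# One document per line: the JSON Lines layout Spark reads by default
json_lines = '{"id": 1, "name": "A"}\n{"id": 2, "name": "B"}\n'

# One document spread over several lines: Spark needs multiLine=True for this
multi_line = '{\n  "id": 3,\n  "name": "C"\n}\n'


def parse_line_by_line(text):
    """Parse each non-empty line as its own JSON document; None marks a bad line."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            # in its default PERMISSIVE mode, Spark would keep such a line
            # as a corrupt record instead of a parsed row
            records.append(None)
    return records


print(parse_line_by_line(json_lines))  # two valid records
print(parse_line_by_line(multi_line))  # every line fails on its own
print(json.loads(multi_line))          # parsing the whole text works
```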

In [None]:
three_shows.count()

In [None]:
assert three_shows.count()==3

In [None]:
# look at the top-level columns of the JSON file
print(shows.columns)

### Working with arrays

In [None]:
# Genres is stored as an array

three_shows.select(F.col('name'), F.col('genres')).show(5, False)

In [None]:
# extract element of an array

three_shows.select(F.col('genres')[0], F.col('genres').getItem(0)).show(5, False)

In [None]:
# extract the first three elements of the array and store each one in a new column

three_shows1=(three_shows
              .withColumn('genre1', F.col('genres')[0])
              .withColumn('genre2', F.col('genres')[1])
              .withColumn('genre3', F.col('genres')[2]))

three_shows1.select('genre1', 'genre2', 'genre3').show()

In [None]:
# array functions: array(), array_repeat(), array_distinct(), size(), array_position()
# each step is added with withColumn() so that the next step can refer to it by name

three_shows2=three_shows1.select('genre1', 'genre2', 'genre3')

(three_shows2
    .withColumn('combined_genres', F.array('genre1', 'genre2', 'genre3'))
    .withColumn('repeated_genres', F.array_repeat('combined_genres', 3))
    .withColumn('genres_norepeat', F.array_distinct('repeated_genres'))
    .withColumn('array_size', F.size(F.col('genres_norepeat')[0]))
    # array_position() is 1-based and returns 0 when the value is absent
    .withColumn('comedy_position', F.array_position('combined_genres', 'Comedy'))
    .drop('genre1', 'genre2', 'genre3')
    .show(5, False))

#### In class: display the number of episodes for each show

In [None]:
three_shows.select('name', '_embedded.episodes').show()

### The Map Type: Keys and Values within a column

In [None]:
three_shows.columns

In [None]:
three_shows.select('type').show()

In [None]:
columns=['name', 'language', 'type']

shows_map1=three_shows.select(*[F.lit(column) for column in columns], F.array(*columns).alias('values'))

shows_map1.show(3, False)

In [None]:
shows_map2=shows_map1.select(F.array(*columns).alias('keys'), 'values')

shows_map2.show(3, False)

In [None]:
# create a map based on two arrays

shows_map3=shows_map2.select(F.map_from_arrays("keys", "values").alias("mapped"))

shows_map3.show(3, False)

In [None]:
shows_map3.printSchema()

In [None]:
# Three ways to access a map value. Other map functions include map_keys(), map_values() and create_map()

shows_map3.select(F.col('mapped.name'), F.col('mapped')["name"], shows_map3.mapped['name']).show()

### The Struct: Nesting columns within columns

The struct is very different from the array and the map in that the number of fields and their names are known ahead of time. In our case, the schedule struct column is fixed: we know that each record of our data frame will contain that schedule struct (or a null value, if we want to be pedantic), and within that struct there will be an array of strings, days, and a string, time. 

In [None]:
# The schedule column is a struct. It includes two columns: days and time

three_shows.select("schedule").printSchema()

In [None]:
three_shows.select("schedule").show(5, False)

In [None]:
# look at the _embedded column
# the main content is under episodes

three_shows.select(F.col("_embedded")).printSchema()

In [None]:
# extract episodes

shows_clean=three_shows.withColumn("episodes", F.col('_embedded.episodes')).drop('_embedded')

shows_clean.printSchema()

In [None]:
# extract a field from a struct

shows_clean.select(F.col('episodes.name')).show(3)


In [None]:
# explode an array

shows_clean.select(F.explode(F.col('episodes.name'))).show(10, False)

### Building and Using the data frame schema

In [None]:
import pyspark.sql.types as T
 
episode_links_schema = T.StructType(
    [
        T.StructField(
            "self", T.StructType([T.StructField("href", T.StringType())]) 
        )
    ]
)  
  
episode_image_schema = T.StructType(
    [
        T.StructField("medium", T.StringType()),                        
        T.StructField("original", T.StringType()),                       
    ]
)  
  
episode_schema = T.StructType(
    [
        T.StructField("_links", episode_links_schema),                    
        T.StructField("airdate", T.DateType()),
        T.StructField("airstamp", T.TimestampType()),
        T.StructField("airtime", T.StringType()),
        T.StructField("id", T.StringType()),
        T.StructField("image", episode_image_schema),                    
        T.StructField("name", T.StringType()),
        T.StructField("number", T.LongType()),
        T.StructField("runtime", T.LongType()),
        T.StructField("season", T.LongType()),
        T.StructField("summary", T.StringType()),
        T.StructField("url", T.StringType()),
    ]
)
 
embedded_schema = T.StructType(
    [
        T.StructField(
            "_embedded",
            T.StructType(
                [
                    T.StructField(
                        "episodes", T.ArrayType(episode_schema)          
                    )
                ]
            ),
        )
    ]
)

In [None]:
# read the show data with defined schema

shows_with_schema=spark.read.json(data_path+'shows/shows-*.json', multiLine=True,
                  schema=embedded_schema,
                  mode="FAILFAST")

shows_with_schema.printSchema()

In [None]:
# verify the date type for the airdate and airstamp

for column in ['airdate', 'airstamp']:
    shows_with_schema.select(f"_embedded.episodes.{column}").select(F.explode(column)).show(5, False)

### Getting to the "just right" data frame: Explode and collect

In [None]:
# Explode
episodes = shows.select(
    "id", F.explode("_embedded.episodes").alias("episodes")
)                                                              
episodes.show(5, truncate=70)

In [None]:
episodes.select('episodes').printSchema()

In [None]:
# extract the name from the above episodes dataframe

e2=episodes.select('episodes')
e2.select('episodes.name').show(5, False)

In [None]:
#posexplode: used to explode an array or a map column in a DataFrame into multiple rows, 
#while also preserving the position (index) of each element in the array or key in the map

episode_name_id = shows.select(
    F.map_from_arrays(                                         
        F.col("_embedded.episodes.id"), F.col("_embedded.episodes.name")
    ).alias("name_id")
)
 
episode_name_id = episode_name_id.select(
    F.posexplode("name_id").alias("position", "id", "name") 
)
 
episode_name_id.show(5)

#### Note:
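explode() and posexplode() produce no rows for a record whose array or map is null or empty, so that record disappears from the result (null elements inside a non-empty array are still returned as rows). If you want to keep such records, with a null in the exploded column, use explode_outer() or posexplode_outer() the same way.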

#### In class 2: display the name of each episode for each show
Hint: collect_list(), collect_set()

In [None]:
three_shows.select('name', '_embedded.episodes').show()

### In-class exercises

#### Exercise 6.6
Using three_shows, compute the time between the first and last episodes for each show (identified by name). Which show had the longest tenure?

In [None]:
three_shows=spark.read.json(data_path+'shows/shows-*.json', multiLine=True)
three_shows.printSchema()

In [None]:
three_shows.select('_embedded.episodes.airdate').show(5, truncate=500)

#### Exercise 6.7
Take the shows data frame and extract the air date and name of each episode in two array columns.

In [None]:
three_shows=spark.read.json(data_path+'shows/shows-*.json', multiLine=True)
#three_shows.printSchema()

#### Exercise 6.8

Given the following data frame, create a new data frame that contains a single map from one to square:

In [None]:
exo6_8 = spark.createDataFrame([[1, 2], [2, 4], [3, 9], [4, 16]], ["one", "square"])

exo6_8.show()