In [217]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import json_tuple
from pyspark.sql.functions import explode, col, explode_outer, flatten
from pyspark.sql.functions import json_tuple
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, LongType
import json


# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read a JSON file

In [268]:
# Locations of the input data and of the Spark schema used to read it.
# Both live under one base directory. Set DATA_ANALYSIS_DIR to run outside the
# Glue container; the default reproduces the original hardcoded paths.
import os

BASE_DIR = os.environ.get('DATA_ANALYSIS_DIR', '/home/glue_user/workspace/jupyter_workspace/data_analysis')
nam_file = os.path.join(BASE_DIR, 'input', 'films.json')
nam_schema = os.path.join(BASE_DIR, 'schema', 'films.json')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [269]:
# Load the saved schema definition and rebuild the StructType that is passed to
# the JSON reader. JSON is UTF-8 by spec, so the encoding is set explicitly
# instead of relying on the container's locale default.
with open(nam_schema, 'r', encoding='utf-8') as schema_file:
    data = json.load(schema_file)

schema = StructType.fromJson(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [270]:
# Read the multi-line JSON document using the explicit schema loaded above.
df = (
    spark.read
    .schema(schema)
    .option('multiLine', True)
    .json(nam_file)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Export the schema as JSON so it can be saved to a file and reloaded later

In [271]:
# Compact JSON form of the DataFrame schema. Parsed with json.load, this is the
# input StructType.fromJson expects, so it can be saved as the schema file.
schema_json = df.schema.json()
print(schema_json)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"fields":[{"metadata":{},"name":"count","nullable":true,"type":"long"},{"metadata":{},"name":"next","nullable":true,"type":"string"},{"metadata":{},"name":"previous","nullable":true,"type":"string"},{"metadata":{},"name":"results","nullable":true,"type":{"containsNull":true,"elementType":{"fields":[{"metadata":{},"name":"characters","nullable":true,"type":{"containsNull":true,"elementType":"string","type":"array"}},{"metadata":{},"name":"created","nullable":true,"type":"timestamp"},{"metadata":{},"name":"director","nullable":true,"type":"string"},{"metadata":{},"name":"edited","nullable":true,"type":"timestamp"},{"metadata":{},"name":"episode_id","nullable":true,"type":"long"},{"metadata":{},"name":"opening_crawl","nullable":true,"type":"string"},{"metadata":{},"name":"planets","nullable":true,"type":{"containsNull":true,"elementType":"string","type":"array"}},{"metadata":{},"name":"producer","nullable":true,"type":"string"},{"metadata":{},"name":"release_date","nullable":true,"type":

In [195]:
# Show the schema tree to confirm the explicit schema was applied
# (e.g. created/edited as timestamp, release_date as date).
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- count: long (nullable = true)
 |-- next: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- results: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- characters: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- created: timestamp (nullable = true)
 |    |    |-- director: string (nullable = true)
 |    |    |-- edited: timestamp (nullable = true)
 |    |    |-- episode_id: long (nullable = true)
 |    |    |-- opening_crawl: string (nullable = true)
 |    |    |-- planets: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- producer: string (nullable = true)
 |    |    |-- release_date: date (nullable = true)
 |    |    |-- species: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- starships: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |  

### Explode a JSON array column into one row per element

In [282]:
# Emit one row per element of the `results` array, keeping the column name `results`.
df_explode = df.select(explode(df['results']).alias('results'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Flatten into a columnar DataFrame for writing to S3

In [283]:
# Promote every field of the `results` struct to its own top-level column.
df_exploded = df_explode.select(col('results.*'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [284]:
# Show the flattened schema: one top-level column per field of the former `results` struct.
df_exploded.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- characters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- created: timestamp (nullable = true)
 |-- director: string (nullable = true)
 |-- edited: timestamp (nullable = true)
 |-- episode_id: long (nullable = true)
 |-- opening_crawl: string (nullable = true)
 |-- planets: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- producer: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- species: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- starships: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- vehicles: array (nullable = true)
 |    |-- element: string (containsNull = true)

In [285]:
# Spot-check the parsed `created` timestamps, printing full values without truncation.
df_exploded.select(col('created')).show(n=10, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+
|created                |
+-----------------------+
|2014-12-10 14:23:31.88 |
|2014-12-12 11:26:24.656|
|2014-12-18 10:39:33.255|
|2014-12-19 16:52:55.74 |
|2014-12-20 10:57:57.886|
|2014-12-20 18:49:38.403|
+-----------------------+

### Explode another array column (example: `planets`)

In [290]:
# Distinct planet URLs referenced by any film. The frame has a single column,
# so distinct() deduplicates on exactly that column.
df_planets = (
    df_exploded
    .select(explode('planets').alias('planets'))
    .distinct()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [291]:
# After exploding, `planets` is a plain string column (one URL per row).
df_planets.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- planets: string (nullable = true)

In [293]:
# Print up to 100 planet URLs in full; row order is not guaranteed.
df_planets.show(n=100, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------+
|planets                          |
+---------------------------------+
|https://swapi.dev/api/planets/5/ |
|https://swapi.dev/api/planets/2/ |
|https://swapi.dev/api/planets/3/ |
|https://swapi.dev/api/planets/10/|
|https://swapi.dev/api/planets/1/ |
|https://swapi.dev/api/planets/13/|
|https://swapi.dev/api/planets/15/|
|https://swapi.dev/api/planets/4/ |
|https://swapi.dev/api/planets/8/ |
|https://swapi.dev/api/planets/14/|
|https://swapi.dev/api/planets/6/ |
|https://swapi.dev/api/planets/12/|
|https://swapi.dev/api/planets/11/|
|https://swapi.dev/api/planets/16/|
|https://swapi.dev/api/planets/7/ |
|https://swapi.dev/api/planets/27/|
|https://swapi.dev/api/planets/19/|
|https://swapi.dev/api/planets/17/|
|https://swapi.dev/api/planets/9/ |
|https://swapi.dev/api/planets/18/|
+---------------------------------+

In [294]:
# Number of distinct planet URLs across all films (triggers a Spark job).
df_planets.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

20