In [3]:
# Session bootstrap: every later cell uses the `spark` object created here.
import findspark

# Deliberately called BEFORE the pyspark import below: findspark's documented
# purpose is to locate a Spark installation and make pyspark importable, so
# this order must be kept.
findspark.init()

from pyspark.sql import SparkSession

# getOrCreate() hands back an existing session when one is already active and
# only builds a new one otherwise.
spark = SparkSession.builder.getOrCreate()

In [6]:
# One sample row: a name plus a Python dict of attributes. No explicit types
# are supplied, only column names, so Spark infers the column types itself.
data = [
    ("Srikanth", {"hair": "black", "eye": "brown"}),
]
schema = ["id", "props"]

df = spark.createDataFrame(data, schema=schema)

# Show the full cell contents (no truncation), followed by the inferred schema.
df.show(truncate=False)
df.printSchema()

+--------+-----------------------------+
|id      |props                        |
+--------+-----------------------------+
|Srikanth|{eye -> brown, hair -> black}|
+--------+-----------------------------+

root
 |-- id: string (nullable = true)
 |-- props: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [7]:
# to_json serialises a complex column (e.g. MapType or StructType) into a JSON string.

from pyspark.sql.functions import to_json
from pyspark.sql.types import StringType, StructField, StructType

# Keep the original map column and add its JSON-string rendering next to it.
df1 = df.withColumn("propstring", to_json(df["props"]))

df1.show(truncate=False)
df1.printSchema()



+--------+-----------------------------+------------------------------+
|id      |props                        |propstring                    |
+--------+-----------------------------+------------------------------+
|Srikanth|{eye -> brown, hair -> black}|{"eye":"brown","hair":"black"}|
+--------+-----------------------------+------------------------------+

root
 |-- id: string (nullable = true)
 |-- props: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- propstring: string (nullable = true)



In [9]:
# Same idea as the map example, but with an explicit schema whose second
# column is a struct with two string fields (Hair, Eye).
data = [("Srikanth", ("black", "brown"))]

schema = StructType(
    [
        StructField("name", StringType()),
        StructField(
            "Properties",
            StructType(
                [
                    StructField("Hair", StringType()),
                    StructField("Eye", StringType()),
                ]
            ),
        ),
    ]
)

df2 = spark.createDataFrame(data, schema)

df2.show(truncate=False)
df2.printSchema()

+--------+--------------+
|name    |Properties    |
+--------+--------------+
|Srikanth|{black, brown}|
+--------+--------------+

root
 |-- name: string (nullable = true)
 |-- Properties: struct (nullable = true)
 |    |-- Hair: string (nullable = true)
 |    |-- Eye: string (nullable = true)



In [11]:
# Serialise the struct column to JSON text; the struct's field names become the JSON keys.
df3 = df2.withColumn("propjsonstring", to_json(df2["Properties"]))

df3.show(truncate=False)
df3.printSchema()

+--------+--------------+------------------------------+
|name    |Properties    |propjsonstring                |
+--------+--------------+------------------------------+
|Srikanth|{black, brown}|{"Hair":"black","Eye":"brown"}|
+--------+--------------+------------------------------+

root
 |-- name: string (nullable = true)
 |-- Properties: struct (nullable = true)
 |    |-- Hair: string (nullable = true)
 |    |-- Eye: string (nullable = true)
 |-- propjsonstring: string (nullable = true)



In [12]:
# json_tuple input: each row stores a flat JSON document as a plain string column.

data = [
    ("Srikanth", '{"hair":"black","eye":"brown","skin":"brown"}'),
    ("Manvith", '{"hair":"black","eye":"blue","skin":"white"}'),
]
schema = ["Name", "Prop"]

df = spark.createDataFrame(data, schema)

df.show(truncate=False)
df.printSchema()

+--------+---------------------------------------------+
|Name    |Prop                                         |
+--------+---------------------------------------------+
|Srikanth|{"hair":"black","eye":"brown","skin":"brown"}|
|Manvith |{"hair":"black","eye":"blue","skin":"white"} |
+--------+---------------------------------------------+

root
 |-- Name: string (nullable = true)
 |-- Prop: string (nullable = true)



In [13]:
from pyspark.sql.functions import json_tuple

# json_tuple produces one output column per requested key; the multi-name
# alias() labels those columns in the same order as the keys.
df.select(
    "Name",
    json_tuple(df["Prop"], "eye", "skin").alias("eye", "skin"),
).show()

+--------+-----+-----+
|    Name|  eye| skin|
+--------+-----+-----+
|Srikanth|brown|brown|
| Manvith| blue|white|
+--------+-----+-----+



In [18]:
# get_json_object
#
# Extracts a value from a JSON string column by JSON path. The sample rows
# hold a nested "address" object; "gender" and "eye" each appear in only one row.

data = [
    ("Srikanth", '{"address":{"city":"hyd","state":"telangana"},"gender":"male"}'),
    ("Manvith", '{"address":{"city":"bang","state":"Karnataka"},"eye":"blue"}'),
]
schema = ["name", "props"]

df = spark.createDataFrame(data, schema)

df.show(truncate=False)
df.printSchema()

+--------+--------------------------------------------------------------+
|name    |props                                                         |
+--------+--------------------------------------------------------------+
|Srikanth|{"address":{"city":"hyd","state":"telangana"},"gender":"male"}|
|Manvith |{"address":{"city":"bang","state":"Karnataka"},"eye":"blue"}  |
+--------+--------------------------------------------------------------+

root
 |-- name: string (nullable = true)
 |-- props: string (nullable = true)



In [15]:
from pyspark.sql.functions import get_json_object

# Print the built-in documentation for get_json_object. The call is live
# (not commented out) so that the help text recorded under this cell is
# regenerated whenever the notebook is re-run from a fresh kernel.
help(get_json_object)

Help on function get_json_object in module pyspark.sql.functions:

get_json_object(col, path)
    Extracts json object from a json string based on json path specified, and returns json string
    of the extracted json object. It will return null if the input json string is invalid.
    
    .. versionadded:: 1.6.0
    
    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        string column in json format
    path : str
        path to the json object to extract
    
    Examples
    --------
    >>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
    >>> df = spark.createDataFrame(data, ("key", "jstring"))
    >>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \
    ...                   get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
    [Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]



In [21]:
# '$.gender' reads a top-level key, while '$.address.city' walks into the
# nested address object. A row whose JSON lacks the key yields null.
df.select(
    "name",
    get_json_object(df["props"], "$.gender").alias("gender"),
    get_json_object("props", "$.address.city").alias("city"),
).show()

+--------+------+----+
|    name|gender|city|
+--------+------+----+
|Srikanth|  male| hyd|
| Manvith|  null|bang|
+--------+------+----+

