## Creating DataFrames with ArrayType Columns

In [0]:
array_applications=[
    ("Raja",["TV","Refregirent","Oven","AC"]),
    ("Raghu",["AC","Washing machine",None]),
    ("Ramesh",["GRinder","TV",None]),
    ("Rajesh",None)
]
df_app=spark.createDataFrame(data=array_applications,schema=["name","Applications"])
df_app.printSchema()
display(df_app)

root
 |-- name: string (nullable = true)
 |-- Applications: array (nullable = true)
 |    |-- element: string (containsNull = true)



name,Applications
Raja,"List(TV, Refregirent, Oven, AC)"
Raghu,"List(AC, Washing machine, null)"
Ramesh,"List(GRinder, TV, null)"
Rajesh,


## Creating DataFrames with MapType Columns

In [0]:
map_brand=[
    ("Raja",{"TV":"LG","Refregirent":"Samsung","Oven":"Philips","AC":"Voltas"}),
    ("Raghu",{"AC":"Sansung","Washing machine":"LG"}),
    ("Ramesh",{"GRinder":"Preethi","TV":""}),
    ("Rajesh",None)
]
df_brand=spark.createDataFrame(data=map_brand,schema=["name","Brand"])
df_brand.printSchema()
display(df_brand)

root
 |-- name: string (nullable = true)
 |-- Brand: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



name,Brand
Raja,"Map(AC -> Voltas, TV -> LG, Refregirent -> Samsung, Oven -> Philips)"
Raghu,"Map(AC -> Sansung, Washing machine -> LG)"
Ramesh,"Map(TV -> , GRinder -> Preethi)"
Rajesh,


# Explode array fields

In [0]:
from pyspark.sql.functions import explode
df2=df_app.select(df_app.name,explode(df_app.Applications))
df_app.printSchema()
display(df_app)
df2.printSchema()
display(df2)


root
 |-- name: string (nullable = true)
 |-- Applications: array (nullable = true)
 |    |-- element: string (containsNull = true)



name,Applications
Raja,"List(TV, Refregirent, Oven, AC)"
Raghu,"List(AC, Washing machine, null)"
Ramesh,"List(GRinder, TV, null)"
Rajesh,


root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)



name,col
Raja,TV
Raja,Refregirent
Raja,Oven
Raja,AC
Raghu,AC
Raghu,Washing machine
Raghu,
Ramesh,GRinder
Ramesh,TV
Ramesh,


### Explode map field

In [0]:
df3=df_brand.select(df_brand.name,explode(df_brand.Brand))
df_brand.printSchema()
display(df_brand)
df3.printSchema()
display(df3)


root
 |-- name: string (nullable = true)
 |-- Brand: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



name,Brand
Raja,"Map(AC -> Voltas, TV -> LG, Refregirent -> Samsung, Oven -> Philips)"
Raghu,"Map(AC -> Sansung, Washing machine -> LG)"
Ramesh,"Map(TV -> , GRinder -> Preethi)"
Rajesh,


root
 |-- name: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)



name,key,value
Raja,AC,Voltas
Raja,TV,LG
Raja,Refregirent,Samsung
Raja,Oven,Philips
Raghu,AC,Sansung
Raghu,Washing machine,LG
Ramesh,TV,
Ramesh,GRinder,Preethi


Explode_outer consider null values

In [0]:
from pyspark.sql.functions import explode_outer
df2=df_app.select(df_app.name,explode_outer(df_app.Applications))
df_app.printSchema()
display(df_app)
df2.printSchema()
display(df2)


root
 |-- name: string (nullable = true)
 |-- Applications: array (nullable = true)
 |    |-- element: string (containsNull = true)



name,Applications
Raja,"List(TV, Refregirent, Oven, AC)"
Raghu,"List(AC, Washing machine, null)"
Ramesh,"List(GRinder, TV, null)"
Rajesh,


root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)



name,col
Raja,TV
Raja,Refregirent
Raja,Oven
Raja,AC
Raghu,AC
Raghu,Washing machine
Raghu,
Ramesh,GRinder
Ramesh,TV
Ramesh,


In [0]:
df3=df_brand.select(df_brand.name,explode_outer(df_brand.Brand))
df_brand.printSchema()
display(df_brand)
df3.printSchema()
display(df3)

root
 |-- name: string (nullable = true)
 |-- Brand: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



name,Brand
Raja,"Map(AC -> Voltas, TV -> LG, Refregirent -> Samsung, Oven -> Philips)"
Raghu,"Map(AC -> Sansung, Washing machine -> LG)"
Ramesh,"Map(TV -> , GRinder -> Preethi)"
Rajesh,


root
 |-- name: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



name,key,value
Raja,AC,Voltas
Raja,TV,LG
Raja,Refregirent,Samsung
Raja,Oven,Philips
Raghu,AC,Sansung
Raghu,Washing machine,LG
Ramesh,TV,
Ramesh,GRinder,Preethi
Rajesh,,


## Positional Explode


In [0]:
from pyspark.sql.functions import posexplode
display(df_app.select(df_app.name,posexplode(df_app.Applications)))
display(df_brand.select(df_brand.name,posexplode(df_brand.Brand)))

name,pos,col
Raja,0,TV
Raja,1,Refregirent
Raja,2,Oven
Raja,3,AC
Raghu,0,AC
Raghu,1,Washing machine
Raghu,2,
Ramesh,0,GRinder
Ramesh,1,TV
Ramesh,2,


name,pos,key,value
Raja,0,AC,Voltas
Raja,1,TV,LG
Raja,2,Refregirent,Samsung
Raja,3,Oven,Philips
Raghu,0,AC,Sansung
Raghu,1,Washing machine,LG
Ramesh,0,TV,
Ramesh,1,GRinder,Preethi


## posexplode_outer

In [0]:
from pyspark.sql.functions import posexplode_outer
display(df_app.select(df_app.name,posexplode_outer(df_app.Applications)))
display(df_brand.select(df_brand.name,posexplode_outer(df_brand.Brand)))

name,pos,col
Raja,0.0,TV
Raja,1.0,Refregirent
Raja,2.0,Oven
Raja,3.0,AC
Raghu,0.0,AC
Raghu,1.0,Washing machine
Raghu,2.0,
Ramesh,0.0,GRinder
Ramesh,1.0,TV
Ramesh,2.0,


name,pos,key,value
Raja,0.0,AC,Voltas
Raja,1.0,TV,LG
Raja,2.0,Refregirent,Samsung
Raja,3.0,Oven,Philips
Raghu,0.0,AC,Sansung
Raghu,1.0,Washing machine,LG
Ramesh,0.0,TV,
Ramesh,1.0,GRinder,Preethi
Rajesh,,,


## create Sample Dataframe

In [0]:
df=spark.createDataFrame(sc.parallelize([['ABC',[1,2,3]],['XYZ',[2,None,4]],['KLM',[8,7]],['IJK',[5]]]),['key','value'])
df.display()

key,value
ABC,"List(1, 2, 3)"
XYZ,"List(2, null, 4)"
KLM,"List(8, 7)"
IJK,List(5)


Split Array Values Into Separate Columns

In [0]:
df.select("key",df.value[0],df.value[1],df.value[2]).display()

key,value[0],value[1],value[2]
ABC,1,2.0,3.0
XYZ,2,,4.0
KLM,8,7.0,
IJK,5,,


# HOW to Atomate this solution

Determine the size

In [0]:
from pyspark.sql.functions import size,col
dfSize=df.select("key","value",size("value").alias("NoOfArrayElements"))
dfSize.display()

key,value,NoOfArrayElements
ABC,"List(1, 2, 3)",3
XYZ,"List(2, null, 4)",3
KLM,"List(8, 7)",2
IJK,List(5),1


### Get the Maximum Size of the All Arrays

In [0]:
max_value=dfSize.agg({"NoOfArrayElements":"max"}).collect()[0][0]
print(max_value)

3


UDF to Convert Array Elements into Columns

In [0]:
def arraySplitInCols(df,maxElements):
    for i in range(maxElements):
        df=df.withColumn(f"new_col_{i}",df.value[i])
    return df

In [0]:
dfout=arraySplitInCols(df,max_value)
display(dfout)

key,value,new_col_0,new_col_1,new_col_2
ABC,"List(1, 2, 3)",1,2.0,3.0
XYZ,"List(2, null, 4)",2,,4.0
KLM,"List(8, 7)",8,7.0,
IJK,List(5),5,,


In [0]:
map_brand=[
    ("Raja",{"Currency":"IND","value":1},{"Currency":"I","value":1}),
    ("Raghu",{"Currency":"BND","value":5},{"Currency":"I","value":1}),
    ("Ramesh",{"Currency":"ID","value":10},{"Currency":"ND","value":5}),
    ("Rajesh",{"Currency":"IN","value":1},{"Currency":"ID","value":1})
]
df=spark.createDataFrame(data=map_brand,schema=["name","Brand","bolt"])
df.printSchema()
display(df)

root
 |-- name: string (nullable = true)
 |-- Brand: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- bolt: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



name,Brand,bolt
Raja,"Map(value -> 1, Currency -> IND)","Map(value -> 1, Currency -> I)"
Raghu,"Map(value -> 5, Currency -> BND)","Map(value -> 1, Currency -> I)"
Ramesh,"Map(value -> 10, Currency -> ID)","Map(value -> 5, Currency -> ND)"
Rajesh,"Map(value -> 1, Currency -> IN)","Map(value -> 1, Currency -> ID)"


In [0]:
from pyspark.sql.functions import expr

In [0]:
pivoted_df = df.withColumn("Brand_currency", expr("Brand.Currency")) \
    .withColumn("Brand_value", expr("Brand.value"))\
    .withColumn("Bolt_currency", expr("bolt.Currency"))\
    .withColumn("Bolt_value", expr("bolt.value"))
pivoted_df.display()
    

name,Brand,bolt,Brand_currency,Brand_value,Bolt_currency,Bolt_value
Raja,"Map(value -> 1, Currency -> IND)","Map(value -> 1, Currency -> I)",IND,1,I,1
Raghu,"Map(value -> 5, Currency -> BND)","Map(value -> 1, Currency -> I)",BND,5,I,1
Ramesh,"Map(value -> 10, Currency -> ID)","Map(value -> 5, Currency -> ND)",ID,10,ND,5
Rajesh,"Map(value -> 1, Currency -> IN)","Map(value -> 1, Currency -> ID)",IN,1,ID,1


In [0]:
map_brand=[
    ("Raja",{"TV":"LG","Refregirent":"Samsung","Oven":"Philips","AC":"Voltas"}),
    ("Raghu",{"AC":"Sansung","Washing machine":"LG"}),
    ("Ramesh",{"GRinder":"Preethi","TV":""}),
    ("Rajesh",None)
]
df_brand=spark.createDataFrame(data=map_brand,schema=["name","Brand"])
df_brand.printSchema()
display(df_brand)