# withColumn Method

In [0]:
from pyspark.sql import SparkSession
# Obtain the SparkSession used by every cell below. getOrCreate() returns the
# already-active session when one exists; otherwise it starts a new one
# labelled with this application name.
spark = (
    SparkSession.builder
    .appName("withColumn Method")  # fixed typo: was "withColumn Methos"
    .getOrCreate()
)

In [0]:
from pyspark.sql.functions import *
from datetime import date
# Column names only: no types are given, so Spark infers each column's type
# from the Python values in the rows.
schema = ["id", "firstname", "lastname", "admission_date", "Fees"]

# Sample rows. "Fees" is deliberately supplied as a string so that it starts
# out as a string column.
data = [
    (1, "balaji", "jadhav", date(2026, 10, 1), "18000"),
    (2, "ramkishan", "jadhav", date(2026, 10, 5), "20000"),
    (3, "lavkush", "pawar", date(2026, 11, 13), "25000"),
    (4, "alok", "pawar", date(2026, 11, 20), "30000"),
    (5, "gopal", "rathod", date(2026, 12, 30), "40000"),
]

# Build the base DataFrame, then show its rows and its inferred schema.
df = spark.createDataFrame(data, schema)
df.show()
df.printSchema()

### change data type of the column

In [0]:
# Cast "Fees" to an integer. Because a column named "Fees" already exists,
# withColumn replaces it instead of appending a new column.
df = df.withColumn(
    "Fees",
    col("Fees").cast("Integer"),
)
df.show()
df.printSchema()

In [0]:
# Cast "id" to an integer, overwriting the existing "id" column, then print
# the rows and schema to confirm the new type.
id_as_int = col("id").cast("Integer")
df = df.withColumn("id", id_as_int)
df.show()
df.printSchema()

### create new constant column

In [0]:
# Append a "Country" column holding the same literal value on every row.
# lit() wraps the Python string in a Column, which withColumn requires.
df = df.withColumn(
    "Country",
    lit("India"),
)
df.show()

In [0]:
# Render the DataFrame with the notebook's display helper. `display` is not
# imported anywhere in this notebook, so it is presumably supplied by the
# notebook runtime (e.g. Databricks) — confirm before running elsewhere.
display(df)

### update existing column data

In [0]:
# Increase every value in the existing "Fees" column by 5000.
# NOTE: df is reassigned from itself, so re-running this cell adds another
# 5000 on top of the previous result.
df = df.withColumn(
    "Fees",
    col("Fees") + 5000,
)
df.show()

In [0]:
# Print the current schema of df (column names and types).
df.printSchema()

### Copy column

In [0]:
# Duplicate "Fees" under a new name; the original column is left untouched.
df = df.withColumn(
    "copy_fees_column",
    col("Fees"),
)
df.show()

### create a new column from an existing column

In [0]:
# Add "new_fees", computed as twice the current "Fees" value of each row.
doubled_fees = col("Fees") * 2
df = df.withColumn("new_fees", doubled_fees)
df.show()

### conditional column (IF-ELSE logic)

In [0]:
# Add a label column using if/else logic: rows whose "new_fees" exceeds 50000
# get "OPEN"; every other row gets "OBJ".
fee_label = when(col("new_fees") > 50000, "OPEN").otherwise("OBJ")
df = df.withColumn("cast", fee_label)
df.show()

### concatenate columns

In [0]:
# Build "full_name" by joining first and last name with a single space.
name_parts = [col("firstname"), col("lastname")]
df = df.withColumn("full_name", concat_ws(" ", *name_parts))
df.show()


In [0]:
# Persist df as CSV (with a header row) to the volume path, replacing
# whatever a previous run wrote there.
(
    df.write
    .mode("overwrite")
    .option("header", "true")
    .csv("/Volumes/pyspark/pyspark_schema/volume2/withColumn/")
)


In [0]:
# Read the CSV output back, treating the first line of each file as column
# names. No schema or inferSchema option is given, so by Spark's default
# every column comes back as a string.
df1 = (
    spark.read
    .option("header", True)
    .csv("/Volumes/pyspark/pyspark_schema/volume2/withColumn/")
)
df1.show()

In [0]:
# Sample rows whose "skill" value is a Python list, so Spark infers an array
# column for it.
schema1 = ["id", "name", "skill"]
data1 = [
    (1, "balaji", ["python", "azure"]),
    (2, "ramkishan", ["java", "aws"]),
]

df2 = spark.createDataFrame(data1, schema1)
display(df2)
df2.printSchema()

### explode function

In [0]:
# explode() emits one output row per element of the "skill" array; the
# element goes into the new "skills" column and the other columns repeat.
# NOTE: df2 is reassigned from itself, so re-running this cell explodes the
# already-exploded frame again and multiplies the rows.
df2 = df2.withColumn(
    "skills",
    explode(col("skill")),
)
df2.show()
df2.printSchema()

In [0]:
# Sample rows where "skill" is one comma-separated string per person.
schema1 = ["id", "name", "skill"]
data1 = [
    (1, "balaji", "python,azure"),
    (2, "ramkishan", "java,aws"),
]

df4 = spark.createDataFrame(data1, schema1)
display(df4)
df4.printSchema()

### split function

In [0]:
# Split the comma-separated "skill" string into an array column named
# "splitarray"; the original "skill" column is kept.
skill_parts = split(col("skill"), ",")
df4 = df4.withColumn("splitarray", skill_parts)
display(df4)
df4.printSchema()

In [0]:
# Sample rows with each person's two skills held in separate string columns.
schema1 = ["id", "name", "primaryskill", "secondaryskill"]
data1 = [
    (1, "balaji", "python", "azure"),
    (2, "ramkishan", "java", "aws"),
]

df5 = spark.createDataFrame(data1, schema1)
display(df5)
df5.printSchema()

### array function

In [0]:
from pyspark.sql.functions import array,col
# Combine the two skill columns into a single array column, "skillarray",
# with the primary skill first and the secondary skill second.
skill_columns = [col("primaryskill"), col("secondaryskill")]
df5 = df5.withColumn("skillarray", array(*skill_columns))
display(df5)
df5.printSchema()

In [0]:
# Sample rows with an array-valued "skill" column, used to demonstrate
# array_contains below.
schema1 = ["id", "name", "skill"]
data1 = [
    (1, "balaji", ["python", "azure"]),
    (2, "ramkishan", ["java", "aws"]),
]

df6 = spark.createDataFrame(data1, schema1)
display(df6)
df6.printSchema()

### array_contains function

In [0]:
from pyspark.sql.functions import array_contains
# Add a boolean "contains" column: true where the "skill" array includes the
# value "python".
has_python = array_contains(col("skill"), "python")
df6 = df6.withColumn("contains", has_python)
display(df6)
df6.printSchema()