<a href="https://colab.research.google.com/github/imtheguna/PySpark-Learning/blob/GoogleColab/5_PySpark_withColumn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get update # Refresh the apt package index.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java 8, which Spark needs.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark 3.1.1, prebuilt for Hadoop 3.2.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Extract the archive.
!pip install -q findspark # Install findspark, which adds PySpark to sys.path at runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()
!pip install pyspark


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
      .builder \
      .appName('SelectColumns').getOrCreate()

df = spark.read.csv('/content/data2.csv',header=True,inferSchema=True)

df.show(1)

+-------+----------------+-----------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+
| period|series_reference|region_name|filled jobs|filled jobs revised|filled jobs diff|filled jobs % diff|total_earnings|total earnings revised|earnings diff|earnings % diff|
+-------+----------------+-----------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+
|2020.09|     BDCQ.SED1RA|  Northland|      65520|              65904|             384|               0.6|           953|                   959|            6|            0.6|
+-------+----------------+-----------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+
only showing top 1 row



In [8]:
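## Renaming a column
## withColumn copies region_name into a new column that is appended at the end; drop() then removes the original.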

df1 = df.withColumn('region',col('region_name'))

df1 = df1.drop(col('region_name'))

df1.show(1)

+-------+----------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+---------+
| period|series_reference|filled jobs|filled jobs revised|filled jobs diff|filled jobs % diff|total_earnings|total earnings revised|earnings diff|earnings % diff|   region|
+-------+----------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+---------+
|2020.09|     BDCQ.SED1RA|      65520|              65904|             384|               0.6|           953|                   959|            6|            0.6|Northland|
+-------+----------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+---------+
only showing top 1 row



In [13]:
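## Applying a function to a column
## expr() evaluates a SQL expression; because region_name already exists, withColumn overwrites it instead of adding a column.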

df1 = df.withColumn('region_name',expr('upper(region_name)'))

df1.show(1)

+-------+----------------+-----------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+
| period|series_reference|region_name|filled jobs|filled jobs revised|filled jobs diff|filled jobs % diff|total_earnings|total earnings revised|earnings diff|earnings % diff|
+-------+----------------+-----------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+
|2020.09|     BDCQ.SED1RA|  NORTHLAND|      65520|              65904|             384|               0.6|           953|                   959|            6|            0.6|
+-------+----------------+-----------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+
only showing top 1 row



In [16]:
## Adding a conditional column with “withColumn”

df1 = df.withColumn('total_earnings_type',when(col('total_earnings')>1000,'High').otherwise('Low'))

df1.show(10)

+-------+----------------+------------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+-------------------+
| period|series_reference|       region_name|filled jobs|filled jobs revised|filled jobs diff|filled jobs % diff|total_earnings|total earnings revised|earnings diff|earnings % diff|total_earnings_type|
+-------+----------------+------------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+-------------------+
|2020.09|     BDCQ.SED1RA|         Northland|      65520|              65904|             384|               0.6|           953|                   959|            6|            0.6|                Low|
|2020.09|     BDCQ.SED1RB|          Auckland|     708372|             714506|            6134|               0.9|         12420|                 12530|          110|            0.9|           

In [18]:
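## Using a User-Defined Function (UDF) with “withColumn”
## A Python UDF runs outside Spark's optimizer and moves data between the JVM and Python,
## so the built-in when/otherwise above is usually faster for simple rules like this one.
from pyspark.sql.types import StringType

def total_earnings_type(amt):
    # Nulls arrive as None; treat them as 'Low', matching when/otherwise
    if amt is not None and amt > 1000:
        return 'High'
    return 'Low'

get_total_earnings_type = udf(total_earnings_type, StringType())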

df1 = df.withColumn('total_earnings_type',get_total_earnings_type(col('total_earnings')))

df1.show(10)

+-------+----------------+------------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+-------------------+
| period|series_reference|       region_name|filled jobs|filled jobs revised|filled jobs diff|filled jobs % diff|total_earnings|total earnings revised|earnings diff|earnings % diff|total_earnings_type|
+-------+----------------+------------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+-------------------+
|2020.09|     BDCQ.SED1RA|         Northland|      65520|              65904|             384|               0.6|           953|                   959|            6|            0.6|                Low|
|2020.09|     BDCQ.SED1RB|          Auckland|     708372|             714506|            6134|               0.9|         12420|                 12530|          110|            0.9|           

In [19]:
## Combining multiple columns into one

## concat_ws(sep, *cols) joins its inputs with the given separator ('-' here) and skips null inputs.
## This cell reuses the get_total_earnings_type UDF defined in the previous cell.

df1 = df.withColumn('total_earnings_type',
                    concat_ws('-',col('total_earnings'),get_total_earnings_type(col('total_earnings'))))
df1.show(5)

+-------+----------------+-------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+-------------------+
| period|series_reference|  region_name|filled jobs|filled jobs revised|filled jobs diff|filled jobs % diff|total_earnings|total earnings revised|earnings diff|earnings % diff|total_earnings_type|
+-------+----------------+-------------+-----------+-------------------+----------------+------------------+--------------+----------------------+-------------+---------------+-------------------+
|2020.09|     BDCQ.SED1RA|    Northland|      65520|              65904|             384|               0.6|           953|                   959|            6|            0.6|            953-Low|
|2020.09|     BDCQ.SED1RB|     Auckland|     708372|             714506|            6134|               0.9|         12420|                 12530|          110|            0.9|         12420-High|
|2020.09|     B