

# Initializing the pyspark in Colab




In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,065 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,339 kB]
Get:13 http://security.ubuntu.com/ubuntu jammy-s

# Setting sparkSession context

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

# Checking Spark session

In [None]:
from pyspark.sql import SparkSession
import time

# Ensure SparkSession is available
def get_spark_session():
    try:
        # Try to get the current Spark session if it exists
        spark = SparkSession.builder.getOrCreate()
    except ValueError:
        # Create a new session if it doesn't exist
        spark = SparkSession.builder\
            .master("local[*]")\
            .appName("ColabPySpark")\
            .getOrCreate()
    return spark

spark = get_spark_session()

# Measure execution time
start_time = time.time()

# Create and show the DataFrame
df = spark.createDataFrame([{"Hello": "World"} for x in range(1000)])
df.show(3)

# Calculate and print execution time
execution_time = time.time() - start_time
print(f"Execution time: {execution_time} seconds")

# Optionally, stop the Spark session if you're done with it
# spark.stop()


+-----+
|Hello|
+-----+
|World|
|World|
|World|
+-----+
only showing top 3 rows

Execution time: 12.092931985855103 seconds


# Doing above thing using Pandas

In [None]:
import pandas as pd
import time

start_time = time.time()

# Create the DataFrame
df = pd.DataFrame([{"Hello": "World"} for x in range(1000)])

# Display the first 3 rows of the DataFrame
print(df.head(3))

# Calculate and print execution time
execution_time = time.time() - start_time
print(f"Execution time: {execution_time} seconds")


   Hello
0  World
1  World
2  World
Execution time: 0.03188967704772949 seconds


# Mounting the Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Loading and getting CSV file

In [None]:
csv_path = r'/content/drive/MyDrive/data/csv/batch.csv'

In [None]:
df = spark.read.csv(csv_path,header=True)
df.printSchema()
df.show(10)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- age: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- department: string (nullable = true)

+---+-----+----------+----+------+----------+
| id| name|       dob| age|salary|department|
+---+-----+----------+----+------+----------+
|  1| John|1992-05-12|  30| 70000|        IT|
|  2|Alice|1997-02-28|  25| 60000|        HR|
|  3|  Bob|      null|null| 80000|        IT|
|  4|Emily|1994-11-22|  28| 65000|   Finance|
+---+-----+----------+----+------+----------+



# Loading Json Batch File

In [None]:
json_b_path = r'/content/drive/MyDrive/data/json/batch.jsonl'
df1 = spark.read.json(json_b_path)
df1.printSchema()
df1.show(10)

root
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)

+----+----------+----------+---+-----+------+
| age|department|       dob| id| name|salary|
+----+----------+----------+---+-----+------+
|  30|        IT|1992-05-12|  1| John| 70000|
|  25|        HR|1997-02-28|  2|Alice| 60000|
|null|        IT|      null|  3|  Bob| 80000|
|  28|   Finance|1994-11-22|  4|Emily| 65000|
|  41|        HR|1981-12-18|  5|David| 90000|
|  33|   Finance|1989-07-05|  6|Susan| 75000|
|  46|        IT|1976-03-15|  7| Mike| 95000|
+----+----------+----------+---+-----+------+



# Loading and reading Parquet File

In [None]:
par_path = r'/content/drive/MyDrive/data/parquet/reference.parquet'
df2 = spark.read.parquet(par_path)
df2.printSchema()
df2.show(10)

root
 |-- department: string (nullable = true)
 |-- manager: string (nullable = true)
 |-- lead: string (nullable = true)

+----------+-------+--------+
|department|manager|    lead|
+----------+-------+--------+
|   Finance|  Megan|   Molly|
|        HR|   Brad|   Brian|
|        IT|  Chris|Chandler|
|  Delivery|   Leon|  Louise|
+----------+-------+--------+



In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [None]:
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("dob", DateType()),
    StructField("age", IntegerType()),
    StructField("salary", IntegerType()),
    StructField("department", StringType()),
])

In [None]:
df_csv = spark.read.format("csv").schema(schema).option("header", True).load(csv_path)
df_csv.printSchema()
df_csv.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)

+---+-----+----------+----+------+----------+
| id| name|       dob| age|salary|department|
+---+-----+----------+----+------+----------+
|  1| John|1992-05-12|  30| 70000|        IT|
|  2|Alice|1997-02-28|  25| 60000|        HR|
|  3|  Bob|      null|null| 80000|        IT|
|  4|Emily|1994-11-22|  28| 65000|   Finance|
+---+-----+----------+----+------+----------+



In [None]:
json_path = r'/content/drive/MyDrive/data/json'
df_json = spark.read.json(json_path)
df_json.show()

+----+----------+----------+---+------+------+
| age|department|       dob| id|  name|salary|
+----+----------+----------+---+------+------+
|  30|        IT|1992-05-12|  1|  John| 70000|
|  25|        HR|1997-02-28|  2| Alice| 60000|
|null|        IT|      null|  3|   Bob| 80000|
|  28|   Finance|1994-11-22|  4| Emily| 65000|
|  41|        HR|1981-12-18|  5| David| 90000|
|  33|   Finance|1989-07-05|  6| Susan| 75000|
|  46|        IT|1976-03-15|  7|  Mike| 95000|
|  30|   Finance|1992-06-30| 10|Sophie| 62000|
|  28|   Finance|1994-11-22|  4| Emily| 70000|
|  25|   Finance|1997-02-28|  2| Alice| 90000|
|  39|        IT|1983-10-14|  9| James| 87000|
|  30|        IT|1992-05-12|  1|  John| 70000|
|  27|        HR|1995-08-20|  8|  Lisa| 58000|
+----+----------+----------+---+------+------+



In [None]:
df_json = df_json.orderBy("id")
df_json.printSchema()
df_json.show()

root
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)

+----+----------+----------+---+------+------+
| age|department|       dob| id|  name|salary|
+----+----------+----------+---+------+------+
|  30|        IT|1992-05-12|  1|  John| 70000|
|  30|        IT|1992-05-12|  1|  John| 70000|
|  25|        HR|1997-02-28|  2| Alice| 60000|
|  25|   Finance|1997-02-28|  2| Alice| 90000|
|null|        IT|      null|  3|   Bob| 80000|
|  28|   Finance|1994-11-22|  4| Emily| 65000|
|  28|   Finance|1994-11-22|  4| Emily| 70000|
|  41|        HR|1981-12-18|  5| David| 90000|
|  33|   Finance|1989-07-05|  6| Susan| 75000|
|  46|        IT|1976-03-15|  7|  Mike| 95000|
|  27|        HR|1995-08-20|  8|  Lisa| 58000|
|  39|        IT|1983-10-14|  9| James| 87000|
|  30|   Finance|1992-06-30| 10|Sophie| 62000|
+----+----------+----------+

# Union Data

In [None]:
df_csv.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)



In [None]:
df_json.printSchema()

root
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)



In [None]:
df = df_csv.union(df_json)
df.printSchema()
df.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: string (nullable = true)
 |-- department: string (nullable = true)

+----+-------+----------+----+------+----------+
|  id|   name|       dob| age|salary|department|
+----+-------+----------+----+------+----------+
|   1|   John|1992-05-12|  30| 70000|        IT|
|   2|  Alice|1997-02-28|  25| 60000|        HR|
|   3|    Bob|      null|null| 80000|        IT|
|   4|  Emily|1994-11-22|  28| 65000|   Finance|
|  30|     IT|1992-05-12|   1|  John|     70000|
|  30|     IT|1992-05-12|   1|  John|     70000|
|  25|     HR|1997-02-28|   2| Alice|     60000|
|  25|Finance|1997-02-28|   2| Alice|     90000|
|null|     IT|      null|   3|   Bob|     80000|
|  28|Finance|1994-11-22|   4| Emily|     65000|
|  28|Finance|1994-11-22|   4| Emily|     70000|
|  41|     HR|1981-12-18|   5| David|     90000|
|  33|Finance|1989-07-05|   6| Susan|     7

In [None]:
print(df_csv.columns, df_json.columns)

['id', 'name', 'dob', 'age', 'salary', 'department'] ['age', 'department', 'dob', 'id', 'name', 'salary']


In [None]:
df_json = df_json.select(df_csv.columns)
df_json.printSchema()
df_json.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: long (nullable = true)
 |-- department: string (nullable = true)

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
|  1|  John|1992-05-12|  30| 70000|        IT|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  5| David|1981-12-18|  41| 90000|        HR|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
|  9| James|1983-10-14|  39| 87000|        IT|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
+---+------+----------+----+

In [None]:

df_json = spark.read.format("json").schema(schema).option("header", True).load(json_path)
df_json.printSchema()
df_json.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  5| David|1981-12-18|  41| 90000|        HR|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  9| James|1983-10-14|  39| 87000|        IT|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
+---+------+---------

In [None]:
df = df_csv.union(df_json)
df.printSchema()
df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  5| David|1981-12-18|  41| 90000|        HR|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  2| Alice|1997-02-2

In [None]:
df = df.dropDuplicates()
df.show()

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
|  5| David|1981-12-18|  41| 90000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  9| James|1983-10-14|  39| 87000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
+---+------+----------+----+------+----------+



# Transformation

In [None]:
df.show()

+---+------+----------+----+------+----------+
| id|  name|       dob| age|salary|department|
+---+------+----------+----+------+----------+
|  5| David|1981-12-18|  41| 90000|        HR|
|  3|   Bob|      null|null| 80000|        IT|
|  2| Alice|1997-02-28|  25| 90000|   Finance|
|  9| James|1983-10-14|  39| 87000|        IT|
|  4| Emily|1994-11-22|  28| 65000|   Finance|
|  2| Alice|1997-02-28|  25| 60000|        HR|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|
|  1|  John|1992-05-12|  30| 70000|        IT|
|  6| Susan|1989-07-05|  33| 75000|   Finance|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|
|  4| Emily|1994-11-22|  28| 70000|   Finance|
|  7|  Mike|1976-03-15|  46| 95000|        IT|
+---+------+----------+----+------+----------+



## Select

In [None]:
from pyspark.sql import functions as F

In [None]:
df.select("salary").show()


+------+
|salary|
+------+
| 90000|
| 80000|
| 90000|
| 87000|
| 65000|
| 60000|
| 58000|
| 70000|
| 75000|
| 62000|
| 70000|
| 95000|
+------+



In [None]:
df.select(
    df.id,
    (df.salary + .05 * df.salary).alias('updated_salary'),
    (F.expr("salary + .05 * salary")).alias('updated_salary_expr'),
    (F.year(F.current_timestamp()) - F.year("dob")).alias('age'),
).orderBy(df.id).show()

+---+--------------+-------------------+----+
| id|updated_salary|updated_salary_expr| age|
+---+--------------+-------------------+----+
|  1|       73500.0|           73500.00|  32|
|  2|       94500.0|           94500.00|  27|
|  2|       63000.0|           63000.00|  27|
|  3|       84000.0|           84000.00|null|
|  4|       68250.0|           68250.00|  30|
|  4|       73500.0|           73500.00|  30|
|  5|       94500.0|           94500.00|  43|
|  6|       78750.0|           78750.00|  35|
|  7|       99750.0|           99750.00|  48|
|  8|       60900.0|           60900.00|  29|
|  9|       91350.0|           91350.00|  41|
| 10|       65100.0|           65100.00|  32|
+---+--------------+-------------------+----+



## WithColumn

In [None]:
df = df.withColumn(
    "salary_rase",
    (F.col("salary") + .05 * F.col("salary"))
)
df.show()

+---+------+----------+----+------+----------+-----------+
| id|  name|       dob| age|salary|department|salary_rase|
+---+------+----------+----+------+----------+-----------+
|  5| David|1981-12-18|  41| 90000|        HR|    94500.0|
|  3|   Bob|      null|null| 80000|        IT|    84000.0|
|  2| Alice|1997-02-28|  25| 90000|   Finance|    94500.0|
|  9| James|1983-10-14|  39| 87000|        IT|    91350.0|
|  4| Emily|1994-11-22|  28| 65000|   Finance|    68250.0|
|  2| Alice|1997-02-28|  25| 60000|        HR|    63000.0|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|    60900.0|
|  1|  John|1992-05-12|  30| 70000|        IT|    73500.0|
|  6| Susan|1989-07-05|  33| 75000|   Finance|    78750.0|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|    65100.0|
|  4| Emily|1994-11-22|  28| 70000|   Finance|    73500.0|
|  7|  Mike|1976-03-15|  46| 95000|        IT|    99750.0|
+---+------+----------+----+------+----------+-----------+



In [None]:
df = df.withColumn(
    "salary_rase",
    (F.col("salary") + .05 * F.col("salary"))
    ).withColumn(
    "salary_rase_10%",
    (F.col("salary") + .1 * F.col("salary"))
)
df.show()

+---+------+----------+----+------+----------+-----------+---------------+
| id|  name|       dob| age|salary|department|salary_rase|salary_rase_10%|
+---+------+----------+----+------+----------+-----------+---------------+
|  5| David|1981-12-18|  41| 90000|        HR|    94500.0|        99000.0|
|  3|   Bob|      null|null| 80000|        IT|    84000.0|        88000.0|
|  2| Alice|1997-02-28|  25| 90000|   Finance|    94500.0|        99000.0|
|  9| James|1983-10-14|  39| 87000|        IT|    91350.0|        95700.0|
|  4| Emily|1994-11-22|  28| 65000|   Finance|    68250.0|        71500.0|
|  2| Alice|1997-02-28|  25| 60000|        HR|    63000.0|        66000.0|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|    60900.0|        63800.0|
|  1|  John|1992-05-12|  30| 70000|        IT|    73500.0|        77000.0|
|  6| Susan|1989-07-05|  33| 75000|   Finance|    78750.0|        82500.0|
| 10|Sophie|1992-06-30|  30| 62000|   Finance|    65100.0|        68200.0|
|  4| Emily|1994-11-22|  

## WithColumns

In [None]:
df = df.withColumns(
    {
    "salary_raise101": F.col("salary") + 0.05 * F.col("salary"),
    "age101": F.year(F.current_timestamp()) - F.year(F.col("dob"))
    }
)
df.show()

# Filter

In [None]:
df.filter(
    F.col("salary_rase") >= 75000
).show()

+---+-----+----------+----+------+----------+-----------+---------------+
| id| name|       dob| age|salary|department|salary_rase|salary_rase_10%|
+---+-----+----------+----+------+----------+-----------+---------------+
|  5|David|1981-12-18|  41| 90000|        HR|    94500.0|        99000.0|
|  3|  Bob|      null|null| 80000|        IT|    84000.0|        88000.0|
|  2|Alice|1997-02-28|  25| 90000|   Finance|    94500.0|        99000.0|
|  9|James|1983-10-14|  39| 87000|        IT|    91350.0|        95700.0|
|  6|Susan|1989-07-05|  33| 75000|   Finance|    78750.0|        82500.0|
|  7| Mike|1976-03-15|  46| 95000|        IT|    99750.0|       104500.0|
+---+-----+----------+----+------+----------+-----------+---------------+



# When Otherwise

In [None]:
df = df.withColumn(
    "age_group",
    F.when(
        F.col("age") <= 20,
        "Upto 20"
    ).when(
        (
            (F.col("age") > 20) &
            (F.col("age") <= 30)
        ),
        "21 to 30"
    ).when(
        (
            (F.col("age") > 30) &
            (F.col("age") <= 40)
        ),
        "30 to 40"
    ).otherwise(
        "More than 40"
    )
)
df.show()

+---+------+----------+----+------+----------+-----------+---------------+------------+
| id|  name|       dob| age|salary|department|salary_rase|salary_rase_10%|   age_group|
+---+------+----------+----+------+----------+-----------+---------------+------------+
|  5| David|1981-12-18|  41| 90000|        HR|    94500.0|        99000.0|More than 40|
|  3|   Bob|      null|null| 80000|        IT|    84000.0|        88000.0|More than 40|
|  2| Alice|1997-02-28|  25| 90000|   Finance|    94500.0|        99000.0|    21 to 30|
|  9| James|1983-10-14|  39| 87000|        IT|    91350.0|        95700.0|    30 to 40|
|  4| Emily|1994-11-22|  28| 65000|   Finance|    68250.0|        71500.0|    21 to 30|
|  2| Alice|1997-02-28|  25| 60000|        HR|    63000.0|        66000.0|    21 to 30|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|    60900.0|        63800.0|    21 to 30|
|  1|  John|1992-05-12|  30| 70000|        IT|    73500.0|        77000.0|    21 to 30|
|  6| Susan|1989-07-05|  33| 750

# Lit

### Lit give the default value to the column

In [None]:
df = df.withColumn(
    "company",
    F.lit("Abacus Insight")
)
df.show()

+---+------+----------+----+------+----------+-----------+---------------+------------+--------------+
| id|  name|       dob| age|salary|department|salary_rase|salary_rase_10%|   age_group|       company|
+---+------+----------+----+------+----------+-----------+---------------+------------+--------------+
|  5| David|1981-12-18|  41| 90000|        HR|    94500.0|        99000.0|More than 40|Abacus Insight|
|  3|   Bob|      null|null| 80000|        IT|    84000.0|        88000.0|More than 40|Abacus Insight|
|  2| Alice|1997-02-28|  25| 90000|   Finance|    94500.0|        99000.0|    21 to 30|Abacus Insight|
|  9| James|1983-10-14|  39| 87000|        IT|    91350.0|        95700.0|    30 to 40|Abacus Insight|
|  4| Emily|1994-11-22|  28| 65000|   Finance|    68250.0|        71500.0|    21 to 30|Abacus Insight|
|  2| Alice|1997-02-28|  25| 60000|        HR|    63000.0|        66000.0|    21 to 30|Abacus Insight|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|    60900.0|        63800.0

# Null and Collace

In [None]:
df = df.withCoulmns(
    {
        "age": F.colaese(
            F.year(F.current_timestamp()) - F.year(F.col("dob")),
            F.lit(-1),
        ),
        "has_dob_1": ~F.isnull("dob"),
        "has_dob_2": F.col("dob").isNotNull(),
    }
)

# Drop Column

In [None]:
df.show()

+---+------+----------+----+------+----------+-----------+---------------+------------+--------------+
| id|  name|       dob| age|salary|department|salary_rase|salary_rase_10%|   age_group|       company|
+---+------+----------+----+------+----------+-----------+---------------+------------+--------------+
|  5| David|1981-12-18|  41| 90000|        HR|    94500.0|        99000.0|More than 40|Abacus Insight|
|  3|   Bob|      null|null| 80000|        IT|    84000.0|        88000.0|More than 40|Abacus Insight|
|  2| Alice|1997-02-28|  25| 90000|   Finance|    94500.0|        99000.0|    21 to 30|Abacus Insight|
|  9| James|1983-10-14|  39| 87000|        IT|    91350.0|        95700.0|    30 to 40|Abacus Insight|
|  4| Emily|1994-11-22|  28| 65000|   Finance|    68250.0|        71500.0|    21 to 30|Abacus Insight|
|  2| Alice|1997-02-28|  25| 60000|        HR|    63000.0|        66000.0|    21 to 30|Abacus Insight|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|    60900.0|        63800.0

In [None]:
df = df.drop("salary_rase_10%")
df.show()

+---+------+----------+----+------+----------+-----------+------------+--------------+
| id|  name|       dob| age|salary|department|salary_rase|   age_group|       company|
+---+------+----------+----+------+----------+-----------+------------+--------------+
|  5| David|1981-12-18|  41| 90000|        HR|    94500.0|More than 40|Abacus Insight|
|  3|   Bob|      null|null| 80000|        IT|    84000.0|More than 40|Abacus Insight|
|  2| Alice|1997-02-28|  25| 90000|   Finance|    94500.0|    21 to 30|Abacus Insight|
|  9| James|1983-10-14|  39| 87000|        IT|    91350.0|    30 to 40|Abacus Insight|
|  4| Emily|1994-11-22|  28| 65000|   Finance|    68250.0|    21 to 30|Abacus Insight|
|  2| Alice|1997-02-28|  25| 60000|        HR|    63000.0|    21 to 30|Abacus Insight|
|  8|  Lisa|1995-08-20|  27| 58000|        HR|    60900.0|    21 to 30|Abacus Insight|
|  1|  John|1992-05-12|  30| 70000|        IT|    73500.0|    21 to 30|Abacus Insight|
|  6| Susan|1989-07-05|  33| 75000|   Finan

# withColumnRename

In [None]:
df = df.withColumnRenamed("salary_rase","salary_raised").orderBy("id")
df.show()

+---+------+----------+----+------+----------+-------------+------------+--------------+
| id|  name|       dob| age|salary|department|salary_raised|   age_group|       company|
+---+------+----------+----+------+----------+-------------+------------+--------------+
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|    21 to 30|Abacus Insight|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|    21 to 30|Abacus Insight|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|    21 to 30|Abacus Insight|
|  3|   Bob|      null|null| 80000|        IT|      84000.0|More than 40|Abacus Insight|
|  4| Emily|1994-11-22|  28| 65000|   Finance|      68250.0|    21 to 30|Abacus Insight|
|  4| Emily|1994-11-22|  28| 70000|   Finance|      73500.0|    21 to 30|Abacus Insight|
|  5| David|1981-12-18|  41| 90000|        HR|      94500.0|More than 40|Abacus Insight|
|  6| Susan|1989-07-05|  33| 75000|   Finance|      78750.0|    30 to 40|Abacus Insight|
|  7|  Mike|1976-03-1

# Partition By

In [None]:
from pyspark.sql.window import Window

In [None]:
windowSpec = Window.partitionBy(df.age_group)
df = df.withColumn("min_age_in_group", F.min("age").over(windowSpec))
df.show()


+---+------+----------+----+------+----------+-------------+------------+--------------+----------------+
| id|  name|       dob| age|salary|department|salary_raised|   age_group|       company|min_age_in_group|
+---+------+----------+----+------+----------+-------------+------------+--------------+----------------+
|  6| Susan|1989-07-05|  33| 75000|   Finance|      78750.0|    30 to 40|Abacus Insight|              33|
|  9| James|1983-10-14|  39| 87000|        IT|      91350.0|    30 to 40|Abacus Insight|              33|
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|    21 to 30|Abacus Insight|              25|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|    21 to 30|Abacus Insight|              25|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|    21 to 30|Abacus Insight|              25|
|  4| Emily|1994-11-22|  28| 65000|   Finance|      68250.0|    21 to 30|Abacus Insight|              25|
|  4| Emily|1994-11-22|  28| 70000|   Finance|

# Join

## Loading Parquet File

In [None]:
par_path = r'/content/drive/MyDrive/data/parquet/reference.parquet'
df_par = spark.read.parquet(par_path)
df_par.printSchema()
df_par.show(10)

root
 |-- department: string (nullable = true)
 |-- manager: string (nullable = true)
 |-- lead: string (nullable = true)

+----------+-------+--------+
|department|manager|    lead|
+----------+-------+--------+
|   Finance|  Megan|   Molly|
|        HR|   Brad|   Brian|
|        IT|  Chris|Chandler|
|  Delivery|   Leon|  Louise|
+----------+-------+--------+



In [None]:
df.join(
    df_par,
    "department",
    "left"
).show()

+----------+---+------+----------+----+------+-------------+------------+--------------+----------------+-------+--------+
|department| id|  name|       dob| age|salary|salary_raised|   age_group|       company|min_age_in_group|manager|    lead|
+----------+---+------+----------+----+------+-------------+------------+--------------+----------------+-------+--------+
|   Finance|  6| Susan|1989-07-05|  33| 75000|      78750.0|    30 to 40|Abacus Insight|              33|  Megan|   Molly|
|        IT|  9| James|1983-10-14|  39| 87000|      91350.0|    30 to 40|Abacus Insight|              33|  Chris|Chandler|
|        IT|  1|  John|1992-05-12|  30| 70000|      73500.0|    21 to 30|Abacus Insight|              25|  Chris|Chandler|
|   Finance|  2| Alice|1997-02-28|  25| 90000|      94500.0|    21 to 30|Abacus Insight|              25|  Megan|   Molly|
|        HR|  2| Alice|1997-02-28|  25| 60000|      63000.0|    21 to 30|Abacus Insight|              25|   Brad|   Brian|
|   Finance|  4|

In [None]:
df.join(
    df_par,
    "department",
    "semi"
).show()

+----------+---+------+----------+----+------+-------------+------------+--------------+----------------+
|department| id|  name|       dob| age|salary|salary_raised|   age_group|       company|min_age_in_group|
+----------+---+------+----------+----+------+-------------+------------+--------------+----------------+
|   Finance|  6| Susan|1989-07-05|  33| 75000|      78750.0|    30 to 40|Abacus Insight|              33|
|        IT|  9| James|1983-10-14|  39| 87000|      91350.0|    30 to 40|Abacus Insight|              33|
|        IT|  1|  John|1992-05-12|  30| 70000|      73500.0|    21 to 30|Abacus Insight|              25|
|   Finance|  2| Alice|1997-02-28|  25| 90000|      94500.0|    21 to 30|Abacus Insight|              25|
|        HR|  2| Alice|1997-02-28|  25| 60000|      63000.0|    21 to 30|Abacus Insight|              25|
|   Finance|  4| Emily|1994-11-22|  28| 65000|      68250.0|    21 to 30|Abacus Insight|              25|
|   Finance|  4| Emily|1994-11-22|  28| 70000|

In [None]:
df_par.join(
    df,
    "department",
    "anti"
).show()

+----------+-------+------+
|department|manager|  lead|
+----------+-------+------+
|  Delivery|   Leon|Louise|
+----------+-------+------+



In [None]:
df = df.alias("dfm")
df_par = df_par.alias("dfp")

In [None]:
df.join(
    df_par,
    F.col("dfm.department") == F.col("dfp.department"),
    "inner"
).show()

+---+------+----------+----+------+----------+-------------+------------+--------------+----------------+----------+-------+--------+
| id|  name|       dob| age|salary|department|salary_raised|   age_group|       company|min_age_in_group|department|manager|    lead|
+---+------+----------+----+------+----------+-------------+------------+--------------+----------------+----------+-------+--------+
|  6| Susan|1989-07-05|  33| 75000|   Finance|      78750.0|    30 to 40|Abacus Insight|              33|   Finance|  Megan|   Molly|
|  9| James|1983-10-14|  39| 87000|        IT|      91350.0|    30 to 40|Abacus Insight|              33|        IT|  Chris|Chandler|
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|    21 to 30|Abacus Insight|              25|        IT|  Chris|Chandler|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|    21 to 30|Abacus Insight|              25|   Finance|  Megan|   Molly|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|  

In [None]:
df.show()

+---+------+----------+----+------+----------+-------------+------------+--------------+----------------+
| id|  name|       dob| age|salary|department|salary_raised|   age_group|       company|min_age_in_group|
+---+------+----------+----+------+----------+-------------+------------+--------------+----------------+
|  6| Susan|1989-07-05|  33| 75000|   Finance|      78750.0|    30 to 40|Abacus Insight|              33|
|  9| James|1983-10-14|  39| 87000|        IT|      91350.0|    30 to 40|Abacus Insight|              33|
|  1|  John|1992-05-12|  30| 70000|        IT|      73500.0|    21 to 30|Abacus Insight|              25|
|  2| Alice|1997-02-28|  25| 90000|   Finance|      94500.0|    21 to 30|Abacus Insight|              25|
|  2| Alice|1997-02-28|  25| 60000|        HR|      63000.0|    21 to 30|Abacus Insight|              25|
|  4| Emily|1994-11-22|  28| 65000|   Finance|      68250.0|    21 to 30|Abacus Insight|              25|
|  4| Emily|1994-11-22|  28| 70000|   Finance|

# Load Save Date

In [None]:
df.write.mode("overwrite").format("csv").save('/content/drive/MyDrive/data/result.csv')