In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
spark = SparkSession.builder \
    .appName("PySparkPractice") \
    .getOrCreate()

## DataFrame Creation and Basic Operations

In [0]:
employee_data = [
    {"id": 1, "name": "John Doe", "department": "IT", "salary": 75000, "join_date": "2020-01-15"},
    {"id": 2, "name": "Jane Smith", "department": "HR", "salary": 65000, "join_date": "2019-03-22"},
    {"id": 3, "name": "Mike Johnson", "department": "IT", "salary": 80000, "join_date": "2021-07-10"},
    {"id": 4, "name": "Emily Davis", "department": "Finance", "salary": 90000, "join_date": "2018-11-05"},
    {"id": 5, "name": "David Wilson", "department": "HR", "salary": 70000, "join_date": "2020-09-18"}
]

In [0]:
df = spark.createDataFrame(employee_data)

In [0]:
df.printSchema()

root
 |-- department: string (nullable = true)
 |-- id: long (nullable = true)
 |-- join_date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)



In [0]:
df.show()

+----------+---+----------+------------+------+
|department| id| join_date|        name|salary|
+----------+---+----------+------------+------+
|        IT|  1|2020-01-15|    John Doe| 75000|
|        HR|  2|2019-03-22|  Jane Smith| 65000|
|        IT|  3|2021-07-10|Mike Johnson| 80000|
|   Finance|  4|2018-11-05| Emily Davis| 90000|
|        HR|  5|2020-09-18|David Wilson| 70000|
+----------+---+----------+------------+------+



In [0]:
df.select(['name','salary']).show()

+------------+------+
|        name|salary|
+------------+------+
|    John Doe| 75000|
|  Jane Smith| 65000|
|Mike Johnson| 80000|
| Emily Davis| 90000|
|David Wilson| 70000|
+------------+------+



In [0]:
df.select(df.name, df.salary).show()

+------------+------+
|        name|salary|
+------------+------+
|    John Doe| 75000|
|  Jane Smith| 65000|
|Mike Johnson| 80000|
| Emily Davis| 90000|
|David Wilson| 70000|
+------------+------+



In [0]:
df.filter(col("salary")>70000).show()

+----------+---+----------+------------+------+
|department| id| join_date|        name|salary|
+----------+---+----------+------------+------+
|        IT|  1|2020-01-15|    John Doe| 75000|
|        IT|  3|2021-07-10|Mike Johnson| 80000|
|   Finance|  4|2018-11-05| Emily Davis| 90000|
+----------+---+----------+------------+------+



In [0]:
df.withColumn('bonus', col('salary')*0.1).show()

+----------+---+----------+------------+------+------+
|department| id| join_date|        name|salary| bonus|
+----------+---+----------+------------+------+------+
|        IT|  1|2020-01-15|    John Doe| 75000|7500.0|
|        HR|  2|2019-03-22|  Jane Smith| 65000|6500.0|
|        IT|  3|2021-07-10|Mike Johnson| 80000|8000.0|
|   Finance|  4|2018-11-05| Emily Davis| 90000|9000.0|
|        HR|  5|2020-09-18|David Wilson| 70000|7000.0|
+----------+---+----------+------------+------+------+



In [0]:
df.agg({'salary':'avg'}).show()

+-----------+
|avg(salary)|
+-----------+
|    76000.0|
+-----------+



In [0]:
df.groupBy('department').avg('salary').show()

+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|        IT|    77500.0|
|        HR|    67500.0|
|   Finance|    90000.0|
+----------+-----------+



## Data Cleaning and Transformation

In [0]:
customer_data = [
    {"cust_id": 1001, "name": "Alice", "email": "alice@example.com", "age": 32, "city": "New York", "last_purchase": "2023-05-15"},
    {"cust_id": 1002, "name": "Bob", "email": None, "age": 45, "city": "Chicago", "last_purchase": "2023-06-22"},
    {"cust_id": 1003, "name": "Charlie", "email": "charlie@example.com", "age": None, "city": "Boston", "last_purchase": "2023-04-10"},
    {"cust_id": 1004, "name": None, "email": "dave@example.com", "age": 29, "city": "New York", "last_purchase": None},
    {"cust_id": 1005, "name": "Eve", "email": "eve@example.com", "age": 35, "city": "San Francisco", "last_purchase": "2023-07-01"}
]

In [0]:
customer_df = spark.createDataFrame(customer_data)

In [0]:
customer_df.show()

+----+-------------+-------+-------------------+-------------+-------+
| age|         city|cust_id|              email|last_purchase|   name|
+----+-------------+-------+-------------------+-------------+-------+
|  32|     New York|   1001|  alice@example.com|   2023-05-15|  Alice|
|  45|      Chicago|   1002|               NULL|   2023-06-22|    Bob|
|NULL|       Boston|   1003|charlie@example.com|   2023-04-10|Charlie|
|  29|     New York|   1004|   dave@example.com|         NULL|   NULL|
|  35|San Francisco|   1005|    eve@example.com|   2023-07-01|    Eve|
+----+-------------+-------+-------------------+-------------+-------+



In [0]:
customer_df.na.drop().show()

+---+-------------+-------+-----------------+-------------+-----+
|age|         city|cust_id|            email|last_purchase| name|
+---+-------------+-------+-----------------+-------------+-----+
| 32|     New York|   1001|alice@example.com|   2023-05-15|Alice|
| 35|San Francisco|   1005|  eve@example.com|   2023-07-01|  Eve|
+---+-------------+-------+-----------------+-------------+-----+



In [0]:
customers_filled = customer_df.na.fill({'email': 'no_email@example.com'})
customers_filled.show()

+----+-------------+-------+--------------------+-------------+-------+
| age|         city|cust_id|               email|last_purchase|   name|
+----+-------------+-------+--------------------+-------------+-------+
|  32|     New York|   1001|   alice@example.com|   2023-05-15|  Alice|
|  45|      Chicago|   1002|no_email@example.com|   2023-06-22|    Bob|
|NULL|       Boston|   1003| charlie@example.com|   2023-04-10|Charlie|
|  29|     New York|   1004|    dave@example.com|         NULL|   NULL|
|  35|San Francisco|   1005|     eve@example.com|   2023-07-01|    Eve|
+----+-------------+-------+--------------------+-------------+-------+



In [0]:
customers_clean = customers_filled.filter(col('name').isNotNull())
customers_clean.show()

+----+-------------+-------+--------------------+-------------+-------+
| age|         city|cust_id|               email|last_purchase|   name|
+----+-------------+-------+--------------------+-------------+-------+
|  32|     New York|   1001|   alice@example.com|   2023-05-15|  Alice|
|  45|      Chicago|   1002|no_email@example.com|   2023-06-22|    Bob|
|NULL|       Boston|   1003| charlie@example.com|   2023-04-10|Charlie|
|  35|San Francisco|   1005|     eve@example.com|   2023-07-01|    Eve|
+----+-------------+-------+--------------------+-------------+-------+



In [0]:
avg_age = customers_clean.select(avg('age')).collect()[0][0]


In [0]:
customers_clean = customers_clean.na.fill({'age': avg_age})
customers_clean.show()

+---+-------------+-------+--------------------+-------------+-------+
|age|         city|cust_id|               email|last_purchase|   name|
+---+-------------+-------+--------------------+-------------+-------+
| 32|     New York|   1001|   alice@example.com|   2023-05-15|  Alice|
| 45|      Chicago|   1002|no_email@example.com|   2023-06-22|    Bob|
| 37|       Boston|   1003| charlie@example.com|   2023-04-10|Charlie|
| 35|San Francisco|   1005|     eve@example.com|   2023-07-01|    Eve|
+---+-------------+-------+--------------------+-------------+-------+



In [0]:
customers_clean.orderBy('last_purchase').show()

+---+-------------+-------+--------------------+-------------+-------+
|age|         city|cust_id|               email|last_purchase|   name|
+---+-------------+-------+--------------------+-------------+-------+
| 37|       Boston|   1003| charlie@example.com|   2023-04-10|Charlie|
| 32|     New York|   1001|   alice@example.com|   2023-05-15|  Alice|
| 45|      Chicago|   1002|no_email@example.com|   2023-06-22|    Bob|
| 35|San Francisco|   1005|     eve@example.com|   2023-07-01|    Eve|
+---+-------------+-------+--------------------+-------------+-------+



## Convert to date type

In [0]:
customers_clean.printSchema()

root
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- cust_id: long (nullable = true)
 |-- email: string (nullable = false)
 |-- last_purchase: string (nullable = true)
 |-- name: string (nullable = true)



In [0]:
customers_clean = customers_clean.withColumn(
    'last_purchase',
    to_date(col('last_purchase'), 'yyyy-MM-dd')
)
customers_clean.show()

+---+-------------+-------+--------------------+-------------+-------+
|age|         city|cust_id|               email|last_purchase|   name|
+---+-------------+-------+--------------------+-------------+-------+
| 32|     New York|   1001|   alice@example.com|   2023-05-15|  Alice|
| 45|      Chicago|   1002|no_email@example.com|   2023-06-22|    Bob|
| 37|       Boston|   1003| charlie@example.com|   2023-04-10|Charlie|
| 35|San Francisco|   1005|     eve@example.com|   2023-07-01|    Eve|
+---+-------------+-------+--------------------+-------------+-------+



In [0]:
customers_clean.printSchema()

root
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- cust_id: long (nullable = true)
 |-- email: string (nullable = false)
 |-- last_purchase: date (nullable = true)
 |-- name: string (nullable = true)

