Data Preparation

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build (or reuse) the notebook's Spark session with Hive support enabled,
# so the saveAsTable calls below go through the Hive-backed catalog.
spark = (
    SparkSession.builder
    .appName("Practice")
    .enableHiveSupport()
    .getOrCreate()
)

# Customer rows: (CustomerID, Name, Email, City, SignupDate as 'YYYY-MM-DD' text).
customers_data = [
    (101, 'Ali',   'ali@gmail.com',     'Mumbai',    '2022-05-10'),
    (102, 'Neha',  'neha@yahoo.com',    'Delhi',     '2023-01-15'),
    (103, 'Ravi',  'ravi@hotmail.com',  'Bangalore', '2021-11-01'),
    (104, 'Sneha', 'sneha@outlook.com', 'Hyderabad', '2020-07-22'),
    (105, 'Amit',  'amit@gmail.com',    'Chennai',   '2023-03-10'),
]

# Order rows: (OrderID, CustomerID, Product, Category, Quantity, Price,
# OrderDate as 'YYYY-MM-DD' text).
orders_data = [
    (1, 101, 'Laptop',    'Electronics', 2, 50000.0, '2024-01-10'),
    (2, 101, 'Mouse',     'Electronics', 1,  1200.0, '2024-01-15'),
    (3, 102, 'Tablet',    'Electronics', 1, 20000.0, '2024-02-01'),
    (4, 103, 'Bookshelf', 'Furniture',   1,  3500.0, '2024-02-10'),
    (5, 104, 'Mixer',     'Appliances',  1,  5000.0, '2024-02-15'),
    (6, 105, 'Notebook',  'Stationery',  5,   500.0, '2024-03-01'),
    (7, 102, 'Phone',     'Electronics', 1, 30000.0, '2024-03-02'),
]

# Turn the in-memory rows into DataFrames; only column names are supplied,
# so Spark infers the column types from the Python values.
customers_df = spark.createDataFrame(
    customers_data,
    schema=["CustomerID", "Name", "Email", "City", "SignupDate"],
)
orders_df = spark.createDataFrame(
    orders_data,
    schema=["OrderID", "CustomerID", "Product", "Category", "Quantity", "Price", "OrderDate"],
)

# Make sure the target database exists before writing into it.
spark.sql("CREATE DATABASE IF NOT EXISTS sales")

# Persist both frames as tables in the `sales` database, replacing any
# previous contents so the cell can be re-run safely.
for frame, table_name in (
    (customers_df, "sales.customers"),
    (orders_df, "sales.orders"),
):
    frame.write.mode("overwrite").saveAsTable(table_name)


SECTION A: PySpark DataFrame Tasks

1.

In [3]:
# Task 1: line total = unit price multiplied by the number of units ordered.
total_amount = col("Price") * col("Quantity")
orders_df = orders_df.withColumn("TotalAmount", total_amount)


2.

In [4]:
# Task 2: show only the orders whose total is strictly above the threshold.
high_value_threshold = 10000
orders_df.where(col("TotalAmount") > high_value_threshold).show()


+-------+----------+-------+-----------+--------+-------+----------+-----------+
|OrderID|CustomerID|Product|   Category|Quantity|  Price| OrderDate|TotalAmount|
+-------+----------+-------+-----------+--------+-------+----------+-----------+
|      1|       101| Laptop|Electronics|       2|50000.0|2024-01-10|   100000.0|
|      3|       102| Tablet|Electronics|       1|20000.0|2024-02-01|    20000.0|
|      7|       102|  Phone|Electronics|       1|30000.0|2024-03-02|    30000.0|
+-------+----------+-------+-----------+--------+-------+----------+-----------+



3.

In [5]:
# Task 3: normalise City to lower case (replaces the existing column in place).
city_lowercase = lower(col("City"))
customers_df = customers_df.withColumn("City", city_lowercase)


4.

In [6]:
# Task 4: OrderDate is stored as text, so parse it to a date before
# extracting the calendar year.
order_date = to_date(col("OrderDate"))
orders_df = orders_df.withColumn("OrderYear", year(order_date))


5.

In [7]:
# Task 5: substitute a placeholder address wherever Email is null.
email_defaults = {"Email": "unknown@example.com"}
customers_df = customers_df.na.fill(email_defaults)


6.

In [8]:
# Task 6: bucket each order by its total.
#   Low    : total < 5000
#   Medium : 5000 <= total <= 20000 (both bounds inclusive)
#   High   : everything else
# NOTE(review): a null TotalAmount satisfies neither condition and therefore
# also lands in "High".
total = col("TotalAmount")
amount_category = (
    when(total < 5000, "Low")
    .when((total >= 5000) & (total <= 20000), "Medium")
    .otherwise("High")
)
orders_df = orders_df.withColumn("AmountCategory", amount_category)


SECTION B: Spark SQL Tasks

In [9]:
# Expose the current (enriched) DataFrames to Spark SQL under short names.
for view_name, frame in (
    ("orders", orders_df),
    ("customers", customers_df),
):
    frame.createOrReplaceTempView(view_name)


7.

In [11]:
# Task 7: every order column for the customer named 'Ali', found by joining
# orders to customers on CustomerID.
orders_for_ali_sql = """
    SELECT o.*
    FROM orders o
    JOIN customers c ON o.CustomerID = c.CustomerID
    WHERE c.Name = 'Ali'
"""
spark.sql(orders_for_ali_sql).show()


+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+--------------+
|OrderID|CustomerID|Product|   Category|Quantity|  Price| OrderDate|TotalAmount|OrderYear|AmountCategory|
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+--------------+
|      1|       101| Laptop|Electronics|       2|50000.0|2024-01-10|   100000.0|     2024|          High|
|      2|       101|  Mouse|Electronics|       1| 1200.0|2024-01-15|     1200.0|     2024|           Low|
+-------+----------+-------+-----------+--------+-------+----------+-----------+---------+--------------+



8.

In [12]:
# Task 8: total spend per customer, summing Price * Quantity over their orders.
spending_per_customer_sql = """
    SELECT CustomerID, SUM(Price * Quantity) AS TotalSpending
    FROM orders
    GROUP BY CustomerID
"""
spark.sql(spending_per_customer_sql).show()


+----------+-------------+
|CustomerID|TotalSpending|
+----------+-------------+
|       101|     101200.0|
|       102|      50000.0|
|       103|       3500.0|
|       104|       5000.0|
|       105|       2500.0|
+----------+-------------+



9.

In [13]:
# Task 9: revenue per category, sorted descending and cut to the first row,
# i.e. the single highest-revenue category.
top_category_sql = """
    SELECT Category, SUM(Price * Quantity) AS Revenue
    FROM orders
    GROUP BY Category
    ORDER BY Revenue DESC
    LIMIT 1
"""
spark.sql(top_category_sql).show()


+-----------+--------+
|   Category| Revenue|
+-----------+--------+
|Electronics|151200.0|
+-----------+--------+



10.

In [14]:
# Task 10: one row per order with the customer's name, the product and the
# computed order total; registered as the `customer_orders` temp view.
customer_orders_sql = """
    SELECT c.Name AS CustomerName,
           o.Product,
           o.Price * o.Quantity AS TotalAmount
    FROM orders o
    JOIN customers c ON o.CustomerID = c.CustomerID
"""
customer_orders_df = spark.sql(customer_orders_sql)
customer_orders_df.createOrReplaceTempView("customer_orders")


11.

In [15]:
# Task 11: rows of customer_orders whose matching order is dated after
# 2024-02-29.
# NOTE(review): customer_orders exposes no order date, so this joins back to
# orders on Product alone. Product is not an order key, so a product bought
# more than once would fan out into extra rows, and SELECT * returns duplicate
# Product and TotalAmount columns. Carrying OrderID/OrderDate in the view
# would remove the need for this join.
orders_after_february_sql = """
    SELECT *
    FROM customer_orders
    JOIN orders o ON customer_orders.Product = o.Product
    WHERE to_date(o.OrderDate) > '2024-02-29'
"""
spark.sql(orders_after_february_sql).show()


+------------+--------+-----------+-------+----------+--------+-----------+--------+-------+----------+-----------+---------+--------------+
|CustomerName| Product|TotalAmount|OrderID|CustomerID| Product|   Category|Quantity|  Price| OrderDate|TotalAmount|OrderYear|AmountCategory|
+------------+--------+-----------+-------+----------+--------+-----------+--------+-------+----------+-----------+---------+--------------+
|        Neha|   Phone|    30000.0|      7|       102|   Phone|Electronics|       1|30000.0|2024-03-02|    30000.0|     2024|          High|
|        Amit|Notebook|     2500.0|      6|       105|Notebook| Stationery|       5|  500.0|2024-03-01|     2500.0|     2024|           Low|
+------------+--------+-----------+-------+----------+--------+-----------+--------+-------+----------+-----------+---------+--------------+



SECTION C: Advanced Practice

12.

In [16]:
# Task 12: publish customers as a global temp view. Global temp views are
# addressed through the `global_temp` database, as in the query below.
customers_df.createOrReplaceGlobalTempView("customers")

# City was lower-cased earlier in the notebook, hence the lower-case literal.
mumbai_customers_sql = """
    SELECT *
    FROM global_temp.customers
    WHERE City = 'mumbai'
"""
spark.sql(mumbai_customers_sql).show()


+----------+----+-------------+------+----------+
|CustomerID|Name|        Email|  City|SignupDate|
+----------+----+-------------+------+----------+
|       101| Ali|ali@gmail.com|mumbai|2022-05-10|
+----------+----+-------------+------+----------+



13.

In [17]:
# Task 13: write the enriched orders as Parquet, replacing any earlier output.
(
    orders_df.write
    .format("parquet")
    .mode("overwrite")
    .save("/tmp/orders_with_totalamount.parquet")
)


14.

In [18]:
# Task 14: read the Parquet output back and display its row count
# (the bare last expression is the cell's output).
parquet_df = spark.read.format("parquet").load("/tmp/orders_with_totalamount.parquet")
parquet_df.count()


7

SECTION D: UDF + Built-in Function Tasks

15.

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def mask_email(email):
    """Mask the local part of an e-mail address, keeping its first character.

    ``'ali@gmail.com'`` becomes ``'a***@gmail.com'``.

    Args:
        email: The address to mask, or ``None``.

    Returns:
        The masked address. ``None`` and strings without an ``'@'`` are
        returned unchanged. If the local part is empty (``'@example.com'``)
        the result is ``'***@example.com'``.
    """
    # Null-safe: a Python UDF receives None for SQL NULL, and raising here
    # would fail the whole Spark job.
    if email is None or '@' not in email:
        return email
    # Split on the LAST '@' so an address containing several '@' characters
    # cannot break tuple unpacking; the domain is whatever follows it.
    user, _, domain = email.rpartition('@')
    # user[:1] (rather than user[0]) tolerates an empty local part.
    return user[:1] + '***@' + domain

# Task 15: wrap the Python helper as a Spark UDF that returns strings.
mask_udf = udf(mask_email, StringType())

# Add the masked address next to the original and show both for comparison.
masked_email = mask_udf(col("Email"))
customers_df = customers_df.withColumn("MaskedEmail", masked_email)
customers_df.select("Email", "MaskedEmail").show()


+-----------------+----------------+
|            Email|     MaskedEmail|
+-----------------+----------------+
|    ali@gmail.com|  a***@gmail.com|
|   neha@yahoo.com|  n***@yahoo.com|
| ravi@hotmail.com|r***@hotmail.com|
|sneha@outlook.com|s***@outlook.com|
|   amit@gmail.com|  a***@gmail.com|
+-----------------+----------------+



16.

In [20]:
from pyspark.sql.functions import concat_ws

# Task 16: build "<Name> from <City>" by joining the two columns with the
# literal separator " from ".
label = concat_ws(" from ", col("Name"), col("City"))
customers_df = customers_df.withColumn("Label", label)
customers_df.select("Label").show()


+--------------------+
|               Label|
+--------------------+
|     Ali from mumbai|
|     Neha from delhi|
| Ravi from bangalore|
|Sneha from hyderabad|
|   Amit from chennai|
+--------------------+



17.

In [21]:
from pyspark.sql.functions import regexp_replace

# Task 17: strip every character that is not an ASCII letter, digit or space
# from the product name.
clean_product = regexp_replace(col("Product"), "[^a-zA-Z0-9 ]", "")
orders_df = orders_df.withColumn("CleanProduct", clean_product)
orders_df.select("Product", "CleanProduct").show()


+---------+------------+
|  Product|CleanProduct|
+---------+------------+
|   Laptop|      Laptop|
|    Mouse|       Mouse|
|   Tablet|      Tablet|
|Bookshelf|   Bookshelf|
|    Mixer|       Mixer|
| Notebook|    Notebook|
|    Phone|       Phone|
+---------+------------+



18.

In [22]:
from pyspark.sql.functions import to_date, datediff, current_date

# Task 18: parse SignupDate to a real date, then count the days between it
# and today. The result depends on the date the cell is run.
customers_df = (
    customers_df
    .withColumn("SignupDate", to_date("SignupDate"))
    .withColumn("CustomerAgeDays", datediff(current_date(), "SignupDate"))
)
customers_df.select("Name", "CustomerAgeDays").show()


+-----+---------------+
| Name|CustomerAgeDays|
+-----+---------------+
|  Ali|           1121|
| Neha|            871|
| Ravi|           1311|
|Sneha|           1778|
| Amit|            817|
+-----+---------------+

