In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Obtain the SparkSession for this notebook under the application name "spark_learning".
spark = (
    SparkSession.builder
    .appName("spark_learning")
    .getOrCreate()
)

In [139]:
# Column names, listed in the same order as the fields of each row tuple below.
columns = ["product_id", "color", "brand", "quantity", "price"]

# Sample product rows: one tuple per product, five fields each.
data = [
    (101, "Red",    "Acer",   10, 15.5),
    (102, "Green",  "Asus",   20, 18.7),
    (103, "Blue",   "Dell",   15, 20.2),
    (104, "Yellow", "HP",     30, 22.1),
    (105, "Purple", "Lenovo", 25, 17.8),
]

In [141]:
# Build the DataFrame from the sample rows, then register it as the temporary
# view "Products" so the spark.sql(...) cells can query the same data.
df = spark.createDataFrame(data, schema=columns)
df.createOrReplaceTempView("Products")

In [142]:
# Print the rows of df as a text table.
df.show()

+----------+------+------+--------+-----+
|product_id| color| brand|quantity|price|
+----------+------+------+--------+-----+
|       101|   Red|  Acer|      10| 15.5|
|       102| Green|  Asus|      20| 18.7|
|       103|  Blue|  Dell|      15| 20.2|
|       104|Yellow|    HP|      30| 22.1|
|       105|Purple|Lenovo|      25| 17.8|
+----------+------+------+--------+-----+



In [151]:
# Derive total_value (quantity * price) as an extra column and display the result.
total_value_expr = col("quantity") * col("price")
df.withColumn("total_value", total_value_expr).show()


+----------+------+------+--------+-----+-----------+
|product_id| color| brand|quantity|price|total_value|
+----------+------+------+--------+-----+-----------+
|       101|   Red|  Acer|      10| 15.5|      155.0|
|       102| Green|  Asus|      20| 18.7|      374.0|
|       103|  Blue|  Dell|      15| 20.2|      303.0|
|       104|Yellow|    HP|      30| 22.1|      663.0|
|       105|Purple|Lenovo|      25| 17.8|      445.0|
+----------+------+------+--------+-----+-----------+



In [159]:
# SQL version: every column plus total_value = quantity * price.
# The view is referenced as "Products", the exact name it was registered under,
# so the query does not depend on case-insensitive name resolution.
spark.sql("""
SELECT *, quantity * price AS total_value FROM Products
""").show()

+----------+------+------+--------+-----+-----------+
|product_id| color| brand|quantity|price|total_value|
+----------+------+------+--------+-----+-----------+
|       101|   Red|  Acer|      10| 15.5|      155.0|
|       102| Green|  Asus|      20| 18.7|      374.0|
|       103|  Blue|  Dell|      15| 20.2|      303.0|
|       104|Yellow|    HP|      30| 22.1|      663.0|
|       105|Purple|Lenovo|      25| 17.8|      445.0|
+----------+------+------+--------+-----+-----------+



In [160]:
# Show the data with product_id renamed to id (the result is not assigned back to df).
renamed_df = df.withColumnRenamed("product_id", "id")
renamed_df.show()

+---+------+------+--------+-----+
| id| color| brand|quantity|price|
+---+------+------+--------+-----+
|101|   Red|  Acer|      10| 15.5|
|102| Green|  Asus|      20| 18.7|
|103|  Blue|  Dell|      15| 20.2|
|104|Yellow|    HP|      30| 22.1|
|105|Purple|Lenovo|      25| 17.8|
+---+------+------+--------+-----+



In [161]:
spark.sql("""select *,product_id as id from Products""").show()

+----------+------+------+--------+-----+---+
|product_id| color| brand|quantity|price| id|
+----------+------+------+--------+-----+---+
|       101|   Red|  Acer|      10| 15.5|101|
|       102| Green|  Asus|      20| 18.7|102|
|       103|  Blue|  Dell|      15| 20.2|103|
|       104|Yellow|    HP|      30| 22.1|104|
|       105|Purple|Lenovo|      25| 17.8|105|
+----------+------+------+--------+-----+---+



In [162]:
# Keep only the rows whose brand equals "HP".
is_hp = col("brand") == "HP"
df.where(is_hp).show()

+----------+------+-----+--------+-----+
|product_id| color|brand|quantity|price|
+----------+------+-----+--------+-----+
|       104|Yellow|   HP|      30| 22.1|
+----------+------+-----+--------+-----+



In [164]:
# SQL version: rows whose brand is HP.
# Written with standard SQL `=` and a single-quoted string literal. A
# double-quoted "HP" is only read as a string under Spark's default parser
# settings; when ANSI double-quoted identifiers are enabled it is parsed as a
# column name instead.
spark.sql("""
SELECT * FROM Products WHERE brand = 'HP'
""").show()

+----------+------+-----+--------+-----+
|product_id| color|brand|quantity|price|
+----------+------+-----+--------+-----+
|       104|Yellow|   HP|      30| 22.1|
+----------+------+-----+--------+-----+



In [175]:
# Order the rows from the highest quantity to the lowest.
df.sort("quantity", ascending=False).show()

+----------+------+------+--------+-----+
|product_id| color| brand|quantity|price|
+----------+------+------+--------+-----+
|       104|Yellow|    HP|      30| 22.1|
|       105|Purple|Lenovo|      25| 17.8|
|       102| Green|  Asus|      20| 18.7|
|       103|  Blue|  Dell|      15| 20.2|
|       101|   Red|  Acer|      10| 15.5|
+----------+------+------+--------+-----+



In [176]:
# SQL version: list the products from the highest quantity to the lowest.
quantity_desc_query = """select * from Products ORDER BY quantity desc"""
spark.sql(quantity_desc_query).show()

+----------+------+------+--------+-----+
|product_id| color| brand|quantity|price|
+----------+------+------+--------+-----+
|       104|Yellow|    HP|      30| 22.1|
|       105|Purple|Lenovo|      25| 17.8|
|       102| Green|  Asus|      20| 18.7|
|       103|  Blue|  Dell|      15| 20.2|
|       101|   Red|  Acer|      10| 15.5|
+----------+------+------+--------+-----+



In [178]:
# Append discounted_price (price * 0.9), then expose quantity under the name
# stock_quantity. The rename keeps the column in its original position.
(
    df
    .withColumn("discounted_price", col("price") * 0.9)
    .withColumnRenamed("quantity", "stock_quantity")
    .show()
)

+----------+------+------+--------------+-----+------------------+
|product_id| color| brand|stock_quantity|price|  discounted_price|
+----------+------+------+--------------+-----+------------------+
|       101|   Red|  Acer|            10| 15.5|13.950000000000001|
|       102| Green|  Asus|            20| 18.7|             16.83|
|       103|  Blue|  Dell|            15| 20.2|             18.18|
|       104|Yellow|    HP|            30| 22.1|             19.89|
|       105|Purple|Lenovo|            25| 17.8|             16.02|
+----------+------+------+--------------+-----+------------------+



In [182]:
# SQL version of the discount/rename step: quantity is returned as
# stock_quantity and discounted_price (price * 0.9) is appended last.
# The view is referenced as "Products", the exact name it was registered under,
# so the query does not depend on case-insensitive name resolution.
spark.sql("""
SELECT
    product_id,
    color,
    brand,
    quantity AS stock_quantity,
    price,
    price * 0.9 AS discounted_price
FROM Products
""").show()

+----------+------+------+--------------+-----+------------------+
|product_id| color| brand|stock_quantity|price|  discounted_price|
+----------+------+------+--------------+-----+------------------+
|       101|   Red|  Acer|            10| 15.5|13.950000000000001|
|       102| Green|  Asus|            20| 18.7|             16.83|
|       103|  Blue|  Dell|            15| 20.2|             18.18|
|       104|Yellow|    HP|            30| 22.1|             19.89|
|       105|Purple|Lenovo|            25| 17.8|             16.02|
+----------+------+------+--------------+-----+------------------+



In [183]:
# Products with a quantity above 20, listed from the lowest price to the highest.
over_20_units = df.where(col("quantity") > 20)
over_20_units.sort(col("price").asc()).show()

+----------+------+------+--------+-----+
|product_id| color| brand|quantity|price|
+----------+------+------+--------+-----+
|       105|Purple|Lenovo|      25| 17.8|
|       104|Yellow|    HP|      30| 22.1|
+----------+------+------+--------+-----+



In [184]:
# Products priced from 15 to 20, with both bounds included.
df.filter(col("price").between(15, 20)).show()

+----------+------+------+--------+-----+
|product_id| color| brand|quantity|price|
+----------+------+------+--------+-----+
|       101|   Red|  Acer|      10| 15.5|
|       102| Green|  Asus|      20| 18.7|
|       105|Purple|Lenovo|      25| 17.8|
+----------+------+------+--------+-----+



In [190]:
# Compute total_value = quantity * price, keep the rows where it exceeds 300,
# and list them from the largest total_value to the smallest.
with_total = df.withColumn("total_value", col("quantity") * col("price"))
high_value = with_total.where(col("total_value") > 300)
high_value.sort(col("total_value").desc()).show()



+----------+------+------+--------+-----+-----------+
|product_id| color| brand|quantity|price|total_value|
+----------+------+------+--------+-----+-----------+
|       104|Yellow|    HP|      30| 22.1|      663.0|
|       105|Purple|Lenovo|      25| 17.8|      445.0|
|       102| Green|  Asus|      20| 18.7|      374.0|
|       103|  Blue|  Dell|      15| 20.2|      303.0|
+----------+------+------+--------+-----+-----------+

