In [110]:
# installing java as spark requires Java Virtual Machine (JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# downloading apache spark with hadoop (it will be in a .zip format)
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

# unziping the .zip file
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [111]:
# installing findspark
# it provides findspark.init() to make pyspark importable as a regular library
!pip install -q findspark

# setting the environmental path for both Java And Spark to make sure that PySpark will run on Google Colab
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()

In [112]:
# creating a spark session that will be used to perform all the necessary task on Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName('pySpark_rr').getOrCreate()
spark

In [113]:
# for creating my own dataframe, i'm creating a few columns
import pandas as pd
import random as rd
import datetime

# 1. creating 1000 product id's
id = [i for i in range(1, 1001)]
'''
above statement is same as
id = []
for i in range(1, 1001):
  id.append(i)
'''

# 2. generating prices for the products
prices = []
for i in range(1, 1001):
  prices.append(rd.randint(500, 2500))
'''
above statement is same as
prices = [rd.randint(500, 2500) for i in range(1, 1001)]
'''

# 3. adding a currency to make it a string
for i , p in enumerate(prices):
  prices[i] = '₹ ' + str(p)
'''
above statement is same as
for i in range(len(prices)):
  prices[i] = '₹ ' + str(prices[i])
'''

# 4. generating the product status
statuses = ['shipped', 'out for delivery', 'not shipped', 'out of stock', '']
status = [rd.choice(statuses) for i in range(1, 1001)]
'''
above statement is same as
status = []
for i in range(1, 1001):
  status.append(rd.choice(statuses))
'''

# 5. generating product names
products = ['', 'Console', 'PS', 'Book', 'Watch', 'Health-Kit', 'Ticket']
product = [rd.choice(products) for p in range(1000)]
'''
above statement is same as
product = []
for i in range(1, 1001):
  product.append(rd.choice(products))
'''

# 5. generating product order dates
endDate = datetime.date.today()
startDate = endDate - datetime.timedelta(days=1000)
orderDate = []

for i in range(1000):
  randomDate = startDate + datetime.timedelta(days = rd.randint(0, 1000))
  orderDate.append(randomDate.strftime("%Y-%m-%d"))

# printing a random info
print('id: ', id[40])
print('Date: ', orderDate[40])
print('Name: ', product[40])
print('Price: ', prices[40])
print('Status: ', status[40])

id:  41
Date:  2024-04-05
Name:  Console
Price:  ₹ 1747
Status:  shipped


In [114]:
# combining now to create a pandas dataframe
data = {
    'Product_ID': id,
    'Product_Name': product,
    'Order_Date': orderDate,
    'Order_Price': prices,
    'Order_Status': status
}

df = pd.DataFrame(data)
df.head()

Unnamed: 0,Product_ID,Product_Name,Order_Date,Order_Price,Order_Status
0,1,Book,2023-06-29,₹ 514,out of stock
1,2,Ticket,2021-10-28,₹ 2347,out of stock
2,3,Health-Kit,2021-07-10,₹ 1215,shipped
3,4,Watch,2021-09-17,₹ 1566,
4,5,Console,2022-09-24,₹ 2037,out for delivery


In [115]:
# saving it into csv to be used for spark operation
df.to_csv('myDATA.csv', index = False)

**Let's Spark Now :)**

In [116]:
# as we have alreading created a spark session, lets move forward
# reading the csv file and loading it into a spark dataframe

productDF = spark.read.csv('myDATA.csv', header = True, inferSchema = True)

In [117]:
# lets see the schema of the spark dataframe
productDF.printSchema()

root
 |-- Product_ID: integer (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_Price: string (nullable = true)
 |-- Order_Status: string (nullable = true)



In [118]:
# lets see top 10 rows
productDF.show(10)

+----------+------------+----------+-----------+----------------+
|Product_ID|Product_Name|Order_Date|Order_Price|    Order_Status|
+----------+------------+----------+-----------+----------------+
|         1|        Book|2023-06-29|      ₹ 514|    out of stock|
|         2|      Ticket|2021-10-28|     ₹ 2347|    out of stock|
|         3|  Health-Kit|2021-07-10|     ₹ 1215|         shipped|
|         4|       Watch|2021-09-17|     ₹ 1566|            null|
|         5|     Console|2022-09-24|     ₹ 2037|out for delivery|
|         6|          PS|2024-01-13|     ₹ 1752|     not shipped|
|         7|      Ticket|2022-02-21|     ₹ 2338|            null|
|         8|        null|2023-02-11|     ₹ 1585|out for delivery|
|         9|       Watch|2021-07-31|      ₹ 769|out for delivery|
|        10|        null|2022-07-05|     ₹ 2493|    out of stock|
+----------+------------+----------+-----------+----------------+
only showing top 10 rows



In [119]:
# as we saw that order price is in string format, thus we cant perform any mathematical operation on that column
# therefore, we have to create a new column which only consists of numbers and is in form of an integer
# Data pre-processing
productDF = productDF.withColumn('Product_price', productDF.Order_Price.substr(2, 10).cast('int'))

In [120]:
# lets check the data now
productDF.show(5)

+----------+------------+----------+-----------+----------------+-------------+
|Product_ID|Product_Name|Order_Date|Order_Price|    Order_Status|Product_price|
+----------+------------+----------+-----------+----------------+-------------+
|         1|        Book|2023-06-29|      ₹ 514|    out of stock|          514|
|         2|      Ticket|2021-10-28|     ₹ 2347|    out of stock|         2347|
|         3|  Health-Kit|2021-07-10|     ₹ 1215|         shipped|         1215|
|         4|       Watch|2021-09-17|     ₹ 1566|            null|         1566|
|         5|     Console|2022-09-24|     ₹ 2037|out for delivery|         2037|
+----------+------------+----------+-----------+----------------+-------------+
only showing top 5 rows



In [121]:
# now if you don't like python, pandas then you can leverage the power of pySpark to write SQL
productDF.registerTempTable('productTable')

In [122]:
# lets query the table
qry = spark.sql('select * from productTable limit 5')
qry.show()

+----------+------------+----------+-----------+----------------+-------------+
|Product_ID|Product_Name|Order_Date|Order_Price|    Order_Status|Product_price|
+----------+------------+----------+-----------+----------------+-------------+
|         1|        Book|2023-06-29|      ₹ 514|    out of stock|          514|
|         2|      Ticket|2021-10-28|     ₹ 2347|    out of stock|         2347|
|         3|  Health-Kit|2021-07-10|     ₹ 1215|         shipped|         1215|
|         4|       Watch|2021-09-17|     ₹ 1566|            null|         1566|
|         5|     Console|2022-09-24|     ₹ 2037|out for delivery|         2037|
+----------+------------+----------+-----------+----------------+-------------+



**Check how many distinct products exist in our data**
**Also, list them**

In [123]:
from pyspark.sql.functions import *
# using pyspark
distinctNames = productDF.filter(productDF["Product_Name"].isNotNull())
print("The number of distinct products are:", distinctNames.select(countDistinct("Product_Name")).collect()[0][0])
print("The distinct products are:")
distinct_product_names = distinctNames.select("Product_Name").distinct().show()

The number of distinct products are: 6
The distinct products are:
+------------+
|Product_Name|
+------------+
|          PS|
|      Ticket|
|        Book|
|     Console|
|  Health-Kit|
|       Watch|
+------------+



In [124]:
# using SQL

qry = spark.sql('SELECT COUNT(DISTINCT Product_Name) AS PRODUCTS FROM productTable WHERE PRODUCT_NAME IS NOT NULL')
print("The number of distinct products are:")
qry.show()

qry = spark.sql('SELECT DISTINCT Product_Name FROM productTable WHERE PRODUCT_NAME IS NOT NULL')
print("The distinct products are:")
qry.show()

The number of distinct products are:
+--------+
|PRODUCTS|
+--------+
|       6|
+--------+

The distinct products are:
+------------+
|Product_Name|
+------------+
|          PS|
|      Ticket|
|        Book|
|     Console|
|  Health-Kit|
|       Watch|
+------------+



In [125]:
# let's describe the df
productDF.describe().show()

+-------+-----------------+------------+----------+-----------+------------+-----------------+
|summary|       Product_ID|Product_Name|Order_Date|Order_Price|Order_Status|    Product_price|
+-------+-----------------+------------+----------+-----------+------------+-----------------+
|  count|             1000|         872|      1000|       1000|         818|             1000|
|   mean|            500.5|        null|      null|       null|        null|         1489.867|
| stddev|288.8194360957494|        null|      null|       null|        null|583.4242462926633|
|    min|                1|        Book|2021-07-10|     ₹ 1000| not shipped|              502|
|    max|             1000|       Watch|2024-04-05|      ₹ 997|     shipped|             2498|
+-------+-----------------+------------+----------+-----------+------------+-----------------+



In [126]:
# there may be a situation that the price column might contain null values
# let's replace the null values with the mean value of the price
avgPrice = productDF.select(mean('Product_price')).collect()[0][0]
productDF = productDF.fillna(avgPrice, subset = ['Product_price'])
productDF.show(5)

+----------+------------+----------+-----------+----------------+-------------+
|Product_ID|Product_Name|Order_Date|Order_Price|    Order_Status|Product_price|
+----------+------------+----------+-----------+----------------+-------------+
|         1|        Book|2023-06-29|      ₹ 514|    out of stock|          514|
|         2|      Ticket|2021-10-28|     ₹ 2347|    out of stock|         2347|
|         3|  Health-Kit|2021-07-10|     ₹ 1215|         shipped|         1215|
|         4|       Watch|2021-09-17|     ₹ 1566|            null|         1566|
|         5|     Console|2022-09-24|     ₹ 2037|out for delivery|         2037|
+----------+------------+----------+-----------+----------------+-------------+
only showing top 5 rows



In [127]:
'''now we just received a news that the rows where product name and order status is null,
those rows should not be considered into our analysis'''
print("the number of rows before filtering:", productDF.count())
productDF = productDF.filter((productDF["Product_Name"].isNotNull()) & (productDF["Order_Status"].isNotNull()))
print("the number of rows after filtering:", productDF.count())


the number of rows before filtering: 1000
the number of rows after filtering: 707


In [128]:
# you can see, we have removed 293 irrelevant rows
# as we have made changes to our dataframe, we have to change our sql Table as well.
productDF.registerTempTable('productTable')

**now lets find out the total order price for all the products and order it by total price in descending order**

In [129]:
# using pyspark
tempDF = productDF.groupBy('Product_name').agg(sum('Product_Price').alias('Total_Price'))
tempDF.sort(col('Total_Price').desc()).show()
'''
works same as above
qry_df = productDF.groupBy("Product_name").agg(sum("Product_Price").alias("Total_Price")).orderBy("Total_Price", ascending=False)
qry_df.show()
'''

+------------+-----------+
|Product_name|Total_Price|
+------------+-----------+
|      Ticket|     191118|
|       Watch|     178711|
|        Book|     173077|
|  Health-Kit|     172519|
|     Console|     168919|
|          PS|     166459|
+------------+-----------+



'\nworks same as above\nqry_df = productDF.groupBy("Product_name").agg(sum("Product_Price").alias("Total_Price")).orderBy("Total_Price", ascending=False)\nqry_df.show()\n'

In [130]:
# using sql
qry = spark.sql('SELECT Product_Name, SUM(Product_price) AS Total_Price FROM productTable GROUP BY 1 ORDER BY 2 DESC')
qry.show()

+------------+-----------+
|Product_Name|Total_Price|
+------------+-----------+
|      Ticket|     191118|
|       Watch|     178711|
|        Book|     173077|
|  Health-Kit|     172519|
|     Console|     168919|
|          PS|     166459|
+------------+-----------+



**lets find out distribution of order status, order by order status in ascending.**

**(round distribution to 2 decimal)**

In [131]:
tempDF = productDF.groupBy('Order_Status').agg(
    round(((count("*")/productDF.count())*100), 2).alias('Distribution')
)
tempDF.orderBy("Order_Status", ascending=True).show()

+----------------+------------+
|    Order_Status|Distribution|
+----------------+------------+
|     not shipped|       24.61|
|out for delivery|        27.3|
|    out of stock|       25.88|
|         shipped|       22.21|
+----------------+------------+



In [132]:
# using sql
qry = spark.sql("""
    SELECT Order_Status,
      ROUND((COUNT(Order_Status)/(SELECT COUNT(Order_Status) From productTable))*100, 2) AS Distribution
    FROM productTable
    GROUP BY 1
    ORDER BY 1
""")
qry.show()

+----------------+------------+
|    Order_Status|Distribution|
+----------------+------------+
|     not shipped|       24.61|
|out for delivery|        27.3|
|    out of stock|       25.88|
|         shipped|       22.21|
+----------------+------------+



**adding a new column (Priority): if price > 2000, then flag the column as Yes otherwise No**

In [133]:
# using spark
tempDF = productDF.withColumn('Priority', when(productDF.Product_price > 2000, 'Yes').otherwise('No'))
productDF.show(10)

+----------+------------+----------+-----------+----------------+-------------+
|Product_ID|Product_Name|Order_Date|Order_Price|    Order_Status|Product_price|
+----------+------------+----------+-----------+----------------+-------------+
|         1|        Book|2023-06-29|      ₹ 514|    out of stock|          514|
|         2|      Ticket|2021-10-28|     ₹ 2347|    out of stock|         2347|
|         3|  Health-Kit|2021-07-10|     ₹ 1215|         shipped|         1215|
|         5|     Console|2022-09-24|     ₹ 2037|out for delivery|         2037|
|         6|          PS|2024-01-13|     ₹ 1752|     not shipped|         1752|
|         9|       Watch|2021-07-31|      ₹ 769|out for delivery|          769|
|        11|     Console|2021-10-30|     ₹ 2025|         shipped|         2025|
|        12|          PS|2023-10-09|      ₹ 881|    out of stock|          881|
|        13|       Watch|2022-11-18|      ₹ 970|     not shipped|          970|
|        14|      Ticket|2022-12-03|    

In [134]:
# using sql
# here we can't add it to the actual table, but we can still query it
qry = spark.sql("""
    SELECT *,
      CASE WHEN product_price > 2000 THEN 'Yes' Else 'No'
      END AS Priority
    FROM productTable
""")
qry.show(10)

+----------+------------+----------+-----------+----------------+-------------+--------+
|Product_ID|Product_Name|Order_Date|Order_Price|    Order_Status|Product_price|Priority|
+----------+------------+----------+-----------+----------------+-------------+--------+
|         1|        Book|2023-06-29|      ₹ 514|    out of stock|          514|      No|
|         2|      Ticket|2021-10-28|     ₹ 2347|    out of stock|         2347|     Yes|
|         3|  Health-Kit|2021-07-10|     ₹ 1215|         shipped|         1215|      No|
|         5|     Console|2022-09-24|     ₹ 2037|out for delivery|         2037|     Yes|
|         6|          PS|2024-01-13|     ₹ 1752|     not shipped|         1752|      No|
|         9|       Watch|2021-07-31|      ₹ 769|out for delivery|          769|      No|
|        11|     Console|2021-10-30|     ₹ 2025|         shipped|         2025|     Yes|
|        12|          PS|2023-10-09|      ₹ 881|    out of stock|          881|      No|
|        13|       Wa

**avg product price based on year**

In [135]:
productDF.select('order_date').show(5)

+----------+
|order_date|
+----------+
|2023-06-29|
|2021-10-28|
|2021-07-10|
|2022-09-24|
|2024-01-13|
+----------+
only showing top 5 rows



In [138]:
# using pyspark
tempDF = productDF.withColumn('Ord_Date', to_date(col('Order_Date')))
tempDF = tempDF.groupBy(year('Ord_Date').alias('Year')).agg(avg('Product_price').alias('avg_price'))
tempDF.show()

+----+------------------+
|Year|         avg_price|
+----+------------------+
|2023|1515.1433823529412|
|2022|1415.9473684210527|
|2024|1534.0243902439024|
|2021|          1520.464|
+----+------------------+



In [139]:
# using sql
qry = spark.sql("""
    SELECT YEAR(order_date) AS year,
      AVG(Product_Price) AS avg_price
    FROM productTable
    GROUP BY 1
""")
qry.show()

+----+------------------+
|year|         avg_price|
+----+------------------+
|2023|1515.1433823529412|
|2022|1415.9473684210527|
|2024|1534.0243902439024|
|2021|          1520.464|
+----+------------------+



**show the most products for all the order status**

**(TOUGH ONE)**

In [145]:
from pyspark.sql.window import Window

order_product_group = productDF.groupBy("Order_Status", "Product_Name").count()
window_ = Window.partitionBy('Order_Status').orderBy(desc('count'))

order_product_group = order_product_group.withColumn("rank", row_number().over(window_))

order_product_group = order_product_group.filter("rank = 1")

order_product_group.select(['Order_Status', 'Product_Name', 'count']).show()

+----------------+------------+-----+
|    Order_Status|Product_Name|count|
+----------------+------------+-----+
|    out of stock|       Watch|   42|
|         shipped|      Ticket|   34|
|     not shipped|      Ticket|   37|
|out for delivery|          PS|   35|
+----------------+------------+-----+



In [140]:
# using sql
qry = spark.sql("""
    SELECT Order_Status, Product_Name, count,
           ROW_NUMBER() OVER (PARTITION BY Order_Status ORDER BY count DESC) AS rank
    FROM (
        SELECT Order_Status, Product_Name, COUNT(*) AS count
        FROM productTable
        GROUP BY Order_Status, Product_Name
    ) temp
""")
sql_df = qry.filter("rank = 1")
sql_df.select(['Order_Status', 'Product_Name', 'count']).show()

+----------------+------------+-----+
|    Order_Status|Product_Name|count|
+----------------+------------+-----+
|    out of stock|       Watch|   42|
|         shipped|      Ticket|   34|
|     not shipped|      Ticket|   37|
|out for delivery|          PS|   35|
+----------------+------------+-----+



In [163]:
# using sql
qry = spark.sql("""
    SELECT *,
      ROW_NUMBER() OVER (PARTITION BY Order_Status ORDER BY count DESC) as rn
    FROM (
        SELECT Order_Status, Product_Name, COUNT(*) AS count
        FROM productTable
        GROUP BY Order_Status, Product_Name
        order by 1, 3 desc
    ) temp
""")
qry.show()

+----------------+------------+-----+---+
|    Order_Status|Product_Name|count| rn|
+----------------+------------+-----+---+
|    out of stock|       Watch|   42|  1|
|    out of stock|  Health-Kit|   33|  2|
|    out of stock|      Ticket|   31|  3|
|    out of stock|          PS|   28|  4|
|    out of stock|        Book|   25|  5|
|    out of stock|     Console|   24|  6|
|         shipped|      Ticket|   34|  1|
|         shipped|  Health-Kit|   29|  2|
|         shipped|        Book|   28|  3|
|         shipped|     Console|   28|  4|
|         shipped|       Watch|   20|  5|
|         shipped|          PS|   18|  6|
|     not shipped|      Ticket|   37|  1|
|     not shipped|        Book|   32|  2|
|     not shipped|          PS|   31|  3|
|     not shipped|  Health-Kit|   29|  4|
|     not shipped|       Watch|   23|  5|
|     not shipped|     Console|   22|  6|
|out for delivery|          PS|   35|  1|
|out for delivery|     Console|   34|  2|
+----------------+------------+---

In [164]:
# lets see how our dataframe looks
productDF.show(5)

+----------+------------+----------+-----------+----------------+-------------+
|Product_ID|Product_Name|Order_Date|Order_Price|    Order_Status|Product_price|
+----------+------------+----------+-----------+----------------+-------------+
|         1|        Book|2023-06-29|      ₹ 514|    out of stock|          514|
|         2|      Ticket|2021-10-28|     ₹ 2347|    out of stock|         2347|
|         3|  Health-Kit|2021-07-10|     ₹ 1215|         shipped|         1215|
|         5|     Console|2022-09-24|     ₹ 2037|out for delivery|         2037|
|         6|          PS|2024-01-13|     ₹ 1752|     not shipped|         1752|
+----------+------------+----------+-----------+----------------+-------------+
only showing top 5 rows



In [177]:
'''
for this part, this much is enough
lets export our park dataframe into a csv so that it can be used for the next time
'''
productPD = productDF.toPandas()
productPD.to_csv('products.csv', header = True, index = False)