In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session for this notebook (or reuse one that already
# exists), registered under the application name 'Pyspark'.
spark = (
    SparkSession.builder
    .appName('Pyspark')
    .getOrCreate()
)

24/03/28 14:21:21 WARN Utils: Your hostname, Vemulapallis-Mac-mini.local resolves to a loopback address: 127.0.0.1; using 192.168.0.107 instead (on interface en1)
24/03/28 14:21:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/28 14:21:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Load the ProductCategory table from CSV. The first row supplies the column
# names (header=True) and Spark infers each column's type (inferSchema=True).
# The path is a plain string literal: it contains no placeholders, so no
# f-string is needed.
ProductCategoryDF = spark.read.csv(
    'AdventureWorks/SalesLT.ProductCategory.csv',
    header=True,
    inferSchema=True,
)

In [7]:
# List the column names of the product-category DataFrame.
ProductCategoryDF.columns

['ProductCategoryID',
 'ParentProductCategoryID',
 'Name',
 'rowguid',
 'ModifiedDate']

In [9]:
# Show each column's name together with the data type Spark assigned to it.
ProductCategoryDF.dtypes

[('ProductCategoryID', 'int'),
 ('ParentProductCategoryID', 'string'),
 ('Name', 'string'),
 ('rowguid', 'string'),
 ('ModifiedDate', 'string')]

In [11]:
from pyspark.sql.functions import col, isnan, when, count

In [13]:
# Number of rows with a non-null ProductCategoryID.
ProductCategoryDF.agg(count('ProductCategoryID')).show()

+------------------------+
|count(ProductCategoryID)|
+------------------------+
|                      41|
+------------------------+



In [14]:
# Count, for every column, how many rows contain the literal text 'NULL'
# (a string stored in the data, not a real Spark null).
#
# Each column is cast to string before the comparison so that non-string
# columns (e.g. the integer ProductCategoryID) are never implicitly coerced
# against 'NULL'; that implicit string-to-number cast can raise an error when
# ANSI SQL mode is enabled. The counts themselves are unchanged.
#
# when(...) yields a non-null value only for matching rows, and count()
# tallies non-null values, so the result is the number of matches.
ProductCategoryDF.select(
    [
        count(when(col(c).cast('string') == 'NULL', c)).alias(c)
        for c in ProductCategoryDF.columns
    ]
).show()

+-----------------+-----------------------+----+-------+------------+
|ProductCategoryID|ParentProductCategoryID|Name|rowguid|ModifiedDate|
+-----------------+-----------------------+----+-------+------------+
|                0|                      4|   0|      0|           0|
+-----------------+-----------------------+----+-------+------------+



In [15]:
# Count, for every column, how many rows contain the text '00:00.0'
# (presumably a timestamp that lost its date part on export -- TODO confirm).
#
# Each column is cast to string before the comparison so that non-string
# columns (e.g. the integer ProductCategoryID) are never implicitly coerced
# against the marker text; that implicit string-to-number cast can raise an
# error when ANSI SQL mode is enabled. The counts themselves are unchanged.
#
# when(...) yields a non-null value only for matching rows, and count()
# tallies non-null values, so the result is the number of matches.
ProductCategoryDF.select(
    [
        count(when(col(c).cast('string') == '00:00.0', c)).alias(c)
        for c in ProductCategoryDF.columns
    ]
).show()

+-----------------+-----------------------+----+-------+------------+
|ProductCategoryID|ParentProductCategoryID|Name|rowguid|ModifiedDate|
+-----------------+-----------------------+----+-------+------------+
|                0|                      0|   0|      0|          41|
+-----------------+-----------------------+----+-------+------------+



In [18]:
# Every ModifiedDate value is the invalid text '00:00.0', and rowguid is not
# needed for this analysis, so both columns are removed.
columns_to_drop = ['ModifiedDate', 'rowguid']
ProductCategoryDF = ProductCategoryDF.drop(*columns_to_drop)

In [19]:
# Print the product-category DataFrame as a text table.
ProductCategoryDF.show()

+-----------------+-----------------------+---------------+
|ProductCategoryID|ParentProductCategoryID|           Name|
+-----------------+-----------------------+---------------+
|                1|                   NULL|          Bikes|
|                2|                   NULL|     Components|
|                3|                   NULL|       Clothing|
|                4|                   NULL|    Accessories|
|                5|                      1| Mountain Bikes|
|                6|                      1|     Road Bikes|
|                7|                      1|  Touring Bikes|
|                8|                      2|     Handlebars|
|                9|                      2|Bottom Brackets|
|               10|                      2|         Brakes|
|               11|                      2|         Chains|
|               12|                      2|      Cranksets|
|               13|                      2|    Derailleurs|
|               14|                     

In [23]:
# Count how many categories sit under each ParentProductCategoryID, leaving
# out the group whose parent is the text 'NULL' (categories with no parent).
categories_per_parent = ProductCategoryDF.groupBy('ParentProductCategoryID').count()
categories_per_parent.filter(col('ParentProductCategoryID') != 'NULL').show()

+-----------------------+-----+
|ParentProductCategoryID|count|
+-----------------------+-----+
|                      3|    8|
|                      1|    3|
|                      4|   12|
|                      2|   14|
+-----------------------+-----+



In [58]:
# Categories whose ParentProductCategoryID is the text 'NULL' have no parent.
# These are the top-level categories that the other categories refer to.
top_level_rows = ProductCategoryDF.filter(col('ParentProductCategoryID') == 'NULL')
ParentProductDF = top_level_rows.select(
    col('ProductCategoryID').alias('ParentCategoryID'),
    col('Name').alias("ParentName"),
)
ParentProductDF.show()


+----------------+-----------+
|ParentCategoryID| ParentName|
+----------------+-----------+
|               1|      Bikes|
|               2| Components|
|               3|   Clothing|
|               4|Accessories|
+----------------+-----------+



In [67]:
# Left-join ProductCategoryDF to ParentProductDF so that every category row
# also carries the ID and name of its parent category. Rows without a
# matching parent keep null in the parent columns.
parent_match = ProductCategoryDF.ParentProductCategoryID == ParentProductDF.ParentCategoryID
ProductCategorywithParentDF_tmp = ProductCategoryDF.join(
    ParentProductDF,
    on=parent_match,
    how='left',
)
ProductCategorywithParentDF_tmp.show()

+-----------------+-----------------------+---------------+----------------+----------+
|ProductCategoryID|ParentProductCategoryID|           Name|ParentCategoryID|ParentName|
+-----------------+-----------------------+---------------+----------------+----------+
|                1|                   NULL|          Bikes|            null|      null|
|                2|                   NULL|     Components|            null|      null|
|                3|                   NULL|       Clothing|            null|      null|
|                4|                   NULL|    Accessories|            null|      null|
|                5|                      1| Mountain Bikes|               1|     Bikes|
|                6|                      1|     Road Bikes|               1|     Bikes|
|                7|                      1|  Touring Bikes|               1|     Bikes|
|                8|                      2|     Handlebars|               2|Components|
|                9|             

In [78]:
# Keep only the category ID/name and the parent ID/name, renaming the
# category columns so they do not collide with the Product table's columns
# of the same name in a later join.
selected_columns = [
    col('ProductCategoryID').alias('ProductCategoryID_par'),
    col('ParentCategoryID'),
    col('Name').alias('CategoryName'),
    col('ParentName'),
]
ProductCategoryWithParentDF = ProductCategorywithParentDF_tmp.select(*selected_columns)
ProductCategoryWithParentDF.show()

+---------------------+----------------+---------------+----------+
|ProductCategoryID_par|ParentCategoryID|   CategoryName|ParentName|
+---------------------+----------------+---------------+----------+
|                    1|            null|          Bikes|      null|
|                    2|            null|     Components|      null|
|                    3|            null|       Clothing|      null|
|                    4|            null|    Accessories|      null|
|                    5|               1| Mountain Bikes|     Bikes|
|                    6|               1|     Road Bikes|     Bikes|
|                    7|               1|  Touring Bikes|     Bikes|
|                    8|               2|     Handlebars|Components|
|                    9|               2|Bottom Brackets|Components|
|                   10|               2|         Brakes|Components|
|                   11|               2|         Chains|Components|
|                   12|               2|      Cr

In [None]:
# From the analysis above, we can see that all the products with ParentCategoryID = 2 are spare parts or components of bikes.
# To get the components related to a parent product, we can join the Products table with the parent product category table to add
# the ParentProductID and ParentProductName.

In [46]:
# Load the Product table from CSV into a Spark DataFrame for analysis. The
# first row supplies the column names (header=True) and Spark infers each
# column's type (inferSchema=True).
# The path is a plain string literal: it contains no placeholders, so no
# f-string is needed.
ProductsDF = spark.read.csv(
    'AdventureWorks/SalesLT.Product.csv',
    header=True,
    inferSchema=True,
)

In [52]:
# Print the Product DataFrame as a text table.
ProductsDF.show()

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------+-----------+----------------+--------------------+----------------------+--------------------+------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|SellStartDate|SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|ModifiedDate|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------+-----------+----------------+--------------------+----------------------+--------------------+------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|     1059.31|   1431.5|  58|1016.04|               18|             6|      00:00.0|       NULL|            NULL|0x474946383961500...|  no_image_availabl...|43DD68D6-14A4-461...|     01:36.8|
|      706|HL Road Frame - R

In [81]:

# Left-join each product to its category row (matched on the category ID),
# then keep the product's ID, name and category ID alongside the category
# name and the parent-category name.
category_match = ProductsDF.ProductCategoryID == ProductCategoryWithParentDF.ProductCategoryID_par
products_joined = ProductsDF.join(ProductCategoryWithParentDF, category_match, "left")
ProductswithParentDF = products_joined.select(
    ProductsDF.ProductID,
    ProductsDF.Name,
    ProductsDF.ProductCategoryID,
    ProductCategoryWithParentDF.CategoryName,
    ProductCategoryWithParentDF.ParentName,
)
ProductswithParentDF.show()

+---------+--------------------+-----------------+------------+-----------+
|ProductID|                Name|ProductCategoryID|CategoryName| ParentName|
+---------+--------------------+-----------------+------------+-----------+
|      680|HL Road Frame - B...|               18| Road Frames| Components|
|      706|HL Road Frame - R...|               18| Road Frames| Components|
|      707|Sport-100 Helmet,...|               35|     Helmets|Accessories|
|      708|Sport-100 Helmet,...|               35|     Helmets|Accessories|
|      709|Mountain Bike Soc...|               27|       Socks|   Clothing|
|      710|Mountain Bike Soc...|               27|       Socks|   Clothing|
|      711|Sport-100 Helmet,...|               35|     Helmets|Accessories|
|      712|        AWC Logo Cap|               23|        Caps|   Clothing|
|      713|Long-Sleeve Logo ...|               25|     Jerseys|   Clothing|
|      714|Long-Sleeve Logo ...|               25|     Jerseys|   Clothing|
|      715|L

In [87]:
# To list all components, keep the products whose ParentName is 'Components'.
components_only = ProductswithParentDF.where(ProductswithParentDF['ParentName'] == 'Components')
components_only.show()

+---------+--------------------+-----------------+---------------+----------+
|ProductID|                Name|ProductCategoryID|   CategoryName|ParentName|
+---------+--------------------+-----------------+---------------+----------+
|      947|HL Touring Handle...|                8|     Handlebars|Components|
|      946|LL Touring Handle...|                8|     Handlebars|Components|
|      813|  HL Road Handlebars|                8|     Handlebars|Components|
|      812|  ML Road Handlebars|                8|     Handlebars|Components|
|      811|  LL Road Handlebars|                8|     Handlebars|Components|
|      810|HL Mountain Handl...|                8|     Handlebars|Components|
|      809|ML Mountain Handl...|                8|     Handlebars|Components|
|      808|LL Mountain Handl...|                8|     Handlebars|Components|
|      996|   HL Bottom Bracket|                9|Bottom Brackets|Components|
|      995|   ML Bottom Bracket|                9|Bottom Bracket