In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/itv009538/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

# Higher Level API's - DataFrames and Spark SQL
## 1. Creating managed and external spark tables using CSV and JSON files
## 2. Spark programs/queries to analyze the data in the HDFS using DataFrames and Spark SQL

In [2]:
spark

## 1.1 Create a managed spark table and load data to it from the CSV file

In [5]:
spark.sql("use itv009538_week5_assignment")

In [6]:
spark.sql("create table itv009538_week5_assignment.groceries(order_id string, location string, item string, order_date string, quantity integer)")

In [8]:
spark.sql("show tables").show()

+--------------------+---------+-----------+
|            database|tableName|isTemporary|
+--------------------+---------+-----------+
|itv009538_week5_a...|groceries|      false|
|itv009538_week5_a...|    test1|      false|
+--------------------+---------+-----------+



In [9]:
groceries_df = spark.read.csv("/public/trendytech/groceries.csv", header="true", inferSchema = "true")

In [10]:
groceries_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- item: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- quantity: integer (nullable = true)



In [11]:
groceries_df.show(5)

+--------+--------+--------+----------+--------+
|order_id|location|    item|order_date|quantity|
+--------+--------+--------+----------+--------+
|      o1| Seattle| Bananas|01/01/2017|       7|
|      o2|    Kent|  Apples|02/01/2017|      20|
|      o3|Bellevue| Flowers|02/01/2017|      10|
|      o4| Redmond|    Meat|03/01/2017|      40|
|      o5| Seattle|Potatoes|04/01/2017|       9|
+--------+--------+--------+----------+--------+
only showing top 5 rows



In [12]:
groceries_df.createOrReplaceTempView("groceries_temp")

In [13]:
spark.sql("select * from groceries_temp").show()

+--------+---------+--------+----------+--------+
|order_id| location|    item|order_date|quantity|
+--------+---------+--------+----------+--------+
|      o1|  Seattle| Bananas|01/01/2017|       7|
|      o2|     Kent|  Apples|02/01/2017|      20|
|      o3| Bellevue| Flowers|02/01/2017|      10|
|      o4|  Redmond|    Meat|03/01/2017|      40|
|      o5|  Seattle|Potatoes|04/01/2017|       9|
|      o6| Bellevue|   Bread|04/01/2017|       5|
|      o7|  Redmond|   Bread|05/01/2017|       5|
|      o8| Issaquah|   Onion|05/01/2017|       4|
|      o9|  Redmond|  Cheese|05/01/2017|      15|
|     o10| Issaquah|   Onion|06/01/2017|       4|
|     o11|   Renton|   Bread|05/01/2017|       5|
|     o12| Issaquah|   Onion|07/01/2017|       4|
|     o13|Sammamish|   Bread|07/01/2017|       5|
|     o14| Issaquah|  Tomato|07/01/2017|       6|
|     o15| Issaquah|    Meat|08/01/2017|       3|
|     o16| Issaquah|    Meat|09/01/2017|       5|
|     o17| Issaquah|    Meat|10/01/2017|       6|


In [15]:
spark.sql("insert into itv009538_week5_assignment.groceries select * from groceries_temp")

In [17]:
spark.sql("select * from itv009538_week5_assignment.groceries limit 5").show()

+--------+--------+--------+----------+--------+
|order_id|location|    item|order_date|quantity|
+--------+--------+--------+----------+--------+
|      o1| Seattle| Bananas|01/01/2017|       7|
|      o2|    Kent|  Apples|02/01/2017|      20|
|      o3|Bellevue| Flowers|02/01/2017|      10|
|      o4| Redmond|    Meat|03/01/2017|      40|
|      o5| Seattle|Potatoes|04/01/2017|       9|
+--------+--------+--------+----------+--------+



In [19]:
spark.sql("show tables").show()

+--------------------+--------------+-----------+
|            database|     tableName|isTemporary|
+--------------------+--------------+-----------+
|itv009538_week5_a...|     groceries|      false|
|itv009538_week5_a...|         test1|      false|
|                    |groceries_temp|       true|
+--------------------+--------------+-----------+



In [20]:
spark.sql("describe extended itv009538_week5_assignment.groceries").show(truncate = False)

+----------------------------+----------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                     |comment|
+----------------------------+----------------------------------------------------------------------------------------------+-------+
|order_id                    |string                                                                                        |null   |
|location                    |string                                                                                        |null   |
|item                        |string                                                                                        |null   |
|order_date                  |string                                                                                        |null   |
|quantity                    |int                             

## 1.2 Create an external spark table with the same data

In [7]:
spark.sql("create table itv009538_week5_assignment.groceries_external_table (order_id string, location string, order_date string, quantity integer) using csv options (header='true') location '/public/trendytech/groceries.csv'")

In [8]:
spark.sql("select * from itv009538_week5_assignment.groceries_external_table limit 5")

order_id,location,order_date,quantity
o1,Seattle,Bananas,
o2,Kent,Apples,
o3,Bellevue,Flowers,
o4,Redmond,Meat,
o5,Seattle,Potatoes,


In [9]:
spark.sql("describe extended itv009538_week5_assignment.groceries_external_table").show(truncate=False)

+----------------------------+-------------------------------------------------------------+-------+
|col_name                    |data_type                                                    |comment|
+----------------------------+-------------------------------------------------------------+-------+
|order_id                    |string                                                       |null   |
|location                    |string                                                       |null   |
|order_date                  |string                                                       |null   |
|quantity                    |int                                                          |null   |
|                            |                                                             |       |
|# Detailed Table Information|                                                             |       |
|Database                    |itv009538_week5_assignment                                   

## 1.3 Dropping external table - only metadata gets deleted

In [19]:
spark.sql("show tables").show(truncate = False)

+--------------------------+------------------------+-----------+
|database                  |tableName               |isTemporary|
+--------------------------+------------------------+-----------+
|itv009538_week5_assignment|gorceries_external_table|false      |
|itv009538_week5_assignment|groceries               |false      |
|itv009538_week5_assignment|test1                   |false      |
+--------------------------+------------------------+-----------+



In [20]:
spark.sql("drop table gorceries_external_table")

In [21]:
spark.sql("show tables").show(truncate = False)

+--------------------------+---------+-----------+
|database                  |tableName|isTemporary|
+--------------------------+---------+-----------+
|itv009538_week5_assignment|groceries|false      |
|itv009538_week5_assignment|test1    |false      |
+--------------------------+---------+-----------+



## 1.4 Delete managed table - both the data and metadata will get deleted

In [22]:
spark.sql("drop table groceries")

In [23]:
spark.sql("show tables").show(truncate = False)

+--------------------------+---------+-----------+
|database                  |tableName|isTemporary|
+--------------------------+---------+-----------+
|itv009538_week5_assignment|test1    |false      |
+--------------------------+---------+-----------+



## 1.5 Create a managed spark table and load data to it from the JSON file

In [24]:
spark.sql("create table itv009538_week5_assignment.orders(order_id integer,order_date string,customer_id integer,order_status string)")

In [25]:
orders_df=spark.read.json("/public/trendytech/datasets/orders.json")

In [26]:
orders_df_new=orders_df.select("order_id","order_date","customer_id","order_status")

In [27]:
orders_df_new.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows



In [28]:
orders_df_new.createOrReplaceTempView("orders_temp")

In [29]:
spark.sql("select * from orders_temp").show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows



In [30]:
spark.sql("insert into itv009538_week5_assignment.orders select * from orders_temp")

In [31]:
spark.sql("select * from itv009538_week5_assignment.orders limit 5").show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|   34565|2014-02-23 00:00:...|       8702|       COMPLETE|
|   34566|2014-02-23 00:00:...|       3066|PENDING_PAYMENT|
|   34567|2014-02-23 00:00:...|       7314|SUSPECTED_FRAUD|
|   34568|2014-02-23 00:00:...|       1271|       COMPLETE|
|   34569|2014-02-23 00:00:...|      11083|       COMPLETE|
+--------+--------------------+-----------+---------------+



In [33]:
spark.sql("show tables").show()

+--------------------+-----------+-----------+
|            database|  tableName|isTemporary|
+--------------------+-----------+-----------+
|itv009538_week5_a...|     orders|      false|
|itv009538_week5_a...|      test1|      false|
|                    |orders_temp|       true|
+--------------------+-----------+-----------+



In [34]:
spark.sql("describe extended itv009538_week5_assignment.orders").show(truncate=False)

+----------------------------+-------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                  |comment|
+----------------------------+-------------------------------------------------------------------------------------------+-------+
|order_id                    |int                                                                                        |null   |
|order_date                  |string                                                                                     |null   |
|customer_id                 |int                                                                                        |null   |
|order_status                |string                                                                                     |null   |
|                            |                                                     

## 1.6 Create a external spark table and load data to it from the JSON file

In [35]:
spark.sql("create table itv009538_week5_assignment.orders_external(order_id integer,order_date string,customer_id integer,order_status string) using json location '/public/trendytech/datasets/orders.json'")

In [36]:
spark.sql("select * from itv009538_week5_assignment.orders_external limit 5").show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+



## 2. Query the data in HDFS using both DataFrames and spark SQL

In [45]:
spark

In [3]:
products_df=spark.read \
    .format("csv") \
    .option("header","true") \
    .option("inferSchema","true") \
    .load("/user/itv009538/products_folder/products_wh_data.csv")
products_df.show(5)

+----------+--------+--------------------+-----------+------+--------------------+
|product_id|category|        product_name|description| price|           image_url|
+----------+--------+--------------------+-----------+------+--------------------+
|         1|       2|Quest Q64 10 FT. ...|       null| 59.98|http://images.acm...|
|         2|       2|Under Armour Men'...|       null|129.99|http://images.acm...|
|         3|       2|Under Armour Men'...|       null| 89.99|http://images.acm...|
|         4|       2|Under Armour Men'...|       null| 89.99|http://images.acm...|
|         5|       2|Riddell Youth Rev...|       null|199.99|http://images.acm...|
+----------+--------+--------------------+-----------+------+--------------------+
only showing top 5 rows



In [14]:
products_df.createOrReplaceTempView("products")

### Find the total number of products in the given dataset.

In [4]:
total_products=products_df.count()
print(total_products)

1345


In [15]:
total_products = spark.sql("select count(*) from products")
total_products.show()

+--------+
|count(1)|
+--------+
|    1345|
+--------+



### Find the number of unique categories of products in the given dataset.

In [7]:
unique_categories = products_df.select("category").distinct().count()
print(unique_categories)

55


In [16]:
unique_categories = spark.sql("Select count(distinct category) as unique_categories from products")
unique_categories.show()

+-----------------+
|unique_categories|
+-----------------+
|               55|
+-----------------+



### Find the top 5most expensive products based on their price, along with their product name, category, and image URL.

In [11]:
top5_expensive_products = products_df.select("product_name","category","image_url") \
    .orderBy("price", ascending = False) \
    .limit(5)
top5_expensive_products.show(truncate = False)

+------------------------------------------------+--------+-----------------------------------------------------------------------------------+
|product_name                                    |category|image_url                                                                          |
+------------------------------------------------+--------+-----------------------------------------------------------------------------------+
|SOLE E35 Elliptical                             |10      |http://images.acmesports.sports/SOLE+E35+Elliptical                                |
|SOLE F85 Treadmill                              |4       |http://images.acmesports.sports/SOLE+F85+Treadmill                                 |
|SOLE F85 Treadmill                              |10      |http://images.acmesports.sports/SOLE+F85+Treadmill                                 |
|SOLE F85 Treadmill                              |22      |http://images.acmesports.sports/SOLE+F85+Treadmill                           

In [17]:
top5_expensive_products = spark.sql("select product_name, category, image_url from products order by price desc limit 5")
top5_expensive_products.show(truncate = False)

+------------------------------------------------+--------+-----------------------------------------------------------------------------------+
|product_name                                    |category|image_url                                                                          |
+------------------------------------------------+--------+-----------------------------------------------------------------------------------+
|SOLE E35 Elliptical                             |10      |http://images.acmesports.sports/SOLE+E35+Elliptical                                |
|SOLE F85 Treadmill                              |4       |http://images.acmesports.sports/SOLE+F85+Treadmill                                 |
|SOLE F85 Treadmill                              |10      |http://images.acmesports.sports/SOLE+F85+Treadmill                                 |
|SOLE F85 Treadmill                              |22      |http://images.acmesports.sports/SOLE+F85+Treadmill                           

### Find the number of products in each category that have a price greater than $100. Display the results in a tabular format that shows the category name and the number of products that satisfy the condition.

In [12]:
products_above_100 = products_df.filter("price > 100") \
    .groupBy("Category") \
    .count() \
    .withColumnRenamed("count", "Number_of_products")
products_above_100.show(truncate = False)

+--------+------------------+
|Category|Number_of_products|
+--------+------------------+
|31      |17                |
|53      |16                |
|34      |15                |
|44      |9                 |
|12      |3                 |
|22      |4                 |
|47      |10                |
|52      |5                 |
|13      |1                 |
|6       |5                 |
|16      |11                |
|3       |5                 |
|20      |7                 |
|57      |6                 |
|54      |6                 |
|48      |17                |
|5       |11                |
|19      |13                |
|41      |11                |
|43      |23                |
+--------+------------------+
only showing top 20 rows



In [19]:
products_above_100 = spark.sql("select Category, count(*) as Number_of_products from products where price > 100 group by Category")
products_above_100.show(truncate = False)

+--------+------------------+
|Category|Number_of_products|
+--------+------------------+
|31      |17                |
|53      |16                |
|34      |15                |
|44      |9                 |
|12      |3                 |
|22      |4                 |
|47      |10                |
|52      |5                 |
|13      |1                 |
|6       |5                 |
|16      |11                |
|3       |5                 |
|20      |7                 |
|57      |6                 |
|54      |6                 |
|48      |17                |
|5       |11                |
|19      |13                |
|41      |11                |
|43      |23                |
+--------+------------------+
only showing top 20 rows



### What are the product names and prices of products that have a price greater than $200 and belong to category 5?

In [13]:
expensive_categroy_products = products_df.select("product_name", "price").filter("price > 200 and category = 5")
expensive_categroy_products.show(truncate = False)

+------------------------------------------------+------+
|product_name                                    |price |
+------------------------------------------------+------+
|"Goaliath 54"" In-Ground Basketball Hoop with P"|499.99|
|Fitness Gear 300 lb Olympic Weight Set          |209.99|
|Teeter Hang Ups NXT-S Inversion Table           |299.99|
+------------------------------------------------+------+



In [20]:
expensive_categroy_products = spark.sql("select product_name, price from products where price > 200 and category = 5")
expensive_categroy_products.show(truncate = False)

+------------------------------------------------+------+
|product_name                                    |price |
+------------------------------------------------+------+
|"Goaliath 54"" In-Ground Basketball Hoop with P"|499.99|
|Fitness Gear 300 lb Olympic Weight Set          |209.99|
|Teeter Hang Ups NXT-S Inversion Table           |299.99|
+------------------------------------------------+------+

