In [148]:
from pyspark.sql import SparkSession
import getpass

# The current OS user name decides where the SQL warehouse directory lives:
# /user/<username>/warehouse.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled SparkSession that runs on YARN.
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.ui.port", "0")
    .config('spark.sql.warehouse.dir', f'/user/{username}/warehouse')
    .enableHiveSupport()
    .getOrCreate()
)

In [149]:
from pyspark.sql.functions import *

In [147]:
# Echo the SparkSession object as this cell's output.
spark

In [23]:
# Explicit column layout for the header-less ratings CSV, written as a
# "name type, name type, ..." schema string.
# NOTE(review): the sample category_id values shown by df_food.show()
# (e.g. 1370304000) look like Unix epoch seconds rather than category ids —
# confirm the column name against the dataset description.
df_schema = ", ".join(
    f"{name} {dtype}"
    for name, dtype in (
        ("user_id", "long"),
        ("item_id", "string"),
        ("rating", "double"),
        ("category_id", "long"),
    )
)

In [24]:
# Load the grocery ratings CSV. The file has no header row, so the column
# names and types come from df_schema.
df_food = (
    spark.read
    .format('csv')
    .schema(df_schema)
    .option('header', False)
    .load('grocery/Grocery_and_Gourmet_Food.csv')
)

In [25]:
df_food.show() # preview some records of the data

+----------+--------------+------+-----------+
|   user_id|       item_id|rating|category_id|
+----------+--------------+------+-----------+
|1888861614| ALP49FBWT4I7V|   5.0| 1370304000|
|1888861614|A1KPIZOCLB9FZ8|   4.0| 1400803200|
|1888861614|A2W0FA06IYAYQE|   4.0| 1399593600|
|1888861614|A2PTZTCH2QUYBC|   5.0| 1397952000|
|1888861614|A2VNHGJ59N4Z90|   4.0| 1397606400|
|1888861614| ATQL0XOLZNHZ4|   1.0| 1392940800|
|1888861614| A94ZG5CWN70E7|   5.0| 1385856000|
|1888861614|A3QBT8YC3CZ7C9|   3.0| 1383696000|
|1888861614| AGKW3A1RB1YGO|   5.0| 1380672000|
|1888861614|A1B65IWLUVOUQB|   5.0| 1378425600|
|1888861614|A34SURE2O1LJ4C|   5.0| 1377129600|
|4639725183|A1QVBUH9E1V6I8|   5.0| 1416355200|
|4639725183|A3F886P3E8L99T|   5.0| 1415577600|
|4639725183|A2CHH5U12THP2D|   5.0| 1414972800|
|4639725183|A29A6N9S9GDG6L|   5.0| 1409961600|
|4639725183|A378HXZATS9HKM|   5.0| 1400630400|
|4639725183|A2ZSQ8Y53ZNQK1|   5.0| 1395532800|
|4639725183|A21ZU2TINEQE0H|   4.0| 1391817600|
|4639725183| 

In [26]:
df_food.count() # the data contains around 5 million records

5074160

## Items with the lowest average rating

In [40]:
# Step 1: average the ratings per item (rounded to 2 decimals).
# Step 2: order the items by that average, lowest first.
items_with_least_rating = (
    df_food
    .groupBy('item_id')
    .agg(round(mean('rating'), 2).alias('average_rating'))
    .orderBy('average_rating', ascending=True)
)

In [41]:
# Preview the items ordered by ascending average_rating.
items_with_least_rating.show()

+--------------+--------------+
|       item_id|average_rating|
+--------------+--------------+
|A2LIHS415LATML|           1.0|
|A17ST3YM5BIBER|           1.0|
|A12E6LVLKQ3IOO|           1.0|
|A3CEVCM0VXXCAU|           1.0|
|A32O03IFN4D7LA|           1.0|
|A2RPQKXC8BBNW9|           1.0|
| AYJI1BHUGFMMA|           1.0|
|A26XHAPDNQDGX4|           1.0|
|A3VUKIUS48VVBE|           1.0|
|A1UIQ8VAHWFKP9|           1.0|
|A3130MWD7ATB3C|           1.0|
|A2XXHSIYFF0RGJ|           1.0|
| A6RV2Q3UARDEN|           1.0|
|A3E0UM11TPJCUE|           1.0|
|A3H0YLAEYYRGTA|           1.0|
|A2LJQAAN4L814I|           1.0|
|A2L43I5B9PPMHO|           1.0|
|A1T007YW8HS2SP|           1.0|
|A2I0V1PMQTSC4V|           1.0|
|A3VL76XQGJEDTW|           1.0|
+--------------+--------------+
only showing top 20 rows



### Items with the highest average rating

In [42]:
# Same per-item average as for the lowest-rated items, ordered highest first.
items_with_most_rating = (
    df_food
    .groupBy('item_id')
    .agg(round(mean('rating'), 2).alias('average_rating'))
    .orderBy('average_rating', ascending=False)
)

In [43]:
# Preview the items ordered by descending average_rating.
items_with_most_rating .show()

+--------------+--------------+
|       item_id|average_rating|
+--------------+--------------+
|A1ZWK7IW6H2FAP|           5.0|
| ALDN272WIKK4O|           5.0|
|A1F1L9NPKTS8VG|           5.0|
|A220OR0RVHLYRW|           5.0|
|A2ABAWEL5D5VKY|           5.0|
|A3323BXQL4AX9N|           5.0|
|A3FHEA1DI07RY9|           5.0|
|A2ZZY9GHGWNA61|           5.0|
|A1216PLO14YY8A|           5.0|
|A2KVXV01UXW0QF|           5.0|
|A3K4FHBFZDLRS8|           5.0|
|A2YP6HFA6HZAKE|           5.0|
| ACCY5W0Z2CNZB|           5.0|
| AWSRBXQ639RA5|           5.0|
|A12EIR6ZXRGSBL|           5.0|
|A3OTF2L5DAQG3S|           5.0|
| AB4BA1YLME0GE|           5.0|
|A3PMPK39R9M0RP|           5.0|
|A1PZ9BIG3R4O12|           5.0|
|A199Z1JUVCGX64|           5.0|
+--------------+--------------+
only showing top 20 rows



## Items with longest review

In [67]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DataType

In [72]:

# The ratings dataset has no review text, so build a small sample DataFrame
# with three columns: item id, review date and review text.

# All three columns are declared as nullable strings.
custom_schema = (
    StructType()
    .add("item_id", StringType(), True)
    .add("date", StringType(), True)
    .add("review", StringType(), True)
)

# Rows are (item id, date as yyyy-MM-dd text, review text).
# The item ids are written as Python ints although the schema declares
# item_id as a string column.
data = [
    (100, "2001-12-01", 'awesome'),
    (101, "2003-10-13", 'Amazing product, I am loving it'),
    (101, "2003-11-19", 'Not happy at all!'),
    (101, "2003-11-13", "The product is fantastic! It's user-friendly, durable, and it has made my life so much easier."),
    (102, "2003-11-23", "Honestly, I expected more from this product. It didn't live up to the hype"),
    (103, "2007-11-13", 'Economical'),
    (101, "2003-11-13", 'worst'),
    (104, "2003-01-10", "I am absolutely delighted with this product! The quality is outstanding, and it exceeded my expectations"),
    (101, "2003-11-13", 'I will but it again'),
    (105, "2003-11-13", 'Do not buy it'),
    (103, "2004-11-01", "I found it to be quite frustrating to use. Disappointed"),
    (106, "2003-11-13", 'It is exceptiona'),
    (100, "2003-11-07", 'Lovely!'),
    (102, "2003-09-13", "I regret buying it and would advise others to steer clear. Save your money for something better."),
    (105, "2003-12-13", 'Pocket Friendly'),
]

# Build the frame, then parse the yyyy-MM-dd strings into a date column.
df = spark.createDataFrame(data, schema=custom_schema)
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Display the resulting rows.
df.show()

+-------+----------+--------------------+
|item_id|      date|              review|
+-------+----------+--------------------+
|    100|2001-12-01|             awesome|
|    101|2003-10-13|Amazing product, ...|
|    101|2003-11-19|   Not happy at all!|
|    101|2003-11-13|The product is fa...|
|    102|2003-11-23|Honestly, I expec...|
|    103|2007-11-13|          Economical|
|    101|2003-11-13|               worst|
|    104|2003-01-10|I am absolutely d...|
|    101|2003-11-13| I will but it again|
|    105|2003-11-13|       Do not buy it|
|    103|2004-11-01|I found it to be ...|
|    106|2003-11-13|    It is exceptiona|
|    100|2003-11-07|             Lovely!|
|    102|2003-09-13|I regret buying i...|
|    105|2003-12-13|     Pocket Friendly|
+-------+----------+--------------------+



In [116]:
@udf(IntegerType())
def review_lenght(string):
    """Return the length of a review after stripping surrounding whitespace.

    The misspelled name is kept so existing references keep working.

    Args:
        string: review text, or None when the column value is null.

    Returns:
        Number of characters left after ``str.strip()``, or None for a null
        review so the row stays null instead of raising AttributeError in
        the Python worker.
    """
    if string is None:
        return None
    # strip() trims whitespace at both ends, not only trailing spaces.
    return len(string.strip())


# Add the stripped character count of every review as `review_length`.
df = df.withColumn('review_length', review_lenght('review'))

# Fetch the largest review length as a plain Python value. A global aggregate
# yields one row, so first()[0] is the maximum (None if there is no length).
filter_max_length = df.selectExpr('max(review_length) as review_length').first()[0]

# Keep the item id(s) whose review reaches that maximum; ties are all kept.
df_longest_review_item = df.filter(df['review_length'] == filter_max_length).select('item_id')
df_longest_review_item.show()

+-------+
|item_id|
+-------+
|    104|
+-------+



### Transform: change the date to MM-DD-YYYY format.

In [119]:
# Replace the `date` column with its MM-dd-yyyy text rendering.
# NOTE(review): `df` is overwritten in place, so re-running this cell would
# format an already formatted column — confirm this is acceptable.
df = df.withColumn(
    'date',
    date_format('date', 'MM-dd-yyyy'),
)
df.show()  # transformed date

+-------+----------+--------------------+-------------+
|item_id|      date|              review|review_length|
+-------+----------+--------------------+-------------+
|    100|12-01-2001|             awesome|            7|
|    101|10-13-2003|Amazing product, ...|           31|
|    101|11-19-2003|   Not happy at all!|           17|
|    101|11-13-2003|The product is fa...|           94|
|    102|11-23-2003|Honestly, I expec...|           74|
|    103|11-13-2007|          Economical|           10|
|    101|11-13-2003|               worst|            5|
|    104|01-10-2003|I am absolutely d...|          104|
|    101|11-13-2003| I will but it again|           19|
|    105|11-13-2003|       Do not buy it|           13|
|    103|11-01-2004|I found it to be ...|           55|
|    106|11-13-2003|    It is exceptiona|           16|
|    100|11-07-2003|             Lovely!|            7|
|    102|09-13-2003|I regret buying i...|           96|
|    105|12-13-2003|     Pocket Friendly|       

### Show a DataFrame operation that you learnt recently.

I recently learned about joins, caching, and broadcast joins.

### Joins are demonstrated below

In [120]:
# Echo the SparkSession object as this cell's output.
spark

# Creating the orders and customers tables

In [150]:
# Sample Orders table built from in-memory rows.

# Column layout of the Orders table; every column is nullable.
orders_schema = (
    StructType()
    .add("OrderID", IntegerType(), True)
    .add("OrderDate", StringType(), True)
    .add("CustomerID", IntegerType(), True)
    .add("Status", StringType(), True)
)

# Rows are (OrderID, OrderDate, CustomerID, Status).
orders_data = [
    (1, '2022-01-01', 1001, 'Pending'),
    (2, '2022-01-02', 1002, 'Shipped'),
    (3, '2022-01-03', 1003, 'Delivered'),
    (4, '2022-01-04', 1001, 'Delivered'),
    (5, '2022-01-05', 1004, 'Pending'),
    (6, '2022-01-06', 1002, 'Shipped'),
    (7, '2022-01-07', 1005, 'Pending'),
    (8, '2022-01-08', 1001, 'Shipped'),
    (9, '2022-01-09', 1003, 'Delivered'),
    (10, '2022-01-10', 1004, 'Delivered'),
]

# Materialise the rows as the Orders DataFrame.
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)


# Column layout of the Customers table; every column is nullable.
customers_schema = (
    StructType()
    .add("CustomerID", IntegerType(), True)
    .add("CustomerName", StringType(), True)
    .add("Email", StringType(), True)
)

# Rows are (CustomerID, CustomerName, Email).
customers_data = [
    (1001, 'John Doe', 'john@example.com'),
    (1002, 'Alice Smith', 'alice@example.com'),
    (1003, 'Bob Johnson', 'bob@example.com'),
    (1004, 'Eva Williams', 'eva@example.com'),
    (1005, 'Charlie Brown', 'charlie@example.com'),
]

# Materialise the rows as the Customers DataFrame.
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

# Print each table under its heading: Orders first, then Customers.
for heading, frame in (
    ("Orders Table:", orders_df),
    ("\nCustomers Table:", customers_df),
):
    print(heading)
    frame.show()

Orders Table:
+-------+----------+----------+---------+
|OrderID| OrderDate|CustomerID|   Status|
+-------+----------+----------+---------+
|      1|2022-01-01|      1001|  Pending|
|      2|2022-01-02|      1002|  Shipped|
|      3|2022-01-03|      1003|Delivered|
|      4|2022-01-04|      1001|Delivered|
|      5|2022-01-05|      1004|  Pending|
|      6|2022-01-06|      1002|  Shipped|
|      7|2022-01-07|      1005|  Pending|
|      8|2022-01-08|      1001|  Shipped|
|      9|2022-01-09|      1003|Delivered|
|     10|2022-01-10|      1004|Delivered|
+-------+----------+----------+---------+


Customers Table:
+----------+-------------+-------------------+
|CustomerID| CustomerName|              Email|
+----------+-------------+-------------------+
|      1001|     John Doe|   john@example.com|
|      1002|  Alice Smith|  alice@example.com|
|      1003|  Bob Johnson|    bob@example.com|
|      1004| Eva Williams|    eva@example.com|
|      1005|Charlie Brown|charlie@example.com|
+--

In [151]:
# Inner join on CustomerID: one row per (customer, order) pair, used below
# to count how many orders each customer placed.
df_joined = (
    customers_df
    .select('CustomerID', 'customerName')
    .join(
        orders_df.select('OrderID', 'CustomerID'),
        on='CustomerID',
        how='inner',
    )
)

In [152]:
# Bare expression: the notebook renders the joined DataFrame as the cell output.
df_joined

CustomerID,customerName,OrderID
1005,Charlie Brown,7
1002,Alice Smith,2
1002,Alice Smith,6
1001,John Doe,1
1001,John Doe,4
1001,John Doe,8
1003,Bob Johnson,3
1003,Bob Johnson,9
1004,Eva Williams,5
1004,Eva Williams,10


In [153]:
# Number of orders per customer; left as a bare expression so the notebook
# renders the result as the cell output.
(
    df_joined
    .groupBy('CustomerID', 'customerName')
    .agg(count('OrderId').alias('total_order'))
)

CustomerID,customerName,total_order
1005,Charlie Brown,1
1002,Alice Smith,2
1001,John Doe,3
1003,Bob Johnson,2
1004,Eva Williams,2


### cache 

In [154]:
df_joined.cache() # mark the DataFrame for caching so later reads can reuse it instead of recomputing the join
# cache() is lazy: the data is stored the first time an action runs on the DataFrame
# keep cached data small; a large cache competes with other tasks for memory and can slow them down

CustomerID,customerName,OrderID
1005,Charlie Brown,7
1002,Alice Smith,6
1002,Alice Smith,2
1001,John Doe,8
1001,John Doe,1
1001,John Doe,4
1003,Bob Johnson,3
1003,Bob Johnson,9
1004,Eva Williams,5
1004,Eva Williams,10


In [155]:
df_joined.unpersist() # release the cached copy of this DataFrame

CustomerID,customerName,OrderID
1005,Charlie Brown,7
1002,Alice Smith,6
1002,Alice Smith,2
1001,John Doe,8
1001,John Doe,1
1001,John Doe,4
1003,Bob Johnson,3
1003,Bob Johnson,9
1004,Eva Williams,5
1004,Eva Williams,10


# Broadcast join

In [156]:
# A broadcast join speeds up a join between a large table and a very small one:
# the small table is copied to every executor, so the large table is joined
# where it already sits and does not have to be shuffled.
# Here the customers table (5 rows) is the small side, so the broadcast() hint
# is placed on it rather than on the larger orders table. It stays on the left
# so the result keeps the column order CustomerID, customerName, OrderID.

# Spark can also choose this strategy without a hint: tables smaller than
# spark.sql.autoBroadcastJoinThreshold are broadcast automatically, and adaptive
# query execution (enabled by default since Spark 3.2) can switch to it at runtime.
df_joined_broadcast = broadcast(customers_df.select('CustomerID','customerName')) \
.join(orders_df.select('OrderID','CustomerID'), on= 'CustomerID',how ='inner')

In [157]:
# Print the rows of the broadcast-join result.
df_joined_broadcast.show()

+----------+-------------+-------+
|CustomerID| customerName|OrderID|
+----------+-------------+-------+
|      1001|     John Doe|      8|
|      1001|     John Doe|      4|
|      1001|     John Doe|      1|
|      1002|  Alice Smith|      6|
|      1002|  Alice Smith|      2|
|      1003|  Bob Johnson|      9|
|      1003|  Bob Johnson|      3|
|      1004| Eva Williams|     10|
|      1004| Eva Williams|      5|
|      1005|Charlie Brown|      7|
+----------+-------------+-------+



# Saving the result in Parquet format

In [164]:
# Write the broadcast-join result as Parquet to "practiceOutput",
# using the 'overwrite' save mode.
(
    df_joined_broadcast.write
    .mode('overwrite')
    .format("parquet")
    .save("practiceOutput")
)