# Workshop

This workshop presents a few simple tasks for initial orientation. Each task introduces some basic PySpark functions.

In [1]:
# Set up a Spark session. Always execute this cell first!
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import NumericType

spark = SparkSession.builder \
    .appName("Pyspark Intro Tasks")\
    .getOrCreate()

**1. Read the dataset _GroceryDataset.csv_ from the data directory and check the schema. How many different datatypes appear in the dataset?**

In [98]:
#Task 1 - Code here

<details>
<summary>Solution - 1</summary>
<code>
df = spark.read.csv("../data/GroceryDataset.csv", header=True, inferSchema=True)<br>
# Get datatypes - Option 1:
df.printSchema()
<br>
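# Get datatypes - Option 2: collect the distinct type names from df.dtypes
distinct_dataTypes = {dtype for _, dtype in df.dtypes}
print(len(distinct_dataTypes))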
</code>
</details>

**2. Display the first 5 and the last 5 rows of the dataset. What might be the reason that a _PySparkValueError_ occurs when you try to show the last 5 rows?**

In [3]:
#Task 2 - Code here

<details>
<summary>Solution - 2</summary>
<code>
#Display the first 5 rows:
spark.createDataFrame(df.head(5)).show()
<br>
#Display the last 5 rows:
spark.createDataFrame(df.tail(5)).show()
# This throws a PySparkValueError because the last 17 rows of the column "Rating" contain only NULL values. Spark therefore cannot infer the data type of this column from these rows alone.
# To display the last rows of the table this way, set n=18 instead of n=5 so that at least one non-NULL rating is included.
</code>
</details>

**3. Count the total (distinct) number of records. Then, drop all rows containing _NULL_ values. Now, count the total (distinct) number of records again. <br> What is the problem if you simply drop all rows containing _NULL_ values?**

In [44]:
#Task 3 - Code here

<details>
<summary>Solution - 3</summary>
<code>
total_number = df.distinct().count()
print(total_number)
total_number_noNULL = df.dropna().distinct().count()
print(total_number_noNULL)
<br>#Problem: The dataset is significantly smaller after dropping all rows that contain 'NULL' values. This directly affects all further data analysis and machine learning training, because important rows might be dropped in the process.
</code>
</details>

**4. Clean Dataset/Data Preparation - Part 1: Address missing values (_NULL_-Values)**
* In the column _Currency_ replace all _NULL_ with the given Currency symbol.
* In the column _Discount_ replace all _NULL_ with _No Discount_ instead.
* In the column _Title_ replace all _NULL_ with _No Title_.
* In the column _Feature_ replace all _NULL_ with _No Feature_.
* In the column _Product Description_ replace all _NULL_ with _No Description_.

In [97]:
#Task 4 - Code here

<details>
<summary>Solution - 4</summary>
<code>
df = df.withColumn('Currency', F.when(F.col('Currency').isNull(),'$').otherwise(F.col('Currency'))) \
.withColumn('Discount', F.when(F.col('Discount').isNull(),'No Discount').otherwise(F.col('Discount'))) \
.withColumn('Title',F.when(F.col('Title').isNull(),'No Title').otherwise(F.col('Title'))) \
.withColumn('Feature',F.when(F.col('Feature').isNull(),'No Feature').otherwise(F.col('Feature'))) \
.withColumn('Product Description',F.when(F.col('Product Description').isNull(),'No Description').otherwise(F.col('Product Description')))
</code>
</details>

**5. Clean Dataset/Data Preparation - Part 2:<br> In the column _Rating_, extract the rating that is written after "Rated". Use the regular expression `\d+\.\d+` to extract only numbers like "4.3". Additionally, make sure that 0 appears wherever no rating can be extracted.**

In [96]:
#Task 5 - Code here

<details>
<summary>Solution - 5</summary>
<code>
# The optional group (\.\d+)? also matches whole-number ratings such as "5"
df = df.withColumn('Rating', F.regexp_extract(F.col('Rating'), r"\d+(\.\d+)?", 0)) \
.withColumn('Rating', 
            F.when(F.col('Rating').isNull(), 0)
            .when(F.col('Rating')=="",0)
            .otherwise(F.col('Rating')))
</code>
</details>
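The difference between the strict pattern `\d+\.\d+` and the more lenient `\d+(\.\d+)?` can be tried out with Python's built-in `re` module. Spark evaluates `regexp_extract` with Java regular expressions, but for simple patterns like these two the matching behaviour is the same. The sample strings and the helper `first_match` are invented for illustration.

```python
import re

# Invented sample values; the real "Rating" strings may look different.
samples = [
    "Rated 4.3 out of 5 stars based on 265 reviews.",
    "Rated 5 out of 5 stars",
    "No Reviews",
    None,
]

strict = re.compile(r"\d+\.\d+")       # requires a decimal point
lenient = re.compile(r"\d+(\.\d+)?")   # decimal part is optional


def first_match(pattern, text):
    """Return the first match as a string, or "0" if there is none."""
    if text is None:
        return "0"
    match = pattern.search(text)
    return match.group(0) if match else "0"


for text in samples:
    print(repr(text), first_match(strict, text), first_match(lenient, text))
```

The strict pattern skips whole-number ratings such as "5", which is worth keeping in mind when counting how many rows have a rating. In Spark, `regexp_extract` returns an empty string when nothing matches, which is why an explicit check for `""` is needed there.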

**6. Clean Dataset/Data Preparation - Part 3:<br>Remove all string values in the column _Price_. Additionally, remove the $ symbol and cast the price to a numerical type. Then calculate the mean of the column _Price_, round the mean value to two decimals and replace all _NULL_ values in the column with the calculated mean.**

In [95]:
# Task 6 - Code here

<details>
<summary>Solution - 6</summary>
<code>
# Keep only values that start with "$" (everything else becomes NULL), strip the "$" and cast to float
df = df.withColumn('Price', F.when(F.col('Price').startswith("$"), F.substring_index(F.col('Price'), "$", -1))) \
.withColumn('Price', F.col('Price').cast('float'))
<br>
# Calculate the mean (F.mean ignores NULL values) and replace all NULL values with the mean
price_mean_value = round(df.agg(F.mean(F.col('Price'))).collect()[0][0], 2)
df = df.withColumn('Price', F.when(F.col('Price').isNull(), price_mean_value).otherwise(F.col('Price'))) \
.withColumn('Price', F.round(F.col('Price'), 2))
</code>
</details>

**7. Filter the dataset to include only entries where the _Rating_ is greater than 0 and less than or equal to 5, and then arrange them in ascending order. How many rows have a rating?**

In [94]:
#Task 7 - Code here

<details>
<summary>Solution - 7</summary>
<code>
df_ratings = df.filter((F.col('Rating') > 0) & (F.col('Rating') <= 5))
ordered_df_ratings = df_ratings.orderBy(F.col('Rating'), ascending=True)
total_ratings = ordered_df_ratings.distinct().count()
</code>
</details>

**8. Group the dataset by the categorical column _Sub Category_, calculate the average rating and the average price, and take the first occurring product ID. <br>Rename the aggregated columns to meaningful names. Which _Sub Category_ has the best average rating? Which _Sub Category_ has the worst average rating?**

In [93]:
#Task 8 - Code here

<details>
<summary>Solution - 8</summary>
<code>
# Aggregate dataframe - Option 1:<br>
df_grouped = df_ratings.groupBy('Sub Category').agg(F.avg('Rating'), F.avg('Price'), F.first('Product_ID'))
<br>
# Aggregate dataframe - Option 2:<br>
df_grouped = df_ratings.groupBy('Sub Category').agg({"Rating": "avg","Price": "avg","Product_ID": "first"})
<br>
#Rename the Columns:
df_grouped = df_grouped.withColumnRenamed('first(Product_ID)','First_Product_ID')\
.withColumnRenamed('avg(Rating)','Average_Rating')\
.withColumnRenamed('avg(Price)','Average_Price')
<br>
# Order DataFrame by column 'Average_Rating' (descending) - Option 1:
df_grouped = df_grouped.orderBy('Average_Rating', ascending=False)
<br>
# Order DataFrame by column 'Average_Rating' (descending) - Option 2:
df_grouped = df_grouped.sort(F.desc('Average_Rating'))
<br>
# Best rating: Breakfast
# Worst rating: Non-GMO
</code>
</details>

**9. Left join the grouped dataset of task 8 with the cleaned dataset of task 6 based on the ID, and select only the ID, the category, the average rating, the normal rating, the average price and the normal price. Order the joined DataFrame by ID in ascending order.**

In [92]:
#Task 9 - Code here

<details>
<summary>Solution - 9</summary>
<code>
df_joined = df_grouped.join(df, on=df_grouped.First_Product_ID==df.Product_ID, how='left').select('First_Product_ID','Product_ID','Average_Rating','Rating','Average_Price', 'Price')
df_joined = df_joined.sort(F.asc('Product_ID'))
</code>
</details>

**10. Write and save the processed dataset to an output file "GroceryDataset_solution" as a .parquet-file.**

In [39]:
# Task 10 - Code here

<details>
<summary>Solution - 10</summary>
<code>
df_joined.write.mode("overwrite").parquet("../data/GroceryDataset_solution.parquet")
</code>
</details>

Please note that this setup is solely for training purposes. In reality, the dataset is unevenly weighted and not suitable for tasks like Machine Learning. This training is intended to provide an initial understanding of how to use PySpark and perform basic data cleaning steps.