# COMP.CS.320 Data-Intensive Programming, Exercise 3

This exercise has three parts:

- tasks 1-3 concern data queries for static data
- tasks 4-5 are examples of using typed Dataset instead of DataFrame
- tasks 6-8 concern the same data query as in the first two tasks but handled as streaming data

The tasks can be done in either Scala or Python. This is the **Python** version, switch to the Scala version if you want to do the tasks in Scala.

Each task has its own cell for the code. Add your solutions to the cells. You are free to add more cells if you feel it is necessary. There are cells with example outputs or test code following most of the tasks.

Don't forget to submit your solutions to Moodle.


In [0]:
# some imports that might be required in the tasks

from dataclasses import dataclass
from typing import List

from pyspark.sql import DataFrame
from pyspark.sql import functions
from pyspark.sql import Row


In [0]:
#some more imports 

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as Func, DataFrame


## Task 1 - Create a DataFrame for retailer data

In the [Shared container](https://portal.azure.com/#view/Microsoft_Azure_Storage/ContainerMenuBlade/~/overview/storageAccountId/%2Fsubscriptions%2Fe0c78478-e7f8-429c-a25f-015eae9f54bb%2FresourceGroups%2Ftuni-cs320-f2023-rg%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Ftunics320f2023gen2/path/shared/etag/%220x8DBB0695B02FFFE%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride~/false/defaultId//publicAccessVal/None) in the `exercises/ex3` folder is file `sales_data_sample.csv` that contains retaier sales data in CSV format.
The direct address for the data file is: `abfss://shared@tunics320f2023gen2.dfs.core.windows.net/exercises/ex3/sales_data_sample.csv`

Read the data from the CSV file into DataFrame called retailerDataFrame. Let Spark infer the schema for the data. Note, that this CSV file uses semicolons (`;`) as the column separator instead of the default commas (`,`).

Print out the schema, the resulting DataFrame should have 24 columns. The data contains information about the item price and the number of items ordered for each day.


In [0]:
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

# Defining file path 
file_path = "abfss://shared@tunics320f2023gen2.dfs.core.windows.net/exercises/ex3/sales_data_sample.csv"

# Reading the  the CSV file as described
retailerDataFrame: DataFrame = spark.read.option("delimiter", ";").csv(file_path, header=True, inferSchema=True)

# Printing the schema
retailerDataFrame.printSchema()


root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- ORDERDATE: date (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: integer (nullable = true)
 |-- MONTH_ID: integer (nullable = true)
 |-- YEAR_ID: integer (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PSmallHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable = true)
 |-- DEALSIZE: string (nullabl

Example output for task 1 (only the first few lines):

```text
root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- ORDERDATE: date (nullable = true)
 ...
 ```

## Task 2 - The best selling days

Find the best **12** selling days using the retailer data frame from task 1. That is the days for which `QUANTITYORDERED * PRICEEACH` gets the highest values.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("BestSellingDays").getOrCreate()

# Resuming from Task 1

# Calculating total sales for each day
retailerDataFrame = retailerDataFrame.withColumn("total_sales", col("QUANTITYORDERED") * col("PRICEEACH"))

# Group by ORDERDATE and sum the total sales for each day
dailySalesDF = retailerDataFrame.groupBy("ORDERDATE").agg({"total_sales": "sum"})

best12DaysDF: DataFrame = dailySalesDF.orderBy("sum(total_sales)", ascending=False).limit(12)

# result
best12DaysDF.show()


+----------+------------------+
| ORDERDATE|  sum(total_sales)|
+----------+------------------+
|2004-11-24|115033.48000000003|
|2003-11-14|109509.87999999999|
|2003-11-12|          90218.58|
|2003-12-02| 87445.18000000001|
|2004-10-16|          86320.39|
|2003-11-06|          84731.32|
|2004-11-17|          82125.62|
|2004-11-04|          80807.93|
|2004-08-20| 80247.84000000001|
|2004-11-05| 78324.73000000001|
|2003-11-20|          76973.93|
|2004-12-10|          76380.08|
+----------+------------------+



Example output for task 2:

```text
+----------+------------------+
| ORDERDATE|       total_sales|
+----------+------------------+
|2004-11-24|115033.48000000003|
|2003-11-14|109509.87999999999|
|2003-11-12|          90218.58|
|2003-12-02| 87445.18000000001|
|2004-10-16|          86320.39|
|2003-11-06|          84731.32|
|2004-11-17|          82125.62|
|2004-11-04|          80807.93|
|2004-08-20| 80247.84000000001|
|2004-11-05| 78324.73000000001|
|2003-11-20|          76973.93|
|2004-12-10|          76380.08|
+----------+------------------+
```

## Task 3 - The products with the most sale value

Find the product codes for the **8** products that have the most total sale value in year 2003.

**Note**, in this task (and only in this task) all sales done in **January** should be counted **twice**.

Hint: use the MONTH_ID and YEAR_ID columns to recognize the month and year of each sale.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, sum, when

# Create a Spark session
spark = SparkSession.builder.appName("TopProducts").getOrCreate()

# Filtering data for the year 2003
sales2003DF = retailerDataFrame.filter(col("YEAR_ID") == 2003)

# Adjust sales for January (counting twice)
sales2003DF = sales2003DF.withColumn(
    "adjusted_sales",
    when(col("MONTH_ID") == 1, col("QUANTITYORDERED") * col("PRICEEACH") * 2)
    .otherwise(col("QUANTITYORDERED") * col("PRICEEACH"))
)

# Group by PRODUCTCODE and sum the adjusted sales for each product
productDF: DataFrame = sales2003DF.groupBy("PRODUCTCODE").agg(sum("adjusted_sales").alias("total_sales"))

top8ProductsDF = productDF.orderBy("total_sales", ascending=False).limit(8)

# result
top8ProductsDF.show()


+-----------+------------------+
|PRODUCTCODE|       total_sales|
+-----------+------------------+
|   S18_3232|           69500.0|
|   S18_2319|           45600.0|
|   S18_4600|           44400.0|
|   S50_1392|39813.920000000006|
|   S18_1342|39661.149999999994|
|   S12_4473|          39084.36|
|   S24_3856|           38900.0|
|   S24_2300|           38800.0|
+-----------+------------------+



Example output for task 3:

```text
+-----------+------------------+
|PRODUCTCODE|       total_sales|
+-----------+------------------+
|   S18_3232|           69500.0|
|   S18_2319|           45600.0|
|   S18_4600|           44400.0|
|   S50_1392|39813.920000000006|
|   S18_1342|39661.149999999994|
|   S12_4473|          39084.36|
|   S24_3856|           38900.0|
|   S24_2300|           38800.0|
+-----------+------------------+
```

## Task 4 - Dataset 1

This task is originally designed for Scala language. It does not fully translate to Python because Python does not support typed Spark Datasets. Instead of Scala's case class, you can use Python's dataclass.

The classes that takes a type parameter are known to be Generic classes in Scala. Dataset is an example of a generic class. Actually, DataFrame is a type alias for Dataset[Row], where Row is the type parameter (Row being general object that can represent any row in a Spark data frame).

Declare your own case class (dataclass in Python) Sales with two members: `year` and `euros`, with both being of integer types.

Then instantiate a DataFrame of Sales with data from the variable `salesList` and query for the sales on 2017 and for the year with the highest amount of sales.


In [0]:
# Create the Sales dataclass in this cell
from pyspark.sql import SparkSession
from pyspark.sql import Row
from dataclasses import dataclass

# Create a Spark session
spark = SparkSession.builder.appName("SalesData").getOrCreate()

# Create the Sales dataclass in this cell
@dataclass
class Sales:
    year: int
    euros: int



In [0]:
salesList = [Sales(2015, 325), Sales(2016, 100), Sales(2017, 15), Sales(2018, 900),
             Sales(2019, 50), Sales(2020, 750), Sales(2021, 950), Sales(2022, 400)]

salesDS: DataFrame = spark.createDataFrame(salesList)

sales2017Row: Row = salesDS.filter(salesDS["year"] == 2017).first()
sales2017: Sales = Sales(sales2017Row["year"], sales2017Row["euros"])
print(f"Sales for 2017 is {sales2017.euros}")

maximumSalesRow: Row = salesDS.orderBy("euros", ascending=False).first()
maximumSales: Sales = Sales(maximumSalesRow["year"], maximumSalesRow["euros"])
print(f"Maximum sales: year = {maximumSales.year}, euros = {maximumSales.euros}")


Sales for 2017 is 15
Maximum sales: year = 2021, euros = 950



Example output for task 4:
```text
Sales for 2017 is 15
Maximum sales: year = 2021, euros = 950
```


## Task 5 - Dataset 2

Continuation from task 4.
The new sales list `multiSalesList` contains sales information from multiple sources and thus can contain multiple values for each year. The total sales in euros for a year is the sum of all the individual values for that year.

Query for the sales on 2016 and the year with the highest amount of sales in this case.


In [0]:
multiSalesList: List[Sales] =  [
    Sales(2015, 325), Sales(2016, 100), Sales(2017, 15), Sales(2018, 900),
    Sales(2019, 50), Sales(2020, 750), Sales(2021, 950), Sales(2022, 400),
    Sales(2016, 250), Sales(2017, 600), Sales(2019, 75), Sales(2016, 5),
    Sales(2018, 127), Sales(2019, 200), Sales(2020, 225), Sales(2016, 350)
]

multiSalesDS: DataFrame = spark.createDataFrame([Row(year=sale.year, euros=sale.euros) for sale in multiSalesList])

multiSales2016Row = multiSalesDS.filter(multiSalesDS["year"] == 2016).agg(sum("euros").alias("total_sales")).first()
multiSales2016: Sales = Sales(2016, multiSales2016Row["total_sales"])
print(f"Total sales for 2016 is {multiSales2016.euros}")

maximumMultiSalesRow = multiSalesDS.groupBy("year").agg(sum("euros").alias("total_sales")).orderBy("total_sales", ascending=False).first()
maximumMultiSales: Sales = Sales(maximumMultiSalesRow["year"], maximumMultiSalesRow["total_sales"])
print(f"Maximum total sales: year = {maximumMultiSales.year}, euros = {maximumMultiSales.euros}")



Total sales for 2016 is 705
Maximum total sales: year = 2018, euros = 1027


Example output for task 5:

```text
...
Total sales for 2016 is 705
Maximum total sales: year = 2018, euros = 1027
```


## Task 6 - Streaming data from retail data

Create a streaming data frame for similar retailer data as was used in tasks 1-3.

In this exercise, streaming data is simulated by copying CSV files in 10 second intervals from a source folder to a target folder. The script for doing the file copying is given in task 8 and should not be run before the tasks 6 and 7 have been done.

The target folder will be defined to be in the students storage as `ex3/YOUR_EX3_FOLDER` with the value for YOUR_EX3_FOLDER chosen by you.

Hint: Spark cannot infer the schema of streaming data, so you have to give it explicitly. You can assume that the streaming data will have the same format as the static data used in tasks 1-3.

Finally, note that you cannot really test this task before you have also done the tasks 7 and 8.


In [0]:
# Put your own unique folder name to the variable (use only letters, numbers, and underscores):
#my_ex3_folder: str = ???

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("RetailerStreaming").getOrCreate()

# Replace YOUR_EX3_FOLDER with your unique folder name
my_ex3_folder: str = "ANISULMAHMUD_ex3"

# Define the target folder path
targetFiles: str = f"abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/{my_ex3_folder}/*"

# Define the schema for the streaming data (assuming it's the same as the static data)
schema = "ORDERNUMBER INT, QUANTITYORDERED INT, PRICEEACH DOUBLE, ORDERLINENUMBER INT, " \
         "SALES DOUBLE, ORDERDATE DATE, STATUS STRING, QTR_ID INT, MONTH_ID INT, " \
         "YEAR_ID INT, PRODUCTLINE STRING, MSRP DOUBLE, PRODUCTCODE STRING, CUSTOMERNAME STRING, " \
         "PHONE STRING, ADDRESSLINE1 STRING, ADDRESSLINE2 STRING, CITY STRING, STATE STRING, " \
         "POSTALCODE STRING, COUNTRY STRING, TERRITORY STRING, CONTACTLASTNAME STRING, CONTACTFIRSTNAME STRING, " \
         "DEALSIZE STRING"

# Create the streaming DataFrame
retailerStreamingDF = (
    spark.readStream.format("csv")
    .schema(schema)
    .option("delimiter", ";")
    .option("header", "true")
    .csv(targetFiles)
)


Note that you cannot really test this task before you have also done the tasks 7 and 8, i.e. there is no checkable output from this task.


## Task 7 - The best selling days for the streaming data

Find the best selling days using the streaming data frame from task 6.

Note that in this task with the streaming data you don't need to limit the content only to the best 12 selling days like was done in task 2.


In [0]:
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("BestSellingDaysStreaming").getOrCreate()

#total sales for each day in the streaming data
bestDaysDFStreaming = (
    retailerStreamingDF.withColumn("total_sales", col("QUANTITYORDERED") * col("PRICEEACH"))
    .groupBy("ORDERDATE")
    .agg({"total_sales": "sum"})
    .select("ORDERDATE", "sum(total_sales)")
    .withColumnRenamed("sum(total_sales)", "total_sales")
)

# Print the schema of the streaming DataFrame
bestDaysDFStreaming.printSchema()

root
 |-- ORDERDATE: date (nullable = true)
 |-- total_sales: double (nullable = true)



Note that you cannot really test this task before you have also done the tasks 6 and 8, i.e. there is no checkable output from this task.


## Task 8 - Test your streaming data solution

Test your streaming data solution by following the output from the display command in the next cell while the provided test script in the following cell is running.

- The test script copies files one by one (in ten second intervals) from the [shared container](https://portal.azure.com/#view/Microsoft_Azure_Storage/ContainerMenuBlade/~/overview/storageAccountId/%2Fsubscriptions%2Fe0c78478-e7f8-429c-a25f-015eae9f54bb%2FresourceGroups%2Ftuni-cs320-f2023-rg%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Ftunics320f2023gen2/path/shared/etag/%220x8DBB0695B02FFFE%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride~/false/defaultId//publicAccessVal/None) folder `exercises/ex3` to the [student container](https://portal.azure.com/#view/Microsoft_Azure_Storage/ContainerMenuBlade/~/overview/storageAccountId/%2Fsubscriptions%2Fe0c78478-e7f8-429c-a25f-015eae9f54bb%2FresourceGroups%2Ftuni-cs320-f2023-rg%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Ftunics320f2023gen2/path/students/etag/%220x8DBB0695B02FFFE%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride~/false/defaultId//publicAccessVal/None) folder `/ex3/MY_EX3_FOLDER` where `MY_EX3_FOLDER` is the folder name you chose in task 6.
- To properly run the streaming data test, the target folder should be either empty or it should not exist at all. If there are files in the target folder, those are read immediately and the streaming data demostration will not work as intended. The script does try to remove all copied files from the target folder at the end, but that only happens if the script is not interrupted.

To gain points from this task, answer the questions in the final cell of the notebook.


In [0]:
# in Databricks the display function can be used to display also a streaming data frame
# when developing outside this kind of environment, you need to create a query that could then be used to monitor the state of the data frame
# Usually, when using streaming data the results are directed to some data storage, not just displayed like in this exercise.

# There should be no need to edit anything in this cell!
print(f"Starting stream myQuery_{my_ex3_folder}")
display(bestDaysDFStreaming.limit(6), streamName=f"myQuery_{my_ex3_folder}")


ORDERDATE,total_sales
,8290886.790000001


In [0]:
# There should be no need to edit anything in this cell, but you can try to adjust to time variables.

import glob
import pathlib
import shutil
import time

initial_wait_time: float = 20.0
time_interval: float = 10.0
post_loop_wait_time: float = 20.0

time.sleep(initial_wait_time)
input_file_list: list = dbutils.fs.ls("abfss://shared@tunics320f2023gen2.dfs.core.windows.net/exercises/ex3/streamingData")
for index, csv_file in enumerate(input_file_list, start=1):
    input_file_path = csv_file.path
    input_file = pathlib.Path(input_file_path).name
    output_file_path = f"abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/{my_ex3_folder}/{input_file}"
    dbutils.fs.cp(input_file_path, output_file_path)
    print(f"Copied file {input_file} ({index}/{len(input_file_list)}) to {output_file_path}")
    time.sleep(time_interval)
time.sleep(post_loop_wait_time)

# stop updating the display for the streaming data frame
for stream in spark.streams.active:
    if stream.name == f"myQuery_{my_ex3_folder}":
        print(f"Stopping stream {stream.name}")
        spark.streams.get(stream.id).stop()

# remove the copied files from the output folder
for copy_file in dbutils.fs.ls(f"abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/{my_ex3_folder}"):
    dbutils.fs.rm(copy_file.path)
print(f"Removed all copied files from abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/{my_ex3_folder}")


Copied file xaa.csv (1/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xaa.csv
Copied file xab.csv (2/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xab.csv
Copied file xac.csv (3/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xac.csv
Copied file xad.csv (4/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xad.csv
Copied file xae.csv (5/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xae.csv
Copied file xaf.csv (6/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xaf.csv
Copied file xag.csv (7/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xag.csv
Copied file xah.csv (8/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/ANISULMAHMUD_ex3/xah.csv
Copied file xai.csv (9/10) to abfss://students@tunics320f2023gen2.dfs.co

Example output from the test script in task 8:

```text
Copied file xaa.csv (1/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xaa.csv
Copied file xab.csv (2/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xab.csv
Copied file xac.csv (3/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xac.csv
Copied file xad.csv (4/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xad.csv
Copied file xae.csv (5/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xae.csv
Copied file xaf.csv (6/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xaf.csv
Copied file xag.csv (7/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xag.csv
Copied file xah.csv (8/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xah.csv
Copied file xai.csv (9/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xai.csv
Copied file xaj.csv (10/10) to abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here/xaj.csv
Stopping stream myQuery_some_folder_here
Removed all copied files from abfss://students@tunics320f2023gen2.dfs.core.windows.net/ex3/some_folder_here
```


**Answer the questions to get the points from task 8.**

###How well did the streaming data example work for you?
--  To give this answer, I can say that till now sometime the example is working for me and sometime I just face some problem. But in the end I came up with solution to my problem. But the example is working I can say that. 

###What was the final output for the streaming data for you?
-- The output is  |-- ORDERDATE: date (nullable = true)
 |-- total_sales: double (nullable = true)

###The data in the streaming tasks is the same as the earlier static data (just divided into multiple files).
Did the final output match the first six rows of the task 2 output? 
-- No, the final output does not match the first six rows of the task 2 output.

###If it did not, what do you think could be the reason?
-- My thinking regarding this matter is that, I used static data for Task 2, where all of the data was available at once. I worked with streaming data in Task 7, and it's possible that time windows or micro-batches are being used for the computation. Based on the data available at the beginning of each window, the results would be established. So this can be the reason for the output mismatch. 

###If you had problems, what were they?
###And what do you think were the causes for those problems?
-- The problems are not that kinds of major but I must say. I have to follow the course slides, lectures to complete these task. Maybe if I not follow these regularly than maybe I can face problems. 
