# Writing Window Functions

Write Spark code that performs the steps below, and optimize it for
execution speed.

#### 1. Filter the complete dataset for DISH = (Biryani or Pizza or Dosa) from X, where X is the complete dataset
#### 2. Group by ORDERID, STOREID, PRICE
#### 3. If the number of records in the group is > 1
##### 3.1 If the group just has records with DISH = Biryani
######       Just keep one record with the following updated:
######       TOTALAMOUNT = SUM(TOTALAMOUNT)
######       QUANTITY = SUM(QUANTITY)
######       JUSTANOTHERFEATURE = SUM(JUSTANOTHERFEATURE)
##### 3.2 If the group contains records with DISH = Biryani as well as records with DISH = Pizza or Dosa
######      Compute SUM(TOTALAMOUNT)
######          If SUM(TOTALAMOUNT) ≤ 0, drop all these records
######          If SUM(TOTALAMOUNT) > 0,
######          Find and keep one record with a positive TOTALAMOUNT and DISH = Pizza or Dosa and update the following:
######            TOTALAMOUNT = SUM(TOTALAMOUNT)
######            QUANTITY = SUM(QUANTITY)
######            JUSTANOTHERFEATURE = SUM(JUSTANOTHERFEATURE)
######            If SUM(QUANTITY) = 0:
######              If TOTALAMOUNT > 0:
######                QUANTITY = 10
######                JUSTANOTHERFEATURE = 20
######              If TOTALAMOUNT < 0:
######                QUANTITY = -20
######                JUSTANOTHERFEATURE = -10

######           In case you can't find such a record, drop all these records

##### 3.3 Update the column JUSTANOTHERFEATURE with a list of distinct values which are greater than 1 in that column for the group.
#### 4. Else do nothing.
#### 5. Merge it back with X and write the output to an S3 path partitioned by the DISH name.

If you make any assumptions about the problem statement while writing the code, please state them.
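The per-group rules in step 3 are easier to check against a small executable reference. The following is a minimal plain-Python sketch (no Spark), not the assignment's solution. The function names `resolve_group` and `distinct_above_one` are hypothetical, and the sketch rests on several assumptions: groups with more than one record but no Biryani are left untouched; "one record" means the first matching record in input order; the `SUM(QUANTITY) = 0` override nests the two `TOTALAMOUNT` checks, so the `TOTALAMOUNT < 0` branch can never fire once `SUM(TOTALAMOUNT) > 0` is required; and step 3.3 is shown as a separate helper because the statement does not say how it combines with the summed value from 3.1 and 3.2.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]
SUM_COLS = ("TOTALAMOUNT", "QUANTITY", "JUSTANOTHERFEATURE")


def resolve_group(rows: List[Row]) -> List[Row]:
    """Apply steps 3.1 and 3.2 to the rows of one (ORDERID, STOREID, PRICE) group."""
    if len(rows) <= 1:
        return list(rows)  # step 4: single-record groups are left alone
    totals = {c: sum(r[c] for r in rows) for c in SUM_COLS}
    dishes = {r["DISH"] for r in rows}
    if dishes == {"Biryani"}:
        # Step 3.1: keep one record carrying the group sums.
        return [{**rows[0], **totals}]
    if "Biryani" in dishes:
        # Step 3.2: Biryani mixed with Pizza and/or Dosa.
        if totals["TOTALAMOUNT"] <= 0:
            return []
        keep = next(
            (r for r in rows
             if r["DISH"] in ("Pizza", "Dosa") and r["TOTALAMOUNT"] > 0),
            None,
        )
        if keep is None:
            return []  # no positive Pizza/Dosa record: drop the group
        merged = {**keep, **totals}
        if totals["QUANTITY"] == 0:
            # SUM(TOTALAMOUNT) > 0 already holds here, so only this branch applies.
            merged["QUANTITY"] = 10
            merged["JUSTANOTHERFEATURE"] = 20
        return [merged]
    # Assumption: groups without any Biryani record are not modified.
    return list(rows)


def distinct_above_one(rows: List[Row]) -> List[float]:
    """Step 3.3 helper: distinct JUSTANOTHERFEATURE values > 1 (sorted by assumption)."""
    return sorted({r["JUSTANOTHERFEATURE"] for r in rows if r["JUSTANOTHERFEATURE"] > 1})
```

Running the Spark job on a small sample and comparing each group against `resolve_group` is one way to catch a mismatch between the window logic and the stated rules.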


### The dataset is available at the following link
https://drive.google.com/file/d/1JbO0JS9UKZdqhwLrTRVYfivTLycfOrpC/

In [145]:
#!/usr/bin/env bash
!pwd

/home/rita/Documents/Spark/Assignments


In [146]:
from pyspark.sql import SparkSession, Window
# Explicit imports only; note that `sum` here shadows Python's built-in sum
from pyspark.sql.functions import explode, split, window, substring, col, udf, collect_list, collect_set, count, when, sum
from pyspark.sql.types import *
from datetime import datetime
import time

In [147]:
spark = SparkSession.builder.appName("Window Function Assignment").master("local[*]").getOrCreate()

In [159]:
# Parquet files carry their own schema, so CSV-style options (header, inferschema, delimiter) are not needed
data = spark.read.parquet("/home/rita/Documents/Spark/Assignments/spark_assignment_data.snappy.parquet")

# Note: toPandas() collects every row onto the driver
data.toPandas().to_csv('data.csv')

                                                                                

In [149]:
print((data.count(), len(data.columns)))

(1129292, 7)


In [165]:
!pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (25.6 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-6.0.1


In [168]:
import pyarrow.parquet as pq
import pyarrow as pa
import pandas as pd

headers = ['DISH','ORDERID','STOREID','PRICE','TOTALAMOUNT','QUANTITY','JUSTANOTHERFEATURE']

row1 = ['Biryani','ijhibhid','acbgfa',-73.0,-29.6,-1.0,1.0]
row2 = ['Biryani','cjacbhid','cibdabc',-114.79,-44.326,-1.0,1.0]

df1 = pd.DataFrame([row1], columns=headers)
df2 = pd.DataFrame([row2], columns=headers)

# pd.concat replaces DataFrame.append, which was removed in pandas 2.0
df3 = pd.concat([df1, df2], ignore_index=True)

table = pa.Table.from_pandas(df3)

pq.write_table(table, 'example.parquet', flavor='spark')
# partition_cols must name columns that exist in the table; DISH is the column
# the assignment partitions the final output by
pq.write_to_dataset(table, root_path="test_part_file", partition_cols=['DISH'], flavor='spark')

# Write one more row into the DISH=Biryani partition
row3 = ['Biryani','cjacbhid','cibdabc',-114.79,-44.326,-1.0,2.0]
df4 = pd.DataFrame([row3], columns=headers)

table2 = pa.Table.from_pandas(df4)
pq.write_to_dataset(table2, root_path="test_part_file", partition_cols=['DISH'], flavor='spark')

# Add another parquet file to the same partition.
# Note this does not overwrite existing partitions, it just appends a new .parquet file.
# If files already exist, then you will get a union result of the two (or multiple) files when you read the partition
row5 = ['Biryani','dbjjbhid','jcjgcj',-43.39,-19.166,-1.0,1.0]
df5 = pd.DataFrame([row5], columns=headers)
table3 = pa.Table.from_pandas(df5)
pq.write_to_dataset(table3, root_path="test_part_file", partition_cols=['DISH'], flavor='spark')

### Filter the complete dataset for DISH = (Biryani or Pizza or Dosa) from X, where X is the complete dataset

In [160]:
dishes = ['Biryani','Pizza','Dosa']
# Filter the DataFrame loaded above (`data`)
filtered_df = data.filter(col("DISH").isin(dishes))
filtered_df.toPandas().to_csv('filtered.csv')

                                                                                

In [151]:
print((filtered_df.count(), len(filtered_df.columns)))

(805280, 7)


In [152]:
dish_part = Window.partitionBy("DISH")

count_dishes = filtered_df.select('DISH').groupby(col('DISH')).agg(count(col('DISH')))
count_dishes.show()

+-------+-----------+
|   DISH|count(DISH)|
+-------+-----------+
|   Dosa|       4938|
|Biryani|        585|
|  Pizza|     799757|
+-------+-----------+



In [161]:
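# No orderBy here: an ordered window defaults to a running frame, which would turn count("*") into a cumulative count
partition_ = Window.partitionBy("ORDERID","STOREID","PRICE")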

grouped_df = filtered_df.withColumn("records", count("*").over(partition_))
grouped_df.toPandas().to_csv('grouped.csv')
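Whether `records` equals the full group size depends on the window frame. In Spark, a window specification that has an `orderBy` and no explicit frame defaults to a range frame running from the start of the partition to the current row (rows tied on the sort key included). A specification without ordering covers the whole partition. The plain-Python sketch below imitates both behaviours for a single group ordered by DISH. The function names are hypothetical, and the sketch only illustrates the frame semantics; it is not Spark code.

```python
def group_size(dishes):
    """Unordered window: every row sees the whole partition."""
    return [(d, len(dishes)) for d in sorted(dishes)]


def running_count(dishes):
    """Ordered window, default frame: each row sees rows whose sort key is <= its own."""
    ordered = sorted(dishes)
    return [(d, sum(1 for other in ordered if other <= d)) for d in ordered]


group = ["Pizza", "Biryani", "Pizza"]
print(group_size(group))     # each row paired with the full group size
print(running_count(group))  # each row paired with a cumulative count
```

For this group the unordered count is 3 for every row, while the running count gives the Biryani row 1 and both Pizza rows 3. Because "Biryani" sorts before "Dosa" and "Pizza", a running count by DISH would make a later `records > 1` filter drop a lone Biryani row from any mixed group.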

                                                                                

In [12]:
count_dishes = grouped_df.select('DISH').groupby(col('DISH')).agg(count(col('DISH')))
count_dishes.show()

+-------+-----------+
|   DISH|count(DISH)|
+-------+-----------+
|   Dosa|       4938|
|Biryani|        585|
|  Pizza|     799757|
+-------+-----------+



In [162]:
records_greater_than_1 = grouped_df.filter(grouped_df.records > 1)
records_greater_than_1.toPandas().to_csv('records_greater_than_1.csv')

                                                                                

In [61]:
biryani = records_greater_than_1.filter(col("DISH") == "Biryani")
biryani.show()

                                                                                

+----+-------+-------+-----+-----------+--------+------------------+-------+
|DISH|ORDERID|STOREID|PRICE|TOTALAMOUNT|QUANTITY|JUSTANOTHERFEATURE|records|
+----+-------+-------+-----+-----------+--------+------------------+-------+
+----+-------+-------+-----+-----------+--------+------------------+-------+



In [15]:
count_dishes = records_greater_than_1.select('DISH').groupby(col('DISH')).agg(count(col('DISH')))
count_dishes.show()



+-----+-----------+
| DISH|count(DISH)|
+-----+-----------+
| Dosa|        404|
|Pizza|      13229|
+-----+-----------+



                                                                                

In [16]:
no_of_records = records_greater_than_1.agg(count("*"))
no_of_records.show()

                                                                                

+--------+
|count(1)|
+--------+
|   13633|
+--------+



In [19]:
biryani_records = records_greater_than_1.filter(records_greater_than_1.DISH == "Biryani")
biryani_records.show()

                                                                                

+----+-------+-------+-----+-----------+--------+------------------+-------+
|DISH|ORDERID|STOREID|PRICE|TOTALAMOUNT|QUANTITY|JUSTANOTHERFEATURE|records|
+----+-------+-------+-----+-----------+--------+------------------+-------+
+----+-------+-------+-----+-----------+--------+------------------+-------+



In [29]:
try:
    # Sum within each (ORDERID, STOREID, PRICE) group, not across every row of a DISH
    group_part = Window.partitionBy("ORDERID", "STOREID", "PRICE")
    biryani_records = records_greater_than_1.filter(records_greater_than_1.DISH == "Biryani")
    calculated_biryani_records = (
        biryani_records
        .withColumn("TOTALAMOUNT", sum("TOTALAMOUNT").over(group_part))
        .withColumn("QUANTITY", sum("QUANTITY").over(group_part))
        .withColumn("JUSTANOTHERFEATURE", sum("JUSTANOTHERFEATURE").over(group_part))
    )
    calculated_biryani_records.show()
except Exception as a:
    print(a)



+----+-------+-------+-----+-----------+--------+------------------+-------+
|DISH|ORDERID|STOREID|PRICE|TOTALAMOUNT|QUANTITY|JUSTANOTHERFEATURE|records|
+----+-------+-------+-----+-----------+--------+------------------+-------+
+----+-------+-------+-----+-----------+--------+------------------+-------+





In [71]:
biryani_or_pizza_or_dosa_records = records_greater_than_1.filter(col("DISH").isin("Biryani", "Pizza", "Dosa"))
biryani_or_pizza_or_dosa_records.show()



+-----+--------+-------+------+------------------+--------+------------------+-------+
| DISH| ORDERID|STOREID| PRICE|       TOTALAMOUNT|QUANTITY|JUSTANOTHERFEATURE|records|
+-----+--------+-------+------+------------------+--------+------------------+-------+
|Pizza|baacbhid|iiijabi|189.29|61.568000000000005|     1.0|               1.0|      2|
|Pizza|baacbhid|iiijabi|189.29|            33.004|     1.0|               1.0|      2|
|Pizza|bacbbhid| jgedfj| 94.79|29.526000000000003|     1.0|               1.0|      2|
|Pizza|bacbbhid| jgedfj| 94.79|29.526000000000003|     1.0|               1.0|      2|
|Pizza|baccbhid| ehedfe| 23.39| 7.918000000000001|    1.82|               1.0|      2|
|Pizza|baccbhid| ehedfe| 23.39|4.8100000000000005|    1.11|               1.0|      2|
|Pizza|baccbhid| heaefh| 31.79|7.3260000000000005|     1.0|               1.0|      2|
|Pizza|baccbhid| heaefh| 31.79|            21.978|     3.0|               3.0|      2|
|Pizza|bacdbhid| fcfdff|115.79|            

                                                                                

In [72]:
count_dishes = biryani_or_pizza_or_dosa_records.select('DISH').groupby(col('DISH')).agg(count(col('DISH')))
count_dishes.show()



+-----+-----------+
| DISH|count(DISH)|
+-----+-----------+
| Dosa|        404|
|Pizza|      13229|
+-----+-----------+



                                                                                

In [86]:
sum_total = biryani_or_pizza_or_dosa_records.agg(sum("TOTALAMOUNT").alias("sum_total"))
sum_total.show()

+-----------------+
|        sum_total|
+-----------------+
|465602.0059999995|
+-----------------+
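The figure above is a single total over all remaining rows, whereas step 3.2 asks for SUM(TOTALAMOUNT) within each (ORDERID, STOREID, PRICE) group. In Spark this would typically be `sum("TOTALAMOUNT").over(Window.partitionBy("ORDERID", "STOREID", "PRICE"))`. The plain-Python sketch below shows the same per-group sum on four rows copied from the output printed earlier. The helper name `sum_per_group` is hypothetical.

```python
from collections import defaultdict

# (ORDERID, STOREID, PRICE, TOTALAMOUNT), copied from the rows shown earlier
rows = [
    ("baacbhid", "iiijabi", 189.29, 61.568000000000005),
    ("baacbhid", "iiijabi", 189.29, 33.004),
    ("baccbhid", "heaefh", 31.79, 7.3260000000000005),
    ("baccbhid", "heaefh", 31.79, 21.978),
]


def sum_per_group(rows):
    """Return {(ORDERID, STOREID, PRICE): SUM(TOTALAMOUNT)} for the given rows."""
    totals = defaultdict(float)
    for order_id, store_id, price, total_amount in rows:
        totals[(order_id, store_id, price)] += total_amount
    return dict(totals)


group_totals = sum_per_group(rows)
for key, total in group_totals.items():
    print(key, round(total, 3))
```

Each group's total can then be tested against the `≤ 0` and `> 0` conditions of step 3.2 independently of the other groups.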



In [105]:
records = biryani_or_pizza_or_dosa_records.filter((col("TOTALAMOUNT")>0.0) & ((col("DISH")=="Dosa") | (col("DISH")=="Pizza")))
records.show()



+-----+--------+-------+------+------------------+--------+------------------+-------+
| DISH| ORDERID|STOREID| PRICE|       TOTALAMOUNT|QUANTITY|JUSTANOTHERFEATURE|records|
+-----+--------+-------+------+------------------+--------+------------------+-------+
|Pizza|baacbhid|iiijabi|189.29|61.568000000000005|     1.0|               1.0|      2|
|Pizza|baacbhid|iiijabi|189.29|            33.004|     1.0|               1.0|      2|
|Pizza|bacbbhid| jgedfj| 94.79|29.526000000000003|     1.0|               1.0|      2|
|Pizza|bacbbhid| jgedfj| 94.79|29.526000000000003|     1.0|               1.0|      2|
|Pizza|baccbhid| ehedfe| 23.39| 7.918000000000001|    1.82|               1.0|      2|
|Pizza|baccbhid| ehedfe| 23.39|4.8100000000000005|    1.11|               1.0|      2|
|Pizza|baccbhid| heaefh| 31.79|7.3260000000000005|     1.0|               1.0|      2|
|Pizza|baccbhid| heaefh| 31.79|            21.978|     3.0|               3.0|      2|
|Pizza|bacdbhid| fcfdff|115.79|            

                                                                                

In [108]:
rec = records \
.groupby(col("DISH"), col("ORDERID"), col("STOREID"), col("PRICE")) \
.agg(sum(col("TOTALAMOUNT")).alias("TOTALAMOUNT"),
     sum(col("QUANTITY")).alias("QUANTITY"),
     sum(col("JUSTANOTHERFEATURE")).alias("JUSTANOTHERFEATURE"))
rec.show()

+-----+--------+---------+------------------+------------------+------------------+------------------+
| DISH| ORDERID|  STOREID|             PRICE|       TOTALAMOUNT|          QUANTITY|JUSTANOTHERFEATURE|
+-----+--------+---------+------------------+------------------+------------------+------------------+
|Pizza|dbhjbhid|   cecefc|63.290000000000006|36.852000000000004|               2.0|               2.0|
|Pizza|difebhid|   idicfi| 67.49000000000001|            32.634|              1.64|               2.0|
|Pizza|eafibhid|   ehedfe|             23.39|            13.616|              3.11|               2.0|
|Pizza|hheibhid|   ehedfe|             23.39|            23.754|              5.43|               2.0|
|Pizza| baibhid|   gibefg|             31.79|            16.058|2.1900000000000004|               2.0|
|Pizza|figcbhid|   hggcfh|             38.09|19.092000000000002|               2.0|               2.0|
|Pizza|jaefbhid|   ehedfe|             23.39|            15.392|3.5300000

In [163]:
r = rec.filter(col("QUANTITY")==0).count()
r

                                                                                

0