# Writing Window Functions

Write Spark code that performs the steps listed below, optimized for
execution speed.

#### 1. Filter the complete dataset X for DISH = (Biryani or Pizza or Dosa)
#### 2. Group by ORDERID, STOREID, PRICE
#### 3. If the number of records in the group is > 1
##### 3.1 Calculate the number of records per dish
##### 3.2 Calculate the total number of records
###### In case you can't find such a record, drop all these records
##### 3.3 Update the column JUSTANOTHERFEATURE with a list of the distinct values greater than 1 in that column for the group.
#### 4. Else do nothing.
#### 5. Merge it back with X and write the output to an S3 path partitioned by the DISH name.

If you make any assumptions about the problem statement while writing the code, please state them.
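One possible reading of steps 1 to 4 can be modelled in plain Python on a handful of in-memory rows, which helps pin down the intended result before writing the Spark version. This is only a sketch under stated assumptions: the toy rows, the `process` helper, and the assumption that JUSTANOTHERFEATURE is numeric are all hypothetical and not taken from the dataset.

```python
from collections import defaultdict

# Hypothetical toy rows; only the columns named in the task are modelled.
rows = [
    {"ORDERID": 1, "STOREID": 10, "PRICE": 5.0, "DISH": "Pizza", "JUSTANOTHERFEATURE": 3},
    {"ORDERID": 1, "STOREID": 10, "PRICE": 5.0, "DISH": "Dosa", "JUSTANOTHERFEATURE": 1},
    {"ORDERID": 1, "STOREID": 10, "PRICE": 5.0, "DISH": "Pizza", "JUSTANOTHERFEATURE": 2},
    {"ORDERID": 2, "STOREID": 10, "PRICE": 7.0, "DISH": "Biryani", "JUSTANOTHERFEATURE": 4},
    {"ORDERID": 3, "STOREID": 11, "PRICE": 2.0, "DISH": "Burger", "JUSTANOTHERFEATURE": 9},
]

WANTED = {"Biryani", "Pizza", "Dosa"}


def process(rows):
    # Step 1: keep only the three dishes of interest.
    kept = [r for r in rows if r["DISH"] in WANTED]

    # Step 2: group by (ORDERID, STOREID, PRICE).
    groups = defaultdict(list)
    for r in kept:
        groups[(r["ORDERID"], r["STOREID"], r["PRICE"])].append(r)

    out = []
    for members in groups.values():
        if len(members) > 1:
            # Step 3.3: distinct values greater than 1 within the group
            # (assumes the column is numeric).
            distinct = sorted(
                {m["JUSTANOTHERFEATURE"] for m in members if m["JUSTANOTHERFEATURE"] > 1}
            )
            out.extend({**m, "JUSTANOTHERFEATURE": distinct} for m in members)
        else:
            # Step 4: single-record groups are left untouched.
            out.extend(members)
    return out


result = process(rows)
```

In this model the three rows of order 1 all receive the same list, while the single Biryani row keeps its original scalar value.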


### The dataset is available at the following link
https://drive.google.com/file/d/1JbO0JS9UKZdqhwLrTRVYfivTLycfOrpC/

In [1]:
#!/usr/bin/env bash
!pwd

/home/rita/Documents/Spark/Assignments


## Important points:

#### 1. Python : 3.8.10
#### 2. Spark : 3.2.0
#### 3. Hadoop : 3.3.1

### Additional libraries added to SPARK_HOME/jars

#### 1. aws-java-sdk-bundle-1.12.151.jar
#### 2. hadoop-aws-3.3.1.jar 
#### 3. jets3t-0.9.4.jar

###### sudo wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar 
###### sudo wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.151/aws-java-sdk-bundle-1.12.151.jar 
###### sudo wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar

In [18]:
import findspark
findspark.init()

# Import only the names the notebook uses; a wildcard import from
# pyspark.sql.functions would shadow Python builtins such as sum and filter.
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, count

In [30]:
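import xml.etree.ElementTree as ET

# AWS credentials live outside the notebook, in a local XML file of
# <add key="..." value="..."/> entries.
tree = ET.parse('/home/rita/Desktop/aws_keys/aws_keys.xml')

def getData(Data):
    """Return the value stored under the key `Data`, or None if the key is absent."""
    root = tree.getroot()
    result = None  # avoids an UnboundLocalError when no entry matches
    for ele in root.findall('add'):
        if ele.get('key') == Data:
            result = ele.get('value')
    return result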
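The lookup above implies a key file made of `add` elements with `key` and `value` attributes directly under the root. A minimal, self-contained sketch of that layout follows; the root element name `configuration`, the placeholder values, and the `get_value` helper are assumptions for illustration, not the contents of the real file.

```python
import xml.etree.ElementTree as ET

# Hypothetical file contents; real credentials should never be committed.
SAMPLE = """<configuration>
    <add key="access_key" value="EXAMPLE_ACCESS"/>
    <add key="secret_key" value="EXAMPLE_SECRET"/>
</configuration>"""


def get_value(root, wanted_key):
    """Return the value of the first <add> child whose key matches, else None."""
    for ele in root.findall("add"):
        if ele.get("key") == wanted_key:
            return ele.get("value")
    return None


root = ET.fromstring(SAMPLE)
print(get_value(root, "access_key"))
print(get_value(root, "missing"))
```

Returning `None` for an unknown key lets the caller decide how to fail, rather than raising from inside the loop.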

In [31]:
access_key = getData("access_key")
secret_key = getData("secret_key")

In [3]:
spark = SparkSession.builder \
    .appName("Window Function Assignment") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.WebIdentityTokenCredentialsProvider") \
    .master("local[*]").getOrCreate()

# Parquet files carry their own schema, so the CSV-only options
# (header, inferSchema, delimiter) are not needed here.
data = spark.read.parquet("/home/rita/Documents/Spark/Assignments/spark_assignment_data.snappy.parquet")

22/02/04 20:01:30 WARN Utils: Your hostname, EMPID21092 resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface wlp3s0)
22/02/04 20:01:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/04 20:01:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/04 20:01:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/02/04 20:01:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.

In [4]:
#data.toPandas().to_csv('data.csv')

In [5]:
print((data.count(), len(data.columns)))

(1129292, 7)


### Filter the complete dataset for DISH = (Biryani or Pizza or Dosa) from X, where X is the complete dataset

In [6]:
dishes = ['Biryani','Pizza','Dosa']
filtered_df = data.filter(data.DISH.isin(dishes))
#filtered_df.toPandas().to_csv('filtered.csv')

In [7]:
print((filtered_df.count(), len(filtered_df.columns)))

(805280, 7)


In [8]:
# Row count per dish after filtering.
count_dishes = filtered_df.select('DISH').groupby(col('DISH')).agg(count(col('DISH')))
count_dishes.show()

+-------+-----------+
|   DISH|count(DISH)|
+-------+-----------+
|   Dosa|       4938|
|Biryani|        585|
|  Pizza|     799757|
+-------+-----------+



In [9]:
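# Because this window is ordered, Spark's default frame ends at the current row's DISH peers,
# so `records` is a running count in DISH order; without .orderBy it would be the full group size.
partition_ = Window.partitionBy("ORDERID","STOREID","PRICE").orderBy("DISH")

grouped_df = filtered_df.withColumn("records", count("*").over(partition_))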
#grouped_df.toPandas().to_csv('grouped.csv')
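How the window is defined changes what `count("*").over(...)` returns. When a window specification has an ordering and no explicit frame, Spark uses a range frame from the start of the partition up to the current row and its peers, which gives a running count rather than the group size. The plain-Python model below imitates both behaviours on hypothetical `(group key, DISH)` pairs; the function names and sample rows are illustrative assumptions, and no Spark API is called.

```python
from collections import defaultdict

# Hypothetical (group key, DISH) pairs standing in for (ORDERID, STOREID, PRICE) groups.
rows = [("o1", "Dosa"), ("o1", "Pizza"), ("o1", "Pizza"), ("o2", "Pizza")]


def full_partition_count(rows):
    """Model of count(*) over an unordered window: every row sees its whole group."""
    sizes = defaultdict(int)
    for key, _ in rows:
        sizes[key] += 1
    return [sizes[key] for key, _ in rows]


def running_range_count(rows):
    """Model of count(*) over a window ordered by DISH with the default frame:
    each row sees the rows of its group whose DISH sorts at or before its own."""
    by_key = defaultdict(list)
    for key, dish in rows:
        by_key[key].append(dish)
    return [sum(1 for d in by_key[key] if d <= dish) for key, dish in rows]


print(full_partition_count(rows))
print(running_range_count(rows))
```

In this model the Dosa row of group `o1` gets a count of 1 under the ordered window even though its group holds three rows, so a subsequent `records > 1` filter would drop it.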

In [10]:
count_dishes = grouped_df.select('DISH').groupby(col('DISH')).agg(count(col('DISH')))
count_dishes.show()

+-------+-----------+
|   DISH|count(DISH)|
+-------+-----------+
|   Dosa|       4938|
|Biryani|        585|
|  Pizza|     799757|
+-------+-----------+



In [11]:
records_greater_than_1 = grouped_df.filter(grouped_df.records > 1)
#records_greater_than_1.toPandas().to_csv('records_greater_than_1.csv')

In [12]:
count_dishes = records_greater_than_1.select('DISH').groupby(col('DISH')).agg(count(col('DISH')))
count_dishes.show()

+-----+-----------+
| DISH|count(DISH)|
+-----+-----------+
| Dosa|        403|
|Pizza|      13229|
+-----+-----------+




In [13]:
no_of_records = records_greater_than_1.agg(count("*"))
no_of_records.show()



+--------+
|count(1)|
+--------+
|   13632|
+--------+




In [32]:
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
#spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
#spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
#spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "")

In [33]:
try:
    # Write Parquet partitioned by DISH; the CSV-only "header" option is dropped
    # because it has no effect on Parquet output.
    records_greater_than_1.coalesce(1) \
        .write \
        .option("maxRecordsPerFile", 5000) \
        .partitionBy("DISH") \
        .mode("overwrite") \
        .format("parquet") \
        .save("s3a://giruu/output_")
except Exception as e:
    print(e)

22/02/04 20:07:30 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties