# Retail Analysis Solution

## Author : Syed Kabir 

## Table of Contents
* [Part 1 : Working with RDD](#part-1)
    * [1.1 Data Preparation and Loading](#1.1)
    * [1.2 Data Partitioning in RDD](#1.2)
    * [1.3 Query/Analysis](#1.3)
* [Part 2 : Working with DataFrames](#2-dataframes)
    * [2.1 Data Preparation and Loading](#2-dataframes)
    * [2.2 Query/Analysis](#2.2)
* [Part 3 : RDDs vs DataFrame vs Spark SQL](#part-3)

# Part 1 : Working with RDDs <a class="anchor" name="part-1"></a>
## 1. Working with RDD
In this section, RDDs are created from the given datasets, partitioned, and then queried with various RDD operations to answer the retail analysis questions.

### 1.1 Data Preparation and Loading <a class="anchor" name="1.1"></a>
#### 1.1.1: Create a SparkContext object using SparkSession, which tells Spark how to access a cluster. To create the SparkSession, a SparkConf object holding the application settings is built first, with Melbourne time as the session timezone. The application is given an appropriate name, and Spark runs locally with as many worker threads as there are logical cores on the machine.

In [1]:
# Import SparkConf class into program
from pyspark import SparkConf

# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
master = "local[*]"

# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Retail Analysis App"

# Setup configuration parameters for Spark
spark_conf = SparkConf(). \
                setMaster(master).setAppName(app_name). \
                set("spark.sql.session.timeZone", "Australia/Melbourne") # setting the timezone for the session

In [2]:
# Import SparkSession classes
from pyspark.sql import SparkSession # Spark SQL

# Method : Using SparkSession; Configuring time zone
spark = SparkSession.builder. \
                     config(conf=spark_conf). \
                     getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

#### 1.1.2 : Load the features, sales and stores CSV files into the features, sales and stores RDDs

In [3]:
features_rdd_raw = sc.textFile('features.csv')
sales_rdd_raw = sc.textFile('sales.csv')
stores_rdd_raw = sc.textFile('stores.csv')

#### 1.1.3 : For each of the features, sales and stores RDDs, remove the header row and display the total count and the first 10 records. Hint: You can use csv.reader to parse rows in RDDs.

In [4]:
header_features = features_rdd_raw.first()
features_rdd = features_rdd_raw.filter(lambda row: row != header_features)
print(f"Total number of rows in features_rdd: {features_rdd.count()}")
print("First 10 rows of features_rdd : ")
features_rdd.take(10)

Total number of rows in features_rdd: 8190
First 10 rows of features_rdd : 


['1,05/02/2010,42.31,2.572,NA,NA,NA,NA,NA,211.0963582,8.106,FALSE',
 '1,12/02/2010,38.51,2.548,NA,NA,NA,NA,NA,211.2421698,8.106,TRUE',
 '1,19/02/2010,39.93,2.514,NA,NA,NA,NA,NA,211.2891429,8.106,FALSE',
 '1,26/02/2010,46.63,2.561,NA,NA,NA,NA,NA,211.3196429,8.106,FALSE',
 '1,05/03/2010,46.5,2.625,NA,NA,NA,NA,NA,211.3501429,8.106,FALSE',
 '1,12/03/2010,57.79,2.667,NA,NA,NA,NA,NA,211.3806429,8.106,FALSE',
 '1,19/03/2010,54.58,2.72,NA,NA,NA,NA,NA,211.215635,8.106,FALSE',
 '1,26/03/2010,51.45,2.732,NA,NA,NA,NA,NA,211.0180424,8.106,FALSE',
 '1,02/04/2010,62.27,2.719,NA,NA,NA,NA,NA,210.8204499,7.808,FALSE',
 '1,09/04/2010,65.86,2.77,NA,NA,NA,NA,NA,210.6228574,7.808,FALSE']

In [5]:
header_sales = sales_rdd_raw.first()
sales_rdd = sales_rdd_raw.filter(lambda row: row != header_sales)
print(f"Total number of rows in sales_rdd: {sales_rdd.count()}")
print("First 10 rows of sales_rdd : ")
sales_rdd.take(10)

Total number of rows in sales_rdd: 421570
First 10 rows of sales_rdd : 


['1,1,05/02/2010,24924.5,FALSE',
 '1,1,12/02/2010,46039.49,TRUE',
 '1,1,19/02/2010,41595.55,FALSE',
 '1,1,26/02/2010,19403.54,FALSE',
 '1,1,05/03/2010,21827.9,FALSE',
 '1,1,12/03/2010,21043.39,FALSE',
 '1,1,19/03/2010,22136.64,FALSE',
 '1,1,26/03/2010,26229.21,FALSE',
 '1,1,02/04/2010,57258.43,FALSE',
 '1,1,09/04/2010,42960.91,FALSE']

In [6]:
header_stores = stores_rdd_raw.first()
stores_rdd = stores_rdd_raw.filter(lambda row: row != header_stores)
print(f"Total number of rows in stores_rdd: {stores_rdd.count()}")
print("First 10 rows of stores_rdd : ")
stores_rdd.take(10)

Total number of rows in stores_rdd: 45
First 10 rows of stores_rdd : 


['1,A,151315',
 '2,A,202307',
 '3,B,37392',
 '4,A,205863',
 '5,B,34875',
 '6,A,202505',
 '7,B,70713',
 '8,A,155078',
 '9,B,125833',
 '10,B,126512']
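
The hint for 1.1.3 mentions `csv.reader`, while the cells above keep each row as a raw string. The following is a minimal sketch of how a single line could be parsed with `csv.reader`. The helper name `parse_csv_line` and the RDD name in the final comment are illustrative assumptions, not part of the original solution.

```python
import csv

def parse_csv_line(line):
    """Parse a single CSV line into a list of string fields."""
    # csv.reader expects an iterable of lines, so wrap the line in a list
    return next(csv.reader([line]))

# A row taken from the features output above
sample = '1,05/02/2010,42.31,2.572,NA,NA,NA,NA,NA,211.0963582,8.106,FALSE'
fields = parse_csv_line(sample)
print(len(fields), fields[:3])

# Assumed usage on the RDD from this section (not run here):
# features_parsed_rdd = features_rdd.map(parse_csv_line)
```

Unlike `line.split(',')`, `csv.reader` also copes with quoted fields that contain commas, which may matter for other datasets even though these files do not appear to need it.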

### 1.2 Data Partitioning in RDD <a class="anchor" name="1.2"></a>
#### 1.2.1 : How many partitions do the above RDDs have? How is the data in these RDDs partitioned by default, when we do not explicitly specify any partitioning strategy? Can you explain why they are partitioned into this number? Hint: search the source code to understand the reason.

In [7]:
numPartitions = features_rdd.getNumPartitions()
print(f"Total partitions: {numPartitions}")

Total partitions: 2


In [8]:
numPartitions = sales_rdd.getNumPartitions()
print(f"Total partitions: {numPartitions}")

Total partitions: 2


In [9]:
numPartitions = stores_rdd.getNumPartitions()
print(f"Total partitions: {numPartitions}")

Total partitions: 2


<p style="color:black"><strong>ANSWER:</strong> 
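    From the source code, we can see that each RDD has 2 partitions because <code>textFile</code> was called without the <code>minPartitions</code> argument. In that case PySpark falls back to min(self.defaultParallelism, 2) as the minimum number of partitions, which evaluates to 2 whenever the default parallelism is at least 2, as it is with local[*] on a multi-core machine. The file is then divided into input splits, so no key-based partitioning strategy is applied.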
</p>

https://spark.apache.org/docs/latest/api/python/_modules/pyspark/context.html#SparkContext.textFile
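
The fallback described in the answer can be mirrored in plain Python. This is only a sketch of the expression used in `SparkContext.textFile`. The function name `default_min_partitions` is hypothetical, and the value is a minimum hint: larger files may still be split into more partitions by the underlying input format.

```python
def default_min_partitions(default_parallelism, min_partitions=None):
    """Mirror of: minPartitions = minPartitions or min(self.defaultParallelism, 2)."""
    # An explicit minPartitions wins; otherwise the value is capped at 2
    return min_partitions or min(default_parallelism, 2)

# With local[*] on an 8-core machine the default parallelism would be 8
print(default_min_partitions(8))
# On a single-core machine the default parallelism would be 1
print(default_min_partitions(1))
# Passing minPartitions explicitly overrides the fallback
print(default_min_partitions(8, min_partitions=4))
```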

#### 1.2.2 : Create a key-value RDD from the store RDD, using the store type as the key and the remaining columns as the value. Print out the first 5 records of the key-value RDD.

In [10]:
# Implement function with logic to be applied to the RDDs
def parseRecord(line):
    # Split line separated by comma
    array_line = line.split(',')
    # Return a tuple with the store type as first element and the remaining columns (store id, size) as the second element
    return (array_line[1], [array_line[0],array_line[2]])

storetypekey_rdd = stores_rdd.map(parseRecord)
storetypekey_rdd.take(5)

[('A', ['1', '151315']),
 ('A', ['2', '202307']),
 ('B', ['3', '37392']),
 ('A', ['4', '205863']),
 ('B', ['5', '34875'])]

#### 1.2.3 : Write the code to separate the store key-value RDD based on the store type (each type should be in the same partition). Print out the total number of partitions and the number of records in each partition.

In [11]:
def print_partitions(data):
    numPartitions = data.getNumPartitions()
    partitions = data.glom().collect()
    print(f"####### NUMBER OF PARTITIONS: {numPartitions}")
    for index, partition in enumerate(partitions):
        # show partition if it is not empty
        if len(partition) > 0:
            print(f"Partition {index}: {len(partition)} records")
            print(partition)
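
`print_partitions` relies on `glom().collect()`, which brings every record to the driver. When only the record counts are needed, one possible alternative is to count inside each partition with `mapPartitionsWithIndex`. The sketch below simulates two partitions with plain lists so it runs without Spark. The helper name `count_partition` and the sample records are illustrative assumptions.

```python
def count_partition(index, iterator):
    """Yield (partition index, number of records) for one partition."""
    yield (index, sum(1 for _ in iterator))

# Local stand-in for an RDD with two partitions (made-up records)
fake_partitions = [[('A', 1), ('A', 2)], [('B', 3)]]
counts = [
    pair
    for index, partition in enumerate(fake_partitions)
    for pair in count_partition(index, iter(partition))
]
print(counts)

# Assumed usage on an RDD (not run here):
# data.mapPartitionsWithIndex(count_partition).collect()
```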

In [12]:
# Create a hash function that maps a categorical key to an integer hash value
def hash_function(key):
    hashed_key = hash(key)
    return hashed_key

# hash partitioning
no_of_partitions= storetypekey_rdd.keys().distinct().count()
hash_partitioned_rdd = storetypekey_rdd.partitionBy(no_of_partitions, hash_function)

# Displaying partitioning
print_partitions(hash_partitioned_rdd)   

####### NUMBER OF PARTITIONS: 3
Partition 0: 23 records
[('B', ['3', '37392']), ('B', ['5', '34875']), ('B', ['7', '70713']), ('B', ['9', '125833']), ('B', ['10', '126512']), ('B', ['12', '112238']), ('B', ['15', '123737']), ('B', ['16', '57197']), ('B', ['17', '93188']), ('B', ['18', '120653']), ('B', ['21', '140167']), ('B', ['22', '119557']), ('B', ['23', '114533']), ('B', ['25', '128107']), ('B', ['29', '93638']), ('C', ['30', '42988']), ('B', ['35', '103681']), ('C', ['37', '39910']), ('C', ['38', '39690']), ('C', ['42', '39690']), ('C', ['43', '41062']), ('C', ['44', '39910']), ('B', ['45', '118221'])]
Partition 1: 22 records
[('A', ['1', '151315']), ('A', ['2', '202307']), ('A', ['4', '205863']), ('A', ['6', '202505']), ('A', ['8', '155078']), ('A', ['11', '207499']), ('A', ['13', '219622']), ('A', ['14', '200898']), ('A', ['19', '203819']), ('A', ['20', '203742']), ('A', ['24', '203819']), ('A', ['26', '152513']), ('A', ['27', '204184']), ('A', ['28', '206302']), ('A', ['31', '
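
In the output above, partition 0 holds both type B and type C records, so hashing the key does not guarantee one store type per partition: two different hash values can map to the same index once the modulo by the number of partitions is applied. A minimal sketch of one alternative is to give each store type an explicit partition index. The helper `make_type_partitioner` is hypothetical, and the `partitionBy` call in the final comment is assumed usage that was not run here.

```python
def make_type_partitioner(store_types):
    """Build a partition function that gives each store type its own index."""
    # Sorting keeps the type-to-index mapping stable across runs
    index_by_type = {t: i for i, t in enumerate(sorted(set(store_types)))}

    def partitioner(key):
        return index_by_type[key]

    return partitioner

# Store types as they appear in the stores data
store_types = ['A', 'B', 'C']
partition_func = make_type_partitioner(store_types)
print({t: partition_func(t) for t in store_types})

# Assumed usage on the key-value RDD (not run here):
# typed_rdd = storetypekey_rdd.partitionBy(len(store_types), partition_func)
```

Because every index is already smaller than the number of partitions, the modulo applied by `partitionBy` leaves the mapping unchanged.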

### 1.3 Query/Analysis <a class="anchor" name="1.3"></a>
For this part, write relevant RDD operations to answer the following queries.

**1.3.1 Calculate the average weekly sales for each year.**

In [13]:
# Have a look at the sales RDD
sales_rdd = sales_rdd_raw.filter(lambda row: row != header_sales)
sales_rdd.take(5)

['1,1,05/02/2010,24924.5,FALSE',
 '1,1,12/02/2010,46039.49,TRUE',
 '1,1,19/02/2010,41595.55,FALSE',
 '1,1,26/02/2010,19403.54,FALSE',
 '1,1,05/03/2010,21827.9,FALSE']

In [14]:
# Now split each element on ',' and create tuples that keep only the values of two columns
# Implement function with logic to be applied to the RDDs
def parseRecord(line):
    # Split line separated by comma
    array_line = line.split(',')
    # Return a tuple with 'Date' as first element and weekly sales as the second element
    return (array_line[2], array_line[3])

weeklySales_rdd = sales_rdd.map(parseRecord)
weeklySales_rdd.take(5)

[('05/02/2010', '24924.5'),
 ('12/02/2010', '46039.49'),
 ('19/02/2010', '41595.55'),
 ('26/02/2010', '19403.54'),
 ('05/03/2010', '21827.9')]

In [15]:
# The date value needs to be split by '/' to extract the year
# Keep only the year component of the date
weeklySales_year_rdd = weeklySales_rdd. \
                                    map(lambda x: x[0].split('/')). \
                                    map(lambda x: (x[2]))
weeklySales_year_rdd.distinct().collect()

['2010', '2011', '2012']

In [16]:
# Zip the year values with the sales values
weeklySales_rdd1 = weeklySales_year_rdd.zip(weeklySales_rdd. \
                                            map(lambda x : x[1])). \
                                            map(lambda x: [float(i) for i in x]) # Converts both the year and the sales strings to float (hence keys such as 2010.0)

In [17]:
# Calculating average weekly sales for each year
weeklyAvgSales_rdd = weeklySales_rdd1.groupByKey(). \
                                        mapValues(lambda x: sum(x)/len(x))
weeklyAvgSales_rdd.collect()

[(2010.0, 16270.275737033313),
 (2012.0, 15694.948597357718),
 (2011.0, 15954.070675386392)]

<div style="background:rgba(255,181,116,0.5);padding:10px">
    <strong>NOTE:</strong> Other approaches are also acceptable as long as they produce the correct result.
</div>
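
As one such alternative, the year and the sales value can be extracted in a single `map`, and the average computed from a running (sum, count) pair instead of grouping all values per key. The sketch below runs the helper functions locally on rows taken from the sales output above. The function names are illustrative assumptions, and the `aggregateByKey` chain in the final comment was not run here.

```python
def year_and_sales(line):
    """Turn a raw sales row into (year, weekly_sales)."""
    fields = line.split(',')
    year = fields[2].split('/')[2]  # dates are formatted dd/mm/yyyy
    return (year, float(fields[3]))

def seq_op(acc, value):
    """Fold one sales value into a (sum, count) accumulator."""
    return (acc[0] + value, acc[1] + 1)

def comb_op(left, right):
    """Merge two (sum, count) accumulators from different partitions."""
    return (left[0] + right[0], left[1] + right[1])

def average(acc):
    return acc[0] / acc[1]

# Rows taken from the sales output above
sample = ['1,1,05/02/2010,24924.5,FALSE', '1,1,12/02/2010,46039.49,TRUE']
acc = (0.0, 0)
for line in sample:
    _, sales = year_and_sales(line)
    acc = seq_op(acc, sales)
print(acc[1], average(acc))

# Assumed usage on the RDD (not run here):
# sales_rdd.map(year_and_sales) \
#          .aggregateByKey((0.0, 0), seq_op, comb_op) \
#          .mapValues(average)
```

This keeps the year as a string key and avoids `zip`, which requires both RDDs to have the same number of partitions and the same number of elements in each partition.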

**1.3.2 Find the highest temperature record in 2011 in the 'type B' store. You should display the store ID, date, highest temperature and type in the result.**

##### Selecting required elements from features rdd

In [18]:
# First, have a look at features_rdd
features_rdd = features_rdd_raw.filter(lambda row: row != header_features)
features_rdd.take(5)

['1,05/02/2010,42.31,2.572,NA,NA,NA,NA,NA,211.0963582,8.106,FALSE',
 '1,12/02/2010,38.51,2.548,NA,NA,NA,NA,NA,211.2421698,8.106,TRUE',
 '1,19/02/2010,39.93,2.514,NA,NA,NA,NA,NA,211.2891429,8.106,FALSE',
 '1,26/02/2010,46.63,2.561,NA,NA,NA,NA,NA,211.3196429,8.106,FALSE',
 '1,05/03/2010,46.5,2.625,NA,NA,NA,NA,NA,211.3501429,8.106,FALSE']

In [19]:
# Implement function with logic to be applied to the RDDs
def parseRecordFeatures(line):
    # Split line separated by comma
    array_line = line.split(',')
    # Return a tuple with the store id as first element and (date, temperature) as the second element
    return (array_line[0], (array_line[1], array_line[2]))

features_rdd1 = features_rdd.map(parseRecordFeatures)
features_rdd1.take(5)

[('1', ('05/02/2010', '42.31')),
 ('1', ('12/02/2010', '38.51')),
 ('1', ('19/02/2010', '39.93')),
 ('1', ('26/02/2010', '46.63')),
 ('1', ('05/03/2010', '46.5'))]

##### Selecting required elements from stores rdd

In [20]:
# First, have a look at stores_rdd
stores_rdd = stores_rdd_raw.filter(lambda row: row != header_stores)
stores_rdd.take(5)

['1,A,151315', '2,A,202307', '3,B,37392', '4,A,205863', '5,B,34875']

In [21]:
# Implement function with logic to be applied to the RDDs
def parseRecordStores(line):
    # Split line separated by comma
    array_line = line.split(',')
    # Return a tuple with the store id as first element and store type as the second element
    return (array_line[0], array_line[1])

stores_rdd1 = stores_rdd.map(parseRecordStores)
stores_rdd1.take(5)

[('1', 'A'), ('2', 'A'), ('3', 'B'), ('4', 'A'), ('5', 'B')]

##### Joining features and stores rdd

In [22]:
# Joining feature and store rdds
feature_store_rdd = features_rdd1.join(stores_rdd1)
feature_store_rdd.take(5)

[('4', (('05/02/2010', '43.76'), 'A')),
 ('4', (('12/02/2010', '28.84'), 'A')),
 ('4', (('19/02/2010', '36.45'), 'A')),
 ('4', (('26/02/2010', '41.36'), 'A')),
 ('4', (('05/03/2010', '43.49'), 'A'))]

##### Initial Unpacking of the elements

In [23]:
feature_store_rdd1 = feature_store_rdd.map(lambda x: [x[0]] + list(x[1]))
feature_store_rdd1.take(3)

[['4', ('05/02/2010', '43.76'), 'A'],
 ['4', ('12/02/2010', '28.84'), 'A'],
 ['4', ('19/02/2010', '36.45'), 'A']]

##### Final Unpacking Elements

In [24]:
feature_store_rdd2 = feature_store_rdd1.map(lambda x: [x[0]] + list(x[1]) + [x[2]]) 
feature_store_rdd2.take(3)

[['4', '05/02/2010', '43.76', 'A'],
 ['4', '12/02/2010', '28.84', 'A'],
 ['4', '19/02/2010', '36.45', 'A']]

##### Unpacking year data from the date

In [25]:
# Unpacking year data from the date
feature_store_rdd3 = feature_store_rdd2.map(lambda x:  x[1].split('/')).map(lambda x: (x[2]))
feature_store_rdd3.take(3)

['2010', '2010', '2010']

In [26]:
# Zipping year data with the joined unpacked RDD
feature_store_rdd4 = feature_store_rdd3.zip(feature_store_rdd2) 
feature_store_rdd4.take(3)

[('2010', ['4', '05/02/2010', '43.76', 'A']),
 ('2010', ['4', '12/02/2010', '28.84', 'A']),
 ('2010', ['4', '19/02/2010', '36.45', 'A'])]

##### Rearranging and removing Date elements

In [27]:
# Rearranging and removing Date elements
feature_store_rdd5 = feature_store_rdd4.map(lambda x: [x[0]] + x[1]).map(lambda x: (x[1], x[0], x[4], x[3]))
feature_store_rdd5.take(3)

[('4', '2010', 'A', '43.76'),
 ('4', '2010', 'A', '28.84'),
 ('4', '2010', 'A', '36.45')]

##### Filtering for store type B and year = 2011

In [28]:
feature_store_rdd_final = feature_store_rdd5.filter(lambda x : x[2] == 'B'). \
                                             filter(lambda x : x[1]=='2011')

##### Finding the record with the maximum temperature for store type B in 2011

In [29]:
feature_store_rdd_max = feature_store_rdd_final.max(key=lambda x: float(x[3]))
print(feature_store_rdd_max)

('10', '2011', 'B', '95.36')
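
The result above shows the year rather than the full date, although the question asks for the date. A minimal sketch of a variant that keeps the date is to flatten the joined record once and filter on it directly. The helper names are illustrative assumptions. The first sample record comes from the join output above, the type B records are made up for the demonstration, and the RDD chain in the final comment was not run here.

```python
def flatten_joined(record):
    """Flatten (store_id, ((date, temp), type)) into (store_id, date, temp, type)."""
    store_id, ((date, temperature), store_type) = record
    return (store_id, date, float(temperature), store_type)

def is_type_b_2011(row):
    """Keep rows for type B stores whose dd/mm/yyyy date falls in 2011."""
    _, date, _, store_type = row
    return store_type == 'B' and date.endswith('/2011')

joined_sample = [
    ('4', (('05/02/2010', '43.76'), 'A')),  # from the join output above
    ('3', (('07/01/2011', '50.0'), 'B')),   # hypothetical record
    ('3', (('14/01/2011', '61.5'), 'B')),   # hypothetical record
    ('3', (('05/02/2010', '99.0'), 'B')),   # hypothetical record, wrong year
]
rows = [flatten_joined(r) for r in joined_sample]
candidates = [r for r in rows if is_type_b_2011(r)]
hottest = max(candidates, key=lambda r: r[2])
print(hottest)

# Assumed usage on the RDDs from this section (not run here):
# features_rdd1.join(stores_rdd1) \
#              .map(flatten_joined) \
#              .filter(is_type_b_2011) \
#              .max(key=lambda r: r[2])
```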
