# Hands On 1 - Bonus Track
In this part, we will test our Apache Spark skills on the Amazon food dataset, which you can find at `/data/Reviews.csv`. The goal is to find all the pairs of products that are frequently reviewed together.

## Task 1
The input Amazon food dataset is a comma-separated file with one review per line. In each line, the second and third columns hold the product id and the user id, respectively. The schema of Reviews.csv is the following:

|Id|ProductId|UserId|ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|Time|Summary|Text|
|--|---------|------|-----------|--------------------|----------------------|-----|----|-------|----|
|  |         |      |           |                    |                      |     |    |       |    |

Write a single Spark application that:
1. Transposes the original Amazon food dataset, obtaining an RDD of pairs (tuples) of the type:
_(UserId, list of the ProductId reviewed by that user)_

The returned RDD contains one pair/tuple for each user, made of the `user_id` and the complete list of (distinct) products reviewed by that user. If user `user_id` reviewed the same product more than once, that product must occur only once in the returned list of the `product_ids` reviewed by `user_id`;

2. Counts the frequencies of all the pairs of products reviewed together (the frequency of a pair of products is given by the number of users who reviewed both products);

3. Stores in the output folder all the pairs of products that appear more than once, together with their frequencies. The pairs of products must be sorted by decreasing frequency.

Inspect the output of your application to search for interesting facts. 

**Pay attention**: the line starting with `Id,` is the header of the file and **must not** be considered.
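
To make the expected result concrete, the three steps can be traced on a handful of records. The snippet below is a plain-Python sketch (no Spark involved) on made-up data: the names `toy_reviews`, `products_by_user`, `pair_counts` and `frequent_pairs` are assumptions introduced only for this illustration and are not part of the exercise.

```python
from collections import Counter, defaultdict
from itertools import combinations

# Hypothetical toy reviews as (UserId, ProductId); user "u1" reviewed "A" twice.
toy_reviews = [
    ("u1", "A"), ("u1", "B"), ("u1", "A"), ("u1", "C"),
    ("u2", "A"), ("u2", "B"),
    ("u3", "B"), ("u3", "C"),
    ("u4", "A"),
]

# Step 1: one set of distinct products per user.
products_by_user = defaultdict(set)
for user_id, product_id in toy_reviews:
    products_by_user[user_id].add(product_id)

# Step 2: for each pair, count the users who reviewed both products.
pair_counts = Counter()
for products in products_by_user.values():
    pair_counts.update(combinations(sorted(products), 2))

# Step 3: keep the pairs seen more than once, sorted by decreasing frequency
# (ties are broken alphabetically so the output is deterministic).
frequent_pairs = sorted(
    ((pair, count) for pair, count in pair_counts.items() if count > 1),
    key=lambda item: (-item[1], item[0]),
)
print(frequent_pairs)  # [(('A', 'B'), 2), (('B', 'C'), 2)]
```

The pair `('A', 'C')` is reviewed together only by `u1`, so it is dropped in step 3; the duplicate review of `A` by `u1` does not inflate any count.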

In [None]:
# Set input and output folders
inputPath  = "data/Reviews.csv"
outputPath = "out_bonus/" 

# Read the content of the input file
reviewsRDD = sc.textFile(inputPath)

In [None]:
# Discard the header
reviewsRDDnoHeader = reviewsRDD.filter(lambda line: not line.startswith("Id,"))

In [None]:
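# This Python function splits the input line and returns a tuple (userId, productId)
def extractUserIdProductID(line):
    # ProductId and UserId come before the free-text columns, so a plain split
    # is enough here even if later fields contain commas
    columns = line.split(",")
    userId = columns[2]
    productId = columns[1]

    return (userId, productId)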


# Generate one pair (UserId, ProductId) from each input line
pairUserProductRDD = reviewsRDDnoHeader.map(extractUserIdProductID)
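
Splitting on `,` is sufficient for this exercise because only the second and third columns are read. If a later column such as `Summary` or `Text` were needed, commas inside quoted fields would shift the positions. The sketch below compares the naive split with Python's standard `csv` module on a made-up line; `sample_line` and `parse_review_line` are hypothetical names used only for illustration, and the line is assumed to fit on a single row.

```python
import csv

# Hypothetical line in the Reviews.csv layout; three quoted fields contain commas.
sample_line = (
    '1,P42,U7,"Doe, Jane",1,1,5,1303862400,'
    '"Good, cheap","Tasty, would buy again"'
)

def parse_review_line(line):
    """Parse one CSV line, honouring quoted fields (hypothetical helper)."""
    return next(csv.reader([line]))

naive_columns = sample_line.split(",")
parsed_columns = parse_review_line(sample_line)

print(len(naive_columns))   # 13: the quoted commas were treated as separators
print(len(parsed_columns))  # 10: one value per column of the schema
print(parsed_columns[2], parsed_columns[1])  # U7 P42
```

Both approaches agree on the first three columns, which is why the simpler `split` is a reasonable choice in `extractUserIdProductID`.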

In [None]:
# Remove duplicate pairs, if any
pairUserProductDistinctRDD = pairUserProductRDD.distinct()

In [None]:
# Generate one "transaction" for each user
# (user_id, list of the product_ids reviewed by user_id)
UserIDListOfReviewedProductsRDD = pairUserProductDistinctRDD.groupByKey()

In [None]:
# We are interested only in the value part (the lists of products that have been reviewed together)
transactionsRDD = UserIDListOfReviewedProductsRDD.values()

In [None]:
# Given an input transaction (i.e., a list of products reviewed by the same user),
# this Python function returns all the possible pairs of products. Each pair of products is associated with
# a frequency equal to 1. Hence, this function returns a list of (key, value) pairs, where
# - key = pair of products
# - value = 1
def extractPairsOfProducts(transaction):

    products = list(transaction)

    returnedPairs = []

    for product1 in products:
        for product2 in products:
            # Emit each pair only once, always as (smaller id, larger id),
            # so that (A, B) and (B, A) are counted under the same key
            if product1 < product2:
                returnedPairs.append(((product1, product2), 1))

    return returnedPairs


# Generate an RDD of (key,value) pairs, where
# - key = pairs of products
# - value = 1

# One pair is returned for each combination of products appearing in the same transaction  
pairsOfProductsOneRDD = transactionsRDD.flatMap(extractPairsOfProducts)
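
The nested loops compare every product with every other product and keep half of the combinations. As a possible alternative, the same pairs can be generated directly with `itertools.combinations` from the standard library. The function name `extract_pairs_with_combinations` is an assumption made for this sketch; it is not part of the original solution.

```python
from itertools import combinations

def extract_pairs_with_combinations(transaction):
    """Hypothetical alternative to extractPairsOfProducts using itertools."""
    # Sorting first guarantees product1 < product2 in every emitted pair,
    # so (A, B) and (B, A) never end up as two different keys.
    products = sorted(set(transaction))
    return [(pair, 1) for pair in combinations(products, 2)]

print(extract_pairs_with_combinations(["P3", "P1", "P2"]))
# [(('P1', 'P2'), 1), (('P1', 'P3'), 1), (('P2', 'P3'), 1)]
```

A function of this shape could be passed to `flatMap` in the same way as `extractPairsOfProducts`; the set of emitted pairs is the same, only their order within a transaction may differ.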

In [None]:
# Count the frequency (i.e., number of occurrences) of each key (= pair of products)
pairsFrequenciesRDD = pairsOfProductsOneRDD.reduceByKey(lambda count1, count2: count1 + count2)

In [None]:
# Select only the pairs that appear more than once and their frequencies.
atLeast2PairsFrequenciesRDD = pairsFrequenciesRDD.filter(lambda inputTuple: inputTuple[1] > 1)

In [None]:
# Sort pairs of products by decreasing frequency
atLeast2PairsFrequenciesSortedRDD = atLeast2PairsFrequenciesRDD.sortBy(lambda inputTuple: inputTuple[1], ascending=False).cache()

In [None]:
# Store the result in the output folder
atLeast2PairsFrequenciesSortedRDD.saveAsTextFile(outputPath)

## Task 2

Extend the application so that it writes the 10 most frequent pairs of products, with their frequencies, to the standard output.

In [None]:
# The pairs of products in atLeast2PairsFrequenciesSortedRDD are already sorted by frequency.
# The first 10 pairs are already the top 10 ones
topPairsOfProducts = atLeast2PairsFrequenciesSortedRDD.take(10)

In [None]:
# Print the selected pairs of products on the standard output of the driver
for pairOfProducts in topPairsOfProducts:
    print(pairOfProducts)
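
Here `take(10)` is cheap because the RDD was already sorted and cached for Task 1. If only the top 10 were required, a full sort would not be strictly necessary: PySpark RDDs also provide `top` and `takeOrdered`, which accept a `key` function. The underlying idea can be illustrated in plain Python with `heapq.nlargest`; the records in `pair_frequencies` below are made up for the example.

```python
import heapq

# Hypothetical (pair, frequency) records, shaped like the output of the counting step.
pair_frequencies = [
    (("P1", "P2"), 5),
    (("P1", "P3"), 2),
    (("P2", "P3"), 9),
    (("P2", "P4"), 3),
]

# Keep only the k records with the highest frequency, without sorting everything.
top_two = heapq.nlargest(2, pair_frequencies, key=lambda record: record[1])
print(top_two)  # [(('P2', 'P3'), 9), (('P1', 'P2'), 5)]
```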