In [13]:
from pyspark import SparkConf, SparkContext
import os
import shutil

In [14]:
# Change to True to run the program on full dataset
isProd = False

In [15]:
number_cores = 2
memory_gb = 4 
# Create a configuration object and
# set the name of the application
conf = (
    SparkConf()
        .setAppName("SparkTask")
        .setMaster('local[{}]'.format(number_cores))
        .set('spark.driver.memory', '{}g'.format(memory_gb))
)
# Create a Spark Context object
sc = SparkContext(conf=conf)

# Solution

In [16]:
# Read the input file
if isProd:
    if not os.path.exists('input/Reviews.csv'):
        sc.stop()
        raise Exception("""
            Download the 'Reviews.csv' file from https://www.kaggle.com/datasets/snap/amazon-fine-food-reviews
            and put it in 'input' folder
        """)
    else:
        inputRdd = sc.textFile("input/Reviews.csv")
else:
    inputRdd = sc.textFile("/Users/denys/Desktop/Programming/Spark/spark-task-template/input/Sample.csv")

In [17]:
# Remove the header
filteredInput = inputRdd.filter(lambda line: line.startswith("Id,") == False)


# Note

Make sure you save program output to **output** folder

In [18]:
# Your code starts here
# result.saveAsTextFile("output/my_result")
UserProduct = filteredInput.map(lambda x: x.split(",")[2] + "," + x.split(",")[1]).map(lambda x: x.split(","))
UserProduct.collect()


                                                                                

[['A2', 'B1'],
 ['A4', 'B1'],
 ['A5', 'B1'],
 ['A1', 'B2'],
 ['A2', 'B3'],
 ['A3', 'B3'],
 ['A4', 'B3'],
 ['A5', 'B3'],
 ['A4', 'B4'],
 ['A2', 'B5'],
 ['A4', 'B5'],
 ['A2', 'B1'],
 ['A4', 'B5'],
 ['A5', 'B5']]

In [19]:
zero_value = set()

def seq_op(x,y):
    x.add(y)
    return x

def comb_op(x,y):
    return x.union(y)

UserProducts = UserProduct.aggregateByKey(zero_value, seq_op, comb_op).sortByKey()
UserProducts.collect()

[('A1', {'B2'}),
 ('A2', {'B1', 'B3', 'B5'}),
 ('A3', {'B3'}),
 ('A4', {'B1', 'B3', 'B4', 'B5'}),
 ('A5', {'B1', 'B3', 'B5'})]

In [20]:
ProductPairs = list(UserProducts.reduceByKey(lambda a,b: b.lookup(a)).map(lambda r: r[1]).filter(lambda x: len(x)>1).collect())
print(ProductPairs)

[{'B5', 'B1', 'B3'}, {'B5', 'B1', 'B4', 'B3'}, {'B5', 'B1', 'B3'}]


In [22]:
i = 0
j = 0 
tupleProduct = []
tempList = []
for x in ProductPairs:
    tempList.append(list(x))
while i < len(tempList):
    while j<len(tempList[i])-1:
        tupleProduct.append(tempList[i][j]+tempList[i][j+1])
        j+=1
    i+=1
    j=0
print(tupleProduct)

['B5B1', 'B1B3', 'B5B1', 'B1B4', 'B4B3', 'B5B1', 'B1B3']


In [23]:
nextStep = sc.parallelize(tupleProduct)
tupleProductWithOne = nextStep.map(lambda x: (x,1))
tupleProductWithOne.collect()

[('B5B1', 1),
 ('B1B3', 1),
 ('B5B1', 1),
 ('B1B4', 1),
 ('B4B3', 1),
 ('B5B1', 1),
 ('B1B3', 1)]

In [24]:
allTupleProd = list(tupleProductWithOne.countByKey().items())
tempProd = sc.parallelize(allTupleProd)
productPairsCounts = tempProd.sortBy(lambda x: -x[1])
productPairsCounts.collect()
result = productPairsCounts.zipWithIndex().filter(lambda vi: vi[1] < 10).keys()
result.collect()

[('B5B1', 3), ('B1B3', 2), ('B1B4', 1), ('B4B3', 1)]

In [25]:
outpath = 'output/output'
if os.path.exists(outpath) and os.path.isdir(outpath):
    shutil.rmtree(outpath)

result.saveAsTextFile(outpath)

In [12]:
sc.stop()