In [None]:
# Input files read by the script
# usersPath = "data/Users.txt"  (kept for reference, not read by the code below)
purchasePath, cataloguePath = "data/Purchases.txt", "data/Catalogue.txt"

# Output folders, one for each part of the exercise
outputPath1, outputPath2 = "outPart1/", "outPart2/"

In [None]:
# Load the two input files as RDDs of text lines (one element per line).
#   purchaseRDD  line format: SaleTimestamp,UserID,ItemID,SalePrice
#   catalogueRDD line format: ItemID,Name,Category,StillInProduction
purchaseRDD, catalogueRDD = map(sc.textFile, (purchasePath, cataloguePath))

In [None]:
#########################################
# PART 1
#########################################

In [None]:
# Keep only the purchase lines of 2022 and 2023. Each line begins with
# SaleTimestamp, whose leading characters are the year, so a prefix test on
# the raw line is enough.
# Cached because this RDD feeds both Part 1 and Part 2.
purchases2223 = (
    purchaseRDD
    .filter(lambda record: record.startswith(("2022", "2023")))
    .cache()
)

In [None]:
# count for each user and year the number of purchases the user made
# key = userId
# value = (count2022, count2023)
# and use a reduceByKey to count the number of purchases for each year 2022 and 2023 separately

def userIdCounters(line):
    """Turn one purchase line into a (userId, (count2022, count2023)) pair.

    The counter tuple holds a single 1 in the slot of the purchase year:
    (1, 0) when the year is 2022 and (0, 1) for any other year. The input is
    expected to be already restricted to purchases of 2022 and 2023.
    """
    columns = line.split(",")
    buyer = columns[1]
    # The year is the part of SaleTimestamp that precedes the first "/"
    boughtIn2022 = columns[0].partition("/")[0] == "2022"

    return (buyer, (1, 0) if boughtIn2022 else (0, 1))


# One record per user with the total purchases made in each year:
#   (userId, (total count 2022, total count 2023))
# Cached because it is used both to compute the per-year maxima and to
# select the users reaching them.
userCountPurchases2223 = (
    purchases2223
    .map(userIdCounters)
    .reduceByKey(lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1]))
    .cache()
)

In [None]:
# Compute, for each of the two years, the maximum number of purchases made by
# a single user: (max 2022, max 2023).
# fold() is used instead of reduce() because reduce() raises ValueError on an
# empty RDD (no purchase at all in 2022/2023). Counts are never negative, so
# (0, 0) is a neutral starting value for max and the result on non-empty data
# is unchanged.
maxPurchases2223 = userCountPurchases2223.values()\
    .fold((0, 0), lambda a, b: (max(a[0], b[0]), max(a[1], b[1])))

# Unpack the two maxima so they can be captured by the filter closure
max22, max23 = maxPurchases2223

In [None]:
# Select the users whose 2022 count equals max22 or whose 2023 count equals
# max23. userCountPurchases2223 holds one record per user (it comes out of a
# reduceByKey), so taking the keys already yields distinct userIds.
res1 = (
    userCountPurchases2223
    .filter(lambda userCounts: userCounts[1][0] == max22 or userCounts[1][1] == max23)
    .keys()
)

In [None]:
# Store the result of Part 1 (the selected userIds) as text under outputPath1.
# NOTE(review): Spark normally refuses to write into an already existing
# output directory - confirm outputPath1 does not exist before running.
res1.saveAsTextFile(outputPath1)

In [None]:
#########################################
# PART 2 - v1
#########################################

In [None]:
# considering the purchases in year 2022/2023 (purchases2223 RDD)
# we use a map with
# key = itemID
# value = userID
# and a distinct to obtain the distinct user-product purchases,
# and perform a map + reduceByKey to count for each itemID,
# the number of distinct users who bought that item
# key = itemID
# value = numberOfDistinctUsersPurchases

def ItemUser(line):
    """Map one purchase line to the pair (itemId, userId).

    The line is split on commas; the userId is read from the second field
    and the itemId from the third one.
    """
    columns = line.split(",")
    buyer, product = columns[1], columns[2]

    return (product, buyer)

# Count, for each item, the distinct users who bought it in 2022/2023:
#   1. build (itemId, userId) pairs and deduplicate them, so that a user
#      counts once per item however many times they bought it
#   2. turn each surviving pair into (itemId, 1)
#   3. sum the ones per itemId -> (itemId, numberOfDistinctUsers)
itemDistinctUsersPurchases = (
    purchases2223
    .map(ItemUser)
    .distinct()
    .keys()
    .map(lambda item: (item, 1))
    .reduceByKey(lambda total, one: total + one)
)

In [None]:
# for each item, we retrieve the corresponding category

def ItemCategory(line):
    """Map one catalogue line to the pair (itemId, category).

    The line is split on commas; the itemId is the first field and the
    category is the third one.
    """
    columns = line.split(",")

    return (columns[0], columns[2])
    

# (itemId, category) for every catalogue line; cached because it is reused
# by more than one later step (the join and the list of all categories).
itemCategory = (
    catalogueRDD
    .map(ItemCategory)
    .cache()
)

In [None]:
# Attach the category to every item that was bought in 2022/2023.
# Record format: (itemId, (category, numberOfDistinctUsers)).
# join() is an inner join, so catalogue items without purchases do not
# appear in the result.
# Cached because two later steps start from this RDD.
itemCategoryPurchases = (
    itemCategory
    .join(itemDistinctUsersPurchases)
    .cache()
)

In [None]:
# For each category, the highest number of distinct buyers reached by any of
# its items.
# The value part of itemCategoryPurchases is already a
# (category, numberOfDistinctUsers) pair, so the values are taken as they are
# and reduced per category with max:
#   key = category
#   value = maximum number of distinct users over the items of the category
maxDistinctUsersPurchasesPerCategory = (
    itemCategoryPurchases
    .values()
    .reduceByKey(max)
)

In [None]:
# map itemCategoryPurchases to ( (category, numPurchases), itemid), join with maxDistinctUsersPurchasesPerCategory 
# (first map to ((category, maxPurchases), None))
# after join, format is
# key = (category,numPurchases)
# value = (itemId, None),
# then, use a map to obtain the format for the result
# key = category
# value = itemId

def CatItemId(t):
    """Convert a joined record into the pair (category, itemId).

    t is expected in the form ((category, numPurchases), (itemId, None)):
    the category is the first element of the key and the itemId is the first
    element of the value.
    """
    return (t[0][0], t[1][0])


# Keep, for each category, the item(s) whose number of distinct buyers equals
# the maximum of the category:
#   - items are re-keyed as ((category, numPurchases), itemId)
#   - the per-category maxima are re-keyed as ((category, maxPurchases), None)
#   - the join keeps only the items whose (category, numPurchases) matches a
#     (category, maxPurchases) key
#   - CatItemId reshapes each joined record into (category, itemId)
# Cached, like the other reused RDDs of this script, because res2Partial is
# consumed twice afterwards (to find the categories without purchases and to
# build the final union).
res2Partial = (
    itemCategoryPurchases
    .map(lambda t: ((t[1][0], t[1][1]), t[0]))
    .join(maxDistinctUsersPurchasesPerCategory.map(lambda tmax: (tmax, None)))
    .map(CatItemId)
    .cache()
)


# Alternative solution for this step
#
# map itemCategoryPurchases to (category, (itemid, numPurchases)), join with maxDistinctUsersPurchasesPerCategory
# and filter, keeping only the entries with numPurchases == maxPurchases
# after join, format is
# key = category
# value = ((itemId, numPurchases), maxPurchasesPerCategory)
# then, use a map to obtain the format for the result
# key = category
# value = itemId

#def CatItemId(t):
#    category = t[0]
#    itemId = t[1][0][0]
#    
#    return (category, itemId)
#    
#
#res2Partial = itemCategoryPurchases\
#        .map(lambda t: (t[1][0], (t[0], t[1][1])))\
#        .join(maxDistinctUsersPurchasesPerCategory)\
#        .filter(lambda t: t[1][0][1] == t[1][1])\
#        .map(CatItemId)

In [None]:
# res2Partial misses the 0-case: categories of the catalogue for which no
# item was purchased in 2022/2023.
# They are obtained as all the distinct catalogue categories minus the
# categories present in res2Partial, and each one is paired with a marker:
#   key = category
#   value = "NoPurchases"
unsoldCategories = (
    itemCategory
    .values()
    .distinct()
    .subtract(res2Partial.keys())
    .map(lambda category: (category, "NoPurchases"))
)

In [None]:
# Final result of Part 2: the (category, itemId) pairs of res2Partial together
# with the (category, "NoPurchases") pairs of unsoldCategories.
res2Final = res2Partial.union(unsoldCategories)

In [None]:
# Store the result of Part 2 as text under outputPath2.
# NOTE(review): Spark normally refuses to write into an already existing
# output directory - confirm outputPath2 does not exist before running.
res2Final.saveAsTextFile(outputPath2)