In [1]:
import json
import os
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

from pyspark import SparkConf, SparkContext

import spotipy
import spotipy.oauth2 as oauth2
from spotipy.oauth2 import SpotifyOAuth, SpotifyClientCredentials

import pinecone

  from tqdm.autonotebook import tqdm


In [2]:
# getOrCreate reuses an already-running SparkContext, so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setAppName("Apriori"))

23/12/02 22:48:41 WARN Utils: Your hostname, Adityas-MacBook-Air-5.local resolves to a loopback address: 127.0.0.1; using 192.168.100.11 instead (on interface en0)
23/12/02 22:48:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/02 22:48:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def slice_to_txt(slice_path, txt_file_path):
    """Append one text line per playlist in a JSON slice to ``txt_file_path``.

    Each output line contains the track IDs of one playlist (the last
    ``:``-separated component of each ``track_uri``), separated by single
    spaces, in the order they appear in the JSON. A playlist with no tracks
    produces an empty line.

    Args:
        slice_path: Path to a JSON file with a top-level ``"playlists"`` list;
            each playlist has a ``"tracks"`` list of objects with a
            ``"track_uri"`` string.
        txt_file_path: Output text file. It is opened in append mode, so
            existing content is kept and new lines are added after it.
    """
    with open(slice_path, encoding="utf-8") as json_file:
        data = json.load(json_file)

    # The context manager closes the handle even if a malformed playlist
    # raises part-way through writing.
    with open(txt_file_path, "a", encoding="utf-8") as txt_file:
        for playlist in data['playlists']:
            track_ids = (track['track_uri'].split(':')[-1] for track in playlist['tracks'])
            txt_file.write(" ".join(track_ids) + '\n')

def create_txt_slices(DATA_FOLDER, TXT_SLICES_FOLDER):
    """Convert every ``*.json`` slice in DATA_FOLDER into a text slice file.

    TXT_SLICES_FOLDER is created with ``os.mkdir``, which raises if it already
    exists. For each entry whose last dot-separated component is ``json``,
    ``slice_to_txt`` writes ``slice<second-to-last component>.txt`` inside
    TXT_SLICES_FOLDER (e.g. ``x.0-999.json`` -> ``slice0-999.txt``).
    """
    os.mkdir(TXT_SLICES_FOLDER)

    for entry in tqdm(os.listdir(DATA_FOLDER)):
        parts = entry.split('.')
        if parts[-1] != 'json':
            continue
        source_path = os.path.join(DATA_FOLDER, entry)
        target_path = os.path.join(TXT_SLICES_FOLDER, f"slice{parts[-2]}.txt")
        slice_to_txt(source_path, target_path)

# create_txt_slices('data100', 'data_txt_slices_100')

# Apriori

In [4]:
def get_frequent_k_tuples(candidate_k_tuples, min_support, shared_itemset, num_slices=32):
    """Count the support of each candidate itemset in Spark and keep the frequent ones.

    Args:
        candidate_k_tuples: List of candidate itemsets (Python ``set``s).
        min_support: Absolute support threshold. A candidate is kept only if
            its count is strictly greater than this value.
        shared_itemset: Spark broadcast whose ``.value`` is the list of
            transactions, each a ``set`` of items.
        num_slices: Number of partitions used to distribute the candidates
            (default 32, the previously hard-coded value).

    Returns:
        List of ``(itemset, support_count)`` tuples for the candidates that
        pass the threshold.
    """
    def calc_support(candidate):
        # Number of transactions that contain every item of the candidate;
        # a generator avoids materialising a list per candidate.
        count = sum(1 for transaction in shared_itemset.value if candidate.issubset(transaction))
        return (candidate, count)

    return (
        sc.parallelize(candidate_k_tuples, num_slices)
        .map(calc_support)
        .filter(lambda pair: pair[1] > min_support)
        .collect()
    )

def construct_candidate_k_tuples(frequent_k_tuples):
    """Generate (k+1)-item candidates from frequent k-itemsets (Apriori join + prune).

    Join: two frequent k-itemsets whose first k-1 items agree in sorted order
    are merged into one (k+1)-itemset.
    Prune: a merged candidate is kept only if every one of its k-item subsets
    is itself frequent. Otherwise it cannot be frequent (Apriori property), so
    it is dropped before the expensive support count.

    Args:
        frequent_k_tuples: List of ``(itemset, support)`` pairs. Each
            ``itemset`` is a Python ``set``, all expected to have the same
            size k. Items must be mutually orderable because they are sorted.

    Returns:
        List of candidate (k+1)-itemsets as ``set``s. The list is empty if
        the input is empty or no candidate survives pruning.
    """
    if not frequent_k_tuples:
        return []

    # Python sets iterate in hash order, not sorted order, so the shared-prefix
    # test below is only valid after sorting each itemset explicitly.
    sorted_itemsets = [sorted(itemset) for itemset, _ in frequent_k_tuples]
    frequent_lookup = {frozenset(itemset) for itemset, _ in frequent_k_tuples}

    new_candidate_tuples = []
    n = len(sorted_itemsets)
    for i in range(n):
        prefix_i = sorted_itemsets[i][:-1]
        for j in range(i + 1, n):
            items_j = sorted_itemsets[j]
            if prefix_i != items_j[:-1]:
                continue
            # Set union returns a new set; the input itemsets are not mutated.
            candidate = frequent_k_tuples[i][0] | {items_j[-1]}
            # Apriori prune: every k-subset (candidate minus one item) must be frequent.
            frozen = frozenset(candidate)
            if all(frozen - {item} in frequent_lookup for item in frozen):
                new_candidate_tuples.append(candidate)

    return new_candidate_tuples

In [5]:
def apriori_dataload(input_dir):
    """Load playlists as an RDD of sorted track-ID lists.

    Args:
        input_dir: Path (or pattern) passed to ``sc.textFile``. Each line is
            one playlist of whitespace-separated track IDs.

    Returns:
        RDD whose elements are the sorted lists of IDs on each line
        (duplicates preserved). A blank line yields an empty list.
    """
    data = sc.textFile(input_dir)
    # str.split() with no argument collapses runs of whitespace and returns []
    # for a blank line, whereas split(' ') would emit '' as a bogus item.
    itemset = data.map(lambda line: sorted(line.split()))

    return itemset

def apriori_support_estimates(itemset, sup_ratios=(0.005, 0.01, 0.015, 0.02, 0.05, 0.10)):
    """Print how many single items are frequent at each support ratio.

    Useful for choosing a ``support_threshold_ratio`` for ``apriori``.

    Args:
        itemset: RDD of transactions (iterables of items).
        sup_ratios: Support ratios to evaluate, as fractions of the number of
            transactions. Defaults to the previously hard-coded list.
    """
    shared_itemset = sc.broadcast(itemset.map(lambda x: set(x)).collect())
    try:
        candidate_k_tuples = [{x} for x in itemset.flatMap(lambda x: set(x)).distinct().collect()]
        # The transaction count is the same for every ratio; count once instead
        # of launching a Spark job on every loop iteration.
        n_transactions = itemset.count()

        for sup_ratio in sup_ratios:
            support = sup_ratio * n_transactions
            frequent_k_tuples = get_frequent_k_tuples(candidate_k_tuples, support, shared_itemset)

            print(f"Support - {sup_ratio*100}%  {support} | # of top tracks considered - {len(frequent_k_tuples)}")
    finally:
        # Release the executor copies of the (potentially large) broadcast.
        shared_itemset.unpersist()

def apriori(itemset, support_threshold_ratio):
    """Mine all frequent itemsets with the level-wise Apriori algorithm.

    Prints the number of frequent itemsets and the elapsed time after each level.

    Args:
        itemset: RDD of transactions (iterables of items), e.g. the output of
            ``apriori_dataload``.
        support_threshold_ratio: Fraction of transactions. An itemset is
            frequent when its count is strictly greater than
            ``support_threshold_ratio * itemset.count()``.

    Returns:
        List of ``(itemset, support_count)`` pairs for every frequent itemset
        of every size, ordered by itemset size.
    """
    shared_itemset = sc.broadcast(itemset.map(lambda x: set(x)).collect())
    all_frequent = []
    try:
        support_threshold = itemset.count() * support_threshold_ratio
        start = time.perf_counter()

        k = 0
        candidate_k_tuples = [{x} for x in itemset.flatMap(lambda x: set(x)).distinct().collect()]

        while candidate_k_tuples:
            frequent_k_tuples = get_frequent_k_tuples(candidate_k_tuples, support_threshold, shared_itemset)
            k += 1
            print(f"k = {k} | freq_itemsets = {len(frequent_k_tuples)}")
            print(f"Time elapsed = {time.perf_counter() - start}")

            if not frequent_k_tuples:
                break

            all_frequent.extend(frequent_k_tuples)
            candidate_k_tuples = construct_candidate_k_tuples(frequent_k_tuples)
    finally:
        # Release the executor copies of the (potentially large) broadcast.
        shared_itemset.unpersist()

    return all_frequent

In [6]:
itemsets = apriori_dataload('data_txt_slices')
# Optional: apriori_support_estimates(itemsets) prints how many single tracks
# pass several candidate support ratios, to help choose the ratio below.
frequent_itemsets = apriori(itemsets, 0.01)



CodeCache: size=131072Kb used=11922Kb max_used=11935Kb free=119149Kb
 bounds [0x0000000104424000, 0x0000000104fe4000, 0x000000010c424000]
 total_blobs=4720 nmethods=3912 adapters=723
 compilation: disabled (not enough contiguous free space left)


                                                                                

k = 1 | freq_itemsets = 662
Time elapsed = 12.636770009994507


                                                                                

k = 2 | freq_itemsets = 112
Time elapsed = 30.464622020721436
k = 3 | freq_itemsets = 8
Time elapsed = 30.649697065353394
k = 4 | freq_itemsets = 0
Time elapsed = 30.785190105438232


# FP-Growth (Spark MLlib `FPGrowth`)

In [7]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# getOrCreate returns the active SparkSession if one already exists.
spark = (
    SparkSession.builder
    .appName("AprioriDF")
    .getOrCreate()
)

In [8]:
# Load the playlists, drop repeated tracks inside each playlist, and wrap the
# result in a one-column DataFrame ("playlists") for FPGrowth.
itemset = apriori_dataload('data_txt_slices')
print("Itemsets Created")
itemset = itemset.map(lambda tracks: list(set(tracks)))
print("Removed duplicates")
itemset = itemset.map(lambda tracks: (tracks,)).toDF(["playlists"])
print("PySpark DF created")
itemset.show(5)

Itemsets Created
Removed duplicates
PySpark DF created
+--------------------+
|           playlists|
+--------------------+
|[20aGiCMoN89NFAEu...|
|[2faSzprTWJ7L1EkZ...|
|[6ITDAE1VFqNtNBJ5...|
|[17txou7v6Jxrwm4S...|
|[6hqt1z34Oz0OZtSf...|
+--------------------+
only showing top 5 rows



In [9]:
# FP-Growth on the same playlists. minSupport is a fraction of the number of
# playlists (0.5% here; the Apriori run above used a 1% ratio).
fpGrowth = FPGrowth(minSupport=0.005, itemsCol="playlists")
model = fpGrowth.fit(itemset)

freq_itemsets = model.freqItemsets
freq_itemsets.show(5)

[Stage 14:>                                                         (0 + 5) / 5]

+--------------------+----+
|               items|freq|
+--------------------+----+
|[28cnXtME493VX9NO...|  36|
|[45yEy5WJywhJ3sDI...|  55|
|[3f7gYMirBEKuc572...|  47|
|[7zFXmv6vqI4qOt4y...|  37|
|[2b9lp5A6CqSzwOrB...|  37|
+--------------------+----+
only showing top 5 rows



                                                                                

In [10]:
# Add the itemset size k (number of items), then collect the frequent-itemset
# table to the driver as a pandas DataFrame.
freq_itemsets = freq_itemsets.withColumn("k", F.size(F.col("items")))
df = freq_itemsets.toPandas()

                                                                                