# MDA 2022
## PySpark Sample Code
-----------------------------------------------------------------

## Setup
--------------------------------------------------

Let's set up Spark in your Colab environment. Run the cell below.

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Now we mount Google Drive so the notebook can access the data.

**Make sure to follow the interactive instructions.**

In [None]:
from google.colab import drive
# This will prompt for authorization.
drive.mount('/content/drive')

Mounted at /content/drive


## Check and extract data
--------------------------------------------------

In [None]:
!ls '/content/drive/My Drive/Colab Notebooks/MDA/HW2/'

 checkstatus.csv   MDA_2021.ipynb       SystemID.csv
 CompanyID.csv	   Sample_Data.zip      Traffic.csv
 Data.zip	   Sample_Traffic.csv  'پروژه MDA2021.pdf'


In [None]:
# !unzip "/content/drive/My Drive/Colab Notebooks/MDA/HW3/Sample_Data.zip" -d "/content/drive/My Drive/Colab Notebooks/MDA/HW2"
!unzip "./Sample_Data.zip" -d "./"

Archive:  ./Sample_Data.zip
  inflating: ./Sample_Traffic.csv    


The cells above extract the sample data: `./Sample_Data.zip` is unzipped into the current directory, which produces `./Sample_Traffic.csv`.

## Initializing Spark and Reading Data
--------------------------------------------------

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Local Spark session that uses all available cores
spark = SparkSession \
    .builder \
    .appName("Spark_Processor") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

# Explicit schema for Sample_Traffic.csv (every column nullable)
schema = StructType([
    StructField("DEVICE_CODE", IntegerType(), True),
    StructField("SYSTEM_ID", IntegerType(), True),
    StructField("ORIGINE_CAR_KEY", StringType(), True),
    StructField("FINAL_CAR_KEY", StringType(), True),
    StructField("CHECK_STATUS_KEY", IntegerType(), True),
    StructField("COMPANY_ID", StringType(), True),
    StructField("PASS_DAY_TIME", TimestampType(), True)
])



#### Reading the Sample_Traffic.csv File

In [None]:
df=spark.read.csv('Sample_Traffic.csv',header=True,schema=schema)
df.show(5)

+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|     200501|       81|       10477885|     10477885|               5|       161|2021-06-01 03:54:39|
|        155|       81|       87625017|     87625017|               5|       161|2021-06-01 04:14:21|
|     631757|       81|        8652928|      8652928|               5|       161|2021-06-01 03:58:57|
|     631757|       81|        8548123|      8548123|               5|       161|2021-06-01 04:01:38|
|     631757|       81|       24715264|     24715264|               5|       161|2021-06-01 03:56:57|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
only showing top 5 rows



In this part we map each row to a key-value pair whose key is (FINAL_CAR_KEY, date of PASS_DAY_TIME) and whose value is DEVICE_CODE, the device the car passed. The resulting RDD has one row per passage, recording which device a given car (identified by its license plate) passed on a given day; together, these rows describe each car's daily path.

In [None]:
rdd = df.rdd.map(lambda x: ((x['FINAL_CAR_KEY'], x['PASS_DAY_TIME'].date()),  x['DEVICE_CODE']))
rdd.take(5)



[(('10477885', datetime.date(2021, 6, 1)), 200501),
 (('87625017', datetime.date(2021, 6, 1)), 155),
 (('8652928', datetime.date(2021, 6, 1)), 631757),
 (('8548123', datetime.date(2021, 6, 1)), 631757),
 (('24715264', datetime.date(2021, 6, 1)), 631757)]

Next we group this RDD by key, so that for each car and each date we get all the devices the car passed. We collect the values into a set rather than a list, which removes repeated devices. Since the support of an itemset is the number of baskets that contain it, a device seen twice in the same basket should not be counted twice anyway, so using a set does not hurt the results.

In [None]:
rdd_group = rdd.groupByKey().mapValues(set)
rdd_group.take(5)



[(('69939810', datetime.date(2021, 6, 1)), {206602, 631830, 900233}),
 (('11046172', datetime.date(2021, 6, 1)), {206602}),
 (('29077699', datetime.date(2021, 6, 1)),
  {119,
   128,
   206602,
   208602,
   210602,
   631367,
   631763,
   631829,
   900135,
   900233,
   900234,
   900240,
   900246,
   900266,
   22010111,
   22010123}),
 (('40682798', datetime.date(2021, 6, 1)), {206602}),
 (('48823778', datetime.date(2021, 6, 1)), {206602})]
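For readers less familiar with Spark, the `map` and `groupByKey().mapValues(set)` steps above can be mirrored in plain Python. This is a local sketch, not the notebook's code: `build_baskets` is a hypothetical helper, and `records` are hypothetical `(FINAL_CAR_KEY, PASS_DAY_TIME, DEVICE_CODE)` tuples. The first and last records are taken from the table shown earlier, and the two middle ones are made up.

```python
from collections import defaultdict
from datetime import datetime

def build_baskets(records):
    """Group (car, timestamp, device) records into one set of devices per (car, day)."""
    baskets = defaultdict(set)
    for car_key, pass_time, device_code in records:
        # Key = (FINAL_CAR_KEY, date of PASS_DAY_TIME); the set drops repeated devices.
        baskets[(car_key, pass_time.date())].add(device_code)
    return dict(baskets)

records = [
    ("10477885", datetime(2021, 6, 1, 3, 54, 39), 200501),
    ("10477885", datetime(2021, 6, 1, 4, 10, 0), 631757),   # hypothetical
    ("10477885", datetime(2021, 6, 1, 5, 0, 0), 200501),    # hypothetical repeat, same day
    ("87625017", datetime(2021, 6, 1, 4, 14, 21), 155),
]
print(build_baskets(records))
```

Car 10477885 ends up with the basket {200501, 631757}: the repeated device appears only once.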

# A-Priori

## Step by Step

To form the baskets we no longer need the license plates, so we keep only the device sets.

In [None]:
import numpy as np

baskets_rdd = rdd_group.map(lambda x: x[1])
baskets_rdd.take(2)



[{206602, 631830, 900233}, {206602}]

The cell below lets us test the algorithm on a random sample of the data so that the code runs faster. SAMPLE_SIZE is the sampled fraction. The first argument of `sample`, True, means sampling with replacement, so a basket can be drawn more than once. THRESHOLD is a fraction of the total number of baskets: an itemset is frequent if its support count exceeds int(THRESHOLD × number of baskets).
The parameters and results are discussed at the end of this notebook and in the PDF report.

In [None]:
SAMPLE = True
SAMPLE_SIZE = 0.01
THRESHOLD = 0.001

if SAMPLE:
  baskets_rdd = baskets_rdd.sample(True, SAMPLE_SIZE)
  baskets_rdd = baskets_rdd.coalesce(10)
  baskets_rdd.cache()


In the next two cells we count the baskets and multiply that count by THRESHOLD to obtain the minimum support, MIN_COUNT. MIN_COUNT is discussed further at the end of this notebook and in the report.

In [None]:
BASKETS_COUNT = baskets_rdd.count()
BASKETS_COUNT



165

In [None]:
MIN_COUNT = int(THRESHOLD * BASKETS_COUNT)
MIN_COUNT

0

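Here we first count how many baskets contain each item. We then keep only the items whose count is strictly greater than MIN_COUNT. Each item is wrapped in a 1-tuple, (item,), so that single items have the same shape as the larger itemsets built later. With MIN_COUNT = 0 in this run, every item that appears at all is kept.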

In [None]:
frequent_items_rdd = (baskets_rdd
                      .flatMap(lambda basket: [((item,), 1) for item in basket])  # one ((item,), 1) pair per item
                      .reduceByKey(lambda x, y: x + y)                             # support count per item
                      .filter(lambda x: x[1] > MIN_COUNT))                         # keep counts above MIN_COUNT
frequent_items_rdd.take(5)

[((631363,), 2),
 ((900171,), 3),
 ((119,), 5),
 ((22010087,), 1),
 ((22010095,), 1)]
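The first pass above can be sketched in plain Python to make the arithmetic explicit. `frequent_single_items` is a hypothetical helper. The first two baskets come from the output shown earlier, and the third is made up. MIN_COUNT is computed with `int()`, which truncates, so 165 sampled baskets with THRESHOLD = 0.001 give 0.165 and therefore MIN_COUNT = 0. Combined with the strict `>` filter, this keeps every item that occurs at least once.

```python
from collections import Counter

def frequent_single_items(baskets, min_count):
    """First A-Priori pass: count each item once per basket (baskets are sets)."""
    counts = Counter((item,) for basket in baskets for item in basket)
    # Strict '>' matches the notebook's filter(lambda x: x[1] > MIN_COUNT).
    return {itemset: n for itemset, n in counts.items() if n > min_count}

# MIN_COUNT = int(THRESHOLD * BASKETS_COUNT); int() truncates 0.165 to 0.
print(int(0.001 * 165))  # 0

baskets = [{206602, 631830, 900233}, {206602}, {206602, 900233}]
print(frequent_single_items(baskets, min_count=0))  # all three items survive
print(frequent_single_items(baskets, min_count=1))  # 631830 (count 1) is dropped
```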

This cell counts the number of frequent items.


In [None]:
FREQUENT_ITEMS_COUNT = frequent_items_rdd.count()
FREQUENT_ITEMS_COUNT

202

## A-Priori Function
The following cells implement the A-Priori algorithm. We first define two helper functions, explained below, and then run the main loop that calls them.

#### generate_combinations
This function takes the frequent itemsets of the previous step and generates the candidates for the next step. It first collects the distinct items that appear in any frequent itemset and sorts them. Each frequent itemset is stored as a sorted tuple, so its last element is its largest. The function extends the itemset with every remaining item larger than that element, producing one new candidate per such item. Items smaller than the largest element never need to be added.

For example, suppose [b, c, d] was frequent in the previous step and the sorted remaining items are [a, b, c, d, z]. Only z is larger than d, the largest element of [b, c, d], so the only candidate generated from it is [b, c, d, z]. [a, b, c, d] is not generated from [b, c, d], but nothing is lost. Every subset of a frequent itemset is frequent, so if [a, b, c, d] were frequent, its subset [a, b, c] would also be frequent, and [a, b, c, d] would be generated when [a, b, c] is extended with d. If [a, b, c] is not frequent, [a, b, c, d] cannot be frequent either, so skipping it is correct.
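The candidate-generation rule can be checked on a small local example. This is a plain-Python sketch that assumes, as the notebook does, that every itemset is a sorted tuple. `generate_candidates` is a hypothetical stand-in for `generate_combinations` that works on a Python list instead of an RDD. The input mirrors the [b, c, d] example above, with [a, b, c] and [b, c, z] added so that the remaining items are [a, b, c, d, z].

```python
def generate_candidates(frequent_itemsets):
    """Extend each sorted frequent k-itemset with every remaining item larger than its last element."""
    # Distinct items that appear in any frequent k-itemset, in sorted order.
    remaining_items = sorted({item for itemset in frequent_itemsets for item in itemset})
    candidates = []
    for itemset in frequent_itemsets:
        largest = itemset[-1]  # itemsets are sorted tuples, so the last element is the largest
        for item in remaining_items:
            if item > largest:
                candidates.append(itemset + (item,))
    return candidates

frequent_3 = [("a", "b", "c"), ("b", "c", "d"), ("b", "c", "z")]
print(generate_candidates(frequent_3))
# [('a', 'b', 'c', 'd'), ('a', 'b', 'c', 'z'), ('b', 'c', 'd', 'z')]
```

[a, b, c, d] appears only as an extension of [a, b, c], never of [b, c, d], which matches the argument above.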

#### get_new_frequents
This function takes the candidates of the next step and returns the frequent itemsets among them. The candidates are collected to the driver. A helper function, "count_freq", takes one basket and emits (candidate, 1) for every candidate contained in that basket. Summing these pairs per candidate with reduceByKey and keeping the counts above MIN_COUNT gives the frequent itemsets.
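The counting step of `get_new_frequents` reduces to the following plain-Python sketch, in which a `Counter` plays the role of `flatMap` plus `reduceByKey`. `count_frequent` and the toy baskets are hypothetical. The strict `>` comparison mirrors the notebook's filter.

```python
from collections import Counter

def count_frequent(baskets, candidates, min_count):
    """Count the baskets that contain each candidate itemset; keep counts > min_count."""
    counts = Counter()
    for basket in baskets:
        basket_set = set(basket)
        for candidate in candidates:
            # A candidate is present when all of its items are in the basket (subset test).
            if set(candidate) <= basket_set:
                counts[candidate] += 1
    return {c: n for c, n in counts.items() if n > min_count}

baskets = [{1, 2, 3}, {1, 2}, {2, 3}, {1, 2, 4}]
candidates = [(1, 2), (2, 3), (1, 4)]
print(count_frequent(baskets, candidates, min_count=1))
# {(1, 2): 3, (2, 3): 2}
```

Every candidate is tested against every basket, so the cost grows with the number of candidates times the number of baskets. This becomes expensive when a low MIN_COUNT produces thousands of candidates.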

In [None]:
def generate_combinations(old_combinations):
  """
  Input old_frequent_itemsets, and create new candidates from it
  """

  def generate_combinations_util(old_combination):
    """
    Helper for flatMap: maps one frequent itemset, given as (itemset, count), to its new candidate itemsets
    """
    old_combination = old_combination[0]
    old_combination_max_item = old_combination[-1]

    # Boolean-mask selection of the sorted remaining items larger than the itemset's maximum
    bigger_items = remaining_items[remaining_items>old_combination_max_item]
    new_candidates = []
    for x in bigger_items:
      new_candidates.append( old_combination + (x,) )

    return new_candidates

  # Sorted array of the distinct items that appear in any frequent itemset of the previous size
  remaining_items = np.array(old_combinations.flatMap(lambda x: x[0]).distinct().sortBy(lambda x: x).collect())

  new_combinations = old_combinations.flatMap(generate_combinations_util)
  return new_combinations

def get_new_frequents(candidates):
  """
  Count the candidates over all baskets and keep those with count > MIN_COUNT.
  Uses the global baskets_rdd and MIN_COUNT defined in the cells above.
  """
  
  _candidates = candidates.collect()
  def count_freq(basket):
    candidates_present = []
    for candidate in _candidates:
      if set(candidate) <= set(basket):
        candidates_present.append( (candidate,1) )
    
    return candidates_present

  frequent_itemsets_rdd = baskets_rdd.flatMap(count_freq).reduceByKey(lambda x,y: x+y).filter(lambda x: x[1]>MIN_COUNT)
  
  return frequent_itemsets_rdd

k = 1
frequent_itemsets_rdd = frequent_items_rdd
while frequent_itemsets_rdd.count() != 0:
  print('-----------------loop start----------------------')
  k += 1
  candidates = generate_combinations(frequent_itemsets_rdd)
  frequent_itemsets_rdd = get_new_frequents(candidates)
  print('Itemsets of size ', k, ', count: ', frequent_itemsets_rdd.count())
  print('sample: ')
  print(frequent_itemsets_rdd.take(10))



-----------------loop start----------------------
Itemsets of size  2 , count:  732
sample: 
[((208602, 22010060), 1), ((631633, 900269), 1), ((631357, 631633), 1), ((631357, 900269), 1), ((119, 900171), 1), ((22010117, 100700845), 1), ((900233, 22010117), 1), ((900233, 100700845), 1), ((900149, 900233), 1), ((900149, 22010117), 1)]
-----------------loop start----------------------




Itemsets of size  3 , count:  1772
sample: 
[((900171, 900233, 22010117), 1), ((900171, 900233, 100700845), 1), ((900171, 22010117, 100700845), 1), ((119, 900149, 900233), 1), ((119, 900149, 22010117), 1), ((119, 900149, 100700845), 1), ((119, 900233, 22010117), 1), ((119, 900233, 100700845), 1), ((119, 22010117, 100700845), 1), ((900124, 900160, 900233), 1)]
-----------------loop start----------------------




Itemsets of size  4 , count:  3518
sample: 
[((119, 900171, 900233, 22010117), 1), ((119, 900171, 900233, 100700845), 1), ((119, 900171, 22010117, 100700845), 1), ((900149, 900233, 22010117, 100700845), 1), ((119, 900124, 900160, 900233), 1), ((119, 900124, 900160, 22010117), 1), ((119, 900124, 900160, 100700845), 1), ((900124, 900149, 900160, 900233), 1), ((900124, 900149, 900160, 22010117), 1), ((900124, 900149, 900160, 100700845), 1)]
-----------------loop start----------------------




Itemsets of size  5 , count:  5823
sample: 
[((119, 900149, 900233, 22010117, 100700845), 1), ((900124, 900160, 900233, 22010117, 100700845), 1), ((900149, 900171, 900233, 22010117, 100700845), 1), ((119, 900124, 900149, 900160, 900233), 1), ((119, 900124, 900149, 900160, 22010117), 1), ((119, 900124, 900149, 900160, 100700845), 1), ((119, 900124, 900160, 900171, 900233), 1), ((119, 900124, 900160, 900171, 22010117), 1), ((119, 900124, 900160, 900171, 100700845), 1), ((900124, 900149, 900160, 900171, 900233), 1)]
-----------------loop start----------------------




Itemsets of size  6 , count:  7960
sample: 
[((119, 900124, 900160, 900233, 22010117, 100700845), 1), ((900124, 900149, 900160, 900233, 22010117, 100700845), 1), ((119, 900149, 900171, 900233, 22010117, 100700845), 1), ((900124, 900160, 900171, 900233, 22010117, 100700845), 1), ((119, 900124, 900149, 900160, 900171, 900233), 1), ((119, 900124, 900149, 900160, 900171, 22010117), 1), ((119, 900124, 900149, 900160, 900171, 100700845), 1), ((114, 900101, 900236, 900237, 22010039, 100700841), 1), ((900142, 900152, 900212, 900222, 900244, 100700866), 1), ((631795, 801710, 900124, 900207, 900243, 22010083), 1)]
-----------------loop start----------------------




Itemsets of size  7 , count:  8889
sample: 
[((119, 900124, 900149, 900160, 900233, 22010117, 100700845), 1), ((119, 900124, 900160, 900171, 900233, 22010117, 100700845), 1), ((900124, 900149, 900160, 900171, 900233, 22010117, 100700845), 1), ((144, 230103, 631795, 801710, 900207, 900243, 22010083), 1), ((144, 230103, 631795, 900102, 900207, 900243, 22010083), 1), ((230103, 631795, 801710, 900124, 900207, 900243, 22010083), 1), ((230103, 631795, 900102, 900124, 900207, 900243, 22010083), 1), ((144, 801710, 900102, 900124, 900207, 900243, 22010083), 1), ((144, 230103, 801710, 900102, 900218, 900243, 22010083), 1), ((144, 631795, 801710, 900102, 900218, 900243, 22010083), 1)]
-----------------loop start----------------------




Itemsets of size  8 , count:  8032
sample: 
[((119, 900124, 900149, 900160, 900171, 900233, 22010117, 100700845), 1), ((144, 230103, 631795, 801710, 900124, 900218, 900243, 22010083), 1), ((144, 230103, 631795, 900102, 900124, 900218, 900243, 22010083), 1), ((631795, 801710, 900102, 900124, 900207, 900218, 900243, 22010083), 1), ((230103, 801710, 900102, 900124, 900207, 900218, 900243, 22010083), 1), ((144, 230103, 801710, 900124, 900207, 900218, 900243, 22010083), 1), ((144, 230103, 900102, 900124, 900207, 900218, 900243, 22010083), 1), ((144, 631795, 801710, 900124, 900207, 900218, 900243, 22010083), 1), ((144, 631795, 900102, 900124, 900207, 900218, 900243, 22010083), 1), ((144, 230103, 631795, 801710, 900102, 900124, 900207, 900243), 1)]
-----------------loop start----------------------




Itemsets of size  9 , count:  5806
sample: 
[((144, 230103, 631795, 801710, 900102, 900124, 900207, 900243, 22010083), 1), ((144, 230103, 631795, 801710, 900102, 900207, 900218, 900243, 22010083), 1), ((230103, 631795, 801710, 900102, 900124, 900207, 900218, 900243, 22010083), 1), ((144, 230103, 801710, 900102, 900124, 900218, 900244, 900277, 22010083), 1), ((144, 631795, 801710, 900102, 900124, 900218, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 900124, 900207, 900218, 900244, 900277, 22010083), 1), ((230103, 631795, 801710, 900102, 900207, 900218, 900244, 900277, 22010083), 1), ((631795, 801710, 900124, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((631795, 900102, 900124, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((230103, 801710, 900124, 900207, 900218, 900243, 900244, 900277, 22010083), 1)]
-----------------loop start----------------------




Itemsets of size  10 , count:  3303
sample: 
[((144, 230103, 801710, 900102, 900124, 900207, 900218, 900244, 900277, 22010083), 1), ((144, 631795, 801710, 900102, 900124, 900207, 900218, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 801710, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 900102, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((230103, 631795, 801710, 900124, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((230103, 631795, 900102, 900124, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((144, 801710, 900102, 900124, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 801710, 900124, 900207, 900243, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 900102, 900124, 900207, 900243, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 801710, 900124, 900207, 900218, 900243, 900277, 22010083), 1)]
-----------------loop start----------------------




Itemsets of size  11 , count:  1444
sample: 
[((144, 230103, 631795, 801710, 900102, 900124, 900218, 900243, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 801710, 900102, 900124, 900207, 900243, 900244, 900277, 100701266), 1), ((144, 230103, 631795, 801710, 900102, 900207, 900218, 900243, 900244, 900277, 100701266), 1), ((230103, 631795, 801710, 900102, 900124, 900207, 900218, 900243, 900244, 900277, 100701266), 1), ((144, 230103, 631795, 801710, 900102, 900124, 900207, 900218, 900243, 900277, 100701266), 1), ((144, 230103, 631795, 801710, 900124, 900207, 900218, 900243, 900244, 22010083, 100701266), 1), ((144, 230103, 631795, 900102, 900124, 900207, 900218, 900243, 900244, 22010083, 100701266), 1), ((144, 230103, 631795, 801710, 900102, 900124, 900207, 900218, 900277, 22010083, 100701266), 1), ((144, 230103, 801710, 900102, 900124, 900207, 900218, 900243, 900277, 22010083, 100701266), 1), ((144, 631795, 801710, 900102, 900124, 900207, 900218, 900243, 900277, 22010083, 10070126



Itemsets of size  12 , count:  468
sample: 
[((144, 230103, 631795, 801710, 900102, 900124, 900207, 900218, 900243, 900244, 900277, 22010083), 1), ((144, 230103, 631795, 801710, 900124, 900207, 900218, 900243, 900244, 900277, 22010083, 100701266), 1), ((144, 230103, 631795, 900102, 900124, 900207, 900218, 900243, 900244, 900277, 22010083, 100701266), 1), ((230101, 631356, 631831, 900101, 900104, 900152, 900212, 900213, 900217, 900236, 900258, 100700835), 1), ((230101, 631356, 631831, 900101, 900104, 900152, 900212, 900213, 900217, 900244, 900258, 100700835), 1), ((230101, 631356, 631831, 900101, 900104, 900142, 900152, 900212, 900236, 900244, 900258, 100700835), 1), ((230101, 631356, 631831, 900104, 900142, 900152, 900212, 900213, 900236, 900244, 900258, 100700835), 1), ((230101, 631356, 631831, 900104, 900142, 900152, 900212, 900217, 900236, 900244, 900258, 100700835), 1), ((230101, 631831, 900101, 900104, 900152, 900212, 900213, 900217, 900236, 900244, 900258, 100700835), 1), ((23010

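In the run above MIN_COUNT is 0, so every itemset that occurs in at least one basket counts as frequent. This is why the number of itemsets keeps growing up to size 7 and every sample count is 1.

#### apriori
The following cell wraps the steps explained above into a single function, apriori, which the SON algorithm in the next part reuses. It returns a list with one RDD of frequent itemsets per itemset size.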

In [None]:
def apriori(baskets_rdd, MIN_COUNT, verbose=False):
  frequent_items_rdd = baskets_rdd.flatMap(lambda basket: [((item,),1) for item in basket]).reduceByKey(lambda x,y: x+y).filter(lambda x: x[1] > MIN_COUNT)

  def generate_combinations(old_combinations):
    """
    Input old_frequent_itemsets, and create new candidates from it
    """

    def generate_combinations_util(old_combination):
      """
      Helper for flatMap: maps one frequent itemset, given as (itemset, count), to its new candidate itemsets
      """
      old_combination = old_combination[0]
      old_combination_max_item = old_combination[-1]

      # Boolean-mask selection of the sorted remaining items larger than the itemset's maximum
      bigger_items = remaining_items[remaining_items>old_combination_max_item]
      new_candidates = []
      for x in bigger_items:
        new_candidates.append( old_combination + (x,) )

      return new_candidates

    remaining_items = np.array(old_combinations.flatMap(lambda x: x[0]).distinct().sortBy(lambda x: x).collect())

    new_combinations = old_combinations.flatMap(generate_combinations_util)
    return new_combinations

  def get_new_frequents(candidates):
    _candidates = candidates.collect()
    def count_freq(basket):
      candidates_present = []
      for candidate in _candidates:
        if set(candidate) <= set(basket):
          candidates_present.append( (candidate,1) )

      return candidates_present

    frequent_itemsets_rdd = baskets_rdd.flatMap(count_freq).reduceByKey(lambda x,y: x+y).filter(lambda x: x[1]>MIN_COUNT)

    return frequent_itemsets_rdd

  if verbose:
    print('MIN_COUNT is: ', MIN_COUNT)
  k = 1
  frequent_itemsets_rdd = frequent_items_rdd
  frequent_itemsets_rdds = []
  while frequent_itemsets_rdd.count() != 0:
    frequent_itemsets_rdds.append(frequent_itemsets_rdd)
    if verbose:
      print('-----------------loop start----------------------')
    k += 1
    candidates = generate_combinations(frequent_itemsets_rdd)
    frequent_itemsets_rdd = get_new_frequents(candidates)
    if verbose:
      print('Itemsets of size ', k, ', count: ', frequent_itemsets_rdd.count())
      print('sample: ')
      print(frequent_itemsets_rdd.take(10))

  return frequent_itemsets_rdds



The following cell tests the apriori function on a 1% sample of the baskets with THRESHOLD = 0.001.

In [None]:
import numpy as np

SAMPLE = True
SAMPLE_SIZE = 0.01
THRESHOLD = 0.001


baskets_rdd = rdd_group.map(lambda x: x[1])

if SAMPLE:
  baskets_rdd = baskets_rdd.sample(True, SAMPLE_SIZE)
  baskets_rdd = baskets_rdd.coalesce(10)
  baskets_rdd.cache()

BASKETS_COUNT = baskets_rdd.count()
BASKETS_COUNT

MIN_COUNT = int(THRESHOLD * BASKETS_COUNT)


frequent_itemsets_rdds = apriori(baskets_rdd, MIN_COUNT, verbose=True)



MIN_COUNT is:  15
-----------------loop start----------------------




Itemsets of size  2 , count:  811
sample: 
[((101301, 900101), 30), ((900102, 100701100), 36), ((900182, 100701100), 33), ((145, 100700841), 20), ((900222, 900228), 28), ((900222, 100700868), 171), ((100700866, 100700868), 47), ((900234, 900276), 34), ((900235, 100700871), 41), ((209103, 900235), 18)]
-----------------loop start----------------------




Itemsets of size  3 , count:  184
sample: 
[((900212, 900244, 22009977), 23), ((631765, 900164, 900276), 17), ((631765, 900164, 100700820), 35), ((631765, 900276, 100700820), 21), ((900101, 900259, 100700841), 35), ((900155, 900222, 100700868), 50), ((205802, 900215, 900234), 19), ((142, 900215, 900234), 16), ((205802, 212802, 900233), 16), ((900215, 900234, 900256), 23)]
-----------------loop start----------------------




Itemsets of size  4 , count:  11
sample: 
[((22010087, 22010088, 22010094, 22010095), 28), ((900101, 900212, 900244, 100700839), 16), ((900193, 900212, 900244, 100700839), 16), ((900102, 900142, 900212, 900244), 18), ((900142, 900202, 900212, 900244), 16), ((900142, 900212, 900244, 900249), 17), ((900142, 900212, 900244, 100700853), 54), ((209103, 900265, 100700804, 100700834), 21), ((900142, 900152, 900212, 900244), 18), ((231, 900236, 900255, 100700841), 20)]
-----------------loop start----------------------
Itemsets of size  5 , count:  0
sample: 
[]


# SON

For the SON algorithm we reuse the apriori function defined in the previous part. Sampling and the threshold work as before. The baskets are split randomly into partitions_count disjoint chunks of roughly equal size. Each chunk gets its own minimum support: THRESHOLD times the chunk size (MIN_COUNTS). Because each chunk is small, these local minimum supports are small as well. As a safety margin against missing itemsets, we multiply each local minimum support by 0.9.

We then run A-Priori on every chunk and take the union of the results as candidate itemsets. These candidates may contain false positives, so we make one more pass over all of the data, count every candidate, and keep those above the global MIN_COUNT, exactly as in A-Priori.
The result was exactly the same as the A-Priori result. We expected this: the second pass removes all false positives, and an itemset that is frequent in the whole data must be frequent in at least one chunk, so SON has no false negatives either.
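The no-false-negative claim follows from a pigeonhole argument. In the notation below, which is introduced here and is not in the notebook, the $N$ baskets are split into $m$ disjoint chunks of sizes $n_1, \dots, n_m$. An itemset $I$ has support $c_j$ in chunk $j$ and $c = \sum_j c_j$ overall, and $p$ is THRESHOLD. An itemset kept by the final filter satisfies $c > \lfloor pN \rfloor$, and since $c$ is an integer this implies $c > pN$:

$$
c = \sum_{j=1}^{m} c_j > pN = \sum_{j=1}^{m} p\,n_j
\quad\Longrightarrow\quad
\exists j:\; c_j > p\,n_j \ge \left\lfloor 0.9\,\lfloor p\,n_j \rfloor \right\rfloor .
$$

If every chunk had $c_j \le p\,n_j$, summing would give $c \le pN$, a contradiction. So $I$ clears the local threshold `int(MIN_COUNTS[j]*0.9)` in at least one chunk and enters the candidate set, and the second pass then counts it exactly. The argument requires the chunks to be disjoint and to cover every basket.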

So both algorithms give the same result, which is reported at the end of this notebook and in the PDF report.
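The two passes can be sketched end to end in plain Python. This is an illustration under stated assumptions, not the notebook's Spark code. `local_frequent` is a hypothetical brute-force miner standing in for apriori (it enumerates every itemset up to `max_size`), the chunks are taken round-robin instead of randomly, and the toy baskets are made up. The local threshold follows the notebook's `int(int(THRESHOLD * chunk_size) * 0.9)`.

```python
from collections import Counter
from itertools import combinations

def local_frequent(baskets, min_count, max_size):
    """Brute-force stand-in for apriori(): itemsets up to max_size with count > min_count."""
    counts = Counter()
    for basket in baskets:
        for k in range(1, max_size + 1):
            for itemset in combinations(sorted(basket), k):
                counts[itemset] += 1
    return {itemset for itemset, n in counts.items() if n > min_count}

def son(baskets, threshold, num_chunks, max_size=3, slack=0.9):
    """Two-pass SON over a list of baskets (sets); returns {itemset: count}."""
    # Pass 1: mine each disjoint chunk with a minimum support scaled to its size.
    chunks = [baskets[i::num_chunks] for i in range(num_chunks)]
    candidates = set()
    for chunk in chunks:
        local_min = int(int(threshold * len(chunk)) * slack)
        candidates |= local_frequent(chunk, local_min, max_size)
    # Pass 2: count every candidate over all baskets to drop false positives.
    global_min = int(threshold * len(baskets))
    counts = Counter()
    for basket in baskets:
        for candidate in candidates:
            if set(candidate) <= basket:
                counts[candidate] += 1
    return {c: n for c, n in counts.items() if n > global_min}

baskets = [{1, 2, 3}, {1, 2}, {2, 3}, {1, 2, 4}, {1, 3}, {2, 3, 4}, {1, 2, 3}, {2, 4}]
result = son(baskets, threshold=0.3, num_chunks=2)
# SON agrees with mining all baskets at once at the global minimum support.
assert set(result) == local_frequent(baskets, int(0.3 * len(baskets)), max_size=3)
print(sorted(result.items()))
```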

In [None]:
baskets_rdd = rdd_group.map(lambda x: x[1])

SAMPLE = False
SAMPLE_SIZE = 0.01
# THRESHOLD = 0.01
THRESHOLD = 0.02

if SAMPLE:
  baskets_rdd = baskets_rdd.sample(True, SAMPLE_SIZE)
  baskets_rdd = baskets_rdd.coalesce(10)
# Cache so that every pass below sees the same baskets in the same order
baskets_rdd.cache()

BASKETS_COUNT = baskets_rdd.count()

# Global minimum support over all baskets
MIN_COUNT = int(THRESHOLD * BASKETS_COUNT)
print('MIN_COUNT is: ', MIN_COUNT)

# Split the baskets into partitions_count disjoint chunks of roughly equal size.
# randomSplit with a fixed seed keeps the split stable when Spark recomputes a chunk.
# By contrast, random.randint inside map() draws new numbers on every recomputation,
# and a lambda that refers to the loop variable i sees its value when it is evaluated,
# not when it is defined.
partitions_count = 4
baskets_rdds = baskets_rdd.randomSplit([1.0] * partitions_count, seed=42)


MIN_COUNTS = [ int(THRESHOLD * basket_rdd.count()) for basket_rdd in baskets_rdds]
print('MIN_COUNTS are: ', MIN_COUNTS)

frequent_itemsets_rdds_for_samples = []
for i in range(partitions_count):
  frequent_itemsets_rdds_for_samples.append(apriori(baskets_rdds[i], int(MIN_COUNTS[i]*0.9)))

max_size = max(list(map(len, frequent_itemsets_rdds_for_samples)))


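# Second pass, one itemset size at a time
for i in range(1, max_size+1):
  # Keep only the chunks that found frequent itemsets of size i
  frequent_itemsets_rdds_for_samples = list(filter(lambda x: len(x) >= i, frequent_itemsets_rdds_for_samples))

  # Candidates of size i: union of the locally frequent itemsets of all chunks, without their counts
  frequent_itemsets_rdd = sc.union([x[i-1] for x in frequent_itemsets_rdds_for_samples]).map(lambda x: x[0]).distinct()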
  _candidates = frequent_itemsets_rdd.collect()
  def count_freq(basket):
    candidates_present = []
    for candidate in _candidates:
      if set(candidate) <= set(basket):
        candidates_present.append( (candidate,1) )
    
    return candidates_present

  # Exact counts over all baskets, filtered by the global MIN_COUNT (removes false positives)
  frequent_itemsets_rdd = baskets_rdd.flatMap(count_freq).reduceByKey(lambda x,y: x+y).filter(lambda x: x[1]>MIN_COUNT)
 
  
  print('Itemsets of size ', i, ', count: ', frequent_itemsets_rdd.count())
  print('sample: ')
  print(frequent_itemsets_rdd.take(10))


MIN_COUNT is:  30408
MIN_COUNTS are:  [7589, 7592, 7608, 7580]
Itemsets of size  1 , count:  27
sample: 
[((900107,), 38371), ((100700841,), 67632), ((22010119,), 30749), ((100700804,), 33666), ((900155,), 45958), ((900244,), 61075), ((900234,), 31110), ((900268,), 42107), ((900142,), 43822), ((100700866,), 33268)]
Itemsets of size  2 , count:  1
sample: 
[((900212, 900244), 42733)]


# Results and Comparison

With THRESHOLD = 0.02 on the whole dataset (SAMPLE = False), which corresponds to a minimum support of MIN_COUNT = 30408 baskets, there are 27 frequent single items. Ten of them are:
((900107,), 38371), ((900155,), 45958), ((900207,), 35631), ((22010119,), 30749), ((900139,), 31477), ((900191,), 31379), ((100700824,), 34662), ((900244,), 61075), ((900164,), 37506), ((900236,), 41652)

The only frequent pair is (900212, 900244), with a support of 42733 baskets. The number after each itemset is the number of baskets that contain it.

To test the code on larger itemsets, I took a 1% sample of the data and lowered the threshold to 0.001. The results are:

SAMPLE_SIZE = 0.01

THRESHOLD = 0.001

MIN_COUNT is:  15



Itemsets of size  2 , count:  811

sample: 

[((101301, 900101), 30), ((900102, 100701100), 36), ((900182, 100701100), 33), ((145, 100700841), 20), ((900222, 900228), 28), ((900222, 100700868), 171), ((100700866, 100700868), 47), ((900234, 900276), 34), ((900235, 100700871), 41), ((209103, 900235), 18)]



Itemsets of size  3 , count:  184

sample: 
[((900212, 900244, 22009977), 23), ((631765, 900164, 900276), 17), ((631765, 900164, 100700820), 35), ((631765, 900276, 100700820), 21), ((900101, 900259, 100700841), 35), ((900155, 900222, 100700868), 50), ((205802, 900215, 900234), 19), ((142, 900215, 900234), 16), ((205802, 212802, 900233), 16), ((900215, 900234, 900256), 23)]



Itemsets of size  4 , count:  11

sample: 
[((22010087, 22010088, 22010094, 22010095), 28), ((900101, 900212, 900244, 100700839), 16), ((900193, 900212, 900244, 100700839), 16), ((900102, 900142, 900212, 900244), 18), ((900142, 900202, 900212, 900244), 16), ((900142, 900212, 900244, 900249), 17), ((900142, 900212, 900244, 100700853), 54), ((209103, 900265, 100700804, 100700834), 21), ((900142, 900152, 900212, 900244), 18), ((231, 900236, 900255, 100700841), 20)]

