## Loading Data

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from operator import add

In [2]:
spark = SparkSession \
    .builder \
    .appName("Spark_Processor") \
    .master("local[*]") \
    .getOrCreate()

sc=spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/20 20:39:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data_df = spark.read.option('header', 'true').csv('Sample_Data/Sample_Traffic.csv')

In [4]:
# a preview of the data
data_df.show()

+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|     200501|       81|       10477885|     10477885|               5|       161|2021-06-01 03:54:39|
|        155|       81|       87625017|     87625017|               5|       161|2021-06-01 04:14:21|
|     631757|       81|        8652928|      8652928|               5|       161|2021-06-01 03:58:57|
|     631757|       81|        8548123|      8548123|               5|       161|2021-06-01 04:01:38|
|     631757|       81|       24715264|     24715264|               5|       161|2021-06-01 03:56:57|
|     206602|       81|       69939810|     69939810|               5|       161|2021-06-01 04:06:38|
|     206602|       81|       11046172|     11046172|               5|       161|2

In [5]:
# deleting outliers
data = data_df.rdd
data = data.filter(lambda x: x['ORIGINE_CAR_KEY'] == x['FINAL_CAR_KEY'])

In [6]:
# a function to generate the required rdd
def create_key_value(row):
    key = (row['FINAL_CAR_KEY'], row['PASS_DAY_TIME'])
    value = row['DEVICE_CODE']
    return (key, value)

In [7]:
# creating the required rdd
data = data.map(create_key_value)

In [8]:
# cleaning the data: for some cameras, a car had been submited mutiple time in a single moment
# which is not rational. so, we will remove this redundancy
def day_time(basket):
    plate = basket[0][0]
    day_time = basket[0][1]
    camera_code = basket[1]
    day, time = day_time.split()
    time = time[:-3]
    return ((plate, day + " " + time), camera_code)

data_modified = data.map(day_time)
clean_data = data_modified.groupByKey().mapValues(lambda x: tuple(set(tuple(x))))

In [9]:
# removing time, leaving only day
def day(basket):
    plate = basket[0][0]
    day_time = basket[0][1]
    camera_codes = basket[1]
    day = day_time.split()[0]
    return ((plate, day), camera_codes)

def flatten(x):
    if isinstance(x, str):
        yield x
    else:
        for item in x:
            yield from flatten(item)

clean_data = clean_data.map(day)
clean_data = clean_data.groupByKey().mapValues(tuple).mapValues(lambda x: tuple(flatten(x)))
camera_baskets = clean_data.values()

## A-Priori Algorithm

In [10]:
# counting the visits to each camera
def extract_cameras(basket):
    items = tuple((camera, 1) for camera in basket)
    return items

camera_count = camera_baskets.flatMap(extract_cameras).reduceByKey(add)

In [11]:
# calculating support threshold. we set it equal to mean + 1 * std
SUPPORT_S = camera_count.values().mean() + camera_count.values().stdev()

                                                                                

In [12]:
# finding the frequently visited cameras
frequent_cameras = camera_count.filter(lambda x: x[1] >= SUPPORT_S)

In [13]:
# broadcasting the frequent cameras
frequent_cameras_list = sc.broadcast(frozenset(frequent_cameras.map(lambda x: x[0]).collect()))



In [14]:
# remove cameras which are not frequents
def remove_infrequent_cameras(basket):
    return tuple(item for item in basket if item in frequent_cameras_list.value)

baskets_with_frequent_cameras = camera_baskets.map(remove_infrequent_cameras)    
baskets_with_frequent_cameras = baskets_with_frequent_cameras.filter(lambda x: len(x)<300)

In [15]:
# creating two tuples
def create_two_tuple(basket):
    n = len(basket)
    two_tuples = []
    
    for i in range(n):
        for j in range(i+1, n):
            two_tuple = tuple(sorted([basket[i], basket[j]]))
            if len(set(two_tuple)) == 2:
                two_tuples.append((two_tuple, 1))
    return two_tuples

frequent_two_tuples = baskets_with_frequent_cameras.flatMap(create_two_tuple).reduceByKey(add).filter(lambda x: x[1] >= SUPPORT_S)

In [16]:
# results
top_two_tuples = frequent_two_tuples.sortBy(lambda x: x[1], ascending=False).take(10)
print(f'number of two tuples is: {frequent_two_tuples.count()}')
print('===================================')
print('===== top frequent two tuples =====')
for two_tuple in top_two_tuples:
    print(two_tuple)


                                                                                

number of two tuples is: 24
===== top frequent two tuples =====
(('900212', '900244'), 55733)
(('900142', '900212'), 34622)
(('100700841', '900101'), 25697)
(('100700853', '900142'), 24949)
(('100700864', '900185'), 23455)
(('100700853', '900212'), 23150)
(('100700868', '900222'), 22815)
(('100700841', '900236'), 22592)
(('100700824', '900107'), 21723)
(('900142', '900244'), 21102)


In [17]:
# creating three tuples
def create_three_tuples(basket):
    n = len(basket)
    three_tuples = []
    
    for i in range(n):
        for j in range(i+1, n):
            for k in range(j+1, n):
                three_tuple = tuple(sorted([basket[i], basket[j], basket[k]]))
                if len(set(three_tuple)) == 3:
                    three_tuples.append((three_tuple, 1))
    return three_tuples

frequent_three_tuples = baskets_with_frequent_cameras.flatMap(create_three_tuples).reduceByKey(add).filter(lambda x: x[1] >= SUPPORT_S)

In [27]:
# results
top_three_tuples = frequent_three_tuples.sortBy(lambda x: x[1], ascending=False).take(10)
print(f'number of three tuples is: {frequent_three_tuples.count()}')
print('=================================')
print('===== top frequent three tuples =====')
for three_tuple in top_three_tuples:
    print(three_tuple)

number of three tuples is: 24
===== top frequent three tuples =====
(('175', '203902', '900191'), 48710)
(('100700853', '900142', '900212'), 40184)
(('100700868', '900155', '900222'), 36422)
(('900142', '900212', '900244'), 33052)
(('100700853', '900212', '900244'), 27559)
(('22010119', '900108', '900268'), 25845)
(('22009977', '900225', '900268'), 25806)
(('100700839', '900212', '900244'), 22235)
(('100700853', '900142', '900244'), 20402)
(('22010118', '900215', '900256'), 18280)


## SON Algorithm

In [19]:
# setting the hyperparameters and spliting the data into chunks
NUM_OF_CHUNKS = 3
RELAXING_FACTOR = 1.2
SUPPORT_S = 14071.60192803407
SON_SUPPORT_S = SUPPORT_S/(NUM_OF_CHUNKS * RELAXING_FACTOR)

son_final_frequent_two_tuples = sc.parallelize([])
son_final_frequent_three_tuples = sc.parallelize([])

def hash_data(basket):
    x = sum(int(camera_code) for camera_code in basket) % NUM_OF_CHUNKS
    return (x, basket)

hashed_baskets = camera_baskets.map(hash_data)

rdd1 = hashed_baskets.filter(lambda x: x[0] == 0).values()
rdd2 = hashed_baskets.filter(lambda x: x[0] == 1).values()
rdd3 = hashed_baskets.filter(lambda x: x[0] == 2).values()

data_chunks = [rdd1, rdd2, rdd3]

In [20]:
# creating candidates

def son_create_two_tuples(basket):
    n = len(basket)
    two_tuples = []
    
    for i in range(n):
        for j in range(i+1, n):
            if basket[i] in son_frequent_cameras_list.value and basket[j] in son_frequent_cameras_list.value:
                two_tuple = tuple(sorted([basket[i], basket[j]]))
                if len(set(two_tuple)) == 2:
                    two_tuples.append((two_tuple, 1))
    return two_tuples

def son_create_three_tuples(basket):
    n = len(basket)
    three_tuples = []
    
    for i in range(n):
        for j in range(i+1, n):
            for k in range(j+1, n):
                if basket[i] in son_frequent_cameras_list.value and basket[j] in son_frequent_cameras_list.value and basket[k] in son_frequent_cameras_list.value:
                    three_tuple = tuple(sorted([basket[i], basket[j], basket[k]]))
                    if len(set(three_tuple)) == 3:
                        three_tuples.append((three_tuple, 1))
    return three_tuples
                    
def son_extract_cameras(basket):
    return [(camera_code, 1) for camera_code in basket]

def son_remove_infrequent_cameras(basket):
    return tuple(camera_code for camera_code in basket if camera_code in son_frequent_cameras_list.value)

for chunk in data_chunks:
    camera_count = chunk.flatMap(son_extract_cameras).reduceByKey(add)
    
    son_frequent_cameras = camera_count.filter(lambda x: x[1] >= SON_SUPPORT_S)
    son_frequent_cameras_list = sc.broadcast(frozenset(son_frequent_cameras.keys().collect()))
    
    son_baskets_with_frequent_cameras = chunk.map(son_remove_infrequent_cameras)
    son_baskets_with_frequent_cameras = son_baskets_with_frequent_cameras.filter(lambda x: len(x) < 300)
    
    son_frequent_two_tuples = son_baskets_with_frequent_cameras.flatMap(son_create_two_tuples).reduceByKey(add).filter(lambda x: x[1] >= SON_SUPPORT_S)
    son_frequent_three_tuples = son_baskets_with_frequent_cameras.flatMap(son_create_three_tuples).reduceByKey(add).filter(lambda x: x[1] >= SON_SUPPORT_S)
    
    son_final_frequent_two_tuples = son_final_frequent_two_tuples.union(son_frequent_two_tuples)
    son_final_frequent_three_tuples = son_final_frequent_three_tuples.union(son_frequent_three_tuples)


                                                                                

In [21]:
# broadcasting the candidates
son_final_frequent_two_tuples_list = sc.broadcast(frozenset(son_final_frequent_two_tuples.keys().collect()))
son_final_frequent_three_tuples_list = sc.broadcast(frozenset(son_final_frequent_three_tuples.keys().collect()))

                                                                                

In [22]:
# verifying the candidates
def verify_two_tuple(basket):
    n = len(basket)
    two_tuples = []
    
    for i in range(n):
        for j in range(i+1, n):
            two_tuple = tuple(sorted([basket[i], basket[j]]))
            if two_tuple in son_final_frequent_two_tuples_list.value and len(set(two_tuple)) == 2:
                two_tuples.append((two_tuple, 1))
    return two_tuples

def verify_three_tuple(basket):
    n = len(basket)
    three_tuples = []
    
    for i in range(n):
        for j in range(i+1, n):
            for k in range(j+1, n):
                three_tuple = tuple(sorted([basket[i], basket[j], basket[k]]))
                if three_tuple in son_final_frequent_three_tuples_list.value and len(set(three_tuple)) == 3:
                    three_tuples.append((three_tuple, 1))
    return three_tuples

son_camera_baskets = baskets_with_frequent_cameras

son_final_frequent_two_tuples = son_camera_baskets.flatMap(verify_two_tuple).reduceByKey(add).filter(lambda x: x[1] >= SUPPORT_S)
son_final_frequent_three_tuples = son_camera_baskets.flatMap(verify_three_tuple).reduceByKey(add).filter(lambda x: x[1]>= SUPPORT_S)


In [23]:
# results
top_two_tuples = son_final_frequent_two_tuples.sortBy(lambda x: x[1], ascending=False).take(10)
print(f'number of two tuples is: {son_final_frequent_two_tuples.count()}')
print('===================================')
print('===== top frequent two tuples =====')
for two_tuple in top_two_tuples:
    print(two_tuple)


                                                                                

number of two tuples is: 24
===== top frequent two tuples =====
(('900212', '900244'), 55733)
(('900142', '900212'), 34622)
(('100700841', '900101'), 25697)
(('100700853', '900142'), 24949)
(('100700864', '900185'), 23455)
(('100700853', '900212'), 23150)
(('100700868', '900222'), 22815)
(('100700841', '900236'), 22592)
(('100700824', '900107'), 21723)
(('900142', '900244'), 21102)


In [28]:
# results
top_three_tuples = son_final_frequent_three_tuples.sortBy(lambda x: x[1], ascending=False).take(10)
print(f'number of three tuples is: {son_final_frequent_three_tuples.count()}')
print('=================================')
print('===== top frequent three tuples =====')
for three_tuple in top_three_tuples:
    print(three_tuple)

number of three tuples is: 24
===== top frequent three tuples =====
(('175', '203902', '900191'), 48710)
(('100700853', '900142', '900212'), 40184)
(('100700868', '900155', '900222'), 36422)
(('900142', '900212', '900244'), 33052)
(('100700853', '900212', '900244'), 27559)
(('22010119', '900108', '900268'), 25845)
(('22009977', '900225', '900268'), 25806)
(('100700839', '900212', '900244'), 22235)
(('100700853', '900142', '900244'), 20402)
(('22010118', '900215', '900256'), 18280)
