<a href="https://colab.research.google.com/github/zahraDehghanian97/frequent_road/blob/master/question4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Finding Frequent Itemsets in a Traffic Dataset Using Spark**  

## **2. Setting Up Local Storage for the Dataset**

### **2.1 Giving Access To Google Drive**

In [None]:
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'  # the dataset is read from <mount point>/My Drive/...
drive.mount(DRIVE_MOUNT_POINT)  # prompts for authorization

Mounted at /content/drive


## **3. Installing Required Libraries**

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 71.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=b04c89c1ef6fa2d16465ecccfe6c8592d8f3f6a39595ed77ac1017a0094ff8d3
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-m

In [None]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,TimestampType
from pyspark.sql.functions import col,current_timestamp,to_date,hour,dayofweek

## **4. Data Loading and Preprocessing**

In [None]:
# !unzip "/content/drive/My Drive/Sample_Data.zip" -d "/content/drive/My Drive/Sample_Data"

In [None]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,TimestampType
from pyspark.sql.functions import col,current_timestamp,to_date,hour,dayofweek
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .appName("Spark_Processor")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

# Explicit column types applied when reading the traffic CSV.
schema = StructType([
    StructField("DEVICE_CODE", IntegerType(), True),
    StructField("SYSTEM_ID", IntegerType(), True),
    StructField("ORIGINE_CAR_KEY", StringType(), True),
    StructField("FINAL_CAR_KEY", StringType(), True),
    StructField("CHECK_STATUS_KEY", IntegerType(), True),
    StructField("COMPANY_ID", StringType(), True),
    StructField("PASS_DAY_TIME", TimestampType(), True),
])
TRAFFIC_CSV_PATH = '/content/drive/My Drive/Sample_Data/Sample_Traffic.csv'
df = spark.read.csv(TRAFFIC_CSV_PATH, header=True, schema=schema)
df.show(1)

+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|     200501|       81|       10477885|     10477885|               5|       161|2021-06-01 03:54:39|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
only showing top 1 row



## **5. Algorithm Implementation**

### **5.1 Apriori**

In [None]:
# Find the frequent 1-itemsets.
# A basket is the list of DEVICE_CODEs one car (ORIGINE_CAR_KEY) passed on one calendar day.
SAMPLE_FRACTION = 0.005  # fraction of baskets kept; the later Apriori passes run on this sample only
SAMPLE_SEED = 42         # fixed seed so every run draws the same sample (and gets the same results)
MIN_SUPPORT_1 = 100      # minimum number of baskets a single device must appear in

df_mapped = (
    df.rdd
    # .date() would raise on a null timestamp (the schema declares the column nullable)
    .filter(lambda x: x['PASS_DAY_TIME'] is not None)
    .map(lambda x: ((x['ORIGINE_CAR_KEY'], x['PASS_DAY_TIME'].date()), [x['DEVICE_CODE']]))
)
df_mapped2 = df_mapped.reduceByKey(lambda a, b: a + b)
df_mapped2 = df_mapped2.sample(False, SAMPLE_FRACTION, seed=SAMPLE_SEED)
# Support is the number of baskets containing a device, so each device is
# counted once per basket even if the car passed it several times that day.
supportRDD = (
    df_mapped2
    .flatMap(lambda X: set(X[1]))
    .map(lambda item: (item, 1))
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda item: item[1] >= MIN_SUPPORT_1)
)
df_mapped2.take(10)  # preview a few sampled baskets

[(('40682798', datetime.date(2021, 6, 1)), [206602]),
 (('10334617', datetime.date(2021, 6, 1)),
  [22009977, 142, 100701059, 22009977, 100701059, 205201]),
 (('72847246', datetime.date(2021, 6, 1)), [22010062]),
 (('26001360', datetime.date(2021, 6, 1)),
  [206602, 207101, 137, 205802, 212802, 117, 117, 100700845, 100701145]),
 (('8609192', datetime.date(2021, 6, 1)),
  [205201,
   202601,
   205202,
   101301,
   230107,
   22009971,
   100700824,
   900276,
   22009977]),
 (('16609595', datetime.date(2021, 6, 1)),
  [22010051, 22010052, 22010120, 22010128]),
 (('73436862', datetime.date(2021, 6, 1)), [631781]),
 (('21614822', datetime.date(2021, 6, 1)),
  [631795,
   207101,
   900276,
   900216,
   900191,
   900234,
   900138,
   900265,
   175,
   175,
   135,
   107301,
   100700824,
   100700804]),
 (('27554425', datetime.date(2021, 6, 1)), [631763]),
 (('87107283', datetime.date(2021, 6, 1)), [22010053])]

In [None]:
# find frequent itemset 2 and 3

def remove(record):
    """Extend an itemset by one item to build an Apriori candidate.

    ``record`` is an ``(itemset, item)`` pair as produced by a cartesian
    product of frequent itemsets with frequent single items.  ``itemset`` is
    either a tuple of items or a bare (non-tuple) item.

    Returns:
        The sorted tuple ``itemset + (item,)`` when ``item`` is not already in
        the itemset; otherwise the itemset itself as a tuple (a bare item
        becomes a 1-tuple).  The result is always a tuple, so it stays hashable
        for ``distinct``/``reduceByKey``.  Callers discard the "already present"
        case by filtering on the expected length.
    """
    itemset, item = record[0], record[1]
    if not isinstance(itemset, tuple):
        itemset = (itemset,)
    if item in itemset:
        return itemset
    return tuple(sorted(itemset + (item,)))


# Level-wise Apriori on the sampled baskets.
# Each level extends every frequent (k-1)-itemset with one frequent device
# (via `remove`) and counts how many baskets contain each size-k candidate.
# It keeps the candidates reaching MIN_SUPPORT_K and repeats until nothing
# survives or MAX_ITEMSET_SIZE is reached.
MIN_SUPPORT_K = 10     # NOTE(review): lower than the threshold of 100 used for single devices — confirm intended
MAX_ITEMSET_SIZE = 3   # search pairs and triples

unique = supportRDD.map(lambda item: item[0])
resultRdd = supportRDD.map(lambda item: ([item[0]], item[1]))
# Sets make each containment test a hash lookup.
# Caching avoids rebuilding the baskets for every level.
allitems = df_mapped2.map(lambda item: set(item[1])).cache()
supportRDD2 = supportRDD.map(lambda item: item[0])
count = 2
while count <= MAX_ITEMSET_SIZE and not supportRDD2.isEmpty():
    tmp = (
        supportRDD2.cartesian(unique)
        .map(remove)
        # bind the current level now so the filter cannot pick up a later `count`
        .filter(lambda item, size=count: len(item) == size)
        .distinct()
    )
    tmp_2 = (
        tmp.cartesian(allitems)
        .filter(lambda item: all(x in item[1] for x in item[0]))
        .map(lambda item: (item[0], 1))
        .reduceByKey(lambda a, b: a + b)
        .filter(lambda item: item[1] >= MIN_SUPPORT_K)
        .cache()  # reused by isEmpty(), the union and the next level
    )
    resultRdd = resultRdd.union(tmp_2)
    supportRDD2 = tmp_2.map(lambda item: item[0])
    print('finished itemsets of size ' + str(count))
    count = count + 1

[(900222, 22010061), (900244, 100700853), (22010083, 100700853), (631357, 100700841)]
[((900222, 22010061), [100701251, 22010060, 631748, 900215, 900171, 803001, 100700868, 900222, 900102, 900246, 900102, 900246, 100701100, 900182, 100700866, 100701100, 100701130, 900102, 900102, 900246, 900222, 100700868, 22010061, 900155, 100701130, 900222, 900102, 900246]), ((900244, 100700853), [100700868, 900222, 631360, 900152, 900212, 900102, 900244, 900155, 100700853]), ((900244, 100700853), [900249, 100700853, 22009977, 900244, 900212, 900249, 900101]), ((900244, 100700853), [100700853, 900236, 22010048, 100701119, 900142, 900244, 900212])]
[((900222, 22010061), 2), ((900164, 900222), 5), ((900101, 900246), 12), ((900101, 900236), 46)]
[((900101, 900246), 12), ((900101, 900236), 46), ((900222, 900246), 20), ((631357, 900236), 26)]
[([900225], 225), ([100700868], 232), ([100700853], 312), ([22010061], 200), ([100700841], 369), ([900246], 286), ([900222], 265), ([900244], 328), ([631357], 211), 

In [None]:
# Final frequent roads found by Apriori (single devices, pairs and triples).
apriori_results = resultRdd.collect()
apriori_results

[([900225], 225),
 ([100700868], 232),
 ([100700853], 312),
 ([22010061], 200),
 ([100700841], 369),
 ([900246], 286),
 ([900222], 265),
 ([900244], 328),
 ([631357], 211),
 ([900142], 247),
 ([100700866], 211),
 ([100700845], 220),
 ([100700824], 210),
 ([900268], 255),
 ([22010083], 249),
 ([900164], 215),
 ([900212], 458),
 ([900101], 280),
 ([900236], 214),
 ([900155], 296),
 ((900101, 900246), 12),
 ((900101, 900236), 46),
 ((900222, 900246), 20),
 ((631357, 900236), 26),
 ((900142, 100700853), 72),
 ((900212, 900244), 215),
 ((900155, 900222), 43),
 ((900155, 900246), 14),
 ((900225, 100700853), 12),
 ((900222, 100700868), 82),
 ((100700866, 100700868), 34),
 ((900101, 900225), 12),
 ((900155, 100700853), 10),
 ((900164, 900246), 21),
 ((631357, 900246), 42),
 ((900101, 900212), 63),
 ((900212, 900268), 22),
 ((900222, 900244), 25),
 ((900212, 900222), 23),
 ((900142, 900268), 19),
 ((900225, 900244), 23),
 ((900225, 100700866), 17),
 ((631357, 900222), 10),
 ((900212, 900246), 1

### **5.2 SON**

In [None]:
# Divide the data into 3 disjoint chunks for SON.
# SON partitions *baskets* (one car's devices on one day), so records are
# grouped into baskets before splitting.  Splitting individual records would
# scatter one basket's devices over several chunks and hide co-occurrences.
# Using `subtract` on rows would also drop every duplicate of a row sampled
# into the first chunk.
SON_SPLIT_WEIGHTS = [0.3, 0.35, 0.35]  # 30%, then half of the remaining 70%, then the rest
SON_SPLIT_SEED = 42                    # reproducible split
SON_CHUNK_MIN_SUPPORT = 50             # per-chunk threshold for single devices

rddall = (
    df.rdd
    # .date() would raise on a null timestamp (the schema declares the column nullable)
    .filter(lambda x: x['PASS_DAY_TIME'] is not None)
    .map(lambda x: ((x['ORIGINE_CAR_KEY'], x['PASS_DAY_TIME'].date()), [x['DEVICE_CODE']]))
    .reduceByKey(lambda a, b: a + b)
    # randomSplit evaluates the parent once per split; caching keeps the splits a clean partition
    .cache()
)
rdd1, rdd2, rdd3 = rddall.randomSplit(SON_SPLIT_WEIGHTS, seed=SON_SPLIT_SEED)


def _frequent_singletons(baskets, min_support):
    """Return ``(item, n_baskets)`` for items contained in at least ``min_support`` baskets."""
    return (
        baskets
        .flatMap(lambda kv: set(kv[1]))  # count each device once per basket
        .map(lambda item: (item, 1))
        .reduceByKey(lambda a, b: a + b)
        .filter(lambda item: item[1] >= min_support)
    )


suprdd1, suprdd2, suprdd3 = (
    _frequent_singletons(chunk, SON_CHUNK_MIN_SUPPORT) for chunk in (rdd1, rdd2, rdd3)
)

# Per-chunk inputs for `apriori`: frequent items, seed results and baskets.
uniqueItems1, uniqueItems2, uniqueItems3 = (s.keys() for s in (suprdd1, suprdd2, suprdd3))
allRdd1, allRdd2, allRdd3 = (
    s.map(lambda item: ([item[0]], item[1])) for s in (suprdd1, suprdd2, suprdd3)
)
listitems1, listitems2, listitems3 = (chunk.values() for chunk in (rdd1, rdd2, rdd3))
suprdd1_2, suprdd2_2, suprdd3_2 = (s.keys() for s in (suprdd1, suprdd2, suprdd3))

print(suprdd1_2.take(3))
print(suprdd2_2.take(3))
print(suprdd3_2.take(3))

[900222, 100700820, 22010061]
[22010118, 100700868, 900222]
[100700820, 22010088, 100700868]


In [None]:
# Implement Apriori to find the frequent itemsets in each chunk.

def apriori(supportRDD2, unique, resultRdd, listall, min_support=1, max_size=2, verbose=False):
    """Run level-wise Apriori on one chunk of baskets and return every itemset found.

    Args:
        supportRDD2: RDD of the chunk's frequent single items (bare item values).
        unique: RDD of frequent single items used to extend itemsets by one item.
        resultRdd: RDD of already-known results, e.g. ``([item], support)``
            pairs as built by the SON cells; new itemsets are unioned onto it.
        listall: RDD of the chunk's baskets (iterables of items).
        min_support: minimum number of baskets an itemset of size >= 2 must
            occur in.  The default of 1 keeps every candidate that occurs at least once.
        max_size: largest itemset size to search for (default: pairs only).
        verbose: print one progress line per level.

    Returns:
        ``resultRdd`` extended with ``(sorted_tuple_itemset, support)`` pairs.
    """
    # Sets turn each containment test into a hash lookup.
    baskets = listall.map(set)
    frequent = supportRDD2
    count = 2
    while count <= max_size and not frequent.isEmpty():
        candidates = (
            frequent.cartesian(unique)
            .map(remove)
            # bind the current level now so the filter cannot pick up a later `count`
            .filter(lambda itemset, size=count: len(itemset) == size)
            .distinct()
        )
        counted = (
            candidates.cartesian(baskets)
            .filter(lambda pair: all(x in pair[1] for x in pair[0]))
            .map(lambda pair: (pair[0], 1))
            .reduceByKey(lambda a, b: a + b)
            .filter(lambda pair, threshold=min_support: pair[1] >= threshold)
            .cache()  # reused by isEmpty(), the union and the next level
        )
        resultRdd = resultRdd.union(counted)
        frequent = counted.map(lambda pair: pair[0])
        if verbose:
            print('finished itemsets of size ' + str(count))
        count += 1

    return resultRdd

In [None]:
# Run Apriori independently on each chunk to get that chunk's candidate itemsets.
chunk_inputs = (
    (suprdd1_2, uniqueItems1, allRdd1, listitems1),
    (suprdd2_2, uniqueItems2, allRdd2, listitems2),
    (suprdd3_2, uniqueItems3, allRdd3, listitems3),
)
candidate_1, candidate_2, candidate_3 = [apriori(*args) for args in chunk_inputs]

[(900222, 22010061), (900222, 900234), (900234, 100700841), (900207, 100700862)]
[((900207, 900244), [900207, 900244]), ((900225, 100700853), [900225, 100700853]), ((22010047, 22010061), [22010047, 22010060, 22010043, 22010047, 22010061, 22010039, 22010057, 22010061, 22010061, 22010061]), ((900225, 900268), [900268, 900225])]
[((900225, 100700853), 1), ((900234, 900265), 1), ((900207, 900244), 1), ((22010047, 22010061), 1)]
[((900225, 100700853), 1), ((900234, 900265), 1), ((900207, 900244), 1), ((22010047, 22010061), 1)]
[([900222], 79), ([100700820], 51), ([22010061], 52), ([900225], 67), ([900246], 75), ([100700841], 108), ([100700853], 94), ([100700868], 58), ([900234], 62), ([100700862], 59), ([900207], 62), ([631357], 63), ([900142], 74), ([900265], 55), ([22010047], 53), ([22010119], 53), ([100700845], 60), ([900202], 84), ([100700824], 78), ([900268], 81), ([100700866], 54), ([900244], 100), ([900139], 59), ([22010083], 99), ([22010048], 50), ([900269], 67), ([900107], 62), ([9

In [None]:
# combine the results of chunks and check the frequency of candidate items in all of dataset

def remove2(record):
    """Turn an ``(itemset, support)`` pair into a hashable ``(key, 1)`` pair.

    A single-item itemset is unwrapped to its bare item; a longer itemset is
    used as the key unchanged.  The constant ``1`` lets the caller count, with
    ``reduceByKey``, how many sources proposed each candidate.
    """
    itemset = record[0]
    key = itemset[0] if len(itemset) == 1 else itemset
    return key, 1
  
def reverseremove(record):
    """Recover an itemset from a ``(key, count)`` pair.

    A tuple key (multi-item itemset) is returned unchanged; any other key is
    treated as a single item and wrapped in a one-element list.
    """
    key = record[0]
    return key if isinstance(key, tuple) else [key]


# SON pass 2: merge the chunks' candidates and count them over every basket.
SON_MIN_SUPPORT = 2  # global support threshold
# NOTE(review): SON is only complete when each chunk's threshold is about
# (chunk fraction) * SON_MIN_SUPPORT.  The chunk thresholds used above
# (50 for single devices, 1 for pairs) do not follow that rule — confirm intent.

candidate = candidate_1.union(candidate_2).union(candidate_3)
# remove2 gives a hashable (key, 1) per proposal, and reduceByKey de-duplicates
# candidates proposed by several chunks.  reverseremove needs the (key, count)
# pair, not the bare key: on a bare int key it raises, and on a pair key it
# keeps only the first item.  It yields [item] for single devices and a tuple
# for larger itemsets.
candidate = candidate.map(remove2).reduceByKey(lambda a, b: a + b).map(reverseremove)

# Re-assemble whole baskets by merging chunk entries that share a (car, day)
# key, so the global count never runs on basket fragments.  Sets turn
# containment into hash lookups.
listall = (
    rdd1.union(rdd2).union(rdd3)
    .reduceByKey(lambda a, b: a + b)
    .values()
    .map(set)
)
candidate_counts = (
    candidate.cartesian(listall)
    .filter(lambda item: all(x in item[1] for x in item[0]))
    # tuple keys are hashable and keep the itemset readable (no str() round-trip)
    .map(lambda x: (tuple(x[0]), 1))
    .reduceByKey(lambda a, b: a + b)
)
son_results = candidate_counts.filter(lambda item: item[1] >= SON_MIN_SUPPORT)


In [None]:
# Final frequent roads found with SON.
son_frequent_itemsets = son_results.collect()
son_frequent_itemsets