# MDA 2021
## Pyspark Sample Code
-----------------------------------------------------------------

## Setup
--------------------------------------------------

Let's setup Spark on your Colab environment.  Run the cell below!

In [1]:
!pip install pyspark
# !pip install -U -q PyDrive
# !apt install openjdk-8-jdk-headless -qq
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"



Now we authenticate a Google Drive client to processing data

**Make sure to follow the interactive instructions.**

In [2]:
# from google.colab import drive
# This will prompt for authorization.
# drive.mount('/content/drive')

Mounted at /content/drive


## Check and extract data
--------------------------------------------------

In [3]:
# !ls '/content/drive/MyDrive/SUT/Big Data/hw3'
file_path = './'

In [3]:
!unzip './Sample_Data' -d './'

Archive:  ./Sample_Data.zip
replace ./Sample_Traffic.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C


the cells above, extract data which is in '/content/drive/My Drive/Test' to /content/drive/My Drive/Test/Traffic.csv  

## Initializing Spark and read data
--------------------------------------------------

In [126]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, current_timestamp, to_date, hour, dayofweek
spark = SparkSession \
    .builder \
    .appName("Spark_Processor") \
    .master("local[*]") \
    .config("spark.executor.cores", '6') \
    .config("spark.executor.memory", '4900m') \
    .getOrCreate()

sc = spark.sparkContext

schema = StructType([
    StructField("DEVICE_CODE", IntegerType(), True),
    StructField("SYSTEM_ID", IntegerType(), True),
    StructField("ORIGINE_CAR_KEY", StringType(), True),
    StructField("FINAL_CAR_KEY", StringType(), True),
    StructField("CHECK_STATUS_KEY", IntegerType(), True),
    StructField("COMPANY_ID", StringType(), True),
    StructField("PASS_DAY_TIME", TimestampType(), True)
])


<h2 dir="rtl">
 خواندن دیتا
</h2> 


In [127]:
df=spark.read.csv(f'{file_path}Sample_Traffic.csv',header=True,schema=schema)
df.show(1)

+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|     200501|       81|       10477885|     10477885|               5|       161|2021-06-01 03:54:39|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
only showing top 1 row



In [45]:
df.head()

Row(DEVICE_CODE=200501, SYSTEM_ID=81, ORIGINE_CAR_KEY='10477885', FINAL_CAR_KEY='10477885', CHECK_STATUS_KEY=5, COMPANY_ID='161', PASS_DAY_TIME=datetime.datetime(2021, 6, 1, 3, 54, 39))

<h1 dir="rtl">
 الف 
</h1> 

<h2 dir="rtl">
 خواندن 
 <span dir="ltr">
   rdd 
 </span> 
  دیتافریم و کش کردن آن 
</h2> 


In [128]:
data = df.rdd
data.cache()

MapPartitionsRDD[8] at javaToPython at NativeMethodAccessorImpl.java:0

<h2 dir="rtl">
 بدست آوردن آیتم‌ها و سبد‌ها 
</h2> 


<h3 dir="rtl">
 آیتم‌ها 
</h3> 
<h4 dir="rtl">
 برای هر آیتم ما نیازمند پلاک شناسایی‌شده و روز به عنوان کلید و کد دوربین به عنوان مقدار آن نیازمندیم. باقی در این تمرین استفاده نمی‌شود. 
</h4> 
<h3 dir="rtl">
 سبد‌ها
</h3> 
<h4 dir="rtl">
ابتدا روی آیتم‌ها تابع 
<span dir="ltr">
  distinct
</span> 
را صدا می‌زنیم. با اینکار آیتم‌های یکتا را می‌گیریم و سپس آن‌هایی که برای یک ماشین در یک روز خاص هستند را یک گروه می‌کنیم و کد‌ دستگاه‌ها را ذخیره می‌کنیم.
</h4> 
<h3 dir="rtl">
  آیتم‌ها و سبد‌ها را کش می‌کنیم. دلیل استفاده از تابع 
  <span dir="ltr">
  distinct
</span> 
، برای از بین بردن تکرار برای یک کلید مشابه است.
به طور مثال ممکن است یک ماشین در یک روز خاص چندبار از یک‌محل رد شود.
</h3>


In [129]:
# (plate, date), route
car_route_per_day_rdd = data.map(lambda x: (
    (x.FINAL_CAR_KEY, x.PASS_DAY_TIME.date()), x.DEVICE_CODE))
car_route_per_day_rdd.cache()
# (plate, date), [route]
car_routes_list_per_day_rdd = car_route_per_day_rdd.distinct().groupByKey().map(lambda x: tuple(x[1]))
car_routes_list_per_day_rdd.cache()


PythonRDD[18] at RDD at PythonRDD.scala:53

<h1 dir="rtl">
 ب 
</h1> 
<h2 dir="rtl">
 پیاده‌سازی الگوریتم 
 <span dir="ltr">
   a-priori
 </span> 
  
</h2> 


<h3 dir="rtl">
 دلیل اینکه ترشولد را خیلی بالا گرفتم این است که در انتهای ددلاین متوجه باگی در کدم شدم. برای اینکه شما خروجی از یک کد صحیح را ببینید مجبور به ران روی ترشولد پایین شدم. متاسفم. اما اگر روی 0.1 قرار دهید، مجموعه ۴ عضوی نیز خواهید داشت. 
</h3> 


In [130]:
frequent_threshold = 2

<h3 dir="rtl">
 یک تابع پیاده‌سازی می‌کنیم که  سبد‌ها و درصد ترشولد را ورودی بگیرد و خروجی لیستی از آیتم‌های پرتکرار بدهد. 
 در حین اجرا لاگ‌هایی برای فهم بهتر مسئله چاپ می‌شود که زمان‌ها نشان‌دهنده زمانی که از آخرین باری که چاپ شده تا الان را نشان می‌دهد و بعضی از لاگ‌ها تعداد کاندید‌ها و بعضی‌ها تعداد پرتکرار‌های یک طول خاص را نشان می‌دهند.
</h3> 


<h3 dir="rtl">
 <ul dir="rtl">
  <li dir="rtl">
   ابتدا تعداد سبد‌ها را می‌شماریم و با اعمال ساده ریاضی ترشولد را بدست می‌آوریم. 
  </li> 
   <li dir="rtl">
    ابتدا نیاز داریم آیتم‌های پرتکرار با طول یک را بدست آوریم. از سبد‌ها تمام آیتم‌های متمایز را می‌خوانیم و سپس تعداد هرکدام را می‌شماریم. اگر از مقدار ترشولدمان بیشتر بود آن‌ها را انتخاب می‌کنیم. دلیل اینکه به صورت 
    <span dir="ltr">
      tuple
    </span> 
     نگه‌داری می‌کنیم این است که اعمال اجتماع روی آن‌ها می‌توان استفاده کرد. همچنین نیاز به حالت‌بندی برای پرتکرار‌های طول ۲ با بقیه نیست.
     <span dir="ltr">
       rdd
     </span> 
     آن به عنوان پرتکرار ایتریشن خود ذخیره می‌کنیم و لیست آن را به عنوان نتیجه نهایی.
   </li> 
   <li dir="rtl">
    حال ایتریشن را ۲ می‌کنیم. ایتریشن نشان می‌دهد که در حلقه دنبال پرتکرار‌ها با چه طولی هستیم. آنقدر این چرخه را ادامه می‌دهیم تا هیچ پرتکراری درآن نباشد.
    <ol dir="rtl">
    <br/>
     <li dir="rtl">
      مجموعه‌های پرتکرار ایتریشن قبل را در نظر بگیرید. آن‌ها را باهم کراس می‌کنیم. به عبارتی هرکدام را با دیگری جوین می‌کنیم. سپس با تابع پایتون 
      <span dir="ltr">
        set
      </span> 
       مجموع آن را بدست می‌آوریم تا تکراری‌ها حذف شوند و دوباره آن را
       <span dir="ltr">
         tuple
       </span> 
       می‌کنیم. سپس آن‌هایی که طولشان برابر طول ایتریشن‌مان هستند را انتخاب می‌کنیم. چون در این حلقه قرار است مجموعه‌های پرتکرار که طولشان برابر ایتریشن است را انتخاب کنیم. دلیل اینکه کار درستی انجام می‌دهیم این است که اگر یک مجموعه پرتکرار باشد باید زیرمجموعه‌های یک طول کمتر آن‌ هم پرتکرار باشند. حال که ما تنها دو زیرمجموعه آن را بررسی کردیم نیز جزو کاندید‌های ما حساب می‌شود. البته که می‌توانیم برای کم‌تر کردن کاندید‌ها چک کنیم که همه زیرمجموعه‌های با یک طول کمتر پرتکرار باشند اما ما به همین‌قدر بسنده کرده‌ایم.
     </li>   
     <br/>
     <li dir="rtl">
      سپس روی از هر سبد اگر کاندیدی در آن ظاهر شده‌است،
      <span dir="ltr">
       (tuple_candidate, 1) 
      </span> 
       خروجی می‌دهیم. سپس این‌ها را باهم جمع می‌کنیم و از ترشولد استفاده می‌کنیم تا پرتکرار‌ها را بدست بیاوریم.
     </li> 
     <br/>
     <li dir="rtl">
       پرتکرار‌های بدست‌آمده را به نتایجمان اضافه می‌کنیم. ایتریشن را یک واحد اضافه می‌کنیم.
     </li> 
    </ol> 
     
   </li> 
   
 </ul> 
  
</h3> 


In [131]:
from pyspark.rdd import RDD
from datetime import datetime


def a_priori_algorithm(baskets: RDD, frequent_threshold: int = 5):
    def get_candidate(basket):
        basket_set = set(basket)
        for iterset in candidate_frequent_iterset:
            if set(iterset) <= basket_set:
                yield (iterset, 1)

    started_time = datetime.now()

    total_baskets = baskets.count()
    threshold = frequent_threshold * total_baskets / 100

    frequent_itemset_length_one = baskets.flatMap(lambda x: list(set(x))).map(lambda x: (tuple([x]), 1)).reduceByKey(
        lambda x, y: x+y).filter(lambda x: x[1] > threshold).map(lambda x: x[0]).filter(lambda x: len(x) == 1)
    frequent_itemset_length_one.cache()

    result_frequent_itemset = frequent_itemset_length_one.collect()
    frequent_iterset_rdd = frequent_itemset_length_one

    iter_length = 2
    print(f"{len(result_frequent_itemset)} frequent length one")
    while not frequent_iterset_rdd.isEmpty():

        elapsed_time = datetime.now() - started_time
        started_time = datetime.now()
        print(f"{iter_length} iteration:", elapsed_time)
        # candidate_frequent_iterset_rdd = frequent_iterset_rdd.cartesian(frequent_itemset_length_one).map(
        #     lambda x: tuple(set(x[0] + x[1]))).filter(lambda x: len(x) == iter_length).distinct()

        candidate_frequent_iterset_rdd = frequent_iterset_rdd.cartesian(frequent_iterset_rdd).map(
            lambda x: tuple(sorted(list(set(x[0] + x[1]))))).filter(lambda x: len(x) == iter_length).distinct()

        candidate_frequent_iterset = candidate_frequent_iterset_rdd.collect()

        elapsed_time = datetime.now() - started_time
        started_time = datetime.now()
        print(f"{len(candidate_frequent_iterset)} candidate", elapsed_time)

        frequent_iterset_rdd = baskets.flatMap(get_candidate).reduceByKey(
            lambda x, y: x+y).filter(lambda x: x[1] > threshold).map(lambda x: x[0])

        frequent_iterset = frequent_iterset_rdd.collect()

        elapsed_time = datetime.now() - started_time
        started_time = datetime.now()
        print(f"{len(frequent_iterset)} new frequent", elapsed_time)

        result_frequent_itemset.extend(frequent_iterset)
        iter_length += 1

    frequent_itemset_length_one.unpersist()

    return result_frequent_itemset


In [132]:
result_a_priori = a_priori_algorithm(car_routes_list_per_day_rdd, frequent_threshold)

27 frequent length one
2 iteration: 0:01:10.481343
351 candidate 0:00:00.846577
1 new frequent 0:00:26.912161
3 iteration: 0:00:00.039071
0 candidate 0:00:00.864092
0 new frequent 0:00:00.445037


In [133]:
result_a_priori

[(900107,),
 (900139,),
 (900155,),
 (100700824,),
 (900225,),
 (100700841,),
 (900222,),
 (900246,),
 (900142,),
 (900191,),
 (900207,),
 (22010119,),
 (100700804,),
 (900164,),
 (900268,),
 (900212,),
 (900244,),
 (900236,),
 (100700868,),
 (100700845,),
 (900269,),
 (900101,),
 (100700853,),
 (631357,),
 (100700866,),
 (900234,),
 (900202,),
 (900212, 900244)]

<h1 dir="rtl">
 پ 
</h1> 
<h2 dir="rtl">
 پیاده‌سازی 
 <span dir="ltr">
  SON 
 </span> 
  
</h2> 


<h3 dir="rtl">
برای اینکه سبد‌ها را به ۳ گروه تقسیم‌ کنیم، به صورت رندم برای هر سبد یک گروه انتخاب می‌کنیم. 
</h3> 


In [134]:
import random, math

grouped_son = 3
def random_mapper(x):
    group = math.floor(random.random() * grouped_son) + 1
    return (x, group)

<h3 dir="rtl">
 در مراحل جلوتر چون نیاز‌مند چندبار اجرا روی
 <span dir="ltr">
  rdd 
 </span> 
 هستیم، آن را کش می‌کنیم. 
</h3> 


In [135]:
grouped_car_routes_list_per_day_rdd = car_routes_list_per_day_rdd.map(
    random_mapper).cache()

<h3 dir="rtl">
 تمام‌ آن‌هایی که در گروه خاص هستند را می‌گیریم. سپس  سبد‌های آن را بدست می‌آوریم. پس از آن الگوریتم
 <span dir="ltr">
  A-priori 
 </span> 
 که در بخش قبل پیاده‌سازی کردیم را با ترشولدی کمی کمتر اجرا می‌کنیم. نتیجه را در لیستی ذخیره می‌کنیم. پس از اینکه برای همه گرو‌ه‌ها انجام دادیم، لیست را به مجموعه و دوباره به لیست تبدیل می‌کنیم تا تکراری‌ها را حذف شوند.
</h3> 


In [136]:
result_son = []
son_threshold = frequent_threshold*100/125
for iteration in range(grouped_son):
    print(f"Son iteration {iteration}")
    son_basket_rdd = grouped_car_routes_list_per_day_rdd.filter(
        lambda x: x[1] == iteration + 1).map(lambda x: x[0])
    son_basket_rdd.cache()
    result_son.extend(a_priori_algorithm(son_basket_rdd, son_threshold))

    print('')

result_son = list(set(result_son))


Son iteration 0
39 frequent length one
2 iteration: 0:00:01.547617
741 candidate 0:00:00.763600
1 new frequent 0:00:18.932574
3 iteration: 0:00:00.041469
0 candidate 0:00:00.966228
0 new frequent 0:00:00.225620

Son iteration 1
40 frequent length one
2 iteration: 0:00:01.321733
780 candidate 0:00:00.772662
2 new frequent 0:00:19.973493
3 iteration: 0:00:00.018946
1 candidate 0:00:00.888922
0 new frequent 0:00:00.232869

Son iteration 2
39 frequent length one
2 iteration: 0:00:01.184068
741 candidate 0:00:00.743994
2 new frequent 0:00:19.401634
3 iteration: 0:00:00.068504
1 candidate 0:00:00.940920
0 new frequent 0:00:00.258434



<h2 dir="rtl">
 چک نهایی برای از بین بردن مجموعه‌های پرتکرار اشتباه 
</h2> 


<h3 dir="rtl">
 برای اینکار نیاز داریم روی تمام سبد‌ها بچرخیم و اگر یکی از مجموعه‌های پرتکرار را داشت،
 <span dir="ltr">
  (frequent_itemset, 1) 
 </span> 
  را به عنوان یکی از خروجی‌های 
  <span dir="ltr">
   flatMap 
  </span> 
  می‌دهیم.
</h3> 


In [137]:
def get_all_cadidate_son(basket):
        basket_set = set(basket)
        for iterset in result_son:
            if set(iterset) <= basket_set:
                yield (iterset, 1)


<h3 dir="rtl">
 ترشولد را باتوجه به تعداد داده‌ها می‌سازیم تا چک کنیم. 
</h3> 


In [138]:
all_baskets = car_routes_list_per_day_rdd.count()
son_final_check_threshold = frequent_threshold * all_baskets / 100

<h3 dir="rtl">
 در انتها جمع می‌کنیم تا ببینیم کدام‌ها واقعا مجموعه پرتکرار هستند. 
</h3> 


In [139]:
final_frequent_itemset_son = car_routes_list_per_day_rdd.flatMap(get_all_cadidate_son).reduceByKey(
    lambda x, y: x+y).filter(lambda x: x[1] > son_final_check_threshold).map(lambda x: x[0])


In [140]:
final_frequent_itemset_son.count()

28

In [141]:
final_result_son = final_frequent_itemset_son.collect()

<h3 dir="rtl">
 همانطور که انتظار داشتیم نتیجه هردو یکسان شده‌است. 
</h3> 


In [142]:
final_result_son

[(900107,),
 (900139,),
 (900155,),
 (100700824,),
 (900225,),
 (900212, 900244),
 (100700841,),
 (900222,),
 (900246,),
 (900142,),
 (900191,),
 (900207,),
 (22010119,),
 (100700804,),
 (900164,),
 (900268,),
 (900212,),
 (900244,),
 (900236,),
 (100700868,),
 (100700845,),
 (900269,),
 (900101,),
 (100700853,),
 (631357,),
 (100700866,),
 (900234,),
 (900202,)]