In [1]:
# Initializing spark
! pip install pyspark
import pyspark

from pyspark.sql import SparkSession 

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .getOrCreate()

sc = spark.sparkContext

# Mounting dataset from Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=c62a6145633f6fbb0449e606869b3a8d67054c96c94daadc572785d63131f84a
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Mounted at /content/gdrive


### Importing dataset without any options 

In [2]:
# Read csv file with spark.read_csv()
digi = spark.read.csv('/content/gdrive/MyDrive/digikala_comments.csv', header = True)
digi.show(10, truncate = False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------+-----------------------------+-------------------------------------------+-------------+--------+-------------------+-----------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-------------+
|product_id                                                                                                                                                                                                                        

### **Problem:** First column must be just integer but its mixed with strings (comments) !

### **Solution:** Using the multiline option in spark.read.csv can parse as one record, which may span multiple lines, per file. Since we have very long comment lines we must use it.

In [3]:
digi = spark.read.csv('/content/gdrive/MyDrive/digikala_comments.csv', header = True, multiLine = True)
digi.show(10, truncate = False)

+----------+--------------------------------------------------------------+--------+-------+-----+--------+-------------------+---------------+-----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### **Here we write a UDF to clean dataset comments**

In [4]:
import re
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def clean_comment(comment):
  stop_words = ['.',',','_x000D_\n','x000D','،','(',')','...','"',r'\u200c',r'\u200',r'\u200b','-','\\','_','r','[',']','!','\n','٪','؟','xD']
  for sw in stop_words:
    try:
      comment = comment.replace(sw,' ')
      comment = re.sub(r"\s+.\u0648.\s+", ' ', comment)
      # Numbers removing
      comment = re.sub(r'\d+', '', comment)
      # To remove leading and ending spaces
      comment = comment.strip()
      # Replacing multiple spaces with one space
      comment = ' '.join(comment.split())
    except:
      continue
  return comment

clean_comments = udf(lambda z: clean_comment(z), StringType())
digi_cleaned = digi.withColumn('comment', clean_comments('comment'))
digi_cleaned.select('comment').show(10, truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Visualizing the cleaned dataset comments

In [5]:
import pyspark.sql.functions as F

digi_count_word = digi_cleaned.withColumn('wordCount', F.size(F.split(F.col('comment'), ' ')))
digi_count_word.select(['wordCount','comment']).show(10)

+---------+--------------------+
|wordCount|             comment|
+---------+--------------------+
|        7|واقعا عالیه من که...|
|      182|سلام قبل اینکه نظ...|
|       60|گیره های فلزی خیل...|
|      245|همه چیز در رابطه ...|
|       63|اگر ظرفیتش براتون...|
|      237|سلام دوستان منم م...|
|       38|من چند سالی هست ک...|
|       13|بوی تند ولی خوشبو...|
|       43|متاسفانه عمر مفید...|
|        2|        خوب بودممنون|
+---------+--------------------+
only showing top 10 rows



### Calculating the likability of a given comment which is considered the difference between number of likes and dislikes for that comment.

In [6]:
import pyspark.sql.functions as F

comment_likes_dislikes = digi_cleaned.select('comment','likes','dislikes')

likeablity = comment_likes_dislikes.dropna().withColumn('likability',comment_likes_dislikes['likes'] - comment_likes_dislikes['dislikes'])

popular_comments = likeablity.sort(F.col('likability').desc())

popular_comments.select(['likability','comment']).show(10)

+----------+--------------------+
|likability|             comment|
+----------+--------------------+
|     184.0|خلاصه بگم ماشین ب...|
|     124.0|با سلام باید بگم ...|
|     115.0|    هیچ تاثیری نداشت|
|     114.0|من این مداد رنگی ...|
|     113.0|این دوربین قطعا ا...|
|     109.0|خیلی وسیله ی بی ک...|
|     108.0|اینا به این درد م...|
|     102.0|سلام دوستان شارژر...|
|      91.0|ببینید اینکه یکی ...|
|      80.0|    بهقیمتش نمی عرضه|
+----------+--------------------+
only showing top 10 rows



### The persian stop words has been extracted from [here](https://github.com/ziaa/Persian-stopwords-collection/blob/master/Stopwords/Kharazi/Pesian_Stop_Words_List.txt) !

In [7]:
my_file = open('/content/gdrive/MyDrive/persian_stopwords.txt', 'r')
persian_stop_words = my_file.read().split('\n')
my_file.close()

digi_cleaned_rdd = digi_cleaned.dropna().rdd
whole_words = digi_cleaned_rdd.flatMap(lambda x: x[9].split())

wordCounts = whole_words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey(False)
results = wordCountsSorted.collect()

count_of_words = 0
i = 0
while count_of_words <100:
    count = str(results[i][0])
    word = results[i][1]
    if (word) and (word not in persian_stop_words):
        print(word + "\t\t" + count)
        count_of_words += 1
    i += 1

کیفیت		783
گوشی		735
خریدم		658
عالیه		598
عالی		594
خوبه		523
شارژ		482
قیمت		463
خرید		457
کار		452
خوبی		446
دیجی		428
پیشنهاد		421
راضی		387
نسبت		374
کالا		368
نظر		364
محصول		313
ماه		280
قیمتش		264
دستگاه		261
دوستان		249
ساعت		245
مدل		245
بازی		226
مناسب		219
رنگ		219
راضیم		213
سال		210
زیاد		210
حدود		202
ارزش		194
جنس		189
دست		185
مشکل		184
دوربین		183
صدا		173
صفحه		172
بخرید		172
صدای		170
خوبیه		168
توجه		163
بهترین		161
دستم		161
آب		158
نصب		156
باهاش		153
سرعت		153
مشکلی		147
شگفت		143
خریدش		133
باتری		132
انگیز		130
حالت		127
ماشین		123
بسته		122
سری		121
نرم		121
هدفون		120
پخش		119
طراحی		119
پایین		118
خراب		116
برند		115
کابل		114
داخل		114
شک		113
ساخت		112
موقع		111
بدنه		110
زیبا		109
وصل		106
سریع		105
روشن		104
شارژر		102
باطری		102
تهیه		101
باعث		101
گارانتی		100
اندازه		99
هفته		98
فکر		96
سیم		96
بازار		96
قابلیت		95
کیف		95
سر		94
پاور		93
عکس		93
قدرت		92
قرار		92
دسته		91
صداش		91
زیادی		89
برنامه		89
ساله		89
دکمه		89
بعضی		88
دیدم		88
درست		88


### Implemented Apriori algorithm for mining association rules in comments.

In [14]:
from functools import partial
from itertools import combinations

# first we collect all of cleaned comments in one file which is named "all_digi_comments.txt" (each comment in one line)
digi_comments = digi_cleaned.select('comment').rdd

frequent_items = digi_comments.flatMap(lambda line: line)

f = open('all_digi_comments.txt', "w")
for line in frequent_items.collect():
  f.write(str(line) + '\n')
f.close()

# reading all comments file
file = sc.textFile('all_digi_comments.txt')

# here we use the exact same method in question 3 for extracting association rules using apriori method 
frequent_items = file.flatMap(lambda line: line.split()).map(lambda x:(x,1)).reduceByKey(lambda e1,e2:e1+e2).filter(lambda x: x[1]>=100)

frequent_itemset = frequent_items.collectAsMap()

frequent_pairs = file.map(lambda line: line.split()).flatMap(partial(combinations, r=2)).map(lambda pair: sorted(pair)) \
    .map(lambda pair: (tuple(pair), 1)).filter(
    lambda x: x[0][0] in frequent_itemset and x[0][1] in frequent_itemset).reduceByKey(
    lambda p1, p2: p1 + p2).filter(lambda x: x[1] >= 100)

freq_pairs_count = frequent_pairs.collectAsMap()

frequent_pairs = frequent_pairs.flatMap(lambda x:[((x[0][0],x[0][1]),x[1]),((x[0][1],x[0][0]),x[1])])

frequent_pair_conf = frequent_pairs.map(lambda x:(x[0],float(x[1]/frequent_itemset[x[0][0]]))).sortBy(lambda x:-x[1])

In [17]:
selected_words = ['کیفیت', 'گوشی', 'رنگ']

u = frequent_pair_conf.filter(lambda x: ((((x[0][0] in selected_words) and (x[0][1] not in persian_stop_words))) or (((x[0][1] in selected_words) and (x[0][0] not in persian_stop_words)))))
u.take(15)

[(('سامسونگ', 'گوشی'), 2.1452991452991452),
 (('اس', 'گوشی'), 1.5798319327731092),
 (('برنامه', 'گوشی'), 1.578125),
 (('دوربین', 'گوشی'), 1.5590277777777777),
 (('هدفون', 'کیفیت'), 1.4792899408284024),
 (('صفحه', 'گوشی'), 1.3898916967509025),
 (('سری', 'گوشی'), 1.3471502590673574),
 (('گوشی', 'گوشی'), 1.2058319039451115),
 (('گوشی', 'گوشی'), 1.2058319039451115),
 (('تصویر', 'کیفیت'), 1.1923076923076923),
 (('جی', 'گوشی'), 1.190909090909091),
 (('شارژر', 'گوشی'), 1.1823529411764706),
 (('باطری', 'گوشی'), 1.16),
 (('اسپیکر', 'کیفیت'), 1.1559633027522935),
 (('پشت', 'گوشی'), 1.146551724137931)]