In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('Hamshahri_1').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/15 13:47:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.functions import input_file_name, substring_index
text_df_with_filename = spark.read.options(inferSchema=True).text("./Hamshahri/Hamshahri").withColumn("filename", substring_index(input_file_name(), "/", -1))

In [3]:
text_df_with_filename.count()

                                                                                

600

In [4]:
text_df_with_filename.printSchema()
text_df_with_filename.show()

root
 |-- value: string (nullable = true)
 |-- filename: string (nullable = false)

+--------------------+-------------------+
|               value|           filename|
+--------------------+-------------------+
|{"title":"نام نوي...|HAM2-750405-016.txt|
|{"title":"فوتبال:...|HAM2-750406-017.txt|
|{"title":"فرهنگ س...|HAM2-750403-045.txt|
|{"title":"دريانور...|HAM2-750407-019.txt|
|{"title":"ركود فك...|HAM2-750404-045.txt|
|{"title":"حزب كهن...|HAM2-750407-036.txt|
|{"title":"به بهان...|HAM2-750409-013.txt|
|{"title":"با دانش...|HAM2-750409-004.txt|
|{"title":"فرهنگ س...|HAM2-750402-043.txt|
|{"title":"ركود فك...|HAM2-750405-051.txt|
|{"title":"گسترش ت...|HAM2-750409-046.txt|
|{"title":"روسيه و...|HAM2-750406-041.txt|
|{"title":"نقش باز...|HAM2-750403-003.txt|
|{"title":"گفتگو ب...|HAM2-750403-001.txt|
|{"title":"موفقيت ...|HAM2-750409-007.txt|
|{"title":"تهاجم ف...|HAM2-750406-073.txt|
|{"title":"بازي را...|HAM2-750402-004.txt|
|{"title":"كداميك ...|HAM2-750404-015.txt|
|{"title":"در

In [5]:
from pyspark.sql.functions import *

def tokenize_text(text):
    cleaned_text = regexp_replace(text, r'[^\w\s\u0600-\u06FF]', ' ')
    cleaned_text = regexp_replace(cleaned_text, r'\؟', ' ')
    return split(cleaned_text, r'\s+')

In [6]:
specific_item = text_df_with_filename.select(col("value")).filter(col("filename") == 'HAM2-750409-004.txt').first()
text_of_specific_item = specific_item['value']
data = [(text_of_specific_item,)]
df = spark.createDataFrame(data, ["value"])
tokenized_df = df.withColumn("tokenized_text", tokenize_text(col("value")))
tokenized_df.show(truncate=False)


                                                                                

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
spark.udf.register("tokenize_text", tokenize_text, ArrayType(StringType()))

tokenized_df = text_df_with_filename.withColumn("word", explode(tokenize_text("value")))
print(tokenized_df.show())


+--------------------+-------------------+-------+
|               value|           filename|   word|
+--------------------+-------------------+-------+
|{"title":"نام نوي...|HAM2-750405-016.txt|       |
|{"title":"نام نوي...|HAM2-750405-016.txt|  title|
|{"title":"نام نوي...|HAM2-750405-016.txt|    نام|
|{"title":"نام نوي...|HAM2-750405-016.txt|  نويسي|
|{"title":"نام نوي...|HAM2-750405-016.txt|     در|
|{"title":"نام نوي...|HAM2-750405-016.txt|   كدام|
|{"title":"نام نوي...|HAM2-750405-016.txt|  مدرسه|
|{"title":"نام نوي...|HAM2-750405-016.txt|   زياد|
|{"title":"نام نوي...|HAM2-750405-016.txt|    سخت|
|{"title":"نام نوي...|HAM2-750405-016.txt| نگيريم|
|{"title":"نام نوي...|HAM2-750405-016.txt|   text|
|{"title":"نام نوي...|HAM2-750405-016.txt|    صبح|
|{"title":"نام نوي...|HAM2-750405-016.txt|    روز|
|{"title":"نام نوي...|HAM2-750405-016.txt|پنجشنبه|
|{"title":"نام نوي...|HAM2-750405-016.txt|     30|
|{"title":"نام نوي...|HAM2-750405-016.txt|  خرداد|
|{"title":"نام نوي...|HAM2-7504

In [8]:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, StringType

result_df = tokenized_df.groupBy("word", "filename").agg(
    count("word").cast('int').alias("count")
)

result_df.show(truncate=False)



+---------+-------------------+-----+
|word     |filename           |count|
+---------+-------------------+-----+
|تكاپو    |HAM2-750405-016.txt|1    |
|عين      |HAM2-750405-016.txt|1    |
|تقي      |HAM2-750405-016.txt|3    |
|ارزشهاي  |HAM2-750406-017.txt|1    |
|مگر      |HAM2-750406-017.txt|1    |
|ميان     |HAM2-750403-045.txt|4    |
|تنها     |HAM2-750403-045.txt|2    |
|فقهي     |HAM2-750403-045.txt|2    |
|السلطانيه|HAM2-750403-045.txt|1    |
|هوشيار   |HAM2-750403-045.txt|1    |
|رابه     |HAM2-750407-019.txt|2    |
|اي       |HAM2-750407-019.txt|11   |
|جزيره    |HAM2-750407-019.txt|11   |
|ارشداداره|HAM2-750407-019.txt|1    |
|بازار    |HAM2-750407-019.txt|1    |
|مركزيت   |HAM2-750404-045.txt|2    |
|شامل     |HAM2-750404-045.txt|1    |
|سردار    |HAM2-750407-036.txt|1    |
|واقع،    |HAM2-750407-036.txt|1    |
|اينك     |HAM2-750409-013.txt|1    |
+---------+-------------------+-----+
only showing top 20 rows



                                                                                

In [9]:
result_df.printSchema()

root
 |-- word: string (nullable = false)
 |-- filename: string (nullable = false)
 |-- count: integer (nullable = false)



In [10]:
grouped_df = result_df.groupBy('word').agg(collect_list(struct('filename', 'count').alias('Name Repeat Count')))

grouped_df.show(truncate=False)

[Stage 14:>                                                         (0 + 2) / 2]

+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [11]:
tokenized_df = tokenized_df.withColumn("index", monotonically_increasing_id())
tokenized_df.show()

+--------------------+-------------------+-------+-----+
|               value|           filename|   word|index|
+--------------------+-------------------+-------+-----+
|{"title":"نام نوي...|HAM2-750405-016.txt|       |    0|
|{"title":"نام نوي...|HAM2-750405-016.txt|  title|    1|
|{"title":"نام نوي...|HAM2-750405-016.txt|    نام|    2|
|{"title":"نام نوي...|HAM2-750405-016.txt|  نويسي|    3|
|{"title":"نام نوي...|HAM2-750405-016.txt|     در|    4|
|{"title":"نام نوي...|HAM2-750405-016.txt|   كدام|    5|
|{"title":"نام نوي...|HAM2-750405-016.txt|  مدرسه|    6|
|{"title":"نام نوي...|HAM2-750405-016.txt|   زياد|    7|
|{"title":"نام نوي...|HAM2-750405-016.txt|    سخت|    8|
|{"title":"نام نوي...|HAM2-750405-016.txt| نگيريم|    9|
|{"title":"نام نوي...|HAM2-750405-016.txt|   text|   10|
|{"title":"نام نوي...|HAM2-750405-016.txt|    صبح|   11|
|{"title":"نام نوي...|HAM2-750405-016.txt|    روز|   12|
|{"title":"نام نوي...|HAM2-750405-016.txt|پنجشنبه|   13|
|{"title":"نام نوي...|HAM2-7504