In [1]:
# Core Python Libs
import os
import sys
import pathlib

# Spark init
import findspark
findspark.init()

In [2]:
# SPARK_CONTEXT
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pandas as pd

In [3]:
# Get Environment Path Variables
HOME_PATH = os.environ["HOME"]
CURRENT_PATH = os.path.abspath(os.getcwd())

In [4]:
# Get Data Path
file_name = "omer_seyfettin_forsa_hikaye.txt"
DATA_PATH = CURRENT_PATH + '/' + file_name

In [5]:
# Check Path of Data (if file is exits, it will return true)
if os.path.exists(DATA_PATH):
    print(f"File Found in path: {DATA_PATH}")
else:
    print(f"File Not Found! check your file path: {DATA_PATH}")

File Found in path: /home/batu/spark_mini_projects/mini_projects/1_word_count_project/omer_seyfettin_forsa_hikaye.txt


In [6]:
# Build an spark session
spark = SparkSession.builder \
.master("local[4]") \
.appName("Wordcount_project") \
.getOrCreate()

23/04/06 23:31:45 WARN Utils: Your hostname, batu-hep resolves to a loopback address: 127.0.1.1; using 192.168.2.106 instead (on interface wlp4s0)
23/04/06 23:31:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/06 23:31:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Create spark context object for create RDDs (for connection to computing cluster)
sc = spark.sparkContext

In [8]:
# Create RDD based on data
story_rdd = sc.textFile(DATA_PATH)

In [9]:
# Check number of lines in data
story_rdd.count()

                                                                                

103

In [10]:
# Head 5 in data
story_rdd.take(5)

['Ömer Seyfettin - Forsa',
 '',
 'Akdeniz’in, kahramanlık yuvası sonsuz ufuklarına bakan küçük tepe, minimini bir çiçek ormanı gibiydi. İnce uzun dallı badem ağaçlarının alaca gölgeleri sahile inen keçiyoluna düşüyor, ilkbaharın tatlı rüzgârıyla sarhoş olan martılar, çılgın bağrışlarıyla havayı çınlatıyordu. Badem bahçesinin yanı geniş bir bağdı. Beyaz taşlardan yapılmış kısa bir duvarın ötesindeki harabe vadiye kadar iniyordu. Bağın ortasındaki yıkık kulübenin kapısız girişinden bir ihtiyar çıktı. Saçı sakalı bembeyazdı. Kamburunu düzeltmek istiyormuş gibi gerindi. Elleri, ayakları titriyordu. Gök kadar boş, gök kadar sakin duran denize baktı, baktı.',
 '',
 '– Hayırdır inşallah! dedi.']

In [11]:
# Split all words with spaces per line
words = story_rdd.flatMap(lambda line: line.split(" "))

In [12]:
# Display first 20 words per line for checking
words.take(20)

['Ömer',
 'Seyfettin',
 '-',
 'Forsa',
 '',
 'Akdeniz’in,',
 'kahramanlık',
 'yuvası',
 'sonsuz',
 'ufuklarına',
 'bakan',
 'küçük',
 'tepe,',
 'minimini',
 'bir',
 'çiçek',
 'ormanı',
 'gibiydi.',
 'İnce',
 'uzun']

In [13]:
# Create tuples template as (word, 1)
words_numbers = words.map(lambda word: (word, 1))

In [14]:
# check template
words_numbers.take(5)

[('Ömer', 1), ('Seyfettin', 1), ('-', 1), ('Forsa', 1), ('', 1)]

In [15]:
# Reduce per key every tuple and found number of repeated words
words_numbers_RDK = words_numbers.reduceByKey(lambda x,y : x + y)

In [16]:
# check
words_numbers_RDK.take(5)

                                                                                

[('Ömer', 1), ('Seyfettin', 1), ('Forsa', 1), ('', 59), ('Akdeniz’in,', 1)]

In [17]:
# Reverse template as (1, word)
words_numbers_RDK_reversed = words_numbers_RDK.map(lambda x: (x[1], x[0]))

In [18]:
words_numbers_RDK_reversed.take(5)

[(1, 'Ömer'), (1, 'Seyfettin'), (1, 'Forsa'), (59, ''), (1, 'Akdeniz’in,')]

In [19]:
# Results as most duplicated words in text data (for highest 20)
words_numbers_RDK_reversed.sortByKey(False).take(20)

[(59, ''),
 (33, 'bir'),
 (31, '–'),
 (8, 'yıl'),
 (6, 'diye'),
 (5, 'Türk'),
 (5, 'dedi.'),
 (5, 'onun'),
 (5, 'doğru'),
 (5, 'Kırk'),
 (4, 'Yirmi'),
 (4, 'tutsak'),
 (4, 'Ben'),
 (4, 'gibi'),
 (4, 'Ama'),
 (4, 'büyük'),
 (3, 'yanı'),
 (3, 'şey'),
 (3, 'onu'),
 (3, 'geminin')]

In [20]:
# TODO özel karakterleri sil, veri görselleştirmesi yap!

In [21]:
print(type(words_numbers_RDK_reversed))

<class 'pyspark.rdd.PipelinedRDD'>


In [22]:
my_df = words_numbers_RDK_reversed.toDF()

In [23]:
my_df.head()

Row(_1=1, _2='Ömer')