In [13]:
#Control = http://localhost:4040/

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import length
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer,VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import os


In [2]:
#Setting property

conf = SparkConf()
conf.setAppName('ml')
conf.set("spark.driver.memory",'8g')
conf.set("spark.executor.memory", '8g')
sc = SparkContext(
    conf=conf)

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("ml") \
    .config("spark.driver.memory",'8g') \
    .config("spark.executor.memory", '8g')\
    .getOrCreate()



In [3]:
#Data load

dunya_list = []
ekonomi_list = []
genel_list = []
guncel_list = []
magazin_list = []
spor_list = []

rootdir = "16752news"

#Files are read end labeled
for directories, subdirs, files in os.walk(rootdir):
    if (os.path.split(directories)[1] == 'dunya'):
        for filename in files:
            with open(os.path.join(directories, filename), encoding="UTF-8") as f:
                data = f.read()

                dunya_list.append((data, "dunya"))  # added to list

    if (os.path.split(directories)[1] == 'ekonomi'):
        for filename in files:
            with open(os.path.join(directories, filename), encoding="UTF-8") as f:
                data = f.read()

                ekonomi_list.append((data, "ekonomi"))

    if (os.path.split(directories)[1] == 'genel'):
        for filename in files:
            with open(os.path.join(directories, filename), encoding="UTF-8") as f:
                data = f.read()

                genel_list.append((data, "genel"))

    if (os.path.split(directories)[1] == 'guncel'):
        for filename in files:
            with open(os.path.join(directories, filename), encoding="UTF-8") as f:
                data = f.read()

                guncel_list.append((data, "guncel"))


    if (os.path.split(directories)[1] == 'magazin'):
        for filename in files:
            with open(os.path.join(directories, filename), encoding="UTF-8") as f:
                data = f.read()

                magazin_list.append((data, "magazin"))  # added to list


    if (os.path.split(directories)[1] == 'spor'):
        for filename in files:
            with open(os.path.join(directories, filename), encoding="UTF-8") as f:
                data = f.read()

                spor_list.append((data, "spor"))


print(len(dunya_list))
print(len(ekonomi_list))
print(len(genel_list))
print(len(guncel_list))
print(len(magazin_list))
print(len(spor_list))

combined_list = []

combined_list = dunya_list + ekonomi_list + genel_list + guncel_list  + magazin_list  + spor_list
print("sum news :  ", len(combined_list))


combined_list = sc.parallelize(combined_list, numSlices=100)    #parallelize operation

#random.shuffle(combined_list) #mix the data



2792
2792
2792
2792
2792
2792
sum news :   16752


In [4]:
schemaString = "text class"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

data = spark.createDataFrame(combined_list, schema)

print(data.show())
print(data.printSchema())

+--------------------+-----+
|                text|class|
+--------------------+-----+
|Cameron'dan 'krit...|dunya|
|Mısır'da referand...|dunya|
|İran'ın Patriot t...|dunya|
|Savcı, o çavuş iç...|dunya|
|Fransız haber aja...|dunya|
|Talabani, Ali Xey...|dunya|
|İşte İran'ın son ...|dunya|
|El Pais: PKK'lıla...|dunya|
|Ünlü spekülatörde...|dunya|
|Çocuklarını kafes...|dunya|
|Ölü çocuklar ajan...|dunya|
|Ahmedinejad'a aya...|dunya|
|Azerbaycan sınırı...|dunya|
|Kerkükte patlama:...|dunya|
|İsrailde derin ç...|dunya|
|İsrail askerleri ...|dunya|
|Cezayir'de rehine...|dunya|
|Katalonlar, İspan...|dunya|
|Fransa’dan Mali'y...|dunya|
|'Türkiye'ye yardı...|dunya|
+--------------------+-----+
only showing top 20 rows

None
root
 |-- text: string (nullable = true)
 |-- class: string (nullable = true)

None


In [5]:
data = data.withColumn('length',length(data['text']))
print(data.show())

+--------------------+-----+------+
|                text|class|length|
+--------------------+-----+------+
|Cameron'dan 'krit...|dunya|  1505|
|Mısır'da referand...|dunya|   927|
|İran'ın Patriot t...|dunya|   899|
|Savcı, o çavuş iç...|dunya|   752|
|Fransız haber aja...|dunya|   769|
|Talabani, Ali Xey...|dunya|  2093|
|İşte İran'ın son ...|dunya|  1310|
|El Pais: PKK'lıla...|dunya|  1727|
|Ünlü spekülatörde...|dunya|   885|
|Çocuklarını kafes...|dunya|   972|
|Ölü çocuklar ajan...|dunya|  1579|
|Ahmedinejad'a aya...|dunya|   535|
|Azerbaycan sınırı...|dunya|   379|
|Kerkükte patlama:...|dunya|   639|
|İsrailde derin ç...|dunya|  2971|
|İsrail askerleri ...|dunya|  1013|
|Cezayir'de rehine...|dunya|  1818|
|Katalonlar, İspan...|dunya|   662|
|Fransa’dan Mali'y...|dunya|   891|
|'Türkiye'ye yardı...|dunya|  1349|
+--------------------+-----+------+
only showing top 20 rows

None


In [6]:
print(data.printSchema())

#Pretty Clear Difference
print(data.groupby('class').mean().show())


root
 |-- text: string (nullable = true)
 |-- class: string (nullable = true)
 |-- length: integer (nullable = true)

None
+-------+------------------+
|  class|       avg(length)|
+-------+------------------+
|   spor|1567.5157593123208|
|  genel| 2047.307664756447|
|ekonomi| 2322.356017191977|
|magazin| 1148.374641833811|
| guncel|1887.4222779369627|
|  dunya|1380.0744985673352|
+-------+------------------+

None


In [7]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(data)
wordsData.show()

+--------------------+-----+------+--------------------+
|                text|class|length|               words|
+--------------------+-----+------+--------------------+
|Cameron'dan 'krit...|dunya|  1505|[cameron'dan, 'kr...|
|Mısır'da referand...|dunya|   927|[mısır'da, refera...|
|İran'ın Patriot t...|dunya|   899|[i̇ran'ın, patrio...|
|Savcı, o çavuş iç...|dunya|   752|[savcı,, o, çavuş...|
|Fransız haber aja...|dunya|   769|[fransız, haber, ...|
|Talabani, Ali Xey...|dunya|  2093|[talabani,, ali, ...|
|İşte İran'ın son ...|dunya|  1310|[i̇şte, i̇ran'ın,...|
|El Pais: PKK'lıla...|dunya|  1727|[el, pais:, pkk'l...|
|Ünlü spekülatörde...|dunya|   885|[ünlü, spekülatör...|
|Çocuklarını kafes...|dunya|   972|[çocuklarını, kaf...|
|Ölü çocuklar ajan...|dunya|  1579|[ölü, çocuklar, a...|
|Ahmedinejad'a aya...|dunya|   535|[ahmedinejad'a, a...|
|Azerbaycan sınırı...|dunya|   379|[azerbaycan, sını...|
|Kerkükte patlama:...|dunya|   639|[kerkükte, patlam...|
|İsrailde derin ç...|dunya|  2

In [8]:
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
count_vec = CountVectorizer(inputCol='token_text',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
news_to_num = StringIndexer(inputCol='class',outputCol='label')


In [9]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')


In [10]:
#Use defaults
nb = NaiveBayes()

data_prep_pipe = Pipeline(stages=[news_to_num,tokenizer,count_vec,idf,clean_up])

print(data.show())

+--------------------+-----+------+
|                text|class|length|
+--------------------+-----+------+
|Cameron'dan 'krit...|dunya|  1505|
|Mısır'da referand...|dunya|   927|
|İran'ın Patriot t...|dunya|   899|
|Savcı, o çavuş iç...|dunya|   752|
|Fransız haber aja...|dunya|   769|
|Talabani, Ali Xey...|dunya|  2093|
|İşte İran'ın son ...|dunya|  1310|
|El Pais: PKK'lıla...|dunya|  1727|
|Ünlü spekülatörde...|dunya|   885|
|Çocuklarını kafes...|dunya|   972|
|Ölü çocuklar ajan...|dunya|  1579|
|Ahmedinejad'a aya...|dunya|   535|
|Azerbaycan sınırı...|dunya|   379|
|Kerkükte patlama:...|dunya|   639|
|İsrailde derin ç...|dunya|  2971|
|İsrail askerleri ...|dunya|  1013|
|Cezayir'de rehine...|dunya|  1818|
|Katalonlar, İspan...|dunya|   662|
|Fransa’dan Mali'y...|dunya|   891|
|'Türkiye'ye yardı...|dunya|  1349|
+--------------------+-----+------+
only showing top 20 rows

None


In [11]:
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)
clean_data.printSchema()
clean_data = clean_data.select(['label','features'])

#clean_data.show()
print(clean_data.printSchema())


root
 |-- text: string (nullable = true)
 |-- class: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- label: double (nullable = false)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- features: vector (nullable = true)

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)

None


In [14]:
(training,testing) = clean_data.randomSplit([0.8,0.2])
spam_predictor = nb.fit(training)
data.printSchema()
test_results = spam_predictor.transform(testing)
print(test_results.show())

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting news was: {}".format(acc))


root
 |-- text: string (nullable = true)
 |-- class: string (nullable = true)
 |-- length: integer (nullable = true)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  2.0|(262145,[0,1,2,3,...|[-35203.802824962...|[0.0,0.0,1.855056...|       5.0|
|  2.0|(262145,[0,1,2,3,...|[-30328.480389016...|[0.0,0.0,1.0,0.0,...|       2.0|
|  2.0|(262145,[0,1,2,3,...|[-21018.004549035...|[0.0,1.0,3.239359...|       1.0|
|  2.0|(262145,[0,1,2,3,...|[-7666.4990075850...|[3.58509348667199...|       4.0|
|  2.0|(262145,[0,1,2,3,...|[-12011.915689776...|[0.0,2.4305033082...|       2.0|
|  2.0|(262145,[0,1,2,3,...|[-16866.527661694...|[0.0,0.0,1.0,0.0,...|       2.0|
|  2.0|(262145,[0,1,2,3,...|[-11659.893671887...|[0.0,3.2671857830...|       2.0|
|  2.0|(262145,[0,1,2,3,...|[-13811.781068378...|[0.0,0.0,1.0,