In [1]:

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


# Spark configuration: local master using all cores ("local[*]"), a named
# application, and a 15g driver memory setting.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("CENG790-Project")
    .set("spark.driver.memory", "15g")
)

# Reuse an existing session if there is one, otherwise create it from `conf`.
# NOTE(review): presumably the driver memory setting only applies when this
# call actually starts a new session — confirm after a kernel restart.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Switch on the Arrow setting for PySpark on the live session.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

22/01/15 23:44:03 WARN Utils: Your hostname, bhdemirbilek resolves to a loopback address: 127.0.1.1; using 10.1.46.97 instead (on interface eno1)
22/01/15 23:44:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/01/15 23:44:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:

# # Save the OSCAR dataset (28 GB) in JSON format (execute only once).
# from datasets import load_from_disk
# dataset = load_from_disk("lang_detected")["train"]
# # Set num_proc according to your CPU count; num_proc=20 means 20 worker processes run in parallel.
# dataset.to_json("dataset_json", num_proc=20)

In [3]:
# Load the full dataset from "dataset.json" into a Spark DataFrame.
# NOTE(review): the commented-out export cell writes to "dataset_json";
# confirm that "dataset.json" is the intended input path.
df_json = spark.read.format("json").load("dataset.json")

                                                                                

In [5]:
# Take a 10,000-row subset of the full dataset for quick experiments.
# NOTE(review): limit() is applied without an explicit ordering, so which rows
# are kept is presumably not guaranteed — confirm if reproducibility matters.
small_df = df_json.limit(10000)

# Preview the first 10 rows.
small_df.show(n=10)

# Persist the subset as JSON, replacing any previous export.
small_df.write.json("dataset_small.json", mode="overwrite")



                                                                                

+---+----+--------------------+
| id|lang|                text|
+---+----+--------------------+
|  0|  tr|Son yıllarda görü...|
|  1|  tr|Şehrin karmaşası ...|
|  2|  tr|2010 Yılında Mard...|
|  3|  tr|29Ekim 2009 2010 ...|
|  4|  tr|Yüksek İslam Şura...|
|  5|  tr|Oncelıkle bu etkı...|
|  6|  tr|Mavi-Mi Sanat Mer...|
|  7|  tr|Türkiye Futbol Fe...|
|  8|  tr|anlami-nedir.com'...|
|  9|  tr|Kepez Belediye Ba...|
+---+----+--------------------+
only showing top 10 rows



                                                                                

In [6]:
# Reload the persisted subset and keep only the language tag and the raw text.
small_df2 = (
    spark.read
    .format("json")
    .load("dataset_small.json")
    .select("lang", "text")
)
# small_df2.show(10)

                                                                                

In [7]:

# Step 1: split the `text` column into a `words` array column.
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
wordsData = tokenizer.transform(small_df2)

# Step 2: hash the words into term-frequency vectors (`rawFeatures`).
# CountVectorizer is an alternative way to obtain term-frequency vectors.
hashingTF = HashingTF().setInputCol("words").setOutputCol("rawFeatures")
featurizedData = hashingTF.transform(wordsData)

# Step 3: fit IDF on the term frequencies and rescale them into `features`.
idf = IDF().setInputCol("rawFeatures").setOutputCol("features")
idfModel = idf.fit(featurizedData)
rescaledDataWithLang = idfModel.transform(featurizedData)


                                                                                

In [8]:
# Display the DataFrame's column/type summary (lang, text, words, rawFeatures, features).
rescaledDataWithLang

DataFrame[lang: string, text: string, words: array<string>, rawFeatures: vector, features: vector]

In [9]:
from pyspark.ml.feature import StringIndexer

# Encode the string `lang` column as a numeric `label` column.
indexer = StringIndexer().setInputCol("lang").setOutputCol("label")

# Fit the indexer on the featurized data, then apply it to that same data.
lang_indexer_model = indexer.fit(rescaledDataWithLang)
rescaledData = lang_indexer_model.transform(rescaledDataWithLang)


In [10]:
# Print the number of rows, then display the DataFrame's column/type summary.
print(rescaledData.count())
rescaledData


10000


DataFrame[lang: string, text: string, words: array<string>, rawFeatures: vector, features: vector, label: double]

In [11]:
# Persist only the model inputs (`features`, `label`) as JSON, replacing any
# previous export at this path.
(
    rescaledData
    .select("features", "label")
    .write
    .json("dataset_small_rescaled.json", mode="overwrite")
)

22/01/15 23:46:08 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

In [12]:
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType

# Explicit schema for the exported JSON: a vector `features` column and a
# double `label` column, both declared non-nullable.
schema = (
    StructType()
    .add("features", VectorUDT(), False)
    .add("label", DoubleType(), False)
)

# Re-read the exported features with that schema; this rebinds `rescaledData`
# to the two-column (features, label) DataFrame.
rescaledData = spark.read.json("dataset_small_rescaled.json", schema=schema)
rescaledData.show(n=10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(262144,[2054,477...|  0.0|
|(262144,[5612,155...|  0.0|
|(262144,[550,1693...|  0.0|
|(262144,[448,1512...|  0.0|
|(262144,[2054,276...|  0.0|
|(262144,[1004,107...|  0.0|
|(262144,[5612,177...|  0.0|
|(262144,[6,3720,3...|  0.0|
|(262144,[3023,720...|  0.0|
|(262144,[1219,214...|  0.0|
+--------------------+-----+
only showing top 10 rows



In [13]:
# Show a single row with `label` listed first, then display the DataFrame's
# column/type summary.
rescaledData.select("label", "features").show(1)
rescaledData

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262144,[2054,477...|
+-----+--------------------+
only showing top 1 row



DataFrame[features: vector, label: double]

In [14]:
# Split into 80% training / 20% test data. A fixed seed is passed so the split
# is the same on every run; without it each execution of this cell produces a
# different partition and downstream metrics are not comparable between runs.
(trainingData, testData) = rescaledData.select("label", "features").randomSplit([0.8, 0.2], seed=42)

In [15]:
# Display the training split's column/type summary.
trainingData

DataFrame[label: double, features: vector]

In [16]:
# Load the multiclass sample data set stored in libsvm format.
# NOTE(review): the model below is fit on `training` (this sample file), not on
# `trainingData` from the 80/20 split — confirm that this is intended.
training = (
    spark.read
    .format("libsvm")
    .load("sample_multiclass_classification_data.txt")
)
training

22/01/15 23:46:15 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


DataFrame[label: double, features: vector]

In [19]:

from pyspark.ml.classification import LogisticRegression

# Logistic regression estimator: at most 100 iterations, regularization
# parameter 0.3, elastic-net mixing parameter 0.8.
lr = (
    LogisticRegression()
    .setMaxIter(100)
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
)

# Fit the model on the `training` DataFrame.
lrModel = lr.fit(training)



In [20]:
# Report the fitted multinomial logistic regression: model parameters first,
# then metrics taken from the model's training summary.
print("Coefficients: \n", lrModel.coefficientMatrix, sep="")
print("Intercept: ", lrModel.interceptVector, sep="")

trainingSummary = lrModel.summary

# Objective value recorded for each training iteration.
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective_value in objectiveHistory:
    print(objective_value)


def _print_by_label(heading, values):
    """Print `heading`, then one "label <index>: <value>" line per entry of `values`."""
    print(heading)
    for label_index, value in enumerate(values):
        print("label %d: %s" % (label_index, value))


# For multiclass models the summary exposes each metric on a per-label basis.
_print_by_label("False positive rate by label:", trainingSummary.falsePositiveRateByLabel)
_print_by_label("True positive rate by label:", trainingSummary.truePositiveRateByLabel)
_print_by_label("Precision by label:", trainingSummary.precisionByLabel)
_print_by_label("Recall by label:", trainingSummary.recallByLabel)
_print_by_label("F-measure by label:", trainingSummary.fMeasureByLabel())

# Overall accuracy and the weighted aggregate of each metric.
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall

summary_values = (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall)
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s" % summary_values)

Coefficients: 
3 X 4 CSRMatrix
(0,3) 0.3053
(1,2) -0.7885
(1,3) -0.3633
Intercept: [0.050824384014652994,-0.12360035368876093,0.07277596967410795]
objectiveHistory:
1.098612288668108
1.087602085441699
1.0341156572156232
1.0289859520256008
1.0300389657358993
1.0239965158223991
1.0236097451839508
1.023108212197001
1.023022220302788
1.0230018151780265
1.0229963739557606
1.0229911245659569
1.0229874340180964
1.0229860342712205
1.0229832902098992
1.0229817403940862
1.0229813578951676
1.0229811458425946
1.0229809219195842
1.0229808777096137
1.0229808456821092
1.022980838296153
1.0229808366282138
1.022980836270468
1.0229808361102433
1.0229808360601507
1.0229808360560266
1.0229808360545576
1.022980836051717
False positive rate by label:
label 0: 0.22
label 1: 0.05
label 2: 0.0
True positive rate by label:
label 0: 1.0
label 1: 1.0
label 2: 0.46
Precision by label:
label 0: 0.6944444444444444
label 1: 0.9090909090909091
label 2: 1.0
Recall by label:
label 0: 1.0
label 1: 1.0
label 2: 0.46
F-mea