In [1]:
# You should have Spark installed on your server or your computer.
# In this example, Spark is installed on a Google Cloud server.

import findspark # you need to install this one by pip install findspark 
import os  # local import: only needed to look up SPARK_HOME

# Locate the Spark installation. A non-empty SPARK_HOME environment variable
# takes precedence so the notebook can run on another machine without editing;
# otherwise fall back to the path this notebook was originally written against.
findspark.init(os.environ.get('SPARK_HOME') or '/home/sak/spark-2.4.3-bin-hadoop2.7') # set your path
import pyspark
from pyspark.sql import SparkSession
# Create (or reuse) the session for this notebook and keep a handle on its
# underlying SparkContext.
spark = (
    SparkSession.builder
    .appName('lr_example')
    .getOrCreate()
)
from pyspark import SparkContext 
sc = spark.sparkContext
import pandas as pd
import numpy as np # linear algebra
import seaborn as sns
import matplotlib.pyplot as plt
import base64
import string
import re
from collections import Counter
from pyspark.sql.functions import col, lower, regexp_replace, split

from pyspark.sql import SQLContext
# Wrap the session's SparkContext in a SQLContext.
# NOTE(review): no later visible cell references `sqlContext` -- confirm it is
# still needed.
sqlContext = SQLContext(sc)


In [2]:
# Import the data from your computer/server: the first CSV row supplies the
# column names and the column types are inferred from the values.
data = spark.read.csv(
    "research_paper.csv",
    header=True,
    inferSchema=True,
)
# Preview the first five rows as a list of Row objects.
data.take(5)

[Row(Title='Innovation in Database Management: Computer Science vs. Engineering.', Conference='VLDB'),
 Row(Title='High performance prime field multiplication for GPU.', Conference='ISCAS'),
 Row(Title='enchanted scissors: a scissor interface for support in cutting and interactive fabrication.', Conference='SIGGRAPH'),
 Row(Title='Detection of channel degradation attack by Intermediary Node in Linear Networks.', Conference='INFOCOM'),
 Row(Title='Pinning a Complex Network through the Betweenness Centrality Strategy.', Conference='ISCAS')]

In [3]:
# Load the same CSV twice: once as a Spark DataFrame, once as a pandas
# DataFrame.
# NOTE(review): no later visible cell uses `dataPd` -- confirm it is needed.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("research_paper.csv")
)
dataPd = pd.read_csv("research_paper.csv")
# Tabular preview of the first 20 rows (long values are truncated).
data.show(20)


+--------------------+----------+
|               Title|Conference|
+--------------------+----------+
|Innovation in Dat...|      VLDB|
|High performance ...|     ISCAS|
|enchanted scissor...|  SIGGRAPH|
|Detection of chan...|   INFOCOM|
|Pinning a Complex...|     ISCAS|
|Analysis and Desi...|     ISCAS|
|Dynamic bluescreens.|  SIGGRAPH|
|A Quantitative As...|   INFOCOM|
|Automatic sanitiz...|       WWW|
|A &#916;&#931; IR...|     ISCAS|
|Architecture of a...|     ISCAS|
|Rule-based Servic...|       WWW|
|Business Policy M...|      VLDB|
|A high speed and ...|     ISCAS|
|PREDIcT: Towards ...|      VLDB|
|SocialSensor: sen...|       WWW|
|Parametric keyfra...|  SIGGRAPH|
|An Explanation fo...|   INFOCOM|
|Hot Block Cluster...|      VLDB|
|Analysis of propa...|     ISCAS|
+--------------------+----------+
only showing top 20 rows



In [4]:
# Print each column's name, inferred type and nullability.
data.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Conference: string (nullable = true)



In [5]:
from pyspark.sql.functions import col 
# Number of papers per conference, largest class first.
(
    data.groupBy("Conference")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+----------+-----+
|Conference|count|
+----------+-----+
|     ISCAS|  864|
|   INFOCOM|  515|
|      VLDB|  423|
|       WWW|  379|
|  SIGGRAPH|  326|
+----------+-----+



In [6]:
from pyspark.sql.functions import isnan, when, count, col

# Count missing entries per column.
# A value is treated as missing when it is SQL NULL or NaN. Testing isnan()
# alone never matches NULL, and counting the column value itself would skip
# NULL rows anyway (count() ignores NULLs), so a literal 1 is counted for
# every row that satisfies the condition.
data.select([
    count(when(col(c).isNull() | isnan(c), 1)).alias(c)
    for c in data.columns
]).show()

+-----+----------+
|Title|Conference|
+-----+----------+
|    0|         0|
+-----+----------+



In [7]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

def clean_text(c):
    """Build a column expression that normalises the text in column `c`.

    The steps are applied in this order:
      1. lower-case the text;
      2. strip a leading "rt " marker;
      3. remove http/https URLs;
      4. drop every character that is not an ASCII letter, a digit or
         whitespace.

    Args:
        c: the column to clean (passed directly to `lower`).

    Returns:
        The expression produced by the last `regexp_replace`; callers can
        `.alias(...)` it inside a `select`.
    """
    c = lower(c)
    c = regexp_replace(c, r"^rt ", "")
    # Raw strings keep the regex backslashes without relying on Python
    # passing unknown escapes such as "\:" and "\S" through unchanged, which
    # triggers an invalid-escape warning. The pattern text is unchanged.
    c = regexp_replace(c, r"(https?\://)\S+", "")
    c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")
    return c

# Apply clean_text to both columns, keeping the column names used below.
# NOTE(review): the pipeline cell that follows is fit on `data`, not on
# `data1` -- confirm whether the cleaned frame was meant to be used there.
data1 = data.select(
    [clean_text(col(name)).alias(name) for name in ("title", "Conference")]
)

data1.show(5)

       

+--------------------+----------+
|               title|Conference|
+--------------------+----------+
|innovation in dat...|      vldb|
|high performance ...|     iscas|
|enchanted scissor...|  siggraph|
|detection of chan...|   infocom|
|pinning a complex...|     iscas|
+--------------------+----------+
only showing top 5 rows



In [8]:
# Step 1 -- tokenizer: split each title on non-word characters.
regexTokenizer = RegexTokenizer(
    inputCol="Title",
    outputCol="words",
    pattern="\\W",
)

# Step 2 -- stop words: reuse the default list of a fresh remover.
remover = StopWordsRemover()
stopwords = remover.getStopWords()
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
stopwordsRemover.setStopWords(stopwords)

# Step 3 -- bag-of-words counts over the filtered tokens.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=10000,
    minDF=5,
)

# Step 4 -- encode the conference name as a numeric label.
label_stringIdx = StringIndexer(inputCol="Conference", outputCol="label")

pipeline = Pipeline(
    stages=[
        regexTokenizer,
        stopwordsRemover,
        countVectors,
        label_stringIdx,
    ]
)

# Fit the pipeline and apply it to the same DataFrame.
# NOTE(review): the fit uses the full `data` frame (not `data1`) and happens
# before the train/test split in the next cell, so the vocabulary and label
# index are learned from rows that later land in the test set.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)






+--------------------+----------+--------------------+--------------------+--------------------+-----+
|               Title|Conference|               words|            filtered|            features|label|
+--------------------+----------+--------------------+--------------------+--------------------+-----+
|Innovation in Dat...|      VLDB|[innovation, in, ...|[innovation, data...|(791,[32,40,184,4...|  2.0|
|High performance ...|     ISCAS|[high, performanc...|[high, performanc...|(791,[16,42,301,3...|  0.0|
|enchanted scissor...|  SIGGRAPH|[enchanted, sciss...|[enchanted, sciss...|(791,[87,330,405]...|  4.0|
|Detection of chan...|   INFOCOM|[detection, of, c...|[detection, chann...|(791,[1,38,46,80,...|  1.0|
|Pinning a Complex...|     ISCAS|[pinning, a, comp...|[pinning, complex...|(791,[13,103,782]...|  0.0|
+--------------------+----------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [9]:
# 70/30 train/test split; the fixed seed makes the split repeatable.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)

# Report how many rows ended up on each side of the split.
train_rows = trainingData.count()
test_rows = testData.count()
print("Training Dataset Count: " + str(train_rows))
print("Test Dataset Count: " + str(test_rows))

Training Dataset Count: 1756
Test Dataset Count: 751


In [10]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression with elasticNetParam=0 and regParam=0.3.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Show the ten rows predicted as label 0, ordered by the probability column
# in descending order.
(
    predictions
    .where(col("prediction") == 0)
    .select("Title", "Conference", "probability", "label", "prediction")
    .orderBy(col("probability").desc())
    .show(10, truncate=30)
)

+------------------------------+----------+------------------------------+-----+----------+
|                         Title|Conference|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|Front-end amplifier of low-...|     ISCAS|[0.9873925466446979,0.00332...|  0.0|       0.0|
|Low-voltage SOI CMOS DTMOS/...|     ISCAS|[0.9742686991022053,0.00850...|  0.0|       0.0|
|A low power multi-mode CMOS...|     ISCAS|[0.9674492923760447,0.01337...|  0.0|       0.0|
|P<sup>2</sup>E-DWT: A paral...|     ISCAS|[0.9659249230169219,0.00634...|  0.0|       0.0|
|A continuous-time band-pass...|     ISCAS|[0.962968104908483,0.008284...|  0.0|       0.0|
|WL-VC SRAM: a low leakage m...|     ISCAS|[0.9606127886621211,0.00716...|  0.0|       0.0|
|Design of process variation...|     ISCAS|[0.9604440270461447,0.01587...|  0.0|       0.0|
|Split-ADC Digital Backgroun...|     ISCAS|[0.9579038172854173,0.00757...|  0.0|

In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score whatever `predictions` currently holds. The label column and the F1
# metric are the evaluator's defaults, spelled out here for the reader.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.7650884770005698

In [12]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree on the same features/label columns, depth capped at 30.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=30,
)
dtModel = dt.fit(trainingData)
predictions = dtModel.transform(testData)

# Show ten test rows ordered by the probability column in descending order.
(
    predictions
    .select("Title", "Conference", "probability", "label", "prediction")
    .orderBy(col("probability").desc())
    .show(10, truncate=30)
)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score the decision-tree predictions. The label column and the F1 metric are
# the evaluator's defaults, spelled out here for the reader.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="f1",
)
evaluator.evaluate(predictions)



+------------------------------+----------+---------------------+-----+----------+
|                         Title|Conference|          probability|label|prediction|
+------------------------------+----------+---------------------+-----+----------+
|A Compact On-Chip Capacitiv...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A framework for benchmarkin...|       WWW|[1.0,0.0,0.0,0.0,0.0]|  3.0|       0.0|
|A Fast and reliable switchi...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A 10 Gb/s optical receiver ...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A Capacitor-free CMOS Low-d...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A continuous-time band-pass...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A 10-bit 2GHz Current-Steer...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A 20-MS/s sigma delta modul...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A Low Jitter CMOS PLL Clock...|     ISCAS|[1.0,0.0,0.0,0.0,0.0]|  0.0|       0.0|
|A 7

0.5279728595964894

In [13]:
from pyspark.ml.classification import NaiveBayes
# Naive Bayes with smoothing=1, trained and applied like the models above.
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)

# Show the ten rows predicted as label 0, ordered by the probability column
# in descending order.
(
    predictions
    .where(col("prediction") == 0)
    .select("Title", "Conference", "probability", "label", "prediction")
    .orderBy(col("probability").desc())
    .show(10, truncate=30)
)

# Score the predictions. The label column and the F1 metric are the
# evaluator's defaults, spelled out here for the reader.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="f1",
)
evaluator.evaluate(predictions)

+------------------------------+----------+------------------------------+-----+----------+
|                         Title|Conference|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|Low-voltage SOI CMOS DTMOS/...|     ISCAS|[0.9999999719795253,2.55481...|  0.0|       0.0|
|A low power multi-mode CMOS...|     ISCAS|[0.9999998440207164,1.17117...|  0.0|       0.0|
|A novel structure for the d...|     ISCAS|[0.999998052448344,1.710033...|  0.0|       0.0|
|Front-end amplifier of low-...|     ISCAS|[0.9999954888419709,1.92638...|  0.0|       0.0|
|Adaptive Low/High Voltage S...|     ISCAS|[0.9999937641443432,6.27888...|  0.0|       0.0|
|Improved hybrid coding sche...|     ISCAS|[0.9999918143518555,2.69522...|  0.0|       0.0|
|Dynamic sawtooth compensati...|     ISCAS|[0.9999900448691628,4.86796...|  0.0|       0.0|
|A continuous-time band-pass...|     ISCAS|[0.9999756264204607,1.36908...|  0.0|

0.7635266573430473