In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [3]:
# Load the labelled training table through the SparkSession created above;
# `sqlContext` is not defined anywhere in this notebook, so use `spark.sql`.
# NOTE(review): assumes a table/view named train_csv is registered in the catalog.
train = spark.sql("SELECT * FROM train_csv")

In [4]:
# Load the unlabelled (Kaggle) test table through the SparkSession created above;
# `sqlContext` is not defined anywhere in this notebook, so use `spark.sql`.
# NOTE(review): assumes a table/view named test_csv1 is registered in the catalog.
test = spark.sql("SELECT * FROM test_csv1")

In [5]:
from pyspark.sql.functions import isnan, when, count, col
# Per-column count of null values in the raw training frame.
train.select(*[count(when(col(name).isNull(), name)).alias(name) for name in train.columns]).show()

In [6]:
# Fill missing title/text with a single space, then re-check the null counts.
train = train.fillna(value=' ', subset=['title', 'text'])
train.select(*[count(when(col(name).isNull(), name)).alias(name) for name in train.columns]).show()

In [7]:
from pyspark.sql.functions import when
# Replace author values equal to the literal string 'nan' with a single space.
train = train.withColumn(
    "author",
    when(train.author == 'nan', ' ').otherwise(train.author),
)
train.show()

In [8]:
 
from pyspark.sql.functions import isnan, when, count, col
# Per-column count of null values in the raw test frame.
test.select(*[count(when(col(name).isNull(), name)).alias(name) for name in test.columns]).show()

In [9]:
# Fill missing title/text with a single space, then re-check the null counts.
test = test.fillna(value=' ', subset=['title', 'text'])
test.select(*[count(when(col(name).isNull(), name)).alias(name) for name in test.columns]).show()

In [10]:
# Replace author values equal to the literal string 'nan' with a single space.
test = test.withColumn(
    "author",
    when(test.author == 'nan', ' ').otherwise(test.author),
)
test.show(n=5)

In [11]:
# NOTE(review): redundant — the next cell rebinds `df = train` again before using it.
df = train

In [12]:
from pyspark.sql.functions import concat_ws, concat
# Training data: merge title, author and text into one space-separated
# 'combi_text' column and drop the source columns.
df = (
    train
    .withColumn('combi_text', concat_ws(' ', train.title, train.author, train.text))
    .drop('title', 'author', 'text')
)
df.show(1, truncate=False)

In [13]:
# Test data: merge title, author and text into one space-separated
# 'combi_text' column and drop the source columns.
df_test = (
    test
    .withColumn('combi_text', concat_ws(' ', test.title, test.author, test.text))
    .drop('title', 'author', 'text')
)
df_test.show(1, truncate=False)

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer,Word2Vec, HashingTF
from pyspark.ml.feature import StandardScaler
 
# NOTE(review): this module-level `scaler` is not used anywhere in the notebook;
# nlpTransform builds its own StandardScaler on "rawFeatures" below.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
def nlpTransform(data, scaler_model=None, return_scaler_model=False):
  """Turn the 'combi_text' column into a scaled term-frequency 'features' vector.

  Stages: Tokenizer ('combi_text' -> 'words'), HashingTF ('words' ->
  'rawFeatures'), then StandardScaler ('rawFeatures' -> 'features', scaled to
  unit std, no mean centring).

  Args:
    data: Spark DataFrame with a string column 'combi_text'.
    scaler_model: optional already-fitted StandardScalerModel. When None (the
      default, original behaviour) a new scaler is fitted on `data` itself.
      When transforming the held-out / Kaggle test frame, pass the model that
      was fitted on the training frame so both are scaled with the *training*
      statistics; a scaler fitted separately on test data yields features on a
      different scale from the ones the classifier was trained on.
    return_scaler_model: if True, also return the scaler model that was used,
      so it can be reused on another frame.

  Returns:
    `data` with 'words', 'rawFeatures' and 'features' columns appended, or a
    (DataFrame, StandardScalerModel) tuple when return_scaler_model is True.
  """
  tokenizer = Tokenizer(inputCol="combi_text", outputCol="words")
  wordsData = tokenizer.transform(data)
  # HashingTF has no fit step, so identical tokens map to identical indices
  # for every frame passed in.
  hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
  featurizedData = hashingTF.transform(wordsData)
  if scaler_model is None:
    scaler = StandardScaler(inputCol="rawFeatures", outputCol="features", withStd=True, withMean=False)
    scaler_model = scaler.fit(featurizedData)
  featureD = scaler_model.transform(featurizedData)
  if return_scaler_model:
    return featureD, scaler_model
  return featureD

In [15]:
# Featurise the training text and drop the intermediate text/token columns.
data = nlpTransform(df).drop('combi_text', 'words', 'rawFeatures')

In [16]:
# Preview the first three featurised training rows.
data.show(n=3)

In [17]:
# Featurise the unlabelled test text and drop the intermediate columns.
testData = nlpTransform(df_test).drop('combi_text', 'words', 'rawFeatures')
testData.show(n=1)

In [18]:
# 75/25 split of the labelled data into training and validation sets (fixed seed).
train_data, test_data = data.randomSplit(weights=[0.75, 0.25], seed=12345)

In [19]:
from pyspark.ml.classification import LogisticRegression
# Fit a lightly regularised logistic regression on the training split and
# describe the predictions it makes on that same split.
model = LogisticRegression(labelCol='label', maxIter=5, regParam=0.001).fit(train_data)
summary = model.summary
summary.predictions.describe().show()

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Score the validation split; the returned summary exposes the scored rows via `.predictions`.
predictions = model.evaluate(dataset=test_data)

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# BinaryClassificationEvaluator's default metric is areaUnderROC, which needs a
# continuous score such as 'rawPrediction'. Feeding it the hard 0/1 'prediction'
# column collapses the ROC curve to a single threshold and understates/distorts AUC.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label')
auc = evaluator.evaluate(predictions.predictions)
# areaUnderROC is not accuracy; compute accuracy explicitly so the reported number matches its label.
accuracy = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy'
).evaluate(predictions.predictions)
{'areaUnderROC': auc, 'accuracy': accuracy}

In [22]:
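#The original run reported 0.963 from BinaryClassificationEvaluator, i.e. areaUnderROC (its default metric) on the 25% held-out validation split, not accuracy. Next, predict on the unseen Kaggle test data for the submission.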

In [23]:
# Score the validation split and preview feature vectors with predicted labels.
results = model.transform(dataset=test_data)
results.select(['features', 'prediction']).show()

In [24]:
# Number of scored rows in the validation split.
results.count()

In [25]:
# Score the unlabelled Kaggle test frame and preview feature vectors with predicted labels.
results = model.transform(dataset=testData)
results.select(['features', 'prediction']).show()

In [26]:
# Number of scored rows in the unlabelled Kaggle test frame.
results.count()

In [27]:
# Bring the predicted labels for the Kaggle test frame to the driver as a list.
values = [v for v in results.select('prediction').toPandas()['prediction']]

In [28]:
# `values` holds one prediction per scored test row; printing the whole list
# floods the notebook output, so show its length and a short preview instead.
print(len(values), values[:20])

In [29]:
import pandas as pd
# Wrap the predictions in a single-column pandas frame (column label 0) and preview it.
res = pd.DataFrame(data=values)
res.head(5)

Unnamed: 0,0
0,0.0
1,1.0
2,1.0
3,0.0
4,1.0
