In [1]:
# Echo the pre-defined SparkSession (it is used throughout but never created in this notebook)
spark

In [2]:
import pandas as pd
import numpy as np
import seaborn as sns
import pyspark.sql
import pyspark.ml

In [3]:
# Load the news articles CSV: first row is the header, column types are inferred from the data
df = spark.read.options(header=True, inferSchema=True).csv('/FileStore/tables/News_Data.csv')

In [4]:
# Keep only rows with a non-empty CATEGORY; rows where CATEGORY is NULL are dropped as well,
# because the predicate evaluates to NULL for them and the filter keeps only TRUE rows
df = df.where("CATEGORY != ''")

In [5]:
# First Row of the DataFrame (None if it is empty)
df.first()

In [6]:
# Render the first five rows as a table
display(df.limit(5))

In [7]:
# Register df as the temporary view "tblnews" so the %sql cells below can query it
df.createOrReplaceTempView("tblnews")

In [8]:
%sql SELECT * FROM tblnews  -- every column of every row

In [9]:
%sql SELECT TITLE, PUBLISHER, CATEGORY  -- just the descriptive columns
FROM tblnews

In [10]:
# GROUP BY: number of articles in each category

In [11]:
%sql SELECT count(TITLE), CATEGORY
FROM tblnews
GROUP BY CATEGORY
ORDER BY count(TITLE) DESC  -- largest category first

In [12]:
# GROUP BY: number of articles from each publisher

In [13]:
%sql SELECT count(TITLE), PUBLISHER
FROM tblnews
GROUP BY PUBLISHER
ORDER BY count(TITLE) DESC  -- most prolific publisher first

In [14]:
# Self join: pairs of different article titles written by the same PUBLISHER

In [15]:
%sql SELECT A.TITLE AS TITLE_A, B.TITLE AS TITLE_B, A.PUBLISHER
FROM tblnews A
JOIN tblnews B
  ON A.PUBLISHER = B.PUBLISHER
 -- '<' rather than '<>' keeps each unordered pair of distinct titles once instead of
 -- returning both (x, y) and (y, x); the aliases avoid two output columns named TITLE.
 -- The pair count grows quadratically with a publisher's article count.
 AND A.TITLE < B.TITLE
ORDER BY A.PUBLISHER DESC


In [16]:
# HAVING clause: keep only publishers with more than 1000 articles

In [17]:
%sql SELECT count(TITLE), PUBLISHER
FROM tblnews
GROUP BY PUBLISHER
HAVING count(TITLE) > 1000  -- keep only the groups above the threshold

In [18]:
# The PUBLISHER with the largest number of articles

In [19]:
%sql SELECT count(TITLE), PUBLISHER
FROM tblnews
GROUP BY PUBLISHER
ORDER BY count(TITLE) DESC
LIMIT 1  -- only the top publisher (ties are broken arbitrarily)

In [20]:
# The CATEGORY with the largest number of articles

In [21]:
%sql SELECT count(TITLE), CATEGORY
FROM tblnews
GROUP BY CATEGORY
ORDER BY count(TITLE) DESC
LIMIT 1  -- only the top category (ties are broken arbitrarily)

In [22]:
# Publisher and category of articles whose PUBLISHER name starts with a lowercase "a" (LIKE is case-sensitive)

In [23]:
%sql SELECT PUBLISHER, CATEGORY
FROM tblnews
WHERE PUBLISHER LIKE 'a%'  -- LIKE is case-sensitive: lowercase 'a' only

In [24]:
# Publisher and category of articles whose TIMESTAMP lies between two given values

In [25]:
%sql SELECT PUBLISHER, CATEGORY
FROM tblnews
-- Numeric bounds instead of string literals: if TIMESTAMP is inferred as a bigint, Spark casts
-- a string bound such as '1.39447E+12' to bigint, which yields NULL (exponent notation does not
-- parse as an integer), so BETWEEN is never true and no rows come back.
WHERE TIMESTAMP BETWEEN 1.39447E12 AND 1.40268E12

In [26]:
# Machine learning: predict an article's CATEGORY from its TITLE

In [27]:
from pyspark.ml.feature import StringIndexer

# Target encoding: map each distinct CATEGORY string to a numeric index in a new "label" column
labelIndexer = StringIndexer(outputCol="label", inputCol="CATEGORY")

In [28]:
# Print the (not yet fitted) indexer object's string form
print(labelIndexer)

In [29]:
# Inspect column names and the types inferred when the CSV was read
df.printSchema()

In [30]:
# Random ~70% / ~30% split of the rows into training and test sets, with a fixed seed
trainingData, testData = df.randomSplit(weights=[0.7, 0.3], seed=100)

# Both splits are counted here and reused by the modelling cells below, so cache them
trainingData.cache()
testData.cache()

print(trainingData.count())
print(testData.count())

In [31]:
# Echo the training DataFrame's repr (column names and types, not its rows)
trainingData

In [32]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import HashingTF, RegexTokenizer

# Multinomial Naive Bayes baseline. NaiveBayes reads a vector column named "features"
# (its default featuresCol), and nothing upstream creates one, so the pipeline has to
# tokenize TITLE and hash the tokens into term-frequency vectors before the model stage.
# Term frequencies are non-negative, as multinomial Naive Bayes requires.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Chain: CATEGORY -> label, TITLE -> words -> features, then the NaiveBayes model.
# NOTE: `pipeline` is reassigned to a logistic-regression pipeline in a later cell.
pipeline = Pipeline(stages=[
    labelIndexer,
    RegexTokenizer(inputCol="TITLE", outputCol="words", pattern="\\s+"),
    HashingTF(inputCol="words", outputCol="features", numFeatures=5000),
    nb,
])

In [33]:
from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *

In [34]:
# Fit a CATEGORY -> label indexer on the training split and apply it to that same split
indexer = StringIndexer(outputCol="label", inputCol="CATEGORY")
indexed = (
    indexer
    .fit(trainingData)
    .transform(trainingData)
)

In [35]:
# Re-create the label indexer with the same CATEGORY -> label configuration as before
labelIndexer = StringIndexer(outputCol="label", inputCol="CATEGORY")

In [36]:
# Constructing a pipeline is done by creating each pipeline stage and configuring its parameters.
# Split TITLE into tokens on runs of whitespace. The pattern needs the backslash:
# "s+" would split on runs of the letter "s" instead of on whitespace.
tokenizer = RegexTokenizer(inputCol="TITLE", outputCol="words", pattern="\\s+")
# Hash the tokens into a 5000-dimensional term-frequency vector
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features", numFeatures=5000)
lr = LogisticRegression(maxIter=20, regParam=0.01)

# To create an ML pipeline you concatenate a sequence of stages.
pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF, lr])

In [37]:
# Fit every pipeline stage, in order, on the training split
model = pipeline.fit(dataset=trainingData)

In [38]:
# Score the held-out test split, to see how the model does on rows it was not trained on
predictions = model.transform(dataset=testData)

# List the columns the pipeline stages appended
predictions.printSchema()

In [39]:
# True label, predicted label and class probabilities for the test rows
display(predictions.select(["label", "prediction", "probability"]))

In [40]:
# Echo the predictions DataFrame's repr (column names and types, not its rows)
predictions

In [41]:
# In-sample scoring: apply the fitted pipeline to the training split itself
prediction = model.transform(trainingData)
# Ten rows showing the predicted label next to the true label and the raw title
display(
    prediction
    .select("prediction", "label", "TITLE")
    .limit(10)
)

In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The label comes from StringIndexer over CATEGORY and is not guaranteed to be binary.
# MulticlassClassificationEvaluator handles any number of classes, whereas areaUnderROC from
# BinaryClassificationEvaluator only applies to two. Evaluate the held-out test predictions;
# the training-split predictions would overstate how well the model generalizes.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
evaluator.evaluate(predictions)

In [43]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Create (prediction, label) pairs from the held-out test predictions
predictionAndLabel = predictions.select("prediction", "label").rdd

metrics = MulticlassMetrics(predictionAndLabel)
# Confusion matrix as a NumPy array: rows are true labels, columns are predicted labels,
# both ordered by ascending label value
metrics.confusionMatrix().toArray()

In [44]:
# Graph analysis with GraphX (Scala) and GraphFrames (Python)

In [45]:
%scala
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

In [46]:
# Load the graph's edge list (passed as the edges of the GraphFrame below); header row, inferred types
df5 = spark.read.options(header=True, inferSchema=True).csv('/FileStore/tables/vertex_edge.csv')

In [47]:
from graphframes import GraphFrame

from pyspark.sql.functions import monotonically_increasing_id

# Vertices: the news rows plus the "id" column GraphFrame requires. Build them as a new
# DataFrame instead of overwriting `df`, so the earlier cells still see the original data
# if they are re-run after this one.
# NOTE(review): monotonically_increasing_id() values are unique but neither consecutive nor
# 1-based, and under Spark's default case-insensitive resolution this replaces any existing
# "ID" column. Confirm these ids match the src/dst values in df5.
vertices = df.withColumn("id", monotonically_increasing_id())

g = GraphFrame(vertices, df5)
print(g)

In [48]:
# PageRank iterated until the convergence tolerance 0.01 is reached (reset probability 0.15)
results = g.pageRank(tol=0.01, resetProbability=0.15)
display(results.vertices)

In [49]:
# Edges of the graph returned by the PageRank run above
display(results.edges)

In [50]:
# PageRank for a fixed 10 iterations instead of a tolerance; the result is only echoed, not kept
g.pageRank(maxIter=10, resetProbability=0.15)

In [51]:
# Run PageRank personalized for the vertex with id "1" (fixed 10 iterations)
# NOTE(review): the vertex "id" column is built with monotonically_increasing_id() (integers);
# confirm the string "1" resolves to the intended vertex.
g.pageRank(resetProbability=0.15, maxIter=10, sourceId="1")

In [52]:
# Shortest-path distances from every vertex to the landmark vertices "1" and "10"
results = g.shortestPaths(landmarks=["1", "10"])
display(results)

In [53]:
# Community detection by label propagation, limited to 5 iterations
result = g.labelPropagation(maxIter=5)
display(result)

In [54]:
# Strongly connected components (at most 10 iterations); show each vertex with its component
result = g.stronglyConnectedComponents(maxIter=10)
display(result.select(["id", "component"]))

In [55]:
# Count the edges of each relationship type. One loop replaces two copy-pasted cells, and
# the message names the relationship instead of the unrelated "follow" edges.
# After the loop, numFollows holds the same_publisher count, as before.
for relationship in ("same_category", "same_publisher"):
    numFollows = g.edges.filter(f"relationship = '{relationship}'").count()
    print(f"The number of {relationship} edges is", numFollows)