# Prediction using PySpark

In [1]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext

try:
    sc.stop()
except NameError:
    print("sc not defined")

config = SparkConf().setMaster("local[*]").setAppName("ClassifyUrl")    
sc = SparkContext(conf=config)

sc not defined


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Classify Urls") \
                    .getOrCreate()

In [27]:
train_csv = 'bigdata/train.csv'
html_csv = 'bigdata/train/html_data.csv'

In [4]:
%%time
train = spark.read.csv(train_csv, 
                       header=True,
                      inferSchema=True)

# Analyze Data types in dataset
train.printSchema()

root
 |-- Webpage_id: integer (nullable = true)
 |-- Domain: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Tag: string (nullable = true)

Wall time: 6.62 s


In [5]:
print('Total Records in Training Dataset :', train.count())
train.head(5)

Total Records in Training Dataset : 53447


[Row(Webpage_id=1, Domain='www.fiercepharma.com', Url='http://www.fiercepharma.com/marketing/tecfidera-gilenya-and-aubagio-s-3-way-battle-for-ms-share-about-to-get-more-interesting', Tag='news'),
 Row(Webpage_id=2, Domain='www.fiercepharma.com', Url='http://www.fiercepharma.com/pharma/novo-equipped-to-weather-storm-u-s-diabetes-market-ceo-says', Tag='news'),
 Row(Webpage_id=3, Domain='www.fiercepharma.com', Url='http://www.fiercepharma.com/pharma/another-exec-departs-troubled-endo-and-time-it-s-for-another-drugmaker', Tag='news'),
 Row(Webpage_id=4, Domain='www.fiercepharma.com', Url='http://www.fiercepharma.com/pharma/teva-buy-biosim-specialist-celltrion-it-wouldn-t-say-no', Tag='news'),
 Row(Webpage_id=5, Domain='www.fiercepharma.com', Url='http://www.fiercepharma.com/marketing/actress-marissa-tomei-partners-allergan-restasis-to-drive-dry-eye-awareness', Tag='news')]

In [6]:
# head() gives Ugly Output :(
# Prefer show() over head()
train.show(5)

+----------+--------------------+--------------------+----+
|Webpage_id|              Domain|                 Url| Tag|
+----------+--------------------+--------------------+----+
|         1|www.fiercepharma.com|http://www.fierce...|news|
|         2|www.fiercepharma.com|http://www.fierce...|news|
|         3|www.fiercepharma.com|http://www.fierce...|news|
|         4|www.fiercepharma.com|http://www.fierce...|news|
|         5|www.fiercepharma.com|http://www.fierce...|news|
+----------+--------------------+--------------------+----+
only showing top 5 rows



In [7]:
# How many columns do we have in train and what are their names?
print('No. of cols in train dataset : ', len(train.columns))
train.columns

No. of cols in train dataset :  4


['Webpage_id', 'Domain', 'Url', 'Tag']

In [8]:
# How to get the summary statistics (mean, standard deviance, min ,max , count) of numerical columns in a DataFrame?
train.describe().show()

'''
Observation:
As we can see that, describe operation is working for String type column but the output for mean, stddev are null and min & max values are calculated based on ASCII value of categories.
'''

+-------+------------------+--------------------+--------------------+--------------+
|summary|        Webpage_id|              Domain|                 Url|           Tag|
+-------+------------------+--------------------+--------------------+--------------+
|  count|             53447|               53447|               53447|         53447|
|   mean| 39920.78603102139|                null|                null|          null|
| stddev|22945.942450142324|                null|                null|          null|
|    min|                 1|  1.eyefortravel.com|http://1.eyefortr...|clinicalTrials|
|    max|             79345|zoonosis.conferen...|https://zoologica...|        thesis|
+-------+------------------+--------------------+--------------------+--------------+



'\nObservation:\nAs we can see that, describe operation is working for String type column but the output for mean, stddev are null and min & max values are calculated based on ASCII value of categories.\n'

In [9]:
%%time
# How to select column(s) from the DataFrame?
train.select('Webpage_id','Domain','Tag').show(5)

+----------+--------------------+----+
|Webpage_id|              Domain| Tag|
+----------+--------------------+----+
|         1|www.fiercepharma.com|news|
|         2|www.fiercepharma.com|news|
|         3|www.fiercepharma.com|news|
|         4|www.fiercepharma.com|news|
|         5|www.fiercepharma.com|news|
+----------+--------------------+----+
only showing top 5 rows

Wall time: 804 ms


In [10]:
%%time
# How to find the number of distinct Domain and Tags (Target-Classes) in train files?
train.select('Domain').distinct().count(), train.select('Tag').distinct().count()

Wall time: 54.6 s


(3974, 9)

In [11]:
# Check for Null values in 
print('Count of Missing values in Train Dataset :\n')
print('Domain :', train.filter(train.Domain.isNull()).count())
print('Url :', train.filter(train.Url.isNull()).count())
print('Tag :', train.filter(train.Tag.isNull()).count())

Count of Missing values in Train Dataset :

Domain : 0
Url : 0
Tag : 0


In [12]:
# How to drop the all rows with null value?
train.dropna().count() # Count of rows in newly returned non-null dataframes

53447

In [13]:
# How to fill the null values in Domain column of DataFrame with, some constant value, say, 'www.missing.in'?
missing_domain = 'www.missing.in'
train.fillna(missing_domain, 'Domain').count() # Count of rows in newly returned non-null dataframes

53447

In [14]:
# How to create a sample DataFrame from the base DataFrame?
train.sample(False, # withReplacement=False
             0.0001, # fraction = x percecntage that we want to pick
             42 # seed to reproduce the result
            ).show()

+----------+--------------------+--------------------+-----------+
|Webpage_id|              Domain|                 Url|        Tag|
+----------+--------------------+--------------------+-----------+
|     18472|cancerci.biomedce...|https://cancerci....|publication|
|     26364|www.naturalnews.c...|http://www.natura...|       news|
|     28191|         twitter.com|https://twitter.c...|     others|
+----------+--------------------+--------------------+-----------+



In [15]:
# How to find the number of rows we have per Domain?
from pyspark.sql.functions import col
train.groupBy('Domain') \
    .count() \
    .filter("`count` > 225") \
    .sort(col('count').desc()) \
    .show(10) # Show Count of Top 10

# Or like below:
from pyspark.sql.functions import desc
train.groupBy('Domain') \
    .count() \
    .filter("`count` > 225") \
    .sort(desc('count')) \
    .show(10) # Show Count of Top 10

# Or like below:
from pyspark.sql.functions import desc
train.groupBy('Domain') \
    .count() \
    .filter("`count` > 225") \
    .orderBy('count', ascending=False) \
    .show(10) # Show Count of Top 10

+--------------------+-----+
|              Domain|count|
+--------------------+-----+
|thesis.library.ca...|  301|
|academiccommons.c...|  300|
|  www.dart-europe.eu|  300|
|       curate.nd.edu|  300|
|      dspace.mit.edu|  300|
|ecommons.cornell.edu|  300|
|     www.nice.org.uk|  230|
|www.ncbi.nlm.nih.gov|  226|
+--------------------+-----+

+--------------------+-----+
|              Domain|count|
+--------------------+-----+
|thesis.library.ca...|  301|
|academiccommons.c...|  300|
|  www.dart-europe.eu|  300|
|       curate.nd.edu|  300|
|      dspace.mit.edu|  300|
|ecommons.cornell.edu|  300|
|     www.nice.org.uk|  230|
|www.ncbi.nlm.nih.gov|  226|
+--------------------+-----+

+--------------------+-----+
|              Domain|count|
+--------------------+-----+
|thesis.library.ca...|  301|
|academiccommons.c...|  300|
|  www.dart-europe.eu|  300|
|       curate.nd.edu|  300|
|      dspace.mit.edu|  300|
|ecommons.cornell.edu|  300|
|     www.nice.org.uk|  230|
|www.ncbi.nl

In [23]:
%%time 
# Wall time: 2.13s - 2.39s

# OBJECTIVE : Get just the domain from URLs
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import tldextract

def extract_domain(url):
    return tldextract.extract(url).domain

extract_domain_udf = udf(extract_domain, StringType())
# extract_domain_udf = udf(lambda url : tldextract.extract(url).domain, StringType())

train = train.withColumn('Domain', extract_domain_udf(train.Domain))
train.show(5)

+----------+------------+--------------------+----+
|Webpage_id|      Domain|                 Url| Tag|
+----------+------------+--------------------+----+
|         1|fiercepharma|http://www.fierce...|news|
|         2|fiercepharma|http://www.fierce...|news|
|         3|fiercepharma|http://www.fierce...|news|
|         4|fiercepharma|http://www.fierce...|news|
|         5|fiercepharma|http://www.fierce...|news|
+----------+------------+--------------------+----+
only showing top 5 rows

Wall time: 2.13 s


In [28]:
from bs4 import BeautifulSoup # For Scraping HTML page
from bs4.element import Comment
import re

In [29]:
# Objective : Extract text from title tag of HTML source of web-page
def extract_title(page):
    if (page == None): 
        return None
    soup = BeautifulSoup(page, 'html.parser')
    title_tag = soup.find('title')
    if (title_tag == None):
        title = None
    else:
        title = title_tag.text.strip()
    return title

In [30]:
#  OBJECTIVE: Functions to parse HTML content and extract text that matters.
def extract_body(page):
    if (page == None): 
        return None
    soup = BeautifulSoup(page, 'html.parser', from_encoding="utf-8")
    body_tag = soup.find('body')
    if (body_tag == None):
        body = page 
    else:
        body = body_tag # What should be returned here? How to stringify this for further  procecssing?
    return body

def is_visible_content(element):
    if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]']:
        return False
    if isinstance(element, Comment):
        return False
    return True

def remove_extra_spaces(str):
    return u" ".join(str.split())

def extract_text(page):
    if (page == None): 
        return None
    soup = BeautifulSoup(page, 'html.parser') #, from_encoding="utf-8"
    texts = soup.findAll(text=True) # Extracts text from all HTML Markups, incl nested ones
    visible_texts = filter(is_visible_content, texts)
    # The u-prefix u" ".join() indicates Unicode and has been in python since v2.0
    # Ref. Read: https://www.joelonsoftware.com/2003/10/08/the-absolute-minimum-every-software-developer-absolutely-positively-must-know-about-unicode-and-character-sets-no-excuses/
    text = u" ".join(remove_extra_spaces(t.strip()) for t in visible_texts)
    text = text.replace(',','')
    text = text.replace('|','')
    text = re.sub(r'\s\s+',' ',text).strip()
    return text.encode('utf-8',errors='ignore').decode('utf-8').strip()

In [41]:
%%time

# OBJECTIVE : Read html_data.csv
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import lit # lit for literals

extract_text_udf = udf(extract_text, StringType())

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("Webpage_id", IntegerType()),
    StructField("Html", StringType())
])

html_df = spark.read.csv(html_csv, 
                         header=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding='utf-8', 
                         inferSchema=True, 
                         maxColumns=2)
'''
html_df = spark.read.format('csv') \
                    .option('header',True) \
                    .option('ignoreLeadingWhiteSpace',True) \
                    .option('ignoreTrailingWhiteSpace',True) \
                    .option('inferSchema',True) \
                    .option('maxColumns',2) \
                    .load(html_csv)
'''
# Analyze Data types in dataset
html_df.printSchema()

Py4JJavaError: An error occurred while calling o566.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 1080, localhost, executor driver): com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - null
Parser Configuration: CsvParserSettings:
	Auto configuration enabled=true
	Autodetect column delimiter=false
	Autodetect quotes=false
	Column reordering enabled=true
	Empty value=null
	Escape unquoted values=false
	Header extraction enabled=null
	Headers=null
	Ignore leading whitespaces=true
	Ignore trailing whitespaces=true
	Input buffer size=128
	Input reading on separate thread=false
	Keep escape sequences=false
	Keep quotes=false
	Length of content displayed on error=-1
	Line separator detection enabled=false
	Maximum number of characters per column=-1
	Maximum number of columns=2
	Normalize escaped line separators=true
	Null value=
	Number of records to read=all
	Processor=none
	Restricting data in exceptions=false
	RowProcessor error handler=null
	Selected fields=none
	Skip bits as whitespace=true
	Skip empty lines=true
	Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
	CsvFormat:
		Comment character=\0
		Field delimiter=,
		Line separator (normalized)=\n
		Line separator sequence=\r\n
		Quote character="
		Quote escape character=\
		Quote escape escape character=null
Internal state when error was thrown: line=3, column=3, record=3, charIndex=755, headers=[1, <!DOCTYPE html>]
	at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:368)
	at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:609)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$$anonfun$5$$anonfun$apply$2.apply(CSVDataSource.scala:169)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$$anonfun$5$$anonfun$apply$2.apply(CSVDataSource.scala:169)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1118)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1111)
	at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$.infer(CSVInferSchema.scala:44)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.inferFromDataset(CSVDataSource.scala:171)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:149)
	at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:201)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - null
Parser Configuration: CsvParserSettings:
	Auto configuration enabled=true
	Autodetect column delimiter=false
	Autodetect quotes=false
	Column reordering enabled=true
	Empty value=null
	Escape unquoted values=false
	Header extraction enabled=null
	Headers=null
	Ignore leading whitespaces=true
	Ignore trailing whitespaces=true
	Input buffer size=128
	Input reading on separate thread=false
	Keep escape sequences=false
	Keep quotes=false
	Length of content displayed on error=-1
	Line separator detection enabled=false
	Maximum number of characters per column=-1
	Maximum number of columns=2
	Normalize escaped line separators=true
	Null value=
	Number of records to read=all
	Processor=none
	Restricting data in exceptions=false
	RowProcessor error handler=null
	Selected fields=none
	Skip bits as whitespace=true
	Skip empty lines=true
	Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
	CsvFormat:
		Comment character=\0
		Field delimiter=,
		Line separator (normalized)=\n
		Line separator sequence=\r\n
		Quote character="
		Quote escape character=\
		Quote escape escape character=null
Internal state when error was thrown: line=3, column=3, record=3, charIndex=755, headers=[1, <!DOCTYPE html>]
	at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:368)
	at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:609)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$$anonfun$5$$anonfun$apply$2.apply(CSVDataSource.scala:169)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$$anonfun$5$$anonfun$apply$2.apply(CSVDataSource.scala:169)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.ArrayIndexOutOfBoundsException


In [None]:
%%time

# OBJECTIVE : From html_data.csv loaded in df, extract title and text from html-page, and add the them to train.csv as new columns

# Adding a constant column 
html_df = html_df.withColumn('Title',lit(''))
html_df.show(5)

# Transforming an existing column
html_df = html_df.withColumn('Html',extract_text_udf(html_df.Html))
html_df = html_df.withColumnRenamed('Html','Html2Text')
html_df.write.csv('bigdata/train/spark1.csv', header=True)