## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
from pyspark.sql.functions import isnan, when, count, col, monotonically_increasing_id, udf
import pyspark.sql.functions as f
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.stat import Correlation
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
!pip install googletrans
from googletrans import Translator
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
# File location and type
file_location = "/FileStore/tables/RS_v2_2006_03.json"
file_type = "json"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df_id = df.withColumn("id_1", monotonically_increasing_id())
df_id.createOrReplaceTempView('df_temp')
# Number the rows by the generated id so that the index follows the load order
df_id = spark.sql('select *, row_number() over (order by id_1) as index from df_temp').drop('id_1')

display(df_id)

Slicing the dataframe

In [5]:
df1 = df_id.select(df_id.columns[:])
# display(df1)

Dataframe description

In [7]:
df1.count()

In [8]:
display(df1.describe())

In [9]:
# counting the null values in the dataframe
display(df1.select([count(when(col(c).isNull(), c)).alias(c) for c in df1.columns]))

In [10]:
display(df1.groupBy("subreddit_id").count())

In [11]:
display(df1.groupBy("subreddit_name_prefixed").count())

The two columns above appear to carry the same information, so the next cells check their correlation.

In [13]:
indexer = StringIndexer(inputCol="subreddit_id", outputCol="subreddit_idIndex")
df1 = indexer.fit(df1).transform(df1)
display(df1)

In [14]:
indexer = StringIndexer(inputCol="subreddit_name_prefixed", outputCol="subreddit_name_prefixedIndex")
df1 = indexer.fit(df1).transform(df1)
display(df1)

In [15]:
dfcorr1 = df1.select("subreddit_name_prefixedIndex", "subreddit_idIndex")
display(dfcorr1)

In [16]:
# convert to vector column first
assembler = VectorAssembler(inputCols=dfcorr1.columns, outputCol="corr_features")
df_vector = assembler.transform(dfcorr1).select("corr_features")

# get correlation matrix
matrix = Correlation.corr(df_vector, "corr_features")
matrix.collect()[0]["pearson({})".format("corr_features")].values
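
The Pearson coefficient above is computed on `StringIndexer` output, which by default assigns indices by label frequency, so it compares the two frequency rankings rather than the labels themselves. A more direct check is whether each `subreddit_id` maps to exactly one `subreddit_name_prefixed` and vice versa. The plain-Python sketch below illustrates that check. The helper name `is_one_to_one` and the sample rows are made up for illustration and are not part of the notebook.

```python
from collections import defaultdict

def is_one_to_one(pairs):
    """Return True if every left value maps to exactly one right value and vice versa."""
    left_to_right = defaultdict(set)
    right_to_left = defaultdict(set)
    for left, right in pairs:
        left_to_right[left].add(right)
        right_to_left[right].add(left)
    return (all(len(v) == 1 for v in left_to_right.values())
            and all(len(v) == 1 for v in right_to_left.values()))

# Hypothetical (subreddit_id, subreddit_name_prefixed) rows
rows = [
    ("t5_6", "r/reddit.com"),
    ("t5_6", "r/reddit.com"),
    ("t5_21n6", "r/programming"),
]
print(is_one_to_one(rows))
```

On the Spark side, the same idea could be expressed by comparing the distinct count of each column with the distinct count of the pair.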

In [17]:
display(df1.groupBy("subreddit_type").count())

In [18]:
display(df1.groupBy("thumbnail").count())

The columns suggested_sort, thumbnail_height, and thumbnail_width are removed because each has more than 10000 null values. The thumbnail column is removed because its values are highly imbalanced, which will not help in model building. Since subreddit_name_prefixed and subreddit_id are 100% correlated, at most one of them is worth keeping; the cell below drops both, together with their index columns.

In [20]:
df1n = df1.select([c for c in df1.columns if c not in {'suggested_sort','thumbnail_height','thumbnail_width','thumbnail','subreddit_name_prefixed', 'subreddit_name_prefixedIndex', 'subreddit_id', 'subreddit_idIndex'}])
display(df1n)

## Case 1 - Nulls kept as a third category

In [22]:
df1n = df1n.fillna({"whitelist_status":0})
display(df1n.groupBy("whitelist_status").count())

In [23]:
indexer = StringIndexer(inputCol="whitelist_status", outputCol="whitelist_statusIndex")
df1n = indexer.fit(df1n).transform(df1n)
display(df1n)

In [24]:
dfcorr = df1n.select("score", "whitelist_statusIndex")
display(dfcorr)

In [25]:
# convert to vector column first
assembler = VectorAssembler(inputCols=dfcorr.columns, outputCol="corr_features")
df_vector = assembler.transform(dfcorr).select("corr_features")

# get correlation matrix
matrix = Correlation.corr(df_vector, "corr_features")
matrix.collect()[0]["pearson({})".format("corr_features")].values

## Case 2 - Nulls replaced with the most frequent value

In [27]:
df1 = df1.fillna({"whitelist_status":"all_ads"})
display(df1.groupBy("whitelist_status").count())

In [28]:
indexer = StringIndexer(inputCol="whitelist_status", outputCol="whitelist_statusIndex")
df1 = indexer.fit(df1).transform(df1)
dfcorr = df1.select("score", "whitelist_statusIndex")

# convert to vector column first
assembler = VectorAssembler(inputCols=dfcorr.columns, outputCol="corr_features")
df_vector = assembler.transform(dfcorr).select("corr_features")

# get correlation matrix
matrix = Correlation.corr(df_vector, "corr_features")
matrix.collect()[0]["pearson({})".format("corr_features")].values

Imputing the nulls with the most frequent value reduces the correlation with score and makes the data more imbalanced. We therefore do not impute them and instead keep null as a third category, as in Case 1.
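
To make the difference between the two cases concrete, here is a small plain-Python sketch on a made-up list of `whitelist_status` values. The function names and the sample data are assumptions for illustration only. It shows how filling with the most frequent value inflates the majority class, while a sentinel keeps the missing rows distinguishable.

```python
from collections import Counter

def fill_with_sentinel(values, sentinel="0"):
    """Case 1: keep missing values as their own category."""
    return [sentinel if v is None else v for v in values]

def fill_with_mode(values):
    """Case 2: replace missing values with the most frequent observed value."""
    observed = [v for v in values if v is not None]
    mode = Counter(observed).most_common(1)[0][0]
    return [mode if v is None else v for v in values]

# Hypothetical column with two missing entries
status = ["all_ads", None, "all_ads", "no_ads", None]

print(Counter(fill_with_sentinel(status)))
print(Counter(fill_with_mode(status)))
```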

In [30]:
indexer = StringIndexer(inputCol="subreddit_type", outputCol="subreddit_typeIndex")
df1n = indexer.fit(df1n).transform(df1n)
display(df1n)

In [31]:
# make a temp dataframe using input column labelled as text
df_text_title = df.select("title")
# df_text_title = df_text_title.withColumnRenamed("title", "text")
display(df_text_title)

In [32]:
# https://stackoverflow.com/questions/54811428/why-am-i-getting-a-modulenotfounderror-when-using-googletrans
# https://stackoverflow.com/questions/59714502/python-translate-a-column-with-multiple-languages-to-english
def trans(x):
  translator=Translator()
  return translator.translate(x, dest="en").text

lation = udf(trans)
spark.udf.register("lation", lation)
df_text_title = df_text_title.withColumn("translated text", lation("title"))
df_text_title1 = df_text_title.select("translated text")
# df_text_title2 = df_text_title2.withColumnRenamed("translated text", "text")
display(df_text_title1)
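
The UDF above calls the translation service once per title, so a null title or a single failed request can fail the whole job. One possible safeguard, sketched here in plain Python, is a wrapper that falls back to the original text. The names `safe_translate` and `fake_translate` are hypothetical; in the notebook the callable could be something like `lambda t: Translator().translate(t, dest="en").text`.

```python
def safe_translate(text, translate_fn):
    """Translate text with translate_fn, returning the input unchanged on failure."""
    if text is None or not text.strip():
        # Nothing to translate: pass nulls and blank titles through
        return text
    try:
        return translate_fn(text)
    except Exception:
        # Network errors or rate limits: keep the original title
        return text

def fake_translate(text):
    """Stand-in for a real translation call, used only to exercise the wrapper."""
    if text == "boom":
        raise RuntimeError("service unavailable")
    return text.upper()

print(safe_translate("hola", fake_translate))
print(safe_translate("boom", fake_translate))
```

A function like this can be wrapped with `udf(...)` in the same way as `trans`.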

In [33]:
def clean_str(x):
  # Remove the punctuation characters ; : , . | from a title; pass nulls through unchanged
  if x is None:
    return None
  return x.translate(str.maketrans('', '', ';:,.|'))
clean = udf(clean_str)
spark.udf.register("clean", clean)
df_text_title1 = df_text_title1.withColumn("text", clean("translated text"))
df_text_title2 = df_text_title1.select("text")
# display(df_text_title2)

In [34]:
# Initialize the pretrained sentiment pipeline; it expects an input column named "text"
pipeline = PretrainedPipeline("analyze_sentiment", lang="en")
df_transformed = pipeline.transform(df_text_title2)

In [35]:
#store the results in a temporary dataframe
df_new = df_transformed.select("sentiment.result", "text")
display(df_new)

In [36]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Join the temporary dataframe to the original dataframe
df_new = df_new.withColumn("id_1", monotonically_increasing_id())
df_new.createOrReplaceTempView('df_temp1')
# Number the rows by the generated id so that index1 follows the row order
df_new = spark.sql('select *, row_number() over (order by id_1) as index1 from df_temp1').drop('id_1')
df_full = df1n.join(df_new, df1n.index == df_new.index1).orderBy(df1n.index).drop("index1", "title")
# display(df_full)

In [37]:
#cache statement, do all feature engineering transformations, then perform the display action in the end
df_full.cache()

In [38]:
# https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe
df_full = df_full.withColumn("length_title", f.length("text"))
df_full = df_full.withColumn('titlewordCount', f.size(f.split(f.col('text'), ' ')))
df_full = df_full.withColumn('url length', f.length(f.split(f.col('url'), '//').getItem(1)))
# display(df_full)

In [39]:
# Does the title end with an exclamation mark?
df_full = df_full.withColumn(
          'has_exclamation_mark', f.when(f.col("text").endswith('!'), 1)
          .otherwise(0))

# Does the title end with a question mark?
df_full = df_full.withColumn(
          'has_question_mark', f.when(f.col("text").endswith('?'), 1)
          .otherwise(0))
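
The title and URL features built in the last two cells can be restated for a single row in plain Python, which makes the edge cases easier to see. This is only a sketch: the function name `title_features` and the sample title and URL are made up for illustration.

```python
def title_features(text, url):
    """Compute the same per-row features as the Spark columns above."""
    after_scheme = url.split("//")
    return {
        "length_title": len(text),
        # Splitting on a single space counts empty strings too, as f.split does
        "titlewordCount": len(text.split(" ")),
        # None when the URL has no "//", mirroring a null from getItem(1)
        "url length": len(after_scheme[1]) if len(after_scheme) > 1 else None,
        "has_exclamation_mark": int(text.endswith("!")),
        "has_question_mark": int(text.endswith("?")),
    }

features = title_features("Is this the end?", "http://example.com/page")
print(features)
```

Note that an empty title still yields a word count of 1, because splitting an empty string on a space returns one empty element.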

In [40]:
df_full.createOrReplaceTempView('dfexplor')
dfex = spark.sql("SELECT has_question_mark, avg(score) as avg_score from dfexplor group by has_question_mark ")
dfex = dfex.toPandas()
dfex.plot.bar(x = "has_question_mark", rot = 0)

In [41]:
dfex1 = spark.sql("SELECT has_exclamation_mark, avg(score) as avg_score from dfexplor group by has_exclamation_mark ")
dfex1 = dfex1.toPandas()
dfex1.plot.bar(x = "has_exclamation_mark", rot = 0)

In [42]:
# def get_last_part(x):
#   return str(x).split("//")[-1].split('/')[0].split('.')[-1]
# # String type output

# get_last_part_udf = udf(get_last_part, StringType())
# spark.udf.register("get_last_part_udf", get_last_part_udf)
# df_full = df_full.withColumn("top domain url", get_last_part_udf('url'))
# display(df_full)

In [43]:
def last(x):
  return str(x).split('.')[-1]
# String type output

last_udf = udf(last, StringType())
spark.udf.register("last_udf", last_udf)
df_full = df_full.withColumn("last", last_udf('domain'))
# display(df_full)

In [44]:
def protocol(x):
  return str(x).split(":")[0]

# String type output
protocol_udf = udf(protocol, StringType())
spark.udf.register("protocol_udf", protocol_udf)
df_full = df_full.withColumn("protocol", protocol_udf('url'))
df_full = df_full.drop("subreddit_type", "url", "whitelist_status", "text")
display(df_full)
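
The `last` and `protocol` helpers above rely on plain string splitting. As an alternative sketch, the standard library's `urllib.parse` can extract the scheme and the last host label from a full URL, ignoring ports, paths, and query strings. The function name `url_parts` and the sample URLs are assumptions for illustration. Note that it behaves differently from the notebook's helpers on malformed input: it returns empty strings where `protocol` would return the whole input.

```python
from urllib.parse import urlparse

def url_parts(url):
    """Return (scheme, last host label) for a URL, or empty strings when absent."""
    parsed = urlparse(url)
    host = parsed.hostname or ""
    # Take the text after the final dot of the host, if the host has one
    suffix = host.rsplit(".", 1)[-1] if "." in host else ""
    return parsed.scheme, suffix

print(url_parts("https://www.example.co.uk/path?q=1"))
print(url_parts("http://example.com:8080/a.b"))
```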

In [45]:
# display(df_full.groupBy("top domain url").count())

In [46]:
dfs = df_full.select("*").toPandas()
plt.figure(figsize = (15, 10))
sx = sns.distplot(dfs["score"], kde=False)
sx.set_title("Histogram of Popularity", fontsize = 15)
sx.set_xlabel("Popularity", fontsize = 15)
sx.set_ylabel("Frequency", fontsize = 15)

In [47]:
plt.figure(figsize = (15, 10))
px = sns.distplot(dfs["length_title"], kde=False)
px.set_title("Histogram of number of characters in Title", fontsize = 15)
px.set_xlabel("Title length", fontsize = 15)
px.set_ylabel("Frequency", fontsize = 15)

In [48]:
display(df_full.groupBy("last").count().orderBy('count', ascending=False))

In [49]:
display(df_full.groupBy("last").count())

In [50]:
# display(df_full.groupBy("protocol").count())

In [51]:
# display(df_full.select([count(when(col(c).isNull(), c)).alias(c) for c in df_full.columns]))

In [52]:
# display(df_full.groupBy("lastIndex").count().orderBy('lastIndex'))

In [53]:
# File location and type
file_location = "/FileStore/tables/top500Domains.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df500 = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df500)

In [54]:
df500 = df500.withColumn("last_domain", last_udf('Root Domain'))
windowSpec = Window.partitionBy(df500['last_domain'])
df500 = df500.withColumn("average da", f.avg(df500['Domain Authority']).over(windowSpec)).orderBy('Rank')
display(df500)

In [55]:
dflast = df500.select('last_domain', 'average da').distinct()
display(dflast)

In [56]:
df_full = df_full.join(dflast, df_full.last == dflast.last_domain)
df_full = df_full.fillna({"average da": 1})
# display(df_full)
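
The last three cells average the domain authority per domain suffix and attach that average to each post. The plain-Python sketch below restates the logic on made-up numbers; the function names and the sample data are hypothetical. It also shows where a default of 1 would apply: only to suffixes missing from the lookup table. With the inner join used above, such rows are dropped before the fill runs, so the default would only take effect with a left join.

```python
from collections import defaultdict

def average_by_key(rows):
    """Average the values of (key, value) pairs per key."""
    totals = defaultdict(lambda: [0.0, 0])
    for key, value in rows:
        totals[key][0] += value
        totals[key][1] += 1
    return {key: total / n for key, (total, n) in totals.items()}

def average_da_for(suffix, table, default=1):
    """Look up the average for a suffix, falling back to a default when unknown."""
    return table.get(suffix, default)

# Hypothetical (last_domain, Domain Authority) rows
domains = [("com", 100), ("com", 90), ("org", 80)]
avg_da = average_by_key(domains)

print(avg_da)
print(average_da_for("xyz", avg_da))
```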

In [57]:
df_full = df_full.withColumn("result1", df_full["result"].getItem(1)) \
          .withColumn("result", df_full["result"].getItem(0)).drop('last_domain')

df_full = df_full.fillna({"result":"none"})
indexer = StringIndexer(inputCol="result", outputCol="resultIndex")
df_full = indexer.fit(df_full).transform(df_full)

indexer1 = StringIndexer(inputCol="last", outputCol="lastIndex")
df_full = indexer1.fit(df_full).transform(df_full)

indexer2 = StringIndexer(inputCol="protocol", outputCol="protocolIndex")
df_full = indexer2.fit(df_full).transform(df_full)

df_full = df_full.fillna({"result1":"none"})
indexer3 = StringIndexer(inputCol="result1", outputCol="result1Index")
df_full = indexer3.fit(df_full).transform(df_full)

df_full = df_full.drop("result", "last", "protocol", "result1")

# display(df_full)

In [58]:
train_pdf = df_full.select("length_title", "titlewordCount", "url length", "has_exclamation_mark", "has_question_mark", "average da", "whitelist_statusIndex",\
                           "subreddit_typeIndex", "resultIndex", "lastIndex", "protocolIndex", "result1Index").toPandas()

corr = train_pdf.corr()

plt.figure(figsize = (10, 10))
ax = sns.heatmap(corr, vmin=-1, vmax=1, center=0, cmap=sns.diverging_palette(20, 220, n=200), square=True, annot = True, fmt = ".2f")
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, horizontalalignment='right')

In [59]:
df_train = df_full.select(df_full.columns[:])

ohe = OneHotEncoder(dropLast=False, inputCol="whitelist_statusIndex", outputCol="whitelist_status_ohe")
# model = ohe.fit(df_full)
df_train = ohe.transform(df_train)

ohe1 = OneHotEncoder(dropLast=False, inputCol="subreddit_typeIndex", outputCol="subreddit_type_ohe")
# model1 = ohe1.fit(df_full)
df_train = ohe1.transform(df_train)

ohe2 = OneHotEncoder(dropLast=False, inputCol="resultIndex", outputCol="result_ohe")
# model2 = ohe2.fit(df_full)
df_train = ohe2.transform(df_train)

df_train = df_train.fillna({"lastIndex":100})
ohe3 = OneHotEncoder(dropLast=False, inputCol="lastIndex", outputCol="last_ohe")
# model3 = ohe3.fit(df_full)
df_train = ohe3.transform(df_train)

ohe4 = OneHotEncoder(dropLast=False, inputCol="protocolIndex", outputCol="protocol_ohe")
# model4 = ohe4.fit(df_full)
df_train = ohe4.transform(df_train)

ohe5 = OneHotEncoder(dropLast=False, inputCol="result1Index", outputCol="result1_ohe")
# model5 = ohe5.fit(df_full)
df_train = ohe5.transform(df_train)

df_train = df_train.drop("whitelist_statusIndex", "subreddit_typeIndex", "resultIndex", "protocolIndex", "result1Index", "lastIndex")

display(df_train)

## Test Data

In [61]:
# File location and type
file_location = "/FileStore/tables/RS_v2_2006_04.json"
file_type = "json"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df_id = df.withColumn("id_1", monotonically_increasing_id())
df_id.createOrReplaceTempView('df_temp')
# Number the rows by the generated id so that the index follows the load order
df_id = spark.sql('select *, row_number() over (order by id_1) as index from df_temp').drop('id_1')

display(df_id)

In [62]:
df1 = df_id.select([c for c in df_id.columns if c not in {'suggested_sort','thumbnail_height','thumbnail_width','thumbnail','subreddit_name_prefixed', 'subreddit_id'}])
display(df1)

In [63]:
df1 = df1.fillna({"whitelist_status":0})
indexer = StringIndexer(inputCol="whitelist_status", outputCol="whitelist_statusIndex")
df1 = indexer.fit(df1).transform(df1)
df1 = df1.drop("whitelist_status")
display(df1)

In [64]:
indexer = StringIndexer(inputCol="subreddit_type", outputCol="subreddit_typeIndex")
df1 = indexer.fit(df1).transform(df1)
df1 = df1.drop("subreddit_type")
display(df1)

In [65]:
# make a temp dataframe using input column labelled as text
df_text_title = df1.select("title")
df_text_title = df_text_title.withColumn("translated text", lation("title"))
df_text_title1 = df_text_title.select("translated text")
df_text_title1 = df_text_title1.withColumn("text", clean("translated text"))
df_text_title2 = df_text_title1.select("text")
# display(df_text_title2)

In [66]:
# Initialize the pretrained sentiment pipeline; it expects an input column named "text"
pipeline = PretrainedPipeline("analyze_sentiment", lang="en")

# Transform the cleaned titles and store the output in df_transformed
df_transformed = pipeline.transform(df_text_title2)

df_new = df_transformed.select("sentiment.result", "text")
display(df_new)

In [67]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Join the temporary dataframe to the original dataframe
df_new = df_new.withColumn("id_1", monotonically_increasing_id())
df_new.createOrReplaceTempView('df_temp1')
# Number the rows by the generated id so that index1 follows the row order
df_new = spark.sql('select *, row_number() over (order by id_1) as index1 from df_temp1').drop('id_1')
df_fu = df1.join(df_new, df1.index == df_new.index1).orderBy(df1.index).drop("index1", "title")
# display(df_fu)

In [68]:
# cache 
df_fu.cache()

In [69]:
df_fu = df_fu.withColumn("length_title", f.length("text"))
df_fu = df_fu.withColumn('titlewordCount', f.size(f.split(f.col('text'), ' '))) #https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe
df_fu = df_fu.withColumn('url length', f.length(f.split(f.col('url'), '//').getItem(1)))

df_fu = df_fu.withColumn('has_exclamation_mark', f.when(f.col("text").endswith('!'), 1).otherwise(0))
df_fu = df_fu.withColumn('has_question_mark', f.when(f.col("text").endswith('?'), 1).otherwise(0))

df_fu = df_fu.withColumn("last", last_udf('domain'))
df_fu = df_fu.withColumn("protocol", protocol_udf('url')).drop("url", "text")

df_fu = df_fu.join(dflast, df_fu.last == dflast.last_domain).drop('last_domain')
df_fu = df_fu.fillna({"average da": 1})

# display(df_fu)

In [70]:
df_fu = df_fu.withColumn("result1", df_fu["result"].getItem(1)).withColumn("result", df_fu["result"].getItem(0))

df_fu = df_fu.fillna({"result":"none"})
indexer = StringIndexer(inputCol="result", outputCol="resultIndex")
df_fu = indexer.fit(df_fu).transform(df_fu)

df_fu = df_fu.fillna({"result1":"none"})

pipeline = Pipeline(stages=[indexer1, indexer2, indexer3])
df_fu = pipeline.fit(df_fu).transform(df_fu)

df_fu = df_fu.drop("result", "last", "protocol", "result1")

# display(df_fu)

In [71]:
df_test = df_fu.select(df_fu.columns[:])

df_test = df_test.fillna({"lastIndex":100})

pipeline1 = Pipeline(stages=[ohe, ohe1, ohe2, ohe3 ,ohe4, ohe5])
df_test = pipeline1.fit(df_test).transform(df_test)

df_test = df_test.drop("whitelist_statusIndex", "subreddit_typeIndex", "resultIndex", "protocolIndex", "result1Index", "lastIndex")

display(df_test)
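
One caveat about the test-data cells: the string indexers are fitted again on the test file, so the same label can receive a different index than it did in training. The plain-Python sketch below illustrates the problem and the usual remedy, which is to fit the mapping once on the training data and reuse it. The function names and sample labels are hypothetical, and the alphabetical tie-breaking is an assumption of this sketch.

```python
from collections import Counter

def fit_label_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def apply_label_index(values, index):
    """Encode values with a fitted index; unseen labels share one extra index."""
    unseen = len(index)
    return [index.get(v, unseen) for v in values]

# Hypothetical domain suffixes in the training and test data
train = ["com", "com", "org", "net", "org", "com"]
test = ["org", "org", "io", "com"]

index = fit_label_index(train)
print(apply_label_index(test, index))  # encoded with the training mapping
print(fit_label_index(test))           # refitting on test changes the mapping
```

In Spark, the analogous approach would be to keep the fitted model returned by `indexer.fit(...)` on the training DataFrame and call its `transform` on the test DataFrame, with `handleInvalid="keep"` set on the `StringIndexer` so that unseen labels get their own index.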