**Import Libraries**

In [2]:
from pyspark.sql.functions import isnull, when, count, col,countDistinct,regexp_replace,lower,monotonically_increasing_id, hour, date_format, dayofweek, length, size, split, avg
from pyspark.sql.window import Window

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.types import IntegerType,StringType,DoubleType,LongType,DecimalType,FloatType,ArrayType, TimestampType
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

from math import cos, sin, pi

import sparknlp
from sparknlp.pretrained import PretrainedPipeline
!pip install googletrans
from googletrans import Translator

**Read in files** — note: the `media_embed` and `secure_media_embed` fields do not load (reason unknown), but inspecting the CSV export shows they are null throughout, so no information is lost.

In [4]:
# Load the three monthly submission dumps, one per split (file names follow RS_v2_<year>_<month>):
# train = 2006_03, test = 2006_04, out-of-time (oot) = 2006_05. Reads happen in that order.
df, df_test, df_oot = (
    spark.read.json(path)
    for path in (
        "/FileStore/tables/RS_v2_2006_03",  # train
        "/FileStore/tables/RS_v2_2006_04",  # test
        "/FileStore/tables/RS_v2_2006_05",  # oot
    )
)

**Add index (monotonically increasing ID, converted to a consecutive row number)**

In [6]:
def add_row_index(frame, view_name='df_temp'):
  """Return ``frame`` with an ``idx`` column and a consecutive 1-based ``index`` column.

  ``monotonically_increasing_id`` is increasing but not consecutive, so ``row_number`` ordered by
  ``idx`` turns it into a dense 1..N numbering that follows the original row order.

  The ORDER BY must name the column ``idx`` unquoted: in Spark SQL ``"idx"`` is a string literal,
  so ``order by "idx"`` sorts by a constant and numbers the rows arbitrarily.
  Note: a window without PARTITION BY pulls all rows into a single partition.
  """
  frame = frame.withColumn("idx", monotonically_increasing_id())
  frame.createOrReplaceTempView(view_name)
  return spark.sql(
      'select *, row_number() over (order by idx) as index from {}'.format(view_name))

# train / test / oot
df = add_row_index(df)
df_test = add_row_index(df_test)
df_oot = add_row_index(df_oot)

**Prepare data (Simon)** — select the modelling columns into a copy of the dataframe, rename `score` to `label`, and fill nulls in string columns with the string `'NaN'`

In [8]:
def prep_data(df):
  """Return a modelling copy of ``df``.

  Keeps only the columns listed below (row keys ``index``/``idx`` first), renames the target
  ``score`` to ``label`` and fills nulls with the string 'NaN' (a string fill value is applied to
  string columns only).
  """
  keep = ['index', 'idx',
          'author', 'author_flair_text_color', 'author_flair_type', 'brand_safe',
          'created_utc', 'domain', 'is_crosspostable', 'no_follow', 'num_comments',
          'over_18', 'parent_whitelist_status', 'permalink', 'score', 'subreddit',
          'subreddit_type', 'title', 'url', 'whitelist_status']
  subset = df.select(*keep)
  subset = subset.withColumnRenamed('score', 'label')  # ML target column is called "label"
  return subset.fillna('NaN')

# Modelling copies for the three splits.
train, test, oot = (prep_data(split) for split in (df, df_test, df_oot))

**Clean text columns that need to be vectorized (Simon)**

In [10]:
def clean_text(c):
  """Normalise a text column for TF-IDF.

  Lower-cases the text, turns every non-alphanumeric character into a space, removes the URL
  noise tokens www / com / http / https and deletes digit runs.

  The noise tokens are matched as whole words (word boundaries), so words that merely contain
  them are kept intact; the previous unanchored pattern turned "community" into " munity" and
  "computer" into " puter".
  """
  c = lower(c)
  c = regexp_replace(c, "[^a-zA-Z0-9]", " ")  # non-alphanumeric -> space
  c = regexp_replace(c, r"\b(?:www|com|https?)\b", " ")  # whole-word URL noise
  c = regexp_replace(c, r"\d+", "")  # digit runs (raw string: "\d" is an invalid Python escape)
  return c


TEXT_COLUMNS_TO_CLEAN = ('domain', 'permalink', 'subreddit', 'title', 'url')


def clean_text_columns(frame, columns=TEXT_COLUMNS_TO_CLEAN):
  """Replace each of ``columns`` in ``frame`` with its clean_text version."""
  for name in columns:
    frame = frame.withColumn(name, clean_text(col(name)))
  return frame


# Clean Text
train = clean_text_columns(train)
test = clean_text_columns(test)
oot = clean_text_columns(oot)

**Pipeline for TF-IDF (Simon)**

In [12]:
def _build_feature_stages(cat_cols, text_cols):
  """Return Pipeline stages: one-hot for ``cat_cols``, TF-IDF for ``text_cols``, then an assembler."""
  stages = []
  for name in cat_cols:
    # handleInvalid='keep' lets a train-fitted model encode labels that are unseen in train
    stages += [StringIndexer(inputCol=name, outputCol=name + 'Index', handleInvalid='keep'),
               OneHotEncoder(inputCol=name + 'Index', outputCol=name + 'classVec')]

  tfidf_cols = []
  for name in text_cols:
    tokenizer = Tokenizer(inputCol=name, outputCol=name + 'token')
    remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol=name + 'words')
    # term frequencies via feature hashing, then IDF re-weighting
    hashing_tf = HashingTF(inputCol=remover.getOutputCol(), outputCol=name + 'rawFeatures')
    idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol=name + 'tfidf')
    tfidf_cols.append(idf.getOutputCol())
    stages += [tokenizer, remover, hashing_tf, idf]

  # Only the TF-IDF vectors are assembled; the one-hot and numeric columns are not.
  stages.append(VectorAssembler(inputCols=tfidf_cols, outputCol='features'))
  return stages


def pipeline(df, text_cols=None, fitted_model=None, return_model=False):
  """Add TF-IDF (and one-hot) feature columns to ``df``.

  Boolean columns are first cast to integers (False -> 0, True -> 1). Every text column gets
  Tokenizer -> StopWordsRemover -> HashingTF -> IDF producing ``<col>tfidf``; every other string
  column gets StringIndexer + OneHotEncoder (``<col>Index`` / ``<col>classVec``); the TF-IDF
  vectors are assembled into ``features``.

  Args:
    df: input DataFrame.
    text_cols: text columns to vectorise; defaults to the notebook-level ``text_features``.
    fitted_model: PipelineModel returned by an earlier call with ``return_model=True``. When
      given it is applied as-is instead of fitting, so test/oot rows are encoded with the IDF
      weights and category indices learned on train.
    return_model: if True, return ``(transformed_df, pipeline_model)``.

  Returns:
    The transformed DataFrame, or the tuple described above.
  """
  if text_cols is None:
    text_cols = text_features
  text_cols = list(text_cols)

  # categorical = string columns that are not free text
  cat_features = [name for name, dtype in df.dtypes
                  if dtype == 'string' and name not in text_cols]

  for name, dtype in df.dtypes:
    if dtype == 'boolean':
      df = df.withColumn(name, df[name].cast(IntegerType()))

  if fitted_model is None:
    fitted_model = Pipeline(stages=_build_feature_stages(cat_features, text_cols)).fit(df)
  df_input = fitted_model.transform(df)
  if return_model:
    return df_input, fitted_model
  return df_input


text_features = ['domain', 'permalink', 'subreddit', 'title', 'url']
# Fit once on train and reuse the fitted model, so test/oot share train's IDF weights and
# category indices (fitting per split would give each split its own document frequencies).
train_input, tfidf_model = pipeline(train, return_model=True)
test_input = pipeline(test, fitted_model=tfidf_model)
oot_input = pipeline(oot, fitted_model=tfidf_model)

**Select Simon's columns only (separate DF)** — `is_crosspostable` converted from boolean to integer, plus the TF-IDF vector for `domain`

In [14]:
def _select_simon_columns(frame):
  """Keep the row keys plus Simon's features: the (integer-cast) crosspostable flag and domain TF-IDF."""
  with_flag = frame.withColumn("is_crosspostable_encoded", frame.is_crosspostable)
  return with_flag.select("idx", "index", "is_crosspostable_encoded", "domaintfidf")

# train
df_simon_train = _select_simon_columns(train_input)
df_simon_train.show(4, False)

# test
df_simon_test = _select_simon_columns(test_input)
df_simon_test.show(4, False)

# oot
df_simon_oot = _select_simon_columns(oot_input)
df_simon_oot.show(4, False)

In [15]:
def _attach_simon_columns(base, simon, base_alias, simon_alias):
  """Left-join Simon's two feature columns onto ``base`` by ``index``, keeping all of ``base``'s columns."""
  joined = base.alias(base_alias).join(
      simon.alias(simon_alias),
      on=base['index'] == simon['index'],
      how='left')
  return joined.select(base_alias + '.*', 'is_crosspostable_encoded', 'domaintfidf')

# add these two columns to the main dataframe of each split
df = _attach_simon_columns(df, df_simon_train, 'df', 'df_simon_train')
df_test = _attach_simon_columns(df_test, df_simon_test, 'df_test', 'df_simon_test')
df_oot = _attach_simon_columns(df_oot, df_simon_oot, 'df_oot', 'df_simon_oot')

In [16]:
display(df_simon_test)  # inspect Simon's test-split features

**Transformations - add timestamp field**

In [18]:
# created_utc holds epoch time; casting it to TimestampType yields a `timestamp` column
# (YYYY-MM-DD HH:MM:SS).
def _add_timestamp(frame):
  return frame.withColumn("timestamp", frame["created_utc"].cast(TimestampType()))

df = _add_timestamp(df)  # train
display(df)
df_test = _add_timestamp(df_test)  # test
df_oot = _add_timestamp(df_oot)  # oot

**Register tables for SQL Queries**

In [20]:
# Expose each split to Spark SQL. createOrReplaceTempView replaces the deprecated
# registerTempTable (which only forwarded to it).
df.createOrReplaceTempView('reddit_posts')
df_test.createOrReplaceTempView('reddit_posts_test')
df_oot.createOrReplaceTempView('reddit_posts_oot')

**Add Features**

**total_subreddit_posts** - If there are a lot of posts in that subreddit during the month, that is indicative that the subreddit has an active community, which may mean higher potential of the subreddit to attain high scores. The converse may be true as well.

In [23]:
# total_subreddit_posts: number of posts in the same subreddit within the split.
# Each split's view is re-registered with the new column so later SQL cells can use it.
TOTAL_SUBREDDIT_POSTS_SQL = """
SELECT *, count(index) OVER (PARTITION BY subreddit) AS total_subreddit_posts
FROM {view}
"""

# train
df = spark.sql(TOTAL_SUBREDDIT_POSTS_SQL.format(view="reddit_posts"))
df.createOrReplaceTempView("reddit_posts")

# test
df_test = spark.sql(TOTAL_SUBREDDIT_POSTS_SQL.format(view="reddit_posts_test"))
df_test.createOrReplaceTempView("reddit_posts_test")

# oot
df_oot = spark.sql(TOTAL_SUBREDDIT_POSTS_SQL.format(view="reddit_posts_oot"))
df_oot.createOrReplaceTempView("reddit_posts_oot")

**subreddit_hotness** - If there are many posts in the subreddit during the past week, the subreddit may currently be "hot", which could mean more views and, in turn, higher scores.

In [25]:
# subreddit_hotness: number of posts (with a non-null score) in the same subreddit during the
# 7 days up to and including the current post's timestamp.
SUBREDDIT_HOTNESS_SQL = """
SELECT *,
count(score) OVER (
        PARTITION BY subreddit
        ORDER BY timestamp
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
         ) AS subreddit_hotness
FROM {view}
"""

# train
df = spark.sql(SUBREDDIT_HOTNESS_SQL.format(view="reddit_posts"))
df.createOrReplaceTempView("reddit_posts")

# test
df_test = spark.sql(SUBREDDIT_HOTNESS_SQL.format(view="reddit_posts_test"))
df_test.createOrReplaceTempView("reddit_posts_test")

# oot
df_oot = spark.sql(SUBREDDIT_HOTNESS_SQL.format(view="reddit_posts_oot"))
df_oot.createOrReplaceTempView("reddit_posts_oot")

**subreddit_author_count** - If many different authors contribute to the subreddit, the subreddit may be popular, which could mean more views and, in turn, higher scores.

In [27]:
def _add_subreddit_author_count(frame, view_name):
  """Attach the number of distinct (subreddit, author) pairs per subreddit and re-register ``view_name``.

  The join uses a null-safe equality: with plain ``==`` an inner join silently dropped every post
  whose subreddit is NULL.
  """
  author_counts = (frame
                   .groupBy("subreddit", "author").count()
                   .groupBy("subreddit").count()
                   .withColumnRenamed("subreddit", "subreddit_gr")
                   .withColumnRenamed("count", "subreddit_author_count"))
  enriched = (frame
              .join(author_counts, frame['subreddit'].eqNullSafe(author_counts['subreddit_gr']))
              .drop("subreddit_gr"))
  enriched.createOrReplaceTempView(view_name)
  return enriched

df = _add_subreddit_author_count(df, "reddit_posts")
df_test = _add_subreddit_author_count(df_test, "reddit_posts_test")
df_oot = _add_subreddit_author_count(df_oot, "reddit_posts_oot")

**total_author_score_ntile** - if an author has received many upvotes in the past, the author may be influential in the community and likely to garner more upvotes in the future. We first calculate the cumulative score obtained by each author, then use the NTILE function to divide the scores into 10 groups, and later one-hot encode each category. Note that the author with the highest score is `[deleted]`; the underlying assumption in our model for these posts is that a `[deleted]` author is itself meaningful for the overall score of the post. **Not used: data leakage (the feature is computed from the target `score`).**

In [29]:
# total_author_score_ntile (train). Sum each author's scores over the split, then rank the rows
# into 10 buckets by that total (bucket 1 = highest totals).
# NOTE: computed from the target `score`, hence not used as a feature (data leakage).
# No outer ORDER BY: the result is only joined back onto reddit_posts, so a global sort was wasted work.
df_1 = spark.sql("""
SELECT index, author, score, total_author_score,
       NTILE(10) OVER (ORDER BY total_author_score DESC) AS total_author_score_ntile
FROM
(
    SELECT index, author, score, sum(score) OVER (PARTITION BY author) AS total_author_score
    FROM reddit_posts
)
""")
df_1.createOrReplaceTempView("reddit_posts_author")

In [30]:
# Attach total_author_score_ntile to each train post by row index.
# Keep ORDER BY index: later cells number df's rows positionally and join them back on index,
# which presumes df stays in index order.
df = spark.sql("""
SELECT * FROM
  (
  SELECT index as author_index, total_author_score_ntile
  FROM reddit_posts_author
  ) t
JOIN reddit_posts ON t.author_index = reddit_posts.index
ORDER BY index ASC
""")
df.createOrReplaceTempView("reddit_posts")

In [31]:
# total_author_score_ntile (test): per-author total score, bucketed into 10 tiles (1 = highest).
# NOTE: computed from the target `score`, hence not used as a feature (data leakage).
# No outer ORDER BY: the result is only joined back, so a global sort was wasted work.
df_1_test = spark.sql("""
SELECT index, author, score, total_author_score,
       NTILE(10) OVER (ORDER BY total_author_score DESC) AS total_author_score_ntile
FROM
(
    SELECT index, author, score, sum(score) OVER (PARTITION BY author) AS total_author_score
    FROM reddit_posts_test
)
""")
df_1_test.createOrReplaceTempView("reddit_posts_author_test")

In [32]:
# Attach total_author_score_ntile to each test post by row index.
# Keep ORDER BY index: later cells number df_test's rows positionally and join them back on index.
df_test = spark.sql("""
SELECT * FROM
  (
  SELECT index as author_index, total_author_score_ntile
  FROM reddit_posts_author_test
  ) t
JOIN reddit_posts_test ON t.author_index = reddit_posts_test.index
ORDER BY index ASC
""")

df_test.createOrReplaceTempView("reddit_posts_test")

In [33]:
# total_author_score_ntile (oot): per-author total score, bucketed into 10 tiles (1 = highest).
# NOTE: computed from the target `score`, hence not used as a feature (data leakage).
# No ORDER BY in this first query: it is only joined back, so a global sort was wasted work.
df_1_oot = spark.sql("""
SELECT index, author, score, total_author_score,
       NTILE(10) OVER (ORDER BY total_author_score DESC) AS total_author_score_ntile
FROM
(
    SELECT index, author, score, sum(score) OVER (PARTITION BY author) AS total_author_score
    FROM reddit_posts_oot
)
""")
df_1_oot.createOrReplaceTempView("reddit_posts_author_oot")

# Attach it to each oot post; keep ORDER BY index because later cells align rows positionally.
df_oot = spark.sql("""
SELECT * FROM
  (
  SELECT index as author_index, total_author_score_ntile
  FROM reddit_posts_author_oot
  ) t
JOIN reddit_posts_oot ON t.author_index = reddit_posts_oot.index
ORDER BY index ASC
""")

df_oot.createOrReplaceTempView("reddit_posts_oot")

**avg_author_score_ntile** - similar to total_author_score_ntile, except that we use the author's average score instead of the total score. **Not used: data leakage (the feature is computed from the target `score`).**

In [35]:
# avg_author_score_ntile (train): per-author average score, bucketed into 10 tiles (1 = highest).
# NOTE: computed from the target `score`, hence not used as a feature (data leakage).
# No outer ORDER BY: the result is only joined back, so a global sort was wasted work.
df_1 = spark.sql("""
SELECT index, author, score, avg_author_score,
       NTILE(10) OVER (ORDER BY avg_author_score DESC) AS avg_author_score_ntile
FROM
(
    SELECT index, author, score, AVG(score) OVER (PARTITION BY author) AS avg_author_score
    FROM reddit_posts
)
""")
df_1.createOrReplaceTempView("reddit_posts_author")

In [36]:
# Attach avg_author_score_ntile to each train post. The key is aliased author_index2 so it does not
# clash with author_index from the total-score join. Keep ORDER BY index: later cells align
# df's rows positionally with its index.
df = spark.sql("""
SELECT * FROM
  (
  SELECT index as author_index2, avg_author_score_ntile
  FROM reddit_posts_author
  ) t
JOIN reddit_posts ON t.author_index2 = reddit_posts.index
ORDER BY index ASC
""")
df.createOrReplaceTempView("reddit_posts")

In [37]:
# avg_author_score_ntile (test): per-author average score, bucketed into 10 tiles (1 = highest).
# NOTE: computed from the target `score`, hence not used as a feature (data leakage).
# No outer ORDER BY: the result is only joined back, so a global sort was wasted work.
df_1_test = spark.sql("""
SELECT index, author, score, avg_author_score,
       NTILE(10) OVER (ORDER BY avg_author_score DESC) AS avg_author_score_ntile
FROM
(
    SELECT index, author, score, AVG(score) OVER (PARTITION BY author) AS avg_author_score
    FROM reddit_posts_test
)
""")
df_1_test.createOrReplaceTempView("reddit_posts_author_test")

In [38]:
# Attach avg_author_score_ntile to each test post; keep ORDER BY index because later cells align
# df_test's rows positionally with its index.
df_test = spark.sql("""
SELECT * FROM
  (
  SELECT index as author_index2, avg_author_score_ntile
  FROM reddit_posts_author_test
  ) t
JOIN reddit_posts_test ON t.author_index2 = reddit_posts_test.index
ORDER BY index ASC
""")

df_test.createOrReplaceTempView("reddit_posts_test")

In [39]:
# avg_author_score_ntile (oot): per-author average score, bucketed into 10 tiles (1 = highest).
# NOTE: computed from the target `score`, hence not used as a feature (data leakage).
# No ORDER BY in this first query: it is only joined back, so a global sort was wasted work.
df_1_oot = spark.sql("""
SELECT index, author, score, avg_author_score,
       NTILE(10) OVER (ORDER BY avg_author_score DESC) AS avg_author_score_ntile
FROM
(
    SELECT index, author, score, AVG(score) OVER (PARTITION BY author) AS avg_author_score
    FROM reddit_posts_oot
)
""")
df_1_oot.createOrReplaceTempView("reddit_posts_author_oot")

# Attach it to each oot post; keep ORDER BY index because later cells align rows positionally.
df_oot = spark.sql("""
SELECT * FROM
  (
  SELECT index as author_index2, avg_author_score_ntile
  FROM reddit_posts_author_oot
  ) t
JOIN reddit_posts_oot ON t.author_index2 = reddit_posts_oot.index
ORDER BY index ASC
""")

df_oot.createOrReplaceTempView("reddit_posts_oot")

**Encode Columns**

**binned_score** - buckets the raw `score` into three classes: 1 (score < 20), 2 (20 ≤ score < 100) and 3 (score ≥ 100).

In [42]:
# binned_score (train): 1 if score < 20, 2 if 20 <= score < 100, else 3.
# CASE branches are tested in order, so the second needs no lower bound.
# NOTE(review): a NULL score fails both WHEN tests and lands in bucket 3.
df = spark.sql("""
           SELECT *, (CASE
           WHEN score < 20 THEN 1
           WHEN score < 100 THEN 2
           ELSE 3
           END) binned_score
           FROM reddit_posts
""")
df.createOrReplaceTempView("reddit_posts")

In [43]:
# binned_score (test): 1 if score < 20, 2 if 20 <= score < 100, else 3.
# CASE branches are tested in order, so the second needs no lower bound.
# NOTE(review): a NULL score fails both WHEN tests and lands in bucket 3.
df_test = spark.sql("""
           SELECT *, (CASE
           WHEN score < 20 THEN 1
           WHEN score < 100 THEN 2
           ELSE 3
           END) binned_score
           FROM reddit_posts_test
""")

df_test.createOrReplaceTempView("reddit_posts_test")

In [44]:
# binned_score (oot): 1 if score < 20, 2 if 20 <= score < 100, else 3.
# CASE branches are tested in order, so the second needs no lower bound.
# NOTE(review): a NULL score fails both WHEN tests and lands in bucket 3.
df_oot = spark.sql("""
           SELECT *, (CASE
           WHEN score < 20 THEN 1
           WHEN score < 100 THEN 2
           ELSE 3
           END) binned_score
           FROM reddit_posts_oot
""")

df_oot.createOrReplaceTempView("reddit_posts_oot")

**Encode author_cakeday, author_flair_text_color, brand_safe**

In [46]:
# author_cakeday_encoded = 1 when author_cakeday equals "true", otherwise 0 (NULL included).
# The raw column is dropped and each split's view refreshed.
CAKEDAY_SQL = """
           SELECT *, (CASE
           WHEN author_cakeday = "true" THEN 1
           ELSE 0
           END) author_cakeday_encoded
           FROM {view}
"""

# train
df = spark.sql(CAKEDAY_SQL.format(view="reddit_posts")).drop("author_cakeday")
df.createOrReplaceTempView("reddit_posts")

# test
df_test = spark.sql(CAKEDAY_SQL.format(view="reddit_posts_test")).drop("author_cakeday")
df_test.createOrReplaceTempView("reddit_posts_test")

# oot
df_oot = spark.sql(CAKEDAY_SQL.format(view="reddit_posts_oot")).drop("author_cakeday")
df_oot.createOrReplaceTempView("reddit_posts_oot")

In [47]:
# author_flair_text_color_encoded = 1 for "dark", otherwise 0 (other colours and NULL).
# The raw column is dropped and each split's view refreshed.
FLAIR_COLOR_SQL = """
           SELECT *, (CASE
           WHEN author_flair_text_color = "dark" THEN 1
           ELSE 0
           END) author_flair_text_color_encoded
           FROM {view}
"""

# train
df = spark.sql(FLAIR_COLOR_SQL.format(view="reddit_posts")).drop("author_flair_text_color")
df.createOrReplaceTempView("reddit_posts")

# test
df_test = spark.sql(FLAIR_COLOR_SQL.format(view="reddit_posts_test")).drop("author_flair_text_color")
df_test.createOrReplaceTempView("reddit_posts_test")

# oot
df_oot = spark.sql(FLAIR_COLOR_SQL.format(view="reddit_posts_oot")).drop("author_flair_text_color")
df_oot.createOrReplaceTempView("reddit_posts_oot")

In [48]:
# brand_safe_encoded (train) = 1 when brand_safe equals "true", otherwise 0 (NULL included).
df = spark.sql("""
           SELECT *, (CASE
           WHEN brand_safe = "true" THEN 1
           ELSE 0
           END) brand_safe_encoded
           FROM reddit_posts
""")
df = df.drop("brand_safe")
df.createOrReplaceTempView("reddit_posts")

In [49]:
# brand_safe_encoded (test) = 1 when brand_safe equals "true", otherwise 0 (NULL included).
df_test = spark.sql("""
           SELECT *, (CASE
           WHEN brand_safe = "true" THEN 1
           ELSE 0
           END) brand_safe_encoded
           FROM reddit_posts_test
""")
df_test = df_test.drop("brand_safe")
df_test.createOrReplaceTempView("reddit_posts_test")

In [50]:
# brand_safe_encoded (oot) = 1 when brand_safe equals "true", otherwise 0 (NULL included).
df_oot = spark.sql("""
           SELECT *, (CASE
           WHEN brand_safe = "true" THEN 1
           ELSE 0
           END) brand_safe_encoded
           FROM reddit_posts_oot
""")
df_oot = df_oot.drop("brand_safe")
df_oot.createOrReplaceTempView("reddit_posts_oot")

**Encoding (Abhishek)** - no_follow, over_18

In [52]:
def _encode_true_false(frame, columns=('no_follow', 'over_18')):
  """Add ``<col>_enc``: 1 where the column is True, 0 where False, NULL otherwise."""
  for name in columns:
    flag = col(name)
    frame = frame.withColumn(name + "_enc", when(flag == True, 1).when(flag == False, 0))
  return frame

df = _encode_true_false(df)
df_test = _encode_true_false(df_test)
df_oot = _encode_true_false(df_oot)
display(df_test)

**Subreddit Binning (Chris)** - subreddits with fewer than 30 posts in the split are grouped under "other"

In [54]:
# Uses Sahil's total_subreddit_posts column (posts per subreddit within the split).
SUBREDDIT_MIN_POSTS = 30  # subreddits with fewer posts than this are pooled as 'other'


def subreddit_binning(subreddit, value, threshold=SUBREDDIT_MIN_POSTS):
  """Return 'other' when ``value`` < ``threshold``, else ``subreddit`` (pure-Python form of the rule)."""
  if value < threshold:
    return 'other'
  return subreddit


# Kept for backward compatibility; the columns below are built with a native expression instead.
subreddit_udf = udf(subreddit_binning, StringType())


def _bin_subreddit(frame, threshold=SUBREDDIT_MIN_POSTS):
  """Add ``subreddit_rev`` using a native column expression (no Python UDF round trip).

  Same rule as subreddit_binning; a NULL post count keeps the subreddit instead of raising.
  """
  return frame.withColumn(
      "subreddit_rev",
      when(col("total_subreddit_posts") < threshold, "other").otherwise(col("subreddit")))


df = _bin_subreddit(df)
df_test = _bin_subreddit(df_test)
df_oot = _bin_subreddit(df_oot)

**Encode creation day of week and hour (Chris)** — the hour is encoded cyclically (sine/cosine); the day of week is kept as a name and as an ordinal value

In [56]:
# UDFs for the cyclical (sine/cosine) encoding of the creation hour.
HOURS_PER_DAY = 24


def time_conversion_sin(hour, period=HOURS_PER_DAY):
  """Sine component of ``hour`` placed on a circle of ``period`` hours.

  The period is 24 so that 23:00 and 00:00 are neighbours; the previous divisor of 23 mapped
  hour 0 and hour 23 to the same angle (0 and 2*pi). A null hour yields None instead of raising.
  """
  if hour is None:
    return None
  return sin(2 * pi * hour / period)

time_conv_sin_udf = udf(time_conversion_sin, FloatType())


def time_conversion_cos(hour, period=HOURS_PER_DAY):
  """Cosine component of ``hour`` on a ``period``-hour circle (see time_conversion_sin)."""
  if hour is None:
    return None
  return cos(2 * pi * hour / period)

time_conv_cos_udf = udf(time_conversion_cos, FloatType())

In [57]:
def _add_creation_time_features(frame):
  """Add creation hour, weekday name (DOW) and number (DOW_value), and the sin/cos hour encoding."""
  ts = col("timestamp")
  return (frame
          .withColumn("create_hour", hour(ts))
          .withColumn("DOW", date_format(ts, "E"))
          .withColumn("DOW_value", dayofweek(ts))
          .withColumn("created_hour_sin", time_conv_sin_udf(col("create_hour")))
          .withColumn("created_hour_cos", time_conv_cos_udf(col("create_hour"))))

df = _add_creation_time_features(df)
df_test = _add_creation_time_features(df_test)
df_oot = _add_creation_time_features(df_oot)

**Impute null values for whitelist_status (Siddhant)** — indexing of this column is deferred so it can be consolidated with the other StringIndexers

In [59]:
# Fill missing whitelist_status with 0 in every split. If the column is a string, Spark fills
# with "0" (presumably string -- confirm schema). Indexing is left for the consolidated
# StringIndexer step.
df, df_test, df_oot = (frame.fillna({"whitelist_status": 0}) for frame in (df, df_test, df_oot))

**Text Translation and Cleansing for Title**

In [61]:
def trans(x):
  """Translate ``x`` to English with googletrans; None passes through.

  A new Translator is created on every call and googletrans queries the Google Translate web
  service, so this UDF is slow on large frames.
  """
  if x is None:
    return None
  translator = Translator()
  return translator.translate(x, dest="en").text

lation = udf(trans)
spark.udf.register("lation", lation)


def clean_str(x):
  """Remove the characters ; : , . | from ``x``; None passes through instead of raising."""
  if x is None:
    return None
  return x.translate(str.maketrans('', '', ';:,.|'))

clean = udf(clean_str)
spark.udf.register("clean", clean)

# Translate the raw titles, strip punctuation, and keep only the resulting "text" column.
df_text_title = df.select("title").withColumn("translated text", lation("title"))
df_text_title1 = df_text_title.select("translated text").withColumn("text", clean("translated text"))
df_text_title2 = df_text_title1.select("text")
display(df_text_title2)

In [62]:
# Test split: translate the titles to English, strip punctuation, keep only "text".
df_test_text_title = df_test.select("title").withColumn("translated text", lation("title"))
df_test_text_title1 = (df_test_text_title
                       .select("translated text")
                       .withColumn("text", clean("translated text")))
df_test_text_title2 = df_test_text_title1.select("text")
display(df_test_text_title2.take(2))

In [63]:
# OOT split: translate the titles to English, strip punctuation, keep only "text".
df_oot_text_title = df_oot.select("title").withColumn("translated text", lation("title"))
df_oot_text_title1 = (df_oot_text_title
                      .select("translated text")
                      .withColumn("text", clean("translated text")))
df_oot_text_title2 = df_oot_text_title1.select("text")
display(df_oot_text_title2.take(2))

**Sentiment Analysis**

In [65]:
# Spark NLP pretrained "analyze_sentiment" pipeline. Named sentiment_pipeline so that it does not
# overwrite the pipeline() feature-building function defined earlier in the notebook.
sentiment_pipeline = PretrainedPipeline("analyze_sentiment", lang="en")

# Annotate the cleaned train titles (column "text") and keep the sentiment labels beside the text.
df_transformed = sentiment_pipeline.transform(df_text_title2)

df_new = df_transformed.select("sentiment.result", "text")
display(df_new)

In [66]:
# Test split sentiment. Named sentiment_pipeline so that it does not overwrite the pipeline()
# feature-building function defined earlier in the notebook.
sentiment_pipeline = PretrainedPipeline("analyze_sentiment", lang="en")

# Annotate the cleaned test titles (column "text") and keep the sentiment labels beside the text.
df_transformed_test = sentiment_pipeline.transform(df_test_text_title2)

df_new_test = df_transformed_test.select("sentiment.result", "text")
display(df_new_test)

In [67]:
# OOT split sentiment. Named sentiment_pipeline so that it does not overwrite the pipeline()
# feature-building function defined earlier in the notebook.
sentiment_pipeline = PretrainedPipeline("analyze_sentiment", lang="en")

# Annotate the cleaned oot titles (column "text") and keep the sentiment labels beside the text.
df_transformed_oot = sentiment_pipeline.transform(df_oot_text_title2)

df_new_oot = df_transformed_oot.select("sentiment.result", "text")
display(df_new_oot)

In [68]:
# Disable automatic broadcast joins for the rest of the session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Join the sentiment rows back to df positionally: number df_new 1..N in its current row order and
# match that number to df.index. The ORDER BY must use the id_1 column; the previous
# `order by "idx"` sorted by a string literal (df_temp1 has no idx column), so the numbering was
# arbitrary and sentiments were attached to the wrong posts.
# NOTE(review): assumes df_new is still in df's index order (df was sorted by index earlier and the
# title/sentiment steps are row-wise) -- confirm.
df_new = df_new.withColumn("id_1", monotonically_increasing_id())
df_new.createOrReplaceTempView('df_temp1')
df_new = spark.sql('select *, row_number() over (order by id_1) as index1 from df_temp1').drop('id_1')
df_full = df.join(df_new, df.index == df_new.index1).orderBy(df.index).drop("index1", "title")

In [69]:
# Mark df_full for caching; the cache is filled lazily by the first action after the remaining
# feature-engineering transformations.
df_full.cache()

In [70]:
# Test split: join the sentiment rows back to df_test positionally. df_new_test is numbered 1..N in
# its current row order and matched to df_test.index. The ORDER BY must use the id_1 column; the
# previous `order by "idx"` sorted by a string literal, so the numbering was arbitrary.
# NOTE(review): assumes df_new_test is still in df_test's index order -- confirm.
df_new_test = df_new_test.withColumn("id_1", monotonically_increasing_id())
df_new_test.createOrReplaceTempView('df_temp1')
df_new_test = spark.sql('select *, row_number() over (order by id_1) as index1 from df_temp1').drop('id_1')

df_full_test = (df_test.alias('df_test').join(df_new_test.alias('df_new_test'),
                         on = df_test['index'] == df_new_test['index1'],
                         how = 'left')
     .select('df_test.*','result','text')
     )

# cache
df_full_test.cache()

In [71]:
# OOT split: join the sentiment rows back to df_oot positionally. df_new_oot is numbered 1..N in
# its current row order and matched to df_oot.index. The ORDER BY must use the id_1 column; the
# previous `order by "idx"` sorted by a string literal, so the numbering was arbitrary.
# NOTE(review): assumes df_new_oot is still in df_oot's index order -- confirm.
df_new_oot = df_new_oot.withColumn("id_1", monotonically_increasing_id())
df_new_oot.createOrReplaceTempView('df_temp1')
df_new_oot = spark.sql('select *, row_number() over (order by id_1) as index1 from df_temp1').drop('id_1')

df_full_oot = (df_oot.alias('df_oot').join(df_new_oot.alias('df_new_oot'),
                         on = df_oot['index'] == df_new_oot['index1'],
                         how = 'left')
     .select('df_oot.*','result','text')
     )

# cache
df_full_oot.cache()

**<span style="color:red">If Databricks stops working or you receive errors, restore the backup dataframes by reversing the assignments (e.g. df_full = df_full_bk) and rerun from this point onwards </span>**

In [73]:
# Keep references to the current DataFrames so this point can be restored (DataFrames are
# immutable, so a plain reference is a safe snapshot of the lineage).
df_full_bk, df_full_test_bk, df_full_oot_bk = df_full, df_full_test, df_full_oot

In [74]:
# Title length, title word count (pieces separated by single spaces, see
# https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe)
# and the length of the URL after '//' (null when the URL has no '//').
df_full = (df_full
           .withColumn("length_title", length("text"))
           .withColumn('titlewordCount', size(split(col('text'), ' ')))
           .withColumn('url length', length(split(col('url'), '//').getItem(1))))
#display(df_full)

In [75]:
# Test split: title length, single-space word count
# (https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe)
# and the length of the URL after '//'.
df_full_test = (df_full_test
                .withColumn("length_title", length("text"))
                .withColumn('titlewordCount', size(split(col('text'), ' ')))
                .withColumn('url length', length(split(col('url'), '//').getItem(1))))

In [76]:
# OOT split: title length, single-space word count
# (https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe)
# and the length of the URL after '//'.
df_full_oot = (df_full_oot
               .withColumn("length_title", length("text"))
               .withColumn('titlewordCount', size(split(col('text'), ' ')))
               .withColumn('url length', length(split(col('url'), '//').getItem(1))))

**Add flags if title has exclamation or question mark**

In [78]:
# Flags: does the cleaned title end with '!' / '?' (1) or not (0, also for a NULL title)?
ends_with_bang = col("text").endswith('!')
ends_with_question = col("text").endswith('?')
df_full = (df_full
           .withColumn('has_exclamation_mark', when(ends_with_bang, 1).otherwise(0))
           .withColumn('has_question_mark', when(ends_with_question, 1).otherwise(0)))

In [79]:
# Test split: 1 if the cleaned title ends with '!' / '?', else 0.
df_full_test = (df_full_test
                .withColumn('has_exclamation_mark', when(col("text").endswith('!'), 1).otherwise(0))
                .withColumn('has_question_mark', when(col("text").endswith('?'), 1).otherwise(0)))

In [80]:
# OOT split: 1 if the cleaned title ends with '!' / '?', else 0.
df_full_oot = (df_full_oot
               .withColumn('has_exclamation_mark', when(col("text").endswith('!'), 1).otherwise(0))
               .withColumn('has_question_mark', when(col("text").endswith('?'), 1).otherwise(0)))

In [81]:
def last(x):
  """Return what follows the final '.' of ``str(x)`` (e.g. the TLD of a domain).

  Text without a '.' is returned whole; None becomes the string 'None'.
  """
  pieces = str(x).rsplit('.', 1)
  return pieces[-1]

# StringType UDF, also callable from SQL as last_udf; adds the `last` column to every split.
last_udf = udf(last, StringType())
spark.udf.register("last_udf", last_udf)
df_full, df_full_test, df_full_oot = (
    frame.withColumn("last", last_udf('domain')) for frame in (df_full, df_full_test, df_full_oot))
#display(df_full)

In [82]:
def protocol(x):
  """Return the part of ``str(x)`` before the first ':' (the URL scheme, e.g. 'http')."""
  scheme, _sep, _rest = str(x).partition(":")
  return scheme

# StringType UDF, also callable from SQL as protocol_udf; adds the `protocol` column to every split.
protocol_udf = udf(protocol, StringType())
spark.udf.register("protocol_udf", protocol_udf)
df_full, df_full_test, df_full_oot = (
    frame.withColumn("protocol", protocol_udf('url')) for frame in (df_full, df_full_test, df_full_oot))
#display(df_full)

**Add Top 500 websites** — average Domain Authority of the post's top-level domain, computed from the top-500 domains list

In [84]:
# Top-500 domains list: CSV with a header row, comma-separated, column types inferred.
file_location = "/FileStore/tables/top500Domains.csv"
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df500 = (spark.read.format(file_type)
         .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
         .load(file_location))
# display(df500)

In [85]:
# Map each root domain to its TLD, then attach to every row the mean Domain Authority of all
# top-500 domains sharing that TLD; rows are ordered by Rank.
df500 = df500.withColumn("last_domain", last_udf('Root Domain'))
windowSpec = Window.partitionBy(df500['last_domain'])
mean_da = avg(df500['Domain Authority']).over(windowSpec)
df500 = df500.withColumn("average da", mean_da).orderBy('Rank')
# display(df500)

In [86]:
# One row per (TLD, mean Domain Authority) pair.
dflast = df500.select('last_domain', 'average da').dropDuplicates()
# display(dflast)

In [87]:
# Attach the mean Domain Authority of each post's TLD (`last`). A LEFT join keeps posts whose TLD
# is not in the top-500 list, and they get the default "average da" of 1. With the previous inner
# join those posts were silently dropped and the fillna never had anything to fill.
df_full = df_full.join(dflast, df_full['last'] == dflast['last_domain'], how='left').drop('last_domain')
df_full = df_full.fillna({"average da": 1})
df_full_test = df_full_test.join(dflast, df_full_test['last'] == dflast['last_domain'], how='left').drop('last_domain')
df_full_test = df_full_test.fillna({"average da": 1})
df_full_oot = df_full_oot.join(dflast, df_full_oot['last'] == dflast['last_domain'], how='left').drop('last_domain')
df_full_oot = df_full_oot.fillna({"average da": 1})
#display(df_full)

In [88]:
# create backup again
# DataFrames are immutable, so plain references work as backups. Later cells rebind the
# working names, while these names keep pointing at the post-join frames.
df_full_bk, df_full_test_bk, df_full_oot_bk = df_full, df_full_test, df_full_oot

In [89]:
# Split the `result` array into its first two items (result, result1) and fill missing
# ones with "none". Then index the four categorical string columns with StringIndexers
# fitted on the training frame itself, and drop the raw string columns.
df_full = (
    df_full
    .withColumn("result1", df_full["result"].getItem(1))
    .withColumn("result", df_full["result"].getItem(0))
    .fillna({"result": "none", "result1": "none"})
)

indexer = StringIndexer(inputCol="result", outputCol="resultIndex")
indexer1 = StringIndexer(inputCol="last", outputCol="lastIndex")
indexer2 = StringIndexer(inputCol="protocol", outputCol="protocolIndex")
indexer3 = StringIndexer(inputCol="result1", outputCol="result1Index")

for label_indexer in (indexer, indexer1, indexer2, indexer3):
    df_full = label_indexer.fit(df_full).transform(df_full)

df_full = df_full.drop("result", "last", "protocol", "result1")

# display(df_full)

In [90]:
# Encode the test split with label indexers fitted on the TRAINING data.
# Previously the indexers were refitted on df_full_test. Each index then followed the test
# split's own label frequencies, so the same index meant different labels in train and test.
# Here the mapping is learned from df_full_bk, the post-join training frame saved before the
# training indexing cell. It gets the same result/result1 split and "none" fill, so it
# should reproduce the training cell's label order.
# handleInvalid="keep" gives labels never seen in training an extra index instead of
# raising an error.
train_categories = (
    df_full_bk
    .withColumn("result1", df_full_bk["result"].getItem(1))
    .withColumn("result", df_full_bk["result"].getItem(0))
    .select("result", "last", "protocol", "result1")
    .fillna({"result": "none", "result1": "none"})
)
train_label_indexer = Pipeline(stages=[
    StringIndexer(inputCol=name, outputCol=name + "Index", handleInvalid="keep")
    for name in ("result", "last", "protocol", "result1")
]).fit(train_categories)

df_full_test = (
    df_full_test
    .withColumn("result1", df_full_test["result"].getItem(1))
    .withColumn("result", df_full_test["result"].getItem(0))
    .fillna({"result": "none", "result1": "none"})
)
df_full_test = train_label_indexer.transform(df_full_test)
df_full_test = df_full_test.drop("result", "last", "protocol", "result1")
# display(df_full_test)

In [91]:
# Encode the OOT split with label indexers fitted on the TRAINING data.
# Previously the indexers were refitted on df_full_oot. Each index then followed the OOT
# split's own label frequencies, so the same index meant different labels in train and OOT.
# Here the mapping is learned from df_full_bk, the post-join training frame saved before the
# training indexing cell. It gets the same result/result1 split and "none" fill, so it
# should reproduce the training cell's label order.
# handleInvalid="keep" gives labels never seen in training an extra index instead of
# raising an error.
train_categories = (
    df_full_bk
    .withColumn("result1", df_full_bk["result"].getItem(1))
    .withColumn("result", df_full_bk["result"].getItem(0))
    .select("result", "last", "protocol", "result1")
    .fillna({"result": "none", "result1": "none"})
)
train_label_indexer = Pipeline(stages=[
    StringIndexer(inputCol=name, outputCol=name + "Index", handleInvalid="keep")
    for name in ("result", "last", "protocol", "result1")
]).fit(train_categories)

df_full_oot = (
    df_full_oot
    .withColumn("result1", df_full_oot["result"].getItem(1))
    .withColumn("result", df_full_oot["result"].getItem(0))
    .fillna({"result": "none", "result1": "none"})
)
df_full_oot = train_label_indexer.transform(df_full_oot)
df_full_oot = df_full_oot.drop("result", "last", "protocol", "result1")
# display(df_full_oot)

In [92]:
# Working frames for the encoding stage: every column, same rows.
df_train, df_test2, df_oot2 = (
    frame.select(*frame.columns) for frame in (df_full, df_full_test, df_full_oot)
)

**One-Hot Encoding (Sahil, Chris, Siddhant)**

In [94]:
# Index the remaining categorical string columns on the training split. Then one-hot encode
# them, together with the author-score ntiles and the label indices from earlier cells,
# using a single OneHotEncoderEstimator.
SI_subreddit = StringIndexer(inputCol="subreddit_rev", outputCol="subreddit_rev_Index")
SI_DOW = StringIndexer(inputCol="DOW", outputCol="DOW_Index")
SI_whitelist_status = StringIndexer(inputCol="whitelist_status", outputCol="whitelist_statusIndex")
SI_subreddit_type = StringIndexer(inputCol="subreddit_type", outputCol="subreddit_typeIndex")

for string_indexer in (SI_subreddit, SI_DOW, SI_whitelist_status, SI_subreddit_type):
    df_train = string_indexer.fit(df_train).transform(df_train)

# NOTE(review): lastIndex comes from a StringIndexer, so nulls are presumably not expected;
# the fill value 100 is kept as-is. Confirm what it is meant to represent.
df_train = df_train.fillna({"lastIndex": 100})

# (input index column, output one-hot vector column)
ohe_columns = [
    ("total_author_score_ntile", "total_author_score_ntile_ohe"),
    ("avg_author_score_ntile", "avg_author_score_ntile_ohe"),
    ("subreddit_rev_Index", "subreddit_rev_ohe"),
    ("DOW_Index", "DOW_ohe"),
    ("whitelist_statusIndex", "whitelist_status_ohe"),
    ("subreddit_typeIndex", "subreddit_type_ohe"),
    ("resultIndex", "result_ohe"),
    ("lastIndex", "last_ohe"),
    ("protocolIndex", "protocol_ohe"),
    ("result1Index", "result1_ohe"),
]
ohe = OneHotEncoderEstimator(
    inputCols=[source for source, _ in ohe_columns],
    outputCols=[target for _, target in ohe_columns],
)
df_train = ohe.fit(df_train).transform(df_train)

In [95]:
# testing / oot data
# Fit the four categorical indexers ONCE, on the training split, and reuse the fitted model
# for test and OOT. Previously `pipeline` was refitted on each split. Each split then got
# its own label -> index mapping (ordered by that split's label frequencies), so the same
# index meant different labels in train, test and OOT.
# Only the raw string inputs are selected for the fit, so the *_Index columns that df_train
# already carries from the training cell are not part of the fitted frame.
# handleInvalid="keep" gives labels never seen in training an extra index instead of raising.
categorical_columns = [
    ("subreddit_rev", "subreddit_rev_Index"),
    ("DOW", "DOW_Index"),
    ("whitelist_status", "whitelist_statusIndex"),
    ("subreddit_type", "subreddit_typeIndex"),
]
pipeline = Pipeline(stages=[
    StringIndexer(inputCol=source, outputCol=target, handleInvalid="keep")
    for source, target in categorical_columns
])
train_categorical_model = pipeline.fit(
    df_train.select(*[source for source, _ in categorical_columns])
)

# FIXME: `ohe` is still refitted on each split (unchanged behaviour). The one-hot vector
# sizes therefore come from each split's own index metadata and may differ from the
# training vectors. Fully aligning them needs the training cell to index with
# handleInvalid="keep" as well, and this cell to reuse the OneHotEncoderModel fitted there.
# NOTE(review): the lastIndex fill value 100 mirrors the training cell; confirm its meaning.
df_test2 = train_categorical_model.transform(df_test2).fillna({"lastIndex": 100})
df_test2 = ohe.fit(df_test2).transform(df_test2)

df_oot2 = train_categorical_model.transform(df_oot2).fillna({"lastIndex": 100})
df_oot2 = ohe.fit(df_oot2).transform(df_oot2)

**Finalize Datasets (drop unused columns, write train/test/OOT to Parquet)**

In [97]:
# drop columns - first pass: raw Reddit metadata not used as features.
# DataFrame.drop ignores names that are not present in a frame's schema.
cols_to_drop = [
    "archived", "author_flair_background_color", "author_flair_css_class",
    "author_flair_richtext", "author_flair_text", "can_gild", "contest_mode",
    "distinguished", "edited", "gilded", "hidden", "hide_score", "id",
    "is_reddit_media_domain", "is_self", "is_video", "link_flair_css_class",
    "link_flair_richtext", "link_flair_text", "link_flair_text_color",
    "link_flair_type", "locked", "media", "num_crossposts", "parent_whitelist_status",
    "permalink", "post_hint", "preview", "retrieved_on", "rte_mode", "secure_media",
    "selftext", "send_replies", "spoiler", "stickied", "subreddit_id",
    "subreddit_name_prefixed", "suggested_sort", "thumbnail", "thumbnail_height",
    "thumbnail_width",
]
df_train_final, df_test_final, df_oot_final = (
    frame.drop(*cols_to_drop) for frame in (df_train, df_test2, df_oot2)
)

In [98]:
# Second pass: drop identifiers, raw text and the raw categorical/ntile columns that have
# already been indexed and one-hot encoded.
cols_to_drop2 = [
    "author", "author_index2", "author_index", "avg_author_score_ntile",
    "total_author_score_ntile", "title", "author_flair_type", "is_crosspostable",
    "no_follow", "over_18", "subreddit_type", "url", "whitelist_status", "text",
]
df_train_final, df_test_final, df_oot_final = (
    frame.drop(*cols_to_drop2) for frame in (df_train_final, df_test_final, df_oot_final)
)

In [99]:
# Inspect the final training frame before the frames are written out to Parquet below.
display(df_train_final)

In [100]:
# Remove earlier outputs (recursively) so the Parquet writes below do not hit existing paths.
for stale_output in (
    "/FileStore/my-stuff/df_train_final.parquet",
    "/FileStore/my-stuff/df_test_final.parquet",
    "/FileStore/my-stuff/df_oot_final.parquet",
):
    dbutils.fs.rm(stale_output, True)

In [101]:
# Parquet rejects column names containing spaces, so rename the two spaced columns first.
df_train_final, df_test_final, df_oot_final = (
    frame
    .withColumnRenamed("url length", "url_length")
    .withColumnRenamed("average da", "average_da")
    for frame in (df_train_final, df_test_final, df_oot_final)
)

# The default save mode refuses to overwrite, which is why the previous cell removes these paths.
for frame, target in (
    (df_train_final, "/FileStore/my-stuff/df_train_final.parquet"),
    (df_test_final, "/FileStore/my-stuff/df_test_final.parquet"),
    (df_oot_final, "/FileStore/my-stuff/df_oot_final.parquet"),
):
    frame.write.parquet(target)