In [None]:
import os

# Point PySpark at the Python interpreter of the packed virtualenv. The relative
# path matches the "#environment" alias under which pyspark_venv.tar.gz is
# distributed via spark.yarn.dist.archives (SparkSession cell); presumably it is
# resolved relative to each YARN container's working directory — confirm.
_PYSPARK_VENV_PYTHON = "./environment/bin/python"
os.environ["PYSPARK_PYTHON"] = _PYSPARK_VENV_PYTHON

In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from langdetect import detect
import pyspark as ps
import datetime
from pyspark.sql import functions as f
from pyspark.sql import types as t
from sparknlp.pretrained import PretrainedPipeline
import re
# from transformers import pipeline

In [3]:
# Helper functions
# Legacy single-pass pattern, kept (same value, now a raw string) for backward
# compatibility. Do not apply it with one re.sub: its catch-all
# `[^A-Za-z0-9]+` alternative consumes the "@" of a mention that follows a
# space or punctuation (" @user" -> " user"), so the mention survives.
TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"
# Pass 1: @mentions and http(s) URLs, each removed as a whole token.
_MENTION_URL_PATTERN = re.compile(r"@\S+|https?:\S+")
# Pass 2: any run of characters that is not an ASCII letter or digit.
_NON_ALNUM_PATTERN = re.compile(r"[^A-Za-z0-9]+")


def cleanText(text):
    """Normalise free text for downstream processing.

    Lower-cases the input, then replaces @mentions, http(s) URLs and every run
    of non-alphanumeric ASCII characters with a single space, and strips the
    ends.

    Args:
        text: value to clean; non-string values are converted with ``str``.

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None`` (so a null
        Spark cell stays null instead of becoming the literal word "none").
    """
    if text is None:
        return None
    lowered = str(text).lower()
    # Mentions/URLs must go before the catch-all pass; otherwise the catch-all
    # strips their leading "@" and leaves the handle behind.
    without_links = _MENTION_URL_PATTERN.sub(" ", lowered)
    return _NON_ALNUM_PATTERN.sub(" ", without_links).strip()
# Spark UDF wrapping cleanText; the result column is a StringType column.
cleanTextDF = f.udf(lambda value: cleanText(value), returnType=t.StringType())

# Helper functions
def isEnglish(text):
    """Report whether ``text`` is detected as English by langdetect.

    Args:
        text: the text to classify.

    Returns:
        "Yes" (capitalised) if langdetect's ``detect`` returns "en", otherwise
        "No". Any ordinary exception raised by ``detect`` (presumably
        langdetect's LangDetectException for text with no detectable features,
        or a type error for null input) also yields "No".
    """
    # langdetect is non-deterministic unless DetectorFactory.seed is fixed.
    # Setting it inside the function fixes it in whichever process runs the
    # UDF, not only on the driver.
    from langdetect import DetectorFactory
    DetectorFactory.seed = 0
    try:
        return "Yes" if detect(text) == "en" else "No"
    except Exception:
        # Catch ordinary errors only; the former bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit.
        return "No"
# Spark UDF wrapping isEnglish; yields the strings "Yes" / "No".
isEnglishUDF = f.udf(lambda value: isEnglish(value), returnType=t.StringType())

# def summarizeFunction(text):
#     summ = summarizer(text, min_length = round(0.1 * len(text.split(' '))), max_length = round(0.2 * len(text.split(' '))), do_sample=False)
#     return summ

# summarizeFunctionUDF = f.udf(lambda z: summarizeFunction(z),StringType())
    

In [4]:
# spark = SparkSession.builder \
#     .appName("Redit Summarization App")\
#     spark.archives", 
#     .master("yarn")\
#     .config("spark.executor.memoryOverhead","2048")\
#     .config("spark.driver.maxResultSize", "0") \
#     .config("spark.kryoserializer.buffer.max", "2000M")\
#     .getOrCreate()
# Start (or reuse) a SparkSession on YARN. The packed virtualenv
# pyspark_venv.tar.gz is shipped with the job under the alias "environment",
# the directory PYSPARK_PYTHON points into.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Reddit Summarization App")
    .config("spark.yarn.dist.archives", "pyspark_venv.tar.gz#environment")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-04-29 10:33:48,796 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-04-29 10:33:51,703 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [6]:
# Display the active SparkSession; in Jupyter its rich repr summarises the
# session (e.g. master and app name).
spark

In [41]:
# Read the raw Reddit comments CSV from HDFS. No schema is supplied, so every
# column comes back as a string (see df.dtypes below).
RAW_DATA_PATH = "hdfs://namenode:9000/dis_materials/data_reddit.csv"
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(RAW_DATA_PATH)
)

In [42]:
# Remove rows that are exact duplicates across every column.
df2 = df.drop_duplicates()

In [43]:
# Inspect the raw column types: every column is read as a string.
df.dtypes

[('created_utc', 'string'),
 ('ups', 'string'),
 ('subreddit_id', 'string'),
 ('link_id', 'string'),
 ('name', 'string'),
 ('score_hidden', 'string'),
 ('author_flair_css_class', 'string'),
 ('author_flair_text', 'string'),
 ('subreddit', 'string'),
 ('id', 'string'),
 ('removal_reason', 'string'),
 ('gilded', 'string'),
 ('downs', 'string'),
 ('archived', 'string'),
 ('author', 'string'),
 ('score', 'string'),
 ('retrieved_on', 'string'),
 ('body', 'string'),
 ('distinguished', 'string'),
 ('edited', 'string'),
 ('controversiality', 'string'),
 ('parent_id', 'string')]

In [44]:
# Replace the epoch-seconds string in created_utc with a DateType value.
# NOTE(review): from_unixtime renders the instant in the Spark session time
# zone, so the date is a UTC date only if that zone is UTC — confirm.
df2 = df2.withColumn(
    "created_utc",
    f.from_unixtime(f.col("created_utc")).cast(t.DateType()),
)

In [45]:
# Upvote counts arrive as strings from the CSV; cast them to integers
# (with ANSI mode off, unparsable values become null and are dropped next).
ups_as_int = f.col("ups").cast(t.IntegerType())
df2 = df2.withColumn("ups", ups_as_int)

In [46]:
# Discard rows missing any of the fields the analysis relies on.
required_columns = ["subreddit", "subreddit_id", "body", "created_utc", "ups"]
df2 = df2.dropna(how="any", subset=required_columns)

In [47]:
# Remove comments written by moderators and comments whose body was deleted.
# The two exclusions are combined with AND: the previous `|` kept a row when
# either test passed, so moderator comments (and deleted non-moderator ones)
# survived.
# eqNullSafe never yields NULL. NOTE(review): `distinguished` is presumably
# null for ordinary comments; with a plain `!=` that comparison is NULL and
# `filter` would drop the row.
is_moderator = f.col("distinguished").eqNullSafe("moderator")
is_deleted = f.col("body").eqNullSafe("[deleted]")
df2 = df2.filter(~is_moderator & ~is_deleted)

In [48]:
# Clean the comment text with the cleanTextDF UDF; the result is stored in a
# new "clean_body" column alongside the raw "body".
df2 = df2.withColumn("clean_body", cleanTextDF("body"))

In [49]:
# Drop metadata columns not used further, including the raw "body"
# (superseded by "clean_body").
columns_to_drop = [
    "name",
    "author_flair_css_class",
    "author_flair_text",
    "score_hidden",
    "id",
    "distinguished",
    "body",
    "removal_reason",
    "downs",
    "archived",
    "gilded",
    "retrieved_on",
    "edited",
    "controversiality",
    "author",
    "score",
]
df2 = df2.drop(*columns_to_drop)

In [50]:
# Keep rows whose parent_id has at most 12 characters; rows with a null
# parent_id are dropped as well, since length(NULL) is NULL.
# NOTE(review): presumably this discards rows mangled by CSV parsing whose
# parent_id holds spilled text instead of an id — confirm.
MAX_PARENT_ID_LENGTH = 12
df2 = df2.filter(f.length("parent_id") <= MAX_PARENT_ID_LENGTH)

In [51]:
# Preview the first 20 rows without truncating long column values.
df2.show(n=20, truncate=False)

[Stage 15:>                                                         (0 + 1) / 1]

+-----------+---+------------+---------+---------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|created_utc|ups|subreddit_id|link_id  |subreddit      |parent_id |clean_body                                                                                                                                                                                                                                                                  |
+-----------+---+------------+---------+---------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [52]:
# List the columns that remain after cleaning.
df2.columns

['created_utc',
 'ups',
 'subreddit_id',
 'link_id',
 'subreddit',
 'parent_id',
 'clean_body']

In [55]:
# Persist the cleaned data to HDFS as CSV with a header row. Spark writes a
# directory of part-*.csv files at this path (see the mrjob input splits).
# NOTE(review): the default save mode is "errorifexists", so re-running this
# cell fails once the path exists.
df2.write.csv("hdfs://namenode:9000/cleaned_data.csv", header=True)

                                                                                

In [65]:
# Run the mrjob job in /home/ubuntu/mapred.py with the Spark runner (-r spark)
# over the cleaned CSV directory; its stdout is redirected to the local file
# `english`.
# NOTE(review): the saved output shows this run was interrupted (^C) after an
# executor connection was lost, so `english` is likely incomplete.
!python3 /home/ubuntu/mapred.py -r spark hdfs://namenode:9000/cleaned_data.csv >english

No configs found; falling back on auto-configuration
No configs specified for spark runner
Looking for hadoop binary in /usr/local/hadoop/bin...
Found hadoop binary: /usr/local/hadoop/bin/hadoop
Looking for spark-submit binary in /usr/local/spark/bin...
Found spark-submit binary: /usr/local/spark/bin/spark-submit
Running step 1 of 1
Creating temp directory /tmp/mapred.ubuntu.20220429.124659.723650
  Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  Running Spark version 3.2.0
  No custom resources configured for spark.driver.
  Submitted application: harness.py
  Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
  Limiting resource is cpu
  Added ResourceProfile id: 0
  Changing view acls to: ubuntu
  Changi

  Adding task set 0.0 with 200 tasks resource profile 0
  Starting task 0.0 in stage 0.0 (TID 0) (namenode, executor driver, partition 0, ANY, 4552 bytes) taskResourceAssignments Map()
  Starting task 1.0 in stage 0.0 (TID 1) (namenode, executor driver, partition 1, ANY, 4552 bytes) taskResourceAssignments Map()
  Starting task 2.0 in stage 0.0 (TID 2) (namenode, executor driver, partition 2, ANY, 4552 bytes) taskResourceAssignments Map()
  Starting task 3.0 in stage 0.0 (TID 3) (namenode, executor driver, partition 3, ANY, 4552 bytes) taskResourceAssignments Map()
  Running task 1.0 in stage 0.0 (TID 1)
  Running task 0.0 in stage 0.0 (TID 0)
  Running task 3.0 in stage 0.0 (TID 3)
  Running task 2.0 in stage 0.0 (TID 2)
  Input split: hdfs://namenode:9000/cleaned_data.csv/part-00001-fdd9117d-88f2-46ab-bb61-b6e2dde97885-c000.csv:0+28576357
  Input split: hdfs://namenode:9000/cleaned_data.csv/part-00002-fdd9117d-88f2-46ab-bb61-b6e2dde97885-c000.csv:0+28518842
  Input split: hdfs://name

  SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
  Times: total = 1178221, boot = -138, init = 177, finish = 1178182
  Finished task 11.0 in stage 0.0 (TID 11). 1902 bytes result sent to driver
  Starting task 15.0 in stage 0.0 (TID 15) (namenode, executor driver, partition 15, ANY, 4552 bytes) taskResourceAssignments Map()
  Running task 15.0 in stage 0.0 (TID 15)
  Finished task 11.0 in stage 0.0 (TID 11) in 1178489 ms on namenode (executor driver) (12/200)
  Input split: hdfs://namenode:9000/cleaned_data.csv/part-00015-fdd9117d-88f2-46ab-bb61-b6e2dde97885-c000.csv:0+28631978


2022-04-29 14:00:59,026 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /10.10.22.91:59354 is closed
2022-04-29 14:00:59,027 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 4 at RPC address 10.10.22.91:59364, but got no response. Marking as agent lost.
java.io.IOException: Connection from /10.10.22.91:59354 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.cha

^C
Traceback (most recent call last):
  File "/home/ubuntu/mapred.py", line 42, in <module>
    subreddit.run()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/mrjob/job.py", line 616, in run
    cls().execute()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/mrjob/job.py", line 687, in execute
    self.run_job()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/mrjob/job.py", line 636, in run_job
    runner.run()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/mrjob/runner.py", line 503, in run
    self._run()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/mrjob/spark/runner.py", line 189, in _run
    self._run_steps_on_spark()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/mrjob/spark/runner.py", line 294, in _run_steps_on_spark
    self._run_step_on_spark(group['steps'][0], step_num, last_step_num)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/mrjob/spark/runner.py", line 327, in _run_step_on_spark
    return

In [None]:
# Total upvotes per (created_utc date, subreddit, parent_id) group.
df_group_by_day = (
    df2.groupBy("created_utc", "subreddit", "parent_id")
    .agg(f.sum("ups").alias("total_ups"))
)

In [None]:
# Per (created_utc date, subreddit, parent_id) group: total upvotes plus the
# list of comment texts. The raw "body" column was dropped earlier (df2 only
# has "clean_body"), so collecting "body" raised AnalysisException; collect the
# cleaned text instead, keeping the output column name "body".
df_group_by_day_body = (
    df2.groupBy("created_utc", "subreddit", "parent_id")
    .agg(
        f.sum("ups").alias("total_ups"),
        f.collect_list("clean_body").alias("body"),
    )
)

In [None]:
# Preview the grouped comment lists for the AskReddit subreddit.
df_group_by_day_body.where(f.col("subreddit") == "AskReddit").show()

In [None]:
# Hugging Face summarisation pipeline. The model and tokenizer now both use
# the summarisation-fine-tuned "facebook/bart-large-cnn" checkpoint, matching
# the later (commented-out) experiment cell. Previously the base
# "facebook/bart-large" model was paired with the bart-large-cnn tokenizer.
# NOTE(review): `pipeline` is only imported in a commented-out line of the
# imports cell (`# from transformers import pipeline`). Re-enable that import,
# or this cell raises NameError.
summarizer = pipeline('summarization', model='facebook/bart-large-cnn', tokenizer='facebook/bart-large-cnn')

In [None]:
# df_top_comments = df_top_comments.persist(ps.StorageLevel.DISK_ONLY)

In [None]:
# Select only English Columns
# df2 = df2.filter(df2.is_english == "yes")

In [None]:
# df2 = df2.withColumn("is_english",isEnglishUDF(f.col("body")))

In [None]:
# Get a list of all link_ids for top comments
# top_comments_link_ids=df_top_comments.rdd.map(lambda x: x.link_id).collect()

In [None]:
# top_comments_link_ids =list(set(top_comments_link_ids))

In [None]:
# len(top_comments_link_ids)

In [None]:
# from pyspark.sql.functions import dayofmonth, mean, countDistinct, to_date, count

In [None]:
# df_english = df_english.withColumn("day_of_month", dayofmonth("created_utc"))

In [None]:
# df_group_by_day.show()

In [None]:
# from sparknlp.pretrained import PretrainedPipeline, LanguageDetectorDL, PipelineModel

In [None]:
# df_group_by_day.columns

In [None]:
# Preview the per-day upvote totals for the AskReddit subreddit.
df_group_by_day.filter(f.col("subreddit") == "AskReddit").show()

In [None]:
# df_top_comments.select('English').show

In [None]:
# comment_section = ['you just know someone has made that joke with her',
#  'was literally discussing this with my wife yesterday i am british she is not for some reason she thought i would know the answer to whether that would be a thing or not i did not so i guess it s a pretty common shower thought tbh',
#  'i m giving you mine as a thank you for telling us this']

In [None]:
# text = ".".join(comment_section)

In [None]:
# summarizer = pipeline('summarization', model='facebook/bart-large-cnn', tokenizer='facebook/bart-large-cnn')

In [None]:
# summarizer(text, min_length = round(0.1 * len(text.split(' '))), max_length = round(0.2 * len(text.split(' '))), do_sample=False)

In [None]:
# pandasDF = df_group_by_day.toPandas()

In [None]:
# pandasDf.head()

In [None]:
# pandasDF

In [None]:
# pandasDF.to_csv(r'pandas.txt', header=None, index=None, sep=' ', mode='a')