In [7]:
import requests, boto3, pandas as pd, os, sys, pprint
from pathlib import Path


# from entities.RedditAccount import RedditAccount
# from entities.DailyUpload import DailyUpload

#TODO: Change path accordingly in handler


# Making the current directory in which this file is in discoverable to python.
# Commenting it here because it will not work in jupyter notebook. It will work in lambda though.
#sys.path.append(os.path.join(os.path.dirname(__file__)))

# Below should be used only in jupyter notebook
sys.path.append('../')


REDDIT_AUTH_URL = 'https://www.reddit.com/api/v1/access_token'
REDDIT_ACCOUNTS_TABLE_NAME = 'RedditAccountsTable-dev'
DAILY_UPLOADS_TABLE = "DailyUploadsTable-dev"
REDDIT_API_URL_TOP = "https://oauth.reddit.com/r/placeholder_value/top"
REDDIT_API_URL_SORT = "https://oauth.reddit.com/r/placeholder_value/sort"

ddb = boto3.client("dynamodb", region_name="ap-south-1")

# GatherUrls Class

In [8]:
from datetime import datetime
import pprint

pp = pprint.PrettyPrinter(indent=2, compact=True, width=80)


class GatherUrls:
    post_keys_to_keep = [
        "title",
        "url",
        "upvote_ratio",
        "ups",
        "author",
        "name",
        "total_awards_received",
    ]

    def __init__(self, subreddit, logger) -> None:
        self.subreddit = subreddit
        self.date = str(datetime.today().date())  ## Of the format yyyy-mm-dd
        self.total_duration = 0
        self.urls = []
        self.latest_post = None
        self.eligible_posts = []
        self.logger = logger

    # Renamed from date_subreddit_key()
    def key(self) -> dict:
        """Returns a dictionary with date as PK, subreddit as SK.

        Returns:
            Dict: Containing serialized subreddit and date
        """

        return {
            "PK": GatherUrls.__serialize_date(self.date),
            "SK": GatherUrls.__serialize_subreddit(self.subreddit),
        }

    def serialize_to_item(self):
        """Serializes member variable data of this object for the access pattern:
        date-Partition Key
        subreddit- Sort Key

        Returns:
            Dict: Ready to be used by boto3 to insert item into DynamoDB.
        """
        item = self.key()
        item["posts"] = GatherUrls.__serialize_posts(self.eligible_posts)
        self.logger.info("Serialized item is:\n")
        self.logger.info(pp.pformat(item))
        return item

    @staticmethod
    def __removed_post_is_worthy(post):
        if post["removed_by"] or post["removal_reason"]:
            if post["num_comments"] > 5 and post["score"] > 10:
                return True
            else:
                return False

        return True

    @staticmethod
    def __is_eligible(post):
        if post["is_video"] and not post["over_18"] and not post["stickied"]:
            if post["total_awards_received"] > 0:
                return True

            if post["ups"] > 0 and post["num_comments"] > 0:
                return True

        return False

    def parse_posts(self, posts):
        """Parse posts and insert into a dataframe.
        The last parsed post will updated in a member variable.

        Args:
            posts (list): List of posts from reddit API
        """
        posts = posts["data"]["children"]
        self.logger.info(f"For {self.subreddit} on date: {self.date}")
        duration = 0
        for post in posts:
            post = post["data"]
            self.latest_post = post
            if GatherUrls.__is_eligible(post) and GatherUrls.__removed_post_is_worthy(
                post
            ):

                temp = {key: post[key] for key in GatherUrls.post_keys_to_keep}
                self.eligible_posts.append(temp)
                duration = int(post["media"]["reddit_video"]["duration"])
                self.total_duration += duration
                self.logger.info(
                    f"Post:\nTitle: {post['title']}\nDuration: {duration}s\nwas added to eligible posts\n"
                )

        self.logger.info(
            f"Total duration for {self.subreddit} subreddit on {self.date} is {self.total_duration}\n"
        )

    @staticmethod
    def __serialize_posts(posts):
        serialized_posts = {"L": [GatherUrls.__serialize_post(post) for post in posts]}
        return serialized_posts

    @staticmethod
    def deserialize_from_item(serialized_item):
        deserialized_item = {}
        serialized_item = serialized_item["Item"]

        for key, value in serialized_item.items():
            for _key, _value in value.items():
                deserialized_item[key] = helpers.ddb.deserialize_piece_of_item(
                    _key, _value
                )

        return deserialized_item

    @staticmethod
    def __serialize_post(post):
        serialized_post = {"M": {}}

        for key in GatherUrls.post_keys_to_keep:
            serialized_post["M"][key] = {
                helpers.ddb.get_datatype(post[key]): str(post[key])
            }

        return serialized_post

    @staticmethod
    def __serialize_subreddit(subreddit):
        return {"S": subreddit}

    @staticmethod
    def __serialize_date(date):
        return {"S": date}

    @staticmethod
    def deserialize_PK_SK_count(item):
        deserialized_item = {}
        for key, value in item.items():
            for _key, _value in value.items():
                deserialized_item[key] = _value
        return deserialized_item


# Reddit Account Class

In [9]:
import requests, logging, pprint
from helpers.Exceptions import InvalidCredentialsProvidedException

pp = pprint.PrettyPrinter(indent=2, compact=True, width=80)


class RedditAccount:
    def __init__(self, subreddit, ddb, logger):
        self.subreddit = subreddit
        self.client_id = None
        self.secret_key = None
        self.username = None
        self.password = None
        self.auth = None
        self.headers = {"User-Agent": f"{subreddit}API/0.0.1"}
        self.data = {"grant_type": "password", "username": None, "password": None}
        self.access_token = None
        self.ddb = ddb
        self.logger = logger

    def key(self):
        return {"PK": {"S": self.subreddit}}

    def fetch_and_update_account_details(self, REDDIT_ACCOUNTS_TABLE_NAME):
        params = {"TableName": REDDIT_ACCOUNTS_TABLE_NAME, "Key": self.key()}
        item = ddb_helpers.get_item(ddb=self.ddb, logger=self.logger, **params)   
        deserialized_item = RedditAccount.deserialize_item(item)       
        
        self.client_id = deserialized_item["personal_use_script"]
        self.secret_key = deserialized_item["secret_key"]
        self.username = deserialized_item["username"]
        self.password = deserialized_item["password"]
        self.data["username"] = self.username
        self.data["password"] = self.password
        self.logger.info("Fetched and updated the following account details:\n")
        self.logger.info(pp.pformat(deserialized_item))

    @staticmethod
    def deserialize_item(item):
        deserialized_item = {}
        for key, value in item.items():
            for _key, _value in value.items():
                deserialized_item[key] = ddb_helpers.deserialize_piece_of_item(_key, _value)
        

        return deserialized_item

    @staticmethod
    def extract_value(dictionary):
        data_type, value = list(dictionary.keys())[0], list(dictionary.values())[0]

        if data_type == "S":
            return value

    def authenticate_with_api(self):
        self.auth = requests.auth.HTTPBasicAuth(self.client_id, self.secret_key)

    def fetch_and_update_access_token(self, REDDIT_AUTH_URL):
        try:
            # Authorise and request for access token from Reddit API
            res = requests.post(
                REDDIT_AUTH_URL, auth=self.auth, data=self.data, headers=self.headers
            )

            res = res.json()
            if "error" in res and res["error"] == 401:
                raise InvalidCredentialsProvidedException()

        except (InvalidCredentialsProvidedException, Exception):
            self.logger.error(f"Response object contains:\n")
            self.logger.error(pp.pformat(res))
            self.logger.error(
                "Invalid Credentials. The following details were provided:\n"
            )
            self.logger.error(
                f"Requests auth object:\nusername: {self.auth.username}\npassword: {self.auth.password}\n"
            )
            self.logger.error(f"Data provided in the POST request:\n")
            self.logger.error(pp.pformat(self.headers))
            self.logger.error(f"Headers present in the POST request:\n")
            self.logger.error(pp.pformat(self.headers))

        self.access_token = res["access_token"]
        self.headers["Authorization"] = f"bearer {self.access_token}"

    def fetch_posts_as_json(self, url, params={}):
        try:
            res = requests.get(url, headers=self.headers, params=params)
            return res.json()

        except Exception as err:
            self.logger.error(f"Unable to fetch posts from Reddit")
            self.logger.error("Headers used:\n")
            self.logger.error(pp.pformat(self.headers))
            self.logger.error(f"URL to fetch posts from: {url}\n")
            self.logger.error("params passed were:\n")
            self.logger.error(pp.pformat(params))


# Event handler code

In [10]:
import boto3, os, sys, logging, pprint
from pathlib import Path

pp = pprint.PrettyPrinter(indent=2, compact=True, width=80)

# Initialize log config.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Making the current directory in which this file is in discoverable to python
# sys.path.append(os.path.join(os.path.dirname(__file__)))

# TODO: Commenting this only for jupyter
# from entities.GatherUrls import GatherUrls
# from entities.RedditAccount import RedditAccount
from helpers import ddb as ddb_helpers

# from subreddit_groups import subreddit_groups

ddb = boto3.client("dynamodb", region_name="ap-south-1")
sqs = boto3.client("sqs")

# REDDIT_AUTH_URL = os.getenv("REDDIT_AUTH_URL")
# REDDIT_ACCOUNTS_TABLE_NAME = os.getenv("REDDIT_ACCOUNTS_TABLE_NAME")
# DAILY_UPLOADS_TABLE_NAME = os.getenv("DAILY_UPLOADS_TABLE_NAME")
# PROCESS_URLS_FOR_SUBREDDIT_GROUP_QUEUE_URL = os.getenv(
#     "PROCESS_URLS_FOR_SUBREDDIT_GROUP_QUEUE_URL"
# )


def run(event, context):

    # TODO: Hardcoding subreddit value for now. In production, should extract from queue:
    # subreddit = "funny"
    subreddit = str(event["Records"][0]["body"])
    logger.info(f"Subreddit : {subreddit}, is being processed")

    # Getting from env here because, if container is warm, it will fetch from the previously
    # executed subreddit url.
#     REDDIT_API_URL_TOP = os.getenv("REDDIT_API_URL_TOP")
    #TODO: Uncomment above and comment below one. This change is only for jupyter
    REDDIT_API_URL_TOP = "https://oauth.reddit.com/r/placeholder_value/top"
    REDDIT_API_URL_TOP = REDDIT_API_URL_TOP.replace("placeholder_value", subreddit)

    gather_urls = GatherUrls(subreddit=subreddit, logger=logger)
    reddit_account = RedditAccount(subreddit=subreddit, ddb=ddb, logger=logger)

    reddit_account.fetch_and_update_account_details(REDDIT_ACCOUNTS_TABLE_NAME)
    reddit_account.authenticate_with_api()
    reddit_account.fetch_and_update_access_token(REDDIT_AUTH_URL)

    # Keep fetching and parsing posts from reddit api till gather_urls.total_duration
    # is more than 600 seconds. Will use the 'after' param to keep going backwards.
    after = None
    while gather_urls.total_duration < 601:
        logger.info(f"Fetching {subreddit} posts after {after}")
        posts = reddit_account.fetch_posts_as_json(
            REDDIT_API_URL_TOP, params={"limit": "100", "after": after}
        )
        gather_urls.parse_posts(posts)
        after = gather_urls.latest_post["name"]

    # After uploading this subreddits' urls, update the count of todays_subreddits_count
    # doing this as a transaction.
    params = {
        "TransactItems": [
            {
                "Put": {
                    "TableName": DAILY_UPLOADS_TABLE_NAME,
                    "Item": gather_urls.serialize_to_item(),
                }
            },
            {
                "Update": {
                    "TableName": DAILY_UPLOADS_TABLE_NAME,
                    "Key": {
                        "PK": {"S": gather_urls.date},
                        "SK": {"S": "todays_subreddits_count"},
                    },
                    "ConditionExpression": "attribute_exists(PK) and attribute_exists(SK)",
                    "UpdateExpression": "SET #count = #count + :inc",
                    "ExpressionAttributeNames": {"#count": "count"},
                    "ExpressionAttributeValues": {":inc": {"N": "1"}},
                }
            },
        ]
    }

    res = ddb_helpers.transact_write_items(ddb, logger, **params)

    logger.info(
        f"Successfully updated DB for {subreddit} subreddit on {gather_urls.date}"
    )
   

    return {
        subreddit: f"successfully processed {subreddit} for date: {gather_urls.date}"
    }

In [11]:
event = {"Records": [ {"body": "funny"} ]}

In [12]:
run(event, {})

INFO:root:Subreddit : funny, is being processed
INFO:root:Received the following item:
INFO:root:{ 'PK': {'S': 'funny'},
  'email_address': {'S': 'mugblsxlqpoqbclaqi@bptfp.net'},
  'password': {'S': 'Abcd@12349'},
  'personal_use_script': {'S': 'epsNZpypuQ1IngadWDnlGg'},
  'secret_key': {'S': 'yG4Ej57nkBMIdFzpWkzhNXChnsiluw'},
  'username': {'S': 'mugblsxlqpoqbclaqi'}}
INFO:root:Fetched and updated the following account details:

INFO:root:{ 'PK': 'funny',
  'email_address': 'mugblsxlqpoqbclaqi@bptfp.net',
  'password': 'Abcd@12349',
  'personal_use_script': 'epsNZpypuQ1IngadWDnlGg',
  'secret_key': 'yG4Ej57nkBMIdFzpWkzhNXChnsiluw',
  'username': 'mugblsxlqpoqbclaqi'}
INFO:root:Fetching funny posts after None
INFO:root:For funny on date: 2021-08-15
INFO:root:Post:
Title: No fucks given at the Home Depot today. 🤣 I'm envious of this man's confidence.
Duration: 4s
was added to eligible posts

INFO:root:Post:
Title: Apple: We're not scanning your images, we're just scanning your images.
D

NameError: name 'DAILY_UPLOADS_TABLE_NAME' is not defined

# Pushshift api tryout

In [None]:
api = PushshiftAPI()

In [None]:
from datetime import datetime, timedelta

In [None]:
today =  datetime.today()
yesterday = datetime.today() - timedelta(days=1)
day_before_yesterday = yesterday - timedelta(days=1)

In [None]:
today = datetime(today.year, today.month, today.day, 0,0,0).timestamp()
yesterday = datetime(yesterday.year, yesterday.month, yesterday.day,0,0,0).timestamp()
day_before_yesterday = datetime(day_before_yesterday.year, day_before_yesterday.month, day_before_yesterday.day,0,0,0).timestamp()

In [None]:
 a = list(api.search_submissions(after=day_before_yesterday, before=today, subreddit='funny', filter=['url', 'title'], limit = 10))

In [None]:
int(day_before_yesterday)

In [None]:
def get_pushshift_data(data_type, **kwargs):
    """
    Gets data from the pushshift api.
 
    data_type can be 'comment' or 'submission'
    The rest of the args are interpreted as payload.
 
    Read more: https://github.com/pushshift/api
    """
 
    base_url = f"https://api.pushshift.io/reddit/search/submission/?subreddit=funny&num_comments=>0&after={int(day_before_yesterday)}&before={int(yesterday)}&is_video=true&sort_type=score&sort=score:asc&size=100&aggs=subreddit"
#     payload = {}
#     print(payload)
    request = requests.get(base_url)
    return request.json()

In [None]:
data_type="submission"     # give me comments, use "submission" to publish something
query="funny"          # Add your query
duration="1d"          # Select the timeframe. Epoch value or Integer + "s,m,h,d" (i.e. "second", "minute", "hour", "day")
size=1000               # maximum 1000 comments
sort_type="score"       # Sort by score (Accepted: "score", "num_comments", "created_utc")
sort="desc"             # sort descending
aggs="subreddit"        #"author", "link_id", "created_utc", "subreddit"

In [None]:
base_url = f"https://api.pushshift.io/reddit/search/submission/?subreddit=funny&num_comments=>0&over_18=false&after={int(day_before_yesterday)}&before={int(yesterday)}&is_video=true&sort_type=score&sort=score:desc&size=100&aggs=subreddit"

request = requests.get(base_url)
b = request.json()

In [None]:
b['data'][59]

In [None]:
for post in a['data']:    
    pp.pprint(post)
    break

   
    
        


In [None]:
post