In [None]:
# default_exp fb_scraper 

# fb_scraper
> Module for scraping Facebook posts

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
import pandas as pd
import numpy as np
import os
import json
import progressbar
from pprint import pprint
from random import randint
from time import sleep
from facebook_scraper import *

In [None]:
#export

def filter_post(post:dict, keywords:list=None) -> pd.DataFrame:
    '''
    Function to filter a post `dict` for keywords

    Builds a one-row `DataFrame` that holds only the selected keys of `post`.
    Every value is stored as-is in its own cell, so nested values such as a
    `dict` or a `list` are preserved.

    Args:
        `post`: the post object to be filtered
        `keywords` : optional list of keywords to use instead of default

    Returns:
        `pd.DataFrame` with exactly one row and one column per selected key

    Raises:
        `KeyError`: if at least one selected key is missing from `post`
    '''
    post_keys = ['post_id', 'text', 'post_text', 'time', 'timestamp', 'likes', 'comments','shares',
                 'post_url', 'user_id', 'username','user_url', 'reactions',
                 'reaction_count']

    # A missing or empty `keywords` falls back to the default selection above
    if keywords:
        post_keys = keywords

    try:
        post_dict = {key: post[key] for key in post_keys}
    except KeyError as err:
        # Name the offending keys instead of only printing a generic message
        missing = [key for key in post_keys if key not in post]
        raise KeyError(f"At least one key not found in post dict: {missing}") from err

    # Wrap the record in a list: handing the dict itself to the constructor
    # together with `index=[0]` makes pandas align dict values on the index
    # (turning them into NaN) and reject list values longer than one element.
    return pd.DataFrame([post_dict])

In [None]:
show_doc(filter_post)

<h4 id="filter_post" class="doc_header"><code>filter_post</code><a href="__main__.py#L3" class="source_link" style="float:right">[source]</a></h4>

> <code>filter_post</code>(**`post`**:`dict`, **`keywords`**:`list`=*`None`*)

Function to filter a post `dict` for keywords

Args:
    `post`: the post object to be filtered
    `keywords` : optional list of keywords to use instead of default

In [None]:
#hide

# Smoke test: `filter_post` with a custom key selection
dummy_dict = dict(post_id=12, text='i am an angry customer', user='angry_pete')
keywords = ['text', 'user']

df_post = filter_post(dummy_dict, keywords=keywords)
# the result is a DataFrame with one column per requested key
assert isinstance(df_post, pd.DataFrame)
assert df_post.shape[1] == len(keywords)

In [None]:
#export

def extract_comments(comments:list, meta:dict) -> pd.DataFrame:
    """
    Function to extract comments from Facebook posts

    Keeps a fixed set of fields from every comment (one row per comment) and
    adds every entry of `meta` as an extra column with the same value in
    each row.

    Args:
        `comments`: list of dictionaries. Each comment is a dictionary.
            `None` or an empty list gives an empty frame that still has all columns
        `meta`: dictionary that contains meta information from the post.
            `None` is treated like an empty dictionary

    Returns:
        `pd.DataFrame` with the comment fields followed by the `meta` keys as columns

    Raises:
        `KeyError`: if a comment lacks one of the expected fields
    """
    comment_keys = ['comment_id', 'comment_url', 'comment_text', 'comment_time',
                    'comment_reactions', 'comment_reaction_count', 'replies']
    if comments is None:
        comments = []
    records = [{key: comment[key] for key in comment_keys} for comment in comments]
    # Passing `columns` keeps the schema stable when there are no comments at all
    df = pd.DataFrame(records, columns=comment_keys)
    for key, val in (meta or {}).items():
        df[key] = val
    return df

In [None]:
show_doc(extract_comments)

<h4 id="extract_comments" class="doc_header"><code>extract_comments</code><a href="__main__.py#L3" class="source_link" style="float:right">[source]</a></h4>

> <code>extract_comments</code>(**`comments`**:`list`, **`meta`**:`dict`)

Function to extract comments from Facebook posts

Args:
    `comments`: list of dictionaries. Each comment is a dictionary
    `meta`: dictionary that contains meta information from the post

In [None]:
#hide
comment_keys = ['comment_id', 'comment_url', 'comment_text', 'comment_time',
                'comment_reactions', 'comment_reaction_count', 'replies']
# Two identical dummy comments; every field holds its position in `comment_keys` as a string
dummy_comments = [{key: str(pos) for pos, key in enumerate(comment_keys)} for _ in range(2)]
meta = {'date':'2022-03-03'}
df_comments = extract_comments(dummy_comments, meta)

assert isinstance(df_comments, pd.DataFrame)
# one column per comment field plus one per meta entry
assert len(df_comments.columns) == len(comment_keys) + len(list(meta.keys()))

# TODO: add more tests for extract_comments
df_comments

Unnamed: 0,comment_id,comment_url,comment_text,comment_time,comment_reactions,comment_reaction_count,replies,date
0,0,1,2,3,4,5,6,2022-03-03
1,0,1,2,3,4,5,6,2022-03-03


In [None]:
#export
class FbScraper:
    """
    A scraper object that runs the scraping process and stores posts and comments

    Scraped posts and comments are kept as lists of `DataFrame`s in
    `self.posts` / `self.comments` and can be retrieved as one frame each
    through `get_posts` and `get_comments`.
    """
    def __init__(self, site:str, stop_date:str, num_pages:int=100, timeout:tuple=(60,120)):
        """
        Store the scraping configuration and start with empty results

        Args:
            `site`: the site to scrape (presumably the page name/id handed to
                `facebook_scraper.get_posts` -- TODO confirm)
            `stop_date`: date at which scraping should stop, given as a string
                (the expected format is not fixed yet -- TODO define)
            `num_pages`: number of pages to request
            `timeout`: pair of two numbers (presumably the lower/upper bound in
                seconds of the random pause between requests -- TODO confirm)
        """
        # Keep the configuration; it was previously accepted but thrown away
        self.site = site
        self.stop_date = stop_date
        self.num_pages = num_pages
        self.timeout = timeout
        self.cookie = None
        self.posts = []
        self.comments = []

    def set_cookie(self, cookie:dict):
        "Set a cookie to use for scraping FB. Usually grab from Dataiku globals"
        self.cookie = cookie

    def scrape(self):
        """
        start scraping using the current configuration

        NOTE: not implemented yet. The call returns `None` without fetching
        anything and leaves `self.posts` / `self.comments` untouched.
        """
        # TODO: implement. The earlier draft lived behind an unconditional
        # `return` and relied on names that are not defined in this module
        # (`df_sites`, `GET_PAGES`, `cookie`, `interrupt`, `dataiku`).
        # Its intended flow, expressed against the instance state:
        #   1. iterate `get_posts(self.site, pages=self.num_pages,
        #      options={"comments": True, ...}, cookies=self.cookie)`
        #      from `facebook_scraper`
        #   2. skip posts whose `post_id` has already been collected
        #   3. append `filter_post(post)` to `self.posts` and
        #      `extract_comments(post['comments_full'], meta)` to `self.comments`
        #      (the draft wrote both to Dataiku datasets instead)
        #   4. sleep a random number of seconds within `self.timeout` between posts
        #   5. stop once a post is older than `self.stop_date`
        return

    def get_posts(self) -> pd.DataFrame:
        "Return all stored posts as one `DataFrame` (empty if nothing is stored)"
        # `pd.concat` raises on an empty list, so answer with an empty frame instead
        if not self.posts:
            return pd.DataFrame()
        return pd.concat(self.posts, ignore_index=True)

    def get_comments(self) -> pd.DataFrame:
        "Return all stored comments as one `DataFrame` (empty if nothing is stored)"
        if not self.comments:
            return pd.DataFrame()
        return pd.concat(self.comments, ignore_index=True)


In [None]:
# Example: create a scraper for one site, attach a cookie and start scraping
scraper = FbScraper(site='hvv', stop_date='01-01-2022')
# NOTE(review): the cookie value looks like a placeholder -- confirm that no real session cookie is committed here
scraper.set_cookie({'cookie':'asfkh21u8reuafhgkaushf'})

scraper.scrape()

In [None]:
show_doc(FbScraper.__init__)

<h4 id="FbScraper.__init__" class="doc_header"><code>FbScraper.__init__</code><a href="__main__.py#L6" class="source_link" style="float:right">[source]</a></h4>

> <code>FbScraper.__init__</code>(**`site`**:`str`, **`stop_date`**:`str`, **`num_pages`**:`int`=*`100`*, **`timeout`**:`set`=*`(60, 120)`*)

Initialize self.  See help(type(self)) for accurate signature.

In [None]:
show_doc(FbScraper.set_cookie)

<h4 id="FbScraper.set_cookie" class="doc_header"><code>FbScraper.set_cookie</code><a href="__main__.py#L11" class="source_link" style="float:right">[source]</a></h4>

> <code>FbScraper.set_cookie</code>(**`cookie`**:`dict`)

Set a cookie to use for scraping FB. Usually grab from Dataiku globals

In [None]:
show_doc(FbScraper.scrape)

<h4 id="FbScraper.scrape" class="doc_header"><code>FbScraper.scrape</code><a href="__main__.py#L15" class="source_link" style="float:right">[source]</a></h4>

> <code>FbScraper.scrape</code>()

start scraping using the current configuration