## Import Library

In [1]:
import pymongo
from config import *
import tweepy
import datetime
import csv

## Credentials

In [2]:
auth = tweepy.OAuthHandler('', '')                 # Twitter API
auth.set_access_token('', '')  # Twitter API
api = tweepy.API(auth,wait_on_rate_limit=True)
connection_url=""              # Mongo DB

## Set Dates for Crawling Data

In [3]:
today = datetime.date.today()
yesterday= today - datetime.timedelta(days=1)

## Connect to MongoDB

In [4]:
client=pymongo.MongoClient(connection_url)
client.list_database_names()
test = client['test']                             ## pick mongoDB database
twitter_collection = test['twitter_tweet']        ## pick twitter_tweet collection

## Function to create dictionary

In [5]:
def append_data_to_tweet(tweet_obj,**kwargs):
    """Append data to tweet object
    Create new dictionary consisted of tweet object
    and new key and value
    example:append_data_to_tweet(tweet_obj,**dict)
    Args:
    	tweet_obj(dict) : Twitter's tweet object
    	kwargs(dict) : data to be added
	Return:
		new_object(dict) : data consisted of tweet object and
							added key value
    """
    new_object={}
    for key in kwargs.keys():
    	new_object[key]=kwargs[key]
    new_object['tweet_object']=tweet_obj
    return new_object

## Twitter Tweet Crawling (API)

In [None]:
tweets_list = tweepy.Cursor(api.search, q="#COVID since:" + str(yesterday)+ " until:" + str(today),tweet_mode='extended', lang='id').items(100)

In [7]:
output = []
counter = 0
for tweet in tweets_list:
    text = tweet._json["full_text"]
    print(text)
    favourite_count = tweet.favorite_count
    retweet_count = tweet.retweet_count
    created_at = tweet.created_at
    counter += 1
    
    datadict = {
      'query_type': "search",
      'query_value': {"keyword":"#COVID19"},
      'query_time': datetime.datetime.today(),
      'extra':{
          'detail':"monitoring"
          }
    }

    final_data = append_data_to_tweet(tweet._json,**datadict)                        # Create Dictionary
    twitter_collection.insert_one(final_data)                                        # Import to MongoDB

    line = {'text' : text, 'favourite_count' : favourite_count, 'retweet_count' : retweet_count, 'created_at' : created_at}
    output.append(line)

Dengan semangat Hari Bhayangkara Polri berkomitmen untuk menjaga situasi kamtibmas tetap kondusif agar masyarakat semakin produktif dalam tatanan hidup baru di tengah pandemi Covid-19. #HariBhayangkaraKe-75 https://t.co/aDILnWgZ4l
RT @emrhaqim: Untuk sejarah K3 aku nak bagi soalan apakah hashtag-hashtag yang digunakan oleh rakyat bagi mengecam tindakan kerajaan Malays…
Terima kasih atas pengabdiannya yang luar biasa dalam menangani penyebaran Covid-19 semoga usaha selama ini tidak mengkhianati hasil. #HariBhayangkaraKe-75 https://t.co/XXsFD1FbFH
Usaha tidak akan mengkhianati hasil. Terimakasih Polri sudah berusaha semaksimal mungkin dalam menekan penyebaran Covid-19. #HariBhayangkaraKe-75 https://t.co/tRvOjjh2vd
RT @sigitwid: Seingat saya, @DKIJakarta bisa beli komputer mainframe ratusan miliar tiap tahun dan bisa bayar commitment fee Formula E samp…
RT @DitpamobvitPo18: Ayo dukung program vaksinasi pemerintah dengan ikut berpartisipasi yang bertujuan untuk menekan penyebaran covid-19 

In [21]:
import pandas as pd
pd.DataFrame(output).to_excel("hasil.xlsx")

## Retrieve data from MongoDB

In [8]:
results = twitter_collection.find({})
print('MongoDB count: ',results.count(),'Without MongoDB count: ',counter)

  print('MongoDB count: ',results.count(),'Without MongoDB count: ',counter)


MongoDB count:  100 Without MongoDB count:  100


## Connect to PostgreSQL

In [9]:
import psycopg2
conn = psycopg2.connect(
    host="localhost",
    port = "5432",
    database="test",
    user="postgres",
    password="12345678")
print("connected to postgresql")
cur = conn.cursor()

connected to postgresql


## Import Library

In [10]:
import psycopg2.extras as pgextras
import tweet_processor as proc
import time
from datetime import datetime,timedelta
import etl_query as query
import pathlib
import configparser
from bson.objectid import ObjectId

## Transform and Load

In [11]:
def transform_and_load_tweet(collection,connection,cursor,period,logger=None):
    _general = {
        "name" : "general tweet",
        "query_param" : "general",
        "period" : period,
        "transformation_n_query" :  [
            (proc.TweetDataExtractor.get_user,False,query.tweet_user_query),
            (proc.TweetDataExtractor.get_main_tweet,False,query.tweet_query),
            (proc.TweetDataExtractor.get_hashtags,False,query.hashtag_query),
            (proc.TweetDataExtractor.get_symbol,False,query.symbol_query),
            (proc.TweetDataExtractor.get_place,False,query.place_query),
            (proc.TweetDataExtractor.get_media,True,query.media_query),
            (proc.TweetDataExtractor.get_mention_user,True,query.mention_query)
        ]
    }

    _rt = {
        "name" : "retweeted tweet",
        "query_param" : "rt",
        "period" : period,
        "transformation_n_query" :  [
            (proc.TweetDataExtractor.get_user_from_main_tweet,False,query.tweet_user_query),
            (proc.TweetDataExtractor.get_main_tweet,False,query.tweet_query),
            (proc.TweetDataExtractor.get_user,False,query.tweet_user_query),
            (proc.TweetDataExtractor.get_rt_data,False,query.rt_query),
            (proc.TweetDataExtractor.get_hashtags,False,query.hashtag_query),
            (proc.TweetDataExtractor.get_symbol,False,query.symbol_query),
            (proc.TweetDataExtractor.get_place,False,query.place_query),
            (proc.TweetDataExtractor.get_media,True,query.media_query),
            (proc.TweetDataExtractor.get_mention_user,True,query.mention_query)
        ]
    }

    _qt = {
        "name" : "quoted tweet",
        "query_param" : "qt",
        "period" : period,
        "transformation_n_query" :  [
            (proc.TweetDataExtractor.get_user_from_main_tweet,False,query.tweet_user_query),
            (proc.TweetDataExtractor.get_main_tweet,False,query.tweet_query),
            (proc.TweetDataExtractor.get_user,False,query.tweet_user_query),
            (proc.TweetDataExtractor.get_quote_data,False,query.qt_query),
            (proc.TweetDataExtractor.get_hashtags,False,query.hashtag_query),
            (proc.TweetDataExtractor.get_symbol,False,query.symbol_query),
            (proc.TweetDataExtractor.get_place,False,query.place_query),
            (proc.TweetDataExtractor.get_media,True,query.media_query),
            (proc.TweetDataExtractor.get_mention_user,True,query.mention_query)
        ]
    }

    _data_group_list=[_general,_rt,_qt]
    if logger:
        for item in _data_group_list:
            process_data(item,collection,connection,cursor,logger=logger)
    else:
        for item in _data_group_list:
            process_data(item,collection,connection,cursor)
    connection.close()

In [12]:
def process_data(data_group_dict,collection,conn,cur,logger=None):
    for item in data_group_dict["transformation_n_query"]:
        transform_func,yield_from,query = item
        print(transform_func)
        print(yield_from)
        print(query)
        
        getter_obj = TweetDataGetter(collection,data_group_dict["period"])
        mongo_cur = getter_obj.get_data(data_group_dict["query_param"])
        if yield_from:
            transformed_data = generate_transormed_data(mongo_cur,
                                transform_func,yield_from=True)
        else:
            transformed_data = generate_transormed_data(mongo_cur,
                                transform_func)

        pgextras.execute_batch(cur,query,transformed_data)
        conn.commit()

In [13]:
def generate_transormed_data(doc_generator,function,yield_from=False):
    if yield_from:
        for doc in doc_generator:
            raw = proc.TweetDataExtractor(doc["tweet_object"])
            transformed = function(raw)
            yield from transformed
    else:
        for doc in doc_generator:
            raw = proc.TweetDataExtractor(doc["tweet_object"])
            transformed = function(raw)
            if transformed != None:
                yield transformed

In [14]:
class TweetDataGetter():
    """Mongodb query wrapper class
    Provide method to retrieve mongodb data
    to be transformed
    """
    def __init__(self,collection,period,limit=None):
        """
        period : {all-time,today}
        """
        self.collection = collection
        self._tweet_query = {"$and":[
                                        {"tweet_object.retweeted_status":{"$exists":False}},
                                        {"tweet_object.quoted_status":{"$exists":False}}
                                    ]
                            }
        self._rt_query = {"tweet_object.retweeted_status":{"$exists":True}}
        self._qt_query = {"tweet_object.quoted_status":{"$exists":True}}
        self.limit = limit
            
    def get_data(self,type):
        """
        return mongocursor
        """
        if type == 'general':
            query = self._tweet_query
        elif type == 'rt':
            query = self._rt_query
        elif type == 'qt':
            query = self._qt_query
        else:
            return

        print(query)

        if self.limit is not None:
            gen_object = self.collection.find(query).limit(self.limit)
            print(f'{type} data: {self.limit}')
        else:
            gen_object = self.collection.find(query)
            counts = self.collection.count_documents(query)
            print(f'{type} data: {counts}')
        return gen_object

In [15]:
tq = {"$and":[
    {"tweet_object.retweeted_status":{"$exists":False}},
    {"tweet_object.quoted_status":{"$exists":False}}
]}
rt = {"tweet_object.retweeted_status":{"$exists":True}}
qt = {"tweet_object.quoted_status":{"$exists":True}}

In [16]:
tq_r = twitter_collection.find(tq)
rt_r = twitter_collection.find(rt)
qt_r = twitter_collection.find(qt)
print('tq: ',tq_r.count(),'rt_r: ',rt_r.count(),'qt_r: ',qt_r.count())

  print('tq: ',tq_r.count(),'rt_r: ',rt_r.count(),'qt_r: ',qt_r.count())


tq:  46 rt_r:  54 qt_r:  0


In [17]:
transform_and_load_tweet(twitter_collection,conn,cur,'yesterday')

<function TweetDataExtractor.get_user at 0x000002D8F48935E0>
False
insert into "twitter"."tweet_user" ("id", "id_str", "name", "screen_name", "location", "description", "url", "protected", "followers_count", "friends_count", "listed_count", "created_at", "favourites_count", "utc_offset", "geo_enabled", "verified", "statuses_count", "lang", "contributors_enabled", "is_translator", "profile_background_color", "profile_background_image_url", "profile_background_tile", "profile_image_url", "profile_image_url_https", "profile_link_color", "profile_sidebar_border_color", "profile_sidebar_fill_color", "profile_text_color", "profile_use_background_image", "default_profile", "default_profile_image", "following", "follow_request_sent", "notifications", "translator_type") values 
(%(id)s, %(id_str)s, %(name)s, %(screen_name)s, %(location)s, %(description)s, %(url)s, %(protected)s, %(followers_count)s, %(friends_count)s, %(listed_count)s, %(created_at)s, %(favourites_count)s,%(utc_offset)s, %(geo_