In [1]:
from glob import glob
import json
import re
import yaml
import os
import shutil
import time
# config.yaml is read from the parent directory of the notebook (see the config
# cell). Only change directory when it is not already visible, so re-running
# this cell does not keep climbing up the directory tree.
if not os.path.exists('config.yaml'):
    os.chdir('../')

In [2]:
from cloudant.client import Cloudant

In [3]:
# Load the project configuration. yaml.safe_load only builds plain YAML types;
# a bare yaml.load() without a Loader raises TypeError on PyYAML >= 6 and can
# construct arbitrary Python objects.
with open("config.yaml", 'r') as ymlfile:
    cfg = yaml.safe_load(ymlfile)

In [4]:
# QUEUES section of config.yaml: queue name -> folder path.
queues_cfg = cfg['QUEUES']

In [5]:
# Folder of every queue this notebook reads from or writes to, in config order.
(tweets_queue_path,
 processed_tweets_path,
 sa_queue_path,
 geo_queue_path) = (queues_cfg[queue_name] for queue_name in
                    ('new_tweets', 'processed_tweets', 'sentiment_tasks', 'geo_tasks'))

In [6]:
# Make sure every queue folder exists. The sentiment and geo task folders must
# exist too: create_sentiment_task / create_geoanalyser_task open files in them
# for writing, which fails when the folder is missing.
for folder in (tweets_queue_path, processed_tweets_path, sa_queue_path, geo_queue_path):
    if not os.path.exists(folder):
        os.makedirs(folder)

In [7]:
# Snapshot of the tweet files currently waiting in the download queue; the
# cell displays how many there are.
queue_glob_pattern = '%s/*.json' % tweets_queue_path
unprocessed_tweets = glob(queue_glob_pattern)
len(unprocessed_tweets)

1286

# Process Downloaded Tweets Queue

In [8]:
def create_geoanalyser_task(tweet_id, coordinates, queue_path=None):
    """Write a geo-analysis task file for one tweet.

    The task is stored as ``<queue_path>/<tweet_id>.task.txt`` and contains the
    JSON-encoded ``coordinates``.

    Args:
        tweet_id: Tweet identifier, used as the task file name.
        coordinates: JSON-serialisable location data (the tweet's
            ``coordinates`` object or its ``place['bounding_box']``).
        queue_path: Target folder; defaults to the configured ``geo_queue_path``.
    """
    if queue_path is None:
        queue_path = geo_queue_path
    filename = '{}/{}.task.txt'.format(queue_path, tweet_id)
    # The ``with`` statement closes the file; no explicit close() is needed.
    with open(filename, 'w') as fp:
        json.dump(coordinates, fp)

def send_to_geoanalyser(tweet_id, tweet_json):
    """Queue a geo-analysis task for a tweet if it carries any location.

    Uses the tweet's exact ``coordinates`` when present, otherwise the
    ``bounding_box`` of its ``place``. Tweets with neither are reported and
    skipped. Errors are printed instead of raised so that one bad tweet does
    not stop the processing loop.

    Args:
        tweet_id: Tweet identifier (``id_str``).
        tweet_json: Parsed tweet dict.
    """
    try:
        if tweet_json.get('coordinates') is not None:
            coordinates = tweet_json['coordinates']
        elif tweet_json.get('place') is not None:
            coordinates = tweet_json['place']['bounding_box']
        else:
            # Nothing to geo-locate: skip instead of falling through with
            # `coordinates` unbound.
            print('Tweet {} doesn\'t contain coordinates'.format(tweet_id))
            return
        create_geoanalyser_task(tweet_id, coordinates=coordinates)
    except Exception as e:
        print('Tweet {} wasn\'t sent to geoanalyser due to error. {}'.format(tweet_id, e))

In [9]:
def create_sentiment_task(tweet_id, text, queue_path=None):
    """Write a sentiment-analysis task file holding the raw tweet text.

    The task is stored as ``<queue_path>/<tweet_id>.task.txt``.

    Args:
        tweet_id: Tweet identifier, used as the task file name.
        text: Tweet text to analyse.
        queue_path: Target folder; defaults to the configured ``sa_queue_path``.
    """
    if queue_path is None:
        queue_path = sa_queue_path
    filename = '{}/{}.task.txt'.format(queue_path, tweet_id)
    # Tweet text may contain emoji and non-Latin scripts: write UTF-8
    # explicitly rather than relying on the platform default encoding. The
    # ``with`` statement closes the file.
    with open(filename, 'w', encoding='utf-8') as fp:
        fp.write(text)
        
def send_to_sentiment_analysis(tweet_id, tweet_json):
    """Queue a sentiment-analysis task with the tweet's text.

    Tweets without ``text`` are reported and skipped. Errors are printed
    instead of raised so that one bad tweet does not stop the processing loop.

    Args:
        tweet_id: Tweet identifier (``id_str``).
        tweet_json: Parsed tweet dict.
    """
    try:
        text = tweet_json.get('text')
        if text is None:
            # Nothing to analyse: skip instead of falling through with `text`
            # unbound.
            print('Tweet {} doesn\'t contain text'.format(tweet_id))
            return
        create_sentiment_task(tweet_id, text)
    except Exception as e:
        print('Tweet {} wasn\'t sent to sentiment analyser due to error. {}'.format(tweet_id, e))

In [10]:
class TweetsDB():
    """Thin wrapper around the Cloudant/CouchDB database that stores tweets.

    ``cfg`` must provide ``user``, ``password``, ``host`` and ``tweets_db``.
    The tweets database is created on connection if it does not exist yet.
    """

    def __init__(self, cfg):
        self._cfg = cfg
        self._client = Cloudant(self._cfg['user'], self._cfg['password'], url=self._cfg['host'])
        self._client.connect()
        self._db = self._get_db()

    def _get_db(self):
        """Return the configured tweets database, creating it if missing."""
        db_name = self._cfg['tweets_db']
        if db_name not in self._client.all_dbs():
            # Create through this instance's client (there is no module-level
            # `client`).
            self._client.create_database(db_name)
        return self._client[db_name]

    def save_tweet(self, document):
        """Store a tweet as a new document keyed by its ``id_str``.

        The numeric ``id`` field is dropped from the stored copy. A shallow
        copy is saved so the caller's dict is left untouched.
        """
        document = dict(document)
        if '_id' not in document:
            document['_id'] = document['id_str']
        document.pop('id', None)
        self._db.create_document(document)

    def update_document(self, document_id, attributes_dict):
        """Set the given attributes on an existing document and save it."""
        document = self._db[document_id]
        for key, value in attributes_dict.items():
            document[key] = value
        document.save()
        

In [11]:
# Connect to the tweets database configured in the COUCHDB section of config.yaml.
couch_db = TweetsDB(cfg['COUCHDB'])

In [12]:
# Manual smoke test of TweetsDB.update_document. It writes dummy keys into a
# stored tweet document, so it is disabled by default to keep "Run All" from
# polluting the database.
RUN_DB_SMOKE_TEST = False
if RUN_DB_SMOKE_TEST:
    couch_db.update_document('988986510979497985', {'key1': 'dfsdf', 'key2': 'dsfjkds'})

In [None]:
def save_to_couch_db(tweet, path):
    """Store a tweet in CouchDB, then move its source file to the processed folder.

    The file is moved only after the save returned, so a failed save leaves it
    in the queue.

    Args:
        tweet: Parsed tweet dict to store.
        path: Path of the queued tweet file.
    """
    # basename/join work with native separators (glob returns them), unlike
    # splitting on '/'.
    new_path = os.path.join(processed_tweets_path, os.path.basename(path))
    couch_db.save_tweet(tweet)
    shutil.move(path, new_path)
    

In [None]:
#!cp $processed_tweets_path/* $tweets_queue_path

In [None]:


# Id (id_str) of the tweet used by the manual checks in this notebook.
SAMPLE_TWEET_ID = '988986510979497985'
        
    

In [None]:
# Worker loop: poll the new-tweets queue, fan each tweet out to the geo and
# sentiment task queues, store it in CouchDB and move the file to the
# processed folder. Runs until interrupted unless MAX_ITERATIONS is set.
POLL_INTERVAL_SEC = 20
MAX_ITERATIONS = None  # set to an int for a bounded run


def process_tweet_file(path):
    """Dispatch one queued tweet file to the analysers and archive it.

    Propagates any error from reading, parsing or saving; the caller decides
    how to report it.
    """
    # Read (and close) the file before save_to_couch_db moves it away.
    with open(path, 'r') as fp:
        tweet_json = json.load(fp)
    tweet_id = tweet_json['id_str']
    for analyser_func in (send_to_geoanalyser, send_to_sentiment_analysis):
        analyser_func(tweet_id, tweet_json)
    save_to_couch_db(tweet_json, path)


i = 1
while MAX_ITERATIONS is None or i <= MAX_ITERATIONS:
    unprocessed_tweets = glob('{}/*.json'.format(tweets_queue_path))
    processed_count = 0
    for path in unprocessed_tweets:
        try:
            process_tweet_file(path)
            processed_count += 1
        except Exception as e:
            # A malformed file or a failed save must not stop the worker; the
            # file stays in the queue and is retried on the next iteration.
            print('Failed to process {}: {}'.format(path, e))

    print('Iteration: {}\tFiles processed: {}'.format(i, processed_count))
    i += 1
    time.sleep(POLL_INTERVAL_SEC)

## Manual test: load one stored tweet

In [None]:
# Load one stored tweet by hand to inspect its raw JSON structure.
sample_tweet_file = '../shared_folder/tweets/988986510979497985.json'
with open(sample_tweet_file) as fp:
    data = json.load(fp)
#data

# Analyse tweet contents

In [None]:
# Regular-expression fragments for crypto-currency keywords; they are OR-ed
# into a single pattern further down. Matching is case-sensitive, so the
# capitalisation here matters. Raw strings keep `\s` (whitespace) as a regex
# escape instead of an invalid string escape (DeprecationWarning, SyntaxWarning
# on Python 3.12+).
hashtags = ['bitcoin', 'blockchain', 'btc', 'cryptocurrency', r'market\svalue',
            'crypto', 'ethereum', 'fintech', 'coin', 'doge', 'ripple', 'litecoin',
            'cardano', 'monero', 'TRON', 'zcash', 'jaxx', 'copay', 'bitpay', 'exodus',
            'mycelium', r'Bread\sWallet', 'trezor', r'ledger\snano', r'Silk\sRoad',
            r'darknet\smarket', 'dogecoin', r'ASIC\sMiner', r'Central\sLedger',
            'Hashrate', 'ICO',
           ]

In [None]:
# Preview the alternation regex built from the keyword list.
'|'.join(hashtags)

In [None]:
# Collect the tweet files whose serialised JSON (text, user profile, URLs, ...)
# mentions at least one crypto keyword. The pattern is built once, outside the
# loop.
crypto_pattern = re.compile('|'.join(hashtags))


def _resolve_tweet_path(queued_path):
    """Return `queued_path`, or the file's location in the processed folder once the worker loop moved it."""
    if os.path.exists(queued_path):
        return queued_path
    return os.path.join(processed_tweets_path, os.path.basename(queued_path))


bitcoin_pathes = []
for queued_path in unprocessed_tweets:
    path = _resolve_tweet_path(queued_path)
    with open(path, 'r') as fp:
        data = json.load(fp)
    if crypto_pattern.search(json.dumps(data)):
        bitcoin_pathes.append(path)


In [None]:
# Number of collected tweet files that mention at least one crypto keyword.
len(bitcoin_pathes)