In [2]:
import csv
import nest_asyncio
import twint
import requests
from models import *
from pymongo import MongoClient
from pycoingecko import CoinGeckoAPI
from datetime import datetime, timezone

### Conectar con MongoDB

In [3]:
mongo = MongoClient('localhost:27017')

db = mongo.proyectoBigData

### Inicializar CoinGecko API Wrapper

In [3]:
cg = CoinGeckoAPI()

### Recuperar id, nombre y abreviación de las criptomonedas disponibles

In [4]:
criptocurrency_list = cg.get_coins_list()

db.criptomonedas.insert_many(criptocurrency_list)

print(f'{len(criptocurrency_list)} Criptomoneadas registradas en DB')

7958 Criptomoneadas registradas en DB


### Recuperar historial de precios, capitalización de mercado y volumen de las criptomonedas seleccionadas

In [None]:
# Definimos los tokens a buscar
token_list = ["bitcoin", "dogecoin", "ethereum"]

# Fecha desde la que se comienza a buscar
from_date = datetime(2020, 1, 1)

days_ago = (datetime.now() - from_date).days

# intervalo entre cada recuperación de datos
time_interval = 'daily'

# moneda fíat con la que comparar el valor de las criptomonedas a buscar
moneda_comparacion = 'usd'

for token in token_list:
    
    # Recuperar historial de la criptomoneda
    historial = cg.get_coin_market_chart_by_id(id=token, vs_currency=moneda_comparacion,
                                               days=days_ago, interval=time_interval)
    
    # Histórico precios
    price_list = [vars(Precio(token, x[1], datetime.fromtimestamp(x[0] / 1000, timezone.utc)))
                  for x in  historial['prices']]
    
    db.priceHistory.insert_many(price_list)
    
    # Histórico de capitalización de mercado
    market_cap_list = [vars(MarketCap(token, x[1], datetime.fromtimestamp(x[0] / 1000, timezone.utc)))
                       for x in  historial['market_caps']]
    
    db.marketCapHistory.insert_many(market_cap_list)
    
    # Histórico de volumen de negociación
    volume_list = [vars(Volume(token, x[1], datetime.fromtimestamp(x[0] / 1000, timezone.utc)))
                   for x in  historial['total_volumes']]
    
    db.volumeHistory.insert_many(volume_list)

    print(f'{len(price_list)} registros históricos de {token} insertados en DB')


### Recuperar datos para grafico de velas de criptomonedas

In [9]:
# Definimos los tokens a buscar
candle_token_list = ["USDT_BTC", "USDT_DOGE", "USDT_ETH"]

from_date_timestamp = int(datetime(2020, 1, 1).timestamp())

today_timestamp = int(datetime.today().timestamp())

for token in candle_token_list:
    # Recuperamos los datos de grafico de vela

    params = {'command': 'returnChartData',
             'currencyPair': token,
             'start': from_date_timestamp,
             'end': today_timestamp,
             'period': '86400'
             }

    r = requests.get(url = 'https://poloniex.com/public', params= params)

    candle_data = []

    for x in r.json():
        x.update({'date': datetime.fromtimestamp(x['date'], timezone.utc)})
        candle_data.append(x)

    collection_name = token + '_candle'

    collection = db[collection_name]
    # Guardamos en db
    collection.insert_many(candle_data)


### Recuperar tweets relacionados a criptomonedas

In [None]:
nest_asyncio.apply()

# Lista de palabras a buscar
keywords = [
        "cripto",
        "criptomoneda",
        "criptomonedas",
        "crypto",
        "cryptocurrency",
        "cryptocurrencies",
        "bitcoin",
        "BTC",
        "$BTC",
        "ethereum",
        "ETH",
        "$ETH",
        "dogecoin",
        "DOGE",
        "$DOGE",
    ]

# Inicializamos el scraper de Twitter
config = twint.Config()

# Excluimos retweets de la búsqueda
config.Retweets = False

# Mínimo de likes que deba tener un tweet para ser recuperado
config.Min_likes = 100

# Fecha de inicio de recuperación de tweets
config.Since = '2020-01-01'

config.Hide_output = True
config.Store_object = True

lista_tweets = []

for word in keywords:

    # Seleccionamos la palabra clave a buscar
    config.Search = word
    
    # Realizamos la búsqueda
    twint.run.Search(config)
    
    # Recuperamos los resultados
    tweets = twint.output.tweets_list
    
    # Procesamos los resultados
    tweets_clean = [vars(Tweet(x.id, x.tweet, x.hashtags, x.cashtags,
                               datetime.strptime(x.datestamp + x.timestamp + x.timezone, '%Y-%m-%d%H:%M:%S%z'),
                               x.username, x.name, x.link, word, x.likes_count, x.retweets_count, x.replies_count)) for x in tweets]

    lista_tweets.extend(tweets_clean)

    # Añadimos a la lista de resultados
    print(f'{len(tweets_clean)} tweets con la keyword {word} registrados en DB')

# Guardamos en mongo
db.tweetsCripto.insert_many(lista_tweets)

print(f'{len(lista_tweets)} tweets registrados en DB')

### Query en MongoDB para obtener tweets sin repeticiones

```
db.tweetsCripto.aggregate([{
    $group: {
        _id: {
            id: "$id"
        },
        cashtag: {
            $first: "$cashtag"
        },
        fecha: {
            $first: "$fecha"
        },
        hashtag: {
            $first: "$hashtag"
        },
        likes: {
            $first: "$likes"
        },
        link: {
            $first: "$link"
        },
        name: {
            $first: "$name"
        },
        replies: {
            $first: "$replies"
        },
        retweets: {
            $first: "$retweets"
        },
        tweet: {
            $first: "$tweet"
        },
        user: {
            $first: "$user"
        },
        search_keywords: {
            $addToSet: "$search"
        },
        count: {
            $sum: 1
        }
    }
}])
```

In [7]:
keywords = [ ["bitcoin", ["bitcoin","BTC"]],
             ["ethereum", ["ethereum","ETH"]],
             ["dogecoin", ["dogecoin","DOGE"]]
            ]

results = []

with open('tweets_result.csv', encoding="utf8") as File:
    reader = csv.DictReader(File)
    for row in reader:
        try:
            row.update({'fecha': datetime.strptime(row['fecha'], '%Y-%m-%dT%H:%M:%S.%f%z')})
            results.append(row)
        except (TypeError, ValueError) as err:
            print(row)

cryptocurrency_variation = []

tweet_cryptocurrency_variation = []

# Fila anterior iterada
cryptocurrencies_above = 0

for x in db.priceHistory.find():
    if cryptocurrencies_above != 0:
        # variacion del precio promedio de criptomoneda en 24hs
        variation = ((x['precio'] - cryptocurrencies_above)/cryptocurrencies_above)*100

        x_variation = {
                'id_criptomoneda':x['id_criptomoneda'],
                'precio': x['precio'],
                'fecha': x['fecha'],
                'percentage_variation': variation
            }

        if variation >= 5:
            x_variation.update(type_variation='mayor al 5 %', type_code=1)

        if -5 < variation < 5:
            x_variation.update(type_variation='entre -5% y 5%', type_code=0)

        if variation <=  -5:
            x_variation.update(type_variation='menor al -5 %', type_code=-1)

        cryptocurrency_variation.append(x_variation)

        # Si la variacion es mayor a -+5%
        if x_variation['type_code'] != 0:

            # Buscamos en todos los tweets
            for row in results:

                diferencia_fecha = x['fecha'] - row['fecha'].replace(tzinfo=None)

                # Si ocurrió dentro de los dos días anteriores a la variación de precio
                if 0 < diferencia_fecha.days < 2:

                    tweet_variaton = {
                        'id_tweet': row['_id'],
                        'fecha_tweet': row['fecha'],
                        'link_tweet': row['link'],
                        'userName_tweet': row['name'],
                        'user_tweet': row['user'],
                        'content_tweet': row['tweet'],
                        'hashtag_tweet': row['hashtag'],
                        'cashtag_tweet': row['cashtag'],
                        'likes': row['likes'],
                        'retweets': row['retweets'],
                        'replies': row['replies'],
                        'search_keywords' : row['search_keywords'],
                    }

                    # Buscamos si el tweet coincide con alguna criptomoneda

                    for pair in keywords:
                        token = pair[0]
                        keyword_list = pair[1]

                        if x['id_criptomoneda'] == token:
                            if any((keyword.upper() in row['tweet'].upper() or
                                    (keyword.lower() in row['hashtag']) or
                                    (keyword.lower() in row['cashtag'])) for keyword in keyword_list):

                                tweet_variaton.update({
                                    'id_criptomoneda':x['id_criptomoneda'],
                                    'percentage_variation': variation,
                                    'type_code_variaton':x_variation['type_code'],
                                    })
                                break

                    if 'id_criptomoneda' in tweet_variaton:
                        tweet_cryptocurrency_variation.append(tweet_variaton)

    cryptocurrencies_above = x['precio']

# Guardamos registros de variaciones en el precio de la criptomoneda
db.criptocurrencyVariation.insert_many(cryptocurrency_variation)

# Guardamos tweets que coincidan con las variaciones
db.tweet_cryptocurrency_variation.insert_many(tweet_cryptocurrency_variation)

print(f'{len(tweet_cryptocurrency_variation)} tweets coincidentes con cambios significativos en precios registrados')

{'_id': ' To the Members and Staff of the United States Senate  SEC to bulldoze one of the core innovations of the technology   #XRP has been under a bewilderingly persistent enforcement threat by the SEC  throughout Chairman Jay Clayton’s tenure  https://t.co/DqAG0dtaIk', 'cashtag': 'bankxrp', 'count': None, 'fecha': None, 'hashtag': None, 'likes': None, 'link': None, 'name': None, 'replies': None, 'retweets': None, 'search_keywords': None, 'tweet': None, 'user': None}
{'_id': 'Follow +RT my post. Have a nice day all my love !!!! &lt;3   #crypto #cryptotrade #giveaway #giveaways #giveawayalert', 'cashtag': 'jenniferegrant1', 'count': None, 'fecha': None, 'hashtag': None, 'likes': None, 'link': None, 'name': None, 'replies': None, 'retweets': None, 'search_keywords': None, 'tweet': None, 'user': None}


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



77549 tweets coincidentes con cambios significativos en precios registrados
