In [1]:
### Modules ### 
import pandas as pd
import numpy as np
from datetime import datetime, timedelta 
from pycoingecko import CoinGeckoAPI

### GCP Modules ### 
from google.cloud import storage 
import gcsfs
import glob
import os
# Point the Google client libraries at a service-account key file.
# setdefault() keeps a GOOGLE_APPLICATION_CREDENTIALS value that is already set
# in the environment, so the notebook also runs where the key lives elsewhere;
# the path below is only the fallback used when nothing is configured.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "/Users/gabrielgomes/Downloads/awaricripto-5cfe64a9c26d.json")

# Establish Client connection to the project 
project = 'awaricripto'
# 'client' is used for uploads, 'fs' for reading bucket objects as files
client  = storage.Client(project=project)
fs = gcsfs.GCSFileSystem(project=project)

### Defining functions 

In [16]:
# function to read GS files without downloading it
def fs_to_notebook_transfer(id):
    '''Load the stored time series of 'id' from Google Cloud Storage into a DataFrame.

    Opens the object '{id}_ts' inside '{id}_awaricripto_bucket' through the
    module-level gcsfs filesystem 'fs' and parses it as CSV, using the
    'Unnamed: 0' column as the index.
    '''
    gcs_path = f'{id}_awaricripto_bucket/{id}_ts'
    with fs.open(gcs_path) as remote_file:
        series_df = pd.read_csv(remote_file, index_col = 'Unnamed: 0')
    return series_df 

# -----

# Updating workflow function
def dailyExtractor(id):
    ''' Retrieves latest metrics from coinGeckoAPI to a given 'id' | creating the 1-line df: 'id_metrics_today'

    Parameters
    ----------
    id
        Coin identifier, passed unchanged to CoinGeckoAPI.get_coin_history_by_id.

    Returns
    -------
    pandas.DataFrame
        A single row (index 0) with 21 columns: 'dates' (today, 'dd-mm-YYYY'),
        4 community metrics, 7 developer metrics, and current price ('_cp'),
        market cap ('_mc') and total volume ('_tv') in usd, eur and brl.
        A metric that the response does not carry is None instead of raising.
        Counts, market caps and volumes are rounded to 0 decimals; the reddit
        48h averages and the prices are rounded to 2 decimals.
    '''
    # Set retrieving date
    # NOTE(review): this is the local clock; if CoinGecko keys its history by
    # UTC date, a run right after local midnight may get an empty snapshot — confirm.
    today = datetime.now().strftime('%d-%m-%Y') # 15-03-2022 

    # Set API connection and request the snapshot for today
    request_dict = CoinGeckoAPI().get_coin_history_by_id(id = id, date = today)

    def section(*keys):
        # Walk request_dict along 'keys'. A missing or null level yields {} so
        # the .get() lookups below return None rather than raising.
        node = request_dict
        for key in keys:
            node = (node or {}).get(key)
        return node or {}

    community = section('community_data')
    developer = section('developer_data')

    # Column order matters: it defines the layout of the stored time series
    row = {
        # date
        'dates': today,

        # community data
        'twitter_followers':       community.get('twitter_followers'),
        'reddit_subs':             community.get('reddit_subscribers'),
        'reddit_avg_posts_48h':    community.get('reddit_average_posts_48h'),
        'reddit_avg_comments_48h': community.get('reddit_average_comments_48h'),

        # developer data
        'forks':                     developer.get('forks'),
        'stars':                     developer.get('stars'),
        'github_subs':               developer.get('subscribers'),
        'total_issues':              developer.get('total_issues'),
        'closed_issues':             developer.get('closed_issues'),
        'pull_rqst_merged':          developer.get('pull_requests_merged'),
        'pull_request_contributors': developer.get('pull_request_contributors'),
    }

    # market data: current price, market cap and total volume per currency
    market_fields = (('cp', 'current_price'), ('mc', 'market_cap'), ('tv', 'total_volume'))
    for suffix, field in market_fields:
        quotes = section('market_data', field)
        for currency in ('usd', 'eur', 'brl'):
            row[f'{currency}_{suffix}'] = quotes.get(currency)

    # All values are scalars, so an explicit one-element index is required
    extracted_df = pd.DataFrame(row, index = [0])

    # Rounding lists
    round_zero_list = ['twitter_followers','reddit_subs',
                       'forks','stars','github_subs',
                       'total_issues','closed_issues',
                       'pull_rqst_merged','pull_request_contributors',
                       'usd_mc', 'eur_mc', 'brl_mc',
                       'usd_tv', 'eur_tv', 'brl_tv']

    # Every remaining numeric column; 'dates' is a string and is left out
    round_two_list = ['reddit_avg_posts_48h', 'reddit_avg_comments_48h',
                      'usd_cp', 'eur_cp', 'brl_cp']

    # Rounding (columns holding None are not numeric and pass through unchanged)
    extracted_df[round_zero_list] = extracted_df[round_zero_list].round(0)
    extracted_df[round_two_list] = extracted_df[round_two_list].round(2)

    # Return
    return extracted_df
# Ok, function set

# ------

# Define function 'concater()'
def concater(id, today):
    '''Build the up-to-date time series of 'id' and save it as a local CSV.

    Today's one-row frame from dailyExtractor(id) is stacked on top of the
    stored series returned by fs_to_notebook_transfer(id), gaps are back
    filled, and the result is written to '{today}/{id}_completed.csv'.

    Parameters
    ----------
    id
        Coin identifier, forwarded to both helper functions.
    today
        Path of the folder that receives the CSV; it must already exist.

    Returns
    -------
    None
    '''
    # Extract todays data
    id_today    = dailyExtractor(id)
    # Read the outdated time series stored for this id
    id_outdated = fs_to_notebook_transfer(id)

    # Concat the 2 files, today's row first
    id_completed = pd.concat([id_today, id_outdated], ignore_index=True)

    # Back fill None, in case there are any unretrieved metrics: a gap takes the
    # value of the row below it. .bfill() replaces the deprecated
    # fillna(method='bfill').
    id_completed = id_completed.bfill()

    # A second run on the same day finds today's row already in the stored
    # series; drop those older copies so the date is not duplicated.
    if 'dates' in id_completed.columns and len(id_today) > 0:
        today_label = id_today['dates'].iloc[0]
        stale_rows = (id_completed['dates'].eq(today_label)
                      & id_completed.duplicated(subset='dates', keep='first'))
        id_completed = id_completed[~stale_rows].reset_index(drop=True)

    # Download it 
    id_completed.to_csv(today + f'/{id}_completed.csv')
# Ok, we can use concater to update the time series

# ------

# Set function to upload '{id}_completed.csv' to the Google Cloud Storage bucket '{id}_awaricripto_bucket', replacing (updating) the stored '{id}_ts'.
def upload_completed(id, today):
    '''Upload the local '{today}/{id}_completed.csv' to Google Cloud Storage.

    The file is sent through the module-level 'client' to the bucket
    '{id}_awaricripto_bucket' under the object name '{id}_ts', and a
    confirmation line is printed afterwards. Returns None.
    '''
    local_csv   = '/'.join((today, f'{id}_completed.csv'))
    target_name = f'{id}_ts'
    bucket_id   = f'{id}_awaricripto_bucket'

    # Resolve bucket -> blob, then push the file under the blob's name
    target_blob = client.bucket(bucket_id).blob(target_name)
    target_blob.upload_from_filename(local_csv)

    print(f'{target_name} with content from {local_csv} uploaded to {bucket_id}.\n')
# Ok, we can use upload_completed to upload the updated time series

## Read necessary files

In [9]:
# Read main50
# 'main50.csv' sits in the 'main50' bucket; its 'Unnamed: 0' column becomes the index
with fs.open('main50/main50.csv') as f:
    main50 = pd.read_csv(f, index_col='Unnamed: 0')

## Apply functions 
* Iterate over ids in main50
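    * concater(id, today) | retrieves the latest metrics, concatenates them with the outdated time series ('{id}_ts', read from the '{id}_awaricripto_bucket' bucket), and saves the result in the pvm as '{today}/{id}_completed.csv' 
    * upload_completed(id, today) | uploads '{today}/{id}_completed.csv' to the bucket, replacing '{id}_ts'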

In [17]:
# Create date-based folder | Facilitate organization
# Folder named after the current date (dd-mm-YYYY); it collects this run's CSV files
today = f'{datetime.now():%d-%m-%Y}'
os.makedirs(today, exist_ok=True)

In [None]:
# Iterate for ids
for id in main50['id']:
    # Step 1: stack today's metrics on the stored series and write
    # 'date/{id}_completed.csv' in the pvm
    concater(id, today)

    # Step 2: send '{id}_completed.csv' to GS
    upload_completed(id, today)