In [1]:
import os

import logging

import pandas as pd

import datetime

from influxdb_client import InfluxDBClient, WritePrecision, WriteOptions

In [2]:
################################################################# LOGGING ##################################################################
############################################################################################################################################

LOGGER_FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)

############################################################################################################################################
############################################################################################################################################

In [3]:
# INFLUXDB CLIENT
INFLUXDB_URL = os.getenv('INFLUXDB_URL')
INFLUXDB_TOKEN = os.getenv('INFLUXDB_TOKEN')
INFLUXDB_ORG = os.getenv('INFLUXDB_ORG')
INFLUXDB_TIMEOUT = 300000

# INFLUXDB READ/WRITE API
INFLUXDB_BUCKET = "coinmarketcapBucket"
INFLUXDB_BATCH_SIZE = 800
INFLUXDB_MEASUREMENT = "dailyCandleSticks"
INFLUXDB_WRITE_PRECISION = WritePrecision.NS

In [4]:
def datetime_to_unix_ns(timestamp):

    unix_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").timestamp()*1e9

    return unix_timestamp

def dataframe_to_influx(dataframe, url, token, org, timeout, batch_size, bucket, measurement_name, dataframe_tag_columns, write_precision):

    '''
    Takes a dataframe and writes its content onto InfluxDB using batches.

    dataframe: pandas dataframe with a time index in ns to be written to InfluxDB
    url: endopoint for writing to InfluxDB
    token: token for connecting to InfluxDB
    org: organization for connecting to InfluxDB
    timeout: int timeout for connection to InfluxDB in s
    batch_size: int numer of line protocol to be sent at a time
    bucket: str bucket name
    measurment_name: str measurment name
    dataframe_tag_columns: list list with column names of the dataframe to be taken as tags
    '''

    with InfluxDBClient(url=url, token=token, org=org, timeout=timeout, enable_gzip=True) as client:
                    
                with client.write_api(write_options=WriteOptions(batch_size=batch_size)) as write_api:
                        
                    write_api.write(bucket=bucket, record=dataframe, data_frame_measurement_name=measurement_name, data_frame_tag_columns=dataframe_tag_columns,
                        write_precision=write_precision)

                    log.info(f"Data for cryptocurrency {dataframe['symbol']} has been correctly written to InfluxDB")

In [5]:
symbol_daily_data_df = pd.read_csv("coinmarketcap_ETH_daily_market_data.csv")
symbol_daily_data_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2160 entries, 0 to 2159
Data columns (total 10 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   SNo        2160 non-null   int64  
 1   Name       2160 non-null   object 
 2   Symbol     2160 non-null   object 
 3   Date       2160 non-null   object 
 4   High       2160 non-null   float64
 5   Low        2160 non-null   float64
 6   Open       2160 non-null   float64
 7   Close      2160 non-null   float64
 8   Volume     2160 non-null   float64
 9   Marketcap  2160 non-null   float64
dtypes: float64(6), int64(1), object(3)
memory usage: 168.9+ KB


In [6]:
symbol_daily_data_df.head()

Unnamed: 0,SNo,Name,Symbol,Date,High,Low,Open,Close,Volume,Marketcap
0,1,Ethereum,ETH,2015-08-08 23:59:59,2.79881,0.714725,2.79376,0.753325,674188.0,45486890.0
1,2,Ethereum,ETH,2015-08-09 23:59:59,0.87981,0.629191,0.706136,0.701897,532170.0,42399570.0
2,3,Ethereum,ETH,2015-08-10 23:59:59,0.729854,0.636546,0.713989,0.708448,405283.0,42818360.0
3,4,Ethereum,ETH,2015-08-11 23:59:59,1.13141,0.663235,0.708087,1.06786,1463100.0,64569290.0
4,5,Ethereum,ETH,2015-08-12 23:59:59,1.28994,0.883608,1.05875,1.21744,2150620.0,73645010.0


In [7]:
symbol_daily_data_df.drop(columns=['SNo', 'Name'], inplace=True)
symbol_daily_data_df.rename(columns={"Symbol": "symbol", "Date":"time", "High":"high", "Low":"low", "Open":"open", "Close":"close", "Volume":"volume", "Marketcap":"marketcap"}, inplace=True)
symbol_daily_data_df["time"] = symbol_daily_data_df["time"].apply(lambda timestamp: datetime_to_unix_ns(timestamp))
symbol_daily_data_df = symbol_daily_data_df.astype({'time': 'datetime64[ns]','open': float, 'high': float, 'low': float, 'close': float, 'volume': float,'marketcap': float})
symbol_daily_data_df.set_index('time', inplace=True)
symbol_daily_data_df.head()

Unnamed: 0_level_0,symbol,high,low,open,close,volume,marketcap
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2015-08-09 02:59:59,ETH,2.79881,0.714725,2.79376,0.753325,674188.0,45486890.0
2015-08-10 02:59:59,ETH,0.87981,0.629191,0.706136,0.701897,532170.0,42399570.0
2015-08-11 02:59:59,ETH,0.729854,0.636546,0.713989,0.708448,405283.0,42818360.0
2015-08-12 02:59:59,ETH,1.13141,0.663235,0.708087,1.06786,1463100.0,64569290.0
2015-08-13 02:59:59,ETH,1.28994,0.883608,1.05875,1.21744,2150620.0,73645010.0


In [8]:
dataframe_to_influx(dataframe=symbol_daily_data_df, url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG, timeout=INFLUXDB_TIMEOUT, batch_size=INFLUXDB_BATCH_SIZE, bucket=INFLUXDB_BUCKET, measurement_name=INFLUXDB_MEASUREMENT, dataframe_tag_columns=['symbol'], write_precision=INFLUXDB_WRITE_PRECISION)

[13:11:54] Data for cryptocurrency time
2015-08-09 02:59:59    ETH
2015-08-10 02:59:59    ETH
2015-08-11 02:59:59    ETH
2015-08-12 02:59:59    ETH
2015-08-13 02:59:59    ETH
                      ... 
2021-07-03 02:59:59    ETH
2021-07-04 02:59:59    ETH
2021-07-05 02:59:59    ETH
2021-07-06 02:59:59    ETH
2021-07-07 02:59:59    ETH
Name: symbol, Length: 2160, dtype: object has been correctly written to InfluxDB
