# Data aquisition
Download data from https://coinmarketcap.com/ and store it into a CSV.

In [1]:
# import needed modules
# standard modules
import os
import sys
import asyncio
import datetime
import re
import json
import codecs
import io
import concurrent.futures
import csv
from pprint import pprint

# pypy modules
import requests
import lxml.html

## Constants

In [2]:
# main url of coinmarketcap
COINMARKETCAP_URL = "https://coinmarketcap.com"
# url to download the currencies (coins/tokens)
CURRENCY_URL = COINMARKETCAP_URL + "/{}/views/all"
# url to get historical data per coin
SLUG_URL = COINMARKETCAP_URL + "/currencies/{}/historical-data/?start={}&end={}"

# directory of this projects root, jupyter must be started accordingly
ROOT_DIR = os.path.abspath(os.path.join(os.getcwd(), ".."))
# directory for the cache
CACHE_DIR = os.path.join(ROOT_DIR, "cache")
# resulting csv file holding **all** data
DATA_CSV = os.path.join(ROOT_DIR, "coinmarketcap.csv")

## Functions from third-party modules

### Parse the coin/token list returned as HTML code
Source: https://github.com/prouast/coinmarketcap-scraper

In [3]:
def parseCoinTokenList(html, type):
    """Parse the information returned by requestList for view 'all'."""
    data = []
    docRoot = lxml.html.fromstring(html)
    rows = docRoot.cssselect(
        "table#{0}-all > tbody > tr".format(type))

    for row in rows:
        datum = {}
        fields = row.cssselect("td")

        # Name and slug
        nameField = fields[1].cssselect("a")[0]
        datum['name'] = nameField.text_content().strip()
        datum['slug'] = nameField.attrib['href'].replace(
            '/currencies/', '').replace('/', '').strip()

        # Symbol
        datum['symbol'] = fields[2].text_content().strip()

        # Explorer link
        supplyFieldPossible = fields[5].cssselect("a")
        if len(supplyFieldPossible) > 0:
            datum['explorer_link'] = supplyFieldPossible[0].attrib['href']
        else:
            datum['explorer_link'] = ''
        data.append(datum)

    return data

### Parse the historical data
Source: https://github.com/jhogan4288/coinmarketcap-history

In [4]:
def parseHistoricalData(html):
    """
    Extract the price history from the HTML.
    
    The CoinMarketCap historical data page has just one HTML table. 
    This table contains the data we want.
    It's got one header row with the column names.
    
    We need to derive the "average" price for the provided data.
    """
    
    head = re.search(r'<thead>(.*)</thead>', html, re.DOTALL).group(1)
    header = re.findall(r'<th .*>([\w ]+)</th>', head)
    
    body = re.search(r'<tbody>(.*)</tbody>', html, re.DOTALL).group(1)
    raw_rows = re.findall(r'<tr[^>]*>' +
                          r'\s*<td[^>]*>([^<]+)</td>'*7 +
                          r'\s*</tr>', body)
    
    # strip commas
    rows = []
    for row in raw_rows:
        row = [ re.sub(",", "", field) for field in row ]
        row = [ re.sub("-", "0", field) for field in row ]
        # convert date
        row[0]= datetime.datetime.strptime(row[0], "%b %d %Y").strftime("%Y%m%d")
        rows.append(row)
    
    return header, rows

## Helper functions

In [5]:
# convert between datetime object and string representation "YYYYMMDD"
string2datetime = lambda s: datetime.datetime.strptime(s, "%Y%m%d")
datetime2string = lambda dt: dt.strftime("%Y%m%d")

In [6]:
# create directory if it does not exist
def mkdir(path):
    if not os.path.exists(path):
        os.makedirs(path)

Next, a cache is introduced. Data downloaded from *coinmarketcap.com* are stored in this cache.
With the cache it is not needed to download every time all historical data.

In [7]:
# load cached data
def loadCache(path):
    path = os.path.abspath(path)
    try:
        with codecs.open(path, "r", encoding="UTF8") as fp:
            return fp.read()
    except OSError:
        pass
    return ""

In [8]:
# save cached data
def saveCache(path, content):
    path = os.path.abspath(path)
    mkdir(os.path.dirname(path))
    with codecs.open(path, "w", encoding="UTF8") as fp:
        fp.write(content)

Provide a `main` method for asyncio. This function downloads the *urls* parallel and stores the *responses* for further processing.

In [9]:
async def main(urls, responses):
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                None, 
                requests.get, 
                url,
            )
            for url in urls
        ]
        for response in await asyncio.gather(*futures):
            responses.append(response)

## Download function for coins/tokens

In [10]:
# use the cache, encode currency data with json
def decodeJson(rawData):
    try:
        return json.loads(rawData)
    except json.decoder.JSONDecodeError:
        pass
    return []
def encodeJson(pythonDict):
    return json.dumps(pythonDict, indent=4)

# download coins and tokens from the cache
def getCoinsAndTokens(forceUpdate=False):
    # forceUpdate: do not use the cache
    # cache path for coins
    cacheCoins = os.path.join(CACHE_DIR, "coins.json")
    # cache path for tokens
    cacheTokens = os.path.join(CACHE_DIR, "tokens.json")
    coins, tokens = [], []
    if not forceUpdate:
        # load coins and tokens from the cache
        coins = decodeJson(loadCache(cacheCoins))
        tokens = decodeJson(loadCache(cacheTokens))

    # early return, coins/tokens loaded from the cache
    if coins and tokens:
        print("Cached: Coins: {}, Tokens: {}".format(len(coins), len(tokens)))
        return coins, tokens

    # load coins/tokens from the web
    # initalize asyncio
    loop = asyncio.get_event_loop()

    # get urls to be downloaded
    urls = [CURRENCY_URL.format(type) for type in ["coins", "tokens"]]
    responses = []
    # download urls in parallel
    loop.run_until_complete(main(urls, responses))

    # parse the responses
    coins = parseCoinTokenList(responses[0].content, "currencies")
    tokens = parseCoinTokenList(responses[1].content, "assets")

    # update cache
    saveCache(cacheCoins, encodeJson(coins))
    saveCache(cacheTokens, encodeJson(tokens))
    print("Coins: {}, Tokens: {}".format(len(coins), len(tokens)))
    return coins, tokens

## Download function for historical data

In [11]:
# construct/generate the currency url based on the slug
# start/end may be provided, otherwise, the whole history is downloaded
def genCurrencySlugUrl(slug, start=None, end=None):
    start = start or string2datetime("20100101")
    end = end or datetime.datetime.utcnow() + datetime.timedelta(days=1)
    return SLUG_URL.format(slug, datetime2string(start), datetime2string(end))

# get the cache path for a given slug
def getSlugCache(slug):
    return os.path.join(CACHE_DIR, "{}.csv".format(slug))

# encode historical data with csv
def encodeCsv(data):
    fp = io.StringIO()
    writer = csv.writer(fp)
    writer.writerows(data)
    return fp.getvalue()

def decodeCsv(raw):
    reader = csv.reader(raw.splitlines())
    return list(reader)

# only keep the date part of the datetime object
striptime = lambda dt: datetime.datetime.combine(dt.date(), datetime.time())

# parse response for a slug and save the data to the cache
def parseResponseSaveCache(slug, response):
    # parse historical data
    _, rawData = parseHistoricalData(response.content.decode("UTF8"))
    # get the cache file
    path = getSlugCache(slug)
    # load the cache
    rows = decodeCsv(loadCache(path))
    # append new date
    rows.extend(rawData)
    # sort by date
    rows = sorted(rows, key=lambda r: int(r[0]))
    # update the cache
    saveCache(path, encodeCsv(rows))

# download **all** historical data of **all** slugs
# use a cache to make it faster on successive runs
# the function returns the number of updated histories
def getHistories(slugs):
    # build requests
    requests = []
    # keep track which request belongs to which slug
    slugRequestMap = {}
    # current utc time, historical data are update on UTC 00:00:00
    utcnow = striptime(datetime.datetime.utcnow())

    # for all slugs, prepare the url
    for slug in slugs:
        path = getSlugCache(slug)
        dtCache = None
        if os.path.exists(path):
            # get the timestamp of the cached file of the slug
            st = os.stat(path)
            dtCache = datetime.datetime.utcfromtimestamp(st.st_mtime)
            dtCache = striptime(dtCache)
        # load the cached file
        rows = decodeCsv(loadCache(path))
        
        # find the date of the next entry
        start = None
        if rows:
            # get latest date
            start = string2datetime(rows[-1][0])
            # add one day
            start += datetime.timedelta(days=1)

        if start:
            # if start lies in the future, skip
            if start >= utcnow:
                continue
            # if the cache date current, skip
            if dtCache and dtCache >= utcnow:
                continue

        # build the url for the slug
        url = genCurrencySlugUrl(slug, start)
        # append to requests
        requests.append(url)
        # add to inverse mapping
        slugRequestMap[url] = slug

    # nothing to download, return
    if not slugRequestMap:
        return 0

    # prepare asyncio
    loop = asyncio.get_event_loop()
    
    responses = []
    while requests:
        print("\rRequest to process: {}{}".format(len(requests), " "*20), flush=True, end="")
        # download all requests
        loop.run_until_complete(main(requests, responses))
       
        # check responses, try again if it failed
        requests = []
        for r in responses:
            # remove responses
            responses.remove(r)
            if r.ok:
                parseResponseSaveCache(slugRequestMap[r.url], r)
            else:
                # print("Failed: {}".format(url))
                pass
        responses = []
    print("")  # add newline feed

## Function to build the final CSV file holding all currency data
This function reads all cached coin/token data and merges it into a single *csv* file.

In [12]:
# merge all cached csv into a single csv
def buildAllCurrenciesCsv(allCurrencies):
    # count rows
    rowCnt = 0
    with codecs.open(DATA_CSV, "w", encoding="UTF8") as fp:
        writer = csv.writer(fp)
        writer.writerow([
                "date",
                "slug",
                "name",
                "open",
                "high",
                "low",
                "close",
                "volume",
                "marketcap"])
        # for each currency append to the data file
        # and insert *slug* and *name* as column
        for currency in allCurrencies:
            slug = currency["slug"]
            name = currency["name"]
            path = getSlugCache(slug)
            rows = decodeCsv(loadCache(path))
            print("\r{}/{}{}".format(slug, len(rows), " "*20), end="", flush=True)
            for row in rows:
                writer.writerow([row[0]] + [slug, name] + row[1:])
                rowCnt += 1
    print("\rCurrencies: {}, rows: {}".format(len(allCurrencies), rowCnt))

In [13]:
print("CACHE: {}".format(CACHE_DIR))
print("DATA:  {}".format(DATA_CSV))
# download coins and tokens
coins, tokens = getCoinsAndTokens(forceUpdate=True)
allCurrencies = coins + tokens

# get the slug name from the dicts
slugs = [x["slug"] for x in allCurrencies]
# download historical data
getHistories(slugs)

# always build CSV
buildAllCurrenciesCsv(allCurrencies)

CACHE: /home/dahuebi/PML/cas-pml-prj/cache
DATA:  /home/dahuebi/PML/cas-pml-prj/coinmarketcap.csv
Coins: 917, Tokens: 677
Request to process: 1594                    
Currencies: 1594, rows: 750954                         
