# Docker Image Check

Created by Michael George 2020-01-31

## Initialisation

Import common libraries and determine the project directory

In [1]:
import os, sys
import json

# Project directory: one level above the first entry on the module search path
projDir = os.path.realpath(
    os.path.join(sys.path[0], '..')
)

## Generic Class

Generic class to ensure that all custom classes are printable

In [2]:
class Printable:
    """Base class giving instances a text form of '<class>: <instance __dict__>'."""

    def __repr__(self):
        # Same text as concatenating str(class), ': ' and str(__dict__)
        return '{}: {}'.format(self.__class__, self.__dict__)

    # str() and repr() produce the same text
    __str__ = __repr__

## Docker Repository Class

Classes that model a Docker repository and its platforms, image digests and tags, and fetch tag data from Docker Hub.

In [3]:
import urllib.request

from datetime import datetime

# Cache of decoded registry responses keyed by URL. An existing cache is kept
# when this cell is re-run; it is only created when the name is not yet
# defined. Catching NameError alone avoids hiding unrelated errors.
try:
    urlCache
except NameError:
    urlCache = {}


class Digest(Printable):
    """A digest value and the tag names recorded against it."""

    def __init__(self, sha256):
        # Digest value exactly as supplied by the caller
        self.sha256 = sha256
        # Tag names recorded for this digest, in first-seen order
        self.tags = []

    def addTag(self, tag):
        """Record ``tag`` for this digest.

        A tag that is already recorded is ignored, so recording the same data
        again (for example when tags are fetched a second time for the same
        objects) does not grow the list with duplicates.
        """
        if tag not in self.tags:
            self.tags.append(tag)

        
class Platform(Printable):
    """A named platform and the Digest objects recorded for it, keyed by digest value."""

    def __init__(self, name):
        self.name = name
        self.digests = dict()

    def addDigest(self, sha256):
        """Store a new, empty Digest under ``sha256``, replacing any existing entry."""
        entry = Digest(sha256)
        self.digests[sha256] = entry

        
class Repository(Printable):
    """A Docker Hub repository plus the platforms, digests and tags found for it.

    The constructor arguments are stored unchanged. Of these, only
    ``repository`` and ``checks`` are read by this class's own methods.
    """

    def __init__(self, name, repository, deployed, desired, checks, days, news, releases):
        self.name = name
        self.repository = repository
        self.deployed = deployed
        self.desired = desired
        self.checks = checks
        self.days = days
        self.news = news
        self.releases = releases
        # Platform objects keyed by '<os>/<architecture>', filled by fetchTags()
        self.platforms = {}

    def addPlatform(self, name):
        """Store a new, empty Platform under ``name``, replacing any existing entry."""
        self.platforms[name] = Platform(name)

    def fetchTags(self, days = 90):
        """Page through the registry's tag list and record tags per platform and digest.

        Every tag on a visited page is recorded in ``self.platforms``. Paging
        stops when any of the following holds:

        * every key and value of each check's ``'tags'`` mapping has been seen;
        * a tag last updated more than ``days`` days ago is reached;
        * the registry reports no next page, or returns an empty page.

        Decoded responses are kept in the module-level ``urlCache`` keyed by
        URL, so a URL is requested from the network at most once per cache.

        Args:
            days: age limit in days for the stop condition above. This is the
                argument value; ``self.days`` is not consulted here.

        Raises:
            urllib.error.URLError: if a request to the registry fails.
        """

        # Tags still to be found: each key and each value of every check's
        # 'tags' mapping, without duplicates
        pending = []
        for check in self.checks:
            for tag in check['tags']:
                for wanted in (tag, check['tags'][tag]):
                    if wanted not in pending:
                        pending.append(wanted)

        # Names without a namespace are looked up under 'library/'
        if '/' not in self.repository:
            repository = 'library/' + self.repository
        else:
            repository = self.repository

        # URL for the first page of data
        url = "https://registry.hub.docker.com/v2/repositories/{}/tags".format(repository)

        done = False
        while url and not done and pending:

            if url in urlCache:
                objData = urlCache[url]
            else:
                # Submit the request to the registry
                req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla'})
                with urllib.request.urlopen(req, timeout = 15) as response:
                    objData = json.loads(response.read().decode('utf-8'))
                urlCache[url] = objData

            # An empty response has no 'next' link to follow. Stop here,
            # otherwise the same (now cached) URL would be processed forever.
            if not objData:
                break

            for result in objData['results']:

                name = result['name']

                # Record the tag against every image listed for it
                for image in result['images']:
                    platform = image['os'] + '/' + image['architecture']
                    sha256 = image['digest']

                    if platform not in self.platforms:
                        self.addPlatform(platform)
                    if sha256 not in self.platforms[platform].digests:
                        self.platforms[platform].addDigest(sha256)
                    self.platforms[platform].digests[sha256].addTag(name)

                # The page loop will stop once all tags have been found
                if name in pending:
                    pending.remove(name)

                # Stop processing after x days. A result without a usable
                # date cannot be aged, so it never triggers the stop.
                last_updated = result.get('last_updated')
                if last_updated:
                    updated = datetime.strptime(last_updated[:10], '%Y-%m-%d')
                    if (datetime.now() - updated).days > days:
                        done = True
                        break

                # TODO - grace period based on date of tags

            # URL for the next page of data
            url = objData['next']

    def getTags(self, tag, os = 'linux', arch = 'amd64'):
        """Return the tag list of the first digest on a platform that carries ``tag``.

        Args:
            tag: tag name to look for.
            os: operating-system part of the platform key. Note that this
                parameter name shadows the ``os`` module inside the method.
            arch: architecture part of the platform key.

        Returns:
            The list of tags recorded for the matching digest (the stored
            list itself, not a copy), or ``None`` when the platform has not
            been recorded or no digest on it carries ``tag``.
        """
        entry = self.platforms.get(os + '/' + arch)
        if entry is None:
            # Platform never seen; report "not found" instead of raising
            return None

        for digest in entry.digests.values():
            if tag in digest.tags:
                return digest.tags

        return None

class Category(Printable):
    """A named category holding a collection of repositories."""

    def __init__(self, name, category, repositories):
        self.name = name
        self.category = category
        self.repositories = repositories

## Load Configuration

Load configuration and instantiate as objects

In [4]:
# Read the image configuration as text. The encoding is given explicitly so
# the JSON is decoded the same way regardless of the platform's locale default.
fn = os.path.join(projDir, 'config', 'images.json')
with open(fn, encoding='utf-8') as f:
    jsonTxt = f.read()

def as_repository(d):
    """Convert a decoded JSON object into a Category or Repository where it matches.

    Args:
        d: a dict produced by the JSON decoder.

    Returns:
        A Category when ``d`` has a ``'category'`` key, a Repository when it
        has a ``'repository'`` key, otherwise ``d`` itself unchanged.

    Raises:
        KeyError: if a matching object lacks one of the required keys.
    """
    if 'category' in d:
        return Category(d['name'], d['category'], d['repositories'])

    if 'repository' in d:
        repo = Repository(d['name'], d['repository'], d['deployed'], d['desired'],
                          d['checks'], d['days'], d['news'], d['releases'])
        # 'upgrades' is optional and not a constructor argument; attach it when
        # configured so code testing hasattr(repository, 'upgrades') can see it
        if 'upgrades' in d:
            repo.upgrades = d['upgrades']
        return repo

    return d

# Decode the configuration text; every decoded JSON object is passed through
# as_repository, which may replace it with a Category or Repository
categories = json.loads(
    jsonTxt,
    object_hook = as_repository,
)

## Download Tags

Download list of tags for all repositories

In [5]:
# Time module used for performance counters
import time

# Start time in fractional seconds
pc1 = time.perf_counter()

# Fetch tags for all repositories, printing the category name and one
# progress line per repository. fetchTags() is called with no arguments,
# so its default age limit applies.
for category in categories:
    print(category.name)
    for repository in category.repositories:
        print("Fetching tags for {}...".format(repository.name))
        repository.fetchTags()
    # Blank line between categories
    print()

# End time in fractional seconds
pc2 = time.perf_counter()

# Elapsed time for the whole download, to two decimal places
print("All tags downloaded in {:.2f} seconds".format(pc2 - pc1))

Operating Systems
Fetching tags for Alpine Linux...
Fetching tags for Debian...
Fetching tags for Ubuntu...

Programming Languages
Fetching tags for Python...
Fetching tags for PHP-FPM...

Application Frameworks
Fetching tags for NGINX...

Databases
Fetching tags for MariaDB...
Fetching tags for MySQL...

DevOps Tools
Fetching tags for Jenkins...

All tags downloaded in 32.97 seconds


## Check Tags

Check that the tags are as expected and write the results to `docs/images.md`

In [6]:
# Check tags for all categories and write the results as a Markdown report
fn = os.path.join(projDir, "docs", "images.md")

# A file opened in text mode translates '\n' to the platform line ending by
# itself, so lines are terminated with '\n'. Writing os.linesep here would
# produce '\r\r\n' on Windows.
NL = '\n'

with open(fn, "w", encoding="utf-8") as f:
    for category in categories:
        f.write('## {}'.format(category.name) + NL * 2)

        # Check tags for all repositories
        for repository in category.repositories:

            # The deployed tag is valid when getTags() for any tag named in
            # the checks returns a tag list containing it; otherwise flag it
            deployed_ok = any(
                repository.deployed in (repository.getTags(tag) or [])
                for check in repository.checks
                for tag in check['tags']
            )
            warning = '' if deployed_ok else '**REQUIRES ATTENTION**'

            # Summarise the repository + deployed tag
            f.write('### {}'.format(repository.name) + NL)
            f.write('#### {}'.format(repository.deployed))
            if repository.deployed != repository.desired:
                f.write(' -> {}'.format(repository.desired))
            f.write(NL * 2)
            if warning:
                f.write('{}'.format(warning) + NL * 2)

            # Link to Docker Hub: '_/' for names without a namespace, 'r/' otherwise
            url = 'https://hub.docker.com/'
            url += '_/' if '/' not in repository.repository else 'r/'
            url += repository.repository
            f.write('- [Docker Hub]({})'.format(url) + NL)

            # Links to other sites. An attribute that is missing, None or
            # empty is skipped rather than written as a broken link.
            for label, attr in (('News', 'news'),
                                ('Releases', 'releases'),
                                ('Upgrades', 'upgrades')):
                link = getattr(repository, attr, None)
                if link:
                    f.write('- [{}]({})'.format(label, link) + NL)
            f.write(NL * 2)

            # Check each of the expected tags
            for check in repository.checks:
                f.write('    {}'.format(check['release']) + NL)

                for tag in check['tags']:
                    tags = repository.getTags(tag)
                    expected = check['tags'][tag]

                    # Is the expected tag valid?
                    if tags:
                        if expected in tags:
                            f.write('      {} = {}'.format(tag, expected) + NL)
                        else:
                            # Mismatch: also list the tags that were found
                            f.write('      {} != {}'.format(tag, expected) + NL)
                            f.write('        {}'.format(tags) + NL)
                    else:
                        f.write('      {} not found'.format(tag) + NL)

                f.write(NL)

            f.write(NL * 2)

## All Done!