Skip to content

Commit

Permalink
Issue #8: Add tests for PackageCache.
Browse files Browse the repository at this point in the history
  • Loading branch information
gh640 committed Jun 29, 2018
1 parent 728cb07 commit 3312737
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 18 deletions.
40 changes: 22 additions & 18 deletions PypiPackageInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,31 @@ def get_package_data(self, name):
cur = self.conn.cursor()
cur.execute('SELECT * FROM packages WHERE name=?', (name, ))
row = cur.fetchone()
cur.execute(
'UPDATE packages SET updated_at=? WHERE name=?', (get_now(), name)
)
if row:
cur.execute(
'UPDATE packages SET updated_at=? WHERE name=?',
(get_now(), name),
)
self.conn.commit()
cur.close()
data = row['data']
return json.loads(data)

return False

def add_package_data(self, name, data):
row = (name, json.dumps(data), get_now())
cur = self.conn.cursor()
cur.execute('INSERT INTO packages VALUES (?, ?, ?)', row)
self.conn.commit()
cur.close()
self.clear_old_cache()

def clear_old_cache(self):
cache_max_count = self._get_cache_max_count()
if cache_max_count > 0:
cur.execute('SELECT count(*) FROM packages')
cur = self.conn.cursor()
cur.execute('SELECT COUNT(*) FROM packages')
count = cur.fetchone()[0]
if count > cache_max_count:
cur.execute(
Expand All @@ -263,20 +280,7 @@ def get_package_data(self, name):
(cache_max_count, )
)
self.conn.commit()
cur.close()

if row:
data = row['data']
return json.loads(data)

return False

def add_package_data(self, name, data):
row = (name, json.dumps(data), get_now())
cur = self.conn.cursor()
cur.execute('INSERT INTO packages VALUES (?, ?, ?)', row)
self.conn.commit()
cur.close()
cur.close()

def clear_all_cache(self):
if self.conn:
Expand Down
126 changes: 126 additions & 0 deletions tests/test_PypiPackageInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
See https://github.com/SublimeText/UnitTesting .
'''

import json
import os
import sqlite3
import sys
import tempfile
import unittest
from unittest import mock

Expand Down Expand Up @@ -50,3 +54,125 @@ def test_get_data_response_ok(self, PackageCache, get):
self.assertIsInstance(data, dict)
self.assertIn('name', data)
self.assertNotIn('summary', data)


@mock.patch.object(PypiPackageInfo, 'CacheManager')
class TestPackageCache(unittest.TestCase):
def setUp(self):
self.temp = tempfile.NamedTemporaryFile()
self.db_path = self.temp.name

def test_init(self, CacheManager):
self.use_temp_cache_manager(CacheManager, self.db_path)

PypiPackageInfo.PackageCache()

row = self.get_db_table_count('packages')
self.assertEquals(row[0], 1)

def test_get_package_data_miss(self, CacheManager):
self.use_temp_cache_manager(CacheManager, self.db_path)

pc = PypiPackageInfo.PackageCache()
data = pc.get_package_data('sample')

self.assertFalse(data)

def test_get_package_data_hit(self, CacheManager):
self.use_temp_cache_manager(CacheManager, self.db_path)

pc = PypiPackageInfo.PackageCache()
with mock.patch.object(pc, 'conn') as conn:
cur = conn.cursor.return_value
cur.fetchone.return_value = {'data': json.dumps({'key': 'value'})}
data = pc.get_package_data('sample')

self.assertIsInstance(data, dict)
self.assertEquals(data['key'], 'value')

def test_add_package_data_data_added(self, CacheManager):
self.use_temp_cache_manager(CacheManager, self.db_path)

pc = PypiPackageInfo.PackageCache()
data = pc.get_package_data('AcmePackage')

self.assertFalse(data)
self.assertEquals(self.get_package_count(), 0)

pc.add_package_data('AcmePackage', {
'name': 'AcmePackage',
'summary': 'This is an awesome package.',
})

self.assertEquals(self.get_package_count(), 1)

data = pc.get_package_data('AcmePackage')

self.assertIsInstance(data, dict)
self.assertEquals(data['name'], 'AcmePackage')
self.assertEquals(data['summary'], 'This is an awesome package.')

def test_add_package_data_count_limited(self, CacheManager):
self.use_temp_cache_manager(CacheManager, self.db_path)

pc = PypiPackageInfo.PackageCache()

with mock.patch.object(pc, '_get_cache_max_count') as _get_cache_max_count:
_get_cache_max_count.return_value = 30
for i in range(100):
pc.add_package_data('AcmePackage {}'.format(i), {})

self.assertEquals(self.get_package_count(), 30)

def test_clear_old_cache(self, CacheManager):
self.use_temp_cache_manager(CacheManager, self.db_path)

pc = PypiPackageInfo.PackageCache()

self.assertEquals(self.get_package_count(), 0)

for i in range(100):
pc.add_package_data('AcmePackage {}'.format(i), {})

self.assertEquals(self.get_package_count(), 100)

with mock.patch.object(pc, '_get_cache_max_count') as _get_cache_max_count:
_get_cache_max_count.return_value = 10
pc.clear_old_cache()

self.assertEquals(self.get_package_count(), 10)

def test_clear_all_cache(self, CacheManager):
temp = tempfile.NamedTemporaryFile(delete=False)
db_path = temp.name

self.use_temp_cache_manager(CacheManager, db_path)

pc = PypiPackageInfo.PackageCache()
pc.clear_all_cache()

self.assertFalse(os.path.exists(db_path))

def use_temp_cache_manager(self, CacheManager, path):
cache_manager = CacheManager.return_value
cache_manager.get_path.return_value = path

def get_db_table_count(self, table_name):
conn = sqlite3.connect(self.db_path)
cur = conn.cursor()
cur.execute(
'SELECT COUNT(name) FROM sqlite_master '
"WHERE type='table' AND name=?",
(table_name, ),
)
row = cur.fetchone()
conn.close()
return row

def get_package_count(self):
conn = sqlite3.connect(self.db_path)
cur = conn.cursor()
cur.execute('SELECT COUNT(*) FROM packages')
row = cur.fetchone()
conn.close()
return row[0]

0 comments on commit 3312737

Please sign in to comment.