Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
venv/
.env
.pytest_cache/
__pycache__/
.idea
.coverage
*build/
Expand Down
16 changes: 15 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,23 @@ python:

install:
- pip install -r requirements/dev.req
- pip install -r requirements/requirements.req

services:
- postgresql

addons:
postgresql: "10"
apt:
packages:
- postgresql-10
- postgresql-client-10

before_script:
- psql -c "CREATE USER test WITH PASSWORD 'test' SUPERUSER;" -U postgres

script:
- coverage run -m unittest
- coverage run --source . -m unittest

after_success:
- coveralls
49 changes: 49 additions & 0 deletions database/from_psql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import psycopg2

# Local import
from .utils import Utils


class FromPSQL(Utils):

def __init__(self, info_dict, table, column):
Utils.__init__(self, table, column)
self.info_dict = info_dict
self.connection = self.connect()
self.cursor = self.connection.cursor()
self.pk = self.get_pk_name()

def connect(self):
connection = psycopg2.connect(
host=self.info_dict.get('HOST', 'localhost'),
database=self.info_dict.get('DATABASE'),
user=self.info_dict.get('USER'),
password=self.info_dict.get('PASSWORD'),
port=self.info_dict.get('PORT', 5432),
)
return connection

def disconnect(self):
if self.connection is not None:
self.connection.close()

def get_pk_name(self):
self.cursor.execute(self.postgres.select_primary_key_name_query())
row = self.cursor.fetchone()
return row[0]

def select_duplicate(self, rows_list=False):
if rows_list:
self.cursor.execute(self.postgres.select_duplicate_query())
return self.cursor.fetchall()

self.cursor.execute(self.postgres.select_duplicate_pk_query(self.pk))
return [row[0] for row in self.cursor.fetchall()]

def select_unique(self, rows_list=False):
if rows_list:
self.cursor.execute(self.postgres.select_unique_query())
return self.cursor.fetchall()

self.cursor.execute(self.postgres.select_unique_pk_query(self.pk))
return [row[0] for row in self.cursor.fetchall()]
62 changes: 58 additions & 4 deletions database/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,61 @@

class Utils:

def __init__(self, ):
pass
def __init__(self, table, column):
self.postgres = Postgres(table, column)


# noinspection SqlNoDataSourceInspection,SqlDialectInspection
class Postgres:

def __init__(self, table, column):
self.table = table
self.column = column

def select_primary_key_name_query(self):
return f"SELECT a.attname " \
f"FROM pg_index AS i " \
f"JOIN pg_attribute AS a " \
f"ON a.attrelid = i.indrelid " \
f"AND a.attnum = ANY(i.indkey) " \
f"WHERE i.indrelid = '{self.table}'::regclass " \
f"AND i.indisprimary;"

def select_duplicate_query(self):
return f"SELECT t.* " \
f"FROM {self.table} AS t " \
f"INNER JOIN (" \
f"SELECT {self.column} " \
f"FROM {self.table} " \
f"GROUP BY {self.column} " \
f"HAVING ( COUNT(*) > 1 )" \
f") dt ON t.{self.column}=dt.{self.column}"

def select_duplicate_pk_query(self, pk):
return f"SELECT t.{pk} " \
f"FROM {self.table} AS t " \
f"INNER JOIN (" \
f"SELECT {self.column} " \
f"FROM {self.table} " \
f"GROUP BY {self.column} " \
f"HAVING ( COUNT(*) > 1 )" \
f") dt ON t.{self.column}=dt.{self.column}"

def select_unique_query(self):
return f"SELECT t.* " \
f"FROM {self.table} AS t " \
f"INNER JOIN (" \
f"SELECT {self.column} " \
f"FROM {self.table} " \
f"GROUP BY {self.column} " \
f"HAVING ( COUNT(*) = 1 )" \
f") dt ON t.{self.column}=dt.{self.column}"

pass
def select_unique_pk_query(self, pk):
return f"SELECT t.{pk} " \
f"FROM {self.table} AS t " \
f"INNER JOIN (" \
f"SELECT {self.column} " \
f"FROM {self.table} " \
f"GROUP BY {self.column} " \
f"HAVING ( COUNT(*) = 1 )" \
f") dt ON t.{self.column}=dt.{self.column}"
5 changes: 4 additions & 1 deletion requirements/dev.req
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ docopt==0.6.2
idna==2.8
requests==2.22.0
urllib3==1.25.6
# coveralls dependencies end
# coveralls dependencies end
# python dotenv start
python-dotenv==0.10.3
# python dotenv end
Empty file added test/database/__init__.py
Empty file.
9 changes: 9 additions & 0 deletions test/database/fixtures/address.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
id,complete_address,country_id,resident_id
0,1 street test,0,0
1,2 street test,1,1
2,3 street test,0,2
3,4 street test,0,3
4,5 street test,1,4
5,5 street test,0,3
6,6 street test,0,4
7,3 street test,0,0
3 changes: 3 additions & 0 deletions test/database/fixtures/country.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id,country,code
0,France,FR
1,United Kingdom,GB
6 changes: 6 additions & 0 deletions test/database/fixtures/resident.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,first_name
0,Clement
1,Antonin
2,Guillaume
3,Victor
4,Jordan
87 changes: 87 additions & 0 deletions test/database/fixtures/test_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import psycopg2

from pathlib import Path


# noinspection SqlDialectInspection,SqlNoDataSourceInspection
class CreateDBs:

def __init__(self, info_dict, db_type):
self.info_dict = info_dict
self.current_dir = Path(__file__).parent.resolve()
if db_type == 'psql':
self.psql_drop_and_create_database()
self.psql_create_and_populate_tables()

def psql_drop_and_create_database(self):
connection = psycopg2.connect(
host=self.info_dict.get('HOST', 'localhost'),
database=self.info_dict.get('DEFAULT_DB'),
user=self.info_dict.get('USER'),
password=self.info_dict.get('PASSWORD'),
port=self.info_dict.get('PORT', 5432),
)
connection.autocommit = True
cursor = connection.cursor()

qs = f"DROP DATABASE IF EXISTS {self.info_dict.get('DATABASE', 'pyd')}"
cursor.execute(qs)

qs = f"CREATE DATABASE {self.info_dict.get('DATABASE', 'pyd')} " \
f"WITH OWNER {self.info_dict.get('USER')}"
cursor.execute(qs)

connection.close()

def psql_create_and_populate_tables(self):
connection = psycopg2.connect(
host=self.info_dict.get('HOST', 'localhost'),
database=self.info_dict.get('DATABASE'),
user=self.info_dict.get('USER'),
password=self.info_dict.get('PASSWORD'),
port=self.info_dict.get('PORT', 5432),
)
connection.autocommit = True
cursor = connection.cursor()

# Create and populate "country"
qs_create_country = \
f"CREATE TABLE country (" \
f" id serial PRIMARY KEY," \
f" country VARCHAR (50)," \
f" code VARCHAR (4)" \
f")"
cursor.execute(qs_create_country)

country_path = self.current_dir.joinpath('country.csv')
with open(country_path, 'r') as country:
next(country) # Skip the header row.
cursor.copy_from(country, 'country', sep=',')

# Create and populate "resident"
qs_create_resident = \
f"CREATE TABLE resident (" \
f" id serial PRIMARY KEY," \
f" first_name VARCHAR (50)" \
f")"
cursor.execute(qs_create_resident)

resident_path = self.current_dir.joinpath('resident.csv')
with open(resident_path, 'r') as resident:
next(resident) # Skip the header row.
cursor.copy_from(resident, 'resident', sep=',')

# Create and populate "address"
qs_create_address = \
f"CREATE TABLE address (" \
f" id serial PRIMARY KEY," \
f" complete_address VARCHAR (255) NOT NULL," \
f" country_id SMALLINT REFERENCES country(id)," \
f" resident_id SMALLINT REFERENCES resident(id)" \
f")"
cursor.execute(qs_create_address)

address_path = self.current_dir.joinpath('address.csv')
with open(address_path, 'r') as address:
next(address) # Skip the header row.
cursor.copy_from(address, 'address', sep=',')
86 changes: 86 additions & 0 deletions test/database/test_from_psql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import unittest
import os

from copy import deepcopy
from dotenv import load_dotenv
from psycopg2 import DatabaseError

# Local import
from database.from_psql import FromPSQL
from test.database.fixtures.test_db import CreateDBs

load_dotenv()


class TestFromPSQL(unittest.TestCase):

@classmethod
def setUpClass(cls) -> None:
cls.psql_info_dict = {
'HOST': os.getenv("POSTGRES_HOSTNAME"),
'DATABASE': os.getenv("POSTGRES_DATABASE"),
'USER': os.getenv("POSTGRES_USER"),
'PASSWORD': os.getenv("POSTGRES_PASSWORD"),
'PORT': os.getenv("POSTGRES_PORT"),
'DEFAULT_DB': 'postgres',
}
CreateDBs(cls.psql_info_dict, 'psql')

def setUp(self) -> None:
self.info_dict = deepcopy(self.psql_info_dict)
self.table = 'address'
self.column = 'complete_address'

def test_connect_and_disconnect_success(self):
from_psql = FromPSQL(self.info_dict, self.table, self.column)
self.assertFalse(from_psql.connection.closed)
self.assertFalse(from_psql.cursor.closed)

from_psql.disconnect()
self.assertTrue(from_psql.connection.closed)
self.assertTrue(from_psql.cursor.closed)

def test_connect_error(self):
self.info_dict['DATABASE'] = ''
with self.assertRaises(DatabaseError):
FromPSQL(self.info_dict, self.table, self.column)

def test_select_duplicate(self):
from_psql = FromPSQL(self.info_dict, self.table, self.column)

result = from_psql.select_duplicate(rows_list=True)
expected_result = [
(2, '3 street test', 0, 2),
(4, '5 street test', 1, 4),
(5, '5 street test', 0, 3),
(7, '3 street test', 0, 0)
]
self.assertEqual(len(result), 4)
self.assertEqual(result, expected_result)

result = from_psql.select_duplicate()
expected_result = [2, 4, 5, 7]
self.assertEqual(len(result), 4)
self.assertEqual(result, expected_result)

from_psql.disconnect()

def test_select_unique(self):
from_psql = FromPSQL(self.info_dict, self.table, self.column)

result = from_psql.select_unique(rows_list=True)
expected_result = [
(0, '1 street test', 0, 0),
(1, '2 street test', 1, 1),
(3, '4 street test', 0, 3),
(6, '6 street test', 0, 4)
]
self.assertEqual(len(result), 4)
self.assertEqual(result, expected_result)

result = from_psql.select_unique()
expected_result = [0, 1, 3, 6]
self.assertEqual(len(result), 4)
self.assertEqual(result, expected_result)

from_psql.disconnect()