# Audition tasks for Data Engineer
<br>

### Import used libraries

In [1]:
import csv
import itertools
from collections import defaultdict
from datetime import datetime
import dateutil.parser

### Task 1
Load received files: test.csv, test_level.csv, class.csv
### Task 2
Check if files are correct according to data in columns etc. If not please delete these rows.
Will be in the plus if you do this as a simple script - not manually.

In [2]:
class Validator(object):
    @staticmethod
    def int_validator(x, name, show_incorrect, not_null=True):
        try:
            if not_null or x[name] != '':
                x[name] = int(x[name]) 
        except ValueError:
            if show_incorrect:
                print('Test field "{}" has incorrect format {} (expected int)'.format(name,x[name]))
            return False
        return True
    
    @staticmethod
    def float_validator(x, name, show_incorrect, not_null=True):
        try:
            if not_null or x[name] != '':
                x[name] = float(x[name]) 
        except ValueError:
            if show_incorrect:
                print('Test field "{}" has incorrect format {} (expected float)'.format(name,x[name]))
            return False
        return True
    
    @staticmethod
    def date_validator(x, name, date_format, show_incorrect, not_null=True):
        try:
            if not_null or x[name] != '':
                x[name] = datetime.strptime(x[name], date_format) 
        except ValueError:
            if show_incorrect:
                print('Test field "{}" has incorrect format {} (expected date format: {})'.format(name,x[name], date_format))
            return False
        return True
    
    @staticmethod
    def isodate_validator(x, name, show_incorrect, not_null=True):
        try:
            if not_null or x[name] != '':
                x[name] = dateutil.parser.parse(x[name])
        except ValueError:
            if show_incorrect:
                print('Test field "{}" has incorrect format {} (expected isodate format)'.format(name,x[name], date_format))
            return False
        return True
    
    @staticmethod
    def validate_test(row, show_incorrect=False):
        return ( Validator.int_validator(row, 'id', show_incorrect) and
            Validator.int_validator(row, 'student_id', show_incorrect) and
            Validator.int_validator(row, 'class_id', show_incorrect) and
            Validator.date_validator(row, 'created_at', '%d.%m.%y %H:%M', show_incorrect) and
            Validator.date_validator(row, 'updated_at', '%d.%m.%y %H:%M', show_incorrect, not_null=False) and
            Validator.isodate_validator(row, 'last_event_time', show_incorrect, not_null=False) and
            Validator.float_validator(row, 'overall_score', show_incorrect, not_null=False) and
            Validator.date_validator(row, 'authorized_at', '%d.%m.%y %H:%M', show_incorrect,not_null=False) and
            Validator.float_validator(row, 'speaking_score', show_incorrect, not_null=False) and
            Validator.float_validator(row, 'writing_score', show_incorrect, not_null=False) and
            Validator.float_validator(row, 'reading_score', show_incorrect, not_null=False) and
            Validator.float_validator(row, 'listening_score', show_incorrect, not_null=False) and
            Validator.int_validator(row, 'test_level_id', show_incorrect))
    
    @staticmethod
    def validate_class(row, show_incorrect=False):
        return ( Validator.int_validator(row, 'id', show_incorrect) and
            Validator.int_validator(row, 'institution_id', show_incorrect, not_null=False) and
            Validator.date_validator(row, 'created_at', '%d.%m.%y %H:%M', show_incorrect) and
            Validator.date_validator(row, 'updated_at', '%d.%m.%y %H:%M', show_incorrect, not_null=False))
    
    @staticmethod
    def validate_test_level(row, show_incorrect=False):
        return ( Validator.int_validator(row, 'id', show_incorrect) and
            Validator.date_validator(row, 'created_at', '%d.%m.%y %H:%M', show_incorrect) and
            Validator.date_validator(row, 'updated_at', '%d.%m.%y %H:%M', show_incorrect, not_null=False))
    




In [3]:
def load_data(file_path, f_validate, key_name='id', show_incorrect=False):
    with open(file_path) as csvfile:
        reader = csv.DictReader(csvfile, delimiter=';')
        return {row[key_name]: row for row in reader if f_validate(row, show_incorrect)}
    
tests = load_data('input_data/test.csv', Validator.validate_test)
classes = load_data('input_data/class.csv', Validator.validate_class)
test_levels = load_data('input_data/test_level.csv', Validator.validate_test_level)

### Task 3
Create first final dataset which will contain information about frequency of tests utilization by
classes.


In [4]:
def get_test_utilization(tests, classes, test_levels):
    """Return test utilization dataset

    Keyword arguments:
    tests -- tests dataset
    classes -- classes dataset
    test_levels -- test_levels dataset
    show_all_classes -- if true the output dataset will contain all classes, even those that have
        no relation with authorized testor or any test in general.
    """
    res = {}
    class_counter = {}
    for test_id,test in tests.items():
        test_class = classes[test['class_id']]
        test_level = test_levels[test['test_level_id']]

        if test['authorized_at'] != '':
            #enumerate each solved test
            if test_class['id'] not in class_counter:
                class_counter[test_class['id']] = 0
            class_counter[test_class['id']] += 1

            #add test to output dataset
            res[test_id] = {
                'class_id': test_class['id'], 
                'class_name': test_class['name'],
                'teaching_hours': test_class['teaching_hours'],
                'test_id': test_id,
                'test_level': test_level['name'],
                'test_created_at': test['created_at'],
                'test_authorized_at': test['authorized_at'],
                'class_test_number': class_counter[test_class['id']]
            } 
    return res

tests_ut = get_test_utilization(tests, classes, test_levels)

### Task 4
Create a second final dataset which will contain information about average overall scores for
tests in classes.

In [5]:
def dict_filter(org_dic, *list_el):
    """Return dictonary like org_dic with removed elements from list_el"""
    dic = dict(org_dic)
    for x in list_el:
        dic.pop(x)
    return dic
        

def get_test_average(tests_ut, tests, classes, show_all_classes = False):
    """Return test average overall scores dataset

    Keyword arguments:
    tests_ut -- test utilization dataset
    tests -- tests dataset
    classes -- classes dataset
    show_all_classes -- if true the output dataset will contain all classes, even those that have
        no relation with authorized test or any test in general.
    """
    res = {}
    class_counter = {}

    for test_id, test_ut in tests_ut.items():
        test_status = tests[test_id]['test_status']
        test_score = tests[test_id]['overall_score']
        new_class = dict_filter(test_ut, 'test_level', 'test_id', 'class_test_number')

        #add new class to output dataset
        if test_ut['class_id'] not in res:
            res[test_ut['class_id']] = new_class

        #if test has a score and its status is 'SCORING_SCORED' use it for computing the average score
        if test_status == 'SCORING_SCORED' and test_score!='':
            if 'avg_class_test_overall_score' in res[test_ut['class_id']]:
                res[test_ut['class_id']]['avg_class_test_overall_score'] += float(test_score)
                class_counter[test_ut['class_id']] += 1
            else:
                res[test_ut['class_id']]['avg_class_test_overall_score'] = float(test_score)
                class_counter[test_ut['class_id']] = 1

    #compute average score from scores sum and tests number
    for class_id, class_res in res.items():
            if 'avg_class_test_overall_score' in class_res:
                class_res['avg_class_test_overall_score'] /= class_counter[class_id]
                
    #add every class from classes to ouput dataset (with empty fields)
    if show_all_classes:
        for class_id, cl in classes.items():
            if class_id not in res:
                res[class_id] = {
                    'class_id': class_id,
                    'class_name': cl['name'],
                    'teaching_hours': cl['teaching_hours']
                }
    return res

tests_av = get_test_average(tests_ut, tests, classes)

### Task 5
Please save and store these two datasets produced in the 2 earlier tasks in the catalogue from which you retrieved data for calculation.

In [6]:
def save_dataset(data, filename, fieldnames):
    with open(filename, 'w') as res_file:
        writer = csv.DictWriter(res_file, fieldnames=fieldnames, delimiter=';')
        writer.writeheader()
        for x in sorted(data, key = lambda x: int(x)):
            writer.writerow(data[x])

fieldnames_ut = [ 'class_id', 'class_name', 'teaching_hours', 'test_id',
                'test_created_at', 'test_authorized_at', 'test_level', 'class_test_number']

fieldnames_av = ['class_id', 'class_name', 'teaching_hours', 'test_created_at',
                        'test_authorized_at', 'avg_class_test_overall_score']

save_dataset(tests_ut, 'results/test_utilization.csv', fieldnames_ut)  
save_dataset(tests_av, 'results/test_average_scores.csv', fieldnames_av)  

### Task 6
Please prepare a script which allow you to load the earlier prepared datasets to
tables in a database.

#### Queries:

In [7]:
# Creates table for classes
CREATE_CLASS_TABLE = (
    'CREATE TABLE CLASS ('
        'id INTEGER PRIMARY KEY, '
        'institution_id INTEGER NOT NULL, '
        'owner_id VARCHAR(30), '
        'name VARCHAR(50),'
        'created_at DATE NOT NULL, '
        'updated_at DATE, '
        'teaching_hours VARCHAR(10), '
        'latest_test_time VARCHAR(30),'
        'has_student_with_scored_test VARCHAR(10));')

# Creates table for tests' levels
CREATE_TEST_LEVEL_TABLE = (
    'CREATE TABLE TEST_LEVEL ('
        'id INTEGER PRIMARY KEY, '
        'name VARCHAR(10), ' 
        'displayName VARCHAR(10), '
        'created_at DATE NOT NULL, '
        'updated_at DATE); ')

# Creates table for tests
CREATE_TEST_TABLE = (
    'CREATE TABLE TEST ( '
        'id INTEGER PRIMARY KEY, '
        'student_id INTEGER NOT NULL, '
        'class_id INTEGER NOT NULL, '
        'created_at DATE NOT NULL, '
        'updated_at DATE, '
        'last_event_time DATE, '
        'overall_score FLOAT, '
        'test_status VARCHAR(30), '
        'institution_id VARCHAR(10), '
        'authorized_at DATE, '
        'confidence_level, '
        'speaking_score FLOAT, '
        'writing_score FLOAT, '
        'reading_score FLOAT, '
        'listening_score FLOAT, '
        'test_level_id INTEGER NOT NULL, '
        'licence_id INTEGER, '
        'FOREIGN KEY(class_id) REFERENCES CLASS(id), '
        'FOREIGN KEY(test_level_id) REFERENCES TEST_LEVEL(id)); ')

# Creates test_utilization table in database
TASK_3 = (
    'CREATE TABLE TEST_UT AS '
    'SELECT '
    't.class_id AS class_id, '
    'c.name AS class_name, '
    'c.teaching_hours, '
    't.id AS test_id, '
    't.created_at AS test_created_at, '
    't.authorized_at AS test_authorized_at, '
    'tl.name AS test_level, '
    '(SELECT COUNT(*) FROM TEST t2 WHERE t2.authorized_at != "" AND t2.id <= t.id AND t2.class_id = t.class_id) AS class_test_number '
    'FROM TEST t '
    'JOIN CLASS c ON t.class_id = c.id '
    'JOIN TEST_LEVEL tl ON t.test_level_id = tl.id '
    'WHERE t.authorized_at != ""'
    'ORDER BY t.id;')

# Creates test_average_scores table in database
TASK_4 = (
    'CREATE TABLE TEST_AVG AS '
    'SELECT '
    't.class_id, '
    'c.name AS class_name, '
    'c.teaching_hours, '
    't.created_at AS test_created_at, '
    't.authorized_at AS test_authorized_at, '
    'AVG(t.overall_score) as avg_class_test_overall_score '
    'FROM TEST t '
    'JOIN CLASS c ON t.class_id = c.id '
    'WHERE t.authorized_at != "" AND t.test_status = "SCORING_SCORED" '
    'GROUP BY t.class_id;')
    

#### Tasks solved with sqlite3 in Python:

In [None]:
import sqlite3 as sql


class EducationDataset:
    def __init__(self, db_name, test_path, class_path, test_level_path):
        self.db = sql.connect(db_name)
        self.cursor = self.db.cursor()
        # create tables
        self.cursor.execute(CREATE_CLASS_TABLE)
        self.cursor.execute(CREATE_TEST_LEVEL_TABLE)
        self.cursor.execute(CREATE_TEST_TABLE)
        # insert values into tables
        self.load_file(class_path, 'CLASS', 9, self.convert_dates)
        self.load_file(test_level_path, 'TEST_LEVEL', 5, self.convert_dates)
        self.load_file(test_path, 'TEST', 17, self.convert_dates, self.convert_test)
        self.db.commit()
    
    def convert_dates(self, row):
        if row['updated_at'] != '':
            row['updated_at'] = datetime.strftime(datetime.strptime(row['updated_at'], '%d.%m.%y %H:%M'), '%Y-%m-%d %H:%M:%S')
        if row['created_at'] != '':
            row['created_at'] = datetime.strftime(datetime.strptime(row['created_at'], '%d.%m.%y %H:%M'), '%Y-%m-%d %H:%M:%S')

    def convert_test(self, row):
        if row['authorized_at'] != '':
            row['authorized_at'] = datetime.strftime(datetime.strptime(row['authorized_at'], '%d.%m.%y %H:%M'), '%Y-%m-%d %H:%M:%S')

    def load_file(self, path, table_name, columns_no, *f_convert):
        """Load file to table table_name in database

        Keyword arguments:
        path -- path f file which contains data to load 
        table_name -- name of table
        columns_no -- number of columns in the file
        *f_convert -- functions that convert the row so that it can be inserted into database
        """

        with open(path, 'r') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=';')
            row_pat = ''.join(['?, ' for x in range(columns_no)])[:-2]
            for row in reader:
                for f in f_convert:
                    f(row)
                self.cursor.execute('INSERT OR IGNORE INTO ' + 
                                    table_name + ' VALUES (' + 
                                    row_pat + ')', list(row.values()))

    def do_task(self, task, columns_no):
        output_format = ''.join(['{} ' for x in range(columns_no)])
        res = self.cursor.execute(task).fetchall()
        for r in res:
            print(output_format.format(*r).replace('\n', ''))
            
ed = EducationDataset("ed", 'input_data/test.csv', 'input_data/class.csv', 'input_data/test_level.csv')
ed.do_task(TASK_3, 8)
ed.do_task(TASK_4, 6)
