In [1]:
import os
from csv import DictReader
from functools import partial

In [2]:
import numpy as np
import pandas as pd

In [3]:
from clickhouse_driver import Client

In [3]:
# from crowdkit.datasets import load_dataset

---

In [4]:
DATA_FOLDER = 'TlkUsersAndTasks'

---

In [8]:
%%bash
curl https://tlk.s3.yandex.net/dataset/TlkUsersAndTasks.zip -o source.zip
unzip source.zip

Archive:  source.zip
   creating: TlkUsersAndTasks/
  inflating: TlkUsersAndTasks/projects.tsv  
  inflating: TlkUsersAndTasks/visits.tsv  
  inflating: TlkUsersAndTasks/assignments.tsv  
  inflating: TlkUsersAndTasks/README.md  
  inflating: TlkUsersAndTasks/users.tsv  


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0 1022M    0 1064k    0     0  1702k      0  0:10:15 --:--:--  0:10:15 1700k  0 1022M    0 3787k    0     0  2324k      0  0:07:30  0:00:01  0:07:29 2323k  0 1022M    0 8499k    0     0  3228k      0  0:05:24  0:00:02  0:05:22 3226k  1 1022M    1 13.5M    0     0  3770k      0  0:04:37  0:00:03  0:04:34 3770k  1 1022M    1 18.2M    0     0  4033k      0  0:04:19  0:00:04  0:04:15 4032k  2 1022M    2 23.9M    0     0  4349k      0  0:04:00  0:00:05  0:03:55 4680k  2 1022M    2 29.6M    0     0  4587k      0  0:03:48  0:00:06  0:03:42 5323k  3 1022M    3 35.9M    0     0  4828k      0  0:03:36  0:00:07  0:03:29 5670k  4 1022M    4 41.6M    0     0  4943k      0  0:03:31  0:00:08  0:03:23 5813k  4 1022M    4 47.4M    0     0  5051k      0  0:03

---

In [None]:
# !clickhouser server --http_port 8123 --daemon

In [5]:
client = Client(host='localhost')

---

In [6]:
float_nanvl = lambda x: float(x) if x else 0.0
int_optional = lambda x: int(x) if x else None

In [7]:
def tsv_reader(filename, transform):
    with open(os.path.join(DATA_FOLDER, filename)) as input_stream:
        for row in DictReader(input_stream, delimiter='\t'):
            yield transform(row)

In [8]:
def row_converter(row, column_cast_map):
    return {
        column: (column_cast_map[column](value) if column in column_cast_map else value)
        for column, value in row.items()
    }

---

### Users table

In [None]:
client.execute('''
    CREATE TABLE crowd.users (
        user_id String,
        created_at UInt64
    )
    ENGINE = MergeTree()
    ORDER BY (user_id)
''')

In [54]:
users_transform = partial(row_converter, column_cast_map={'created_at': int})

In [56]:
stream_reader = tsv_reader('users.tsv', users_transform)

client.execute(
    'INSERT INTO crowd.users VALUES',
    stream_reader
)
stream_reader.close()

In [64]:
client.execute('DESCRIBE crowd.users')

[('user_id', 'String', '', '', '', '', ''),
 ('created_at', 'UInt64', '', '', '', '', '')]

In [65]:
client.execute('SELECT * FROM crowd.users LIMIT 3')

[('0000900e9c8b4242bd747c5063a33028', 1528479404),
 ('0001030c04654cf2850f675b56cb05a9', 1543384566),
 ('00023c4a8bea48e8a153df78cad93880', 1543213647)]

### Visists table

In [59]:
client.execute('''
    CREATE TABLE crowd.visits (
        user_id String,
        assignment_id String,
        timestamp UInt64
    )
    ENGINE = MergeTree()
    ORDER BY (user_id)
''')

[]

In [60]:
visits_transform = partial(row_converter, column_cast_map={'timestamp': int})

In [63]:
stream_reader = tsv_reader('visits.tsv', visits_transform)

client.execute(
    'INSERT INTO crowd.visits VALUES',
    stream_reader
)
stream_reader.close()

In [66]:
client.execute('DESCRIBE crowd.visits')

[('user_id', 'String', '', '', '', '', ''),
 ('assignment_id', 'String', '', '', '', '', ''),
 ('timestamp', 'UInt64', '', '', '', '', '')]

In [67]:
client.execute('SELECT * FROM crowd.visits LIMIT 3')

[('0000900e9c8b4242bd747c5063a33028',
  '901a59de-7289-415d-b6db-d169645f6748',
  1540186755),
 ('0001030c04654cf2850f675b56cb05a9',
  'b380393f-760b-4bc8-b990-6846884734d1',
  1543387259),
 ('0001030c04654cf2850f675b56cb05a9',
  '495979f6-b762-4ffb-8552-2866beed7eef',
  1543384846)]

### Project table

In [124]:
project_cast_map = {
    'project_creation_timestamp': int,
    'project_has_audio': int,
    'project_has_button': int,
    'project_has_buttonClicked_input': int,
    'project_has_checkbox_input': int,
    'project_has_externalHtml': int,
    'project_has_fileAudio_input': int,
    'project_has_fileImg_input': int,
    'project_has_fileVideo_input': int,
    'project_has_file_input': int,
    'project_has_iframe': int,
    'project_has_image': int,
    'project_has_imageAnnotation_input': int,
    'project_has_radio_input': int,
    'project_has_sbs': int,
    'project_has_select_input': int,
    'project_has_sourcesRecorder_input': int,
    'project_has_string_input': int,
    'project_has_suggest_input': int,
    'project_has_textarea_input': int,
    'project_has_video': int,
    'project_instruction_FK': float_nanvl,
    'project_instruction_len': float_nanvl,
    'project_instruction_wordCount': float_nanvl,
    'project_required_fields': int,
    'project_spec_length': int,
}
projects_transform = partial(row_converter, column_cast_map=project_cast_map)

In [123]:
client.execute('''
    CREATE TABLE crowd.projects (
        project_creation_timestamp UInt64,
        project_has_audio UInt32,
        project_has_button UInt32,
        project_has_buttonClicked_input UInt32,
        project_has_checkbox_input UInt32,
        project_has_externalHtml UInt32,
        project_has_fileAudio_input UInt32,
        project_has_fileImg_input UInt32,
        project_has_fileVideo_input UInt32,
        project_has_file_input UInt32,
        project_has_iframe UInt32,
        project_has_image UInt32,
        project_has_imageAnnotation_input UInt32,
        project_has_radio_input UInt32,
        project_has_sbs UInt32,
        project_has_select_input UInt32,
        project_has_sourcesRecorder_input UInt32,
        project_has_string_input UInt32,
        project_has_suggest_input UInt32,
        project_has_textarea_input UInt32,
        project_has_video UInt32,
        project_id String,
        project_instruction_FK Float64,
        project_instruction_language String,
        project_instruction_len Float64,
        project_instruction_wordCount Float64,
        project_required_fields UInt64,
        project_spec_length UInt64
    )
    ENGINE = MergeTree()
    ORDER BY (project_id)
''')

[]

In [125]:
stream_reader = tsv_reader('projects.tsv', projects_transform)

client.execute(
    'INSERT INTO crowd.projects VALUES',
    stream_reader
)
stream_reader.close()

In [126]:
client.execute('DESCRIBE crowd.projects')

[('project_creation_timestamp', 'UInt64', '', '', '', '', ''),
 ('project_has_audio', 'UInt32', '', '', '', '', ''),
 ('project_has_button', 'UInt32', '', '', '', '', ''),
 ('project_has_buttonClicked_input', 'UInt32', '', '', '', '', ''),
 ('project_has_checkbox_input', 'UInt32', '', '', '', '', ''),
 ('project_has_externalHtml', 'UInt32', '', '', '', '', ''),
 ('project_has_fileAudio_input', 'UInt32', '', '', '', '', ''),
 ('project_has_fileImg_input', 'UInt32', '', '', '', '', ''),
 ('project_has_fileVideo_input', 'UInt32', '', '', '', '', ''),
 ('project_has_file_input', 'UInt32', '', '', '', '', ''),
 ('project_has_iframe', 'UInt32', '', '', '', '', ''),
 ('project_has_image', 'UInt32', '', '', '', '', ''),
 ('project_has_imageAnnotation_input', 'UInt32', '', '', '', '', ''),
 ('project_has_radio_input', 'UInt32', '', '', '', '', ''),
 ('project_has_sbs', 'UInt32', '', '', '', '', ''),
 ('project_has_select_input', 'UInt32', '', '', '', '', ''),
 ('project_has_sourcesRecorder_inpu

In [127]:
client.execute('SELECT * FROM crowd.projects LIMIT 3')

[(1428398401,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  3,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  '1',
  20.06825358851674,
  'ru',
  3208.0,
  319.0,
  1,
  1196),
 (1446184834,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  3,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  '10',
  18.75511558124029,
  'ru',
  3397.0,
  329.0,
  2,
  1877),
 (1494924673,
  0,
  0,
  0,
  1,
  0,
  0,
  0,
  0,
  0,
  0,
  1,
  0,
  2,
  0,
  0,
  0,
  0,
  0,
  0,
  0,
  '100',
  40.13924757281552,
  'ru',
  737.0,
  85.0,
  2,
  698)]

### Assignments table

In [24]:
client.execute('''
    CREATE TABLE crowd.assignments (
        assignment_id String,
        assignment_project_id String,
        assignment_start_time UInt64,
        assignment_submit_time UInt64,
        microtasks_count UInt64,
        assignment_gs_count UInt64,
        assignment_gs_correct_count UInt64,
        assignment_price Float64,
        assignment_type String,
        assignment_status String,
        os_family String,
        device_category String
    )
    ENGINE = File(TabSeparated)
''')

[]

In [25]:
assignments_cast_map = {
    'assignment_start_time': int, 
    'assignment_submit_time': int, 
    'microtasks_count': int, 
    'assignment_gs_count': int, 
    'assignment_gs_correct_count': int, 
    'assignment_price': float, 
}
assignment_transform = partial(row_converter, column_cast_map=assignments_cast_map)

In [26]:
stream_reader = tsv_reader('assignments.tsv', assignment_transform)

client.execute(
    'INSERT INTO crowd.assignments VALUES',
    stream_reader, types_check=True
)
stream_reader.close()

In [27]:
client.execute('DESCRIBE crowd.assignments')

[('assignment_id', 'String', '', '', '', '', ''),
 ('assignment_project_id', 'String', '', '', '', '', ''),
 ('assignment_start_time', 'UInt64', '', '', '', '', ''),
 ('assignment_submit_time', 'UInt64', '', '', '', '', ''),
 ('microtasks_count', 'UInt64', '', '', '', '', ''),
 ('assignment_gs_count', 'UInt64', '', '', '', '', ''),
 ('assignment_gs_correct_count', 'UInt64', '', '', '', '', ''),
 ('assignment_price', 'Float64', '', '', '', '', ''),
 ('assignment_type', 'String', '', '', '', '', ''),
 ('assignment_status', 'String', '', '', '', '', ''),
 ('os_family', 'String', '', '', '', '', ''),
 ('device_category', 'String', '', '', '', '', '')]

In [29]:
client.execute('SELECT * FROM crowd.assignments LIMIT 3')

[('fe52c68a-48ec-4837-a24c-4c2bd191431f',
  '14',
  1538145644,
  1538145992,
  15,
  15,
  13,
  0.0,
  'train',
  'APPROVED',
  'WINDOWS',
  'PERSONAL_COMPUTER'),
 ('d19953f6-1f53-4b54-8fa4-d7374ca0607b',
  '14',
  1538150924,
  1538151375,
  15,
  15,
  13,
  0.0,
  'train',
  'APPROVED',
  'ANDROID',
  'SMARTPHONE'),
 ('3aea0026-faae-4845-8708-9bf323ff8a77',
  '14',
  1538148115,
  1538148369,
  15,
  15,
  9,
  0.0,
  'train',
  'APPROVED',
  'WINDOWS',
  'PERSONAL_COMPUTER')]

---

In [76]:
client.execute('''
    CREATE TABLE crowd.worker_statistics_daily (
        fielddate String,
        project_id String,
        os_family String,
        user_id String,
        n_gs_correct UInt64,
        n_gs_total UInt64,
        earnings Float64,
        n_assignments Int64
    )
    ENGINE = File(TabSeparated)
''')

[]

In [77]:
client.execute('''
INSERT INTO crowd.worker_statistics_daily
SELECT
    fielddate,
    project_id,
    os_family,
    user_id,
    
    SUM(n_gs_correct) as n_gs_correct,
    SUM(n_gs_total) as n_gs_total,
    SUM(price) as earnings,
    COUNT(*) as n_assignments
FROM (
    SELECT
        src.fielddate as fielddate,
        src.user_id as user_id,
        src.n_gs_correct as n_gs_correct,
        src.n_gs_total as n_gs_total,
        src.price as price,
        
        projects.project_id as project_id,
        src.os_family as os_family
    FROM (
        SELECT
            DATE(src.assignment_start_time) as fielddate,
            src.assignment_id as assignment_id,
            src.os_family as os_family,
            src.assignment_project_id as project_id,
            workers.user_id as user_id,

            src.assignment_gs_correct_count as n_gs_correct,
            src.assignment_gs_count as n_gs_total,
            src.assignment_price as price
        FROM (
            SELECT * FROM crowd.assignments
            WHERE 
                assignment_type = 'regular'
                AND (assignment_gs_count > 0)
        ) as src
        JOIN
            crowd.visits as workers
        ON src.assignment_id = workers.assignment_id
    ) as src
    JOIN
        crowd.projects as projects
    ON projects.project_id = src.project_id
)
GROUP BY (fielddate, project_id, os_family, user_id)
'''
)

[]

---

In [81]:
client.execute('''
    CREATE TABLE crowd.train_worker_statistics_daily (
        fielddate String,
        project_id String,
        os_family String,
        user_id String,
        n_gs_correct UInt64,
        n_gs_total UInt64,
        n_assignments Int64
    )
    ENGINE = File(TabSeparated)
''')

[]

In [83]:
client.execute('''
INSERT INTO crowd.train_worker_statistics_daily

WITH
train_assignments AS (
    SELECT
        DATE(src.assignment_start_time) as fielddate,
        src.assignment_id as assignment_id,
        src.os_family as os_family,
        src.assignment_project_id as project_id,
        workers.user_id as user_id,

        src.assignment_gs_correct_count as n_gs_correct,
        src.assignment_gs_count as n_gs_total
    FROM (
        SELECT * FROM crowd.assignments
        WHERE 
            assignment_type = 'train'
            AND (assignment_gs_count > 0)
    ) as src
    JOIN
        crowd.visits as workers
    ON src.assignment_id = workers.assignment_id
),
merged_assignments AS (
    SELECT
        src.fielddate as fielddate,
        src.user_id as user_id,
        src.n_gs_correct as n_gs_correct,
        src.n_gs_total as n_gs_total,
        
        projects.project_id as project_id,
        src.os_family as os_family
    FROM 
        train_assignments as src
    JOIN
        crowd.projects as projects
    ON projects.project_id = src.project_id
)
    
SELECT
    fielddate,
    project_id,
    os_family,
    user_id,
    
    SUM(n_gs_correct) as n_gs_correct,
    SUM(n_gs_total) as n_gs_total,
    COUNT(*) as n_assignments
FROM merged_assignments
GROUP BY (fielddate, project_id, os_family, user_id)
'''
)

[]

---

In [98]:
client.execute('''
    CREATE TABLE crowd.projects_fk (
        project_id String,
        lang String,
        fk Float64,
        fk_rank Float64
    )
    ENGINE = File(TabSeparated)
''')

[]

In [99]:
client.execute('''
INSERT INTO crowd.projects_fk

WITH project_info AS (
    SELECT
        project_id,
        CASE
            WHEN project_instruction_language == '' THEN '_rest'
            ELSE project_instruction_language
        END as lang,
        project_instruction_FK as fk
    FROM crowd.projects
)

SELECT
    project_id,
    lang,
    fk,
    RANK() OVER (PARTITION BY lang ORDER BY fk DESC) as fk_rank
FROM project_info
WHERE lang != '_rest'
''')

[]

---

In [113]:
client.execute('''
CREATE TABLE crowd.user_top1_projects (
    user_id String,
    project_id String,
    m_quantile Float64,
    n_submits UInt64
)
ENGINE = File(TabSeparated)
''')

[]

In [114]:
client.execute('''
INSERT INTO crowd.user_top1_projects
WITH
assignments AS (
    SELECT
        src.assignment_id as assignment_id,
        src.assignment_project_id as project_id,
        workers.user_id as user_id
    FROM (
        SELECT * FROM crowd.assignments
        WHERE 
            assignment_type = 'regular'
    ) as src
    JOIN
        crowd.visits as workers
    ON src.assignment_id = workers.assignment_id
),
user_project_assignments AS (
    SELECT
        user_id,
        project_id,
        COUNT(*) as n_submits
    FROM (
        SELECT
            src.user_id as user_id,
            projects.project_id as project_id
        FROM 
            assignments as src
        JOIN
            crowd.projects as projects
        ON projects.project_id = src.project_id
    )
    GROUP BY (user_id, project_id)
),
user_project_submit_rank AS (
    SELECT
        user_id,
        project_id,
        RANK() OVER(PARTITION BY project_id ORDER BY n_submits DESC) as rank,
        n_submits
    FROM user_project_assignments
),
project_max_rank AS (
    SELECT
        project_id,
        MAX(rank) as rank
    FROM user_project_submit_rank
    GROUP BY project_id
)

SELECT
    user_id,
    project_id,
    m_quantile,
    n_submits
FROM (
    SELECT
        src.user_id as user_id,
        src.project_id as project_id,
        src.rank * 1.0 / max_rank.rank as m_quantile,
        src.n_submits as n_submits
    FROM
        user_project_submit_rank as src
    JOIN
        project_max_rank as max_rank
    ON src.project_id = max_rank.project_id
)
WHERE
    m_quantile < 0.01
    AND n_submits > 20
ORDER BY user_id, m_quantile ASC
''')

[]