In [22]:
import json
import psycopg2
import pandas as pd
import numpy as np

import requests
import urllib
import zipfile

import os
from functools import reduce
import io
from sqlalchemy import create_engine

In [23]:
def convert4null(var):
    '''
    служебная функция для промежуточного преобразования Nan в null
    '''
    if pd.isna(var):
        return '#@$'
    
    return var


In [24]:
def fill_item_type(myzip):
    '''
    upsert справочника:
    DB.item_type  <- course_item_types.csv
    '''
    with myzip.open('course_item_types.csv') as from_archive:
        df_item_type = pd.read_csv(from_archive)
    
    print(len(df_item_type))
    
    df_item_type = df_item_type.rename(columns={'course_item_type_id':'id', 'course_item_type_desc':'descr', 
                             'course_item_type_category':'categ', 'course_item_type_graded':'graded'})
    df_item_type.drop(['atom_content_type_id'], axis='columns', inplace=True)
    
    
    # пишем во временную таблицу
    df_item_type.to_sql('tmp', engine, if_exists='replace')
    
    conn=engine.raw_connection()
    cur = conn.cursor()
    
    # upsert из временной таблицы в item_type
    sql = 'INSERT INTO coursera_structure.item_type \
    (select id, descr, categ, cast(graded as boolean) from tmp) \
    ON CONFLICT (id) DO NOTHING'
    cur.execute(sql)
    
    conn.commit()

In [25]:
def fill_assm_type(myzip):
    '''
    upsert справочника:
    assessment_type <- assessment_types.csv
    '''
    with myzip.open('assessment_types.csv') as from_archive:
        df_assmm_type = pd.read_csv(from_archive)
    
    print(len(df_assmm_type))
    
    df_assmm_type = df_assmm_type.rename(columns={'assessment_type_id':'id', 'assessment_type_desc':'descr'})

    # пишем во временную таблицу
    df_assmm_type.to_sql('tmp', engine, if_exists='replace')
    
    conn=engine.raw_connection()
    cur = conn.cursor()
    
    # upsert из временной таблицы в assessment_type
    sql = 'INSERT INTO coursera_structure.assessment_type \
    (select id, descr from tmp) \
    ON CONFLICT (id) DO NOTHING'
    cur.execute(sql)
    
    conn.commit()

In [26]:
def fill_que_type(myzip):
    '''
    upsert справочника:
    question_type <- assessment_question_types.csv
    '''
    with myzip.open('assessment_question_types.csv') as from_archive:
        df_assmm_type = pd.read_csv(from_archive)
    
    print(len(df_assmm_type))
    
    df_assmm_type = df_assmm_type.rename(columns={'assessment_question_type_id':'id', 'assessment_question_type_desc':'descr'})

    # пишем во временную таблицу
    df_assmm_type.to_sql('tmp', engine, if_exists='replace')
    
    conn=engine.raw_connection()
    cur = conn.cursor()
    
    # upsert из временной таблицы в assessment_type
    sql = 'INSERT INTO coursera_structure.question_type \
    (select id, descr from tmp) \
    ON CONFLICT (id) DO NOTHING'
    cur.execute(sql)
    
    conn.commit()

In [27]:
engine = create_engine('postgresql+psycopg2://postgres:pg215@VM-AS494:5432/test')
def upsert_reference_tables(zipname):
    print(zipname)

    myzip = zipfile.ZipFile(f'Coursera/{zipname}')
    fill_item_type(myzip)
    fill_assm_type(myzip)
    fill_que_type(myzip)

In [28]:
upsert_reference_tables('bayesian_methods_in_machine_learning_1578903839471.zip')

bayesian_methods_in_machine_learning_1578903839471.zip
27
7
14


In [29]:
def add_load(zipname):
    """
    Проверяет наличие загрузки в базе и при отсутствии - добавляет запись в coursera_structure.load
    """
    
    cursor.execute(f"select id from coursera_structure.load where name='{zipname[:-4]}'")
    
    if cursor.fetchone():
        print(f'Срез курса {zipname} уже в базе')
        return False
    else:
        cursor.execute(f"insert into coursera_structure.load (name, load_ts) values('{zipname[:-4]}', now()) returning id")
        return cursor.fetchone()[0]
    
        

In [30]:
def add_course(myzip, load_id):
    '''
    DB.course  | courses.csv
    ----------------------------
    id  -  (series nextval)
    ext_id  <-   course_id
    slug    <-   course_slug
    name    <-   course_name
    descr   <-   course_desc
    load_id - current load.id
    '''
    with myzip.open('courses.csv') as from_archive:
        df = pd.read_csv(from_archive, escapechar='\\')
    
    
    for index, row in df.iterrows():
        ext_id = row['course_id']
        slug = row['course_slug']
        name = row['course_name'].replace("'","''")
        desc = convert4null(row['course_desc']).replace("'","''")
        
        course_tmpdf = pd.read_sql_query(f"select * from coursera_structure.course where slug='{slug}' and ext_id is null", conn)
        
        # поскольку слаги всех курсов были залиты в базу заранее:
        if course_tmpdf.shape[0] > 0:
            # заполнить остальные поля в текущей строке, если слаг еще не "привлекался"
            id_ = course_tmpdf['id'].values[0]
            sql = f"update coursera_structure.course \
             set (ext_id, name, descr, load_id) = ('{ext_id}', '{name}', '{desc}', {load_id}) \
             where id={id_}"
        else:
            # если нет слага с пустым курсом, добавить новый курс
            sql = f"insert into coursera_structure.course (ext_id, slug, name, descr, load_id) \
                           values ('{ext_id}', '{slug}', '{name}', '{desc}', {load_id})"    
            
        # print(sql)
        cursor.execute(sql)
     
    return 0
        

In [31]:
def add_branch(myzip, load_id):
    '''
    DB.branch   | course_branches.csv
    ----------------------------
    id          - (series nextval)
    course_id   - (FK from course.id) <- course_id
    created_ts  <- authoring_course_branch_created_ts
    ext_id      <- course_branch_id
    load_id     - (current load.id)
    '''
    
    with myzip.open('course_branches.csv') as from_archive:
        df = pd.read_csv(from_archive, parse_dates=['authoring_course_branch_created_ts'], escapechar='\\')
    
    course_ext_id = '' 
    for index, row in df.iterrows():
        # course.id Postgres'а для внешнего ключа:
        if course_ext_id != row['course_id']: # в данном случае лишнее, но на будущее
            course_ext_id = row['course_id']
            course_sql = f"select id from coursera_structure.course where load_id={load_id} and ext_id='{course_ext_id}'"
            course_id = pd.read_sql_query(course_sql, conn)['id'].values[0] 
        
        created_ts = convert4null(row['authoring_course_branch_created_ts'])
        ext_id = row['course_branch_id']
        
        sql = f"insert into coursera_structure.branch (course_id, ext_id, created_ts, load_id) \
                        values ({course_id}, '{ext_id}', '{created_ts}', {load_id})".replace("'#@$'", "null")
        
        cursor.execute(sql)
        
    return 0


In [32]:
def add_module(myzip, load_id):
    '''
    DB.module     | course_branch_modules.csv 
    ----------------------------
    id            - (series nextval)
    branch_id     - (FK from branch.id)  <- course_branch_id
    branch_ext_id <- course_branch_id
    ext_id        <- course_module_id
    ord           <- course_branch_module_order
    name          <- course_branch_module_name
    descr         <- course_branch_module_desc
    load_id       - (current load.id)
    '''
    
    with myzip.open('course_branch_modules.csv') as from_archive:
        df = pd.read_csv(from_archive, escapechar='\\')
    df = df.sort_values(by=['course_branch_id'])
     
    branch_ext_id = ''
    for index, row in df.iterrows():
        # branch.id Postgres'а для внешнего ключа:
        if branch_ext_id != row['course_branch_id']:
            branch_ext_id = row['course_branch_id']
            branch_sql = f"select id from coursera_structure.branch where load_id={load_id} and ext_id='{branch_ext_id}'"
            branch_id = pd.read_sql_query(branch_sql, conn)['id'].values[0]
        
        ext_id = row['course_module_id']
        ord_   = row['course_branch_module_order']
        name   = row['course_branch_module_name'].replace("'","''")
        descr  = convert4null(row['course_branch_module_desc']).replace("'","''")
        
        sql = f"insert into coursera_structure.module (branch_id, branch_ext_id, ext_id, ord, name, descr, load_id) \
                        values ({branch_id}, '{branch_ext_id}', '{ext_id}', {ord_}, '{name}', '{descr}', {load_id})"
        
        cursor.execute(sql)
        
    return 0

In [33]:
def add_lesson(myzip, load_id):
    '''
    DB.lesson     | course_branch_lessons.csv
    ----------------------------
    id            - (series nextval)
    module_id  - (FK from module.id) <- course_branch_id, course_module_id
    branch_ext_id <- course_branch_id
    ext_id        <- course_lesson_id
    ord           <- course_branch_lesson_order
    name          <- course_branch_lesson_name
    load_id       - (current load.id)
    '''
    
    with myzip.open('course_branch_lessons.csv') as from_archive:
        df = pd.read_csv(from_archive, escapechar='\\')
    df = df.sort_values(by=['course_branch_id', 'course_module_id'])
     
    branch_ext_id = ''
    module_ext_id = ''
    for index, row in df.iterrows():
        # module.id Postgres'а для внешнего ключа:
        if branch_ext_id != row['course_branch_id'] or module_ext_id != row['course_module_id']:
            branch_ext_id = row['course_branch_id']
            module_ext_id = row['course_module_id']
            module_sql = f"select id from coursera_structure.module where load_id={load_id} and\
             branch_ext_id='{branch_ext_id}' and ext_id='{module_ext_id}'"
            module_id = pd.read_sql_query(module_sql, conn)['id'].values[0]

        ext_id = row['course_lesson_id']
        ord_   = row['course_branch_lesson_order']
        name   = row['course_branch_lesson_name'].replace("'","''")

        sql = f"insert into coursera_structure.lesson (module_id, branch_ext_id, ext_id, ord, name, load_id) \
                        values ({module_id}, '{branch_ext_id}', '{ext_id}', {ord_}, '{name}', {load_id})"
        
        cursor.execute(sql)
        
    return 0

In [34]:
def add_item(myzip, load_id):
    '''
    DB.item       | course_branch_items.csv
    ----------------------------
    id            - (series nextval)
    lesson_id  - (FK from lesson.id) <- course_branch_id, course_lesson_id
    branch_ext_id <- course_branch_id
    ext_id        <- course_item_id
    
    type_id       <- course_item_type_id
    ord           <- course_branch_item_order
    name          <- course_branch_item_name
    
    optional      <- course_branch_item_optional
    graded        <- is_graded
    
    load_id       - (current load.id)
    '''

    with myzip.open('course_branch_items.csv') as from_archive:
        df = pd.read_csv(from_archive, escapechar='\\')
    df = df.sort_values(by=['course_branch_id', 'course_lesson_id'])
     
    branch_ext_id = ''
    lesson_ext_id = ''
    for index, row in df.iterrows():
        # lesson.id Postgres'а для внешнего ключа:
        if branch_ext_id != row['course_branch_id'] or lesson_ext_id != row['course_lesson_id']:
            branch_ext_id = row['course_branch_id']
            lesson_ext_id = row['course_lesson_id']
            lesson_sql = f"select id from coursera_structure.lesson where load_id={load_id} and\
             branch_ext_id='{branch_ext_id}' and ext_id='{lesson_ext_id}'"
            lesson_id = pd.read_sql_query(lesson_sql, conn)['id'].values[0]

        ext_id   = row['course_item_id']
        type_id  = row['course_item_type_id']
        ord_     = row['course_branch_item_order']
        name     = row['course_branch_item_name'].replace("'","''")
        optional = row['course_branch_item_optional']
        graded   = 'f'#row['is_graded'] !!! - не во всех версиях загрузок (срез курса) есть это поле

        sql = \
        f"insert into coursera_structure.item (lesson_id, branch_ext_id, ext_id, type_id, ord, name, optional, graded, load_id) \
        values ({lesson_id}, '{branch_ext_id}', '{ext_id}', {type_id}, {ord_}, '{name}', '{optional}', '{graded}', {load_id})"
        
        cursor.execute(sql)
        
    return 0

In [35]:
def add_assessment(myzip, load_id):
    '''
    DB.assessment     | assessments.csv
    ----------------------------
    id            - (series nextval)
    ext_id        <- assessment_id
    
    type_id       <- assessment_type_id
    update_ts     <- assessment_update_ts
    passing_fract <- assessment_passing_fraction

    load_id       - (current load.id)
    '''

    with myzip.open('assessments.csv') as from_archive:
        df = pd.read_csv(from_archive, parse_dates=['assessment_update_ts'])
         
    for index, row in df.iterrows():
        ext_id    = row['assessment_id']
        type_id   = row['assessment_type_id']
        update_ts = convert4null(row['assessment_update_ts'])
        passing_fract = convert4null(row['assessment_passing_fraction']) 
        
        sql = \
        f"insert into coursera_structure.assessment (ext_id, type_id, update_ts, passing_fract, load_id) \
        values ('{ext_id}', {type_id}, '{update_ts}', {passing_fract}, {load_id})".replace("'#@$'", "null").\
        replace("#@$", "null")
        
        cursor.execute(sql)
        
    return 0

In [36]:
def add_question(myzip, load_id):
    '''
    DB.question   | assessment_questions.csv
    ----------------------------
    id            - (series nextval)
    ext_id        <- assessment_question_id
    
    type_id       <- assessment_question_type_id
    prompt        <- assessment_question_prompt
    update_ts     <- assessment_question_update_ts

    load_id       - (current load.id)
    '''

    with myzip.open('assessment_questions.csv') as from_archive:
        df = pd.read_csv(from_archive, escapechar='\\', parse_dates=['assessment_question_update_ts'])
         
    for index, row in df.iterrows():
        ext_id    = row['assessment_question_id']
        type_id   = row['assessment_question_type_id']
        prompt    = row['assessment_question_prompt'].replace("'","''")
        update_ts = convert4null(row['assessment_question_update_ts'])
        
        sql = \
        f"insert into coursera_structure.question (ext_id, type_id, prompt, update_ts, load_id) \
        values ('{ext_id}', {type_id}, '{prompt}', '{update_ts}', {load_id})".replace("'#@$'", "null")
        
        cursor.execute(sql)
        
    return 0

In [37]:
def add_item_assm(myzip, load_id):
    '''
    DB.item_assessment                       | course_branch_item_assessments.csv
    -----------------------------------------------------------------------------
    item_id        - (FK from item.id)       <- course_branch_id, course_item_id
    assessment_id  - (FK from assessment.id) <- assessment_id
    '''

    with myzip.open('course_branch_item_assessments.csv') as from_archive:
        df = pd.read_csv(from_archive)

    for index, row in df.iterrows():
        # item.id Postgres'а для внешнего ключа:
        branch_ext_id = row['course_branch_id']
        item_ext_id = row['course_item_id']
        item_sql = f"select id from coursera_structure.item where load_id={load_id} and\
         branch_ext_id='{branch_ext_id}' and ext_id='{item_ext_id}'"
        item_id = pd.read_sql_query(item_sql, conn)['id'].values[0]

        # assessment.id Postgres'а для внешнего ключа:
        assessment_ext_id = row['assessment_id']
        assessment_sql = f"select id from coursera_structure.assessment where load_id={load_id} and ext_id='{assessment_ext_id}'"
        assessment_id = pd.read_sql_query(assessment_sql, conn)['id'].values[0]

        sql = \
        f"insert into coursera_structure.item_assessment (item_id, assessment_id) \
        values ({item_id}, {assessment_id})"
        
        cursor.execute(sql)
        
    return 0

In [38]:
def add_assm_question(myzip, load_id):
    '''
    DB.assessment_question  | assessment_assessments_questions.csv
    -----------------------------------------------------------------------------
    assessment_id  - (FK from assessment.id) <- assessment_id
    question_id    - (FK from question.id)   <-  assessment_question_id
    internal_id         <-  assessment_question_internal_id
    cuepoint            <-  assessment_question_cuepoint
    ord                 <-  assessment_question_order
    weight              <-  assessment_question_weight
    '''

    with myzip.open('assessment_assessments_questions.csv') as from_archive:
        df = pd.read_csv(from_archive)
    
    for index, row in df.iterrows():
        # assessment.id Postgres'а для внешнего ключа:
        assm_ext_id = row['assessment_id']
        assessment_sql = f"select id from coursera_structure.assessment where load_id={load_id} and ext_id='{assm_ext_id}'"
        assessment_id = pd.read_sql_query(assessment_sql, conn)['id'].values[0]

        # question.id Postgres'а для внешнего ключа:
        question_ext_id = row['assessment_question_id']
        question_sql = f"select id from coursera_structure.question where load_id={load_id} and ext_id='{question_ext_id}'"
        question_id = pd.read_sql_query(question_sql, conn)['id'].values[0]
        
        internal_id   = row['assessment_question_internal_id']
        cuepoint  = convert4null(row['assessment_question_cuepoint'])
        ord_     = convert4null(row['assessment_question_order'])
        weight     = convert4null(row['assessment_question_weight'])

        sql = \
        f"insert into coursera_structure.assessment_question (assessment_id, question_id, internal_id, cuepoint, ord, weight) \
        values ({assessment_id}, {question_id}, '{internal_id}', {cuepoint}, {ord_}, {weight})".replace("#@$", "null")
        
        cursor.execute(sql)
        
    return 0

In [39]:
def add_option(myzip, load_id):
    '''
    DB.q_option       | assessment_options.csv
    ----------------------------
    id            - (series nextval)
    question_id  - (FK from question.id) <- assessment_question_id
    ext_id        <- assessment_option_id
    
    display       <- assessment_option_display
    feedback      <- assessment_option_feedback
    correct       <- assessment_option_correct
    index      <- assessment_option_index
    
    load_id       - (current load.id)
    '''

    with myzip.open('assessment_options.csv') as from_archive:
        df = pd.read_csv(from_archive, escapechar='\\')
    df = df.sort_values(by=['assessment_question_id'])
     
    question_ext_id = ''
    for index, row in df.iterrows():
        # lesson.id Postgres'а для внешнего ключа:
        if question_ext_id != row['assessment_question_id']:
            question_ext_id = row['assessment_question_id']
            question_sql = f"select id from coursera_structure.question where load_id={load_id} \
            and ext_id='{question_ext_id}'"
            question_id = pd.read_sql_query(question_sql, conn)['id'].values[0]

        ext_id   = row['assessment_option_id']
        display  = row['assessment_option_display'].replace("'","''")
        feedback = row['assessment_option_feedback'].replace("'","''")
        correct  = row['assessment_option_correct']
        index    = convert4null(row['assessment_option_index'])

        sql = \
        f"insert into coursera_structure.q_option (question_id, ext_id, display, feedback, correct, index, load_id) \
        values ({question_id}, '{ext_id}', '{display}', '{feedback}', '{correct}', {index}, {load_id})".replace("#@$", "null")
        
        cursor.execute(sql)
        
    return 0

In [40]:
def load_course_to_db(folder, zipname):
    '''
    Загрузка одного курса из архива (на данный момент - только структуры курса)
    '''
    
    load_id = add_load(zipname)
    if not load_id:
        return -1

    myzip = zipfile.ZipFile(f'{folder}/{zipname}')
    
    add_course(myzip, load_id)
    
    add_branch(myzip, load_id)
    
    add_module(myzip, load_id)
    
    add_lesson(myzip, load_id)
    
    add_item(myzip, load_id)
    
    add_assessment(myzip, load_id)
    
    add_item_assm(myzip, load_id)
    
    add_question(myzip, load_id)
    
    add_assm_question(myzip, load_id)

    add_option(myzip, load_id)
    
    conn.commit()


### Загрузка архивов из папки

In [41]:
# соберем все архивы в список (архив = выгруженный из курсеры срез курса в формате *.zip)
folder = 'Coursera' #'\\\\VM-AS494\\Temp'   #'C:\\Users\\vgrishin\\vadim\\mk_an\\zip'
zip_list = list(filter(lambda x: x[-4:] == '.zip', os.listdir(folder)))
zip_list  

['algorithmic_toolbox_1579681812260.zip',
 'bayesian_methods_in_machine_learning_1565260801251.zip',
 'bayesian_methods_in_machine_learning_1574256885796.zip',
 'bayesian_methods_in_machine_learning_1578903839471.zip',
 'neuroeconomics_1579681772902.zip']

In [42]:
%%time
# загрузка курсов по списку

conn = psycopg2.connect(dbname='test', user='postgres', 
                        password='pg215', host='VM-AS494')
cursor = conn.cursor()



for zipname in zip_list:
    
    load_course_to_db(folder, zipname)


Срез курса algorithmic_toolbox_1579681812260.zip уже в базе
Срез курса bayesian_methods_in_machine_learning_1565260801251.zip уже в базе
Срез курса bayesian_methods_in_machine_learning_1574256885796.zip уже в базе
Срез курса bayesian_methods_in_machine_learning_1578903839471.zip уже в базе
Wall time: 13 s


In [15]:
zip_list = zip_list[:1]
zip_list

['algorithmic_toolbox_1565260799484.zip']

In [16]:
folder

'C:\\Users\\vgrishin\\vadim\\mk_an\\zip'

In [22]:
myzip = zipfile.ZipFile(f'{folder}/{zip_list[0]}')
with myzip.open('assessment_questions.csv') as from_archive:
    df = pd.read_csv(from_archive, escapechar='\\')
df.shape

(1199, 6)

In [None]:
# Работа с типом столбца датафрейма
if type(df.iloc[0]['assessment_passing_fraction']) == np.float64:
    print(str(type(df.iloc[0]['assessment_passing_fraction'])).replace("class 'numpy.", ''))

In [66]:
import io

output = io.StringIO()
df_item_type.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()

In [119]:
conn.commit()

In [16]:
"Honor's track assignment".strip()

"Honor's track assignment"

UndefinedTable: ОШИБКА:  отношение "bayesian_methods_in_machine_learning_1565260801251.zip" не существует
LINE 1: SELECT * FROM bayesian_methods_in_machine_learning_156526080...
                      ^


In [29]:
conn = psycopg2.connect(dbname='test', user='postgres', 
                        password='pg215', host='VM-AS494')
cursor = conn.cursor()

for name in ['openedu_structure.course', 'openedu_structure.chapter', 'openedu_structure.item']:
    cursor.execute(f"SELECT * FROM {name}")
    print(cursor.fetchall()[0])
    conn.commit()
    

(1, 'Основания алгебры и геометрии', 'hse+AGBASE+fall_2019', None)
(7, 1, '11111', 'nnnnnnn')
(561, 465, 'db61b97b8db74f069b1e12bd135824ff', 'video', 'Промо-ролик', None)


In [5]:
# Образец для быстрой загрузки через copy_from

conn = psycopg2.connect(dbname='test', user='postgres', 
                        password='pg215', host='VM-AS494')
cur = conn.cursor()

#engine = create_engine('postgresql+psycopg2://postgres:pg215@VM-AS494:5432/test')
#conn=engine.raw_connection()
#cur = conn.cursor()
output = io.StringIO()
df_item_type.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
contents = output.getvalue()
cur.copy_from(output, 'coursera_structure.item_type', null="") #null values become ''   
conn.commit()

INSERT INTO tablename (id, username, password, level, email) 
                VALUES (1, 'John', 'qwerty', 5, 'john@mail.com') 
ON CONFLICT (id) DO NOTHING



UniqueViolation: ОШИБКА:  повторяющееся значение ключа нарушает ограничение уникальности "item_type_pkey"
DETAIL:  Ключ "(id)=(1)" уже существует.
CONTEXT:  COPY item_type, строка 1


In [21]:
for d in df:
    print(d.shape)

(27351, 10)
(30416, 10)
(31913, 10)


In [20]:
df[2].shape

(31913, 10)

In [2]:
myzip = zipfile.ZipFile('bayesian_methods_in_machine_learning_1578903839471.zip')

In [3]:
with myzip.open('assessment_types.csv') as from_archive:
    df1 = pd.read_csv(from_archive)

In [6]:
myzip.close()

In [5]:
df1.sort_values(by='assessment_type_id')

Unnamed: 0,assessment_type_id,assessment_type_desc
0,1,in video
5,2,course quick
3,3,open single page
6,4,quick questions
4,5,survey
2,6,formative
1,7,summative


In [25]:

#for fname in myzip.namelist():
#    print(fname)

with myzip.open('assessment_actions.csv') as from_archive:
    df = pd.read_csv(from_archive, parse_dates=['assessment_action_ts', 'assessment_action_start_ts'])


#myzip.close()


In [23]:
%%time
df.sort_values(by=['hse_user_id', 'assessment_action_ts'])

Wall time: 2.85 s


Unnamed: 0,assessment_action_id,assessment_action_base_id,assessment_id,assessment_scope_id,assessment_scope_type_id,assessment_action_version,assessment_action_ts,assessment_action_start_ts,guest_user_id,hse_user_id
584798,134aed120116815552efde9c156f9c2d883fd56f,2cb9284a9672b130e924bcd5c16b6ce730ea6c21,TPm_mZpLEeem4Q4WHq6hYA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,1,2019-03-31 15:42:52.503000,2019-03-31 15:42:52.467,,0004f43a2fc120d733dd3dc6783079fca531f815
612727,d30cac5a8bb0c0b6ec7a8655ac9d1edd33a16272,2cb9284a9672b130e924bcd5c16b6ce730ea6c21,TPm_mZpLEeem4Q4WHq6hYA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,2,2019-03-31 16:00:00.359000,2019-03-31 15:42:52.467,,0004f43a2fc120d733dd3dc6783079fca531f815
1232519,cbe210939e5d352a7da96e8a10f4ff0e9d6efd0c,2cb9284a9672b130e924bcd5c16b6ce730ea6c21,TPm_mZpLEeem4Q4WHq6hYA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,3,2019-03-31 16:02:38.266000,2019-03-31 15:42:52.467,,0004f43a2fc120d733dd3dc6783079fca531f815
903484,2a19d1c309ec70432400fb80a7a0c9c5acd2fc11,2cb9284a9672b130e924bcd5c16b6ce730ea6c21,TPm_mZpLEeem4Q4WHq6hYA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,4,2019-03-31 16:13:59.190000,2019-03-31 15:42:52.467,,0004f43a2fc120d733dd3dc6783079fca531f815
528516,ebac796845e44b13ce85c5b064ed01de18af8a87,2cb9284a9672b130e924bcd5c16b6ce730ea6c21,TPm_mZpLEeem4Q4WHq6hYA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,5,2019-03-31 16:14:21.900000,2019-03-31 15:42:52.467,,0004f43a2fc120d733dd3dc6783079fca531f815
1523168,5ae2f410cf12067130fed046171f54de02ee45e0,2cb9284a9672b130e924bcd5c16b6ce730ea6c21,TPm_mZpLEeem4Q4WHq6hYA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,6,2019-03-31 16:14:33.271000,2019-03-31 15:42:52.467,,0004f43a2fc120d733dd3dc6783079fca531f815
416441,0b4e59f3feb45df65f82b3c1dc7a016902d8b1fe,061aeea26b9b80f1951e518ba8fefa0e15fd3428,YZp66ppOEeek-RKI0bJQ2g@4,opencourse~anas83IDEeenOwpmZ1LH1g,6,1,2019-03-31 16:14:50.097000,2019-03-31 16:14:50.092,,0004f43a2fc120d733dd3dc6783079fca531f815
1355256,8ec9b463b4d3fcaabc08bba57a715b6e7c4f1342,061aeea26b9b80f1951e518ba8fefa0e15fd3428,YZp66ppOEeek-RKI0bJQ2g@4,opencourse~anas83IDEeenOwpmZ1LH1g,6,2,2019-03-31 16:28:30.174000,2019-03-31 16:14:50.092,,0004f43a2fc120d733dd3dc6783079fca531f815
1392760,91bd5e7c8ec19870a0297d0d9e563b3215e46081,65cc7ab87982fecc5cc0e8fa9fa0eea02a318500,YtA0hZpREeeKDBL2UWdiaA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,1,2019-03-31 16:56:38.740000,2019-03-31 16:56:38.736,,0004f43a2fc120d733dd3dc6783079fca531f815
70251,9864dd3cc2e19d6e589e0d3e37febeb24ce04dea,65cc7ab87982fecc5cc0e8fa9fa0eea02a318500,YtA0hZpREeeKDBL2UWdiaA@5,opencourse~anas83IDEeenOwpmZ1LH1g,6,2,2019-03-31 17:18:15.432000,2019-03-31 16:56:38.736,,0004f43a2fc120d733dd3dc6783079fca531f815


In [15]:
# парсинг csv одного курса из архива в DataFrame
def parse_structure():
    df['course_id'] = df.apply(lambda x: x[1][:x[1].find('+type@chapter')].replace('block-v1:', ''), axis=1)
    df['chapter_id'] = df.apply(lambda x: x[1].replace(f'block-v1:{x["course_id"]}+type@chapter+block@', ''), axis=1) 
    df['sequential_id'] = df.apply(lambda x: x[2].replace(f'block-v1:{x["course_id"]}+type@sequential+block@', ''), axis=1)
    df['vertical_id'] = df.apply(lambda x: x[3].replace(f'block-v1:{x["course_id"]}+type@vertical+block@', ''), axis=1)
    
    df['item_type'] = df.apply(lambda x: x[4].replace(f'block-v1:{x["course_id"]}+type@', ''), axis=1)
    df['item_type'] = df.apply(lambda x: x['item_type'][:x['item_type'].find('+')], axis=1)
                                                      
    df['item_id'] = df.apply(lambda x: x[4].replace(f'block-v1:{x["course_id"]}+type@{x["item_type"]}+block@', ''), axis=1)
                                                    
    df[['chapter_name', 'sequential_name', 'vertical_name', 'item_name']] = df[[5, 6, 7, 8]]
                                                          

### Функции-обертки для обращений к БД

In [6]:
def find_course(course_extid):
    cursor.callproc("openedu_structure.find_course", [course_extid,])
    return cursor.fetchone()[0]

In [7]:
def add_chapter(course_id, chapter_extid, chapter_name):
    cursor.callproc("openedu_structure.add_chapter", [course_id, chapter_extid, chapter_name,])
    return cursor.fetchone()[0]

In [8]:
def add_sequential(chapter_id, sequential_extid, sequential_name):
    cursor.callproc("openedu_structure.add_sequential", [chapter_id, sequential_extid, sequential_name,])
    return cursor.fetchone()[0]

In [9]:
def add_vertical(sequential_id, vertical_extid, vertical_name):
    cursor.callproc("openedu_structure.add_vertical", [sequential_id, vertical_extid, vertical_name,])
    return cursor.fetchone()[0]

In [10]:
def add_item(vertical_id, item_extid, item_type, item_name):
    cursor.callproc("openedu_structure.add_item", [vertical_id, item_extid, item_type, item_name,])
    return cursor.fetchone()[0]

### Загрузка курса в БД

In [11]:
def load_course_to_db():
    conn = psycopg2.connect(dbname='test', user='postgres', 
                        password='pg215', host='VM-AS494')
    cursor = conn.cursor()

    course_extid = 0
    chapter_extid = 0
    sequential_extid = 0
    vertical_extid = 0
    item_extid = 0

    for row in df.iterrows():
        if course_extid != row[1]['course_id']:
            course_extid = row[1]['course_id']
            course_id = find_course(course_extid)
            if not course_id:
                continue

        if chapter_extid != row[1]['chapter_id']:
            chapter_extid = row[1]['chapter_id']
            chapter_id = add_chapter(course_id, chapter_extid, row[1]['chapter_name'])

        if sequential_extid != row[1]['sequential_id']:
            sequential_extid = row[1]['sequential_id']
            sequential_id = add_sequential(chapter_id, sequential_extid, row[1]['sequential_name'])

        if vertical_extid != row[1]['vertical_id']:
            vertical_extid = row[1]['vertical_id']
            vertical_id = add_vertical(sequential_id, vertical_extid, row[1]['vertical_name'])

        if item_extid != row[1]['item_id']:
            item_extid = row[1]['item_id']
            #print(vertical_id, item_extid, row[1]['item_type'], row[1]['item_name']) 
            add_item(vertical_id, item_extid, row[1]['item_type'], row[1]['item_name'])    

    
    conn.commit()
    cursor.close()
    return course_id, course_extid

In [12]:
# подключение к БД
conn = psycopg2.connect(dbname='test', user='postgres', 
                        password='pg215', host='VM-AS494')
cursor = conn.cursor()

## Основной цикл загрузки

In [13]:
with open('qq.txt', 'r', encoding='utf-8') as g:
    for line in g:
        print(f'Обрабатывается {line}')
        fname = load_from_url(line)
        
        with gzip.open(fname) as f:
            df = pd.read_csv(f, sep=';', header=None)
        
        parse_structure()
        print(f'Размер датафрейма к загрузке {df.shape}')
        
        print(f'Загружен в БД контент для курса  {load_course_to_db()}')


https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_ECONOM_spring_2019.csv.gz

(120, 19)
(133, 'hse+ECONOM+spring_2019')
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_EXACTS_spring_2019.csv.gz

(209, 19)
(159, 'hse+EXACTS+spring_2019')
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_FIL_spring_2019.csv.gz

(274, 19)
(173, 'hse+FIL+spring_2019')
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_FORLANG_spring_2019.csv.gz

(226, 19)
(227, 'hse+FORLANG+spring_2019')
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_FUNDLAW_spring_2019.csv.gz

(97, 19)
(234, 'hse+FUNDLAW+spring_2019')
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_LABLAW_spring_2019.csv.gz

(219, 19)
(305, 'hse+LABLAW+spring_2019')
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_MACROEC_spring_2019.csv.gz

(139, 19)
(335, 'hse+MACROEC+spring_2019')
https://207918.selcdn.ru/jCJ

In [14]:
conn.commit()
cursor.close()

In [7]:
features = ['course_id', 'chapter_id', 'chapter_name', 'sequential_id', 'sequential_name', 
            'vertical_id', 'vertical_name', 'item_type', 'item_id',  'item_name']
df[features].head()

Unnamed: 0,course_id,chapter_id,chapter_name,sequential_id,sequential_name,vertical_id,vertical_name,item_type,item_id,item_name
0,hse+DATPRO+spring_2019,8411723edf2f4d8cb6b130fb8023a26d,О Вышке,b74b1284929046d2b280f3c560f0e896,Промо-ролик,e444e0cf02854ed9b516481d39f9a342,Промо-ролик,video,5b50ca94e2774d6f83da03543e39bf50,Видео о ВШЭ
1,hse+DATPRO+spring_2019,911bd2c030644cc9841a2950b62489c0,Тема 1. Основы информационной безопасности. Ос...,da6736e32aae4ac6922f6c55129d312f,Лекция 1,f27a9e3ace954d9a9838ee229f116f94,Понятие информации,video,261019e7ca7d40cabf469069dbef052c,Понятие информации
2,hse+DATPRO+spring_2019,911bd2c030644cc9841a2950b62489c0,Тема 1. Основы информационной безопасности. Ос...,da6736e32aae4ac6922f6c55129d312f,Лекция 1,dd37a044619949568f9ae7323be7850d,Доступ к информации,video,d1efc98f8244445f9d011479366f9fb2,Доступ к информации
3,hse+DATPRO+spring_2019,911bd2c030644cc9841a2950b62489c0,Тема 1. Основы информационной безопасности. Ос...,da6736e32aae4ac6922f6c55129d312f,Лекция 1,f6b6aa6909cd40abbc74e2fd1845d549,Информационные системы,video,19a06eb43b0f461e88ac12340ed9cb54,Информационные системы
4,hse+DATPRO+spring_2019,911bd2c030644cc9841a2950b62489c0,Тема 1. Основы информационной безопасности. Ос...,da6736e32aae4ac6922f6c55129d312f,Лекция 1,f915b0bf93c24c859d741e11993375f8,Обработка информации,video,94ef7b162a3442a19eb4c3dd436afc45,Обработка информации


In [12]:
conn = psycopg2.connect(dbname='test', user='postgres', 
                        password='pg215', host='VM-AS494')
cursor = conn.cursor()

In [27]:
copy_sql = """
  copy open_edu_course (id, name, ext_id)
  from stdin with
    csv
    header
    delimiter as ';'
"""
from_csv = 'course_for_load1.csv'
with open(from_csv, 'r') as f:
  cursor.copy_expert(sql=copy_sql, file=f)
  conn.commit()
  cursor.close()

In [18]:
?cursor.copy_expert

In [11]:
conn.commit() 

In [3]:
cursor.execute("SELECT * FROM openedu_structure.course where ext_id like '%FUNDLAW%'")

In [4]:
for row in cursor:
    print(row)

(230, '(DEMO)Основы права', 'hse+FUNDLAW+DEMO', None)
(231, '(2018-2)Основы права', 'hse+FUNDLAW+fall_2018', None)
(232, 'Основы права', 'hse+FUNDLAW+fall_2019', None)
(233, 'Основы права', 'hse+FUNDLAW+fall_2019_dvfu', None)
(234, '(2019-1)Основы права', 'hse+FUNDLAW+spring_2019', None)
(235, 'Основы права', 'hse+FUNDLAW+spring_2020', None)
(236, '(2018-2)Основы права', 'hse+FUNDLAW+stud_fall_2018', None)
(237, 'Основы права', 'hse+FUNDLAW+stud_fall_2019', None)
(238, 'Основы права', 'hse+FUNDLAW+workshop_adaptive', None)


In [28]:
from psycopg2 import sql

In [29]:
columns = ('country_name_ru', 'airport_name_ru', 'city_code')
stmt = sql.SQL('SELECT {} FROM {} LIMIT 5').format(
            sql.SQL(',').join(map(sql.Identifier, columns)),
            sql.Identifier('airport')
        )

In [30]:
values = [
        (3, 'однажды в ... ;;;'),
        (4, 'хэлло, товарищи!'),
    ]

In [31]:
insert = sql.SQL('INSERT INTO test_table (id, name) VALUES {}').format(
        sql.SQL(',').join(map(sql.Literal, values))
    )
    

In [32]:
cursor.execute(insert)

UniqueViolation: ОШИБКА:  повторяющееся значение ключа нарушает ограничение уникальности "test_table_pkey"
DETAIL:  Ключ "(id)=(3)" уже существует.


In [22]:
insert

Composed([SQL('INSERT INTO test_table (id, name) VALUES '), Composed([Literal((3, 'однажды в ... ;;;')), SQL(','), Literal((4, 'хэлло, товарищи!'))])])

In [36]:
conn.commit()  # или надо использовать conn.autocommit = True

### На память: для перезагрузки конфига в  psql запустить select pg_reload_conf()

## Скачивание по массиву ссылок

https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_ECONOM_spring_2019.csv.gz

structure_hse_ECONOM_spring_2019.csv.gz
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_EXACTS_spring_2019.csv.gz

structure_hse_EXACTS_spring_2019.csv.gz
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_FIL_spring_2019.csv.gz

structure_hse_FIL_spring_2019.csv.gz
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_FORLANG_spring_2019.csv.gz

structure_hse_FORLANG_spring_2019.csv.gz
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_FUNDLAW_spring_2019.csv.gz

structure_hse_FUNDLAW_spring_2019.csv.gz
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_LABLAW_spring_2019.csv.gz

structure_hse_LABLAW_spring_2019.csv.gz
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW/SUPPORT-1137e/structure_hse_MACROEC_spring_2019.csv.gz

structure_hse_MACROEC_spring_2019.csv.gz
https://207918.selcdn.ru/jCJ5Ef1pLDebKtwW

In [79]:
import gzip

Unnamed: 0,0,1,2,3,4,5,6,7,8
0,0,block-v1:hse+ECPSY+spring_2019+type@chapter+bl...,block-v1:hse+ECPSY+spring_2019+type@sequential...,block-v1:hse+ECPSY+spring_2019+type@vertical+b...,block-v1:hse+ECPSY+spring_2019+type@video+bloc...,О Вышке,Промо-ролик,Промо-ролик,Промо-ролик
1,1,block-v1:hse+ECPSY+spring_2019+type@chapter+bl...,block-v1:hse+ECPSY+spring_2019+type@sequential...,block-v1:hse+ECPSY+spring_2019+type@vertical+b...,block-v1:hse+ECPSY+spring_2019+type@video+bloc...,Неделя 1: Экономическая психология,Лекция 1,Предмет экономической психологии. Часть 1,Предмет экономической психологии
2,2,block-v1:hse+ECPSY+spring_2019+type@chapter+bl...,block-v1:hse+ECPSY+spring_2019+type@sequential...,block-v1:hse+ECPSY+spring_2019+type@vertical+b...,block-v1:hse+ECPSY+spring_2019+type@video+bloc...,Неделя 1: Экономическая психология,Лекция 1,Предмет экономической психологии. Часть 2,Предмет экономической психологии
3,3,block-v1:hse+ECPSY+spring_2019+type@chapter+bl...,block-v1:hse+ECPSY+spring_2019+type@sequential...,block-v1:hse+ECPSY+spring_2019+type@vertical+b...,block-v1:hse+ECPSY+spring_2019+type@video+bloc...,Неделя 1: Экономическая психология,Лекция 1,История экономической психологии,История экономической психологии
4,4,block-v1:hse+ECPSY+spring_2019+type@chapter+bl...,block-v1:hse+ECPSY+spring_2019+type@sequential...,block-v1:hse+ECPSY+spring_2019+type@vertical+b...,block-v1:hse+ECPSY+spring_2019+type@video+bloc...,Неделя 1: Экономическая психология,Лекция 1,Методы экономической психологии,Методы экономической психологии


In [84]:
parse_structure()

In [33]:
df

Unnamed: 0,assessment_question_id,assessment_question_base_id,assessment_question_version,assessment_question_type_id,assessment_question_prompt,assessment_question_update_ts
0,NFkcnnrUEeetkBKcF7Gx4A@2,NFkcnnrUEeetkBKcF7Gx4A,2,5,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-08-06 20:50:40.598
1,w_yYQI8xEeeCnhKCj4fcOA@3,w_yYQI8xEeeCnhKCj4fcOA,3,1,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-08-15 04:29:12.58
2,-aErO0rnEeiInQq_9aGGQg@1,-aErO0rnEeiInQq_9aGGQg,1,1,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-04-28 13:28:02.083
3,7Aket543EeevgwpFUW6DoA@1,7Aket543EeevgwpFUW6DoA,1,5,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-09-20 19:14:27.853
4,EkL0tHnsEeeh7wqtJBDcag@2,EkL0tHnsEeeh7wqtJBDcag,2,1,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-08-15 02:56:08.717
5,825BdkrnEeip3wrcpPrs7A@1,825BdkrnEeip3wrcpPrs7A,1,1,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-04-28 13:27:51.684
6,EkL0tHnsEeeh7wqtJBDcag@1,EkL0tHnsEeeh7wqtJBDcag,1,1,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-08-05 14:40:48.375
7,NFgxsnrUEeexdQ4iFFMrvA@2,NFgxsnrUEeexdQ4iFFMrvA,2,1,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-08-06 20:50:40.603
8,bhgcUnoiEeezGQp6YRZP7g@4,bhgcUnoiEeezGQp6YRZP7g,4,8,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-10-24 22:57:08.773
9,fBEL1KFlEeesZAoGx9lBJA@2,fBEL1KFlEeesZAoGx9lBJA,2,5,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-09-24 20:32:06.036


In [34]:
df.shift(3)

Unnamed: 0,assessment_question_id,assessment_question_base_id,assessment_question_version,assessment_question_type_id,assessment_question_prompt,assessment_question_update_ts
0,,,,,,
1,,,,,,
2,,,,,,
3,NFkcnnrUEeetkBKcF7Gx4A@2,NFkcnnrUEeetkBKcF7Gx4A,2.0,5.0,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-08-06 20:50:40.598
4,w_yYQI8xEeeCnhKCj4fcOA@3,w_yYQI8xEeeCnhKCj4fcOA,3.0,1.0,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-08-15 04:29:12.58
5,-aErO0rnEeiInQq_9aGGQg@1,-aErO0rnEeiInQq_9aGGQg,1.0,1.0,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-04-28 13:28:02.083
6,7Aket543EeevgwpFUW6DoA@1,7Aket543EeevgwpFUW6DoA,1.0,5.0,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-09-20 19:14:27.853
7,EkL0tHnsEeeh7wqtJBDcag@2,EkL0tHnsEeeh7wqtJBDcag,2.0,1.0,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-08-15 02:56:08.717
8,825BdkrnEeip3wrcpPrs7A@1,825BdkrnEeip3wrcpPrs7A,1.0,1.0,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2018-04-28 13:27:51.684
9,EkL0tHnsEeeh7wqtJBDcag@1,EkL0tHnsEeeh7wqtJBDcag,1.0,1.0,"{""typeName"":""cml"",""definition"":{""dtdId"":""asses...",2017-08-05 14:40:48.375


In [58]:
import dask.dataframe as dd

In [24]:
import this

The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
