In [None]:
import gc

from tqdm import tqdm
import numpy as np
import pandas as pd
import datatable as dt
import dask.dataframe as dd

from copy import copy

from pathlib import Path

from time import time, sleep
from contextlib import contextmanager

import matplotlib.pyplot as plt

# print(plt.style.available)
plt.style.use("dark_background")
# plt.style.use("seaborn-dark")
import plotly.express as plx
import seaborn as sns

import os, sys

from dask.diagnostics import ProgressBar
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.distributed import Client, LocalCluster
from dask import compute
from dask import delayed

import gc

# cluster = LocalCluster()
# client = Client(cluster)


# Show where relative paths such as "./input/..." will be resolved from.
print(f"curdir {Path.cwd()!s}")

In [None]:
"""
train.csv

01. row_id: (int64) ID code for the row.
02. timestamp: (int64) the time in milliseconds between this user interaction and the first event completion from that user.
03. user_id: (int32) ID code for the user.
04. content_id: (int16) ID code for the user interaction
05. content_type_id: (int8) 0 if the event was a question being posed to the user, 1 if the event was the user watching a lecture.
06. task_container_id: (int16) Id code for the batch of questions or lectures.
    For example, a user might see three questions in a row before seeing the explanations for any of them.
    Those three would all share a task_container_id.
07. user_answer: (int8) the user's answer to the question, if any. Read -1 as null, for lectures.
08. answered_correctly: (int8) if the user responded correctly. Read -1 as null, for lectures.
09. prior_question_elapsed_time: (float32) The average time in milliseconds it took a user to answer each question
    in the previous question bundle, ignoring any lectures in between.
    Is null for a user's first question bundle or lecture.
    Note that the time is the average time a user took to solve each question in the previous bundle.
10. prior_question_had_explanation: (bool) Whether or not the user saw an explanation and the correct response(s)
    after answering the previous question bundle, ignoring any lectures in between.
    The value is shared across a single question bundle, and is null for a user's first question bundle or lecture.
    Typically the first several questions a user sees were part of an onboarding diagnostic test where they did not get any feedback.

    questions.csv: metadata for the questions posed to users.
1. question_id: foreign key for the train/test content_id column, when the content type is question (0).
2. bundle_id: code for which questions are served together.
3. correct_answer: the answer to the question.
    Can be compared with the train user_answer column to check if the user was right.
4. part: the relevant section of the TOEIC test.
5. tags: one or more detailed tag codes for the question.
    The meaning of the tags will not be provided,
    but these codes are sufficient for clustering the questions together.

     lectures.csv: metadata for the lectures watched by users as they progress in their education.
1. lecture_id: foreign key for the train/test content_id column, when the content type is lecture (1).
2. part: top level category code for the lecture.
3. tag: one tag code for the lecture.
    The meaning of the tags will not be provided,
    but these codes are sufficient for clustering the lectures together.
4. type_of: brief description of the core purpose of the lecture

   example_test_rows.csv Three sample groups of the test set data as it will be delivered by the time-series API. The format is largely the same as train.csv. There are two different columns that mirror what information the AI tutor actually has available at any given time, but with the user interactions grouped together for the sake of API performance rather than strictly showing information for a single user at a time. Some users will appear in the hidden test set that have NOT been presented in the train set, emulating the challenge of quickly adapting to modeling new arrivals to a website.

   prior_group_responses (string) provides all of the user_answer entries for the previous group
    in a string representation of a list in the first row of the group.
    All other rows in each group are null.
    If you are using Python, you will likely want to call eval on the non-null rows.
    Some rows may be null, or empty lists.

   prior_group_answers_correct (string) provides all the answered_correctly values for the previous group,
    with the same format and caveats as prior_group_responses.
    Some rows may be null, or empty lists.

"""


In [None]:
# Local dask cluster with 4 workers; the Client attaches to it and its repr
# (the last line of the cell) links to the dashboard.
cluster = LocalCluster(n_workers=4)
client = Client(address=cluster)
print(str(cluster))
client

In [None]:
@contextmanager
def timer(name):
    """Print how long the body of a ``with timer(name):`` block took.

    Prints ``[<name>] done in <seconds> s`` when the block exits. The line is
    printed even if the block raises; the exception still propagates.

    Args:
        name: label shown in the printed message.
    """
    # perf_counter is monotonic and high-resolution, unlike time.time().
    from time import perf_counter

    t0 = perf_counter()
    try:
        yield
    finally:
        print(f'[{name}] done in {perf_counter() - t0:.2f} s')

In [None]:
%%time

dtypes = {
    "row_id": "int64",
    "timestamp": "int64",
    "user_id": "int32",
    "content_id": "int16",
    "content_type_id": "boolean",
    "task_container_id": "int16",
    "user_answer": "int8",
    "answered_correctly": "int8",
    "prior_question_elapsed_time": "float32",
    "prior_question_had_explanation": "boolean"
}

dask_dtypes = {
    "row_id": np.int64,
    "timestamp": np.int64,
    "user_id": np.int32,
    "content_id": np.int16,
    "content_type_id": bool,
    "task_container_id": np.int16,
    "user_answer": np.int8,
    "answered_correctly": np.int8,
    "prior_question_elapsed_time": np.float32,
    # "prior_question_had_explanation": bool
}

# with timer("reading SSD feather"):
#     data_pd = pd.read_csv("./input/train.csv", dtype=dtypes, nrows=12.5e6, low_memory=False)
    # data_dd = dd.from_pandas(data_pd, chunksize=int(1e4))
    # del data_pd
    # _ = gc.collect()
    # data_pd = pd.read_feather("/mnt/data30G/2020riid/riiid_train.feather")
    # train_dd = dd.from_pandas(train, chunksize=int(1e4))
    # train_dd = dd.read_csv("/mnt/data30G/2020riid/train.csv",
    #                                dtype=dask_dtypes,
    #                                assume_missing=True,
    #                                low_memory=False,
    #                       )


    # train = train.loc[:1e5]


In [None]:
# Keep question rows only: lecture rows carry answered_correctly == -1.
data_pd = data_pd.loc[data_pd["answered_correctly"].ne(-1)]
print(data_pd.head())

In [None]:
%%time
print("groupby started")
users = data_pd.groupby("user_id")

In [None]:
users_len = len(users)
# One fifth of the users (rounded down) go to the validation split.
nvalid = users_len // 5
print("len(users)", users_len, nvalid)

In [None]:
%%time

print("users.tail(nvalid) started")
valid_pd = users.tail(nvalid)
valid_pd.reset_index().to_feather("/mnt/data30G/2020riid/valid_small.feather")
valid_mean = valid_pd.answered_correctly.dropna().values.mean()

del valid_pd
_ = gc.collect()

In [None]:
%%time

print("users.head(users_len-nvalid) started")
train_pd = users.head(users_len-nvalid)
train_pd.reset_index().to_feather("/mnt/data30G/2020riid/train_small.feather")
train_mean = train_pd.answered_correctly.dropna().values.mean()
print("train_pd", train_pd.shape)

del train_pd
_ = gc.collect()

# print("valid_pd", valid_pd.shape)

In [None]:
# Mean answered_correctly rate of the two splits written out above.
print(" trainmean validmean")
print(train_mean, valid_mean, sep=" ")



In [None]:
# with timer("types SSD"):
#     for col, type_ in dask_dtypes.items():
#         print(col, train_dd[col].dtype())
#         # train[col] = train[col].astype(type_)
#         train_dd[col] = delayed(train_dd[col].astype(type_))

In [None]:
%%time
# train_dd = train_dd.loc[:10000]
print("1 train_dd.head()")

# train_dd.head()
print(type(train_dd))
train_dd.shape[0].compute()

In [None]:
%%time

if "row_id" in train_dd.columns:
    # train.set_index("row_id", inplace=True)
    train_dd = train_dd.set_index("row_id")
    # res = train_dd.mean()
    # res = delayed(train_dd.mean)()
# res_delayed = res.compute()
print("res = train_dd.mean()")
# print(res_delayed)

In [None]:
%%time

res_delayed = delayed(train_dd.mean)()


In [None]:
res_delayed.visualize()  # draw the task graph behind res_delayed before computing it

In [None]:
%%time

with ProgressBar():
    out = res_delayed.compute()

    print(f" out res_delayed = \n {out}")

In [None]:
res_delayed.visualize()  # draw the task graph behind res_delayed again, after the compute above

In [None]:
print(" train_dd.head() 2 = ")
# head() reads only from the first partition by default, so this stays cheap.
train_dd.head(n=5)


In [None]:
# Non-null values per column.
compute(train_dd.count())[0]

In [None]:
# Missing values per column in a single pass. The previous form,
# len(train_dd) - train_dd.count().compute(), needed two full passes.
train_dd.isna().sum().compute()

In [None]:
# Row count (computed) and column count (known from metadata).
print(len(train_dd), len(train_dd.columns))

In [None]:
# Count distinct users on the cluster instead of pulling the whole user_id
# column into pandas (and re-wrapping it in a redundant pd.Series).
train_dd.user_id.nunique().compute()


In [None]:
%%time
print("max", train_dd.user_id.compute().value_counts().max())
print("mean", train_dd.user_id.compute().value_counts().mean())
print("median", train_dd.user_id.compute().value_counts().quantile(.5))

In [None]:
%%time
max, mean, median = compute(
    train_dd.user_id.value_counts().max(),
    train_dd.user_id.value_counts().mean(),
    train_dd.user_id.value_counts().quantile(.5)
)
print(max, mean, median)

In [None]:
def get_user_quantiles(df, plot=False, qs=(0.4, 0.5, 0.6, 0.7)):
    """Quantiles of the per-user interaction counts in ``df``.

    Counts rows per ``user_id`` and evaluates the requested quantiles of those
    counts. Works for both pandas and dask frames. For dask, all quantiles are
    computed in one pass over a shared ``value_counts`` graph.

    Args:
        df: pandas or dask DataFrame with a ``user_id`` column.
        plot: if True, print each ``(q, value)`` pair and plot it as a white
            marker on the current matplotlib axes.
        qs: quantile levels to evaluate. The default matches the values the
            former ``np.arange(.4, .8, .1)`` was meant to produce. They are
            written as exact literals to avoid float-step drift
            (e.g. 0.6000000000000001).

    Returns:
        list of ``(q, value)`` tuples in the order of ``qs``.
    """
    counts = df.user_id.value_counts()  # built once, not once per quantile
    values = [counts.quantile(q=q) for q in qs]
    # Check the class, not the instance: pandas Series.__getattr__ also resolves
    # index labels, so an instance-level hasattr could be fooled.
    if hasattr(type(counts), "compute"):
        from dask import compute as dask_compute

        values = dask_compute(*values)

    quantiles = []
    for q, value in zip(qs, values):
        quantiles.append((q, value))
        if plot:
            print(f"quantile {quantiles[-1]}")
            plt.plot(*quantiles[-1], marker="o", c="w")
    return quantiles

In [None]:
from sklearn.preprocessing import StandardScaler

In [None]:
train_dd.columns  # column labels of the dask frame; row_id is no longer listed once the set_index cell has run

In [None]:
# Lazy per-user groupby on the dask frame; reused by the cells below.
groups = train_dd.groupby("user_id")

In [None]:
%%time
def print_count(group):
    print("print 1 from print_count")

groups_apply = groups.sum()

In [None]:
%%time
groups_apply.compute()

In [None]:
# Alias renamed from `iter`, which shadowed the builtin iter() for the rest of the session.
groups_iter = groups


In [None]:
%%time

content_id_sum = groups.content_id.sum()

print("content_id_sum.compute()")
content_id_sum.compute()

In [None]:
%%time
agg = groups.aggregate(['sum', 'mean', 'max', 'min', list]).compute()

In [None]:
agg  # per-user aggregates from the previous cell, one column per (source column, statistic) pair

In [None]:
# Sum of timestamps for user_id 115, addressed by the full (column, statistic) key.
agg.loc[115, ("timestamp", "sum")]

In [None]:
# `groups` is a dask DataFrameGroupBy, which does not support iterating over
# (key, frame) pairs, and `total` was never defined (its definition is commented
# out), so this loop failed before doing any work. Iterate a pandas groupby
# instead. The loop variable is renamed from `id`, which shadowed the builtin.
# NOTE(review): train_dd.compute() materializes the whole dask frame in memory.
with timer("rolling values"):
    user_groups = train_dd.compute().groupby("user_id")
    for user_id, df in tqdm(user_groups, total=user_groups.ngroups):
        print("ok")
        # print(f"  user_id = {user_id}  shape = {df.shape[0]} is_monotonic {df.timestamp.is_monotonic}")
        # train_dd.loc[df.index, "attempt"] = range(1, df.shape[0]+1)

        # quantiles = get_user_quantiles(df=df)
        #
        # for quantile, val in quantiles:
        #     train_dd.loc[df.index, f"q{quantile}_None"] = df.timestamp.rolling(
        #         int(val),
        #         min_periods=1,
        #         win_type=None,
        #     ).quantile(quantile)
        #
        #     train_dd.loc[df.index, f"q{quantile}_gauss"] = df.timestamp.rolling(
        #         int(val),
        #         min_periods=1,
        #         win_type="gaussian",
        #     ).mean(std=3)
        #
        #     train_dd.loc[df.index, f"q{quantile}_exp"] = df.timestamp.rolling(
        #         int(val),
        #         min_periods=1,
        #         win_type="exponential",
        #     ).sum(tau=3)

# train_dd = train_dd.compute()



In [None]:
# NOTE(review): `train` is not defined by any earlier cell of this notebook
# (its loader is commented out) -- confirm which frame is meant.
# sample() with no arguments returns a single random row. frac=1 returns every
# row in shuffled order, which is what `train_shuffled` implies. It is seeded
# so the shuffle is reproducible.
train_shuffled = train.sample(frac=1, random_state=0)
print("train.shape", train.shape)
train.head(20)

In [None]:
# Auxiliary competition files: metadata tables first, then the example test
# batch and its submission template.
questions = pd.read_csv(filepath_or_buffer="./input/questions.csv")
lectures = pd.read_csv(filepath_or_buffer="./input/lectures.csv")
test = pd.read_csv(filepath_or_buffer="./input/example_test.csv")
submit_example = pd.read_csv(filepath_or_buffer="./input/example_sample_submission.csv")
print("ok")

In [None]:
# Print the full shape, as the label says and as the sibling cells do
# (previously only the row count was printed).
print("submit_example.shape", submit_example.shape)
submit_example.head()

In [None]:
# Size and first rows of the example test batch.
print("test.shape", test.shape, sep=" ")
test.head(n=5)

In [None]:
# Size and first rows of the lecture metadata.
print("lectures.shape", lectures.shape, sep=" ")
lectures.head(n=5)



In [None]:
set(train.columns) ^ set(test.columns)  # columns present in exactly one of the two frames

In [None]:
set(test.columns).difference(train.columns)  # columns only the test batch has

In [None]:
set(train.columns).difference(test.columns)  # columns only the training frame has

In [None]:
train.columns  # column labels of the training frame

In [None]:
test.columns  # column labels of example_test.csv


In [None]:
# Alternative datatable reader for the full train.csv, kept for reference.
# (A stray `%time` line magic that timed an empty statement was removed.)
# train_dt = dt.fread("./input/train.csv")
# print("train_dt.head()")
# train_dt.head(5)

## Questions

In [None]:
# Size and first rows of the question metadata.
print("questions.shape", questions.shape, sep=" ")
questions.head(n=5)

In [None]:
bundle_ids = questions.value_counts(subset=["bundle_id"])
print(bundle_ids)
# Distribution of bundle sizes (number of questions sharing each bundle_id).
plt.hist(bundle_ids, bins=10);


In [None]:
questions.value_counts(subset=["correct_answer"], normalize=False)  # raw counts per correct answer


In [None]:
questions.value_counts(subset=["part"], normalize=False)  # raw counts per TOEIC part



In [None]:
# Parse the space-separated tag codes into lists of strings.
# Series.apply([str.strip, str.split]) applies each function *separately* and
# returns a DataFrame (one column per function), which cannot be assigned back
# to the single `tags` column. Missing tags become an empty list. The guard
# makes re-running this cell a no-op instead of re-splitting stringified lists.
if not questions["tags"].map(lambda tags: isinstance(tags, list)).all():
    questions["tags"] = questions["tags"].fillna("").astype(str).str.split()
questions.tags[0][0]

In [None]:
# ll = train_dt.shape[0]
#
# uniques = dt.unique(train_dt["user_id"]).shape[0]
# print(ll)
# print(uniques)
# print(ll/uniques)

In [None]:
value_counts = train["user_id"].value_counts()  # number of interactions per user_id

In [None]:
value_counts  # interactions per user_id, computed in the previous cell

In [None]:
plt.hist(x=value_counts);  # distribution of interactions per user