In [None]:
import sys

import urllib
import zipfile

# On ajoute le dossier racine dans les chemins de fichiers de python
sys.path.append("../")

In [None]:
%load_ext autoreload
%autoreload 2

from P9_02_scripts.datasets import *
from notebook import *

# Introduction

In [None]:
# Lien vers le dataset
DATASET_URL = "https://s3-eu-west-1.amazonaws.com/static.oc-static.com/prod/courses/files/AI+Engineer/Project+9+-+R%C3%A9alisez+une+application+mobile+de+recommandation+de+contenu/news-portal-user-interactions-by-globocom.zip"

# Fichiers du dataset
csv_files = ["articles_metadata.csv", "clicks_sample.csv"]
zip_file = "clicks"
pickle_file = "articles_embeddings.pickle"

needed_files = csv_files + [zip_file] + [pickle_file]

# Fichiers actuellement présents
current_files = os.listdir(CSV_PATH)
current_files += os.listdir(PICKLE_PATH)

# On vérifie si tous les fichiers/dossiers sont bien présents
if all([i in current_files for i in needed_files]):
    print("Tous les fichiers sont bien présents.")
# Sinon on télécharge et on extrait les données
else:
    print("Téléchargement des données en cours...")

    # On télécharge le .zip dans un fichier temporaire et on extrait les données
    tmp, _ = urllib.request.urlretrieve(DATASET_URL)
    with zipfile.ZipFile(tmp, "r") as f:
        # On extrait les fichiers csv
        for i in csv_files:
            f.extract(i, CSV_PATH)
        
        # On extrait le fichier zip contenant des fichiers csv
        with f.open(zip_file + ".zip") as f2:
            with zipfile.ZipFile(f2, "r") as f3:
                f3.extractall(CSV_PATH)
        
        # On extrait le fichier pickle
        f.extract(pickle_file, PICKLE_PATH)

    # On supprime le fichier temporaire
    urllib.request.urlcleanup()
    
    print("Téléchargement des données terminé.")

# Chargement des ressources

Nous allons charger toutes les ressources Azure qui vont nous permettre de créer et d'enregistrer des jeux de données.

## Chargement du workspace

In [None]:
# On charge l’espace de travail Azure Machine Learning existant
ws = Workspace.from_config()

## Chargement du magasin de données

In [None]:
# On charge le magasin de données par défaut
datastore = ws.get_default_datastore()

# Exploration et analyse des données

## Fichiers clicks_hour_xxx.csv

### Chargement des fichiers

In [None]:
clicks_dir = CSV_PATH + "clicks"

# On ouvre les fichiers et on ajoute les données dans une liste
clicks = []
for i in tqdm(os.listdir(clicks_dir), leave=False):
    tmp = pd.read_csv(os.path.join(clicks_dir, i))
    clicks.append(tmp)
    
# On concatène toutes les données
clicks = pd.concat(clicks)
clicks.shape

In [None]:
clicks.head()

### Convertion des types

In [None]:
# On met à jour le type des variables
clicks = clicks.astype({
    "user_id": np.uint64,
    "session_id": np.uint64,
    "session_start": np.uint64,
    "session_size": np.uint16,
    "click_article_id": np.uint64,
    "click_timestamp": np.uint64,
    "click_environment": np.uint8,
    "click_deviceGroup": np.uint8,
    "click_os": np.uint8,
    "click_country": np.uint8,
    "click_region": np.uint8,
    "click_referrer_type": np.uint8
})

In [None]:
# On convertit les timestamps en datetime
clicks["session_start"] = pd.to_datetime(clicks['session_start'], unit='ms')
clicks["click_timestamp"] = pd.to_datetime(clicks['click_timestamp'], unit='ms')

In [None]:
# On renomme les colonnes
clicks = clicks.rename(columns={
    "session_start": "session_start_dt",
    "click_timestamp": "click_dt",
})

In [None]:
# On trie par ordre chronologique
clicks = clicks.sort_values("click_dt")

### Indicateurs statistiques

In [None]:
num_vars = [
    "session_size",
    "session_start_dt",
    "click_dt"
]

cat_vars = [
    "user_id",
    "session_id",
    "click_article_id",
    "click_environment",
    "click_deviceGroup",
    "click_os",
    "click_country",
    "click_region",
    "click_referrer_type"
]

In [None]:
clicks[num_vars].describe(
    percentiles=[0.25, 0.5, 0.75, 0.95, 0.99],
    datetime_is_numeric=True
).T

In [None]:
clicks[cat_vars].astype("category").describe().T

### Longueur des sessions

In [None]:
# On regroupe les données par session
grp = clicks.groupby("session_id")

# On agrège les données
grp_agg = grp.agg({"session_start_dt": "min", "click_dt": "max"})

# On calcule le timedelta entre le dernier click et le début de la session
grp_agg["session_length_td"] = grp_agg["click_dt"] - grp_agg["session_start_dt"]
grp_agg["session_length_td"].describe(percentiles=[0.25, 0.5, 0.75, 0.95, 0.99])

### Répartition des clicks par jour

In [None]:
# On regroupe les données par jour
grp = clicks.groupby(pd.Grouper(key="click_dt", freq="D"))

# On agrège les données
grp_agg = grp.agg({
    "click_dt": lambda x: x.max() - x.min(),
    "click_article_id": "count"
})
grp_agg = grp_agg.rename(columns={
    "click_dt": "click_delta",
    "click_article_id": "click_nb",
})
grp_agg

In [None]:
clicks = clicks[clicks["click_dt"] < datetime(2017, 10, 17)]

In [None]:
fig, ax = plt.subplots(figsize=(16, 8))

clicks["click_dt"].dt.date.value_counts().sort_index().plot.bar(ax=ax)

ax.set_title("Nombre de clicks par jour")
ax.set_xlabel("Jour")
ax.set_ylabel("Nombre de clicks")

plt.show()

### Nombre d'utilisateurs par jour

In [None]:
# On regroupe les données par jour
grp = clicks.groupby(pd.Grouper(key="click_dt", freq="D"))

# On calcule l'ensemble cumulé des ids des utilisateurs par jour
grp_agg = grp.agg({"user_id": lambda x: x.unique().tolist()}) 
grp_agg["user_id"] = grp_agg["user_id"].cumsum().apply(set)

# On ajoute une colonne représentant le jour précédent
grp_agg["prev_user_id"] = grp_agg["user_id"].shift(1)
grp_agg["prev_user_id"] = grp_agg["prev_user_id"].fillna("").apply(set)

# On calcule le nombre d'utilisateur connus et inconnus par jour
grp_agg["user_nb"] = grp_agg[["user_id", "prev_user_id"]].aggregate(lambda x: len(x[0] & x[1]), axis=1)
grp_agg["new_user_nb"] = (grp_agg["user_id"] - grp_agg["prev_user_id"]).apply(len)

# On nettoie les colonnes et l'index pour l'affichage
grp_agg = grp_agg[["user_nb", "new_user_nb"]]
grp_agg = grp_agg.rename(columns={
    "user_nb": "nombre d'utilisateurs connus dans la journée",
    "new_user_nb": "nombre d'utilisateurs inconnus dans la journée",
})
grp_agg.index = grp_agg.index.date
grp_agg

In [None]:
fig, ax = plt.subplots(figsize=(16, 8))

grp_agg.plot.bar(ax=ax)

ax.set_title("Nombre d'utilisateurs par jour")
ax.set_xlabel("Jour")
ax.set_ylabel("Nombre d'utilisateurs")

plt.show()

In [None]:
del(clicks)
gc.collect()

## Fichier articles_metadata.csv

In [None]:
# On récupère les métadonnées des articles
articles_metadata = pd.read_csv(CSV_PATH + "articles_metadata.csv")
articles_metadata.shape

In [None]:
articles_metadata.head()

### Convertion des types

In [None]:
# On met à jour le type des variables
articles_metadata = articles_metadata.astype({
    "article_id": np.uint64,
    "category_id": np.uint16,
    "created_at_ts": np.uint64,
    "publisher_id": np.uint8,
    "words_count": np.uint16
})

In [None]:
# On convertit les timestamps en datetime
articles_metadata["created_at_ts"] = pd.to_datetime(articles_metadata['created_at_ts'], unit='ms')

In [None]:
# On renomme les colonnes
articles_metadata = articles_metadata.rename(columns={
    "created_at_ts": "created_dt",
    "words_count": "word_nb",
})

### Indicateurs statistiques

In [None]:
num_vars = ["word_nb", "created_dt"]
cat_vars = ["article_id", "category_id"]

In [None]:
articles_metadata[num_vars].describe(
    percentiles=[0.25, 0.5, 0.75, 0.95, 0.99],
    datetime_is_numeric=True
).T

In [None]:
articles_metadata[cat_vars].astype("category").describe().T

### Suppression des variables inutiles

In [None]:
# On supprime les colonnes inutiles
articles_metadata = articles_metadata.drop(columns=["publisher_id"])

### Nombre d'articles ajoutés par an

In [None]:
# On regroupe les données par an
grp = articles_metadata.groupby(pd.Grouper(key="created_dt", freq="Y"))

# On calcule le nombre d'articles ajoutés
grp_agg = grp.agg({"article_id": "count"})

# On nettoie les colonnes et l'index pour l'affichage
grp_agg = grp_agg.rename(columns={
    "article_id": "nombre d'articles ajoutés dans l'année"
})
grp_agg.index = grp_agg.index.year
grp_agg

In [None]:
fig, ax = plt.subplots(figsize=(16, 8))

grp_agg.plot.bar(ax=ax)

ax.set_title("Nombre d'articles ajoutés par an")
ax.set_xlabel("Année")
ax.set_ylabel("Nombre d'articles")

plt.show()

In [None]:
del(articles_metadata)
gc.collect()

## Fichier articles_embeddings.pickle

In [None]:
# On récupère les embeddings des articles
with open(PICKLE_PATH + "articles_embeddings.pickle", "rb") as f:
    articles_embeddings = pickle.load(f)
    
# On les place dans un dataframe
articles_embeddings = pd.DataFrame(
    articles_embeddings.tolist(),
    columns=[f"emb_{i}" for i in range(articles_embeddings.shape[1])]
)
articles_embeddings.shape

In [None]:
articles_embeddings.head()

### Indicateurs statistiques

In [None]:
articles_embeddings.iloc[:10].T.describe().T

In [None]:
del(articles_embeddings)
gc.collect()

# Enregistrement des données brutes

## Chargement des données

In [None]:
clicks = get_clicks(CSV_PATH + "clicks")
clicks = clicks[clicks["click_dt"] < datetime(2017, 10, 17)]

In [None]:
clicks.iloc[[0, 1, -2, -1]]

In [None]:
articles = get_articles(
    CSV_PATH + "articles_metadata.csv",
    PICKLE_PATH + "articles_embeddings.pickle",
)

In [None]:
articles.iloc[[0, 1, -2, -1]]

## Enregistrement dans le datastore

In [None]:
# On enregistre les données jour par jour dans le datastore
upload_clicks_in_datastore(clicks, datastore)

In [None]:
# On enregistre les données année par année dans le datastore
upload_articles_in_datastore(articles, datastore)

## Enregistrement des datasets

In [None]:
# On crée/update le dataset avec tous les fichiers clicks présents dans le datastore
clicks_ds = create_update_clicks_dataset(ws, datastore)

In [None]:
# On crée/update le dataset avec tous les fichiers articles présents dans le datastore
articles_ds = create_update_articles_dataset(ws, datastore)

# Transformation des données

## Split temporel des données

In [None]:
test_end_dt = datetime(2017, 10, 17)
test_start_dt = test_end_dt - timedelta(days=1)

valid_end_dt = test_start_dt - timedelta(days=1)
valid_start_dt = valid_end_dt - timedelta(days=1)

train_end_dt = valid_start_dt - timedelta(days=1)
train_start_dt = train_end_dt - timedelta(days=5)

print("Test period :\t", test_end_dt, "-", test_start_dt)
print("Valid period :\t", valid_end_dt, "-", valid_start_dt)
print("Train period :\t", train_end_dt, "-", train_start_dt)

## Chargement des datasets

In [None]:
# On récupère le dataset filtré en fonction de l'horodatage des clicks
test_clicks_ds = get_clicks_dataset(ws, start_time=test_start_dt, end_time=test_end_dt)
valid_clicks_ds = get_clicks_dataset(ws, start_time=valid_start_dt, end_time=valid_end_dt)
train_clicks_ds = get_clicks_dataset(ws, start_time=train_start_dt, end_time=train_end_dt)

# On mets les données dans des dataframes
test_clicks = test_clicks_ds.to_pandas_dataframe().reset_index(drop=True)
valid_clicks = valid_clicks_ds.to_pandas_dataframe().reset_index(drop=True)
train_clicks = train_clicks_ds.to_pandas_dataframe().reset_index(drop=True)

test_clicks.shape, valid_clicks.shape, train_clicks.shape

In [None]:
# On récupère le dataset
articles_ds = get_articles_dataset(ws)

# On mets les données dans un dataframe
articles = articles_ds.to_pandas_dataframe().reset_index(drop=True)

articles.shape

## Filtrage des données

In [None]:
test_clicks = filter_clicks(test_clicks, click_article_nb_ge=5)
valid_clicks = filter_clicks(valid_clicks, click_article_nb_ge=5)

test_clicks.shape, valid_clicks.shape

## Nombre d'utilisateurs

In [None]:
test_user_ids = set(test_clicks["user_id"].unique())
valid_user_ids = set(valid_clicks["user_id"].unique())
train_user_ids = set(train_clicks["user_id"].unique())

In [None]:
for name, user_ids in zip(["test", "validation"], [test_user_ids, valid_user_ids]):
    user_nb = len(test_user_ids)

    known_user_nb = len(user_ids & train_user_ids)
    known_user_ratio = known_user_nb / len(user_ids)

    unknown_user_nb = len(user_ids - train_user_ids)
    unknown_user_ratio = unknown_user_nb / len(user_ids)

    print(f"Le jeu de {name} contient :")
    print(f"- {known_user_ratio:.1%} ({known_user_nb}/{user_nb}) utilisateurs connus.")
    print(f"- {unknown_user_ratio:.1%} ({unknown_user_nb}/{user_nb}) utilisateurs inconnus.\n")

## Notation des articles

### Jeu d'entrainement

In [None]:
train_user_article_ratings = get_user_article_ratings(train_clicks)
train_user_article_ratings.shape

In [None]:
train_user_article_ratings.iloc[[0, 1, -2, -1]]

In [None]:
train_user_article_ratings = add_user_article_ratings_tns(
    train_user_article_ratings,
    tn_nb=None,
    random_state=RANDOM_SEED
)
train_user_article_ratings.shape

In [None]:
train_user_article_ratings.iloc[[0, 1, -2, -1]]

In [None]:
train_user_article_ratings.to_parquet(
    PARQUET_PATH + "train_user_article_ratings.parquet",
    index=False,
    coerce_timestamps='ms'
)

### Jeu de validation

In [None]:
valid_user_article_ratings = get_user_article_ratings(valid_clicks)
valid_user_article_ratings.shape

In [None]:
valid_user_article_ratings = add_user_article_ratings_tns(
    valid_user_article_ratings,
    tn_nb=100,
    random_state=RANDOM_SEED
)
valid_user_article_ratings.shape

In [None]:
valid_user_article_ratings.iloc[[0, 1, -2, -1]]

In [None]:
valid_user_article_ratings.to_parquet(
    PARQUET_PATH + "valid_user_article_ratings.parquet",
    index=False,
    coerce_timestamps='ms'
)

### Jeu de test

In [None]:
test_user_article_ratings = get_user_article_ratings(test_clicks)
test_user_article_ratings.shape

In [None]:
test_user_article_ratings = add_user_article_ratings_tns(
    test_user_article_ratings,
    tn_nb=100,
    random_state=RANDOM_SEED
)
test_user_article_ratings.shape

In [None]:
test_user_article_ratings.iloc[[0, 1, -2, -1]]

In [None]:
test_user_article_ratings.to_parquet(
    PARQUET_PATH + "test_user_article_ratings.parquet",
    index=False,
    coerce_timestamps='ms'
)

## Profils des articles

### Jeu d'entrainement

In [None]:
article_profiles = get_article_profiles(train_clicks, articles)
article_profiles.shape

In [None]:
article_profiles.iloc[[0, 1, -2, -1]]

In [None]:
article_profiles.to_parquet(PARQUET_PATH + "article_profiles.parquet", index=False, coerce_timestamps='ms')

## Profils des utilisateur

### Jeu d'entrainement

In [None]:
train_user_profiles = get_user_profiles(train_clicks, article_profiles)
train_user_profiles.shape

In [None]:
train_user_profiles.head()

In [None]:
train_user_profiles.to_parquet(PARQUET_PATH + "train_user_profiles.parquet", index=False)

## Enregistrement des données transformées

In [None]:
train_user_article_ratings_ds = Dataset.Tabular.register_pandas_dataframe(
    dataframe=train_user_article_ratings,
    target=(datastore, "train_user_article_ratings"),
    name="train_user_article_ratings",
    description="Jeu d'entrainement des notations des articles"
)

valid_user_article_ratings_ds = Dataset.Tabular.register_pandas_dataframe(
    dataframe=valid_user_article_ratings,
    target=(datastore, "valid_user_article_ratings"),
    name="valid_user_article_ratings",
    description="Jeu de validation des notations des articles"
)

test_user_article_ratings_ds = Dataset.Tabular.register_pandas_dataframe(
    dataframe=test_user_article_ratings,
    target=(datastore, "test_user_article_ratings"),
    name="test_user_article_ratings",
    description="Jeu de test des notations des articles"
)

article_profiles_ds = Dataset.Tabular.register_pandas_dataframe(
    dataframe=article_profiles,
    target=(datastore, "article_profiles"),
    name="article_profiles",
    description="Jeu d'entrainement des profils des articles"
)

train_user_profiles_ds = Dataset.Tabular.register_pandas_dataframe(
    dataframe=train_user_profiles,
    target=(datastore, "train_user_profiles"),
    name="train_user_profiles",
    description="Jeu d'entrainement des profils des utilisateurs"
)

# SQL database  (à supprimer)

In [None]:
cursor.fast_executemany()

In [None]:
from io import StringIO

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
    # Create Table
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
#     cursor.copy_from(output, table, sep=sep, null='')
    string = "BULK INSERT {} FROM '{}' (WITH FORMAT = 'CSV');"
    cursor.execute(string.format(table, output))
    connection.commit()
    cursor.close()
    
to_sql(engine, clicks, "clicks", if_exists='replace', sep='\t', encoding='utf8')

In [None]:
sql_datastore = Datastore.get(ws, "workspacesqlstore")

In [None]:
from azureml.data.sql_data_reference import SqlDataReference
from azureml.data.datapath import DataPath

In [None]:
import pyodbc
import sqlalchemy
from sqlalchemy.engine import URL

In [None]:
connection_string = 'Driver={ODBC Driver 17 for SQL Server};Server=tcp:p9-sql-server.database.windows.net,1433;Database=p9-db;Uid=yannsako@p9-sql-server;Pwd=wCAqaeCVZs4QFUz;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=60;'
connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string})

engine = sqlalchemy.create_engine(connection_url, fast_executemany=True)

In [None]:
clicks.to_sql("clicks", engine, if_exists='replace')

In [None]:
pd.read_sql_query("SELECT * FROM clicks", engine)

In [None]:
query = DataPath(sql_datastore, 'select * from clicks')
tabular = Dataset.Tabular.from_sql_query(query, query_timeout=60, validate=False)

In [None]:
df = tabular.to_pandas_dataframe()
df.shape