<a href="https://colab.research.google.com/github/IsraelMelMon/Proyectos/blob/master/feature_eng_ml_kueski.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In this notebook we compute two features for the MovieLens ratings dataset. Each row of the dataset is the rating a user gave to a movie.

These features are roughly defined as follows:

**nb_previous_ratings:** number of ratings a user has given, prior to the current rating. 

**avg_ratings_previous:** average of the ratings a user has given, prior to the current rating.
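
As a small illustration of the two definitions, the sketch below computes both features by hand for a single user. The ratings are made-up values, not taken from the dataset, and the list `features` is only a name chosen for this example.

```python
# Hypothetical ratings of one user, ordered from oldest to newest.
ratings = [4.0, 2.0, 3.0]

features = []
for i, current in enumerate(ratings):
    previous = ratings[:i]  # everything rated before the current rating
    nb_previous_ratings = len(previous)
    # No earlier ratings means the average is undefined (NaN).
    avg_ratings_previous = sum(previous) / len(previous) if previous else float("nan")
    features.append((nb_previous_ratings, avg_ratings_previous))

print(features)
```

The first rating has no history, so its count is 0 and its average is undefined; each later rating only sees the ratings that came before it.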

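We have the following problem: the ratings dataset is large (20M rows), and the feature `avg_ratings_previous` takes too long to compute for every row, at least as implemented below. For each rating, the implementation re-scans all of that user's ratings, so the cost grows quadratically with the number of ratings per user.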

We have truncated the dataset so that this code runs quickly. 

We begin by loading and truncating the dataset.

In [None]:
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
df_ratings = pd.read_csv("drive/MyDrive/rating.csv")


In [None]:
df_ratings

Unnamed: 0,userId,movieId,rating,timestamp
0,1,2,3.5,2005-04-02 23:53:47
1,1,29,3.5,2005-04-02 23:31:16
2,1,32,3.5,2005-04-02 23:33:39
3,1,47,3.5,2005-04-02 23:32:07
4,1,50,3.5,2005-04-02 23:29:40
...,...,...,...,...
20000258,138493,68954,4.5,2009-11-13 15:42:00
20000259,138493,69526,4.5,2009-12-03 18:31:48
20000260,138493,69644,3.0,2009-12-07 18:10:57
20000261,138493,70286,5.0,2009-11-13 15:42:24


In [None]:
# This will be the response variable of the model. Not used in this notebook, just added to give context.
# 1 if the rating is at least 4, else 0 (vectorized instead of a row-by-row apply).
df_ratings["rating_binary"] = (df_ratings.rating >= 4).astype(int)
df_ratings

Unnamed: 0,userId,movieId,rating,timestamp,rating_binary
0,1,2,3.5,2005-04-02 23:53:47,0
1,1,29,3.5,2005-04-02 23:31:16,0
2,1,32,3.5,2005-04-02 23:33:39,0
3,1,47,3.5,2005-04-02 23:32:07,0
4,1,50,3.5,2005-04-02 23:29:40,0
...,...,...,...,...,...
20000258,138493,68954,4.5,2009-11-13 15:42:00,1
20000259,138493,69526,4.5,2009-12-03 18:31:48,1
20000260,138493,69644,3.0,2009-12-07 18:10:57,0
20000261,138493,70286,5.0,2009-11-13 15:42:24,1


In [None]:
# Here we truncate the dataset
df_ratings_truncated = df_ratings.loc[0:100000, :]
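
Label-based slicing with `.loc` includes the end label, which is why the truncated frame holds 100001 rows rather than 100000. A minimal sketch on a made-up ten-row frame (the column name `x` is arbitrary) shows the difference from position-based `.iloc`:

```python
import pandas as pd

df = pd.DataFrame({"x": range(10)})

n_loc = len(df.loc[0:5, :])    # labels 0..5, end label included
n_iloc = len(df.iloc[0:5, :])  # positions 0..4, end position excluded
print(n_loc, n_iloc)
```

If exactly 100000 rows are wanted, `df_ratings.iloc[0:100000, :]` would be one way to get them.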

We now proceed to compute the first feature: `nb_previous_ratings`

In [None]:
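# Parse the timestamps first so that rows are sorted chronologically rather than as strings.
df_ratings_truncated = df_ratings_truncated.assign(timestamp=pd.to_datetime(df_ratings_truncated.timestamp))
df_ratings_truncated = df_ratings_truncated.sort_values(by=["userId", "timestamp"])
df_ratings_truncated = df_ratings_truncated.reset_index(drop=True)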
df_ratings_truncated

Unnamed: 0,userId,movieId,rating,timestamp,rating_binary
0,1,924,3.5,2004-09-10 03:06:38,0
1,1,919,3.5,2004-09-10 03:07:01,0
2,1,2683,3.5,2004-09-10 03:07:30,0
3,1,1584,3.5,2004-09-10 03:07:36,0
4,1,1079,4.0,2004-09-10 03:07:45,1
...,...,...,...,...,...
99996,702,489,0.5,2004-09-19 21:43:56,0
99997,702,1005,0.5,2004-09-19 21:46:36,0
99998,702,102,0.5,2004-09-19 21:47:08,0
99999,702,437,1.0,2004-09-19 21:47:27,0


In [None]:
df_grouped = df_ratings_truncated.groupby("userId")
df_ratings_truncated["nb_previous_ratings"] = df_grouped["timestamp"].rank(method="first") - 1

In [None]:
df_ratings_truncated

Unnamed: 0,userId,movieId,rating,timestamp,rating_binary,nb_previous_ratings
0,1,924,3.5,2004-09-10 03:06:38,0,0.0
1,1,919,3.5,2004-09-10 03:07:01,0,1.0
2,1,2683,3.5,2004-09-10 03:07:30,0,2.0
3,1,1584,3.5,2004-09-10 03:07:36,0,3.0
4,1,1079,4.0,2004-09-10 03:07:45,1,4.0
...,...,...,...,...,...,...
99996,702,489,0.5,2004-09-19 21:43:56,0,203.0
99997,702,1005,0.5,2004-09-19 21:46:36,0,204.0
99998,702,102,0.5,2004-09-19 21:47:08,0,205.0
99999,702,437,1.0,2004-09-19 21:47:27,0,206.0
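
The rank-based computation returns floats (0.0, 1.0, ...). A possible alternative, sketched here on made-up data, is `groupby(...).cumcount()`, which numbers the rows of each group in their current order and returns integers. It relies on the frame already being sorted by user and timestamp.

```python
import pandas as pd

# Hypothetical toy data: two users with a few ratings each.
toy = pd.DataFrame({
    "userId": [2, 1, 2, 1, 2],
    "timestamp": pd.to_datetime([
        "2005-01-03", "2005-01-01", "2005-01-01", "2005-01-02", "2005-01-02",
    ]),
})
toy = toy.sort_values(by=["userId", "timestamp"]).reset_index(drop=True)

# Approach used in the notebook: rank of the timestamp within each user, minus one.
by_rank = toy.groupby("userId")["timestamp"].rank(method="first") - 1
# Alternative: position of the row within its (already sorted) group.
by_count = toy.groupby("userId").cumcount()

print(by_rank.tolist(), by_count.tolist())
```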


We now compute the second feature: `avg_ratings_previous`

In [None]:
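def avg_previous(df):
    """For each row of one user's ratings, average the ratings with a strictly earlier timestamp."""
    avg = pd.Series(index=df.index, dtype=float)
    for i in df.index:
        # Re-scans the whole user history for every row: quadratic in the number of ratings.
        df_aux = df.loc[df.timestamp < df.timestamp.loc[i], :]
        avg.at[i] = df_aux.rating.mean()  # NaN when there is no earlier rating
    return avg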

In [None]:
import warnings
from time import time
warnings.filterwarnings('ignore')

In [None]:
initial_time = time()
# The following loop is the one that takes forever if we try to compute it for the whole dataset.
# Series.append was removed in pandas 2.0, so we collect the per-user results and concatenate once.
per_user_averages = []
for user in df_ratings_truncated.userId.unique():
    df_user = df_ratings_truncated.loc[df_ratings_truncated.userId == user, :]
    per_user_averages.append(avg_previous(df_user))
avg_ratings_previous = pd.concat(per_user_averages)
end_time = time()
print("naive computation time (s): ", end_time-initial_time)
avg_ratings_previous

naive computation time (s):  80.72360944747925


0              NaN
1         3.500000
2         3.500000
3         3.500000
4         3.500000
            ...   
99996     2.871921
99997     2.860294
99998     2.848780
99999     2.837379
100000    2.828502
Length: 100001, dtype: float64

In [None]:
df_ratings_truncated["avg_ratings_previous"] = avg_ratings_previous
df_ratings_truncated.head(10)

Unnamed: 0,userId,movieId,rating,timestamp,rating_binary,nb_previous_ratings,avg_ratings_previous
0,1,924,3.5,2004-09-10 03:06:38,0,0.0,
1,1,919,3.5,2004-09-10 03:07:01,0,1.0,3.5
2,1,2683,3.5,2004-09-10 03:07:30,0,2.0,3.5
3,1,1584,3.5,2004-09-10 03:07:36,0,3.0,3.5
4,1,1079,4.0,2004-09-10 03:07:45,1,4.0,3.5
5,1,653,3.0,2004-09-10 03:08:11,0,5.0,3.6
6,1,2959,4.0,2004-09-10 03:08:18,1,6.0,3.5
7,1,337,3.5,2004-09-10 03:08:29,0,7.0,3.571429
8,1,1304,3.0,2004-09-10 03:08:40,0,8.0,3.5625
9,1,3996,4.0,2004-09-10 03:08:47,1,9.0,3.5
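
The per-row loop can also be avoided without leaving pandas. The sketch below is one possible approach, not the notebook's method: it derives the previous-ratings average from a per-user cumulative sum and count. The function name `avg_previous_vectorized` and the toy data are made up for illustration, and the frame is assumed to be sorted by `userId` and `timestamp`. Unlike `avg_previous`, which compares timestamps strictly, it treats rows that share a timestamp according to their row order.

```python
import pandas as pd

def avg_previous_vectorized(df):
    """Mean of each user's earlier ratings; assumes df is sorted by userId, timestamp."""
    grouped = df.groupby("userId")["rating"]
    prev_sum = grouped.cumsum() - df["rating"]  # sum of the ratings before the current row
    prev_count = grouped.cumcount()             # number of ratings before the current row
    # Rows without history get NaN instead of a division by zero.
    return prev_sum / prev_count.where(prev_count > 0)

# Hypothetical toy data, already sorted by user and time.
toy = pd.DataFrame({
    "userId": [1, 1, 1, 2, 2],
    "rating": [3.5, 3.5, 4.0, 2.0, 5.0],
})
result = avg_previous_vectorized(toy)
print(result.tolist())
```

Each group is processed in a single pass, so the cost grows linearly with the number of rows instead of quadratically per user.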


PySpark and Java installation


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
#!wget -q https://www-us.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

We start the Spark session.

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.master("local[*]").getOrCreate()
# SQLContext expects a SparkContext, not a SparkSession.
sqlContext = SQLContext(spark.sparkContext)


We create a Spark DataFrame from the truncated pandas DataFrame (we use the truncated DataFrame for illustrative purposes).

In [None]:
# Drop the last column (avg_ratings_previous); it is recomputed with Spark below.
sqlc_truncated_ratings = sqlContext.createDataFrame(df_ratings_truncated.iloc[:,:-1])
sqlc_truncated_ratings.show()

+------+-------+------+-------------------+-------------+-------------------+
|userId|movieId|rating|          timestamp|rating_binary|nb_previous_ratings|
+------+-------+------+-------------------+-------------+-------------------+
|     1|    924|   3.5|2004-09-10 03:06:38|            0|                0.0|
|     1|    919|   3.5|2004-09-10 03:07:01|            0|                1.0|
|     1|   2683|   3.5|2004-09-10 03:07:30|            0|                2.0|
|     1|   1584|   3.5|2004-09-10 03:07:36|            0|                3.0|
|     1|   1079|   4.0|2004-09-10 03:07:45|            1|                4.0|
|     1|    653|   3.0|2004-09-10 03:08:11|            0|                5.0|
|     1|   2959|   4.0|2004-09-10 03:08:18|            1|                6.0|
|     1|    337|   3.5|2004-09-10 03:08:29|            0|                7.0|
|     1|   1304|   3.0|2004-09-10 03:08:40|            0|                8.0|
|     1|   3996|   4.0|2004-09-10 03:08:47|            1|       

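We calculate the expanding average of each user's ratings with a Spark SQL window function. The reported time is far lower than for the pandas loop, but `spark.sql` is lazy: the interval measured covers building the query, and the actual computation only happens when `show()` runs. Note also that the window as written includes the current rating, so the values differ from `avg_ratings_previous` above (3.6 versus 3.5 for the fifth rating of user 1).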


In [None]:
sqlc_truncated_ratings.createOrReplaceTempView("sqlc_truncated_ratings")
spark_initial_time = time()

# With ORDER BY and no explicit frame, the window runs from the start of the
# partition up to and including the current row.
# Multiplying and dividing by nb_previous_ratings turns the first row of each
# user into null (0/0), which na.fill(0) below replaces with 0.
sqlc_truncated_ratings = spark.sql("""select *,
                          avg(rating)
                          over (partition by userId order by userId, timestamp) * nb_previous_ratings / nb_previous_ratings
                          as avg_previous_ratings
                          from sqlc_truncated_ratings
                          """)

spark_end_time = time()
# spark.sql only builds the query plan; nothing has been computed at this point.
print("naive spark computation time (s):", spark_end_time-spark_initial_time)
sqlc_truncated_ratings = sqlc_truncated_ratings.orderBy("userId","timestamp").na.fill(0)
sqlc_truncated_ratings.show()

                          

naive spark computation time (s): 0.28458642959594727
+------+-------+------+-------------------+-------------+-------------------+--------------------+
|userId|movieId|rating|          timestamp|rating_binary|nb_previous_ratings|avg_previous_ratings|
+------+-------+------+-------------------+-------------+-------------------+--------------------+
|     1|    924|   3.5|2004-09-10 03:06:38|            0|                0.0|                 0.0|
|     1|    919|   3.5|2004-09-10 03:07:01|            0|                1.0|                 3.5|
|     1|   2683|   3.5|2004-09-10 03:07:30|            0|                2.0|                 3.5|
|     1|   1584|   3.5|2004-09-10 03:07:36|            0|                3.0|                 3.5|
|     1|   1079|   4.0|2004-09-10 03:07:45|            1|                4.0|                 3.6|
|     1|    653|   3.0|2004-09-10 03:08:11|            0|                5.0|                 3.5|
|     1|   2959|   4.0|2004-09-10 03:08:18|            
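
To match the definition of the feature (ratings strictly before the current one), the window frame can end one row before the current row. The sketch below is one way to do that and to time the query including its execution. The names `PREVIOUS_AVG_QUERY` and `timed_previous_avg` are made up for this example, and the temporary view `sqlc_truncated_ratings` is assumed to be registered already. Ties in `timestamp` are resolved by row order here, which differs slightly from the strict comparison in `avg_previous`.

```python
from time import time

# The frame stops at "1 preceding", so the current rating is left out of the average.
# The first rating of each user has an empty frame and therefore gets null.
PREVIOUS_AVG_QUERY = """
select *,
       avg(rating) over (
           partition by userId
           order by timestamp
           rows between unbounded preceding and 1 preceding
       ) as avg_previous_ratings
from sqlc_truncated_ratings
"""

def timed_previous_avg(spark):
    """Run the query on the given SparkSession and time it including execution."""
    start = time()
    result = spark.sql(PREVIOUS_AVG_QUERY).cache()
    n_rows = result.count()  # an action, so Spark actually runs the query here
    return result, n_rows, time() - start
```

Calling `timed_previous_avg(spark)` with the session created earlier would return the resulting DataFrame, its row count, and the elapsed seconds.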

We install MLflow and the supporting packages.

In [None]:
!pip install mlflow 
!pip install tqdm
!pip install pyarrow
!pip install pyngrok 

Collecting mlflow
  Downloading https://files.pythonhosted.org/packages/b6/69/c6b3911ccb421adc779390ca2ea54cb888a54e282d50e8d20ce751b5c7ab/mlflow-1.12.1-py3-none-any.whl (13.9MB)
Collecting databricks-cli>=0.8.7
  Downloading https://files.pythonhosted.org/packages/40/88/ae1f78cf582b707c605c77df49b4c8786a4465edc51adb25d2f98ef4c4de/databricks-cli-0.14.1.tar.gz (54kB)
Collecting gunicorn; platform_system != "Windows"
  Downloading https://files.pythonhosted.org/packages/69/ca/926f7cd3a2014b16870086b2d0fdc84a9e49473c68a8dff8b57f7c156f43/gunicorn-20.0.4-py2.py3-none-any.whl (77kB)
Collecting alembic<=1.4.1
  Downloading https://files.pythonhosted.org/packages/e0/e9/359dbb77c35c419df0aedeb1d53e71e7e3f438ff64a8fdb048c907404de3/alembic-1.4.1.tar.gz (1.1MB)

In [None]:
import os
import mlflow
import mlflow.spark
import pyspark
# Import the submodules explicitly instead of using wildcard imports;
# "from pyspark.sql.functions import *" would shadow built-ins such as sum, min and max.
import pyspark.ml.classification
import pyspark.ml.evaluation
import pyspark.ml.feature
from pyngrok import ngrok

Our feature columns are the ones that come after the **rating_binary** column.

In [None]:
feature_cols = sqlc_truncated_ratings.columns[5:]
print(feature_cols)

['nb_previous_ratings', 'avg_previous_ratings']


In [None]:
assembler = pyspark.ml.feature.VectorAssembler(inputCols=feature_cols, outputCol="features")
sqlc_truncated_ratings = assembler.transform(sqlc_truncated_ratings)

We keep only the feature vectors and the class labels.

In [None]:
sqlc_truncated_ratings = sqlc_truncated_ratings.select(["features", "rating_binary"])
# By default StringIndexer gives index 0 to the most frequent value,
# so "label" is not guaranteed to equal rating_binary.
label_indexer = pyspark.ml.feature.StringIndexer(inputCol="rating_binary", outputCol="label").fit(sqlc_truncated_ratings)
sqlc_truncated_ratings = label_indexer.transform(sqlc_truncated_ratings)
sqlc_truncated_ratings = sqlc_truncated_ratings.select(["features", "label"])

In [None]:
train, test = sqlc_truncated_ratings.randomSplit([0.80, 0.20])
reg = 0.01

In [None]:
with mlflow.start_run(run_name="MLflow on Colab"):
    lr = pyspark.ml.classification.LogisticRegression(regParam=reg)
    model = lr.fit(train)
    prediction = model.transform(test)
    evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator(metricName="areaUnderROC")
    aucroc = evaluator.evaluate(prediction)
    mlflow.log_param("Regularization", reg)
    mlflow.log_metric("areaUnderROC", aucroc)
    #mlflow.log_metric("accuracy", acc)
    #mlflow.log_metric("recall", recall)
    mlflow.spark.log_model(model, "spark-model")

In [None]:
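get_ipython().system_raw("mlflow ui --port 5000 &")
ngrok.kill()
# Read the token from the environment rather than hard-coding a secret in the notebook.
# The variable name NGROK_AUTH_TOKEN is our choice; set it before running this cell.
NGROK_AUTH_TOKEN = os.environ["NGROK_AUTH_TOKEN"]
ngrok.set_auth_token(NGROK_AUTH_TOKEN)
ngrok_tunnel = ngrok.connect(addr="5000", proto="http", bind_tls=True)
print("MLflow Tracking UI:", ngrok_tunnel.public_url)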



t=2020-12-22T04:43:03+0000 lvl=warn msg="can't bind default web address, trying alternatives" obj=web addr=127.0.0.1:4040


MLflow Tracking UI: https://a9945401b87b.ngrok.io
