In [1]:
# Reload edited project modules automatically so code changes are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

import sys
# Make the `src` directory importable. The path is relative, so it resolves
# against the kernel's current working directory.
sys.path.append('src')

In [2]:
from utils.config import load_config
# Project configuration. Later cells read config['data_paths']['raw'] and
# config['data_paths']['processed'] from it.
config = load_config()


## Data Download

In [92]:
# Disabled alternative: fetch the MovieLens 20M dataset with the Kaggle CLI.
# The active download cell in this notebook pulls ml-32m from GroupLens instead.
# !mkdir -p data
# !kaggle datasets download -d grouplens/movielens-20m-dataset -p data/ --unzip

Downloading movielens-20m-dataset.zip to data
 99%|███████████████████████████████████████▍| 193M/195M [00:06<00:00, 54.0MB/s]
100%|████████████████████████████████████████| 195M/195M [00:06<00:00, 32.3MB/s]


In [7]:
import zipfile
import requests
import os

def download_and_unzip(zip_url, output_dir):
    """Download a zip archive and extract it into ``output_dir``.

    The download is skipped when the raw-data directory named by
    ``config['data_paths']['raw']`` already contains the four MovieLens CSV
    files. Note that this check looks at the configured raw directory, not at
    ``output_dir``.

    Args:
        zip_url: URL of the zip archive. Its basename is used as the name of
            the temporary file written inside ``output_dir``.
        output_dir: Directory the archive is extracted into. Created if it
            does not exist.

    Returns:
        None.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a non-success HTTP
        status, and ``zipfile.BadZipFile`` if the downloaded payload is not a
        valid zip archive. In both cases no partial zip file is left behind.
    """
    os.makedirs(output_dir, exist_ok=True)
    raw_dir = config['data_paths']['raw']
    print(raw_dir)

    expected_files = {'links.csv', 'movies.csv', 'ratings.csv', 'tags.csv'}
    # List the directory once and compare as sets.
    if os.path.isdir(raw_dir) and expected_files.issubset(os.listdir(raw_dir)):
        print("Files already exist. Skipping download.")
        return

    zip_path = os.path.join(output_dir, os.path.basename(zip_url))
    try:
        # Stream to disk in chunks so the archive is never held in memory
        # as a whole; the timeout keeps a stalled connection from hanging
        # the notebook indefinitely.
        with requests.get(zip_url, stream=True, timeout=60) as response:
            # Fail on HTTP errors instead of saving an error page as a "zip".
            response.raise_for_status()
            with open(zip_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    if chunk:
                        f.write(chunk)

        # Unzip the file
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(output_dir)
    finally:
        # Remove the zip file, even when the download or extraction failed.
        if os.path.exists(zip_path):
            os.remove(zip_path)
        
# Fetch the MovieLens 32M archive from GroupLens and unpack it under ./data.
zip_url = "https://files.grouplens.org/datasets/movielens/ml-32m.zip"
download_and_unzip(zip_url=zip_url, output_dir="data")


/Users/ajaykarthicksenthilkumar/dev/personal/recommendation-system/data/ml-32m
Files already exist. Skipping download.


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct


# Local Spark session running on 4 threads. spark.driver.memory sets the
# memory available to the Spark driver process.
builder = (
    SparkSession.builder
    .master("local[4]")
    .appName('recommendation_system')
    .config("spark.driver.memory", "15g")
)
spark = builder.getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/17 21:34:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
file_path = os.path.join(config['data_paths']['raw'], 'ratings.csv')

# Declare the column types up front instead of using inferSchema=True.
# Schema inference makes Spark read the whole ratings file an extra time just
# to work out the types. These are the same types inference reports for it.
ratings_schema = "userId INT, movieId INT, rating DOUBLE, timestamp INT"

df = (
    spark.read
    .options(header=True)
    .schema(ratings_schema)
    .csv(file_path)
)

                                                                                

In [10]:
# Total number of rating rows loaded from ratings.csv.
n_ratings = df.count()
print(f"There are totally {n_ratings} ratings in the dataset.")



There are totally 32000204 ratings in the dataset.


                                                                                

In [11]:
# Check the column names and types of the loaded ratings.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [12]:
# Peek at the first three rating rows.
df.show(3)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|     17|   4.0|944249077|
|     1|     25|   1.0|944250228|
|     1|     29|   2.0|943230976|
+------+-------+------+---------+
only showing top 3 rows



In [13]:
# Compute the distinct count, minimum and maximum in ONE aggregation job
# instead of three separate scans of the ratings data.
user_id_stats = df.selectExpr(
    "count(DISTINCT userId) AS n_unique",
    "min(userId) AS min_id",
    "max(userId) AS max_id",
).first()

print(f"The unique number of users is {user_id_stats['n_unique']}\n"
      f"User ID starts from {user_id_stats['min_id']}\n"
      f"User ID ends at {user_id_stats['max_id']}"
)



The unique number of users is 200948
User ID starts from 1
User ID ends at 200948


                                                                                

In [14]:
# Compute the distinct count, minimum and maximum in ONE aggregation job
# instead of three separate scans of the ratings data.
movie_id_stats = df.selectExpr(
    "count(DISTINCT movieId) AS n_unique",
    "min(movieId) AS min_id",
    "max(movieId) AS max_id",
).first()

print(f"The unique number of movies is {movie_id_stats['n_unique']}\n"
      f"Movie ID starts from {movie_id_stats['min_id']}\n"
      f"Movie ID ends at {movie_id_stats['max_id']}"
)



The unique number of movies is 84432
Movie ID starts from 1
Movie ID ends at 292757


                                                                                

In [13]:
from pyspark.sql import Window
# monotonically_increasing_id stays imported for any other cell that still uses it.
from pyspark.sql.functions import monotonically_increasing_id, row_number

# create unique user mappings
#
# monotonically_increasing_id() only guarantees unique, increasing values; it
# does NOT guarantee consecutive ones (the partition id is encoded in the upper
# bits, so ids jump whenever the data spans more than one partition).
# row_number() over an ordered window always yields 1..N, so subtracting 1
# gives a dense 0-based index.
# The un-partitioned window moves all rows to one partition, which is fine for
# a table with one row per distinct user.
user_window = Window.orderBy('userId')
unique_users = (
    df.select('userId')
      .distinct()
      # Cast keeps the column type (long) that monotonically_increasing_id produced.
      .withColumn("userIdx", (row_number().over(user_window) - 1).cast("long"))
)
unique_users.show()

                                                                                

+------+-------+
|userId|userIdx|
+------+-------+
|     1|      0|
|     2|      1|
|     3|      2|
|     4|      3|
|     5|      4|
|     6|      5|
|     7|      6|
|     8|      7|
|     9|      8|
|    10|      9|
|    11|     10|
|    12|     11|
|    13|     12|
|    14|     13|
|    15|     14|
|    16|     15|
|    17|     16|
|    18|     17|
|    19|     18|
|    20|     19|
+------+-------+
only showing top 20 rows



In [14]:
# create unique movie mappings
unique_movies = df.select('movieId').distinct().sort('movieId')
unique_movies = unique_movies.withColumn("movieIdx", monotonically_increasing_id())
unique_movies.show()



+-------+--------+
|movieId|movieIdx|
+-------+--------+
|      1|       0|
|      2|       1|
|      3|       2|
|      4|       3|
|      5|       4|
|      6|       5|
|      7|       6|
|      8|       7|
|      9|       8|
|     10|       9|
|     11|      10|
|     12|      11|
|     13|      12|
|     14|      13|
|     15|      14|
|     16|      15|
|     17|      16|
|     18|      17|
|     19|      18|
|     20|      19|
+-------+--------+
only showing top 20 rows



                                                                                

In [15]:
# join the mappings with the original dataframe
df_with_mapped_ids = df.join(unique_users, on="userId").join(unique_movies, on="movieId")
df_with_mapped_ids.show()

                                                                                

+-------+------+------+----------+-------+--------+
|movieId|userId|rating| timestamp|userIdx|movieIdx|
+-------+------+------+----------+-------+--------+
|   1088|   148|   0.5|1471747769|    147|    1061|
|   2366|   148|   1.0|1471747783|    147|    2275|
|   4519|   148|   1.0|1471747756|    147|    4415|
|   1580|   496|   3.5|1633649130|    495|    1523|
|   1580|   833|   2.5|1193952315|    832|    1523|
|   1645|   833|   4.0|1198795114|    832|    1583|
|   3175|   833|   3.0|1196235442|    832|    3082|
|   3997|   833|   2.0|1193993251|    832|    3894|
|  44022|   833|   4.0|1193956572|    832|   10668|
|   2366|  1088|   4.0| 992200629|   1087|    2275|
|   3175|  1088|   3.0| 992201135|   1087|    3082|
| 175197|  1238|   2.0|1515524615|   1237|   46161|
|   1088|  1580|   3.5|1122328147|   1579|    1061|
|   1959|  2122|   2.0| 975701849|   2121|    1870|
|   6620|  2142|   4.0|1108961715|   2141|    6498|
|   1591|  3794|   4.0|1215501092|   3793|    1534|
|   1580|  3

In [25]:
# Compute the distinct count, minimum and maximum of the mapped user index in
# ONE aggregation job. Each separate action would re-run the whole join
# pipeline behind df_with_mapped_ids.
user_idx_stats = df_with_mapped_ids.selectExpr(
    "count(DISTINCT userIdx) AS n_unique",
    "min(userIdx) AS min_idx",
    "max(userIdx) AS max_idx",
).first()

print(f"The unique number of users is {user_idx_stats['n_unique']}\n"
      f"User ID starts from {user_idx_stats['min_idx']}\n"
      f"User ID ends at {user_idx_stats['max_idx']}"
)



The unique number of users is 200948
User ID starts from 0
User ID ends at 200947


                                                                                

In [26]:
# Compute the distinct count, minimum and maximum of the mapped movie index in
# ONE aggregation job. Each separate action would re-run the whole join
# pipeline behind df_with_mapped_ids.
movie_idx_stats = df_with_mapped_ids.selectExpr(
    "count(DISTINCT movieIdx) AS n_unique",
    "min(movieIdx) AS min_idx",
    "max(movieIdx) AS max_idx",
).first()

print(f"The unique number of movies is {movie_idx_stats['n_unique']}\n"
      f"Movie ID starts from {movie_idx_stats['min_idx']}\n"
      f"Movie ID ends at {movie_idx_stats['max_idx']}"
)



The unique number of movies is 84432
Movie ID starts from 0
Movie ID ends at 84431


                                                                                

In [17]:
# Save the ratings together with their userIdx/movieIdx columns as Parquet,
# overwriting any existing output at that path.
processed_dir = config['data_paths']['processed']
file_path = os.path.join(processed_dir, 'ratings.parquet')
parquet_writer = df_with_mapped_ids.write.mode('overwrite')
parquet_writer.parquet(file_path)

                                                                                

In [18]:
# Stop the Spark session created earlier in this notebook.
spark.stop()