In [1]:
import pyarrow.parquet as pq
import pandas as pd
import os
import numpy as np

In [2]:
# Display setup: echo the value of every bare expression in a cell (not only the last one),
# and make HTML available for pretty-printing DataFrames (e.g. to copy them into slides).
from IPython.core.interactiveshell import InteractiveShell
from IPython.core.display import HTML

InteractiveShell.ast_node_interactivity = "all"

In [3]:
os.listdir('parquets')  # parquet datasets available in ./parquets (relative to the working directory)

['movies_df', 'movie_title_df', 'netflix_df', 'users_df']

In [4]:
# load the preprocessed tables from the parquet files in ./parquets
PARQUET_DIR = 'parquets'
netflix_df = pd.read_parquet(os.path.join(PARQUET_DIR, 'netflix_df'))
movie_titles_df = pd.read_parquet(os.path.join(PARQUET_DIR, 'movie_title_df'))  # file name is singular on disk
movies_df = pd.read_parquet(os.path.join(PARQUET_DIR, 'movies_df'))
users_df = pd.read_parquet(os.path.join(PARQUET_DIR, 'users_df'))

In [5]:
# outer-join MovieLens movies with their reviews on movieId, so movies without reviews
# and reviews without a matching movie are both kept
movielens_df = movies_df.merge(users_df, on='movieId', how='outer')

In [6]:
# inspect the merged MovieLens frame: movie columns are repeated once per review after the outer merge on movieId
movielens_df

Unnamed: 0,movieId,imdbId,tmdbId,title,genres,year,userId,rating,tag,date,timestamp_tag
0,1,114709,862.0,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995,1,4.0,,2000-07-30,
1,1,114709,862.0,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995,5,4.0,,1996-11-08,
2,1,114709,862.0,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995,7,4.5,,2005-01-25,
3,1,114709,862.0,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995,15,2.5,,2017-11-13,
4,1,114709,862.0,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995,17,4.5,,2011-05-18,
...,...,...,...,...,...,...,...,...,...,...,...
102879,193581,5476944,432131.0,Black Butler: Book of the Atlantic,Action|Animation|Comedy|Fantasy,2017,184,4.0,,2018-09-16,
102880,193583,5914996,445030.0,No Game No Life: Zero,Animation|Comedy|Fantasy,2017,184,3.5,,2018-09-16,
102881,193585,6397426,479308.0,Flint,Drama,2017,184,3.5,,2018-09-16,
102882,193587,8391976,483455.0,Bungo Stray Dogs: Dead Apple,Action|Animation,2018,184,3.5,,2018-09-16,


To avoid working with millions of individual rows, we group the dataset by movieId. To do so, the userId, rating and date/timestamp of each review are combined into one dictionary per review, and these dictionaries are collected per movie. This way, each review stays a self-contained record that is easy to count. It should also reduce memory usage and let us include more movieIds in the training data, since one movie can have many reviews. Furthermore, the data stays easily accessible.

For both datasets, the approach will be the same: 

- A new column is created that combines the fields of each review into a single dictionary
- The dataset will be grouped by movieId, aggregating each dictionary into a list

#### Movielens dataset:

In [7]:
# Create a new column 'review_data' holding, for each review row, one dictionary that combines
# the review fields. Rows where every one of these fields is missing get None instead.
# Built column-wise (to_dict('records') + a notna mask) rather than with a per-row
# apply(axis=1), which constructs a Series for every one of the ~100k rows.
review_cols = ['userId', 'rating', 'date', 'tag', 'timestamp_tag']
review_fields = movielens_df[review_cols]
has_review = review_fields.notna().any(axis=1)

# dtype=object + explicit index: keep one dict (or None) per row, aligned with movielens_df
movielens_df['review_data'] = pd.Series(
    [
        record if present else None
        for record, present in zip(review_fields.to_dict('records'), has_review)
    ],
    index=movielens_df.index,
    dtype=object,
)

In [8]:
movielens_df.isna().sum()  # missing values per column after building review_data

movieId              0
imdbId               0
tmdbId              13
title                0
genres               0
year                20
userId               0
rating             207
tag              99201
date               207
timestamp_tag    99201
review_data          0
dtype: int64

In [9]:
# these five fields are now stored inside each review_data dictionary, so the separate columns are redundant
movielens_df = movielens_df.drop(columns=['userId', 'rating', 'date', 'tag', 'timestamp_tag'])

In [10]:
def _collect_reviews(review_dicts):
    """Aggregate one movie's review dictionaries into a list.

    Entries that are not dictionaries (None from the review_data step, or NaN inserted by
    pandas) are skipped. A movie without any review gets None instead of an empty list.
    The dictionaries are kept as-is instead of being rebuilt key-by-key, so this does not
    need to repeat (and stay in sync with) the list of review fields.
    """
    reviews = [review for review in review_dicts if isinstance(review, dict)]
    return reviews or None


# define aggregation functions: movie-level columns are repeated on every review row of a
# movie (outer merge on movieId), so the first non-null value is kept; reviews are
# collected into one list per movie
agg_funcs = {
    'title': 'first',
    'genres': 'first',
    'year': 'first',
    'review_data': _collect_reviews,
    'imdbId': 'first',
    'tmdbId': 'first',
}

# group by 'movieId' and aggregate
movielens_df = movielens_df.groupby('movieId').agg(agg_funcs).reset_index()

In [11]:
# inspect the aggregated frame: one row per movieId, its reviews collected as a list in review_data
movielens_df

Unnamed: 0,movieId,title,genres,year,review_data,imdbId,tmdbId
0,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995,"[{'userId': 1, 'rating': 4.0, 'date': 2000-07-...",114709,862.0
1,2,Jumanji,Adventure|Children|Fantasy,1995,"[{'userId': 6, 'rating': 4.0, 'date': 1996-10-...",113497,8844.0
2,3,Grumpier Old Men,Comedy|Romance,1995,"[{'userId': 1, 'rating': 4.0, 'date': 2000-07-...",113228,15602.0
3,4,Waiting to Exhale,Comedy|Drama|Romance,1995,"[{'userId': 6, 'rating': 3.0, 'date': 1996-10-...",114885,31357.0
4,5,Father of the Bride Part II,Comedy,1995,"[{'userId': 6, 'rating': 5.0, 'date': 1996-10-...",113041,11862.0
...,...,...,...,...,...,...,...
9737,193581,Black Butler: Book of the Atlantic,Action|Animation|Comedy|Fantasy,2017,"[{'userId': 184, 'rating': 4.0, 'date': 2018-0...",5476944,432131.0
9738,193583,No Game No Life: Zero,Animation|Comedy|Fantasy,2017,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",5914996,445030.0
9739,193585,Flint,Drama,2017,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",6397426,479308.0
9740,193587,Bungo Stray Dogs: Dead Apple,Action|Animation,2018,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",8391976,483455.0


I chose to convert the genres column to tuples rather than lists: the genres of a movie will not change (tuples are immutable), and tuples are slightly more memory-efficient than lists:

In [12]:
# Convert the pipe-delimited genres string (e.g. "Comedy|Romance") into a tuple of genre names;
# non-string values (missing genres) become None.
# Splitting on '|' directly replaces the former '|' -> ',' round trip, which would also split
# any genre name containing a comma and depends on how str.replace treats '|' (regex vs literal).
movielens_df['genres'] = movielens_df['genres'].map(
    lambda genres: tuple(genres.split('|')) if isinstance(genres, str) else None
)

In [13]:
movielens_df.isna().sum()  # missing values per column after the genres conversion
movielens_df

movieId         0
title           0
genres          0
year           13
review_data     0
imdbId          0
tmdbId          8
dtype: int64

Unnamed: 0,movieId,title,genres,year,review_data,imdbId,tmdbId
0,1,Toy Story,"(Adventure, Animation, Children, Comedy, Fantasy)",1995,"[{'userId': 1, 'rating': 4.0, 'date': 2000-07-...",114709,862.0
1,2,Jumanji,"(Adventure, Children, Fantasy)",1995,"[{'userId': 6, 'rating': 4.0, 'date': 1996-10-...",113497,8844.0
2,3,Grumpier Old Men,"(Comedy, Romance)",1995,"[{'userId': 1, 'rating': 4.0, 'date': 2000-07-...",113228,15602.0
3,4,Waiting to Exhale,"(Comedy, Drama, Romance)",1995,"[{'userId': 6, 'rating': 3.0, 'date': 1996-10-...",114885,31357.0
4,5,Father of the Bride Part II,"(Comedy,)",1995,"[{'userId': 6, 'rating': 5.0, 'date': 1996-10-...",113041,11862.0
...,...,...,...,...,...,...,...
9737,193581,Black Butler: Book of the Atlantic,"(Action, Animation, Comedy, Fantasy)",2017,"[{'userId': 184, 'rating': 4.0, 'date': 2018-0...",5476944,432131.0
9738,193583,No Game No Life: Zero,"(Animation, Comedy, Fantasy)",2017,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",5914996,445030.0
9739,193585,Flint,"(Drama,)",2017,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",6397426,479308.0
9740,193587,Bungo Stray Dogs: Dead Apple,"(Action, Animation)",2018,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",8391976,483455.0


#### Netflix Prize dataset:

In [14]:
# parse the 'date' column and keep only the calendar-date part (datetime.date objects)
parsed_dates = pd.to_datetime(netflix_df['date'])
netflix_df['date'] = parsed_dates.dt.date

In [15]:
netflix_df.isnull().sum()
netflix_df
netflix_df[netflix_df['rating'].isnull()]

# Movie header rows ("<movieId>:" in the userId column) carry no rating or date, so their nulls
# are expected. Count null ratings on any other row instead of assuming there are none.
is_movie_header = netflix_df['userId'].str.endswith(':', na=False)
n_unexpected_nulls = int((netflix_df['rating'].isnull() & ~is_movie_header).sum())
if n_unexpected_nulls == 0:
    print("Null values belong to movieId header rows (userId values ending with a colon). Therefore, they do not have to be handled.")
else:
    print("{} null ratings are not on movieId header rows and need to be handled.".format(n_unexpected_nulls))

userId       0
rating    1433
date      1433
dtype: int64

Unnamed: 0,userId,rating,date
0,1:,,NaT
1,1488844,3.0,2005-09-06
2,822109,5.0,2005-05-13
3,885013,4.0,2005-10-19
4,30878,4.0,2005-12-26
...,...,...,...
7999995,54150,5.0,2005-12-16
7999996,127381,4.0,2005-12-17
7999997,1391274,2.0,2005-12-17
7999998,1560530,5.0,2005-12-17


Unnamed: 0,userId,rating,date
0,1:,,NaT
548,2:,,NaT
694,3:,,NaT
2707,4:,,NaT
2850,5:,,NaT
...,...,...,...
7975826,13669:,,NaT
7978497,13670:,,NaT
7978598,13671:,,NaT
7978708,13672:,,NaT


None values belong to movieids as rows of userid column end with colon. Therefore, they do not have be handled.


In [16]:
# Split the flat Netflix Prize layout into one record per movie.
# The 'userId' column mixes two kinds of rows: movie header rows of the form "<movieId>:"
# (with null rating/date), each followed by that movie's review rows.
user_ids = netflix_df['userId'].to_numpy()
ratings = netflix_df['rating'].to_numpy()
dates = netflix_df['date'].to_numpy()

# Positions (not index labels) of the header rows; all slicing below is positional too,
# so the result does not depend on netflix_df having a default RangeIndex.
movie_indices = np.flatnonzero(netflix_df['userId'].str.endswith(':', na=False).to_numpy(dtype=bool))
# A movie's reviews run up to (not including) the next header row, or to the end of the frame.
block_ends = np.append(movie_indices[1:], len(netflix_df))

data = []
for header_pos, end_pos in zip(movie_indices, block_ends):
    reviews = slice(header_pos + 1, end_pos)
    data.append({
        'movieId': user_ids[header_pos].split(':')[0],
        # dicts keyed by review position 0..n-1, the structure the following cells expect
        'userId': dict(enumerate(user_ids[reviews].tolist())),
        'rating': dict(enumerate(ratings[reviews].tolist())),
        'date': dict(enumerate(dates[reviews].tolist())),
    })

# Create DataFrame from processed data
netflix_df = pd.DataFrame(data, columns=['movieId', 'userId', 'rating', 'date'])

In [17]:
# inspect the per-movie Netflix frame: one row per movie header, reviews stored as position-keyed dicts
netflix_df

Unnamed: 0,movieId,userId,rating,date
0,1,"{0: '1488844', 1: '822109', 2: '885013', 3: '3...","{0: 3.0, 1: 5.0, 2: 4.0, 3: 4.0, 4: 3.0, 5: 3....","{0: 2005-09-06, 1: 2005-05-13, 2: 2005-10-19, ..."
1,2,"{0: '2059652', 1: '1666394', 2: '1759415', 3: ...","{0: 4.0, 1: 3.0, 2: 4.0, 3: 5.0, 4: 4.0, 5: 2....","{0: 2005-09-05, 1: 2005-04-19, 2: 2005-04-22, ..."
2,3,"{0: '1025579', 1: '712664', 2: '1331154', 3: '...","{0: 4.0, 1: 5.0, 2: 4.0, 3: 3.0, 4: 5.0, 5: 4....","{0: 2003-03-29, 1: 2004-02-01, 2: 2004-07-03, ..."
3,4,"{0: '1065039', 1: '1544320', 2: '410199', 3: '...","{0: 3.0, 1: 1.0, 2: 5.0, 3: 3.0, 4: 1.0, 5: 1....","{0: 2005-09-06, 1: 2004-06-28, 2: 2004-10-16, ..."
4,5,"{0: '1745265', 1: '885013', 2: '1997470', 3: '...","{0: 5.0, 1: 5.0, 2: 5.0, 3: 1.0, 4: 4.0, 5: 5....","{0: 2005-02-08, 1: 2005-05-15, 2: 2005-05-30, ..."
...,...,...,...,...
1428,13669,"{0: '1294425', 1: '780597', 2: '160977', 3: '1...","{0: 4.0, 1: 5.0, 2: 5.0, 3: 5.0, 4: 4.0, 5: 5....","{0: 2005-07-07, 1: 2004-05-17, 2: 2005-08-02, ..."
1429,13670,"{0: '247794', 1: '497196', 2: '2625420', 3: '3...","{0: 1.0, 1: 2.0, 2: 1.0, 3: 3.0, 4: 2.0, 5: 2....","{0: 2002-09-04, 1: 2003-04-13, 2: 2004-06-03, ..."
1430,13671,"{0: '876388', 1: '1989892', 2: '1478381', 3: '...","{0: 4.0, 1: 3.0, 2: 4.0, 3: 4.0, 4: 1.0, 5: 4....","{0: 2005-11-28, 1: 2005-11-14, 2: 2005-10-24, ..."
1431,13672,"{0: '1294425', 1: '1395430', 2: '1945809', 3: ...","{0: 5.0, 1: 5.0, 2: 3.0, 3: 4.0, 4: 4.0, 5: 2....","{0: 2005-07-07, 1: 2005-06-09, 2: 2005-06-28, ..."


After this step the movieId column has object (string) dtype; it needs to be converted to integer before merging with the movie titles:

In [18]:
# movieId was parsed from strings; cast it to int so it can be merged with the movie titles
netflix_df = netflix_df.astype({'movieId': int})

In [19]:
# left-join the movie titles to attach title and year to every Netflix movie
netflix_df = netflix_df.merge(movie_titles_df, on='movieId', how='left')

In [20]:
def _combine_reviews(movie_users, movie_ratings, movie_dates):
    """Zip one movie's parallel userId/rating/date containers into a list of review dicts.

    The containers are indexed by review position 0..len(movie_users)-1 (the position-keyed
    dicts built when splitting the raw Netflix file; plain lists work as well).
    A review is kept when at least one of its three fields is present. pd.notna is used
    because pandas marks missing values as NaN/NaT, which an `is not None` check never catches.
    """
    reviews = []
    for pos in range(len(movie_users)):
        user_id, rating, date = movie_users[pos], movie_ratings[pos], movie_dates[pos]
        if pd.notna(user_id) or pd.notna(rating) or pd.notna(date):
            reviews.append({'userId': user_id, 'rating': rating, 'date': date})
    return reviews


# One list of review dicts per movie, assigned as a whole column instead of row-by-row via .at.
# dtype=object keeps each list as a single cell value.
netflix_df['review_data'] = pd.Series(
    [
        _combine_reviews(movie_users, movie_ratings, movie_dates)
        for movie_users, movie_ratings, movie_dates in zip(
            netflix_df['userId'], netflix_df['rating'], netflix_df['date']
        )
    ],
    index=netflix_df.index,
    dtype=object,
)

In [21]:
# userId, rating and date are now combined inside review_data, so the separate columns are redundant
netflix_df = netflix_df.drop(columns=['userId', 'rating', 'date'])

In [22]:
# Keep only movies whose year consists purely of digits.
# .str.isdigit() yields NaN for a missing year (e.g. no matching title in the left merge),
# and pandas refuses to index with a mask containing NaN; .eq(True) turns those into False
# so such movies are dropped instead of raising.
has_numeric_year = netflix_df['year'].str.isdigit().eq(True)
netflix_df = netflix_df[has_numeric_year]

In [23]:
netflix_df.isna().sum()  # missing values per column after the year filter

movieId        0
year           0
title          0
review_data    0
dtype: int64

In [24]:
def _count_missing(values):
    """Return how many entries of `values` pandas treats as missing (None, NaN or NaT)."""
    return int(pd.Series(values, dtype=object).isna().sum())


# convert review_data to values to extract elements from it
review_data = netflix_df['review_data'].values

# flatten every field of the review_data dictionaries into lists, to assess null values
ratings = [entry['rating'] for row in review_data for entry in row if 'rating' in entry]
dates = [entry['date'] for row in review_data for entry in row if 'date' in entry]
userids = [entry['userId'] for row in review_data for entry in row if 'userId' in entry]

# pandas marks missing values as NaN/NaT, which `list.count(None)` never matches,
# so count them with isna (which also covers None)
n_null_ratings = _count_missing(ratings)
n_null_dates = _count_missing(dates)
n_null_userids = _count_missing(userids)

print('There are {} null values within the netflix prize movie ratings.'.format(n_null_ratings))
print('There are {} null values within the netflix prize movie dates.'.format(n_null_dates))
print('There are {} null values within the netflix prize movie userIds.'.format(n_null_userids))
if n_null_ratings + n_null_dates + n_null_userids == 0:
    print("No null values that need to be handled.")
else:
    print("Null values found that need to be handled.")

There are 0 null values within the netflix prize movie ratings.
There are 0 null values within the netflix prize movie dates.
There are 0 null values within the netflix prize movie userIds.
No null values that need to be handled.


## Finalizing and converting to parquet:

To convert to parquet, the keys of the dictionaries need to be strings, otherwise the conversion to parquet does not work; the review dictionaries built above already use string keys ('userId', 'rating', 'date', ...). Afterwards, the Netflix and MovieLens dataframes are converted to parquet files.

In [25]:
# compare the columns and contents of both cleaned datasets before writing them to parquet
netflix_df.columns
movielens_df.columns
netflix_df
movielens_df

Index(['movieId', 'year', 'title', 'review_data'], dtype='object')

Index(['movieId', 'title', 'genres', 'year', 'review_data', 'imdbId',
       'tmdbId'],
      dtype='object')

Unnamed: 0,movieId,year,title,review_data
0,1,2003,Dinosaur Planet,"[{'userId': '1488844', 'rating': 3.0, 'date': ..."
1,2,2004,Isle of Man TT 2004 Review,"[{'userId': '2059652', 'rating': 4.0, 'date': ..."
2,3,1997,Character,"[{'userId': '1025579', 'rating': 4.0, 'date': ..."
3,4,1994,Paula Abdul's Get Up & Dance,"[{'userId': '1065039', 'rating': 3.0, 'date': ..."
4,5,2004,The Rise and Fall of ECW,"[{'userId': '1745265', 'rating': 5.0, 'date': ..."
...,...,...,...,...
1428,13669,1988,Red Dwarf: Series 4,"[{'userId': '1294425', 'rating': 4.0, 'date': ..."
1429,13670,1997,White Badge,"[{'userId': '247794', 'rating': 1.0, 'date': 2..."
1430,13671,1967,Samurai Rebellion,"[{'userId': '876388', 'rating': 4.0, 'date': 2..."
1431,13672,1973,Kung Fu: Season 2,"[{'userId': '1294425', 'rating': 5.0, 'date': ..."


Unnamed: 0,movieId,title,genres,year,review_data,imdbId,tmdbId
0,1,Toy Story,"(Adventure, Animation, Children, Comedy, Fantasy)",1995,"[{'userId': 1, 'rating': 4.0, 'date': 2000-07-...",114709,862.0
1,2,Jumanji,"(Adventure, Children, Fantasy)",1995,"[{'userId': 6, 'rating': 4.0, 'date': 1996-10-...",113497,8844.0
2,3,Grumpier Old Men,"(Comedy, Romance)",1995,"[{'userId': 1, 'rating': 4.0, 'date': 2000-07-...",113228,15602.0
3,4,Waiting to Exhale,"(Comedy, Drama, Romance)",1995,"[{'userId': 6, 'rating': 3.0, 'date': 1996-10-...",114885,31357.0
4,5,Father of the Bride Part II,"(Comedy,)",1995,"[{'userId': 6, 'rating': 5.0, 'date': 1996-10-...",113041,11862.0
...,...,...,...,...,...,...,...
9737,193581,Black Butler: Book of the Atlantic,"(Action, Animation, Comedy, Fantasy)",2017,"[{'userId': 184, 'rating': 4.0, 'date': 2018-0...",5476944,432131.0
9738,193583,No Game No Life: Zero,"(Animation, Comedy, Fantasy)",2017,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",5914996,445030.0
9739,193585,Flint,"(Drama,)",2017,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",6397426,479308.0
9740,193587,Bungo Stray Dogs: Dead Apple,"(Action, Animation)",2018,"[{'userId': 184, 'rating': 3.5, 'date': 2018-0...",8391976,483455.0


In [26]:
# convert to parquet; create the output directory first, because to_parquet does not create
# missing parent directories and would fail on a fresh checkout
os.makedirs('cleaned', exist_ok=True)
netflix_df.to_parquet('cleaned/netflix_parquet')
movielens_df.to_parquet('cleaned/movielens_parquet')

In [27]:
# netflix_movielens_df = pd.concat([netflix_df,movielens_df],ignore_index=True)

# # show to check afterwards if the indices are not replaced after concatenated dictionaries
# netflix_movielens_df[netflix_movielens_df['movieId']==1]

In [28]:
# def agg_concat_dicts_np(series):
#     # Convert series of dictionaries to a list of dictionaries
#     list_of_dicts = list(series)
#     # Merge dictionaries
#     merged_dict = {}
#     key_counter = 0
#     for d in list_of_dicts:
#         if d is not None:
#             for value in d.values():
#                 merged_dict[str(key_counter)] = value
#                 key_counter += 1
#     return merged_dict

# # group by 'movieId' and aggregate using the custom function
# recsys_df = netflix_movielens_df.groupby('movieId').agg({
#     'title' : 'first',
#     'year' : 'first',
#     'genres' : 'first',
#     'imdbId' : 'max',
#     'tmdbId' : 'max',
#     'review_data': agg_concat_dicts_np,
# }).reset_index()

In [29]:
# recsys_df.isnull().sum()
# recsys_df

In [30]:
# # convert to parquet
# recsys_df.to_parquet('cleaned/netflix_movielens_cleaned_parquet')