
# Netflix TV Shows and Movies Dataset Analysis

This notebook presents an exploratory data analysis (EDA) of the Netflix dataset using Scala and Apache Spark. 
The dataset contains information about TV shows and movies available on Netflix, including their release dates, 
genres, and ratings. The analysis focuses on identifying trends, understanding the distribution of content, 
and deriving meaningful insights.

## Steps Covered:
1. **Data Cleaning and Preprocessing**
2. **Exploratory Data Analysis (EDA)** using Spark SQL and Scala functions
3. **Visualization of Trends** such as content release years, genres, and ratings distribution

**Tools Used**:
- **Apache Spark** for distributed data processing
- **Scala** for defining transformations and performing analytics
- **Plotly** for generating visualizations


In [1]:

val netflixDF = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("netflix_titles.csv")

netflixDF.show(5)


  interactivity=interactivity, compiler=compiler, result=result)


(99072113, 3)
   user_id movie_id rating
0  1488844        1      3
1   822109        1      5
2   885013        1      4
3   823519        1      3


### Notice, this below 4 cells are for first part only(1/4)
To understand the data a little bit

In [2]:

#Check if the Rating column is null, if so it means it is 'movie id' row, count them 
movie_count = df1.isnull().sum()[1]
print('number of movies in part_1 : {}'.format(movie_count))


number of movies in part_1 : 4499


In [3]:
# Count total number of rows and subtract movies' count 
print('number of Users in part_1 : {}'.format(df1['user_id'].nunique()- movie_count))


number of Users in part_1 : 470758


In [4]:
# Count total number of observations 
num_observations = df1['user_id'].count()- movie_count
print('number of observations in part_1 : {}'.format(num_observations))

number of observations in part_1 : 24053764


In [15]:
# Trying to see the uniformity in the data 
print(df1.groupby('rating').count()* 100/num_observations)


# 85% of the data has  3 or more than rating



          user_id
rating           
1.0      4.648694
2.0     10.140089
3.0     28.703121
4.0     33.615284
5.0     22.892812


### Need to remove ID from rows and put into a column

In [2]:
### Use below code to process each part one by one, also keep updating the movie_id variable

df_nan = pd.DataFrame(pd.isnull(df1.rating))
df_nan = df_nan[df_nan['rating']==True]
print(df_nan.shape)

# create a single array with movie id - size ( difference of index) and value ( 1,2,3 etc)

movie_np = []

## We keep changing this variable by manually looking up the movie_id in one of those 4 data files
## As of now, this is to process the 4th part
movie_id = 13368

for i,j in zip(df_nan.index[1:],df_nan.index[:-1]):
#     print(i,j)
    temp_arr = np.full((1,i-j-1), movie_id)
    movie_np = np.append(movie_np,temp_arr)
    movie_id += 1

# last movie id

print(df_nan.iloc[-1, 0])

r = np.full((1,len(df1) - df_nan.index[-1] -1), movie_id)
#print(temp_arr)
movie_np = np.append(movie_np,r)
print(len(movie_np))


    

(4403, 1)
True
26847523


In [3]:
### Append the movie_np array as a column to the dataframe
###
df1 = df1[pd.notnull(df1['rating'])]
#Add the movie_id column
df1['movie_id'] = movie_np.astype(int)
df1['user_id'] = df1['user_id'].astype(int)
print(df1.columns)

print(df1.iloc[::5000000,:])

new_cols = df1.columns.tolist()
new_cols = new_cols[:1]+new_cols[-1:]+new_cols[1:2]
df1 = df1[new_cols]

print("persist the processed file ")
df1.to_csv("processed_part4.txt", encoding='utf-8', index=False)



Index(['user_id', 'rating', 'movie_id'], dtype='object')
          user_id  rating  movie_id
1         2385003     4.0     13368
5000907    495207     1.0     14274
10001561  2243515     4.0     14928
15002391  2210687     5.0     15758
20003238  2088300     5.0     16605
25004038   717559     3.0     17405
persist the processed file 


In [54]:
## Here we bring data into the forma of a matrix
## BUT, we don't use it
df_p = pd.pivot_table(df1,values='rating', index='movie_id', columns='user_id')
print(df_p.shape)

(4499, 470758)


# The code below is just to process the probe file 

In [4]:

## Code to pre-process probe.txt

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from collections import OrderedDict
probe_file = open("probe.txt","r")
lines = probe_file.read().split("\n")

movie_id_dict = {}
prev_index = 0
prev_value = 1

valueList = []

for i in range(len(lines)):
    if(":" in lines[i]):
        movie_id_dict[str(prev_value)] = i-prev_index-1
        prev_index = i
        prev_value = int(lines[i].split(":")[0])
    else:
        valueList.append(lines[i])
    
movie_id_dict[str(prev_value)] = len(lines)-prev_index-1

orderedDict = OrderedDict(sorted(movie_id_dict.items()))

movie_np = []
## Create np array based on the ordered dict 

for k,v in orderedDict.items():
    temp_arr = np.full((1,v), int(k))
    movie_np = np.append(movie_np,temp_arr)

        
df = pd.DataFrame(valueList,columns=['user_id'])
df['movie_id'] = movie_np.astype(int)





         user_id  movie_id
0          30878         1
1        2647871         1
2        1283744         1
3        2488120         1
4         317050         1
5        1904905         1
6        1989766         1
7          14756         1
8        1027056         1
9        1149588         1
10       1394012         1
11       1406595         1
12       2529547         1
13       1682104         1
14       2625019         1
15       2603381         1
16       1774623         1
17        470861         1
18        712610         1
19       1772839         1
20       1059319         1
21       2380848         1
22        548064         1
23       1952305        10
24       1531863        10
25       2326571      1000
26        977808      1000
27       1010534      1000
28       1861759      1000
29         79755      1000
...          ...       ...
1408366   194929      9995
1408367  1847661      9995
1408368    66828      9996
1408369  1149582      9996
1408370   336696      9996
1

## The code below reads each processed part (remember there are 4) and processed_probe.txt 
After that, we create indices on (user_id,movie_id) and use set difference to filter out train and test data

In [1]:

val netflixDF = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("netflix_titles.csv")

netflixDF.show(5)


   user_id  movie_id  rating
0  2385003     13368     4.0
1   659432     13368     3.0
2   751812     13368     2.0
3  2625420     13368     2.0
4  1650301     13368     1.0
5  2269227     13368     4.0
6  2220672     13368     4.0
7  2500511     13368     4.0
8  1452058     13368     2.0
9  1624891     13368     3.0


In [2]:

val netflixDF = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("netflix_titles.csv")

netflixDF.show(5)


   user_id  movie_id  rating
0    30878         1     NaN
1  2647871         1     NaN
2  1283744         1     NaN
3  2488120         1     NaN
4   317050         1     NaN
5  1904905         1     NaN
6  1989766         1     NaN
7    14756         1     NaN
8  1027056         1     NaN
9  1149588         1     NaN


In [3]:

print("length of train df")
print(len(df_train))

print()

print("length of probe df")
print(len(df_probe))

df_probe['movie_id'] = df_probe['movie_id'].astype(int)

# for i in range(len(df_probe)):
#     if (df_probe.iloc[i]['movie_id'] > 4499):
#         df_probe.drop(df_probe.index[i], inplace=True)

# print(len(df_probe))
keys = ['user_id', 'movie_id']
i1 = df_train.set_index(keys).index
i2 = df_probe.set_index(keys).index
df_pure_train =  df_train[~i1.isin(i2)]
df_pure_train.to_csv("processed_pure_train_4.txt", encoding='utf-8', index=False)

## Now get probe
i3 = df_pure_train.set_index(keys).index
df_pure_probe = df_train[~i1.isin(i3)]


df_pure_probe.to_csv("processed_pure_probe_4.txt", encoding='utf-8', index=False)





length of train df
26847523

length of probe df
1408395


### This is the ALS based model code, using Apache Spark
Please note, we are using the complete dataset here, all ~100 million records

In [1]:
import pyspark
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

conf = pyspark.SparkConf()
conf.set("spark.driver.maxResultSize", "4g")
conf.set("spark.driver.memory","4g")
sc = pyspark.SparkContext(conf=conf)

from datetime import datetime

# Load and parse the data
data = sc.textFile("all_4_train.txt")
ratings = data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

#print(ratings.collect()[0:5])
 
print("Starting the training at :")
print(str(datetime.now()))
print()
# Build the recommendation model using Alternating Least Squares
rank = 15
numIterations = 10
lambda_ = 0.09 # default is 0.01
model = ALS.train(ratings, rank, numIterations, lambda_)

print("Finished training at :")
print(str(datetime.now()))

("Reading test or Probe file")
probe_data = sc.textFile("all_4_probe.txt")

ratings_probe = probe_data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Evaluate the model on training data
testdata = ratings_probe.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings_probe.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
RMSE = MSE**(0.5)


print()
print("With Parameters")
print("lambda = "+ str(lambda_))
print("Number of features = "+str(rank))
print("Mean Squared Error = " + str(MSE))
print("Root Mean Squared Error = " + str(RMSE))



print("Finished All at :")
print(str(datetime.now()))


# # Save and load model
# model.save(sc, "CollabALS")
# sameModel = MatrixFactorizationModel.load(sc, "CollabALS")


sc.stop()

Starting the training at :
2017-11-20 17:42:49.902628

Finished training at :
2017-11-20 18:01:13.524056

With Parameters
lambda = 0.09
Number of features = 15
Mean Squared Error = 0.8727117236047375
Root Mean Squared Error = 0.9341904107861189
Finished All at :
2017-11-20 18:02:10.742918
