In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math

In [None]:
import os
# Set the PYSPARK_PYTHON environment variable to "python3" for this process.
os.environ["PYSPARK_PYTHON"] = "python3"

In [None]:
from pyspark.sql import SparkSession
# Build (or reuse) the SparkSession used by the DataFrame cells below.
# NOTE(review): the config key/value pair looks like a documentation
# placeholder, and the app name is spelled "moive"; both are kept unchanged.
spark = (
    SparkSession.builder
    .appName("moive analysis")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

ModuleNotFoundError: ignored

In [None]:
# Every table is a CSV file with a header row, so the reader options are shared.
# No schema is supplied or inferred here.
csv_options = dict(format='csv', header=True)

movies = spark.read.load("/FileStore/tables/movies.csv", **csv_options)
ratings = spark.read.load("/FileStore/tables/ratings.csv", **csv_options)
links = spark.read.load("/FileStore/tables/links.csv", **csv_options)
tags = spark.read.load("/FileStore/tables/tags.csv", **csv_options)

In [None]:
# Print the first 5 rows of the movies DataFrame.
movies.show(5)

In [None]:
# Print the first 5 rows of the ratings DataFrame.
ratings.show(5)

In [None]:
# Collect the distinct values of the `rating` column and print them sorted.
rating_rows = ratings.select('rating').distinct().collect()
distinct_ratings = sorted(row[0] for row in rating_rows)

print('Distinct values of ratings:')
print (distinct_ratings)

In [None]:

# Ratings per user and per movie, each sorted ascending by count so that the
# first row holds the minimum.
# `first()` fetches only that row; the previous `collect()[0]` materialised
# every group count on the driver just to read the first one.
q1_result = ratings.groupBy('userID').count().orderBy('count', ascending=True)
tmp1 = q1_result.first()['count']
q2_result = ratings.groupBy('movieId').count().orderBy('count', ascending=True)
tmp2 = q2_result.first()['count']
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [None]:

# How many of the rated movies have exactly one rating?
# `q2_result` is the per-movie rating count computed in the previous cell.
distinct_movieID = ratings.select('movieId').distinct().count()
single_rating_movies = q2_result.where(q2_result['count'] == 1)
count_result = single_rating_movies.count()

summary = '{} out of {} movies are rated by only one user'.format(count_result, distinct_movieID)
print(summary)

In [None]:
# Number of distinct users that appear in the ratings table.
distinct_users = ratings.select("userID").distinct()
usersNum = distinct_users.count()
print('Number of users: ', usersNum)

In [None]:
# Number of distinct movies listed in the movies table.
distinct_movies = movies.select("movieId").distinct()
moviesNum = distinct_movies.count()
print('Number of movies: ', moviesNum)

In [None]:
# List the movies from the movies table that have no entry in the ratings table.
#
# Changes from the previous version:
#   * the distinct rated ids are collected once and counted locally instead of
#     running the same distinct job a second time with `.count()`;
#   * membership is tested against a set of movieId values rather than a list
#     of Rows, so the scan is linear instead of quadratic;
#   * the "not rated" figure is the length of the actual unrated list, so it
#     always matches the rows printed below and no longer depends on
#     `moviesNum` from another cell.
moviesRated = ratings.select("movieId").distinct().collect()
moviesRatedNum = len(moviesRated)
moviesTotal = movies.select("movieId").distinct().collect()

ratedIds = {row['movieId'] for row in moviesRated}
moviesNotRated = [row for row in moviesTotal if row['movieId'] not in ratedIds]

print('Number of movies rated by users: ', moviesRatedNum)
print('Number of movies not rated before: ', len(moviesNotRated))
print('Movies not rated before are: ')
for movie in moviesNotRated:
    print(movie)

In [None]:

from collections import OrderedDict
import pyspark.sql.functions as f
from pyspark.sql.functions import explode

# Split the pipe-delimited `genres` string into an array column, then explode
# that array to obtain the distinct genre names.
tmp = movies.select("movieId", "title", "genres")
split_col = f.split(tmp['genres'], '\\|')
tmp = tmp.withColumn('split_col', split_col)

genre_rows = (
    tmp.select(explode(tmp.split_col).alias("split_col"))
    .distinct()
    .collect()
)
genresList = [row['split_col'] for row in genre_rows]
genresList

In [None]:

from pyspark.sql.functions import array_contains

# For each genre, print its name and then show the movies whose genre array
# contains it.
for genre_name in genresList:
    print (genre_name + ':')
    tmp.filter(array_contains(tmp.split_col, genre_name)).show()

In [None]:
from pyspark.mllib.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Read the ratings CSV as an RDD of raw text lines.
# NOTE(review): `sc` is not created anywhere above this cell (the notebook only
# constructs a SparkContext further down), so this presumably relies on a
# context supplied by the hosting environment. Confirm on a fresh kernel.
movie_rating = sc.textFile("/FileStore/tables/ratings.csv")

In [None]:
# The first line of the file is the CSV header; it is filtered out below.
header = movie_rating.take(1)[0]


def _first_three_fields(line):
    """Split a CSV line on commas and keep its first three fields as a tuple."""
    fields = line.split(",")
    return (fields[0], fields[1], fields[2])


rating_data = (
    movie_rating
    .filter(lambda line: line != header)
    .map(_first_three_fields)
    .cache()
)

In [None]:

# Display the first 3 parsed records as a sanity check.
rating_data.take(3)

In [None]:
# Randomly split the parsed ratings into train / validation / test RDDs using
# weights 6 : 2 : 2 and a fixed seed.
train, validation, test = rating_data.randomSplit([6,2,2],seed = 7856)

In [None]:
# Mark the training split for caching; it is reused by every ALS fit below.
train.cache()

In [None]:
# Mark the validation split for caching; it is reused for every RMSE evaluation below.
validation.cache()

In [None]:
# Mark the test split for caching; it is used in the final evaluation cell.
test.cache()

In [None]:
import itertools
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """Grid-search ALS hyper-parameters and return the model with the lowest validation RMSE.

    One model is trained for every combination of `ranks` x `reg_param` x
    `num_iters`. Each model is scored by the RMSE between its predictions and
    the ratings in `validation_data`, and the score is printed.

    Parameters
    ----------
    train_data : RDD of (user, movie, rating) records passed to ``ALS.train``.
    validation_data : RDD of (user, movie, rating) records; the first two
        fields are converted with ``int`` and the third with ``float``.
    num_iters : iterable of iteration counts to try.
    reg_param : iterable of regularization values (``lambda_``) to try.
    ranks : iterable of latent-factor counts to try.

    Returns
    -------
    The best model found, or ``None`` when no candidate was selected (an empty
    grid, or no candidate produced an RMSE below infinity).
    """
    # Both validation views are the same for every candidate, so build them once.
    validation_for_predict_RDD = validation_data.map(lambda x: (x[0], x[1]))
    validation_truth_RDD = validation_data.map(
        lambda r: ((int(r[0]), int(r[1])), float(r[2])))

    min_error = float('inf')
    best_rank = -1
    # Initialised up front: previously this name only existed once a candidate
    # had won, so the final print raised NameError when none did.
    best_num = -1
    best_regularization = 0
    best_model = None
    for rank, reg, num in itertools.product(ranks, reg_param, num_iters):

        model = ALS.train(train_data, iterations=num, rank=rank, lambda_=reg)

        # Key predictions by (user, movie) so they can be joined with the truth.
        predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = validation_truth_RDD.join(predictions)
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        print ('{} latent factors, {} iterations and regularization = {}: validation RMSE is {}'.format(rank, num, reg, error))
        if error < min_error:
            min_error = error
            best_rank = rank
            best_num = num
            best_regularization = reg
            best_model = model

    if best_model is None:
        print ('\nNo model was selected: the parameter grid was empty or no candidate produced a finite RMSE')
    else:
        print ('\nThe best model has {} latent factors, {} iterations and regularization = {}'.format(best_rank, best_num, best_regularization))
    return best_model

In [None]:
import time

# Hyper-parameter grid explored by train_ALS.
ranks = [6, 8, 10, 12, 14]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]
num_iterations = [10, 15, 20]

# Time the whole grid search.
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)
elapsed_seconds = time.time() - start_time

print ('Total Runtime: {:.2f} seconds'.format(elapsed_seconds))

In [None]:
import matplotlib.pyplot as plt
def plot_learning_curve(iter_array, train_data, validation_data, reg, rank):
    """Plot validation RMSE against the number of ALS iterations.

    A separate ALS model is trained for each entry of `iter_array` with the
    given `reg` and `rank`, and scored by RMSE on `validation_data`. The
    resulting curve is drawn and shown with ``display``.

    Parameters
    ----------
    iter_array : sequence of iteration counts; used as the x-axis values.
    train_data : RDD of (user, movie, rating) records passed to ``ALS.train``.
    validation_data : RDD of (user, movie, rating) records used for scoring.
    reg : regularization value passed to ALS as ``lambda_``.
    rank : number of latent factors.
    """
    # Both validation views are the same for every iteration count, so build them once.
    validation_for_predict_RDD = validation_data.map(lambda x: (x[0], x[1]))
    validation_truth_RDD = validation_data.map(
        lambda r: ((int(r[0]), int(r[1])), float(r[2])))

    errors = []
    for num_iters in iter_array:
        model = ALS.train(train_data, iterations=num_iters, rank=rank, lambda_=reg)
        predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = validation_truth_RDD.join(predictions)
        error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        errors.append(error)

    fig, ax = plt.subplots()
    ax.plot(iter_array, errors)
    # Label the figure so it can be read without the surrounding code.
    ax.set_xlabel('Number of ALS iterations')
    ax.set_ylabel('Validation RMSE')
    ax.set_title('ALS learning curve (rank = {}, regularization = {})'.format(rank, reg))
    display(fig)

In [None]:
# Iteration counts to compare, at a fixed regularization and rank.
iter_array = [1, 2, 5, 10]
plot_learning_curve(iter_array, train, validation, reg=0.2, rank=10)



In [None]:
# Score the selected model on the held-out test split.
test_for_predict = test.map(lambda x: (x[0], x[1]))

# Key both predictions and true ratings by (user, movie) so they can be joined.
predictions = (
    final_model.predictAll(test_for_predict)
    .map(lambda r: ((r[0], r[1]), r[2]))
)
actual_by_key = test.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
rates_and_preds = actual_by_key.join(predictions)

squared_errors = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2)
error = math.sqrt(squared_errors.mean())

print ('For testing data the RMSE is %s' % (error))


In [None]:
from pyspark import SparkConf

# Reuse the SparkContext that is already active instead of constructing a
# second one. The SparkSession built earlier in this notebook owns a context,
# and `SparkContext(...)` raises when one is already running. The app name is
# only applied if no context exists yet.
sc = SparkContext.getOrCreate(SparkConf().setAppName("Text Cleaning"))
# Streaming context on top of `sc`, with a batch interval of 3.
strc = StreamingContext(sc, 3)


In [None]:
# Create a text stream that reads from a TCP socket on localhost:8084.
text_data = strc.socketTextStream("localhost", 8084)

In [None]:
import re
from nltk.corpus import stopwords
# English stop-word list, held as a set for the membership test in clean_text.
stop_words = set(stopwords.words('english'))
from nltk.stem import WordNetLemmatizer
# Shared lemmatizer instance used by clean_text.
lemmatizer = WordNetLemmatizer()
def clean_text(sentence):
    """Normalise a line of text for downstream processing.

    Steps, in order: lowercase the text, drop URLs, replace every non-word
    character with a space, remove English stop words, and lemmatize each
    remaining token as a verb.

    Fixes over the previous version: the regular expressions had lost their
    backslashes, so ``"s+"`` replaced every letter "s", ``"W"`` only matched a
    literal capital W, and ``"httpS+"`` never matched a URL. URL removal now
    also runs before punctuation stripping, because "http://..." no longer
    looks like a URL once ":" and "/" have become spaces.

    Parameters
    ----------
    sentence : str
        Raw input text.

    Returns
    -------
    str
        The cleaned tokens joined by single spaces ("" if nothing remains).
    """
    sentence = sentence.lower()
    # Remove URLs while they are still intact.
    sentence = re.sub(r"http\S+", "", sentence)
    # Replace punctuation and other non-word characters with spaces.
    sentence = re.sub(r"\W", " ", sentence)
    # str.split() with no argument already collapses runs of whitespace, so no
    # separate whitespace-normalising substitution is needed.
    tokens = [word for word in sentence.split() if word not in stop_words]
    tokens = [lemmatizer.lemmatize(token, "v") for token in tokens]
    return " ".join(tokens)

In [None]:
# A streaming context needs at least one output operation before it can start.
# Previously the socket stream was never transformed or consumed, and
# clean_text was never applied to it. Clean each incoming line and print a
# sample of every batch.
cleaned_text = text_data.map(clean_text)
cleaned_text.pprint()

strc.start()
# Block until the streaming computation is stopped.
strc.awaitTermination()