## Task 1 – Exploratory Data Analysis

In [None]:
"""
Installing Spark with its dependencies
Installing Spark
Install Dependencies:

Java 8
Apache Spark with hadoop and
Findspark (used to locate the spark in the system)
"""

!sudo ./install_spark.py


In [None]:
#Set Environment Variables:

import os

current_directory = os.getcwd()

# Point PySpark at the Java 8 runtime and at the Spark distribution that lives
# next to this notebook (in the current working directory).
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME=os.path.join(current_directory, "spark-3.1.1-bin-hadoop3.2"),
)

In [None]:
import findspark

# Make pyspark importable from the Spark installation (presumably found via the
# SPARK_HOME set above); this must run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")  # run locally, one worker thread per available core
    .getOrCreate()
)
# Eagerly evaluate a DataFrame that ends a notebook cell and render it as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [None]:
# Connect to the MovieLens SQLite database with Python's built-in sqlite3 module;
# the Task 1 queries below all run through `cur`.

import sqlite3

# Opening through a URI with mode=rw keeps read-write access. Unlike a plain path,
# it raises sqlite3.OperationalError when the file is missing instead of silently
# creating an empty database (which would only surface later as "no such table").
con = sqlite3.connect('file:Datasets/movielens-small.db?mode=rw', uri=True)
cur = con.cursor()

In [None]:
#For SQLite JDBC driver, it can be downloaded via:

!curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar

In [None]:
# Build a Spark dataframe with userId, movieId, genre and rating. A SQL query over the
# SQLite database is exported to CSV, and that CSV is read back with Spark.

import csv

fieldnames = ['userId', 'movieId', 'genre', 'rating']

# newline='' lets the csv module control line endings (avoids blank rows on Windows).
# utf-8 matches the encoding Spark's CSV reader uses by default.
with open("movielens.csv", "w", newline="", encoding="utf-8") as csvFile:
    writer = csv.writer(csvFile)
    writer.writerow(fieldnames)
    # The tags table is deliberately not joined. None of its columns are needed, and
    # joining it on movieId alone would repeat every rating once per tag of that movie.
    # csv.writer writes NULLs (None), e.g. for movies without ratings, as empty fields.
    writer.writerows(cur.execute("""SELECT ratings.userId, movies.movieId, genres, rating
                                    FROM movies
                                         INNER JOIN links ON movies.movieid = links.movieid
                                         LEFT JOIN ratings ON movies.movieId = ratings.movieId"""))

movielens_small_df = spark.read.csv("movielens.csv", header=True)

print("number of rows of our dataframe:", movielens_small_df.count())
print("\n")

movielens_small_df

In [None]:
# Count the ratings of each movie and list the 5 movies with the most ratings.

# The tags table is not joined: it contributes no column here, and joining it on
# movieId alone would repeat each rating once per tag and inflate the counts.
movie_rating_count = cur.execute("""SELECT movies.movieId, title, COUNT(rating)
                                    FROM movies
                                         INNER JOIN links ON movies.movieid = links.movieid
                                         LEFT JOIN ratings ON movies.movieId = ratings.movieId
                                    GROUP BY movies.movieId
                                    ORDER BY COUNT(rating) DESC
                                    LIMIT 5""").fetchall()

schema = ["movieId", "title", "rating count"]

# Turn the query rows into a Spark DataFrame for display.
df = spark.createDataFrame(movie_rating_count, schema)

df.show(truncate=False)
  

In [None]:
# Find and list the 5 most rated genres (number of ratings per `genres` value).
# NOTE(review): if `genres` holds several genres per movie in a single string, this
# groups by that combination rather than by individual genre — confirm against the data.

# The tags table is not joined: it contributes no column here, and joining it on
# movieId alone would repeat each rating once per tag and inflate the counts.
rated_genres = cur.execute("""SELECT genres, COUNT(rating)
                              FROM movies
                                   INNER JOIN links ON movies.movieid = links.movieid
                                   LEFT JOIN ratings ON movies.movieId = ratings.movieId
                              GROUP BY genres
                              ORDER BY COUNT(rating) DESC
                              LIMIT 5""").fetchall()

schema = ["genres", "rating count"]

# Turn the query rows into a Spark DataFrame for display.
df = spark.createDataFrame(rated_genres, schema)

df.show(truncate=False)


In [None]:
# List the 5 tags whose movies collected the most ratings: for every non-null tag,
# count the rating rows of the movies that tag is attached to.
# NOTE(review): ratings and tags are joined on movieId only, so a tag applied several
# times to one movie counts that movie's ratings once per application, and the
# "tags count" column actually holds rating counts — confirm this is the intended metric.
rated_tags = cur.execute("""SELECT tag, COUNT(rating) 
                          FROM (SELECT ratings.userId, movies.movieId, genres, tag, rating 
                                FROM (((movies 
                                        INNER JOIN links ON movies.movieid = links.movieid) 
                                        LEFT JOIN ratings ON movies.movieId = ratings.movieId) 
                                        LEFT JOIN tags ON movies.movieid = tags.movieid))
                          WHERE tag is NOT NULL
                          GROUP BY tag
                          ORDER BY COUNT(rating) DESC
                          LIMIT 5
                        """).fetchall()

schema = ["tags", "tags count"]

df = spark.createDataFrame(rated_tags, schema)
df.show(truncate=False)



In [None]:
# Top 5 (user, week) pairs by number of ratings. The ratings timestamp is read as
# Unix seconds ('unixepoch') and bucketed into '%Y-%W' (year, week-of-year) strings.
# The same user may appear several times if they were very active in several weeks.
weekly_activity = cur.execute("""SELECT userid, strftime('%Y-%W', datetime(timestamp, 'unixepoch')) AS week_year, COUNT(strftime('%Y-%W', datetime(timestamp, 'unixepoch'))) AS weekly_activity 
                          FROM ratings
                          GROUP BY userid, week_year 
                          ORDER BY weekly_activity DESC
                          LIMIT 5
                        """).fetchall()

schema = ["userId", "week of the year", "weekly activity of user"]

df = spark.createDataFrame(weekly_activity, schema)
df.show(truncate=False)



In [None]:
# Calculate the average rating of each genre and list the 10 genres with the highest
# average, in descending order.
# TODO: the task also asks for a plot of these averages; no plot is produced here.

# The tags table is not joined. Joining it on movieId alone would repeat each rating
# once per tag, giving heavily tagged movies more weight in the average.
genres_avgrating = cur.execute("""SELECT genres, round(avg(rating), 2) avg_rating
                                  FROM movies
                                       INNER JOIN links ON movies.movieid = links.movieid
                                       LEFT JOIN ratings ON movies.movieId = ratings.movieId
                                  GROUP BY genres
                                  ORDER BY avg_rating DESC
                                  LIMIT 10""").fetchall()

schema = ["genres", "avg_rating"]

# Turn the query rows into a Spark DataFrame for display.
df = spark.createDataFrame(genres_avgrating, schema)

df.show(truncate=False)



## Task 2 – Recommender Design

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
spark = SparkSession.builder.appName("movieRecommendation").getOrCreate() 

In [None]:
# Load the explicit (userId, movieId, rating) triples from the ratings table.
# They are the training data for the ALS (Alternating Least Squares) models below.
movie_rating = cur.execute("""SELECT userId, movieId, rating 
                          FROM ratings
                                      """).fetchall()

schema = ["userId", "movieId", "rating"]

movie_rating_df = spark.createDataFrame(movie_rating, schema)

movie_rating_df




In [None]:
# Print the schema (column names, types inferred by createDataFrame, nullability)
# of the ratings DataFrame.
movie_rating_df.printSchema()

In [None]:
# Hold out 20% of the ratings as the test set (80% for training); the fixed seed
# makes the split reproducible.
train, test = movie_rating_df.randomSplit(weights=[0.8, 0.2], seed=87)

In [None]:
# 1st recommender model: ALS (Alternating Least Squares) matrix factorisation for
# collaborative filtering, trained directly on the rating values.
# coldStartStrategy="drop" removes test rows whose user or movie was unseen in training.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=10,
    maxIter=15,
    regParam=0.01,
    coldStartStrategy="drop",
)
model = als.fit(train)

# Keep the original columns and expose the model's prediction under the name
# "implicit"; the 2nd model is trained on this column.
pred = model.transform(test).selectExpr(
    "userId", "movieId", "rating", "prediction AS implicit"
)

pred.show(truncate=False)

In [None]:
# Evaluate the 1st model's held-out predictions with RMSE and MAE.
eval_rmse, eval_mae = (
    RegressionEvaluator(metricName=metric, labelCol="rating", predictionCol="implicit")
    for metric in ("rmse", "mae")
)

rmse = eval_rmse.evaluate(pred)
mae = eval_mae.evaluate(pred)

print("RMSE of ALS:", rmse)
print("MAE of ALS:", mae)




In [None]:
# 2nd recommender model: ALS trained on the "implicit" column, i.e. on the 1st model's
# predictions for its held-out test rows (the `pred` DataFrame).
# NOTE(review): despite the name, this is not implicit-feedback ALS. implicitPrefs is
# left at its default, and the model only sees a random 80% of the 1st model's test rows.
train_implicit, test_implicit = pred.randomSplit(weights=[0.8, 0.2], seed=87)

als_implicit = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="implicit",
    rank=10,
    maxIter=15,
    regParam=0.01,
    coldStartStrategy="drop",
)
model_implicit = als_implicit.fit(train_implicit)

pred_implicit = model_implicit.transform(test_implicit)
pred_implicit.show(truncate=False)

In [None]:
# Score the 2nd model's predictions against the original ratings of its test rows.
eval_rmse, eval_mae = (
    RegressionEvaluator(metricName=metric, labelCol="rating", predictionCol="prediction")
    for metric in ("rmse", "mae")
)

rmse, mae = eval_rmse.evaluate(pred_implicit), eval_mae.evaluate(pred_implicit)

print("RMSE of ALS_Implicit Feedback:", rmse)
print("MAE of ALS_Implicit Feedback:", mae)

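When comparing these two models, the first model (ALS on the ratings) performs better than the second model (ALS on the implicit feedback) on both error metrics, Root Mean Square Error (RMSE) and Mean Absolute Error (MAE). Note that the second model is evaluated on a random 20% subset of the first model's test predictions, so the two scores are not computed on exactly the same rows.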

## Task 3 – Text Analysis

In [None]:
spark = SparkSession.builder.appName("textAnalysis").getOrCreate()

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf


In [None]:
# download the data in Dataset folder.



!curl -o Datasets/aclImdb_v1.tar.gz https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz


In [None]:
#Extract the tar file as dataset

import tarfile

def tarfile_extract(tar_file, output_dir=None):
    """Extract a gzip-compressed tar archive, printing progress while extracting.

    Args:
        tar_file: Path to the ``.tar.gz`` archive.
        output_dir: Destination directory. Defaults to ``<cwd>/Datasets``, resolved
            when the function is called rather than when it is defined.

    Raises:
        tarfile.TarError: If the archive cannot be read or, on Python versions that
            provide extraction filters, if a member is unsafe (e.g. it would be
            written outside ``output_dir``).
    """
    if output_dir is None:
        output_dir = os.getcwd() + '/Datasets'
    # The archive comes from the internet: where the "data" extraction filter exists
    # (Python 3.12+, and security releases of 3.8-3.11), use it to reject absolute
    # paths, ".." components and unsafe links.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
    # `with` closes the archive even if extraction fails part-way.
    with tarfile.open(tar_file, 'r:gz') as tar:
        total_files = len(tar.getmembers())
        tar.extractall(output_dir, members=extract_progress(tar, total_files), **extract_kwargs)

def extract_progress(tar, total_files):
    """Yield every member of *tar*, printing how many members remain.

    Args:
        tar: An open ``tarfile.TarFile``.
        total_files: Number of members in *tar*; the printed countdown starts here.

    Yields:
        Each ``TarInfo`` member, in archive order. The countdown is decremented each
        time the consumer asks for the next member, and a completion message is
        printed once all members have been consumed.
    """
    remaining = total_files
    for member in tar:
        yield member
        remaining -= 1
        print(f"Remaining files: {remaining}", end='\r')
    print("\nExtraction completed.")

# Remember the working directory (later cells build dataset paths from `wd`) and
# unpack the downloaded IMDB archive into <cwd>/Datasets.
wd = os.getcwd()
tarfile_extract(f"{wd}/Datasets/aclImdb_v1.tar.gz")


In [None]:

# Collect every IMDB review as a (content, split, sentiment) tuple, for example
# (review_text, 'train', 'pos'). Folders are read in the order train/pos, train/neg,
# test/pos, test/neg.
alldata = []

for split in ('train', 'test'):
    for sentiment in ('pos', 'neg'):
        folder = os.path.join(wd, 'Datasets', 'aclImdb', split, sentiment)
        # sorted() makes the record order reproducible; os.listdir order is arbitrary.
        for fname in sorted(os.listdir(folder)):
            with open(os.path.join(folder, fname), encoding='utf-8') as infile:
                # One record per review file: reading the whole file keeps a review
                # that contains line breaks together instead of splitting it.
                alldata.append((infile.read(), split, sentiment))




In [None]:

from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

appName = "list to Spark Data Frame"
master = "local"

# Reuse (or create) the Spark session.
# NOTE(review): a session already exists from earlier cells, so getOrCreate() most
# likely returns it and this app name / master do not reconfigure the running context.
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

# The collected (content, train/test split, pos/neg sentiment) tuples.
data = alldata

# Three nullable string columns. "label" holds the train/test split.
# NOTE(review): "sentiemtn" looks like a typo for "sentiment"; it is kept unchanged
# because later cells may refer to the column by this exact name.
schema = StructType([
    StructField(column, StringType(), True)
    for column in ("content", "label", "sentiemtn")
])

# Distribute the list as an RDD, then build the DataFrame with the schema above.
rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, schema)

print(df.schema)
df.show()

In [None]:
# Number of rows in the review DataFrame.
df.count()

In [None]:
# Schema of created Spark Dataframe

df.printSchema()

### Tokenization

In [None]:
# Tokenize each review with a RegexTokenizer. With the default gaps=True, the pattern
# \W (any non-word character) marks the separators between tokens, and each row
# gets its token array in "tokenized_content".
tokenizer = RegexTokenizer(
    inputCol="content",
    outputCol="tokenized_content",
    pattern=r"\W",
)

# UDF returning how many tokens a token array holds (used further below).
countTokens = udf(lambda tokens: len(tokens), IntegerType())

tokenized = tokenizer.transform(df)
tokenized.show()

### Removing Stop Words

In [None]:
# Inspect the Python type of the tokenizer's output.
type(tokenized)

In [None]:
from pyspark.ml.feature import StopWordsRemover

# Preview the tokenized DataFrame before removing stop words.
tokenized.show()

In [None]:
# Keep only the token arrays and add the number of tokens per review as "tokenCount".
df_tokenized = tokenized.select(
    col("tokenized_content"),
    countTokens(col("tokenized_content")).alias("tokenCount"),
)

In [None]:
# Inspect the Python type of the token-count DataFrame.
type(df_tokenized)

In [None]:
# SWR -> stop word remover
SWR  = StopWordsRemover (inputCol='tokenized_content', outputCol='SWRed')


#See the result of removal operation
SWR.transform(df_tokenized).select('SWRed').show(truncate=False)