# Huawei R&D Technical Interview Answer on Big Data

## Table of contents
1. [Task – 1 Exploratory Data Analysis:](#EDA)
2. [Task – 2 Recommender Design:](#RD)
    1. [Step 1 : Recommender Model 1 Alternating Least Squares: Model Training and Evaluation](#step1)
    2. [Step 2 : Recommender Model 2 KNN Algorithm: Model Training and Evaluation](#step2)    
3. [Task – 3 Text Analysis:](#TA)


## Task – 1 Exploratory Data Analysis:<a name="EDA"></a>
Writing queries with SQL and PySpark

In [320]:
# Import PySpark
## import findspark  
## findspark.init()  
## import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import *

In [335]:
# Initializing the Spark session
## A Spark session must be initialized first. Through SparkSession, DataFrames can be created and registered as tables, SQL can be executed over those tables, and tables can be cached. For a detailed explanation of each SparkSession parameter, see https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html?highlight=sparksession#pyspark.sql.SparkSession.
spark = SparkSession.builder.appName("HuaweiTechnicalInterview").getOrCreate()

In [13]:
# Loading Datasets (Implementing JSON File in PySpark)
links = spark.read.option("multiline","true").json('C:/Users/kadir/Downloads/Huawei R&D Technical Interview Question on Big Data/links.json')
ratings = spark.read.option("multiline","true").json('C:/Users/kadir/Downloads/Huawei R&D Technical Interview Question on Big Data/ratings.json')
movies = spark.read.option("multiline","true").json('C:/Users/kadir/Downloads/Huawei R&D Technical Interview Question on Big Data/movies.json')
tags = spark.read.option("multiline","true").json('C:/Users/kadir/Downloads/Huawei R&D Technical Interview Question on Big Data/tags.json')

In [14]:
# to verify that it was created correctly by viewing part of the dataset
links.printSchema()
links.show(3)

root
 |-- imdbId: string (nullable = true)
 |-- movieId: long (nullable = true)
 |-- tmdbId: string (nullable = true)

+-------+-------+------+
| imdbId|movieId|tmdbId|
+-------+-------+------+
|0114709|      1|   862|
|0113497|      2|  8844|
|0113228|      3| 15602|
+-------+-------+------+
only showing top 3 rows



In [15]:
# to verify that it was created correctly by viewing part of the dataset
tags.printSchema()
tags.show(3)

root
 |-- movieId: long (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)

+-------+---------+----------+------+
|movieId|      tag| timestamp|userId|
+-------+---------+----------+------+
|      1|animation|1306926135|    40|
|      1|  fantasy|1306926130|    40|
|      1|    Pixar|1306926133|    40|
+-------+---------+----------+------+
only showing top 3 rows



In [16]:
# to verify that it was created correctly by viewing part of the dataset
ratings.printSchema()
ratings.show(3)

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|      6|   2.0|980730861|     1|
|     22|   3.0|980731380|     1|
|     32|   2.0|980731926|     1|
+-------+------+---------+------+
only showing top 3 rows



In [17]:
# to verify that it was created correctly by viewing part of the dataset
movies.printSchema()
movies.show(3)

root
 |-- genres: string (nullable = true)
 |-- movieId: long (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+--------------------+-------+----------------+----+
|              genres|movieId|           title|year|
+--------------------+-------+----------------+----+
|Adventure|Animati...|      1|       Toy Story|1995|
|Adventure|Childre...|      2|         Jumanji|1995|
|      Comedy|Romance|      3|Grumpier Old Men|1995|
+--------------------+-------+----------------+----+
only showing top 3 rows



In [18]:
# Creating Data Frames

In [19]:
# Performing Task 1 queries
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import count
# note: importing sum here shadows Python's built-in sum
from pyspark.sql.functions import sum

In [20]:
### 1. Write a SQL query to create a dataframe including userId, movieId, genre and rating
### (the query below returns title in place of movieId)
### SQL Query %%sql SELECT userId,title,genres,rating FROM movies FULL OUTER JOIN ratings ON movies.movieId = ratings.movieId LIMIT 5; 

task1=ratings.join(movies, ratings.movieId == movies.movieId, 'outer').select(ratings.userId,movies.title,movies.genres,ratings.rating)
task1.show(5)

+------+---------+--------------------+------+
|userId|    title|              genres|rating|
+------+---------+--------------------+------+
|     7|Toy Story|Adventure|Animati...|   5.0|
|    10|Toy Story|Adventure|Animati...|   4.0|
|    13|Toy Story|Adventure|Animati...|   4.5|
|    16|Toy Story|Adventure|Animati...|   5.0|
|    21|Toy Story|Adventure|Animati...|   5.0|
+------+---------+--------------------+------+
only showing top 5 rows



In [21]:
#### 2.Count ratings for each movie, and list top 5 movies with the highest value
### SQL Query %%sql SELECT movieId,COUNT(rating),SUM(rating) FROM ratings GROUP BY movieId ORDER BY SUM(rating) DESC  LIMIT 5;   
#Creating or replacing a local temporary view with this DataFrame.
ratings.createOrReplaceTempView("ratings")
# Define my query
query = "SELECT movieId,COUNT(rating),SUM(rating) FROM ratings GROUP BY movieId ORDER BY SUM(rating) DESC  LIMIT 5"
newdf = spark.sql(query)
#display the content of new dataframe
newdf.show()

+-------+-------------+-----------+
|movieId|count(rating)|sum(rating)|
+-------+-------------+-----------+
|    318|          328|     1457.0|
|    593|          337|     1427.5|
|    296|          327|     1353.0|
|    260|          306|     1284.0|
|    356|          318|     1243.0|
+-------+-------------+-----------+



In [22]:
### 3.Find and list top 5 most rated genres
### SQL Query%%sql SELECT genres,SUM(rating) FROM movies LEFT OUTER JOIN ratings ON movies.movieId = ratings.movieId GROUP BY genres ORDER BY SUM(rating) DESC  LIMIT 5;  
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")
# Define my query
task3 = "SELECT genres,SUM(rating) FROM movies LEFT OUTER JOIN ratings ON movies.movieId = ratings.movieId GROUP BY genres ORDER BY SUM(rating) DESC  LIMIT 5;"
newtask3 = spark.sql(task3)
#display the content of new dataframe
newtask3.show()

+--------------+-----------+
|        genres|sum(rating)|
+--------------+-----------+
|         Drama|    25650.0|
|        Comedy|    20529.5|
|Comedy|Romance|    12930.5|
| Drama|Romance|    11295.5|
|  Comedy|Drama|    10709.5|
+--------------+-----------+



In [23]:
### 4. Find and list top 5 most rated tags
### %%sql SELECT tag,SUM(rating) FROM tags LEFT OUTER JOIN ratings ON tags.movieId = ratings.movieId GROUP BY tag ORDER BY SUM(rating) DESC  LIMIT 5;  
ratings.createOrReplaceTempView("ratings")
tags.createOrReplaceTempView("tags")
# Define my query
task4 = "SELECT tag,SUM(rating) FROM tags LEFT OUTER JOIN ratings ON tags.movieId = ratings.movieId GROUP BY tag ORDER BY SUM(rating) DESC  LIMIT 5; "
newtask4 = spark.sql(task4)
#display the content of new dataframe
newtask4.show()

+------------+-----------+
|         tag|sum(rating)|
+------------+-----------+
|       drama|    14111.5|
|twist ending|    12200.0|
|      sci-fi|    11163.0|
|  psychology|    10683.5|
|       crime|     9973.0|
+------------+-----------+



In [24]:
### 5.By using timestamp from ratings table, provide top 5 most frequent users within a week
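
The notebook leaves this question without code. A minimal plain-Python sketch of one possible reading follows: bucket each rating by ISO calendar week and count ratings per (week, user). The function name `top_users_per_week` and the sample rows are assumptions made for illustration; the same grouping could be expressed in Spark SQL with a GROUP BY over a week bucket derived from `timestamp`.

```python
from collections import Counter
from datetime import datetime, timezone

def top_users_per_week(rows, n=5):
    """Return the n ((iso_year, iso_week, userId), count) pairs with the most ratings.

    rows is an iterable of (userId, unix_timestamp_in_seconds) pairs.
    """
    counts = Counter()
    for user_id, ts in rows:
        iso = datetime.fromtimestamp(ts, tz=timezone.utc).isocalendar()
        counts[(iso[0], iso[1], user_id)] += 1
    return counts.most_common(n)

# Hypothetical sample rows in the shape of the ratings table (userId, timestamp)
sample = [(1, 980730861), (1, 980731380), (1, 980731926),
          (40, 1306926135), (40, 1306926130)]
result = top_users_per_week(sample)
print(result)
```

If "within a week" is meant as the most recent seven days only, the rows would be filtered by timestamp first and then counted per user.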

In [25]:
### 6.Calculate average ratings for each genre, and plot average ratings of top 10 genres with descending order
### %%sql SELECT genres,AVG(rating) FROM movies LEFT OUTER JOIN ratings ON movies.movieId = ratings.movieId GROUP BY genres ORDER BY AVG(rating) DESC  LIMIT 10;
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")
# Define my query
task6 = "SELECT genres,AVG(rating) FROM movies LEFT OUTER JOIN ratings ON movies.movieId = ratings.movieId GROUP BY genres ORDER BY AVG(rating) DESC  LIMIT 10"
newtask6 = spark.sql(task6)
#display the content of new dataframe
newtask6.show()

+--------------------+-----------+
|              genres|avg(rating)|
+--------------------+-----------+
|Action|Adventure|...|        5.0|
|Crime|Documentary...|        5.0|
|Adventure|Fantasy...|        5.0|
|Animation|Documen...|        5.0|
|Crime|Horror|Mystery|       4.75|
|Comedy|Crime|Western|        4.5|
|Animation|Comedy|...|        4.5|
|Adventure|Comedy|...|        4.5|
|Action|Animation|...|        4.5|
|Action|Animation|...|        4.5|
+--------------------+-----------+
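
Note that the query above groups by the whole `genres` string, so a combination such as `Comedy|Romance` is treated as a genre of its own, which is why rarely rated combinations reach an average of 5.0. If averages per individual genre are wanted, the string can be split on `|` first. The sketch below shows the idea in plain Python on made-up rows; `average_rating_per_genre` is a hypothetical helper, and in Spark the same effect could be obtained with `split` and `explode` from `pyspark.sql.functions`.

```python
from collections import defaultdict

def average_rating_per_genre(rows):
    """rows: iterable of (genres_string, rating); genres are separated by '|'."""
    totals = defaultdict(lambda: [0.0, 0])  # genre -> [sum of ratings, count]
    for genres, rating in rows:
        for genre in genres.split("|"):
            totals[genre][0] += rating
            totals[genre][1] += 1
    averages = {g: s / c for g, (s, c) in totals.items()}
    # Highest average first
    return sorted(averages.items(), key=lambda kv: kv[1], reverse=True)

# Made-up rows, not taken from the dataset
sample = [("Comedy|Romance", 4.0), ("Comedy", 3.0), ("Drama|Romance", 5.0)]
genre_averages = average_rating_per_genre(sample)
print(genre_averages)
```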



## Task – 2 Recommender Design: <a name="RD"></a>
**<font color=red>*Recommendation System Question*</font>**
- Provide an implicit feature by using any of the data from the given database
- Train two individual recommender models, one by using rating (from ratings table) and the other one by using your designed implicit feedback
- Present comparison between two models, by using essential metrics

**<font color=red>*Recommendation System Explanation-1*</font>**
Recommender systems are machine learning-based systems that scan through all possible options and provide a prediction or recommendation. However, building a recommendation system comes with the complications below:

- Users’ data is interchangeable.
- The data volume is large and includes a significant list of movies, shows, customers’ profiles and interests, ratings, and other data points.
- Newly registered customers usually have very limited information.
- Predictions must be produced for users in real time.
- Old users can have an overabundance of information.
- Recommended items should be neither very different nor too similar.
- Users can change their ratings of items when they change their minds.


### Step 1 : Recommender Model 1 Alternating Least Squares: Model Training and Evaluation <a name="step1"></a>
**<font color=Blue>*ALS*</font>** (Alternating Least Squares) is one of the low-rank matrix approximation algorithms for collaborative filtering. ALS decomposes the user-item matrix into two low-rank matrices: a user matrix and an item matrix. In collaborative filtering, users and products are described by a small set of latent factors that can be used to predict missing entries, and the ALS algorithm learns these latent factors by matrix factorization.
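
To make the alternating idea concrete, here is a toy sketch with a single latent factor per user and per item, written in plain Python. It is an illustration under simplifying assumptions (rank 1, a hypothetical helper `als_rank1`, a made-up 2x2 matrix with one missing entry), not Spark's distributed implementation. With item factors held fixed, each user factor has a closed-form regularized least-squares solution, and vice versa.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, iters=20):
    """Fit rank-1 factors to the observed entries of a rating matrix.

    ratings maps (user, item) -> rating for observed entries only.
    """
    x = [1.0] * n_users  # one latent factor per user
    y = [1.0] * n_items  # one latent factor per item
    for _ in range(iters):
        # Hold item factors fixed; solve each user factor in closed form
        for u in range(n_users):
            obs = [(i, r) for (uu, i), r in ratings.items() if uu == u]
            x[u] = sum(r * y[i] for i, r in obs) / (reg + sum(y[i] ** 2 for i, _ in obs))
        # Hold user factors fixed; update each item factor the same way
        for i in range(n_items):
            obs = [(u, r) for (u, ii), r in ratings.items() if ii == i]
            y[i] = sum(r * x[u] for u, r in obs) / (reg + sum(x[u] ** 2 for u, _ in obs))
    return x, y

# Made-up observations; entry (1, 1) is missing and gets predicted
observed = {(0, 0): 2.0, (0, 1): 1.0, (1, 0): 4.0}
x, y = als_rank1(observed, n_users=2, n_items=2)
missing = x[1] * y[1]
print(round(missing, 2))
```

The observed entries are consistent with a rank-1 matrix whose missing entry is 2, so the prediction should land close to that value, slightly shrunk by the regularization term.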

In [27]:
##Importing Essential Libraries for machine learning processes
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

### Train Test split

Use the randomSplit function to divide the data

In [30]:
train_data,test_data = ratings.randomSplit([0.7,0.3])


In [31]:
train_data.describe().show()

+-------+-----------------+------------------+--------------------+------------------+
|summary|          movieId|            rating|           timestamp|            userId|
+-------+-----------------+------------------+--------------------+------------------+
|  count|            70032|             70032|               70032|             70032|
|   mean|8544.972626799177|3.4908184829792095|1.0909841194480524E9| 342.0761223440713|
| stddev|19621.04239062862|1.0671878312293106|1.6328108103097975E8|193.84742349370407|
|    min|                1|               0.5|           828504918|                 1|
|    max|           129651|               5.0|          1427754939|               706|
+-------+-----------------+------------------+--------------------+------------------+



In [32]:
test_data.describe().show()

+-------+------------------+------------------+-------------------+------------------+
|summary|           movieId|            rating|          timestamp|            userId|
+-------+------------------+------------------+-------------------+------------------+
|  count|             29991|             29991|              29991|             29991|
|   mean| 8772.262445400287|3.4926311226701343|1.092772549526858E9| 341.0243739788603|
| stddev|20001.310616469647|1.0697166979508876|1.635322738786036E8|193.85643213001717|
|    min|                 1|               0.5|          828504918|                 1|
|    max|            126407|               5.0|         1427752633|               706|
+-------+------------------+------------------+-------------------+------------------+



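### Build Model
Build the recommendation model using ALS on the training data. Now let's build our ALS model and fit it on a training set created with the randomSplit() function. Note that the data is split again below (80/20), and it is this second split (`training`, `test`) that the model uses.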

In [33]:
(training, test) = ratings.randomSplit([0.8, 0.2])

In [34]:
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

In [35]:
## Generating Predictions & Model Evaluation
predictions = model.transform(test)
predictions.show()

+-------+------+----------+------+----------+
|movieId|rating| timestamp|userId|prediction|
+-------+------+----------+------+----------+
|      1|   3.0|1273228128|   271| 3.3592305|
|      1|   3.5|1130850782|   663| 3.7884517|
|      1|   3.5|1306926127|    40| 4.1003222|
|      1|   3.0| 864828661|   340| 3.3957133|
|      1|   4.0| 834767668|   647| 5.2124333|
|      1|   3.0| 944340594|    92|  3.358765|
|      1|   4.0| 832447335|   489|  4.134621|
|      1|   3.0| 894902672|    72| 3.7634354|
|      1|   3.5|1175935476|   310| 3.6029649|
|      1|   1.0|1328400944|    84| 2.9836214|
|      1|   3.0| 867338979|   171| 2.7192297|
|      1|   3.0| 865287270|   395| 3.6687374|
|      1|   3.0| 975602509|   394| 3.2908976|
|      1|   3.0| 866736804|   156| 2.9144173|
|      1|   3.0| 850988642|   653| 3.0380714|
|      1|   3.5|1296441497|   550|  4.038661|
|      1|   3.5|1232916522|   256| 3.4899235|
|      1|   3.0|1046140820|   105|  4.069527|
|      1|   3.0|1008864246|   106|

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = nan
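
An RMSE of `nan` usually means that at least one prediction is itself NaN. Spark's ALS produces NaN predictions for users or movies that appear in the test split but not in the training split, and its `coldStartStrategy` parameter (default `"nan"`) can be set to `"drop"` so that such rows are removed before evaluation. The plain-Python sketch below, on made-up numbers, shows how a single NaN poisons the metric and how dropping it restores a finite value.

```python
import math

def rmse(pairs):
    """pairs: iterable of (actual, predicted) values."""
    pairs = list(pairs)
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

# Made-up predictions; the NaN stands in for a user or movie unseen in training
pairs = [(3.0, 3.5), (4.0, 4.5), (3.0, math.nan)]
with_nan = rmse(pairs)

# Dropping rows with NaN predictions mirrors what coldStartStrategy="drop" does
kept = [(a, p) for a, p in pairs if not math.isnan(p)]
without_nan = rmse(kept)
print(with_nan, without_nan)
```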


In [37]:
avgRatings = ratings.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))

The average rating in the dataset is: 3.491361986743049


So now that we have the model, how would you actually supply a recommendation to a user?
The approach here is simple: we take a single user (userId 11) as an example and pass that user's rows to the trained ALS model, the same way we did with the test data.

In [39]:
single_user = test.filter(test['userId']==11).select(['movieId','userId'])
# User 11 has 4 ratings in this test split
# Realistically this should be some sort of hold-out set!
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     19|    11|
|    235|    11|
|    553|    11|
|   3793|    11|
+-------+------+



In [40]:
recomendations = model.transform(single_user)
recomendations.orderBy('prediction',ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    235|    11|  6.762187|
|     19|    11| 3.8154223|
|   3793|    11| 3.7515554|
|    553|    11|  2.880481|
+-------+------+----------+



In [229]:
recomendations.createOrReplaceTempView("recomendations")
movies.createOrReplaceTempView("movies")
# Define my query
# Note: movies.cmovieId exists because this cell was run after the rename in Step 2 (In [99])
singleuser1 = "SELECT * FROM movies LEFT OUTER JOIN recomendations ON movies.cmovieId = recomendations.movieId where userId=11;"
newsingleuser1 = spark.sql(singleuser1)
#display the content of new dataframe
newsingleuser1.show()

+--------------------+--------+--------------------+----+-------+------+----------+
|              genres|cmovieId|               title|year|movieId|userId|prediction|
+--------------------+--------+--------------------+----+-------+------+----------+
|Action|Adventure|...|    3793|               X-Men|2000|   3793|    11| 3.7515554|
|        Comedy|Drama|     235|             Ed Wood|1994|    235|    11|  6.762187|
|              Comedy|      19|Ace Ventura: When...|1995|     19|    11| 3.8154223|
|Action|Drama|Western|     553|           Tombstone|1993|    553|    11|  2.880481|
+--------------------+--------+--------------------+----+-------+------+----------+



### Step 2 : Recommender Model 2 KNN Algorithm: Model Training and Evaluation <a name="step2"></a>
Collaborative filtering based systems use the actions of users to recommend other items. In general, they can be either user-based or item-based. User-based collaborative filtering uses the patterns of users similar to me to recommend a product (users like me also looked at these other items). Item-based collaborative filtering uses the patterns of users who browsed the same item as me to recommend a product (users who looked at my item also looked at these other items). The item-based approach is usually preferred over the user-based approach: the user-based approach is often harder to scale because of the dynamic nature of users, whereas items usually don't change much, so item-based similarities can often be computed offline.

In [99]:
movies=movies.withColumnRenamed("movieId","cmovieId")

In [100]:
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")
# Define my query
forKNN = "SELECT * FROM movies LEFT OUTER JOIN ratings ON movies.cmovieId = ratings.movieId;"
newforKNN = spark.sql(forKNN)
#display the content of new dataframe
newforKNN.show()

+------+--------+--------------------+----+-------+------+----------+------+
|genres|cmovieId|               title|year|movieId|rating| timestamp|userId|
+------+--------+--------------------+----+-------+------+----------+------+
|Comedy|      19|Ace Ventura: When...|1995|     19|   3.0| 831728688|   685|
|Comedy|      19|Ace Ventura: When...|1995|     19|   2.0|1051770941|   677|
|Comedy|      19|Ace Ventura: When...|1995|     19|   4.0|1247814964|   668|
|Comedy|      19|Ace Ventura: When...|1995|     19|   3.0|1127249360|   663|
|Comedy|      19|Ace Ventura: When...|1995|     19|   3.0| 834610377|   662|
|Comedy|      19|Ace Ventura: When...|1995|     19|   2.0| 836221196|   656|
|Comedy|      19|Ace Ventura: When...|1995|     19|   3.5|1137595314|   639|
|Comedy|      19|Ace Ventura: When...|1995|     19|   3.0| 839937911|   620|
|Comedy|      19|Ace Ventura: When...|1995|     19|   2.0|1134735487|   616|
|Comedy|      19|Ace Ventura: When...|1995|     19|   4.0| 833033911|   615|

In [101]:
combine_movie_rating = newforKNN.na.drop( subset = ['title'])

In [102]:
# Define my query
combine_movie_rating.createOrReplaceTempView("combine_movie_rating")
movie_ratingCount = "SELECT title,count(rating) as totalRatingCount FROM combine_movie_rating GROUP BY title ORDER BY count(rating) DESC  "
newmovie_ratingCount = spark.sql(movie_ratingCount)
#display the content of new dataframe
newmovie_ratingCount.show()

+--------------------+----------------+
|               title|totalRatingCount|
+--------------------+----------------+
|Silence of the La...|             337|
|Shawshank Redempt...|             328|
|        Pulp Fiction|             327|
|       Jurassic Park|             324|
|        Forrest Gump|             318|
|Star Wars: Episod...|             306|
|          Braveheart|             292|
|Terminator 2: Jud...|             278|
|         Matrix, The|             265|
|       Fugitive, The|             250|
|Star Wars: Episod...|             246|
|Star Wars: Episod...|             246|
|           Apollo 13|             244|
|Independence Day ...|             242|
|    Schindler's List|             241|
| Usual Suspects, The|             239|
|              Batman|             235|
|           Toy Story|             232|
|  Dances with Wolves|             227|
|     American Beauty|             221|
+--------------------+----------------+
only showing top 20 rows



In [103]:
newmovie_ratingCount=newmovie_ratingCount.withColumnRenamed("title","ctitle")

In [104]:
combine_movie_rating.createOrReplaceTempView("combine_movie_rating")
newmovie_ratingCount.createOrReplaceTempView("newmovie_ratingCount")
# Define my query
rating_with_totalRatingCount = "SELECT userId, movieId,title,rating,totalRatingCount,genres FROM newmovie_ratingCount LEFT OUTER JOIN combine_movie_rating ON newmovie_ratingCount.ctitle = combine_movie_rating.title;"
newrating_with_totalRatingCount = spark.sql(rating_with_totalRatingCount)
#display the content of new dataframe
newrating_with_totalRatingCount.show()

+------+-------+--------------------+------+----------------+--------------------+
|userId|movieId|               title|rating|totalRatingCount|              genres|
+------+-------+--------------------+------+----------------+--------------------+
|     1|    593|Silence of the La...|   5.0|             337|Crime|Horror|Thri...|
|     7|    593|Silence of the La...|   4.0|             337|Crime|Horror|Thri...|
|     8|    593|Silence of the La...|   5.0|             337|Crime|Horror|Thri...|
|     9|    593|Silence of the La...|   4.0|             337|Crime|Horror|Thri...|
|    10|    593|Silence of the La...|   3.5|             337|Crime|Horror|Thri...|
|    12|    593|Silence of the La...|   5.0|             337|Crime|Horror|Thri...|
|    14|    593|Silence of the La...|   4.0|             337|Crime|Horror|Thri...|
|    18|    593|Silence of the La...|   5.0|             337|Crime|Horror|Thri...|
|    26|    593|Silence of the La...|   5.0|             337|Crime|Horror|Thri...|
|   

In [216]:
# Spark-side pivot: one row per title, one column per userId, counting rating rows per cell
pivotDF = newrating_with_totalRatingCount.groupBy("title","userId") \
      .sum("rating") \
      .groupBy("title") \
      .pivot("userId") \
      .count()

# pivot_table is a pandas method, so the Spark DataFrame is converted with toPandas() in the next cell before it is called

In [221]:
newrating_with_totalRatingCount = newrating_with_totalRatingCount.toPandas()

In [222]:
movie_features_df=newrating_with_totalRatingCount.pivot_table(index='title',columns='userId',values='rating').fillna(0)
movie_features_df.head()

userId                     1.0  2.0  3.0  4.0  5.0  6.0  7.0  8.0  9.0  10.0  ...  697.0  698.0  699.0  700.0  701.0  702.0  703.0  704.0  705.0  706.0
title
"Great Performances" Cats  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
'Til There Was You         0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
'burbs, The                0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
'night Mother              0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
(500) Days of Summer       0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0  0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0


In [223]:
from scipy.sparse import csr_matrix

movie_features_df_matrix = csr_matrix(movie_features_df.values)

from sklearn.neighbors import NearestNeighbors


model_knn = NearestNeighbors(metric = 'cosine', algorithm = 'brute')
model_knn.fit(movie_features_df_matrix)

NearestNeighbors(algorithm='brute', metric='cosine')

In [230]:
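import numpy as np

# Pick a random title index, then override it with a fixed index for a reproducible example
query_index = np.random.choice(movie_features_df.shape[0])
print(query_index)
query_index = 1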

1928


In [231]:
distances, indices = model_knn.kneighbors(movie_features_df.iloc[query_index,:].values.reshape(1, -1), n_neighbors = 6)

In [233]:
for i in range(0, len(distances.flatten())):
    if i == 0:
        print('Recommendations for {0}:\n'.format(movie_features_df.index[query_index]))
    else:
        print('{0}: {1}, with distance of {2}:'.format(i, movie_features_df.index[indices.flatten()[i]], distances.flatten()[i]))

Recommendations for 'Til There Was You:

1: Plenty, with distance of 0.05131670194948623:
2: Stealing Home, with distance of 0.05131670194948623:
3: Children of the Revolution, with distance of 0.05131670194948623:
4: Governess, The, with distance of 0.05131670194948623:
5: Love Serenade, with distance of 0.05131670194948623:
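
All five neighbours share exactly the same distance. One plausible explanation, not verified against the data, is sparsity: cosine distance ignores vector length, so every movie rated by only one and the same user points in the same direction and is equally far from the query. The plain-Python sketch below uses made-up item vectors over three users to show the effect; `cosine_distance` is a hypothetical helper written for this illustration.

```python
import math

def cosine_distance(a, b):
    """1 minus the cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Made-up item vectors: one entry per user, 0 means "not rated"
query = [3.0, 1.0, 0.0]          # rated by users 0 and 1
only_user_0_a = [4.0, 0.0, 0.0]  # rated by user 0 only
only_user_0_b = [1.5, 0.0, 0.0]  # rated by user 0 only, different rating

d_a = cosine_distance(query, only_user_0_a)
d_b = cosine_distance(query, only_user_0_b)
print(d_a, d_b)
```

Both distances equal 1 - 3/sqrt(10) regardless of the rating the single user gave. Filtering out movies with very few ratings before fitting the neighbours model (for instance with the `totalRatingCount` column computed earlier) is one way to avoid such ties.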


-  **<font color=Blue>To sum up,</font>** in this study a recommendation system was built with two different approaches. The first (ALS) produces suggestions for a given user, while the second (KNN) is movie-based and suggests movies that are close to a given movie.

## Task – 3 Text Analysis: <a name="TA"></a>

- Create a dataframe with following schema:
root
|-- content: string (nullable = true)
|-- label: string (nullable = true)
|-- sentiment: string (nullable = true)
- Design a tokenizer for content column and remove stop words, and give descriptive information
about obtained content column


#### Step 1 - Import nltk and download stopwords, and then import stopwords from NLTK
- NLTK is one of the most widely used Python packages for natural language processing. It provides tools for importing, cleaning and pre-processing human-language text, on top of which computational linguistics methods such as sentiment analysis can be applied.
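
As a rough sketch of what "tokenize, remove stop words, describe" can look like, the plain-Python example below uses a regular-expression tokenizer and a tiny hand-written stop-word set. Both are assumptions made for illustration: in the notebook the stop words would come from NLTK's `eng_stopwords`, which is far larger, and `tokenize` and `describe` are hypothetical helper names.

```python
import re
from collections import Counter

# Tiny illustrative stop-word list, standing in for NLTK's English list
STOP_WORDS = {"a", "an", "and", "the", "of", "to", "is", "with", "this"}

def tokenize(text):
    """Lowercase the text and return alphabetic tokens (apostrophes kept)."""
    return re.findall(r"[a-z']+", text.lower())

def describe(text, stop_words=STOP_WORDS):
    """Return simple descriptive statistics about the tokens of a text."""
    tokens = tokenize(text)
    kept = [t for t in tokens if t not in stop_words]
    return {
        "n_tokens": len(tokens),
        "n_after_stopwords": len(kept),
        "n_unique": len(set(kept)),
        "top": Counter(kept).most_common(3),
    }

stats = describe("Rachel Griffiths writes and directs this award winning short film.")
print(stats)
```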

In [416]:
import os
import re
import time
import nltk
import glob
import shutil
import string
import numpy as np

import tensorflow as tf
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from tensorflow.keras import layers
from __future__ import absolute_import, division, print_function

nltk.download('stopwords')

from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize 
eng_stopwords = set(stopwords.words('english')) 

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\kadir\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [417]:
import shutil, sys 
from shutil import make_archive

### Data Collection
The data used in this study is publicly available: the IMDB movie review dataset (aclImdb). The archive downloaded below contains a training set and a test set of reviews.


In [418]:
url = 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'

dataset = tf.keras.utils.get_file('aclImdb_v1.tar.gz', url,
                                  untar=True, cache_dir='.',
                                  cache_subdir='')

dataset_dir = os.path.join(os.path.dirname(dataset), 'aclImdb')

train_dir = os.path.join(dataset_dir, 'train')



To prepare a dataset for binary classification, you will need two folders on disk, corresponding to class_a and class_b. These will be the positive and negative movie reviews, which can be found in aclImdb/train/pos and aclImdb/train/neg. As the IMDB dataset contains additional folders, you will remove them before using this utility.

In [419]:
# remove unused folders to make it easier to load the data
remove_dir = os.path.join(train_dir, 'unsup')
shutil.rmtree(remove_dir)

In [420]:
os.listdir(dataset_dir)

['imdb.vocab', 'imdbEr.txt', 'README', 'test', 'train']

In [421]:
train_dir = os.path.join(dataset_dir, 'train')
os.listdir(train_dir)

['labeledBow.feat',
 'neg',
 'pos',
 'unsupBow.feat',
 'urls_neg.txt',
 'urls_pos.txt',
 'urls_unsup.txt']

-  **<font color=purple>The aclImdb/train/pos and aclImdb/train/neg directories contain many text files, each of which is a single movie review. Let's take a look at one of them</font>**

In [427]:
sample_file = os.path.join(train_dir, 'pos/1181_9.txt')
with open(sample_file) as f:
    txt=f.read()


In [428]:
txt

"Rachel Griffiths writes and directs this award winning short film. A heartwarming story about coping with grief and cherishing the memory of those we've loved and lost. Although, only 15 minutes long, Griffiths manages to capture so much emotion and truth onto film in the short space of time. Bud Tingwell gives a touching performance as Will, a widower struggling to cope with his wife's death. Will is confronted by the harsh reality of loneliness and helplessness as he proceeds to take care of Ruth's pet cow, Tulip. The film displays the grief and responsibility one feels for those they have loved and lost. Good cinematography, great direction, and superbly acted. It will bring tears to all those who have lost a loved one, and survived."

In [429]:
from pyspark.sql import Row
from pyspark.sql.types import *

In [430]:
# create expected schema

expectedSchema = StructType([
  StructField("content", StringType(), True),
  StructField("label", StringType(), True),
  StructField("sentiment", StringType(), True),
])

In [431]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
# The sample review was read from train/pos, so it is a positive review (label 1 = pos)
data2 = [(txt,"1","pos")  ]

df = spark.createDataFrame(data=data2,schema=expectedSchema)
df.printSchema()


root
 |-- content: string (nullable = true)
 |-- label: string (nullable = true)
 |-- sentiment: string (nullable = true)



-  **<font color=purple>The first dataframe creation process in task 3 was carried out through an example. In this context, the expected schema was created and the schema was shown according to the specified data contents.</font>**

Next, you will use the text_dataset_from_directory utility to create a labeled tf.data.Dataset. tf.data is a powerful collection of tools for working with data.

When running a machine learning experiment, it is a best practice to divide your dataset into three splits: train, validation, and test.

The IMDB dataset has already been divided into train and test, but it lacks a validation set. Let's create a validation set using an 80:20 split of the training data by using the validation_split argument below.

In [432]:
batch_size = 32
seed = 42

raw_train_ds = tf.keras.utils.text_dataset_from_directory(
    'aclImdb/train', 
    batch_size=batch_size, 
    validation_split=0.2, 
    subset='training', 
    seed=seed)

Found 25000 files belonging to 2 classes.
Using 20000 files for training.


As you can see above, there are 25,000 examples in the training folder, of which you will use 80% (or 20,000) for training. As you will see in a moment, you can train a model by passing a dataset directly to model.fit. If you're new to tf.data, you can also iterate over the dataset and print out a few examples as follows.
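
The `seed` matters because the training and validation subsets are requested in two separate calls: only if both calls shuffle the files identically do the two subsets stay disjoint. The plain-Python sketch below illustrates that idea with a seeded shuffle of indices. It is an analogy under stated assumptions (the helper `split_indices` is hypothetical), not the actual Keras implementation.

```python
import random

def split_indices(n, validation_split, seed):
    """Shuffle 0..n-1 deterministically and cut off the last fraction for validation."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    n_val = int(n * validation_split)
    return idx[:-n_val], idx[-n_val:]  # (training, validation)

train_idx, val_idx = split_indices(25000, 0.2, seed=42)
print(len(train_idx), len(val_idx))

# The same seed reproduces the same split, so the two subsets never overlap
train_again, val_again = split_indices(25000, 0.2, seed=42)
print(train_idx == train_again, set(train_idx).isdisjoint(val_idx))
```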

In [433]:
for text_batch, label_batch in raw_train_ds.take(1):
  for i in range(3):
    print("Content", text_batch.numpy()[i])
    print("Label", label_batch.numpy()[i])
    print('\n')
   

Content b'Every scene was put together perfectly.This movie had a wonderful cast and crew. I mean, how can you have a bad movie with Robert Downey Jr. in it,none have and ever will exist. He has the ability to brighten up any movie with his amazing talent.This movie was perfect! I saw this movie sitting all alone on a movie shelf in "Blockbuster" and like it was calling out to me,I couldn\'t resist picking it up and bringing it home with me. You can call me a sappy romantic, but this movie just touched my heart, not to mention made me laugh with pleasure at the same time. Even though it made me cry,I admit, at the end, the whole movie just brightened up my outlook on life thereafter.I suggested to my horror, action, and pure humor movie buff of a brother,who absolutely adored this movie. This is a movie with a good sense of feeling.It could make you laugh out loud, touch your heart, make you fall in love,and enjoy your life.Every time you purposefully walk past this movie, just be awar

Sentiment analysis aims to estimate the sentiment polarity of a body of text based solely on its content. The sentiment polarity of a text can be defined as a value that says whether the expressed opinion is positive (polarity=1) or negative (polarity=0).

In [434]:
print("Label 0 corresponds to", raw_train_ds.class_names[0])
print("Label 1 corresponds to", raw_train_ds.class_names[1])

Label 0 corresponds to neg
Label 1 corresponds to pos
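
The label indices follow the order of `class_names`. The sketch below illustrates that mapping in plain Python. It assumes the class subdirectories are named `pos` and `neg` and that labels are assigned in sorted order of those names; `class_dirs` and `label_of` are hypothetical names used only for illustration.

```python
# Assumed subdirectory names under aclImdb/train
class_dirs = ["pos", "neg"]

# Labels are assumed to follow the sorted order of the directory names
class_names = sorted(class_dirs)
label_of = {name: index for index, name in enumerate(class_names)}

print(class_names)  # ['neg', 'pos']
print(label_of)     # {'neg': 0, 'pos': 1}
```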


The class names shown above let us map each numeric label in the dataset to its sentiment: label 0 is a negative review and label 1 is a positive review.

To build the model, the next step is to create the validation and test datasets. The remaining 5,000 reviews from the training set are used for validation.

In [435]:
raw_val_ds = tf.keras.utils.text_dataset_from_directory(
    'aclImdb/train', 
    batch_size=batch_size, 
    validation_split=0.2, 
    subset='validation', 
    seed=seed)

Found 25000 files belonging to 2 classes.
Using 5000 files for validation.
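
The training and validation calls share the same `validation_split` and the same `seed`, which is what keeps the two subsets from overlapping. Below is a minimal plain-Python sketch of that idea, assuming a shuffle-then-slice split. The helper `split_indices` and the seed value 42 are hypothetical, and this is an illustration of the arithmetic rather than the Keras implementation.

```python
import random

def split_indices(n_files, validation_split, seed):
    """Shuffle file indices with a fixed seed, then slice off the validation tail."""
    indices = list(range(n_files))
    random.Random(seed).shuffle(indices)
    n_val = int(n_files * validation_split)
    return indices[:-n_val], indices[-n_val:]

# Same seed for both subsets, so they come from the same shuffle
train_idx, val_idx = split_indices(25000, 0.2, seed=42)
print(len(train_idx), len(val_idx))        # 20000 5000
print(set(train_idx).isdisjoint(val_idx))  # True
```

If the two subsets were drawn with different seeds, they would come from different shuffles and some reviews could land in both.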


In [436]:
raw_test_ds = tf.keras.utils.text_dataset_from_directory(
    'aclImdb/test', 
    batch_size=batch_size)

Found 25000 files belonging to 2 classes.


In [437]:
## Data preprocessing and analysis

In [None]:
# Having looked at our data above, we see that the raw text contains HTML break
# tags of the form '<br />'. These tags will not be removed by the default
# standardizer (which doesn't strip HTML). Because of this, we will need to
# create a custom standardization function.

In [438]:
def custom_standardization(input_data):
  # lowercase the input string
  lowercase = tf.strings.lower(input_data)
  # replace HTML line-break tags with a space
  strip_html = tf.strings.regex_replace(lowercase, '<br />', ' ')
  # remove punctuation
  return tf.strings.regex_replace(strip_html, 
                                  '[%s]' % re.escape(string.punctuation), '')
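
To see what these three steps do to a single string, here is a pure-Python counterpart that can be run without TensorFlow. The function `standardize_py` and the sample sentence are hypothetical and serve only as an illustration of the same logic.

```python
import re
import string

def standardize_py(text):
    """Lowercase, replace '<br />' with a space, then strip punctuation."""
    lowercase = text.lower()
    no_breaks = lowercase.replace("<br />", " ")
    return re.sub("[%s]" % re.escape(string.punctuation), "", no_breaks)

sample = "Great movie!<br />Loved it."
print(standardize_py(sample))  # great movie loved it
```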

In [439]:
# create vectorization layer for text data
from tensorflow.keras.layers import TextVectorization
max_features = 10000
seq_length = 224

vectorization_layer = TextVectorization(
    standardize = custom_standardization,
    max_tokens = max_features,
    output_mode = 'int',
    output_sequence_length = seq_length
)

In [None]:
# Now that we have our custom standardization, we can instantiate our text
# vectorization layer. We are using this layer to normalize, split, and map
# strings to integers, so we set our 'output_mode' to 'int'.
# Note that we're using the default split function,
# and the custom standardization defined above.
# We also set an explicit maximum sequence length, since the CNNs later in our
# model won't support ragged sequences.

In [440]:
def vectorize_text(text, label):
  text = tf.expand_dims(text, -1)
  return vectorization_layer(text), label

In [None]:
# Now that the vocab layer has been created, call `adapt` on a text-only
# dataset to create the vocabulary. You don't have to batch, but for very large
# datasets this means you're not keeping spare copies of the dataset in memory.

In [441]:
# re-create the vectorization layer with the same settings as above
# (TextVectorization is available from tensorflow.keras.layers in TF 2.6+)
from tensorflow.keras.layers import TextVectorization
max_features = 10000
seq_length = 224

vectorization_layer = TextVectorization(
    standardize = custom_standardization,
    max_tokens = max_features,
    output_mode = 'int',
    output_sequence_length = seq_length
)

Call `adapt` to fit the state of the preprocessing layer to the dataset. This causes the layer to build an index that maps strings to integers.

In [442]:
# make a text-only dataset (without labels) and call adapt
train_text = raw_train_ds.map(lambda x, y: x)


In [443]:

vectorization_layer.adapt(train_text)
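
Conceptually, `adapt` counts the tokens in the training text and keeps the most frequent ones as the vocabulary. The sketch below mimics that in plain Python. It assumes whitespace splitting, index 0 reserved for padding, index 1 reserved for unknown tokens, and alphabetical tie-breaking; `build_vocabulary` and the toy corpus are hypothetical, and the layer's internal ordering of ties may differ.

```python
from collections import Counter

def build_vocabulary(texts, max_tokens):
    """Rank tokens by frequency; index 0 is padding, index 1 is out-of-vocabulary."""
    counts = Counter(token for text in texts for token in text.split())
    # Sort by descending count, then alphabetically so ties are deterministic
    ranked = sorted(counts, key=lambda token: (-counts[token], token))
    return ["", "[UNK]"] + ranked[: max_tokens - 2]

corpus = ["the movie was great", "the movie was bad", "great cast"]
vocab = build_vocabulary(corpus, max_tokens=6)
print(vocab)  # ['', '[UNK]', 'great', 'movie', 'the', 'was']
```

With `max_tokens=6`, the two rarest words (`bad`, `cast`) are left out of the vocabulary and would be mapped to the unknown token.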

In [445]:
# get a batch of 32 reviews and labels from the dataset
text_batch, label_batch = next(iter(raw_train_ds))

first_review, first_label = text_batch[0], label_batch[0]
print(f'Review: {first_review}')
print(f'Label: {raw_train_ds.class_names[first_label]}')
print(f'Vectorized Review: {vectorize_text(first_review, first_label)}')

Review: b'Great movie - especially the music - Etta James - "At Last". This speaks volumes when you have finally found that special someone.'
Label: neg
Vectorized Review: (<tf.Tensor: shape=(1, 224), dtype=int64, numpy=
array([[  85,   17,  260,    2,  222,    1,  571,   31,  229,   11, 2421,
           1,   51,   22,   25,  404,  251,   12,  308,  282,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,  

In [446]:
print("1021 ---> ", vectorization_layer.get_vocabulary()[1021])
print("101 ---> ", vectorization_layer.get_vocabulary()[101])
print('Vocabulary Size: {}'.format(len(vectorization_layer.get_vocabulary())))

1021 --->  cop
101 --->  after
Vocabulary Size: 10000
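
The vectorized review printed earlier contains the value 1 for words outside the vocabulary and trailing zeros up to the sequence length. A minimal plain-Python sketch of that encoding step, assuming whitespace tokenization; the function `encode` and `toy_vocab` are hypothetical and stand in for the layer's lookup:

```python
def encode(text, vocab, seq_length):
    """Map tokens to vocabulary indices, then pad or truncate to seq_length."""
    index_of = {token: i for i, token in enumerate(vocab)}
    ids = [index_of.get(token, 1) for token in text.split()]  # 1 = unknown token
    ids = ids[:seq_length]                                    # truncate long inputs
    return ids + [0] * (seq_length - len(ids))                # 0 = padding

toy_vocab = ["", "[UNK]", "the", "movie", "great"]
print(encode("the movie was great", toy_vocab, seq_length=6))  # [2, 3, 1, 4, 0, 0]
```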


- Apply the TextVectorization layer to the train, validation, and test datasets.

In [448]:
train_ds = raw_train_ds.map(vectorize_text)
val_ds = raw_val_ds.map(vectorize_text)
test_ds = raw_test_ds.map(vectorize_text)

In [449]:
raw_train_ds

<BatchDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.string, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))>

In [None]:
-  **<font color=purple>The first dataframe creation process in task 3 was carried out through an example. In this context, the expected schema was created and the schema was shown according to the specified data contents.</font>**