### Task 1

In [9]:
%%bash
# Create the personal keyspace (SimpleStrategy, replication factor 2).
# IF NOT EXISTS keeps this cell safe to re-run.
create_keyspace_cql="CREATE KEYSPACE IF NOT EXISTS mf_goryacheva WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"
cqlsh virtual-node01 -e "${create_keyspace_cql}"

### Task 2

In [127]:
%%bash
# Remove any previous copy of the movies table before it is recreated.
# IF EXISTS turns this into a no-op (instead of an error) when the table
# is not there yet, e.g. on the first run against a fresh keyspace.
cqlsh virtual-node01 -e \
"DROP TABLE IF EXISTS mf_goryacheva.movies;"

In [94]:
%%bash
# Task 2 table: one row per movie. title is the partition key,
# (year, movieid) are clustering columns; genres is a set of genre names.
create_movies_cql="CREATE TABLE mf_goryacheva.movies (\
 movieid int,\
 title text,\
 year int,\
 genres set<text>,\
 PRIMARY KEY (title, year, movieid)\
);"
cqlsh virtual-node01 -e "${create_movies_cql}"

In [85]:
# Location of the MovieLens movies CSV read by Spark below.
movie_path = "/data/movielens/movies.csv"

In [86]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [87]:
# Explicit schema for movies.csv: integer id, raw title string and the
# genres column as a single string (it is split into a list later on).
movie_schema = (
    StructType()
    .add("movieid", IntegerType())
    .add("title", StringType())
    .add("genres", StringType())
)

In [88]:
# Load movies.csv with the explicit schema; the first line is a header row.
movies = (
    spark.read
    .schema(movie_schema)
    .option("header", True)
    .csv(movie_path)
)

In [89]:
# Normalise the raw movie rows into the shape of the Cassandra table:
#   * movieid -> kept as the integer read from the CSV (the previous
#                trim() call silently converted it to a string)
#   * title   -> everything except the last 6 characters of the raw title
#                (the trailing "(YYYY)" suffix), trimmed
#   * year    -> characters -5..-2 of the raw title, trimmed and cast to
#                int (the cast is now the last step, so the column is
#                really an integer rather than a string)
#   * genres  -> list of genre names obtained by splitting on "|"
# Rows with '(no genres listed)' are removed, and na.drop() discards rows
# where any selected column is null (e.g. the year did not parse).
movies_upd = (
    movies
    .filter(col('genres') != '(no genres listed)')
    .select(
        col('movieid'),
        trim(expr("substring(title, 1, length(title)-6)")).alias('title'),
        trim(col('title').substr(-5, 4)).cast('int').alias('year'),
        # Raw string: '\|' without the r-prefix is an invalid escape sequence.
        split(col('genres'), r'\|').alias('genres'),
    )
    .na.drop()
)

In [95]:
# Append the cleaned movies to mf_goryacheva.movies through the
# Spark Cassandra data source.
(
    movies_upd.write
    .format('org.apache.spark.sql.cassandra')
    .mode('append')
    .options(keyspace='mf_goryacheva', table='movies')
    .save()
)

In [96]:
%%bash
# Sanity check: number of rows now stored in the movies table.
count_movies_cql="SELECT count(1) FROM mf_goryacheva.movies;"
cqlsh virtual-node01 -e "${count_movies_cql}"


 count
-------
 37754

(1 rows)

Aggregation query used without partition key



### Task 3

In [126]:
%%bash
# Remove any previous copy of movies_by_genre before it is recreated.
# IF EXISTS avoids an error when the table has not been created yet.
cqlsh virtual-node01 -e \
"DROP TABLE IF EXISTS mf_goryacheva.movies_by_genre;"

In [98]:
%%bash
# Task 3 table: one row per (genre, movie). genres is the partition key,
# (year, movieid) are clustering columns, so a genre can be sliced by year.
create_movies_by_genre_cql="CREATE TABLE mf_goryacheva.movies_by_genre (genres text, movieid int, year int, title text, PRIMARY KEY (genres, year, movieid));"
cqlsh virtual-node01 -e "${create_movies_by_genre_cql}"

In [99]:
# Flatten the genres list: every movie yields one row per genre it has.
# Rows with a null in any selected column are dropped.
movies_by_genre = (
    movies_upd
    .select(
        explode(col("genres")).alias("genres"),
        "year",
        "movieid",
        "title",
    )
    .na.drop()
)

In [100]:
# Append the per-genre rows to mf_goryacheva.movies_by_genre.
(
    movies_by_genre.write
    .format('org.apache.spark.sql.cassandra')
    .mode('append')
    .options(keyspace='mf_goryacheva', table='movies_by_genre')
    .save()
)

In [101]:
%%bash
# Count Horror movies released from 1980 through 1990 (inclusive),
# restricted by partition key (genres) and a clustering range on year.
horror_count_cql="SELECT count(1) \
   FROM mf_goryacheva.movies_by_genre \
  WHERE genres = 'Horror' \
    AND year >= 1980 \
    AND year <= 1990;"
cqlsh virtual-node01 -e "${horror_count_cql}"


 count
-------
   560

(1 rows)


In [None]:
# Optional Spark-side cross-check of the Horror 1980-1990 count (left disabled):
# movies_by_genre.filter(col('genres')=='Horror')\
#                .filter(col('year')>=1980)\
#                .filter(col('year')<=1990)\
#                .count()

### Task 4

In [102]:
%%bash
# Secondary index on the genres set of the movies table, needed for the
# CONTAINS query that follows. IF NOT EXISTS keeps the cell re-runnable:
# a plain CREATE INDEX fails when genres_idx is already present.
cqlsh virtual-node01 -e \
"CREATE INDEX IF NOT EXISTS genres_idx ON mf_goryacheva.movies (genres);"

In [103]:
%%bash
# Same Horror / 1980-1990 count as in Task 3, but against the movies table:
# genres is matched with CONTAINS and the query carries ALLOW FILTERING.
horror_contains_cql="SELECT count(1) \
   FROM mf_goryacheva.movies \
  WHERE genres CONTAINS 'Horror' \
    AND year >= 1980 \
    AND year <= 1990\
  ALLOW FILTERING;"
cqlsh virtual-node01 -e "${horror_contains_cql}"


 count
-------
   560

(1 rows)

Aggregation query used without partition key



### Task 5

In [65]:
# Location of the MovieLens ratings CSV read by Spark below.
ratings_path = "/data/movielens/ratings.csv"

In [75]:
# Explicit schema for ratings.csv: who rated which movie, the rating value
# and the timestamp column, read as an integer.
ratings_schema = (
    StructType()
    .add("userid", IntegerType())
    .add("movieid", IntegerType())
    .add("rating", FloatType())
    .add("timestamp", IntegerType())
)

In [76]:
# Load ratings.csv with the explicit schema; the first line is a header row.
ratings = (
    spark.read
    .schema(ratings_schema)
    .option("header", True)
    .csv(ratings_path)
)

In [77]:
# Peek at the first five ratings to confirm the columns parsed as expected.
ratings.show(n=5)

+------+-------+------+---------+
|userid|movieid|rating|timestamp|
+------+-------+------+---------+
|     1|    122|   2.0|945544824|
|     1|    172|   1.0|945544871|
|     1|   1221|   5.0|945544788|
|     1|   1441|   4.0|945544871|
|     1|   1609|   3.0|945544824|
+------+-------+------+---------+
only showing top 5 rows



In [106]:
# Mean rating per movie. The columns are renamed (movieid_rnk, avg_rnk) so
# the join key does not collide with movieid on the movies side.
avg_ratings = (
    ratings
    .groupBy("movieid")
    .agg({"rating": "avg"})
    .withColumnRenamed("movieid", "movieid_rnk")
    .withColumnRenamed("avg(rating)", "avg_rnk")
)

In [110]:
# Attach the average rating to every (genre, movie) row. The inner join
# keeps only movies that have a row in avg_ratings; the duplicate key
# column from the ratings side is dropped afterwards.
join_condition = col("movieid") == col("movieid_rnk")
movies_by_genre_rating = (
    movies_by_genre
    .join(avg_ratings, on=join_condition, how='inner')
    .drop('movieid_rnk')
)

In [111]:
# Preview the joined rows (genre, year, movie id, title, average rating).
movies_by_genre_rating.show(n=5)

+--------+----+-------+--------------------+-----------------+
|  genres|year|movieid|               title|          avg_rnk|
+--------+----+-------+--------------------+-----------------+
| Romance|2004|  33722|  Ladies in Lavender|3.533333333333333|
|   Drama|2004|  33722|  Ladies in Lavender|3.533333333333333|
|  Comedy|2004|  33722|  Ladies in Lavender|3.533333333333333|
|  Comedy|2006|  44022|Ice Age 2: The Me...|3.319560669456067|
|Children|2006|  44022|Ice Age 2: The Me...|3.319560669456067|
+--------+----+-------+--------------------+-----------------+
only showing top 5 rows



In [125]:
%%bash
# Remove any previous copy of movies_by_genre_rating before it is recreated.
# IF EXISTS avoids an error when the table has not been created yet.
cqlsh virtual-node01 -e \
"DROP TABLE IF EXISTS mf_goryacheva.movies_by_genre_rating;"

In [112]:
%%bash
# Task 5 table: same key layout as movies_by_genre (partition by genres,
# cluster by year and movieid) plus the movie's average rating.
create_rating_table_cql="CREATE TABLE mf_goryacheva.movies_by_genre_rating \
(genres text, \
 movieid int, \
 year int, \
 title text, \
 avg_rnk float,\
 PRIMARY KEY (genres, year, movieid) \
);"
cqlsh virtual-node01 -e "${create_rating_table_cql}"

In [117]:
# Append the rated per-genre rows to mf_goryacheva.movies_by_genre_rating.
(
    movies_by_genre_rating.write
    .format('org.apache.spark.sql.cassandra')
    .mode('append')
    .options(keyspace='mf_goryacheva', table='movies_by_genre_rating')
    .save()
)

In [120]:
%%bash
# Minimum, mean and maximum of the per-movie average rating for Sci-Fi
# movies released in 2000 or later.
scifi_stats_cql="SELECT min(avg_rnk) AS min_rnk, avg(avg_rnk) AS avg_rnk, max(avg_rnk) AS max_rnk \
   FROM mf_goryacheva.movies_by_genre_rating \
  WHERE genres = 'Sci-Fi' \
    AND year >= 2000;
"
cqlsh virtual-node01 -e "${scifi_stats_cql}"


 min_rnk | avg_rnk | max_rnk
---------+---------+---------
     0.5 | 2.78996 |       5

(1 rows)
