In [1]:
%reset

Once deleted, variables cannot be recovered. Proceed (y/[n])? y


In [2]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from os.path import expanduser, join
from re import findall, compile
from pandas import read_csv, merge, to_datetime, DataFrame
import subprocess
from gensim.utils import simple_preprocess

In [3]:
# Root folder of the project data, located under the current user's home directory.
home = expanduser('~')
mypath = home + '/Downloads/BigData/'
# Sub-folder for the denormalized CSV files; back-slashes are normalised to '/'.
deromalized_path = join(mypath, 'Datastax_denormalized', '').replace('\\', '/')

## Connect to DataStax

In [4]:
# Secure-connect bundle used by the driver to reach the cloud database.
cloud_config = {'secure_connect_bundle': mypath + 'secure-connect-bigdataproject2022.zip'}

# The generated token CSV holds a header line followed by one line of quoted fields;
# the first two quoted fields are used as client id and client secret.
token_path = mypath + 'GeneratedToken.csv'
try:
    # open() itself is what raises FileNotFoundError, so it must be inside the try.
    with open(token_path, 'r', encoding='utf-8') as file:
        content = file.readlines()
except FileNotFoundError as err:
    # Raise a real exception (raising a plain string is a TypeError).
    raise FileNotFoundError(f'Token file not found: {token_path}') from err

if len(content) < 2:
    raise ValueError(f'Token file {token_path} has no data line after the header')

tokens = findall(r'"(.*?)"', content[1])
if len(tokens) < 2:
    raise ValueError(f'Token file {token_path} does not contain two quoted fields on its second line')

client_id = tokens[0]
client_secret = tokens[1]

In [5]:
# Authenticate with the token pair read above and open a session on the keyspace.
keyspace = 'big_data'
auth_provider = PlainTextAuthProvider(username=client_id, password=client_secret)
cluster = Cluster(auth_provider=auth_provider, cloud=cloud_config)
session = cluster.connect(keyspace)

### Python function/script for data uploading to DataStax

In [6]:
# csv column names must match exactly the keyspace's table column names
def upload_data_to_cassandra(keyspace, table_name, filename, username, password,
                             dsbulk_path='C:/dsbulk-1.9.0/dsbulk-1.9.0/bin/dsbulk'):
    """Load a CSV file from the denormalized folder into a table with `dsbulk load`.

    Parameters
    ----------
    keyspace : str
        Target keyspace (passed to dsbulk as ``-k``).
    table_name : str
        Target table (passed as ``-t``).
    filename : str
        Name of the CSV file inside ``deromalized_path``; read with ``-header true``.
    username, password : str
        Credentials passed to dsbulk as ``-u`` / ``-p``.
    dsbulk_path : str, optional
        Location of the dsbulk launcher. Defaults to the path that was
        previously hard-coded in this function.

    Returns
    -------
    None
        Nothing is returned, so the command line (which contains the
        credentials) is never echoed as a cell output.

    Raises
    ------
    RuntimeError
        If dsbulk exits with a non-zero status. The captured stderr/stdout tail
        is included in the message so a failed upload is no longer silent.
    """
    import os  # local import: only names from os.path are imported at file level

    filename_url = join(deromalized_path, filename).replace('\\', '/')
    bundle_url = join(mypath, 'secure-connect-bigdataproject2022.zip').replace('\\', '/')

    # NOTE(review): the password is handed over on the command line, where other
    # local processes may be able to read it; consider a dsbulk config file instead.
    shell_comm = [dsbulk_path, 'load', '-url', filename_url, '-k', keyspace, '-t', table_name, '-cl', 'ALL',
                  '-b', bundle_url, '-u', username, '-p', password, '-header', 'true']

    # A list combined with shell=True only behaves as intended on Windows; on POSIX
    # the shell would run just the first item and treat the rest as its own arguments.
    # NOTE(review): shell=True is kept on Windows as in the original, presumably so
    # cmd.exe resolves the dsbulk launcher script - confirm.
    ret = subprocess.run(shell_comm, capture_output=True, shell=(os.name == 'nt'))

    if ret.returncode != 0:
        raw_output = ret.stderr or ret.stdout or b''
        details = raw_output.decode('utf-8', errors='replace').strip()[-2000:]
        raise RuntimeError(
            f'dsbulk load of {filename!r} into {keyspace}.{table_name} '
            f'failed with exit code {ret.returncode}:\n{details}'
        )
    return

In [7]:
# Load the movie catalogue; the file's own header row is replaced by lowercase column names.
movies = read_csv(
    f'{mypath}archive/movie.csv',
    header=0,
    names=['movieid', 'title', 'genres'],
)
print(movies.shape)
movies.head()

(27278, 3)


Unnamed: 0,movieid,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [8]:
# Load only the rating columns that are needed and align the key name with `movies`.
ratings = read_csv(
    f'{mypath}archive/rating.csv',
    header=0,
    usecols=['movieId', 'rating', 'timestamp'],
).rename(columns={'movieId': 'movieid'})
print(ratings.shape)
ratings.head()

(20000263, 3)


Unnamed: 0,movieid,rating,timestamp
0,2,3.5,2005-04-02 23:53:47
1,29,3.5,2005-04-02 23:31:16
2,32,3.5,2005-04-02 23:33:39
3,47,3.5,2005-04-02 23:32:07
4,50,3.5,2005-04-02 23:29:40


In [9]:
# Parse the timestamp strings, then index the frame by them.
# The `timestamp` column itself is kept alongside the new index.
ratings['timestamp'] = to_datetime(ratings['timestamp'])
ratings = ratings.set_index(ratings['timestamp'])

## Q1

## Denormalize and Model data for Q1

In [10]:
# function to clean out titles
def preprocess_movie_title(title):
    """Return a cleaned movie title.

    Steps:
      1. Keep only the text before the first ' (' (drops the year and any
         bracketed alternative title).
      2. Remove apostrophes and surrounding whitespace.
      3. If the title ends with a trailing article (", The", ", An", ", A"),
         move that article to the front: "Matrix, The" -> "The Matrix".

    The article is only moved when it is the final suffix of the title, so a
    comma followed by a capitalised word in the middle of a title
    (e.g. "Hello, Again") is left untouched.

    Parameters
    ----------
    title : str
        Raw title, e.g. "American President, The (1995)".

    Returns
    -------
    str
        The cleaned title.
    """
    title = title.split(' (', 1)[0].replace("'", "").strip()

    for article in ('The', 'An', 'A'):
        suffix = ', ' + article
        # len check: a title consisting solely of the suffix is left as it is
        if title.endswith(suffix) and len(title) > len(suffix):
            return article + ' ' + title[:-len(suffix)]

    return title

In [11]:
# Guard: once the titles have been cleaned the year is gone from them, so running
# this cell a second time would overwrite release_date with missing values.
# Reload `movies` to redo this step.
if 'release_date' not in movies.columns:
    # create release date column: take the LAST bracketed 4-digit number of the raw
    # title, so other bracketed numbers (e.g. "(500) Days of Summer (2009)") are not
    # mistaken for the year; titles without one get a missing value
    movies['release_date'] = movies['title'].str.findall(r"\((\d{4})\)").str[-1]
    ## clean titles and store back to column title
    movies['title'] = movies['title'].apply(preprocess_movie_title)
movies.head()

Unnamed: 0,movieid,title,genres,release_date
0,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995
1,2,Jumanji,Adventure|Children|Fantasy,1995
2,3,Grumpier Old Men,Comedy|Romance,1995
3,4,Waiting to Exhale,Comedy|Drama|Romance,1995
4,5,Father of the Bride Part II,Comedy,1995


In [12]:
# mean rating per movie and calendar day (year / month / day), rounded to one decimal
ratings_by_year_month_week = (
    ratings
    .groupby(by=['movieid', ratings.index.year, ratings.index.month, ratings.index.day])['rating']
    .mean()
    .round(decimals=1)
    .rename_axis(['movieid', 'year', 'month', 'day'])
    .reset_index()
)

# join the daily averages with the movie attributes (inner join on movieid)
popular_movies_by_date = movies.loc[:, 'movieid':'genres'].merge(ratings_by_year_month_week, on='movieid')
popular_movies_by_date.head()

Unnamed: 0,movieid,title,genres,year,month,day,rating
0,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1996,1,29,4.0
1,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1996,2,1,5.0
2,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1996,2,2,4.5
3,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1996,2,5,5.0
4,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1996,2,12,5.0


In [13]:
# Spot check: rows for January 2015
popular_movies_by_date.loc[lambda df: (df['year'] == 2015) & (df['month'] == 1)].head()

Unnamed: 0,movieid,title,genres,year,month,day,rating
6431,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,2015,1,1,3.8
6432,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,2015,1,2,4.0
6433,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,2015,1,3,3.6
6434,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,2015,1,4,3.5
6435,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,2015,1,5,4.1


### Store data to file and insert to popular_movies_by_date

In [14]:
# Write the table without the DataFrame index, so the CSV holds only the table columns.
popular_movies_by_date.to_csv(f'{deromalized_path}popular_movies_by_date.csv', index=False)

In [15]:
# Bulk-load the CSV written above into the table of the same name.
upload_data_to_cassandra(
    keyspace=keyspace,
    table_name='popular_movies_by_date',
    filename='popular_movies_by_date.csv',
    username=client_id,
    password=client_secret,
)

## Q2

## Denormalize and Model data for Q2

In [16]:
# Tokenise every cleaned title into a list of keywords.
word_title = movies['title'].apply(simple_preprocess)

# One row per (keyword, movie). explode() turns each token list into rows directly,
# instead of building a ragged wide frame and relying on stack() to discard the
# None padding; titles that yield no tokens are removed by the dropna().
movies_by_keyword = (
    DataFrame({'keyword': word_title, 'movieid': movies['movieid']})
    .explode('keyword')
    .dropna(subset=['keyword'])
    .reset_index(drop=True)
)
movies_by_keyword.head()

Unnamed: 0,keyword,movieid
0,toy,1
1,story,1
2,jumanji,2
3,grumpier,3
4,old,3


In [17]:
# attach title and genres to every keyword row (inner join on movieid)
movies_by_keyword = movies_by_keyword.merge(movies.loc[:, 'movieid':'genres'], on='movieid')

# mean rating per movie, rounded to one decimal, joined on as the `rating` column
avgs = ratings.groupby(by='movieid')['rating'].mean().round(decimals=1).reset_index()
movies_by_keyword = movies_by_keyword.merge(avgs, on='movieid')
movies_by_keyword

Unnamed: 0,keyword,movieid,title,genres,rating
0,toy,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,3.9
1,story,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,3.9
2,jumanji,2,Jumanji,Adventure|Children|Fantasy,3.2
3,grumpier,3,Grumpier Old Men,Comedy|Romance,3.2
4,old,3,Grumpier Old Men,Comedy|Romance,3.2
...,...,...,...,...,...
76037,the,131258,The Pirates,Adventure,2.5
76038,pirates,131258,The Pirates,Adventure,2.5
76039,rentun,131260,Rentun Ruusu,(no genres listed),3.0
76040,ruusu,131260,Rentun Ruusu,(no genres listed),3.0


### Store file and Insert to movies_by_keyword

In [18]:
# Write the keyword table to the denormalized folder, then bulk-load that file.
keyword_csv = 'movies_by_keyword.csv'
movies_by_keyword.to_csv(f'{deromalized_path}{keyword_csv}', index=False)
upload_data_to_cassandra(keyspace, 'movies_by_keyword', keyword_csv, client_id, client_secret)

## Q3

## Denormalize and Model data for Q3

In [19]:
# split genres per row: one row per (genre, movie).
# explode() expands the '|'-separated lists directly, instead of building a ragged
# wide frame and relying on stack() to discard the None padding.
genre_rows = (
    DataFrame({'genre': movies['genres'].str.split('|'), 'movieid': movies['movieid']})
    .explode('genre')
    .dropna(subset=['genre'])
    .reset_index(drop=True)
)

# join the genres with the remaining movie columns (inner join on movieid)
movies_by_genre = merge(left=genre_rows, right=movies.drop(columns='genres'), on='movieid')

# calc movie avg (one decimal); kept in `avgs` with the column named avg_rating
avgs = (
    ratings.groupby(by='movieid')['rating']
    .mean()
    .reset_index()
    .rename(columns={'rating': 'avg_rating'})
    .round(decimals=1)
)

# join movies and ratings
movies_by_genre = merge(left=movies_by_genre, right=avgs, on='movieid')

# remove trash rows: release_date that is not exactly 4 characters (or is missing),
# and the '(no genres listed)' placeholder genre
has_year = movies_by_genre['release_date'].str.len() == 4
has_genre = movies_by_genre['genre'] != '(no genres listed)'
movies_by_genre = movies_by_genre[has_year & has_genre]

movies_by_genre

Unnamed: 0,genre,movieid,title,release_date,avg_rating
0,Adventure,1,Toy Story,1995,3.9
1,Animation,1,Toy Story,1995,3.9
2,Children,1,Toy Story,1995,3.9
3,Comedy,1,Toy Story,1995,3.9
4,Fantasy,1,Toy Story,1995,3.9
...,...,...,...,...,...
53413,Comedy,131256,"Feuer, Eis & Dosenbier",2002,4.0
53414,Adventure,131258,The Pirates,2014,2.5
53416,Adventure,131262,Innocence,2014,4.0
53417,Fantasy,131262,Innocence,2014,4.0


In [21]:
# Spot check: first rows of the 'Adventure' genre
movies_by_genre.loc[movies_by_genre['genre'].eq('Adventure')].head()

Unnamed: 0,genre,movieid,title,release_date,avg_rating
0,Adventure,1,Toy Story,1995,3.9
5,Adventure,2,Jumanji,1995,3.2
19,Adventure,8,Tom and Huck,1995,3.1
23,Adventure,10,GoldenEye,1995,3.4
30,Adventure,13,Balto,1995,3.3


### Store file and Insert to movies_by_genre

In [22]:
# Write the genre table to the denormalized folder, then bulk-load that file.
genre_csv = 'movies_by_genre.csv'
movies_by_genre.to_csv(f'{deromalized_path}{genre_csv}', index=False)
upload_data_to_cassandra(keyspace, 'movies_by_genre', genre_csv, client_id, client_secret)

## Q4

## Denormalize and Model data for Q4

In [23]:
# Load the (movie, tag) pairs and align the key name with `movies`.
tags = read_csv(mypath+'archive/tag.csv', usecols=['movieId', 'tag'])
tags = tags.rename(columns={'movieId':'movieid'})
# Assign the result back instead of calling an inplace method on the selected
# column, which is not guaranteed to write through to `tags`.
# NOTE(review): Series.replace without regex only replaces cells that are exactly
# "'" - it does not touch apostrophes inside longer tags. If escaping apostrophes
# was the intent, confirm the desired escape before switching to .str.replace.
tags['tag'] = tags['tag'].replace("'", "/'")
# Keep tags longer than one character (missing tags are dropped as well); copy so
# later column assignments act on an independent frame rather than a slice.
tags = tags[tags['tag'].str.len() > 1].copy()
tags.head()

Unnamed: 0,movieid,tag
0,4141,Mark Waters
1,208,dark hero
2,353,dark hero
3,521,noir thriller
4,592,dark hero


In [24]:
# One row per tag occurrence, with the movie's title and genres (inner join on movieid).
movies_by_title = tags.merge(movies.loc[:, 'movieid':'genres'], on='movieid')
print(movies_by_title.shape)
movies_by_title.head()

(464590, 4)


Unnamed: 0,movieid,tag,title,genres
0,4141,Mark Waters,Head Over Heels,Comedy|Romance
1,4141,naive,Head Over Heels,Comedy|Romance
2,4141,Mark Waters,Head Over Heels,Comedy|Romance
3,4141,Freddie Prinze Jr.,Head Over Heels,Comedy|Romance
4,208,dark hero,Waterworld,Action|Adventure|Sci-Fi


In [25]:
# How many times each tag was applied to each movie.
# The counts go into their own variable: overwriting `movies_by_title` with them
# would make a second run of this cell count the already-aggregated rows
# (every tag_count would silently become 1).
tag_counts = (
    movies_by_title
    .groupby(by=['movieid', 'tag'])['tag']
    .count()
    .reset_index(name='tag_count')
)

# Add title/genres and the per-movie average rating (inner joins on movieid).
# NOTE(review): relies on `avgs` as last assigned by an earlier cell - the output
# column is avg_rating only if the cell that renames it ran most recently.
movie_info_by_title = (
    tag_counts
    .merge(movies.loc[:, 'movieid':'genres'], on='movieid')
    .merge(avgs, on='movieid')
)

print(movie_info_by_title.shape)
movie_info_by_title.head()

(198886, 6)


Unnamed: 0,movieid,tag,tag_count,title,genres,avg_rating
0,1,2009 reissue in Stereoscopic 3-D,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,3.9
1,1,3D,3,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,3.9
2,1,55 movies every kid should see--Entertainment ...,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,3.9
3,1,Animation,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,3.9
4,1,BD-Video,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,3.9


In [26]:
# Spot check: tag rows for the title 'Jumanji'
movie_info_by_title.loc[movie_info_by_title['title'].eq('Jumanji')].head()

Unnamed: 0,movieid,tag,tag_count,title,genres,avg_rating
84,2,CLV,1,Jumanji,Adventure|Children|Fantasy,3.2
85,2,Children,4,Jumanji,Adventure|Children|Fantasy,3.2
86,2,Chris Van Allsburg,2,Jumanji,Adventure|Children|Fantasy,3.2
87,2,Dynamic CGI Action,1,Jumanji,Adventure|Children|Fantasy,3.2
88,2,Fantasy,1,Jumanji,Adventure|Children|Fantasy,3.2


### Store file and Insert to movie_info_by_title

In [27]:
# Write the per-title tag table to the denormalized folder, then bulk-load that file.
title_csv = 'movie_info_by_title.csv'
movie_info_by_title.to_csv(f'{deromalized_path}{title_csv}', index=False)
upload_data_to_cassandra(keyspace, 'movie_info_by_title', title_csv, client_id, client_secret)

## Q5

## Denormalize and Model data for Q5

In [28]:
# Lower-case the tags on a new frame instead of mutating `tags` in place.
# `tags` is also the input of the earlier tag-count cells, so changing it here
# would make their result depend on the order in which cells are run.
tags_lower = tags.assign(tag=tags['tag'].str.lower())

# Distinct (movie, tag) pairs with title/genres, then the per-movie average rating
# (inner joins on movieid).
mrt = merge(left=tags_lower.drop_duplicates(), right=movies.loc[:, 'movieid':'genres'], on='movieid')
movies_by_tag = merge(left=mrt, right=avgs, on='movieid')
movies_by_tag.head()

Unnamed: 0,movieid,tag,title,genres,avg_rating
0,4141,mark waters,Head Over Heels,Comedy|Romance,2.9
1,4141,naive,Head Over Heels,Comedy|Romance,2.9
2,4141,freddie prinze jr.,Head Over Heels,Comedy|Romance,2.9
3,208,dark hero,Waterworld,Action|Adventure|Sci-Fi,2.9
4,208,kevin costner,Waterworld,Action|Adventure|Sci-Fi,2.9


In [29]:
# Spot check: first movies tagged 'comedy'
movies_by_tag.loc[movies_by_tag['tag'].eq('comedy')].head()

Unnamed: 0,movieid,tag,title,genres,avg_rating
276,1391,comedy,Mars Attacks!,Action|Comedy|Sci-Fi,3.0
585,3052,comedy,Dogma,Adventure|Comedy|Fantasy,3.6
667,6539,comedy,Pirates of the Caribbean: The Curse of the Bla...,Action|Adventure|Comedy|Fantasy,3.9
1004,8529,comedy,The Terminal,Comedy|Drama|Romance,3.5
1040,8622,comedy,Fahrenheit 9/11,Documentary,3.6


### Store file and Insert to movies_by_tag

In [30]:
# Write through `deromalized_path`, the same folder variable the uploader reads
# from, so the file that is written and the file that is loaded cannot drift apart.
movies_by_tag.to_csv(deromalized_path + 'movies_by_tag.csv', index=False)
upload_data_to_cassandra(keyspace, 'movies_by_tag', 'movies_by_tag.csv', client_id, client_secret)