In [160]:

import json
import pandas as pd
import numpy as np
import re
import psycopg2

from sqlalchemy import create_engine
from config import db_password
db_string = f"postgres://postgres:{db_password}@127.0.0.1:5432/movie_data"
engine = create_engine(db_string)


In [10]:
file_dir = 'C:/Users/RandallE/Class/Movies-ETL/'

In [None]:
#f'{file_dir}/wikipedia.movies'

In [11]:
with open(f'{file_dir}/wikipedia.movies.json', mode='r') as file:
    wiki_movies_raw = json.load(file)

In [12]:
kaggle_metadata = pd.read_csv(f'{file_dir}/movies_metadata.csv',low_memory=False)
ratings = pd.read_csv(f'{file_dir}/ratings.csv')

In [13]:
wiki_movies_df=pd.DataFrame(wiki_movies_raw)

In [14]:
wiki_movies = [movie for movie in wiki_movies_raw
               if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]

In [15]:
wiki_movie_df=pd.DataFrame(wiki_movies)

In [16]:

def clean_movie(movie):
    movie = dict(movie) #create a non-destructive copy
    alt_titles = {}
    # combine alternate titles into one list
    for key in ['Also known as','Arabic','Cantonese','Chinese','French',
                'Hangul','Hebrew','Hepburn','Japanese','Literally',
                'Mandarin','McCune-Reischauer','Original title','Polish',
                'Revised Romanization','Romanized','Russian',
                'Simplified','Traditional','Yiddish']:
        if key in movie:
            alt_titles[key] = movie[key]
            movie.pop(key)
    if len(alt_titles) > 0:
        movie['alt_titles'] = alt_titles

    # merge column names
    def change_column_name(old_name, new_name):
        if old_name in movie:
            movie[new_name] = movie.pop(old_name)
    change_column_name('Adaptation by', 'Writer(s)')
    change_column_name('Country of origin', 'Country')
    change_column_name('Directed by', 'Director')
    change_column_name('Distributed by', 'Distributor')
    change_column_name('Edited by', 'Editor(s)')
    change_column_name('Length', 'Running time')
    change_column_name('Original release', 'Release date')
    change_column_name('Music by', 'Composer(s)')
    change_column_name('Produced by', 'Producer(s)')
    change_column_name('Producer', 'Producer(s)')
    change_column_name('Productioncompanies ', 'Production company(s)')
    change_column_name('Productioncompany ', 'Production company(s)')
    change_column_name('Released', 'Release Date')
    change_column_name('Release Date', 'Release date')
    change_column_name('Screen story by', 'Writer(s)')
    change_column_name('Screenplay by', 'Writer(s)')
    change_column_name('Story by', 'Writer(s)')
    change_column_name('Theme music composer', 'Composer(s)')
    change_column_name('Written by', 'Writer(s)')

    return movie

In [17]:
clean_movies = [clean_movie(movie) for movie in wiki_movies]
wiki_movies_df = pd.DataFrame(clean_movies)

In [19]:
# sorted(wiki_movies_df.columns.tolist())

In [21]:
wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
# print(len(wiki_movies_df))
# Inplace True is like wiki_df=wiki_df.drop_duplicate
wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
# print(len(wiki_movies_df))
# wiki_movies_df.head()

In [23]:
# [[column,wiki_movies_df[column].isnull().sum()] for column in wiki_movies_df.columns]

In [24]:
wiki_columns_to_keep = [column for column in wiki_movies_df.columns if wiki_movies_df[column].isnull().sum() < len(wiki_movies_df) * 0.9]
wiki_movies_df = wiki_movies_df[wiki_columns_to_keep]

In [25]:
box_office = wiki_movies_df['Box office'].dropna()

In [26]:
# Function - test String
def is_not_a_string(x):
    return type(x) != str

In [27]:
#Function test
box_office[box_office.map(is_not_a_string)]

34                           [US$, 4,212,828]
54      [$6,698,361 (, United States, ), [2]]
74                    [$6,488,144, (US), [1]]
126                [US$1,531,489, (domestic)]
130                          [US$, 4,803,039]
                        ...                  
6980               [$99.6, million, [4], [5]]
6994                   [$365.6, million, [1]]
6995                         [$53.8, million]
7015                     [$435, million, [7]]
7048                   [$529.3, million, [4]]
Name: Box office, Length: 135, dtype: object

In [28]:
#Use lambda instead of function
box_office = box_office.apply(lambda x: ' '.join(x) if type(x) == list else x)

In [30]:
# form_one = r'\$\d+\.?\d*\s*[mb]illion'
# box_office.str.contains(form_one, flags=re.IGNORECASE).sum()

In [32]:
# form_two = r'\$\d{1,3}(?:,\d{3})+'
# box_office.str.contains(form_two, flags=re.IGNORECASE).sum()

In [33]:
# matches_form_one = box_office.str.contains(form_one, flags=re.IGNORECASE)
# matches_form_two = box_office.str.contains(form_two, flags=re.IGNORECASE)

In [34]:
# box_office[~matches_form_one & ~matches_form_two]

In [35]:
form_one = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
form_two = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'
box_office = box_office.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)

In [36]:
def parse_dollars(s):
    # if s is not a string, return NaN
    if type(s) != str:
        return np.nan

    # if input is of the form $###.# million
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " million"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a million
        value = float(s) * 10**6

        # return value
        return value

    # if input is of the form $###.# billion
    elif re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):

        # remove dollar sign and " billion"
        s = re.sub('\$|\s|[a-zA-Z]','', s)

        # convert to float and multiply by a billion
        value = float(s) * 10**9

        # return value
        return value

    # if input is of the form $###,###,###
    elif re.match(r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)', s, flags=re.IGNORECASE):

        # remove dollar sign and commas
        s = re.sub('\$|,','', s)

        # convert to float
        value = float(s)

        # return value
        return value

    # otherwise, return NaN
    else:
        return np.nan

In [37]:
wiki_movies_df['box_office'] = box_office.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

In [39]:
#1 Create a budget variable
budget = wiki_movies_df['Budget'].dropna()
# budget

In [42]:
# Step 2 Convert to string
budget = budget.map(lambda x: ' '.join(x) if type(x) == list else x)
# budget

In [45]:
# Step 3 Remove dollar sign and hypen
budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
# budget

In [47]:
# Step 4 Match form one and two 
matches_form_one = budget.str.contains(form_one, flags=re.IGNORECASE)
matches_form_two = budget.str.contains(form_two, flags=re.IGNORECASE)
# Step 5 Show unmatched
# budget[~matches_form_one & ~matches_form_two]

In [49]:
budget = budget.str.replace(r'\[\d+\]\s*', '')
# budget[~matches_form_one & ~matches_form_two]

In [50]:
wiki_movies_df['budget'] = budget.str.extract(f'({form_one}|{form_two})', flags=re.IGNORECASE)[0].apply(parse_dollars)

In [51]:
wiki_movies_df.drop('Budget', axis=1, inplace=True)

In [52]:
#correct release date
release_date = wiki_movies_df['Release date'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [53]:
date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
date_form_two = r'\d{4}.[01]\d.[123]\d'
date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
date_form_four = r'\d{4}'

In [55]:
# release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})', flags=re.IGNORECASE)

In [56]:
wiki_movies_df['release_date'] = pd.to_datetime(release_date.str.extract(f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0], infer_datetime_format=True)

In [57]:
running_time = wiki_movies_df['Running time'].dropna().apply(lambda x: ' '.join(x) if type(x) == list else x)

In [59]:
# running_time.str.contains(r'^\d*\s*minutes$', flags=re.IGNORECASE).sum()

In [61]:
# running_time[running_time.str.contains(r'^\d*\s*minutes$', flags=re.IGNORECASE) != True]

In [63]:
# running_time.str.contains(r'^\d*\s*m', flags=re.IGNORECASE).sum()

In [65]:
# running_time[running_time.str.contains(r'^\d*\s*m', flags=re.IGNORECASE) != True]

In [66]:
running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')

In [67]:
running_time_extract = running_time_extract.apply(lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)

In [68]:
wiki_movies_df['running_time'] = running_time_extract.apply(lambda row: row[0]*60 + row[1] if row[2] == 0 else row[2], axis=1)

In [69]:
wiki_movies_df.drop('Running time', axis=1, inplace=True)

In [71]:
# wiki_movies_df.info()

In [73]:
# kaggle_metadata.dtypes

In [75]:
# kaggle_metadata['adult'].value_counts()

In [77]:
# kaggle_metadata[~kaggle_metadata['adult'].isin(['True','False'])]

In [78]:
kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult',axis='columns')

In [80]:
# kaggle_metadata['video'].value_counts()

In [82]:
# kaggle_metadata['video'] == 'True'

In [83]:
kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'

In [87]:
#convert columns to correct numerice datatype
kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])

In [89]:
# ratings.info(null_counts=True)

In [91]:
# pd.to_datetime(ratings['timestamp'], unit='s')

In [92]:
ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')

In [94]:
# ratings['rating'].describe()

In [96]:
# ratings['rating'].plot(kind='hist')

In [97]:
movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id', suffixes=['_wiki','_kaggle'])

In [99]:
# movies_df.info()

In [101]:
# movies_df[['title_wiki','title_kaggle']]

In [103]:
# #check for unmatched titles\
# movies_df[movies_df['title_wiki'] != movies_df['title_kaggle']][['title_wiki','title_kaggle']]

In [105]:
# movies_df[(movies_df['title_kaggle'] == '') | (movies_df['title_kaggle'].isnull())]

In [107]:
# movies_df.fillna(0).plot(x='running_time', y='runtime', kind='scatter')

In [109]:
# movies_df.fillna(0).plot(x='budget_wiki',y='budget_kaggle', kind='scatter')

In [111]:
# movies_df.fillna(0)[movies_df['box_office'] < 10**9].plot(x='box_office', y='revenue', kind='scatter')

In [113]:
# movies_df[['release_date_wiki','release_date_kaggle']].plot(x='release_date_wiki', y='release_date_kaggle', style='.')

In [115]:
# movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')]

In [117]:
# movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index

In [118]:
movies_df = movies_df.drop(movies_df[(movies_df['release_date_wiki'] > '1996-01-01') & (movies_df['release_date_kaggle'] < '1965-01-01')].index)

In [120]:
# movies_df[movies_df['release_date_wiki'].isnull()]

In [123]:
# movies_df['Language'].apply(lambda x: tuple(x) if type(x) == list else x).value_counts(dropna=False)

In [125]:
# movies_df['original_language'].value_counts(dropna=False)

In [127]:
# movies_df[['Production company(s)','production_companies']]

In [128]:
#drop the title_wiki, release_date_wiki, Language and Production company columns.
movies_df.drop(columns=['title_wiki','release_date_wiki','Language','Production company(s)'], inplace=True)

In [130]:
#create missing data for a column pair and drop the un-needed column.
def fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    df[kaggle_column] = df.apply(lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column], axis=1)
    df.drop(columns=wiki_column, inplace=True)

In [None]:
# use function to add zeros
fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')
# movies_df

In [None]:
# for col in movies_df.columns:
#     lists_to_tuples = lambda x: tuple(x) if type(x) == list else x
#     value_counts = movies_df[col].apply(lists_to_tuples).value_counts(dropna=False)
#     num_values = len(value_counts)
#     if num_values == 1:
#         print(col)

In [135]:
# movies_df['video'].value_counts(dropna=False)

In [136]:
movies_df = movies_df[['imdb_id','id','title_kaggle','original_title','tagline','belongs_to_collection','url','imdb_link',
                       'runtime','budget_kaggle','revenue','release_date_kaggle','popularity','vote_average','vote_count',
                       'genres','original_language','overview','spoken_languages','Country',
                       'production_companies','production_countries','Distributor',
                       'Producer(s)','Director','Starring','Cinematography','Editor(s)','Writer(s)','Composer(s)','Based on'
                      ]]

In [137]:
# set desired column names
movies_df.rename({'id':'kaggle_id',
                  'title_kaggle':'title',
                  'url':'wikipedia_url',
                  'budget_kaggle':'budget',
                  'release_date_kaggle':'release_date',
                  'Country':'country',
                  'Distributor':'distributor',
                  'Producer(s)':'producers',
                  'Director':'director',
                  'Starring':'starring',
                  'Cinematography':'cinematography',
                  'Editor(s)':'editors',
                  'Writer(s)':'writers',
                  'Composer(s)':'composers',
                  'Based on':'based_on'
                 }, axis='columns', inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


In [139]:
# movies_df.head()

In [140]:
rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count()

In [141]:
rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1)

In [142]:
rating_counts = ratings.groupby(['movieId','rating'], as_index=False).count() \
                .rename({'userId':'count'}, axis=1) \
                .pivot(index='movieId',columns='rating', values='count')

In [143]:
rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]

In [144]:
#use a left merge, to keep everything in movies_df:
movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id', right_index=True, how='left')

In [145]:
#not every movie got a rating for each rating level, so put zeros in those columns
movies_with_ratings_df[rating_counts.columns] = movies_with_ratings_df[rating_counts.columns].fillna(0)

In [146]:
#good to this point at 8.4.2

In [161]:
engine = create_engine(db_string)

In [162]:
movies_df.to_sql(name='movies', con=engine)

In [163]:
# create a variable for the number of rows imported
rows_imported = 0
for data in pd.read_csv(f'{file_dir}ratings.csv', chunksize=1000000):

    # print out the range of rows that are being imported
    print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

    data.to_sql(name='ratings', con=engine, if_exists='append')

    # increment the number of rows imported by the size of 'data'
    rows_imported += len(data)

    # print that the rows have finished importing
    print('Done.')

importing rows 0 to 1000000...Done.
importing rows 1000000 to 2000000...Done.
importing rows 2000000 to 3000000...Done.
importing rows 3000000 to 4000000...Done.
importing rows 4000000 to 5000000...Done.
importing rows 5000000 to 6000000...Done.
importing rows 6000000 to 7000000...Done.
importing rows 7000000 to 8000000...Done.
importing rows 8000000 to 9000000...Done.
importing rows 9000000 to 10000000...Done.
importing rows 10000000 to 11000000...Done.
importing rows 11000000 to 12000000...Done.
importing rows 12000000 to 13000000...Done.
importing rows 13000000 to 14000000...Done.
importing rows 14000000 to 15000000...Done.
importing rows 15000000 to 16000000...Done.
importing rows 16000000 to 17000000...Done.
importing rows 17000000 to 18000000...Done.
importing rows 18000000 to 19000000...Done.
importing rows 19000000 to 20000000...Done.
importing rows 20000000 to 21000000...Done.
importing rows 21000000 to 22000000...Done.
importing rows 22000000 to 23000000...Done.
importing row

In [None]:
print('1111')