# End-to-end Movie Success Pipeline

### Building our lightweight pipeline components using Python


#### Lightweight Python components


Lightweight Python components do not require you to build a new container image for every code change. They are intended for fast iteration in a notebook environment.

#### Building a lightweight Python component

To build a component, define a stand-alone Python function and then call kfp.components.func_to_container_op(func) to convert it into a component that can be used in a pipeline.

There are several requirements for the function:

- The function should be stand-alone. It should not use any code declared outside of the function definition. Any imports should be added inside the main function. Any helper functions should also be defined inside the main function.

- The function can only import packages that are available in the base image. If you need a package that is not available, you can look for a container image that already includes it. (As a workaround, you can use the subprocess module to run pip install for the required package.)

- If the function operates on numbers, the parameters need to have type hints. Supported types are [int, float, bool]. Everything else is passed as a string.
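
As a minimal sketch of these requirements, the function below keeps its import and its helper inside the function body and annotates its numeric parameters. The names `budget_share` and `to_component` are hypothetical and are not part of this notebook's pipeline. The conversion helper assumes the kfp 1.x SDK used in this notebook and is only defined here, not called.

```python
def budget_share(budget: float, revenue: float, digits: int = 2) -> float:
    """Return budget as a fraction of revenue (hypothetical example)."""
    # Imports live inside the function so the component is self-contained.
    import math

    # Helper functions are also defined inside the main function.
    def safe_divide(numerator: float, denominator: float) -> float:
        return math.nan if denominator == 0 else numerator / denominator

    return round(safe_divide(budget, revenue), digits)


def to_component(func):
    """Convert func into a pipeline component (assumes the kfp 1.x SDK is installed)."""
    import kfp.components as comp
    return comp.func_to_container_op(func)


# Plain Python call, useful for checking the logic before building a pipeline.
print(budget_share(50.0, 200.0))  # 0.25
```

With the SDK installed, `budget_share_op = to_component(budget_share)` would give a component factory that can be used inside a pipeline function.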

### Building Python function-based components

A Kubeflow Pipelines component is a self-contained set of code that performs one step in your ML workflow. A pipeline component is composed of:

- The component code, which implements the logic needed to perform a step in your ML workflow.

- A component specification, which defines the following:

    - The component's metadata, its name and description.
    - The component's interface, the component's inputs and outputs.
    - The component's implementation, the Docker container image to run, how to pass inputs to your component code, and how to get the component's outputs.
    

Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you.
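
The three parts of a component specification listed above can be pictured as a nested mapping. The sketch below is an illustration written by hand, not the exact output of the SDK: the component name, image, and command values are assumptions, and the small helper `undeclared_placeholders` is hypothetical.

```python
# Hand-written illustration of a component specification; all values are assumed.
component_spec = {
    # Metadata: name and description.
    "name": "Preprocess data",
    "description": "Cleans the raw movie data.",
    # Interface: inputs and outputs.
    "inputs": [{"name": "data_path", "type": "String"}],
    "outputs": [],
    # Implementation: the container image, the command to run,
    # and how each input is passed to the component code.
    "implementation": {
        "container": {
            "image": "python:3.7",
            "command": ["python3", "-u", "-c", "<generated program>"],
            "args": ["--data-path", {"inputValue": "data_path"}],
        }
    },
}


def undeclared_placeholders(spec):
    """Return input names used in args that the interface does not declare."""
    declared = {item["name"] for item in spec["inputs"]}
    args = spec["implementation"]["container"]["args"]
    used = {
        arg["inputValue"]
        for arg in args
        if isinstance(arg, dict) and "inputValue" in arg
    }
    return sorted(used - declared)


print(undeclared_placeholders(component_spec))  # []
```

For a Python function-based component, the SDK derives this information from the function signature, so you normally do not write it by hand.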

### Setup

In [1]:
!python -m pip install --user --upgrade pip



In [2]:
from IPython import get_ipython 
!python -m pip install pandas matplotlib scipy scikit-learn tensorflow keras seaborn --user
# urllib is part of the Python standard library; the pip package is urllib3
!python -m pip install IPython numpy imblearn tensorboard wordcloud jsonlib-python3 urllib3==1.25.9 --user
!python -m pip install spacy --user




In [3]:
# !python -m pip install pipwin
# !python -m pipwin install jsonlib-python3

### Install dependencies and packages for getting/reading data from Google Cloud Storage

In [4]:
!python -m pip install fs-gcsfs
!python -m pip install gcsfs
!python -m pip install fsspec





## Install or update the pipelines SDK


### Run the following command to install the Kubeflow Pipelines SDK.

In [7]:
# You may need to restart your notebook kernel after updating the kfp sdk
!pip3 install --user --upgrade kfp

Collecting kfp
  Downloading kfp-1.1.2.tar.gz (159 kB)
Collecting kfp-server-api<2.0.0,>=1.1.1b1
  Downloading kfp-server-api-1.1.2rc1.tar.gz (54 kB)
Collecting kfp-pipeline-spec<0.2.0,>=0.1.0
  Downloading kfp_pipeline_spec-0.1.3.1-py3-none-any.whl (11 kB)
Building wheels for collected packages: kfp, kfp-server-api
  Building wheel for kfp (setup.py): started
  Building wheel for kfp (setup.py): finished with status 'done'
  Created wheel for kfp: filename=kfp-1.1.2-py3-none-any.whl size=218833 sha256=97f5ebdf481911fc7239cd8c4ef55cb0ef7532d7083977c513440392014e2491
  Stored in directory: c:\users\sillians\appdata\local\pip\cache\wheels\cc\e1\79\371d465a7b85585133d0a10a2aaf83aeaf7ff3c269d3d3cd12
  Building wheel for kfp-server-api (setup.py): started
  Building wheel for kfp-server-api (setup.py): finished with status 'done'
  Created wheel for kfp-server-api: filename=kfp_server_api-1.1.2rc1-py3-none-any.whl size=108061 sha256=1bc17973357a2202d7d744802e7a2d29bfe1a79669466e7aaf223df6eb

You should consider upgrading via the 'C:\Users\Sillians\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.7_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip' command.


Requirement already up-to-date: kfp in c:\users\sillians\appdata\local\packages\pythonsoftwarefoundation.python.3.7_qbz5n2kfra8p0\localcache\local-packages\python37\site-packages (1.1.2)




Restart the kernel before you proceed

In [8]:
# Restart kernel after the pip install
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

## Build the Components

### Import the kfp and kfp.components packages.

In [1]:
import kfp                  # the Pipelines SDK. 
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.components as comp
import os
import subprocess
import json

from kfp.dsl.types import Integer, GCSPath, String
import kfp.notebook

In [10]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gcsfs

In [2]:
# where the outputs are stored
out_dir = "/home/jovyan/g03-movie-success/data/out/"

## Create a release experiment in the Kubeflow pipeline

#### Kubeflow Pipelines requires an experiment before you can start a run. An experiment is a group of comparable runs.

In [3]:
EXPERIMENT_NAME = 'Movie Success Pipeline'        # Name of the experiment in the UI
BASE_IMAGE = "tensorflow/tensorflow:latest-gpu-py3"    # Base image used for components in the pipeline

PROJECT_NAME = "Kubeflow-mlops-pipeline"

### Create an instance of the kfp.Client class

In [4]:
# If you run this command on a Jupyter notebook running on Kubeflow, you can
# exclude the host parameter. Anywhere else, pass the endpoint of your
# Kubeflow Pipelines deployment: client = kfp.Client(host='<your-endpoint>')
client = kfp.Client()
exp = client.create_experiment(name=EXPERIMENT_NAME)

Failed to load kube config.




MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/experiments (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002481F806C50>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
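
The error above is what the client reports when it finds no Kubernetes configuration and tries localhost on port 80. One way around it, sketched here, is to pass the endpoint through the host parameter of kfp.Client. The `KFP_HOST` environment variable and both helper names are assumptions made for this sketch, not kfp conventions.

```python
import os


def client_kwargs(env=None):
    """Build keyword arguments for kfp.Client from a hypothetical KFP_HOST variable."""
    env = os.environ if env is None else env
    host = env.get("KFP_HOST")  # assumed variable name, chosen for this sketch
    # Inside a Kubeflow notebook no host is needed, so return no arguments.
    return {"host": host} if host else {}


def make_client(env=None):
    """Create the client; needs the kfp SDK installed and a reachable endpoint."""
    import kfp
    return kfp.Client(**client_kwargs(env))


print(client_kwargs({"KFP_HOST": "http://localhost:8080"}))
print(client_kwargs({}))
```

Calling `make_client()` outside a cluster then only requires setting the variable to the address of your own deployment.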

### This section is where we start building the different Python function-based components. We will define each component's code as a standalone Python function.

## Preprocessing data for Exploratory Data Analysis Function

In [5]:
# data_movies = "gs://movie-success-bucket/data/tmdb_5000_movies.csv/tmdb_5000_movies.csv"
# data_credits = "gs://movie-success-bucket/data/tmdb_5000_credits.csv/tmdb_5000_credits.csv"

In [6]:
def preprocess_data_analysis(data_path):
    
    # func_to_container_op requires packages to be imported inside of the function.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'matplotlib']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'seaborn']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'jsonlib'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'wordcloud'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'fs-gcsfs'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'gcsfs'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'fsspec'])
    import json
    import gcsfs
    import ast
    from wordcloud import WordCloud
    import warnings
    warnings.filterwarnings('ignore')
    
    import os
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import json
    import pickle
    
    
    #define a function that loads json columns in movies dataset
    def load_tmdb_movies(path):
        df = pd.read_csv(path)
        df['release_date'] = pd.to_datetime(df['release_date']).apply(lambda x: x.date())
        json_columns = ['genres', 'keywords', 'production_countries', 'production_companies', 'spoken_languages']
        for column in json_columns:
            df[column] = df[column].apply(json.loads)
        return df
    
    #define a function that loads json columns in credits dataset
    def load_tmdb_credits(path):
        df = pd.read_csv(path)
        json_columns = ['cast', 'crew']
        for column in json_columns:
            df[column] = df[column].apply(json.loads)
        return df
    
    #load datasets
    movies = load_tmdb_movies("gs://movie-success-bucket/data/tmdb_5000_movies.csv/tmdb_5000_movies.csv")
    credits = load_tmdb_credits("gs://movie-success-bucket/data/tmdb_5000_credits.csv/tmdb_5000_credits.csv")
    
    # statistical info about numerical data
    movies.describe()
    
    # info on variable types and filling factor
    # (pd.concat is used because DataFrame.append was removed in pandas 2.0)
    tab_info = pd.concat([
        pd.DataFrame(movies.dtypes).T.rename(index={0:'column type'}),
        pd.DataFrame(movies.isnull().sum()).T.rename(index={0:'null values'}),
        pd.DataFrame(movies.isnull().sum()/movies.shape[0]*100).T.rename(index={0:'null values (%)'}),
    ])
    tab_info
    
    
    # Dealing with JSON entries
    # Some functions that will come in handy during data wrangling
    def extract_feature(x, field='name'):
        """
        Extract the values of the specified field
        from a list of dicts, as a list.
        """
        return [i[field] for i in x]
    
    def find_index_val(x, idx=0):
        """
        Return the element at position idx, or NaN for an empty list.
        Handy for finding the primary genre, for example.
        """
        return x[idx] if len(x) > 0 else np.nan
    
    def cal_length(x):
        """
        function to get the length of a value
        """
        return len(x)
    
    def find_role(x, role='Director'):
        """
        Return the name of the first crew member whose job
        matches the specified role, or NaN if there is none.
        """
        for i in x:
            if i['job'] == role:
                return i['name']
        return np.nan
    
    def find_animation(x):
        """
        function helps to identify animated and 
        non-animated movies.
        """
        if(len(x)==0):
            return np.nan
        elif('Animation' in x):
            return 1
        else:
            return 0
    
    def get_lengths(x):
        return len(x)
    
    
    def safe_access(container, index_values):
        result = container
        try:
            for idx in index_values:
                result = result[idx]
            return result
        except (IndexError, KeyError):
            return np.nan # return missing value rather than an error upon indexing/key failure
        
    
    #get the genre list for each movie
    movies['list_genres'] = movies['genres'].apply(extract_feature)
    
    
    #Apply the functions to extract some useful features from the dataset
    movies['primary_genre'] = movies['list_genres'].apply(find_index_val)

    movies['list_keywords'] = movies['keywords'].apply(extract_feature)

    movies['list_production_companies'] = movies['production_companies'].apply(extract_feature)

    movies['num_production_companies'] = movies['list_production_companies'].apply(cal_length)

    movies['list_productioin_countries'] = movies['production_countries'].apply(extract_feature)

    movies['num_productioin_countries'] = movies['list_productioin_countries'].apply(cal_length)

    movies['list_spoken_languages'] = movies['spoken_languages'].apply(extract_feature)

    movies['num_spoken_languages'] = movies['list_spoken_languages'].apply(cal_length)

    movies['animated'] = movies['list_genres'].apply(find_animation)
    
    
    # Movie Features
    movie_features = ['id', 'original_title', 'budget', 'revenue', 'original_language', 'status', 
                  'release_date', 'overview', 'tagline', 'list_keywords', 'primary_genre', 'list_genres', 
                  'list_productioin_countries', 'num_production_companies', 
                  'num_productioin_countries', 'list_spoken_languages', 'num_spoken_languages', 'popularity', 
                  'vote_average', 'vote_count', 'runtime', 'animated']
    
    
    # Copy the features for analysis
    movie_final = movies[movie_features].copy()
    movie_final.head()
    
    # Applying the extraction functions to the credits dataset
    credits['movie_director'] = credits['crew'].apply(find_role, role='Director')
    credits['crew_size'] = credits['crew'].apply(get_lengths)
    credits['cast_size'] = credits['cast'].apply(get_lengths)

    
    # get credits features
    credit_features = ['movie_id', 'title', 'movie_director', 'crew_size', 'cast_size']
    
    credit_final = credits[credit_features].copy()
    
    
    credits.apply(lambda row: [x.update({'movie_id': row['movie_id']}) for x in row['cast']], axis=1)
    credits.apply(lambda row: [x.update({'movie_id': row['movie_id']}) for x in row['crew']], axis=1)
    credits.apply(lambda row: [person.update({'order': order}) for order, person in enumerate(row['crew'])], axis=1)

    # get list of cast
    cast_list = []
    credits["cast"].apply(lambda x: cast_list.extend(x))
    cast = pd.DataFrame(cast_list)
    cast["type"] = "cast"
    
    # get list of crew
    crew_list = []
    credits["crew"].apply(lambda x: crew_list.extend(x))
    crew = pd.DataFrame(crew_list)
    crew["type"] = "crew"
    # list of cast and crew
    people = pd.concat([cast, crew], ignore_index=True, sort=True)
    
    # Merge the movie_final and credit_final together
    df = pd.merge(movie_final,credit_final, left_on='id', right_on='movie_id')
    
    
    # Dealing with zero values, missing runtime data
    # Replace all the zero values in the following columns with their respective mean values
    df['revenue'] = df['revenue'].replace(0, np.nan).fillna(df['revenue'].mean())
    df['budget'] = df['budget'].replace(0,np.nan).fillna(df['budget'].mean())
    df['vote_count'] = df['vote_count'].replace(0, np.nan).fillna(df['vote_count'].mean())
    df['vote_average'] = df['vote_average'].replace(0, np.nan).fillna(df['vote_average'].mean())
    #fill missing 'runtime' values with mean.
    df['runtime']=df['runtime'].replace(0, np.nan).fillna(df['runtime'].mean())
    
    
    # Dealing with datetime values
    df["release_date"] = pd.to_datetime(df["release_date"]) #convert release date to datetime
    df["release_year"] =df["release_date"].dt.year #extract release year
    df["release_month"] = df["release_date"].dt.month  #extract release month
    df["release_quarter"] = df["release_date"].dt.quarter #extract release quarter


    # Save Dataframe using the pickle extension
    df.to_pickle(f'{data_path}/preprocessed-data-analysis.pkl')
    print("Preprocessing data for Exploratory data analysis Done")
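
To make the JSON handling in the function above concrete, here is a small sketch that runs the same kind of helpers on one made-up row. It uses only the standard library, so `math.nan` stands in for `np.nan`, and the sample strings are invented for illustration rather than taken from the TMDB files.

```python
import json
import math

# Made-up values shaped like the 'genres' and 'crew' columns.
genres_raw = '[{"id": 28, "name": "Action"}, {"id": 16, "name": "Animation"}]'
crew_raw = '[{"job": "Editor", "name": "A. Person"}, {"job": "Director", "name": "B. Person"}]'


def extract_feature(items, field="name"):
    """Collect one field from every dict in a list."""
    return [item[field] for item in items]


def find_index_val(values, idx=0):
    """Return the element at idx, or NaN when the list is empty."""
    return values[idx] if len(values) > 0 else math.nan


def find_role(people, role="Director"):
    """Return the name of the first person holding the given job, or NaN."""
    for person in people:
        if person["job"] == role:
            return person["name"]
    return math.nan


genres = extract_feature(json.loads(genres_raw))
print(genres)                            # ['Action', 'Animation']
print(find_index_val(genres))            # Action (the primary genre)
print(find_role(json.loads(crew_raw)))   # B. Person
print(int("Animation" in genres))        # 1 (the animated flag)
```

In the component itself the same helpers are applied column-wise with `DataFrame.apply`.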

## Exploratory Data Analysis Function

In [7]:
def exploratory_data_analysis(data_path):
    
    # func_to_container_op requires packages to be imported inside of the function.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'matplotlib']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'seaborn']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'jsonlib'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'wordcloud'])
    # urllib is part of the standard library, so it needs no pip install
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'fs-gcsfs'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'gcsfs'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'fsspec'])
    
    import json
    import ast
    from wordcloud import WordCloud
    import warnings
    import gcsfs
    warnings.filterwarnings('ignore')
    
    import os
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import json
    import pickle
    import urllib
    
    
    # Read the preprocessed pickle data file
    df = pd.read_pickle(f'{data_path}/preprocessed-data-analysis.pkl')
    
    df.head()
    
    # DATA ANALYSIS AND VISUALIZATION
    # General Distribution of Numerical Values
    
    #Initiate KDE subplots for numerical columns
    fig, axarr = plt.subplots(4, 2, figsize=(15, 20))
    sns.kdeplot(df["budget"], ax=axarr[0][0])
    axarr[0][0].xaxis.set_ticks(np.arange(0, 4.25e8, 0.50e8))

    sns.kdeplot(df["revenue"], ax=axarr[0][1])
    axarr[0][1].xaxis.set_ticks(np.arange(0, 3e9, 0.50e9))
    sns.kdeplot(df["runtime"], ax=axarr[1][0])

    sns.kdeplot(df["popularity"], ax=axarr[1][1])
    axarr[1][1].xaxis.set_ticks(np.arange(0, 900, 50))

    sns.kdeplot(df["vote_average"], ax=axarr[2][0])
    axarr[2][0].xaxis.set_ticks(np.arange(0, 11, 1))

    sns.kdeplot(df["vote_count"], ax=axarr[2][1])
    sns.kdeplot(df["release_year"], ax=axarr[3][0])

    axarr[3][1].axis("off")
    
    #set subplot titles
    axarr[0][0].set_title('Budget Distribution', fontsize = 24)
    axarr[0][1].set_title('Revenue Distribution', fontsize = 24)
    axarr[1][0].set_title('Runtime Distribution', fontsize = 24)
    axarr[1][1].set_title('Popularity Distribution', fontsize = 24)
    axarr[2][0].set_title('Average vote Distribution', fontsize = 24)
    axarr[2][1].set_title('Vote count Distribution', fontsize = 24)
    axarr[3][0].set_title('Release Year Distribution', fontsize = 24)

    axarr[0][0].set_xlabel('Budget ($)', fontsize = 12)
    axarr[0][1].set_xlabel('Revenue ($)', fontsize = 12)
    axarr[1][0].set_xlabel('Runtime (min)', fontsize = 12)

    fig.tight_layout()
    
    
    # Initiate scatter plots with correlation lines
    fig, axarr = plt.subplots(3, 2, figsize=(15, 20))
    p_color = dict(color="C0")
    l_color = dict(color="C1")
    sns.regplot(x="budget", y="revenue", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[0][0])
    sns.regplot(x="runtime", y="revenue", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[0][1])
    sns.regplot(x="release_year", y="revenue", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[1][0])
    sns.regplot(x="popularity", y="revenue", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[1][1])
    sns.regplot(x="vote_average", y="revenue", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[2][0])
    sns.regplot(x="vote_count", y="revenue", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[2][1])

    axarr[0][0].set_xlabel('Budget ($)', fontsize = 12)
    axarr[0][0].set_ylabel('Revenue ($)', fontsize = 12)
    axarr[0][1].set_xlabel('Runtime (min)', fontsize = 12)
    axarr[1][0].set_ylabel('Revenue ($)', fontsize = 12)
    axarr[2][0].set_ylabel('Revenue ($)', fontsize = 12)
    axarr[2][1].set_ylabel('Revenue ($)', fontsize = 12)

    #Set plot titles
    axarr[0][0].set_title('Revenue vs Budget', fontsize = 25)
    axarr[0][1].set_title('Revenue vs Runtime', fontsize = 25)
    axarr[1][0].set_title('Revenue vs Release Year', fontsize = 25)
    axarr[1][1].set_title('Revenue vs Popularity', fontsize = 25)
    axarr[2][0].set_title('Revenue vs Average Vote', fontsize = 25)
    axarr[2][1].set_title('Revenue vs Vote Count', fontsize = 25)

    fig.tight_layout()
    
    
    #computes profit or loss from budget and revenue
    df['profit'] = df['revenue'] - df['budget'] 
    df['profit_rate'] = df['profit'] / df['budget']
    
    fig, axarr = plt.subplots(3, 2, figsize=(15, 15))
    p_color = dict(color="C0")
    l_color = dict(color="C1")
    sns.regplot(x="profit", y="budget", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[0][0])
    sns.regplot(x="runtime", y="budget", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[0][1])
    sns.regplot(x="release_year", y="budget", data = df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[1][0])
    sns.regplot(x="popularity", y="budget", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[1][1])
    sns.regplot(x="vote_average", y="budget", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[2][0])
    sns.regplot(x="vote_count", y="budget", data=df, fit_reg=True, scatter_kws=p_color, line_kws=l_color, ax=axarr[2][1])

    axarr[0][0].set_ylabel('Budget ($)', fontsize = 12)
    axarr[0][0].set_xlabel('Profit ($)', fontsize = 12)
    axarr[0][1].set_xlabel('Runtime (min)', fontsize = 12)
    axarr[1][0].set_ylabel('Budget ($)', fontsize = 12)
    axarr[2][0].set_ylabel('Budget ($)', fontsize = 12)

    #Set title for each plot
    axarr[0][0].set_title('Budget vs Profit', fontsize = 20)
    axarr[0][1].set_title('Budget vs Runtime', fontsize = 20)
    axarr[1][0].set_title('Budget vs Release Year', fontsize = 20)
    axarr[1][1].set_title('Budget vs Popularity', fontsize = 20)
    axarr[2][0].set_title('Budget vs Average Vote', fontsize = 20)
    axarr[2][1].set_title('Budget vs Vote Count', fontsize = 20)
    fig.tight_layout()
    
    
    # Exploring Budget and Revenue with Respect to Datetime
    #computes total revenue, profit, budget per release year
    revenues = df.groupby('release_year')['revenue'].sum() 
    budgets = df.groupby('release_year')['budget'].sum()
    profits = df.groupby('release_year')['profit'].sum()
    rate = df.groupby('release_year')['profit_rate'].mean()
    
    
    plt.figure(figsize=(15,8))
    # each series is indexed by release year, so it supplies its own x values
    plt.plot(revenues, label = "Revenues")
    plt.plot(budgets, label = "Budgets")
    plt.plot(profits, label = "Profits")
    plt.plot(rate,label='profit rate')
    
    
    plt.xlabel('Year')
    # Set the y axis label of the current axis.
    plt.ylabel('Amount ($)')
    # Set a title of the current axes.
    plt.title('Trends in Revenue, Budget and Profit over the Years', fontsize =25)
    # show a legend on the plot
    plt.legend(loc='upper left', fontsize = 15)
    # Display a figure.
    plt.show()
    
    
    df['release_year'].max()
    
    last_10_years = df[df['release_year']>2007]
    
    
    revenues = last_10_years.groupby('release_year')['revenue'].sum()
    budgets = last_10_years.groupby('release_year')['budget'].sum()
    profits = last_10_years.groupby('release_year')['profit'].sum()
    rate = last_10_years.groupby('release_year')['profit_rate'].mean()
    
    
    plt.figure(figsize=(14,7))
    # each series is indexed by release year, so it supplies its own x values
    plt.plot(revenues, label = "Revenues")
    plt.plot(budgets, label = "Budgets")
    plt.plot(profits, label = "Profits")
    plt.plot(rate,label='profit rate')

    plt.xlabel('Year')
    # Set the y axis label of the current axis.
    plt.ylabel('Amount ($)')
    # Set a title of the current axes.
    plt.title('Trends in Revenue, Budget and Profit over the last 10 Years', fontsize =25)
    # show a legend on the plot
    plt.legend(loc='upper left', fontsize = 14)
    # Display a figure.
    plt.show()

    
    
    # How many movies have grossed over $1 billion in each year?
    billi_movies = df[df['revenue']>1000000000].sort_values('revenue', ascending =False)
    
    
    fig =sns.catplot(x='release_year', data=billi_movies, kind="count", palette = 'prism')
    fig.set_xticklabels(rotation=45)
    plt.title('Number of Movies that Grossed more than $1 Billion', fontsize = 20)
    plt.gcf().set_size_inches(10, 4)
    
    
    #compute total and avg revenue per month, quarter
    monthly_revenue = df.groupby('release_month')['revenue'].sum().to_frame().reset_index()
    avg_monthly_revenue =  df.groupby('release_month')['revenue'].mean().to_frame().reset_index()
    quarterly_revenue = df.groupby('release_quarter')['revenue'].sum().to_frame().reset_index()
    avg_quarterly_revenue =  df.groupby('release_quarter')['revenue'].mean().to_frame().reset_index()
    
    
    
    #Initiate subplots for the distribution of revenue by release month and quarter
    fig, axarr = plt.subplots(3,2, figsize=(15, 15))
    sns.barplot(x='release_month', y = 'revenue', data = monthly_revenue, ax = axarr[1][0], palette = 'prism')
    # pass the column as x explicitly; newer seaborn versions treat the first positional argument as data
    sns.countplot(x=df["release_month"], ax=axarr[0][0], palette='prism')
    sns.barplot(x='release_month', y = 'revenue', data = avg_monthly_revenue, ax = axarr[2][0], palette = 'prism')
    sns.barplot(x='release_quarter', y='revenue', data =avg_quarterly_revenue,ax =axarr[2][1],palette ='magma')
    sns.countplot(x=df['release_quarter'], ax=axarr[0][1],palette ='magma')
    sns.barplot(x='release_quarter', y = 'revenue', data = quarterly_revenue, ax = axarr[1][1],palette ='magma')

    #set plot titles

    axarr[2][0].set_title('Average Monthly Revenue', fontsize = 20)
    axarr[0][0].set_title('Number of Movies Released Per Month', fontsize = 20)
    axarr[1][0].set_title('Total Monthly Revenue', fontsize = 20)
    axarr[0][1].set_title('Number of Movies Released Per Quarter',fontsize =20)
    axarr[1][1].set_title('Total Quarterly Revenue',fontsize = 20)
    axarr[2][1].set_title('Average Quarterly Revenue',fontsize = 20)

    axarr[1][0].set_ylabel('Revenue ($)', fontsize = 12)
    axarr[2][0].set_ylabel('Revenue ($)', fontsize = 12)

    #add customized labels to plots
    labels = ['Jan', 'Feb', 'Mar','April', 'May', 'Jun','Jul','Aug','Sep','Oct','Nov','Dec']
    x =[axarr[0][0], axarr[1][0],axarr[2][0]]
    for a in x:
        a.set_xticklabels(labels,fontsize = 14)

    fig.tight_layout()
    
    
    
    #Initiate box plots of revenue by release month and release quarter
    fig, axarr = plt.subplots(2,1, figsize=(15, 15))
    sns.boxplot(x = 'release_month', y = 'revenue', data = df, ax = axarr[0], palette ='prism')
    sns.boxplot(x = 'release_quarter', y = 'revenue', data = df, ax = axarr[1],palette ='rainbow')

    axarr[0].set_title('Revenue vs Release Month', fontsize = 25)
    axarr[1].set_title('Revenue vs Release Quarter',fontsize = 25)
    axarr[0].set_ylabel('Revenue ($)', fontsize = 12)
    axarr[1].set_ylabel('Revenue ($)', fontsize = 12)

    axarr[0].set_xticklabels(labels,fontsize = 14)
    
    
    
    # Exploring Movies
    # Which movies have the highest ratings?
    # C is the mean vote across all movies, m the minimum vote count (90th percentile)
    C= df['vote_average'].mean()
    m= df['vote_count'].quantile(0.9)
    
    
    q_movies = df.copy().loc[df['vote_count'] >= m]
    q_movies.shape

    def weighted_rating(x, m=m, C=C):
        v = x['vote_count']
        R = x['vote_average']
        # Calculation based on the IMDB formula
        return (v/(v+m) * R) + (m/(m+v) * C)
    
    
    # Define a new feature 'score' and calculate its value with `weighted_rating()`
    q_movies['score'] = q_movies.apply(weighted_rating, axis=1)
    
    
    #Sort movies based on score calculated above
    q_movies = q_movies.sort_values('score', ascending=False)

    #Top 15 movies according to the weighted score computed above
    q_movies[['original_title','movie_director', 'vote_count', 'vote_average', 'score','release_year']].head(15)

    
    # Most popular and least popular movies
    pop= df.sort_values('popularity', ascending=False)
    pop_10 = pop.head(10)
    
    
    plt.figure(figsize=(12,10))

    sns.barplot(y = 'original_title',x = 'popularity',data= pop_10,palette = 'prism', hue='movie_director', dodge = False)
    #plt.gca().invert_yaxis()
    plt.xlabel("Popularity", fontsize = 16)
    plt.ylabel("") 
    plt.title("Most Popular Movies", fontsize = 25)
    plt.legend(title ='Movie Director',fontsize = 15).get_title().set_fontsize(18)
    plt.yticks(fontsize= 15)
    
    
    least_10 = pop.tail(10)
    
    plt.figure(figsize=(12,8))

    least_10.index = least_10.original_title.values
    chart = least_10['popularity'].sort_values(ascending =True).plot.barh(color=sns.color_palette("prism",10), fontsize = 15)
    chart.set_xticklabels(chart.get_xticklabels(), rotation=45, horizontalalignment='right')
    chart.set_xlabel('Popularity')
    chart.set_ylabel('Movie')
    chart.set_title('Least Popular Movies', fontsize = 25)

    
    # Big Budget Movies
    big_budget = df.sort_values('budget', ascending = False).iloc[:10]
    
    plt.figure(figsize=(12,8))
    sns.barplot(x = 'budget',
                y = 'title',
                palette='magma',
                data = big_budget,
               hue= 'movie_director',
               dodge = False)
    plt.title('Big Budget Movies', fontsize = 20)
    plt.legend(fontsize = 15)
    plt.xticks(fontsize = 15)
    plt.yticks(fontsize = 15)
    plt.legend(title ='Movie Director',fontsize = 14).get_title().set_fontsize(18)

    plt.show()
    
    # How many movies made profit or loss?
    
    
    print("Exploratory Data Analysis Done")
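
The weighted rating used above blends a movie's own average vote with the overall mean, so that titles with few votes are pulled toward the mean. The sketch below restates the same formula as a plain function with explicit arguments. The numbers are invented to show the effect and do not come from the dataset.

```python
def weighted_rating(v, R, m, C):
    """Weighted score from the formula used in the notebook.

    v: number of votes for the movie
    R: average vote of the movie
    m: minimum number of votes required to be listed
    C: mean vote across all movies
    """
    return (v / (v + m)) * R + (m / (m + v)) * C


# With exactly m votes, the score sits halfway between R and C.
print(weighted_rating(v=1000, R=8.0, m=1000, C=6.0))  # 7.0

# With many more votes than m, the score moves toward the movie's own average R.
print(weighted_rating(v=9000, R=8.0, m=1000, C=6.0))
```

The two weights always sum to 1, so the score stays between `R` and `C`.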

### Get Data

In [11]:
bucket = "movie-success-bucket"

In [12]:
data = pd.read_csv("gs://{}/data/merged_movies_dataset.csv".format(bucket))

data.head()

Unnamed: 0.1,Unnamed: 0,id,title,cast,crew,budget,genres,homepage,keywords,original_language,...,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,vote_average,vote_count
0,0,19995,Avatar,"[{""cast_id"": 242, ""character"": ""Jake Sully"", ""...","[{""credit_id"": ""52fe48009251416c750aca23"", ""de...",237000000,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...",http://www.avatarmovie.com/,"[{""id"": 1463, ""name"": ""culture clash""}, {""id"":...",en,...,"[{""name"": ""Ingenious Film Partners"", ""id"": 289...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...",2009-12-10,2787965087,162.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}, {""iso...",Released,Enter the World of Pandora.,7.2,11800
1,1,285,Pirates of the Caribbean: At World's End,"[{""cast_id"": 4, ""character"": ""Captain Jack Spa...","[{""credit_id"": ""52fe4232c3a36847f800b579"", ""de...",300000000,"[{""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""...",http://disney.go.com/disneypictures/pirates/,"[{""id"": 270, ""name"": ""ocean""}, {""id"": 726, ""na...",en,...,"[{""name"": ""Walt Disney Pictures"", ""id"": 2}, {""...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...",2007-05-19,961000000,169.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}]",Released,"At the end of the world, the adventure begins.",6.9,4500
2,2,206647,Spectre,"[{""cast_id"": 1, ""character"": ""James Bond"", ""cr...","[{""credit_id"": ""54805967c3a36829b5002c41"", ""de...",245000000,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...",http://www.sonypictures.com/movies/spectre/,"[{""id"": 470, ""name"": ""spy""}, {""id"": 818, ""name...",en,...,"[{""name"": ""Columbia Pictures"", ""id"": 5}, {""nam...","[{""iso_3166_1"": ""GB"", ""name"": ""United Kingdom""...",2015-10-26,880674609,148.0,"[{""iso_639_1"": ""fr"", ""name"": ""Fran\u00e7ais""},...",Released,A Plan No One Escapes,6.3,4466
3,3,49026,The Dark Knight Rises,"[{""cast_id"": 2, ""character"": ""Bruce Wayne / Ba...","[{""credit_id"": ""52fe4781c3a36847f81398c3"", ""de...",250000000,"[{""id"": 28, ""name"": ""Action""}, {""id"": 80, ""nam...",http://www.thedarkknightrises.com/,"[{""id"": 849, ""name"": ""dc comics""}, {""id"": 853,...",en,...,"[{""name"": ""Legendary Pictures"", ""id"": 923}, {""...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...",2012-07-16,1084939099,165.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}]",Released,The Legend Ends,7.6,9106
4,4,49529,John Carter,"[{""cast_id"": 5, ""character"": ""John Carter"", ""c...","[{""credit_id"": ""52fe479ac3a36847f813eaa3"", ""de...",260000000,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...",http://movies.disney.com/john-carter,"[{""id"": 818, ""name"": ""based on novel""}, {""id"":...",en,...,"[{""name"": ""Walt Disney Pictures"", ""id"": 2}]","[{""iso_3166_1"": ""US"", ""name"": ""United States o...",2012-03-07,284139100,132.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}]",Released,"Lost in our world, found in another.",6.1,2124


## Building Python function-based components

### Define your component's code as a standalone python function.
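
As a minimal sketch of what "standalone" means in practice, the hypothetical function below (summarize_budget is not part of this notebook) keeps its import and its helper inside the function body, annotates its numeric parameters, and returns a NamedTuple. The budget and revenue values are taken from the Avatar row shown above.

```python
from typing import NamedTuple


def summarize_budget(budget: float, revenue: float) -> NamedTuple(
        'BudgetSummary', [('profit', float), ('profitable', bool)]):
    # Imports live inside the body so the function carries everything it needs.
    from collections import namedtuple

    # Helper functions are nested for the same reason.
    def compute_profit(b, r):
        return r - b

    profit = compute_profit(budget, revenue)
    output = namedtuple('BudgetSummary', ['profit', 'profitable'])
    return output(profit, profit > 0)


# Budget and revenue of the first row of the dataset preview.
summary = summarize_budget(237000000.0, 2787965087.0)
print(summary.profit, summary.profitable)
```

Because nothing outside the function body is referenced at call time, the same source can be copied into a container and executed there unchanged.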

### Preprocessing Function

In [14]:
def preprocess_data_modeling(data_path):
    
    # func_to_container_op requires packages to be imported inside of the function.
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pip'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'jsonlib'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'fs-gcsfs'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'gcsfs'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'fsspec'])
    import ast
    import json
    import pandas as pd
    import gcsfs
    import numpy as np
    from ast import literal_eval
    from pandas import Series, DataFrame,read_csv
    import pickle
    
    # get bucket name
    bucket = "movie-success-bucket"
    
    # read data
    data = pd.read_csv("gs://{}/data/merged_movies_dataset.csv".format(bucket)) # get the merged dataset
    
    # remove columns that are not required
    data = data.drop('original_title', axis=1)
    
    # print the first 5 rows
    print(data.head())
    
    # Handling the Json Columns
    # Applying the literal_eval function of ast on all the json columns
    json_cols = ['cast','crew','genres','keywords','production_companies','production_countries','spoken_languages']
    for col in json_cols:
        data[col] = data[col].apply(literal_eval)
        
    # Helper functions for the JSON columns
    # function to get the names of the movie's genres
    def get_genre(x):
        if isinstance(x, list):
            return [i['name'] for i in x]
        return []

    # function to get the jobs of the crew members
    def get_jobs(x):
        if isinstance(x, list):
            return [i['job'] for i in x]
        return []

    # function to get the target/label (Animation == 1 / Not_Animation == 0)
    def get_labels(x):
        if(len(x)==0):
            return np.nan
        elif('Animation' in x):
            return 1
        else:
            return 0
    
    # Get the fraction of voice artists among the total cast
    def get_characternames(x):
        if isinstance(x, list) and len(x) != 0:
            voice_roles = sum('(voice)' in i['character'] for i in x)
            return voice_roles / len(x)
        return 0
            
    # function to flag crew lists that include a Costume Design job
    def get_costume_labels(x):
        if 'Costume Design' in x:
            return 1
        else:
            return 0
        
    # function to flag the Lighting department (0 if present in the crew, 1 otherwise)
    def get_genre_cd(x):
        dept = [i['department'] for i in x] if isinstance(x, list) else []
        if 'Lighting' in dept:
            return 0
        else:
            return 1
        
    # Applying the above functions 
    data['genres'] = data['genres'].apply(get_genre)
    data['crew_jobs'] = data['crew'].apply(get_jobs)
    data['percent_of_voice_artists'] = data['cast'].apply(get_characternames)
    data['labels'] = data['genres'].apply(get_labels)
    
    # Rounding off the fraction to 3 decimal places
    data['percent_of_voice_artists'] = data['percent_of_voice_artists'].round(3)
        
    # number of Labels missing / Null values  
    data.labels.isna().sum()
    
    
    # dealing with Labels missing values
    idxsc = data[((data.labels != 1) & (data.labels != 0))].index
    data.drop(idxsc, inplace = True)
    data.reset_index(drop= True, inplace= True)
    
    # checking for dataset Features with missing values
    data.isna().sum()
    
    # check the number of animated and non_animated movies
    AnimatedMoviesCount = np.sum(data['labels'] == 1)
    NotAnimatedMoviesCount = np.sum(data['labels'] == 0)

#     print("Number of Animated Movies are: ", AnimatedMoviesCount)
#     print("Number of Not Animated Movies are: ", NotAnimatedMoviesCount)

    # Apply the get_costume_labels function
    data['costume'] = data['crew_jobs'].apply(get_costume_labels)
    
    data.costume.value_counts()
    
    # Apply get_genre_cd function
    data['lighting_dept'] = data['crew'].apply(get_genre_cd)

    data.lighting_dept.value_counts()
    
    # Keep only the movies with more than 7 crew members to improve the quality
    # of the training data. Several thresholds were tested; 7 gave the best result.
    idx=[]
    for x in range(0,data.shape[0]):
        if len(data.crew_jobs[x])>7:
            idx.append(x)
    print("Number of Movies with more than 7 crew members: ",str(len(idx)))

    # Copy the selection so the column assignments below do not act on a view of data
    df = data.iloc[idx,:].copy()
    
    
    # Get the number of animated and non_animated movies
    AnimatedMoviesCount2 = np.sum(df['labels'] == 1)
    NotAnimatedMoviesCount2 = np.sum(df['labels'] == 0)
    
    print("Number of Animated Movies are: ", AnimatedMoviesCount2)
    print("Number of Not Animated Movies are: ", NotAnimatedMoviesCount2)
    
    
    # Converting 'crew_jobs' from list to string (in lower form) via join function
    def join_strings(x):
        return ", ".join(x)

    def str_lower(x):
        return x.lower()

    df['crew_jobs'] = df['crew_jobs'].apply(join_strings)
    df['crew_jobs'] = df['crew_jobs'].apply(str_lower)
    
    # get the number of labels
    df['labels'].value_counts() 
    
    # Save Dataframe using the pickle extension
    df.to_pickle(f'{data_path}/preprocessed-data-model.pkl')
    print("Preprocessing Done")
    
    #Save preprocessed data
    df.to_csv("data/preprocessed", index=False)
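
To make the JSON-column handling above easier to follow, here is a small dependency-free sketch of the same idea. The two input strings are made-up, shortened values in the shape of the genres and cast columns, and get_names is a hypothetical helper that generalizes get_genre and get_jobs.

```python
from ast import literal_eval

# Made-up, shortened value in the same shape as the 'genres' column.
raw_genres = '[{"id": 28, "name": "Action"}, {"id": 16, "name": "Animation"}]'
# Made-up 'cast' value; '(voice)' marks voice roles, as in get_characternames.
raw_cast = '[{"character": "Woody (voice)"}, {"character": "Narrator"}]'


def get_names(items, key):
    """Return the value stored under `key` for every dict in a list, or [] for non-lists."""
    if isinstance(items, list):
        return [item[key] for item in items]
    return []


# literal_eval turns the stored string into a real list of dicts.
genres = get_names(literal_eval(raw_genres), 'name')
characters = get_names(literal_eval(raw_cast), 'character')

# Same labeling rule as get_labels: 1 if 'Animation' is among the genres.
label = 1 if 'Animation' in genres else 0
# Share of cast entries whose character name contains '(voice)'.
voice_share = sum('(voice)' in c for c in characters) / len(characters)

print(genres, label, voice_share)
```

Returning an empty list for anything that is not a list keeps the later steps simple, since get_labels already treats an empty genre list as a missing label.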

In [15]:
# data_path = "gs://{}/data/merged_movies_dataset.csv".format(bucket)

# preprocess_data_modeling(data_path)

#### Save preprocessed data to google cloud bucket

In [16]:
# !gsutil cp data/preprocessed gs://${bucket}/data/preprocessed

In [17]:
# where preprocessed data is stored
# in_dir = "gs://{}/data/preprocessed".format(bucket)

## Model Training Function

In [18]:
from typing import NamedTuple
def model_training(data_path, model_file):
    
    # func_to_container_op requires packages to be imported inside of the function.
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pip'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'numpy'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'imblearn']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'jsonlib']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'tensorflow'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'tensorboard'])  
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'IPython'])
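    subprocess.run([sys.executable, '-m', 'pip', 'install', 'spacy'])
    # spacy.load('en_core_web_sm') below needs the model package to be downloaded first
    subprocess.run([sys.executable, '-m', 'spacy', 'download', 'en_core_web_sm'])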
    import pandas as pd
    import numpy as np
    import pickle
    import imblearn
    import spacy
    from spacy.lang.en import STOP_WORDS
    from sklearn import metrics
    from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, classification_report, confusion_matrix
    from sklearn.model_selection import train_test_split
    from sklearn.feature_extraction.text import TfidfVectorizer,CountVectorizer,TfidfTransformer
    from sklearn.svm import SVC
    from sklearn.pipeline import Pipeline
    
    
    # get data
#     df = pd.read_csv("gs://movie-success-bucket/data/preprocessed")
    
    #load the transformed data
    df = pd.read_pickle(f'{data_path}/preprocessed-data-model.pkl')
    df.head()
    
    # Get the features and labels
    X = df['crew_jobs']
    y = df['labels']
    
    # split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=53)
    
    # function to output our scores 
    def score_output(y_test, y_pred):
        print(metrics.confusion_matrix(y_test, y_pred))
        print(metrics.classification_report(y_test, y_pred))
        accuracy = accuracy_score(y_test, y_pred)
        print('The Accuracy on The Test Set is: %s' % accuracy)
        
    # model
    nlp = spacy.load('en_core_web_sm')
    
    # instantiate stopwords to use
    stop_words_str = " ".join(STOP_WORDS)
    stop_words_lemma = set(word.lemma_ for word in nlp(stop_words_str))

    additional_words = ['editor', 'director', 'producer', 'writer', 'assistant', 'sound']

    for word in additional_words:
        stop_words_lemma = stop_words_lemma.union({word})
        
    # define the lemmatizer function
    def lemmatizer(text):
        return [word.lemma_ for word in nlp(text)]
    
    # Without Stop Words
    bow = TfidfVectorizer(ngram_range = (1,1))

    model = Pipeline([('bag_of_words', bow),('classifier', SVC())])
    model.fit(X_train,y_train)
    
    print("Without Stop Words")
    print('Training accuracy: {}'.format(model.score(X_train,y_train)))
    y_pred = model.predict(X_test)
    score_output(y_test, y_pred)
    
    # save the train/test split to the shared path
    np.savez_compressed(f'{data_path}/train-test-data.npz', 
                       X_train=X_train,
                       X_test=X_test,
                       y_train=y_train,
                       y_test=y_test)
    
    #Save the model as a pickle file.
    with open(f'{data_path}/{model_file}', 'wb') as file:
        pickle.dump(model, file)
        
    # Save the classifier model to the designated 
#     with open(f'{data_path}/{classifier_file}', 'wb') as file:
#         pickle.dump(classifier, file)

In [19]:
# estimator = model_training(out_dir, "model")

#### Export saved model to google cloud storage bucket.

In [20]:
# !gsutil cp {out_dir}/model gs://${bucket}/{out_dir}/model

## Model Validation Function

In [21]:
from typing import NamedTuple
def model_validation(data_path, model_file) -> NamedTuple(
    'ModelvalidationOutputs',
    [
      ('accuracy', float),
      ('recall', float),
      ('precision', float),
      ('f1score', float),
      ('mlpipeline_metrics', 'Metrics')
    ]):
    
    # func_to_container_op requires packages to be imported inside of the function.
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pip'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'numpy'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'jsonlib']) 
    import pandas as pd
    import numpy as np
    import json
    import pickle
    from sklearn.metrics import classification_report, recall_score, accuracy_score,precision_score, f1_score, confusion_matrix
    
    
    # Load the trained model
    with open(f'{data_path}/{model_file}','rb') as file:
        model = pickle.load(file)
        
    # Load and unpack the train/test split saved by the training step.
    # The text features are stored as object arrays, so allow_pickle is required.
    train_test_data = np.load(f'{data_path}/train-test-data.npz', allow_pickle=True)
    X_train = train_test_data['X_train']
    X_test  = train_test_data['X_test']
    y_train = train_test_data['y_train']
    y_test  = train_test_data['y_test']
    
        
    # Training accuracy and test-set predictions
    train_accuracy = model.score(X_train,y_train)
    y_pred = model.predict(X_test)
    
    
    # Model Evaluation
    recall = recall_score(y_test,y_pred)
    accuracy = accuracy_score(y_test,y_pred)
    precision = precision_score(y_test,y_pred)
    f1score = f1_score(y_test,y_pred)
    report = classification_report(y_test, y_pred)
        
    # Export metrics
    metrics = {
      'metrics': [{
        'name': 'accuracy-score', # The name of the metric. Visualized as the column name in the runs table.
        'numberValue':  accuracy, # The value of the metric. Must be a numeric value.
        'format': "PERCENTAGE",   # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
      },{
        'name': 'recall-score',
        'numberValue': recall,
        'format': "PERCENTAGE",
      },{
        'name': 'precision-score',
        'numberValue': precision,
        'format': "PERCENTAGE",
      },{
        'name': 'f1score',
        'numberValue': f1score,
        'format': "PERCENTAGE",
      }]}
    
    
    # The Report file
    with open(f'{data_path}/result.txt', 'w') as result:
        result.write("Report: {} ".format(report))
        
    # Save the test features, labels and predictions to the shared path
    np.savez_compressed(f'{data_path}/validated-data.npz', 
                       x_test=X_test,
                       y_test=y_test,
                       y_pred=y_pred)

    # Save y_pred and y_test as pickle files
    with open(f'{data_path}/y_pred.pkl','wb') as file:
        pickle.dump(y_pred, file)
    with open(f'{data_path}/y_test.pkl','wb') as file:
        pickle.dump(y_test, file)
    
    # Save the classifier model to the designated path
    with open(f'{data_path}/{model_file}', 'wb') as file:
        pickle.dump(model, file)
        
    
        
    with open(f'{data_path}/classifier_result.txt', 'w') as result:
        result.write(" Prediction: {},\n\nActual: {} ".format(y_pred, y_test))
        
    from collections import namedtuple
    model_eval_output = namedtuple(
        'ModelvalidationOutputs',
        ['accuracy', 'recall', 'precision', 'f1score',  'mlpipeline_metrics']) 
    return model_eval_output(accuracy, recall, precision, f1score,  json.dumps(metrics))

In [None]:
# model_validation(out_dir, "model")
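
The metrics output above is a JSON string with one entry per metric. The sketch below shows only that structure, using placeholder scores invented for illustration; build_metrics_payload and the Outputs tuple are hypothetical names, not part of the notebook. It also keeps the namedtuple fields in the same order as they would be declared in the return annotation, which avoids any ambiguity about which value ends up in which output.

```python
import json
from collections import namedtuple


def build_metrics_payload(scores):
    """Turn a {name: value} mapping into the JSON string used for mlpipeline_metrics."""
    return json.dumps({
        'metrics': [
            {'name': name, 'numberValue': float(value), 'format': 'PERCENTAGE'}
            for name, value in scores.items()
        ]
    })


# Placeholder scores, invented for illustration only.
scores = {'accuracy-score': 0.9, 'recall-score': 0.5}

# Field order mirrors the order the outputs would be declared in.
Outputs = namedtuple('Outputs', ['accuracy', 'recall', 'mlpipeline_metrics'])
result = Outputs(scores['accuracy-score'], scores['recall-score'],
                 build_metrics_payload(scores))
print(result.mlpipeline_metrics)
```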

In [23]:
from typing import NamedTuple
def confusion_matrix(data_path, model_file):
    
     # func_to_container_op requires packages to be imported inside of the function.
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pip'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'numpy'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'matplotlib']) 
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'jsonlib']) 
    
    import json
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import pickle
    from sklearn.metrics import classification_report, confusion_matrix
    
    
    # Load the saved classifier model
    with open(f'{data_path}/{model_file}', 'rb') as file:
        model = pickle.load(file)
        
    # Load the y_pred data file
    with open(f'{data_path}/y_pred.pkl', 'rb') as file:
        y_pred = pickle.load(file)
    
    # Load the y_test data file
    with open(f'{data_path}/y_test.pkl', 'rb') as file:
        y_test = pickle.load(file)
    
    # Confusion matrix (rows: true labels, columns: predicted labels)
    matrix = confusion_matrix(np.ravel(y_test), np.ravel(y_pred))
    print(matrix)
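
To make the layout of the printed matrix concrete, here is a dependency-free sketch that counts the four cells for binary labels. It follows the same convention as scikit-learn's confusion_matrix (rows are true labels, columns are predicted labels). The labels are toy values invented for illustration, and the sketch assumes integer 0/1 labels.

```python
def binary_confusion_matrix(y_true, y_pred):
    """Count outcomes for integer 0/1 labels as [[TN, FP], [FN, TP]]."""
    matrix = [[0, 0], [0, 0]]
    for truth, pred in zip(y_true, y_pred):
        matrix[truth][pred] += 1  # row = true label, column = predicted label
    return matrix


# Toy labels, invented for illustration.
y_true = [1, 0, 1, 1, 0, 0]
y_pred = [1, 0, 0, 1, 1, 0]

matrix = binary_confusion_matrix(y_true, y_pred)
(tn, fp), (fn, tp) = matrix

print(matrix)
print('recall:', tp / (tp + fn), 'precision:', tp / (tp + fp))
```

Reading the matrix this way shows where recall and precision come from: both share the true-positive cell and differ only in which off-diagonal cell they penalize.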

## Clustering Analysis

In [24]:
# def clustering_analysis(data_path):
    
#     # func_to_container_op requires packages to be imported inside of the function. 
#     import sys, subprocess;
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'pip==20.2.4'])
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22']) 
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'matplotlib==3.3.1']) 
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'seaborn==0.10.1']) 
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'jsonlib==1.6.1'])
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'tensorboard==2.1.0'])
#     subprocess.run([sys.executable, '-m', 'pip', 'install', 'wordcloud==1.8.0'])
#     import json
#     import os
#     import numpy as np
#     import pandas as pd
#     import matplotlib.pyplot as plt
#     import seaborn as sns
#     import pickle
#     import urllib
    
#     import tensorflow.keras.backend as K
#     from tensorflow.keras.layers import Layer, InputSpec
#     from tensorflow.keras.layers import Dense, Input
#     from tensorflow.keras.models import Model
#     from tensorflow.keras.optimizers import SGD
#     from tensorflow.keras import callbacks
#     from tensorflow.keras.initializers import VarianceScaling
#     from sklearn.cluster import KMeans  

## Build a pipeline component from the function

### Convert the function to a pipeline operation.

In [25]:
# Create the preprocessing lightweight component for analysis.
preprocess_data_analysis_op = comp.func_to_container_op(preprocess_data_analysis, base_image=BASE_IMAGE)

# Create the exploratory data analysis lightweight component.
exploratory_data_analysis_op = comp.func_to_container_op(exploratory_data_analysis, base_image=BASE_IMAGE)

# Create the preprocessing lightweight component for modeling.
preprocess_data_modeling_op = comp.func_to_container_op(preprocess_data_modeling, base_image=BASE_IMAGE)

# Create the training lightweight component.
model_training_op = comp.func_to_container_op(model_training, base_image=BASE_IMAGE)

# Create the model evaluation lightweight component.
model_validation_op = comp.func_to_container_op(model_validation, base_image=BASE_IMAGE)

# Create the confusion matrix lightweight component.
confusion_matrix_op = comp.func_to_container_op(confusion_matrix, base_image=BASE_IMAGE)

## Build Kubeflow Pipeline


- Our next step is to connect the components into a pipeline. The pipeline is defined by decorating a Python function with @dsl.pipeline.


- The pipeline function takes a number of parameters that are fed into the various components during execution. Kubeflow Pipelines are defined declaratively: calling the component ops inside the pipeline function only describes the steps and their dependencies. The component code itself is not run until the compiled pipeline is executed as a run.


- A Persistent Volume Claim can be created with dsl.VolumeOp to save and persist data between the components.

    - Note that while this is a convenient method to use locally, you could also use a cloud bucket for your persistent storage.
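
The idea of sharing data through a mounted volume can be tried locally without a cluster. In the sketch below a temporary directory plays the role of the mounted volume, and two hypothetical stand-in functions (preprocess_step and training_step, with a made-up file name) exchange data only through files under that path, which is how the components in this notebook communicate.

```python
import os
import pickle
import tempfile


def preprocess_step(data_path):
    """Stand-in for a preprocessing component: writes its result to the shared path."""
    rows = [{'crew_jobs': 'director, animation', 'labels': 1}]
    with open(os.path.join(data_path, 'preprocessed.pkl'), 'wb') as f:
        pickle.dump(rows, f)


def training_step(data_path):
    """Stand-in for a training component: reads what the previous step wrote."""
    with open(os.path.join(data_path, 'preprocessed.pkl'), 'rb') as f:
        rows = pickle.load(f)
    return len(rows)


# A temporary directory plays the role of the mounted volume (e.g. /mnt).
with tempfile.TemporaryDirectory() as shared_dir:
    preprocess_step(shared_dir)
    n_rows = training_step(shared_dir)

print(n_rows)
```

Neither function returns data to the other directly. The only contract between the steps is the path and the file name, so the order in which the steps run matters.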

In [26]:
# Define the pipeline with the Kubeflow Pipelines domain-specific language (DSL).
# The parameters of the function are the inputs fed into the pipeline.
@dsl.pipeline(
    name='Movie Success Pipeline',
    description='End-to-end Movie Success machine learning Project pipeline.'
)
def Movie_Success_container_pipeline(
    data_path: str,  # DATA_PATH
    model_file: str  # CLASSIFIER_PATH    
):
    
    # Create a persistent volume to share data between components
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume", 
        size="1Gi", 
        modes=dsl.VOLUME_MODE_RWO)
    
    
    # Define Pipeline Components and dependencies
    # We do this with ContainerOp, an object that defines a pipeline component from a container.
    
    # Create movie success preprocessing component for analysis.
    movie_success_preprocessing_data_analysis_container = preprocess_data_analysis_op(data_path) \
                                                            .add_pvolumes({data_path: vop.volume})
    
    # Create movie success preprocessing component for modeling.
    movie_success_preprocessing_data_modeling_container = preprocess_data_modeling_op(data_path) \
                                                            .add_pvolumes({data_path: vop.volume})
    
    # Create movie success exploratory data analysis component
    # (a ContainerOp exposes its mounted volume as .pvolume; only the VolumeOp has .volume)
    movie_success_exploratory_analysis_container = exploratory_data_analysis_op(data_path) \
                                        .add_pvolumes({data_path: movie_success_preprocessing_data_analysis_container.pvolume})
    
    # Create movie success model training component
    movie_success_model_training_container = model_training_op(data_path, model_file) \
                                        .add_pvolumes({data_path: movie_success_preprocessing_data_modeling_container.pvolume})
    
    # Create movie success model validation component
    movie_success_model_validation_container = model_validation_op(data_path, model_file) \
                                        .add_pvolumes({data_path: movie_success_model_training_container.pvolume})
     
    # Create movie success confusion matrix component
    movie_success_confusion_matrix_container = confusion_matrix_op(data_path, model_file) \
                                        .add_pvolumes({data_path: movie_success_model_validation_container.pvolume})
    
    # Print the result of the prediction
    Movie_Success_container_container = dsl.ContainerOp(
        name="Movie Success prediction",  # the name displayed for the component execution during runtime.
        image='library/bash:4.4.23',      # Image tag for the Docker container to be used.
        pvolumes={data_path: movie_success_model_validation_container.pvolume}, # dictionary of paths and associated Persistent Volumes to be mounted to the container before execution.
        arguments=['cat', f'{data_path}/classifier_result.txt'] # command to be run by the container at runtime.
    )

## Compile and run the pipeline

- Finally, we feed our pipeline definition into the compiler and run it as an experiment. This prints two links below the cell that lead to the Kubeflow Pipelines UI, where you can check logs, artifacts, inputs and outputs, and follow the progress of your pipeline visually.

- Kubeflow Pipelines lets you group pipeline runs by Experiments. You can create a new experiment, or call kfp.Client().list_experiments() to see existing ones. If you don't specify the experiment name, the Default experiment will be used.



#### Define the variables that are used as inputs at various points in the pipeline.

In [27]:
DATA_PATH = '/mnt'  # mount point of the shared volume inside each container
CLASSIFIER_PATH = 'movie_success_main.pkl'

In [29]:
pipeline_func = Movie_Success_container_pipeline
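
Before submitting a run, it can help to check that the keys of the arguments dictionary match the parameter names of the pipeline function, since a mismatched key is easy to miss. The sketch below uses only the standard library. example_pipeline is a hypothetical stand-in with the same parameter names as Movie_Success_container_pipeline, and check_arguments is an illustrative helper, not part of the Kubeflow SDK.

```python
import inspect


def example_pipeline(data_path: str, model_file: str):
    """Hypothetical stand-in with the same parameter names as the pipeline function."""


def check_arguments(pipeline_func, arguments):
    """Return (missing, unexpected) parameter names for an arguments dict."""
    expected = set(inspect.signature(pipeline_func).parameters)
    provided = set(arguments)
    return sorted(expected - provided), sorted(provided - expected)


# Keys that match the function's parameters.
good = check_arguments(example_pipeline,
                       {'data_path': '/mnt', 'model_file': 'movie_success_main.pkl'})
# A key that does not correspond to any parameter.
bad = check_arguments(example_pipeline,
                      {'data_path': '/mnt', 'classifier_file': 'movie_success_main.pkl'})

print(good)
print(bad)
```

An empty pair of lists means every pipeline parameter receives a value and no extra keys are passed.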

In [30]:
experiment_name=EXPERIMENT_NAME
run_name = pipeline_func.__name__ + ' run'


arguments = {"data_path":DATA_PATH,
             "model_file":CLASSIFIER_PATH}


# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func,'{}.zip'.format(experiment_name))



# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)

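AttributeError: 'ContainerOp' object has no attribute 'volume'

This error is raised when a step is chained with .volume on the result of a component op. Only the VolumeOp exposes .volume; a ContainerOp exposes the volumes it has mounted through .pvolume, which is what add_pvolumes expects from a previous step.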