### Processing pipeline for 'meta_Movies_and_TV.json'

In [1]:
datapath = 'DATA/'
filename = 'meta_Movies_and_TV.json.gz'
df_name = 'meta_Movies_and_TV'

features = ['asin', 'title', 'description']

In [2]:
# Essential imports
import pandas as pd
import numpy as np
import json
from pandas.io.json import json_normalize

# Strict JSON conversion
import json 
import gzip 

# Sleep
import time

# Progress display
from IPython.display import clear_output

from urllib.request import HTTPError

In [3]:
def gz_to_dataframe(datapath, filename):
    def parse(path): 
        g = gzip.open(path, 'rb') 
        for l in g: 
            yield eval(l) 
    def getDF(path): 
        i = 0 
        df = {} 
        for d in parse(path): 
            df[i] = d 
            i += 1 
        return pd.DataFrame.from_dict(df, orient='index') 
    return getDF(datapath+filename)
    
product_df = gz_to_dataframe(datapath, filename)[features]

In [4]:
product_df.shape

(208321, 3)

### 1. fetch artists names with amazon API

In [14]:
## Identify with amazon api servers
##

from amazon.api import AmazonAPI
from amazon.api import AsinNotFound

def get_amazon_interface():
    f = open("api_creds")
    ar = f.read().split("\n")
    return AmazonAPI(ar[0], ar[1], ar[2])
    return ar[0], ar[1], ar[2]

amazon = get_amazon_interface()

In [30]:
## API query helpers
##


''' Product lookup with API, asin can be a string ('one by one' lookup)
    or a list of strings ('bulk lookup').
    bulk lookup provides better performance
'''
def get_prod(asin) : 
    if not isinstance(asin, str): 
        acc_str = str(asin[0])
        for i in range(1, len(asin)) : 
            acc_str += ','+str(asin[i])
        print(acc_str)
        return amazon.lookup(ItemId=acc_str)
    else :
        return amazon.lookup(ItemId=asin)
    
    
''' Splits the interval [start-end] into bulks of size bulksize
'''    
def gen_bulk_index(start, end, bulksize=10, includeEnd=False):
    size = end - start + 1
    bulks = [list(range(start+(i*bulksize), start + (i+1)*bulksize)) for i in range(0, int(size/bulksize))]
    if includeEnd : 
        bulks.append(list(range(bulks[len(bulks)-1][bulksize-1], end+1)))
    else : 
        bulks.append(list(range(bulks[len(bulks)-1][bulksize-1], end)))
    return bulks    


''' Helper : extracts wanted data from the amazon product 'amazonProduct' and 
             puts it in the dataframe at the given positions   
'''
def set_df_cell_from_product(amazonProduct, rowNumber, fieldName1, fieldName2) :
    if (amazonProduct is None) : 
        product_df.set_value(rowNumber, fieldName1, [])
        product_df.set_value(rowNumber, fieldName2, [])     
    else : 
        actors = get_actors(amazonProduct)
        product_df.set_value(rowNumber, fieldName1, actors)
        directors = get_directors(amazonProduct)
        product_df.set_value(rowNumber, fieldName2, directors)


''' Save and load progress to file, as opposed to keeping it in memory inbetween cells
'''
def save_progress(dataframe, nb_rows_processed):
    dataframe.to_csv(datapath+"meta_movies_TV_temp.csv")
    file = open(datapath+"meta_movies_TV_progress", "w")
    file.write(str(nb_rows_processed))

    
def load_progress():
    dataframe = pd.read_csv(datapath+"meta_movies_TV_temp.csv")
    file = open(datapath+"meta_movies_TV_progress", "r")
    nb_rows_processed = file.readline()
    return dataframe, int(nb_rows_processed)


def get_directors(prod) : 
    return prod.directors

def get_actors(prod) : 
    return prod.actors

### API "bulk" querying

In [16]:
## Parameters for bulk item lookup, should be kept between runs
##

bulksize = 10

fresh_run = False

# used to restart from where we were in case of an error
lastItemLookedUp = 0
incompleterows = 0
emptyrows = 0
caught_httperrors = 0


if fresh_run : 
    product_df = product_df[features]
    product_df['actors'] = pd.Series(dtype=object)
    product_df['directors'] = pd.Series(dtype=object)
else : 
    product_df, lastItemLookedUp = load_progress()
    
print("last item looked up : ", lastItemLookedUp,  "  -  time : ",time.strftime("%H:%M:%S"), "\n\n")

last item looked up :  199134   -  time :  22:37:57 




  if self.run_code(code, result):


In [33]:
product_df.shape

(208321, 15)

In [32]:
lastItemLookedUp=

ref_for_progress = lastItemLookedUp
loop_complete = False


while not loop_complete : 
    try : 
        
        for bulk in gen_bulk_index(lastItemLookedUp, product_df.shape[0], bulksize=bulksize) : 
            # display progess
            if ((bulk[0]-(ref_for_progress)) % 100 == 0) : 
                clear_output()
                print("    ",int(100 * (bulk[0]+1) / product_df.shape[0]), "% completed (",bulk[0], " rows)", "  -  time : ",time.strftime("%H:%M:%S"))
                print("         Last Item Looked up : ", lastItemLookedUp, " / ", product_df.shape[0])
                print("         Incomplete rows : ", incompleterows)
                print("         Empty rows : ", emptyrows)
                print("         HTTPErrors caught : ", caught_httperrors)
                print("\n\n\n")

            # get asins for the bulk and fetch the matching AmazonProducts
            noAsinFound = False
            asins = product_df['asin'][bulk].tolist()
            try : 
                prods = get_prod(asins)
            except AsinNotFound : 
                noAsinFound = True

            
            if noAsinFound : 
                # Skip this bulk and reset the flag
                print("No Asin Found for bulk : ", bulk)
            elif (type(prods) is list) and (len(prods) == bulksize) :              
                # Case : we found exactly one result per ASIN
                #        process by bulk
                    for i, prod in enumerate(prods) : 
                        set_df_cell_from_product(prod, bulk[i], "actors", "directors")
            elif (type(prods) is list) or (type(prods) is amazon.api.AmazonProduct) :  
                # Case : we obtained a list of AmazonProducts or a single AmazonProduct
                #        fallback to 1-by-1 querying
                for n in bulk :               
                    asin = product_df['asin'][n]
                    try : 
                        prod = get_prod(asin)
                    except(AsinNotFound): 
                        prod = None
                    set_df_cell_from_product(prod, n, "actors", "directors")
                    time.sleep(0.5)
                incompleterows += 1

            # Save progress
            lastItemLookedUp = bulk[len(bulk)-1]

            # limit query frequency to avoid 503 errors
            time.sleep(min(bulksize/40, 10))
            
        loop_complete = True
            
    except HTTPError :
        # Nothing to do, loop_complete is still false,
        # We will go back to the query loop and start from the last recorded lastItemLookedUp
        
        # If we didn't make any progress, something must be wrong
        if lastItemLookedUp == ref_for_progress : 
            print("HTTPError caught at original_lastItemLookedUp  -  breaking")
            break 
        # else retry
        print()
        print(httperror)
        print()
        caught_httperrors += 1
        ref_for_progress = lastItemLookedUp
    
clear_output()
print("API query loop completed ! ")
product_df.to_csv(datapath+df_name+".csv")

IndexError: list index out of range

In [21]:
save_progress(product_df, lastItemLookedUp)