In [1]:
import requests
import json
import pyspark
from pyspark.sql import SparkSession
import re

# Reuse an already running context so that re-executing this cell in a live
# kernel does not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession(sc)

url = 'https://s3-eu-west-1.amazonaws.com/dwh-test-resources/recipes.json'

# Bound the wait on the network call and fail loudly on an HTTP error instead
# of trying to parse an error page as recipe data.
r = requests.get(url=url, timeout=30)
r.raise_for_status()
data = r.text

# The payload is a stream of JSON objects separated by whitespace, not a
# single JSON array, so json.loads() cannot read it in one call. Decode the
# objects one after another; this does not depend on which key comes first in
# a record or on how the records are laid out across lines.
decoder = json.JSONDecoder()
recipes = []
position = 0
end = len(data)
while position < end:
    # raw_decode() does not skip leading whitespace, so step over it here.
    if data[position].isspace():
        position += 1
        continue
    record, position = decoder.raw_decode(data, position)
    recipes.append(record)





In [36]:
def get_duration(duration):
    """ Parses a duration from the JSON file into minutes
    ------------------------------------------------------
    Uses a regex expression to pick up every number
    together with the unit letter that follows it
    (H for hours, M for minutes, S for seconds), so
    the unit never has to be guessed from the position
    of the number: 'PT1H' is 60 minutes, not 1.
    Seconds that do not add up to a whole minute are
    dropped.
    If the field is empty, or holds no number followed
    by a known unit, it returns 0 minutes.

    Arguments:
    duration -- content of JSON file for duration
    field in the format (PTxHxM)

    Returns:
    The duration in whole minutes, as an int.
    """
    if not duration:
        return 0
    seconds_per_unit = {'H': 3600, 'M': 60, 'S': 1}
    components = re.findall(r'(\d+)([HMS])', duration)
    total_seconds = sum(int(amount) * seconds_per_unit[unit]
                        for amount, unit in components)
    return total_seconds // 60
  
def get_level(prep_time):
    """ Return complexity level for each
    recipe.
    ------------------------------------
    Using the following rules:
    Easy: total preparation time less than
    30 minutes
    Medium: total preparation time between
    30 minutes and an hour (both included).
    Hard: total preparation time greater
    than an hour.

    Please note that recipes with a duration
    of 0 minutes (or less) are not classified:
    None is returned for them instead of a level,
    so they are not counted as 'hard'.

    Arguments:
    prep_time -- total preparation time in
    minutes.

    Returns:
    'easy', 'medium' or 'hard', or None when the
    recipe cannot be classified.
    """

    if prep_time <= 0:
        return None
    if prep_time < 30:
        return 'easy'
    # Exactly one hour is not "greater than an hour", so it stays medium.
    if prep_time <= 60:
        return 'medium'
    return 'hard'

def map_tuples(x):
    """ Turns a recipe row into a key/value pair
    (level, (total_preparation_time, 1))
    ---------------------------------------------

    The level is the key and the value carries the
    total time together with a count of 1, so that
    reduceByKey can add up both the time and the
    number of recipes for each level.

    Arguments:
    x -- input row of the rdd; position 5 holds the
    total preparation time and position 6 the level
    """
    level = x[6]
    total_time = x[5]
    return (level, (total_time, 1))

def count_times(a, b):
    """ Adds two (time, count) pairs element by
    element, giving the running total time and
    number of recipes for a level.
    ----------------------------------------------

    Arguments:
    a -- Accumulated (time, count) of previous rows
    b -- (time, count) of the current row of rdd
    """
    summed_time = a[0] + b[0]
    summed_count = a[1] + b[1]
    return (summed_time, summed_count)

def get_averages(x):
    """ Returns (level, average time) for a level
    by dividing the summed time by the number of
    recipes.
    ----------------------------------------------

    Arguments:
    x -- current row of rdd, in the format
    (level, (total_time, recipe_count))
    """
    level = x[0]
    totals = x[1]
    return (level, totals[0] / totals[1])


In [2]:
def create_recipes_rdd():
    """ Builds the RDD of recipe rows used by the analysis.
    -------------------------------------------------------

    Reads the module-level recipes list and keeps only the
    columns needed for the challenge; the other fields of
    a recipe are omitted. Each row is the tuple
    (index, name, ingredients, prep time, cook time,
    total time, level), where index is the position of the
    recipe in the list and the times are in minutes.
    """

    def to_row(position, entry):
        # One recipe dict -> one tuple row.
        title = entry['name']
        ingredient_text = entry['ingredients']
        prep_minutes = get_duration(entry['prepTime'])
        cook_minutes = get_duration(entry['cookTime'])
        combined_minutes = prep_minutes + cook_minutes
        difficulty = get_level(combined_minutes)
        return (position, title, ingredient_text, prep_minutes,
                cook_minutes, combined_minutes, difficulty)

    rows = [to_row(position, entry) for position, entry in enumerate(recipes)]
    return sc.parallelize(rows)



    
recipes_rdd = create_recipes_rdd()

# Uses RDD to process data
# Keep the recipes whose ingredients text mentions beef, in any letter case.
recipes_with_beef = recipes_rdd.filter(lambda x: 'beef' in x[2].lower())
# Sum (total_time, 1) pairs per level, then divide to get the mean per level.
average_times = (
    recipes_rdd
    .map(map_tuples)
    .reduceByKey(count_times)
    .map(get_averages)
)

# Create column names for CSV files
beef_recipes_columns = ['id', 'name', 'ingredients', 'prepTime', 'cookTime', 'totalPrepTime', 'level']
average_columns = ['difficulty', 'avg_total_cooking_time']

# Convert RDDs into DF in order to prepare CSV writing
beef_recipes_df = recipes_with_beef.toDF(beef_recipes_columns)
average_times_df = average_times.toDF(average_columns)

# Write into CSV files
# repartition(1) makes each result land in a single part file.
# mode='overwrite' replaces the output of a previous run; with the default
# save mode the write raises as soon as the target directory already exists,
# which breaks Restart & Run All.
beef_recipes_df.repartition(1).write.csv('beef', header=True, mode='overwrite')
average_times_df.repartition(1).write.csv('output', header=True, mode='overwrite')




