## Prepare imports and Spark

In [2]:
import argparse
import logging
import os
import shutil
import isodate

import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf, lit

# Obtain the SparkContext through getOrCreate so that re-running this cell
# reuses the active context. Constructing SparkContext(...) directly raises
# when a context is already running, which made the original trailing
# .getOrCreate() call ineffective.
spark_conf = pyspark.SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(spark_conf)
sqlc = SQLContext(sc)

## Files

In [3]:
# Hardcoded locations of the input recipes JSON and of the parquet output.
in_path = '/home/fpopic/Projects/PycharmProjects/HelloFreshChallenge/data/recipes.json'
out_path = '/home/fpopic/Projects/PycharmProjects/HelloFreshChallenge/data/output.parquet'

# The output of an earlier run is a directory; delete it so that this run
# starts from a clean target.
previous_output_present = os.path.isdir(out_path)
if previous_output_present:
    shutil.rmtree(out_path)

## Load input JSON file and infer DataFrame schema

In [4]:
# Read the recipes JSON into a DataFrame and cache it, because it is used by
# more than one action below.
recipes_df = sqlc.read.json(in_path)
recipes_df = recipes_df.cache()

# Show the inferred schema, then the number of rows that were loaded.
recipes_df.printSchema()
recipe_total = recipes_df.count()
print("Number of loaded recipes:", recipe_total)

root
 |-- cookTime: string (nullable = true)
 |-- datePublished: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prepTime: string (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- url: string (nullable = true)

Number of loaded recipes: 1042


## Prepare some UDFs for DataFrame transformations

In [5]:
@udf(returnType=BooleanType())
def contains_ingredient(recipe, ingredient):
    """Tell whether an ingredient is mentioned in a recipe's ingredients text.

    The check is a case-insensitive substring match.

    :param recipe: ingredients text of one recipe; may be None.
    :param ingredient: ingredient to look for; may be None.
    :return: True if ``ingredient`` occurs in ``recipe``, otherwise False
        (including when either argument is None).
    """
    if recipe is None or ingredient is None:
        return False
    # Lower-case both sides: previously only the recipe text was normalised,
    # so a search term such as 'Beef' could never match.
    return ingredient.lower() in recipe.lower()

def parse_isoduration(iso_duration):
    """Parse an ISO 8601 duration string (e.g. 'PT10M') with isodate.

    Original author: Ed Finkler.

    :param iso_duration: the duration string to parse.
    :return: the object returned by ``isodate.parse_duration``, or None when
        the input could not be parsed (a warning is logged in that case).
    """
    try:
        return isodate.parse_duration(iso_duration)
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute; accessing it here
        # used to raise AttributeError from inside the handler. Log the
        # exception object itself instead.
        logging.warning("Could not parse ISO 8601 duration %r: %s", iso_duration, e)
        return None

@udf
def compute_dificulty(prepTime, cookTime):
    """Classify a recipe by its combined preparation and cooking time.

    :param prepTime: ISO 8601 duration string for preparation (e.g. 'PT10M').
    :param cookTime: ISO 8601 duration string for cooking (e.g. 'PT25M').
    :return: "Easy" for less than 30 minutes, "Medium" for 30 to 60 minutes
        inclusive, "Hard" for more than 60 minutes, and "Unknown" when either
        duration could not be parsed.
    """
    prep_delta = parse_isoduration(prepTime)
    cook_delta = parse_isoduration(cookTime)

    # parse_isoduration yields None for unparseable input; without this guard
    # the attribute access below crashed and "Unknown" was unreachable.
    if prep_delta is None or cook_delta is None:
        return "Unknown"

    # total_seconds() includes whole days, whereas `.seconds` only holds the
    # sub-day remainder and would misclassify durations of 24 hours or more.
    total_minutes = (prep_delta.total_seconds() + cook_delta.total_seconds()) / 60.0

    if total_minutes < 30.0:
        return "Easy"
    if total_minutes <= 60.0:
        return "Medium"
    return "Hard"

## Apply transformations and print schema

In [6]:
# Keep only the recipes whose ingredients mention beef, then attach a
# difficulty label computed from the preparation and cooking times.
has_beef = contains_ingredient('ingredients', lit('beef'))
difficulty_col = compute_dificulty('prepTime', 'cookTime')
beef_recipes_df = (
    recipes_df
    .filter(has_beef)
    .withColumn('difficulty', difficulty_col)
)

beef_recipes_df.printSchema()

# The count is reused by the sanity-check cell to display every row.
beef_count = beef_recipes_df.count()
print("Number of recipes that use beef:", beef_count)

root
 |-- cookTime: string (nullable = true)
 |-- datePublished: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prepTime: string (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- url: string (nullable = true)
 |-- difficulty: string (nullable = true)

Number of recipes that use beef: 47


## Sanity check before writing

In [7]:
# Display all beef recipes (beef_count rows) without truncating cell values.
sanity_view = beef_recipes_df.select('name', 'prepTime', 'cookTime', 'difficulty')
sanity_view.show(n=beef_count, truncate=False)

+---------------------------------------------------+--------+--------+----------+
|name                                               |prepTime|cookTime|difficulty|
+---------------------------------------------------+--------+--------+----------+
|Patty Melts                                        |PT10M   |PT25M   |Medium    |
|Spicy Stewed Beef with Creamy Cheddar Grits        |PT20M   |PT3H    |Hard      |
|Pork Chops with Garlic and Wine                    |PT5M    |PT25M   |Medium    |
|Salisbury Steak, Mashed Potatoes, and Peas         |PT10M   |PT20M   |Medium    |
|Hot &amp; Spicy Italian Drip Beef                  |PT5M    |PT4H    |Hard      |
|Surf &amp; Turf Cajun Pasta                        |PT20M   |PT25M   |Medium    |
|Spaghetti Sauce                                    |PT20M   |PT1H    |Hard      |
|Pepperoni Pizza Burgers                            |PT5M    |PT10M   |Easy      |
|Supreme Pizza Burgers                              |PT5M    |PT15M   |Easy      |
|Sho

## Write to parquet file

In [8]:
# Persist the beef recipes, including the difficulty column, as parquet at out_path.
beef_recipes_df.write.parquet(path=out_path)