In [1]:
from pyspark.sql import SparkSession

# Notebook-wide Spark session: local master with one worker thread ("local[1]"),
# application name "test". getOrCreate() hands back an already-running session
# instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("test")
    .getOrCreate()
)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 38270)
Traceback (most recent call last):
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/opt/conda/envs/vscode_pyspark/lib/python3.11/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile i

In [5]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType
import pandas as pd

@F.pandas_udf("int")
def parse_iso8601_duration(str_duration : pd.Series) -> pd.Series:
    """Convert a batch of ISO-8601 duration strings (e.g. "PT1H30M") to minutes.

    Vectorised pandas UDF: Spark passes a pandas Series of strings and casts the
    returned Series to the declared "int" type.

    ``Timedelta.total_seconds()`` is used instead of ``Timedelta.seconds``.
    ``.seconds`` is only the within-day component (0..86399), so durations of a
    day or more were wrapped ("P1DT2H" came out as 120 minutes, not 1560).

    Args:
        str_duration: Series of ISO-8601 duration strings; may contain missing values.

    Returns:
        Series of float minutes. Missing or unparseable entries are NaN, which Spark
        stores as null, so one malformed value no longer fails the whole batch.
    """
    def _to_minutes(duration):
        try:
            # Missing values / empty strings parse to NaT, whose total_seconds() is NaN.
            return pd.Timedelta(duration).total_seconds() / 60
        except (ValueError, TypeError):
            # Not a duration pandas understands -> null in Spark rather than a job failure.
            return float("nan")

    return str_duration.apply(_to_minutes)

def extract_and_transform_data(path: str, ingredient: str = "beef"):
    """Load recipes from JSON, keep those whose ingredients mention `ingredient`,
    and add cook/prep durations converted to minutes.

    Args:
        path: Location passed to ``spark.read.json``.
        ingredient: Text searched for in the ``ingredients`` column. The match is
            case-insensitive ("beef", "Beef" and "BEEF" all qualify) and literal
            (no LIKE wildcards). Defaults to "beef".

    Returns:
        A Spark DataFrame with the nine ``recipe_schema`` columns plus
        ``cookTime_in_minutes`` and ``prepTime_in_minutes`` computed by
        ``parse_iso8601_duration``. Rows with a null ``ingredients`` are dropped.

    Note:
        Uses the notebook-level ``spark`` session and ``recipe_schema``. Both are
        looked up when the function is called, not when it is defined.
    """
    recipes = spark.read.schema(recipe_schema).json(path)
    return (
        recipes
        .select("name", "ingredients", "url", "image", "cookTime", "recipeYield",
                "datePublished", "prepTime", "description")
        # Lower-case both sides so any capitalisation matches; a '%beef%' / '%Beef%'
        # LIKE pair misses variants such as "BEEF".
        .filter(F.lower(F.col("ingredients")).contains(ingredient.lower()))
        .withColumn("cookTime_in_minutes", parse_iso8601_duration(F.col("cookTime")))
        .withColumn("prepTime_in_minutes", parse_iso8601_duration(F.col("prepTime")))
    )

def add_totals(df):
    """Return `df` with an extra `Total_Cooking_Time` column.

    The new column is `cookTime_in_minutes + prepTime_in_minutes`. Both columns must
    already exist, as produced by `extract_and_transform_data`. Under Spark
    arithmetic, a null in either input yields a null total.
    """
    cook_plus_prep = df.cookTime_in_minutes + df.prepTime_in_minutes
    return df.withColumn("Total_Cooking_Time", cook_plus_prep)

# Explicit read schema for the recipe JSON, applied via spark.read.schema(...) in
# extract_and_transform_data. Every field is nullable. datePublished is read as a
# DateType; everything else is a string (durations stay as ISO-8601 text).
recipe_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("name", StringType()),
        ("ingredients", StringType()),
        ("url", StringType()),
        ("image", StringType()),
        ("cookTime", StringType()),
        ("recipeYield", StringType()),
        ("datePublished", DateType()),
        ("prepTime", StringType()),
        ("description", StringType()),
    ]
])

In [21]:
# Load one input file, add the total time, and bucket it into a Difficulty label.
sample_df = extract_and_transform_data('input_files/prod_data/recipes-000.json')
totaled_df = add_totals(sample_df)

# Difficulty buckets, shared by the DataFrame-API and SQL versions so they cannot drift:
# Easy (< 30 min), Medium (30..60 min, both ends inclusive), Hard (> 60 min).
# Using strict `> 30` / `< 60` bounds would leave totals of exactly 30 or 60 unlabelled.
EASY_MAX_MINUTES = 30
MEDIUM_MAX_MINUTES = 60

total_time = F.col("Total_Cooking_Time")
# Null totals match no branch and stay null, mirroring the SQL CASE below.
adjusted_df = totaled_df.withColumn("Difficulty", F.when(total_time < EASY_MAX_MINUTES, 'Easy')
                                                   .when(total_time <= MEDIUM_MAX_MINUTES, 'Medium')
                                                   .when(total_time > MEDIUM_MAX_MINUTES, 'Hard'))

# Same bucketing expressed in Spark SQL (BETWEEN is inclusive on both ends).
totaled_df.createOrReplaceTempView('Table')
sql_df = spark.sql(
    f"SELECT *, CASE WHEN Total_Cooking_Time < {EASY_MAX_MINUTES} THEN 'Easy' "
    f"WHEN Total_Cooking_Time BETWEEN {EASY_MAX_MINUTES} AND {MEDIUM_MAX_MINUTES} THEN 'Medium' "
    f"WHEN Total_Cooking_Time > {MEDIUM_MAX_MINUTES} THEN 'Hard' END AS Difficulty FROM Table"
)
sql_df.show()  # show() prints the table itself and returns None; wrapping it in print() adds a stray "None"

# TODO: average cooking time per difficulty (not yet enabled)
# adjusted_df.createOrReplaceTempView('Table2')
# final_df = spark.sql("SELECT Difficulty, ROUND(AVG(Total_Cooking_Time),2) AS Average_Cooking_Time_Per_Difficulty FROM Table2 GROUP BY Difficulty")
# print(final_df.count())
# final_df.show()



+--------------------+--------------------+--------------------+--------------------+--------+-----------+-------------+--------+--------------------+-------------------+-------------------+------------------+----------+
|                name|         ingredients|                 url|               image|cookTime|recipeYield|datePublished|prepTime|         description|cookTime_in_minutes|prepTime_in_minutes|Total_Cooking_Time|Difficulty|
+--------------------+--------------------+--------------------+--------------------+--------+-----------+-------------+--------+--------------------+-------------------+-------------------+------------------+----------+
|French Onion Soup...|2 Tablespoons But...|http://thepioneer...|http://static.the...|   PT30M|          8|   2010-11-23|   PT20M|Important note: t...|                 30|                 20|                50|    Medium|
|          Baked Ziti|2 Tablespoons Oli...|http://thepioneer...|http://static.the...|   PT45M|         12|   2012-11

In [22]:
adjusted_df.show()  # show() prints the table itself and returns None, so print() would only add "None"

+--------------------+--------------------+--------------------+--------------------+--------+-----------+-------------+--------+--------------------+-------------------+-------------------+------------------+----------+
|                name|         ingredients|                 url|               image|cookTime|recipeYield|datePublished|prepTime|         description|cookTime_in_minutes|prepTime_in_minutes|Total_Cooking_Time|Difficulty|
+--------------------+--------------------+--------------------+--------------------+--------+-----------+-------------+--------+--------------------+-------------------+-------------------+------------------+----------+
|French Onion Soup...|2 Tablespoons But...|http://thepioneer...|http://static.the...|   PT30M|          8|   2010-11-23|   PT20M|Important note: t...|                 30|                 20|                50|    Medium|
|          Baked Ziti|2 Tablespoons Oli...|http://thepioneer...|http://static.the...|   PT45M|         12|   2012-11