## Initial Setup

In [1]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook under the application name "Basics".
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1671253091643_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Evaluate the session object so the cell echoes its repr, confirming `spark` is bound.
spark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7ffada6db310>

In [3]:
from pyspark.sql import functions as F

# Import for typecasting columns
from pyspark.sql.types import IntegerType, BooleanType, DateType, FloatType, StringType
from pyspark.sql.types import ArrayType

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 ## <font color='DeepskyBlue'>Task 01: Read the data </font>

<font color='DeepskyBlue'> Ensure you read the data so that all columns are read with the right data type.
The "right" data types at this stage are shown in the expected output cell below. </font>

### <font color='DeepskyBlue'>Task 1 </font>

In [4]:
# Task 01 Cell 1 out of 1

# Load the cleaned recipes CSV, treating the first row as the header and
# letting Spark infer each column's data type.
raw_recipes_df = spark.read.csv(
    "s3://aws-emr-resources-362601132439-us-east-1/notebooks/e-6EQ18VWH4ISJJBY68567ZT0TX/RAW_recipes_cleaned.csv",
    header=True,
    inferSchema=True,
)

# Give the submission column an explicit "_date" suffix.
raw_recipes_df = raw_recipes_df.withColumnRenamed("submitted", "submitted_date")

# Alternative source kept for reference only (not used):
# https://upgradmaterials.s3.amazonaws.com/RAW_recipes_cleaned.csv

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Print the schema to check the column types that inferSchema produced.
raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)

In [6]:
# inferSchema read `submitted_date` as a timestamp (see the schema printed above);
# narrow it to a plain date, then re-print the schema to confirm the change.
raw_recipes_df = raw_recipes_df.withColumn(
    "submitted_date",
    F.col("submitted_date").cast("Date"),
)
raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted_date: date (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)

In [7]:
# Preview the first five rows of the recipes DataFrame.
raw_recipes_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+-------+--------------+--------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id|submitted_date|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+--------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|    2005-09-16|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
|a bit different  ...| 31490|     30|         26278|    2002-06-17|['30-minutes-or-l...|[173.4, 18.0, 0.0...|      9|['preheat oven to...|this recipe calls...|['prepared pizza ...|            6|
|all in the kitche...|112

<font color='blue'>Test cases for Task 01</font>

In [8]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.

# The recipes file is expected to load as 231637 rows and 12 columns.
assert raw_recipes_df.count(
) == 231637, "There is a mistake in reading the data."
assert len(
    raw_recipes_df.columns) == 12, "There is a mistake in reading the data."
# Spot-check the column types: `minutes` and `n_ingredients` must be integers,
# while `tags` is still expected to be a plain string at this stage.
assert raw_recipes_df.schema["minutes"].dataType == IntegerType(
), "The data types have not been read correctly."
assert raw_recipes_df.schema["tags"].dataType == StringType(
), "The data types have not been read correctly."
assert raw_recipes_df.schema["n_ingredients"].dataType == IntegerType(
), "The data types have not been read correctly."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 ## <font color='Teal'>Task 02: Extract individual features from the nutrition column.</font>

## Extract ```nutrition``` values 

In [9]:
# Target column names for the seven values stored in each `nutrition` entry.
# The order matters: position i in this list names element i of the split string.
nutrition_column_names = [
    'calories',
    'total_fat_PDV',
    'sugar_PDV',
    'sodium_PDV',
    'protein_PDV',
    'saturated_fat_PDV',
    'carbohydrates_PDV',
]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<font color='Teal'> 
As read by Spark, the nutrition column is a string column when it should be an array of float values. Each row in the nutrition column contains seven values. Each value represents nutrition information.</font>

In [10]:
# Task 02 Cell 1 out of 2
# 2.1 - string operation to remove the square brackets around the nutrition values,
# leaving a bare comma-separated string.
raw_recipes_df = raw_recipes_df.withColumn(
    'nutrition',
    F.regexp_replace(F.col('nutrition'), '[\\[\\]]', ''),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Task 02 Cell 2 out of 2
# STEP 2.2 - split the nutrition string into seven individual values.

# Column expression that breaks the comma-separated nutrition string into an array.
nutrition_cols_split = F.split(F.col('nutrition'), ',')

# Walk the target names in order: a name's position in the list is also the
# position of its value inside the split array (e.g. calories is element 0).
for position, feature_name in enumerate(nutrition_column_names):
    feature_value = nutrition_cols_split.getItem(position).cast('float')
    raw_recipes_df = raw_recipes_df.withColumn(feature_name, feature_value)

raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted_date: date (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)

**Test cases for task 02**

In [12]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.

# The last of the seven split columns must have been cast to float.
assert raw_recipes_df.schema["carbohydrates_PDV"].dataType == FloatType(
), "Recheck your typecasting"
# Each collect() below pulls the whole DataFrame into a local list of rows and
# then picks one row and one field by position. Per the schema printed above,
# position 14 is `sugar_PDV` and position 12 is `calories`.
# NOTE(review): these checks assume the collected row order is stable — confirm.
assert raw_recipes_df.collect(
)[123432][14] == 62.0, "The columns have not been split correctly."
# 60.400001525878906 looks like 60.4 stored as a 32-bit float and widened — confirm.
assert raw_recipes_df.collect(
)[10000][12] == 60.400001525878906, "The columns have not been split correctly."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## <font color='Teal'>Task 03: Standardize the nutrition values </font>

## Make nutrition-per-100 calorie columns

By converting the nutrition values from absolute to relative terms, we ensure that portion size is not a factor in the analysis. 


Naming convention: the original column ```total fat (PDV)``` becomes ```total_fat_per_100_cal``` after the transformation.

<font color='Teal'>
The current values for nutrition columns are not on the same scale. 
The task is to standardize the nutrition columns using calories as the base of standardization. 

Convert the nutrition from absolute values to per 100 calorie values. 
</font>

<font color='Teal'>
    
We will use the  ```sugar (PDV)``` column to demonstrate the calculations for standardization.  

</font>

<font color='Teal'>
    
**Sample Calculation**

Before transformation: ```sugar (PDV)``` for recipe id 137739 = 13.0

Calories in recipe id 137739                                  = 51.5

Calculation:  
sugar_per_100_cal = 13.0 * 100 / 51.5 

After transformation ```sugar_per_100_cal``` = 25.24
    
</font>

### <font color='blue'>Solution to Task 3 </font>

<font color='blue'>Complete the code in the following cell</font>

In [13]:
# Task 03 Cell 1 out of 1

# For every nutrition column except `calories`, add a `<name>_per_100_cal`
# column holding value * 100 / calories, rounded to two decimals.
for col_name in nutrition_column_names:
    # Calories is the base of the standardization, not a column to standardize.
    if col_name == 'calories':
        continue

    nutrition_per_100_cal_col = col_name.replace('_PDV', '_per_100_cal')

    raw_recipes_df = raw_recipes_df.withColumn(
        nutrition_per_100_cal_col,
        F.round(F.col(col_name) * 100 / F.col('calories'), 2))

    # The division can leave nulls in the new column (the check cell expects a
    # 0 for such recipes), so fill them with 0 — restricted to the column just
    # created so that nulls elsewhere in the data are left untouched.
    raw_recipes_df = raw_recipes_df.fillna(value=0.0,
                                           subset=[nutrition_per_100_cal_col])

raw_recipes_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+-------+--------------+--------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+---------------------+-----------------+------------------+-------------------+-------------------------+-------------------------+
|                name|    id|minutes|contributor_id|submitted_date|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|total_fat_per_100_cal|sugar_per_100_cal|sodium_per_100_cal|protein_per_100_cal|saturated_fat_per_100_cal|carbohydrates_per_100_cal|
+--------------------+------+-------+--------------+--------------+--------------------+--------------------+-------+--------------------+--------------------+-----

**Test cases for Task 03**

In [14]:
# total fat check for id 28881
# (expected to be exactly 0 after the null fill)
assert raw_recipes_df.filter("id == 28881").select(
    'total_fat_per_100_cal').first(
    )[0] == 0, "total_fat_per_100_cal for recipe 28881 should be 0"

# total fat check for id 112140
# (the stored value is rounded to the nearest integer before comparing)
assert round(
    raw_recipes_df.filter("id == 112140").select('total_fat_per_100_cal').first(
    )[0]) == 8, "total_fat_per_100_cal for recipe 112140 should be 8"

# checking for nulls
# For each derived column, count the rows whose value is NaN or null:
# F.when(...) yields a non-null value only for such rows, and F.count counts
# non-null values, so the result must be 0.
for c in [
        'total_fat_per_100_cal', 'sugar_per_100_cal', 'sodium_per_100_cal',
        'protein_per_100_cal', 'saturated_fat_per_100_cal',
        'carbohydrates_per_100_cal'
]:
    assert raw_recipes_df.select(
        F.count(F.when(
            F.isnan(c) | F.col(c).isNull(),
            c)).alias(c)).collect()[0][0] == 0, "There are Nulls in the data"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## <font color='Teal'>Task 04: Convert the tags column from a string to an array of strings </font>

<font color='Teal'>
    
Currently, the tags column is a string column but holds an array of strings. 

Your task is to convert the tags column from a string to an array of strings. 
    
</font>

<font color='Teal'>

Remove ```[``` ```]``` ```'``` punctuation marks from the tags column. 
Split the tags column based on the comma delimiter. 
    

</font>

<font color='Teal'> We have included some test cases given below. You can use them to check if you have completed the task correctly.  </font>

### <font color='blue'>Solution to Task 4 </font>

In [15]:
# Task 04 Cell 1 out of 1

# Strip square brackets, single quotes and spaces from the tags string, then
# split what remains on commas to obtain an array of tag strings.
raw_recipes_df = raw_recipes_df.withColumn(
    'tags',
    F.split(F.regexp_replace('tags', "[\\]\\[' ]", ''), ','),
)
raw_recipes_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted_date: date (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullable = false)
 |-- sodium_per_100_cal: double (nullable = false)
 |

**Test cases for Task 04**

In [16]:
# Echo the data type of `tags` to confirm it is now an array of strings.
raw_recipes_df.schema["tags"].dataType

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ArrayType(StringType(), False)

In [17]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.

# `tags` must be an array of strings whose elements are declared non-null.
assert raw_recipes_df.schema["tags"].dataType == ArrayType(
    StringType(), False), "You have not split the string into an array."
# collect() pulls the whole DataFrame into a local list; the check then reads
# the third row (index 2) and the field at position 5, which is `tags` in the
# schema printed above.
# NOTE(review): this assumes the collected row order is stable — confirm.
assert raw_recipes_df.collect()[2][5] == [
    'time-to-make', 'course', 'preparation', 'main-dish', 'chili',
    'crock-pot-slow-cooker', 'dietary', 'equipment', '4-hours-or-less'
], "Recheck your string cleaning and splitting operations.";

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### <font color='Teal'>If all test cases pass task 04 ends </font>

## <font color='Teal'>Task 05: Read the second data file </font>

## Join Recipe Data to Review Data

In [18]:
# Reading the second data set.
# keep this cell unedited

# Load the interactions CSV with a header row and inferred column types,
# renaming the `date` column to `review_date`.
raw_ratings_df = (spark.read.csv(
    "s3://aws-emr-resources-362601132439-us-east-1/notebooks/e-6EQ18VWH4ISJJBY68567ZT0TX/RAW_interactions_cleaned.csv",
    header=True,
    inferSchema=True).withColumnRenamed("date", "review_date"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# The original note says review_date arrives in string format; cast it to a date.
raw_ratings_df = raw_ratings_df.withColumn(
    'review_date',
    raw_ratings_df['review_date'].cast('date'),
)

# Inspect the resulting schema and a small sample of rows.
raw_ratings_df.printSchema()
raw_ratings_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- review_date: date (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)

+-------+---------+-----------+------+--------------------+
|user_id|recipe_id|review_date|rating|              review|
+-------+---------+-----------+------+--------------------+
|  38094|    40893| 2003-02-17|     4|Great with a sala...|
|1293707|    40893| 2011-12-21|     5|So simple  so del...|
|   8937|    44394| 2002-12-01|     4|This worked very ...|
| 126440|    85009| 2010-02-27|     5|I made the Mexica...|
|  57222|    85009| 2011-10-01|     5|Made the cheddar ...|
+-------+---------+-----------+------+--------------------+
only showing top 5 rows

In [20]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.

# The interactions file is expected to load as 1132367 rows and 5 columns.
assert raw_ratings_df.count(
) == 1132367, "There is a mistake in reading the data."
assert len(
    raw_ratings_df.columns) == 5, "There is a mistake in reading the data."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### <font color='blue'>Solution to Task 5 </font>

<font color='blue'>Complete the code in the following cell</font>

In [21]:
# Task 05 Cell 1 out of 1

# Attach the recipe attributes to every review: inner join of ratings (left)
# with recipes (right) on ratings.recipe_id == recipes.id.
interaction_level_df = raw_ratings_df.join(
    raw_recipes_df,
    on=raw_recipes_df['id'] == raw_ratings_df['recipe_id'],
    how="inner",
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Test cases for Task 05**

In [22]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.

# The join must keep every one of the 1132367 reviews and produce 30 columns.
assert (interaction_level_df.count(), len(
    interaction_level_df.columns)) == (1132367,
                                       30), "The type of join is incorrect"

# Collect the recipe ids referenced by reviews and the ids present in recipes
# as local lists, then take the set difference (review ids minus recipe ids).
list1 = raw_ratings_df.select('recipe_id').collect()
list2 = raw_recipes_df.select('id').collect()
exclusive_set = set(list1) - set(list2)

# An empty difference means every reviewed recipe exists in the recipes data.
assert len(exclusive_set
          ) == 0, "There is a mistake in reading one of the two data files."

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### <font color='Teal'>If all test cases pass task 05 ends </font>

## <font color='Teal'>Task 06:  Create time-based features</font>


<font color='Teal'>

Use review date and submission date to derive new features:
1. ```days_since_submission_on_review_date``` Number of days between the recipe submission and the current review.  
2. ```months_since_submission_on_review_date``` Number of months between the recipe submission and the current review. 
3. ```years_since_submission_on_review_date``` Number of years between the recipe submission and the current review. 

</font>

### <font color='blue'>Solution to Task 6 </font>

<font color='blue'>Complete the code in the following cell</font>

In [23]:
# Inspect the schema of the joined DataFrame before deriving the time-based features.
interaction_level_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- review_date: date (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted_date: date (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- 

In [24]:
# Derive three "time since submission" features for each review:
#   days   - datediff(review_date, submitted_date)
#   months - months_between(review_date, submitted_date)
#   years  - the same months_between value divided by 12
interaction_level_df = (
    interaction_level_df
    .withColumn(
        'days_since_submission_on_review_date',
        F.datediff(F.col('review_date'), F.col('submitted_date')))
    .withColumn(
        'months_since_submission_on_review_date',
        F.months_between(F.col('review_date'), F.col('submitted_date')))
    .withColumn(
        'years_since_submission_on_review_date',
        F.months_between(F.col('review_date'), F.col('submitted_date')) / 12)
)
interaction_level_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------+-----------+------+--------------------+--------------------+---+-------+--------------+--------------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+---------------------+-----------------+------------------+-------------------+-------------------------+-------------------------+------------------------------------+--------------------------------------+-------------------------------------+
|user_id|recipe_id|review_date|rating|              review|                name| id|minutes|contributor_id|submitted_date|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|total_fat_per_100_cal|sugar_per_100_cal|sodium_per_100_cal|protein_per_1

**Test cases for Task 06**

In [25]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.

# The day difference must be stored as an integer column.
assert interaction_level_df.schema[
    "days_since_submission_on_review_date"].dataType == IntegerType()

# Spot-check one (user_id, recipe_id) review per derived feature.
# The month and year checks compare floats for exact equality, so they depend
# on the exact expressions used to compute those columns.
assert (interaction_level_df.filter((interaction_level_df.user_id == 428885) &
                                    (interaction_level_df.recipe_id == 335241)).
        select('days_since_submission_on_review_date').collect()[0][0]) == 77
assert (interaction_level_df.filter((interaction_level_df.user_id == 2025676) &
                                    (interaction_level_df.recipe_id == 94265)).
        select('months_since_submission_on_review_date').collect()[0][0]
       ) == 153.22580645
assert (interaction_level_df.filter((interaction_level_df.user_id == 338588) &
                                    (interaction_level_df.recipe_id == 21859)).
        select('years_since_submission_on_review_date').collect()[0][0]
       ) == 4.564516129166667

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### <font color='Teal'>If all test cases pass task 06 ends</font>

## Save the data we have created so far in a parquet file. 

In [26]:
# Final schema check of the interaction-level DataFrame before it is saved.
interaction_level_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- review_date: date (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted_date: date (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- 

In [27]:
# Shape check before saving: all 1132367 reviews, and 33 columns
# (the 30 columns from the join plus the three time-based features).
assert (interaction_level_df.count(),
        len(interaction_level_df.columns)) == (1132367, 33)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Write interaction_level_df (the joined reviews + recipes data with the derived
# features) to parquet, overwriting any existing output at this path.
interaction_level_df.write.mode('overwrite').parquet(
    "s3://aws-emr-resources-362601132439-us-east-1/notebooks/e-6EQ18VWH4ISJJBY68567ZT0TX/Data/interaction_level_df_processed.parquet"
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…