# <font color = Black >  Recipe Recommender Assignment EDA </font>
### <font color = Black > -By Paras Singh, Shantam Garg, Sudhanshu Mishra </font>

## <font color = Black >  Importing libraries  </font>

In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [3]:
spark

In [4]:
from pyspark.sql import functions as F

# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType

## <font color = blue >  Task 01: Read the data  </font>

### Solution to Task 01 

In [5]:
# Task 01 Cell 1 out of 1

Raw_Recipe_Data = (spark.read.csv("D:\RAW_recipes_cleaned.csv", inferSchema = True, header = True))


In [6]:
Raw_Recipe_Data.show(5)

+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|                name|    id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|
+--------------------+------+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+
|arriba   baked wi...|137739|     55|         47892|2005-09-16|['60-minutes-or-l...|[51.5, 0.0, 13.0,...|     11|['make a choice a...|autumn is my favo...|['winter squash',...|            7|
|a bit different  ...| 31490|     30|         26278|2002-06-17|['30-minutes-or-l...|[173.4, 18.0, 0.0...|      9|['preheat oven to...|this recipe calls...|['prepared pizza ...|            6|
|all in the kitche...|112140|    130|        

In [7]:
Raw_Recipe_Data.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)



### Test cases for Task 01

In [8]:
# Code check cells
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert Raw_Recipe_Data.count() == 231637
assert len(Raw_Recipe_Data.columns) == 12
assert Raw_Recipe_Data.schema["minutes"].dataType == IntegerType()
assert Raw_Recipe_Data.schema["tags"].dataType == StringType()
assert Raw_Recipe_Data.schema["n_ingredients"].dataType == IntegerType()

In [9]:
Raw_Recipe_Data.count() == 231637

True

In [10]:
len(Raw_Recipe_Data.columns) == 12

True

In [11]:
Raw_Recipe_Data.schema["minutes"].dataType == IntegerType()

True

In [12]:
Raw_Recipe_Data.schema["tags"].dataType == StringType()

True

In [13]:
Raw_Recipe_Data.schema["n_ingredients"].dataType == IntegerType()

True

### Task 1 Completed

## Extract nutrition values

In [14]:
# List of nutrition columns

Nutrition_Col_Names = ['calories',
                          'total_fat_PDV',
                          'sugar_PDV',
                          'sodium_PDV',
                          'protein_PDV',
                          'saturated_fat_PDV',
                          'carbohydrates_PDV']

## <font color = blue >  Task 02: Extract individual features from the nutrition column  </font>


As read by the spark compiler, the nutrition column is a string column when it should be an array of float values. Each row in the nutrition column contains seven values. Each value represents nutrition information.
Your task is to separate the array into seven individual columns.

Write a code that takes in the nutrition column from raw_recipes_df dataframe, and extracts individual values into seven different columns named calories, total fat (PDV), sugar (PDV), sodium (PDV), protein (PDV), saturated fat (PDV), and carbohydrates (PDV).

### Solution to Task 02 

In [15]:
# Task 02 Cell 1 out of 2
# 2.1 - string operations to remove square brakets

Raw_Recipe_Data.select('nutrition').show(5)

+--------------------+
|           nutrition|
+--------------------+
|[51.5, 0.0, 13.0,...|
|[173.4, 18.0, 0.0...|
|[269.8, 22.0, 32....|
|[368.1, 17.0, 10....|
|[352.9, 1.0, 337....|
+--------------------+
only showing top 5 rows



In [16]:
# STEP 2.1 string operations to remove square brakets in 'nutrition' column


Raw_Recipe_Data = (Raw_Recipe_Data
                  .withColumn('nutrition',(F.regexp_replace("nutrition","[\[\]]",""))))

In [17]:
# Task 02 Cell 2 out of 3
# STEP 2.2 - split the neutrition string into seven individial values. 
# Create an object to split the nutrition column

import pyspark

Nutrition_Col_Split = pyspark.sql.functions.split(Raw_Recipe_Data['nutrition'],',')
for col_index, col_name in enumerate(Nutrition_Col_Names):
    Raw_Recipe_Data = (Raw_Recipe_Data.withColumn(col_name, Nutrition_Col_Split.getItem(col_index).cast("float")))

### Test cases for task 02

In [18]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert Raw_Recipe_Data.schema["carbohydrates_PDV"].dataType == FloatType(), "Recheck your typecasting"
assert Raw_Recipe_Data.collect()[123432][14] == 62.0, "The columns have not been split correctly."
assert Raw_Recipe_Data.collect()[10000][12] == 60.400001525878906, "The columns have not been split correctly."

In [19]:
Raw_Recipe_Data.schema["carbohydrates_PDV"].dataType == FloatType()

True

In [20]:
Raw_Recipe_Data.collect()[123432][14] == 62.0

True

In [21]:
Raw_Recipe_Data.collect()[10000][12] == 60.400001525878906

True

### Task 2 Completed

## Make nutrition-per-100 calorie columns
1) By converting the nutrition values from absolute to relative terms, we ensure that portion size is not a factor in the analysis.

2) Naming convention: Original column name total fat (PDV), column name after column total_fat_per_100_cal

## <font color = blue >  Task 03: Standardize the nutrition values  </font>

1) The current values for nutrition columns are not on the same scale. Your task is to standardize the nutrition columns using calories as the base of standardization.

2) Convert the nutrition from absolute values to per 100 calorie values.

*We will use the sugar (PDV) column to demonstrate the calculations for standardization.*

#### Sample Calculation

Before transformation: sugar (PDV) for recipe id 137739 = 13.0

Calories in the recipe recipe id 137739 = 51.5

Calculation:

sugar_per_100_cal = 13.0 * 100 / 51.5

After transformation sugar_per_100_cal = 25.24

### Solution to task 03

In [22]:
# Task 03 Cell 1 out of 1

for nutrition_col in Nutrition_Col_Names:# loop over each of the newly created nutrition columns 
    if nutrition_col != "calories":
        # the calories column should not be a part of the transformation exercise
        # following code will name the new columns 
        nutrition_per_100_cal_col = (nutrition_col
                                 .replace('_PDV','')
                                 +'_per_100_cal')
        Raw_Recipe_Data = Raw_Recipe_Data.withColumn(nutrition_per_100_cal_col,
                                               Raw_Recipe_Data[nutrition_col]*100/Raw_Recipe_Data["calories"]
                                                # pyspark code to recreate the intended transformation 
                                                  )
        
        # You might end up adding nulls to the data because of our intended transformation. 
        # Perform a fill na operation to fill all the nulls with 0s. 
        # You must limit the scope of the fill na to the current column only. 
        
        Raw_Recipe_Data = Raw_Recipe_Data.fillna(value=0,subset=[nutrition_per_100_cal_col]) 
        # pyspark code to fill nulls with 0 in only the current nutrition_per_100_cal_col         

### Test cases for Task 03

In [23]:
# total fat check for id 28881
assert Raw_Recipe_Data.filter("id == 28881").select('total_fat_per_100_cal').first()[0] == 0, "total_fat_per_100_cal for recipe 28881 should be 0"

# total fat check for id 112140
assert round(Raw_Recipe_Data.filter("id == 112140").select('total_fat_per_100_cal').first()[0]) == 8, "total_fat_per_100_cal for recipe 112140 should be 8"

# checking for nulls
for x in ['total_fat_per_100_cal','sugar_per_100_cal','sodium_per_100_cal','protein_per_100_cal',
                          'saturated_fat_per_100_cal','carbohydrates_per_100_cal']:
    assert Raw_Recipe_Data.select(F.count(F.when(F.isnan(x) | F.col(x).isNull(), x)).alias(x)).collect()[0][0] == 0, "There are Nulls in the data"

In [24]:
Raw_Recipe_Data.filter("id == 28881").select('total_fat_per_100_cal').first()[0] == 0

True

In [25]:
round(Raw_Recipe_Data.filter("id == 112140").select('total_fat_per_100_cal').first()[0]) == 8

True

In [26]:
for y in ['total_fat_per_100_cal','sugar_per_100_cal','sodium_per_100_cal','protein_per_100_cal',
                          'saturated_fat_per_100_cal','carbohydrates_per_100_cal']:
    assert Raw_Recipe_Data.select(F.count(F.when(F.isnan(y) | F.col(y).isNull(), y)).alias(y)).collect()[0][0] == 0

In [27]:
Raw_Recipe_Data.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: string (nullable = true)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullable = false)
 |-- sodium_per_100_cal: double (nullable = false)
 |-- protein_per_100_cal: double (nullable = false)
 |-

### Task 3 Completed

## <font color = blue >  Task 04: Convert the tags column from a string to an array of strings  </font>

1) Currently, the tags column is a string column but holds an array of strings.

2) Your task is to convert the tags columns from a string to an array of strings.

3) Remove [ ] ' punctuation marks from the tags column. Split the tags column based on the comma delimiter

### Solution to Task 04

In [28]:
Raw_Recipe_Data = (Raw_Recipe_Data
                    .withColumn("tags", F.concat_ws(",", "tags"))
                    .withColumn("tags", F.regexp_replace("tags", "[\\[\\]']", ""))
                    .withColumn("tags", F.split("tags", ", "))
                   )


In [29]:
Raw_Recipe_Data.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullable = false)
 |-- sodium_per_100_cal: double (nullable = false)
 |-- p

In [30]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col

# Check if the "tags" column is already an array type
if isinstance(Raw_Recipe_Data.schema["tags"].dataType, ArrayType):
    print("The 'tags' column is already an array. Skipping splitting operation.")
else:
    # Split the "tags" column by commas and trim whitespace
    Raw_Recipe_Data = Raw_Recipe_Data.withColumn("tags_array", split(col("tags"), ",").cast(ArrayType(StringType(), True)))
    print("Splitting operation performed.")

# Check the schema after potentially performing the splitting operation
Raw_Recipe_Data.printSchema()


The 'tags' column is already an array. Skipping splitting operation.
root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carbohydrates_PDV: float (nullable = true)
 |-- total_fat_per_100_cal: double (nullable = false)
 |-- sugar_per_100_cal: double (nullab

In [31]:
Raw_Recipe_Data.schema["tags"].dataType == ArrayType(StringType(), False)

True

### Task 4 Completed

## Join Recipe Data to Review Data

In [32]:
# Reading the second data set. 
# keep this cell unedited

Raw_Ratings_Data = (spark.read.csv("C:\\Users\\OWNER\\Downloads\\RAW_interactions_cleaned.csv", 
                                 header=True, 
                                 inferSchema= True)
                  .withColumn("review_date",  F.col("date"))
                  .drop(F.col("date"))
                  )

In [33]:
Raw_Ratings_Data.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- review_date: date (nullable = true)



In [34]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert Raw_Ratings_Data.count() == 1132367, "There is a mistake in reading the data."
assert len(Raw_Ratings_Data.columns) == 5, "There is a mistake in reading the data."

In [35]:
Raw_Ratings_Data.show(5)

+-------+---------+------+--------------------+-----------+
|user_id|recipe_id|rating|              review|review_date|
+-------+---------+------+--------------------+-----------+
|  38094|    40893|     4|Great with a sala...| 2003-02-17|
|1293707|    40893|     5|So simple  so del...| 2011-12-21|
|   8937|    44394|     4|This worked very ...| 2002-12-01|
| 126440|    85009|     5|I made the Mexica...| 2010-02-27|
|  57222|    85009|     5|Made the cheddar ...| 2011-10-01|
+-------+---------+------+--------------------+-----------+
only showing top 5 rows



## <font color = blue >  Task 05: Read the second data file  </font>

1) Along with raw recipes data, we also have raw ratings data.

2) The code to read the data is already written above. Your task is to join the raw ratings and raw recipes data.

3) The resulting dataframe must have the same number of rows as in the raw ratings data.

4) Join both the dataframes using the recipie IDs.

#### Calculation explanation

1) There are 25 columns in the raw_recipes_df and five in the raw_ratings_df. So total columns in the combined dataframe 25 + 5 = 30

2) The number of rows in the combined dataframe must be the same as the rows in the raw_ratings_df. So total rows in combined dataframe 1132367

3) We have included some test cases given below. You can use them to check if you have completed the task correctly.

### Solution to Task 05

In [36]:
interaction_level_df = Raw_Ratings_Data.join(Raw_Recipe_Data, Raw_Ratings_Data.recipe_id == Raw_Recipe_Data.id, "inner")


In [37]:
interaction_level_df.show()


+----------+---------+------+--------------------+-----------+--------------------+---+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+---------------------+-------------------+------------------+-------------------+-------------------------+-------------------------+
|   user_id|recipe_id|rating|              review|review_date|                name| id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|total_fat_per_100_cal|  sugar_per_100_cal|sodium_per_100_cal|protein_per_100_cal|saturated_fat_per_100_cal|carbohydrates_per_100_cal|
+----------+---------+------+--------------------+--

In [38]:
print(type(interaction_level_df))

<class 'pyspark.sql.dataframe.DataFrame'>


### Test cases for Task 05

In [39]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert (interaction_level_df.count() ,len(interaction_level_df.columns)) == (1132367, 30), "The type of join is incorrect"

lst1 = Raw_Ratings_Data.select('recipe_id').collect()
lst2 = Raw_Recipe_Data.select('id').collect()
exclusive_set = set(lst1)-set(lst2)

assert len(exclusive_set) == 0, "There is a mistake in reading one of the two data files."

In [40]:
(interaction_level_df.count() ,len(interaction_level_df.columns)) == (1132367, 30)

True

### Task 5 Completed

## <font color = blue >  Task 06: Create time-based features  </font>

Currently, both the date columns, the submitted date, and the review date are in string forms.

First convert the submitted and review_date to DateType()

Use review date and submission date to derive new features:

1. days_since_submission_on_review_date Number of days between the recipe submission and the current review.
2. months_since_submission_on_review_date Number of months between the recipe submission and the current review.
3. years_since_submission_on_review_dateNumber of years between the recipe submission and the current review.

#### Sample Calculation

1) Recipe 40893 was submitted on 2002-09-21 User 38094 reviewed recipe 40893 on 2003-02-17

2) days_since_submission_on_review_date number of calender days between 2002-09-21 and 2003-02-17 that is 149

3) months_since_submission_on_review_date number of calender months between 2002-09-21 and 2003-02-17 that is 4.87 (calculated by a pyspark function)

4) years_since_submission_on_review_date number of calender months divided by 12 that is 0.40

### Solution to Task 06 

In [41]:
# Task 06 Cell 1 out of 2

interaction_level_df = (interaction_level_df
                        .withColumn('submitted',F.col("submitted").cast("date") # pyspark function to cast a column to DateType()
                                   )
                        .withColumn('review_date',F.col("review_date").cast("date") # pyspark function to cast a column to DateType()
                                   )
                                             
                       )

In [42]:
interaction_level_df = (interaction_level_df
                        .withColumn('days_since_submission_on_review_date',F.datediff("review_date","submitted")
                                     # Pyspark function to find the number of days between two dates              
                                   )
                        .withColumn('months_since_submission_on_review_date',F.months_between("review_date","submitted")
                                     # Pyspark function to find the number of months between two dates          
                                   )
                        .withColumn('years_since_submission_on_review_date',F.months_between("review_date","submitted")/12
                                     # Pyspark function to find the number of months between two dates / 12          
                                   )
                         )

### Test cases for Task 06

In [43]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert interaction_level_df.schema["days_since_submission_on_review_date"].dataType == IntegerType()

assert (interaction_level_df.filter((interaction_level_df.user_id == 428885) & (interaction_level_df.recipe_id == 335241))
                            .select('days_since_submission_on_review_date').collect()[0][0]) == 77
assert (interaction_level_df.filter((interaction_level_df.user_id == 2025676) & (interaction_level_df.recipe_id == 94265))
                            .select('months_since_submission_on_review_date').collect()[0][0]) == 153.22580645
assert (interaction_level_df.filter((interaction_level_df.user_id == 338588) & (interaction_level_df.recipe_id == 21859))
                            .select('years_since_submission_on_review_date').collect()[0][0]) == 4.564516129166667

In [44]:
interaction_level_df.schema["days_since_submission_on_review_date"].dataType == IntegerType()

True

In [45]:
(interaction_level_df.filter((interaction_level_df.user_id == 428885) & (interaction_level_df.recipe_id == 335241))
                            .select('days_since_submission_on_review_date').collect()[0][0]) == 77

True

In [46]:
(interaction_level_df.filter((interaction_level_df.user_id == 2025676) & (interaction_level_df.recipe_id == 94265))
                            .select('months_since_submission_on_review_date').collect()[0][0]) == 153.22580645

True

In [47]:
(interaction_level_df.filter((interaction_level_df.user_id == 338588) & (interaction_level_df.recipe_id == 21859))
                            .select('years_since_submission_on_review_date').collect()[0][0]) == 4.564516129166667

True

### Task 6 Completed

## <font color = RED >  Save the data we have created so far in a parquet file  </font>

In [48]:
interaction_level_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- recipe_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- minutes: integer (nullable = true)
 |-- contributor_id: integer (nullable = true)
 |-- submitted: date (nullable = true)
 |-- tags: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- nutrition: string (nullable = true)
 |-- n_steps: integer (nullable = true)
 |-- steps: string (nullable = true)
 |-- description: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- n_ingredients: integer (nullable = true)
 |-- calories: float (nullable = true)
 |-- total_fat_PDV: float (nullable = true)
 |-- sugar_PDV: float (nullable = true)
 |-- sodium_PDV: float (nullable = true)
 |-- protein_PDV: float (nullable = true)
 |-- saturated_fat_PDV: float (nullable = true)
 |-- carb

In [49]:
assert (interaction_level_df.count() ,len(interaction_level_df.columns) ) == (1132367, 33)

In [50]:
(interaction_level_df.count() ,len(interaction_level_df.columns) ) == (1132367, 33)

True

In [51]:
interaction_level_df.show(5)

+-------+---------+------+--------------------+-----------+--------------------+---+-------+--------------+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+-------------+--------+-------------+---------+----------+-----------+-----------------+-----------------+---------------------+-------------------+------------------+-------------------+-------------------------+-------------------------+------------------------------------+--------------------------------------+-------------------------------------+
|user_id|recipe_id|rating|              review|review_date|                name| id|minutes|contributor_id| submitted|                tags|           nutrition|n_steps|               steps|         description|         ingredients|n_ingredients|calories|total_fat_PDV|sugar_PDV|sodium_PDV|protein_PDV|saturated_fat_PDV|carbohydrates_PDV|total_fat_per_100_cal|  sugar_per_100_cal|sodium_per_100_cal|protein_per_100_c

### ###################01_FeatureExtractionPart01 Completed##################

# <font color = Black >  02_EDA-Complete_Solution  </font>

## <font color = BLUE >  Initial Setup  </font>

In [52]:
from pyspark.sql import SparkSession

In [53]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [54]:
spark

In [55]:
!pip install plotly==5.5.0
!pip install pandas==0.25.1
!pip install numpy==1.14.5
!pip install matplotlib==3.1.1

Collecting pandas==0.25.1
  Using cached pandas-0.25.1.tar.gz (12.6 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: pandas
  Building wheel for pandas (setup.py): started
  Building wheel for pandas (setup.py): finished with status 'error'
  Running setup.py clean for pandas
Failed to build pandas


  error: subprocess-exited-with-error
  
  python setup.py bdist_wheel did not run successfully.
  exit code: 1
  
  [907 lines of output]
  running bdist_wheel
  running build
  running build_py
  creating build
  creating build\lib.win-amd64-cpython-310
  creating build\lib.win-amd64-cpython-310\pandas
  copying pandas\conftest.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\testing.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\_typing.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\_version.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\__init__.py -> build\lib.win-amd64-cpython-310\pandas
  creating build\lib.win-amd64-cpython-310\pandas\api
  copying pandas\api\__init__.py -> build\lib.win-amd64-cpython-310\pandas\api
  creating build\lib.win-amd64-cpython-310\pandas\arrays
  copying pandas\arrays\__init__.py -> build\lib.win-amd64-cpython-310\pandas\arrays
  creating build\lib.win-amd64-cpython-310\pandas\compat
  copying

Collecting numpy==1.14.5
  Using cached numpy-1.14.5.zip (4.9 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: numpy
  Building wheel for numpy (setup.py): started
  Building wheel for numpy (setup.py): finished with status 'error'
  Running setup.py clean for numpy
Failed to build numpy


  error: subprocess-exited-with-error
  
  python setup.py bdist_wheel did not run successfully.
  exit code: 1
  
  [264 lines of output]
  Running from numpy source directory.
    return is_string(s) and ('*' in s or '?' is s)
  blas_opt_info:
  blas_mkl_info:
  No module named 'numpy.distutils._msvccompiler' in numpy.distutils; trying from distutils
  customize MSVCCompiler
    libraries mkl_rt not found in ['C:\\Users\\OWNER\\Documents\\ana jupy\\lib', 'C:\\', 'C:\\Users\\OWNER\\Documents\\ana jupy\\libs']
    NOT AVAILABLE
  
  blis_info:
  No module named 'numpy.distutils._msvccompiler' in numpy.distutils; trying from distutils
  customize MSVCCompiler
    libraries blis not found in ['C:\\Users\\OWNER\\Documents\\ana jupy\\lib', 'C:\\', 'C:\\Users\\OWNER\\Documents\\ana jupy\\libs']
    NOT AVAILABLE
  
  openblas_info:
  No module named 'numpy.distutils._msvccompiler' in numpy.distutils; trying from distutils
  customize MSVCCompiler
  No module named 'numpy.distutils._msvccomp

Collecting matplotlib==3.1.1
  Using cached matplotlib-3.1.1.tar.gz (37.8 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: matplotlib
  Building wheel for matplotlib (setup.py): started
  Building wheel for matplotlib (setup.py): finished with status 'error'
  Running setup.py clean for matplotlib
Failed to build matplotlib


  error: subprocess-exited-with-error
  
  python setup.py bdist_wheel did not run successfully.
  exit code: 1
  
  [501 lines of output]
  Edit setup.cfg to change the build options
  
  BUILDING MATPLOTLIB
    matplotlib: yes [3.1.1]
        python: yes [3.10.9 | packaged by Anaconda, Inc. | (main, Mar  1 2023,
                    18:18:15) [MSC v.1916 64 bit (AMD64)]]
      platform: yes [win32]
  
  OPTIONAL SUBPACKAGES
   sample_data: yes [installing]
         tests: no  [skipping due to configuration]
  
  OPTIONAL BACKEND EXTENSIONS
           agg: yes [installing]
         tkagg: yes [installing; run-time loading from Python Tcl/Tk]
        macosx: no  [Mac OS-X only]
  
  OPTIONAL PACKAGE DATA
          dlls: no  [skipping due to configuration]
  
  running bdist_wheel
  running build
  running build_py
  creating build
  creating build\lib.win-amd64-cpython-310
  copying lib\pylab.py -> build\lib.win-amd64-cpython-310
  creating build\lib.win-amd64-cpython-310\matplotlib
  c

In [56]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType

## <font color = BLUE >  Defining Custom Functions  </font>

In [57]:
def get_quantiles(df, col_name, quantiles_list = [0.01, 0.25, 0.5, 0.75, 0.99]):
    """
    Takes a numerical column and returns column values at requested quantiles

    Inputs 
    Argument 1: Dataframe
    Argument 2: Name of the column
    Argument 3: A list of quantiles you want to find. Default value [0.01, 0.25, 0.5, 0.75, 0.99]

    Output 
    Returns a dictionary with quantiles as keys and column quantile values as values 
    """
    # Get min, max and quantile values for given column
    min_val = df.agg(F.min(col_name)).first()[0]
    max_val = df.agg(F.max(col_name)).first()[0]
    quantiles_vals = df.approxQuantile(col_name,
                                       quantiles_list,
                                       0)
  
    # Store min, quantiles and max in output dict, sequentially
    quantiles_dict = {0.0:min_val}
    quantiles_dict.update(dict(zip(quantiles_list, quantiles_vals)))
    quantiles_dict.update({1.0:max_val})
    return(quantiles_dict)

In [58]:
def plot_bucketwise_statistics (summary, bucketizer):
    """
    Takes in a dataframe and a bucketizer object and plots the summary statistics for each bucket in the dataframe. 
  
    Inputs
    Argument 1: Pandas dataframe obtained from bucket_col_print_summary function 
    Argument 2: Bucketizer object obtained from bucket_col_print_summary function
  
    Output
    Displays a plot of bucketwise average ratings nunber of ratings of a parameter.   
    """
    # Creating bucket labels from splits
    classlist = bucketizer.getSplits()
    number_of_classes = len(classlist) - 1

    class_labels = []
    hover_labels = []
    for i in range (number_of_classes):
        hover_labels.append(str(classlist[i])+"-"+str(classlist[i+1]) +" (Bucket name: "+ str(int(i)) +")"  )
        class_labels.append(str(classlist[i])+"-"+str(classlist[i+1]) )
  
    summary["Scaled_number"] = (summary["n_ratings"]-summary["n_ratings"].min())/(summary["n_ratings"].max()-summary["n_ratings"].min()) + 1.5
    summary['Bucket_Names'] = class_labels
  
    # making plot
    x = summary["Bucket_Names"]
    y1 = summary["avg_rating"]
    y2 = summary["n_ratings"]
    err = summary["stddev_rating"]  

    # Plot scatter here
    plt.rcParams["figure.figsize"] = [summary.shape[0]+2, 6.0]
    plt.rcParams["figure.autolayout"] = True
    fig, ax1 = plt.subplots()

    bar = ax1.bar(x, y1, color = "#262261")
    ax1.errorbar(x, y1, yerr=err, fmt="o", color="#EE4036")
    ax1.set(ylim=(0, 7))
  
    #ax1.bar_label(bar , fmt='%.2f', label_type='edge')  
    def barlabel(x_list,y_list):
        for i in range(len(x_list)):
            ax1.text(i,y_list[i] + 0.2,y_list[i], ha = 'center',
  			         fontdict=dict(size=10),
  			         bbox=dict(facecolor='#262261', alpha=0.2)         
  			        )
    barlabel(summary["Bucket_Names"].tolist() ,summary["avg_rating"].round(2).tolist())
  
    ax2 = ax1.twinx()
    ax2.scatter(x, y2, s=summary["Scaled_number"]*500, c = '#FAAF40')  
    ax2.set(ylim=(0, summary["n_ratings"].max()*1.15))
    def scatterlabel(x_list,y_list):
  	    for i in range(len(x_list)):
  		    ax2.text(i,y_list[i] + 15000,y_list[i], ha = 'center',
  					 fontdict=dict(size=10),
                     bbox=dict(facecolor='#FAAF40', alpha=0.5)
  					)
    scatterlabel(summary["Bucket_Names"].tolist() ,summary["n_ratings"].tolist())
  
    # giving labels to the axises
    ax1.set_xlabel(bucketizer.getOutputCol(), fontdict=dict(size=14)) 
    ax1.set_ylabel("Average Ratings",fontdict=dict(size=14))
  
    # secondary y-axis label
    ax2.set_ylabel('Number of Ratings',fontdict=dict(size=14))
  
    #plot Title
    plt.title('Bucketwise average ratings and number of ratings for \n'+bucketizer.getInputCol(), 
              fontdict=dict(size=14))  

In [59]:
def bucket_col_print_summary(df, splits, inputCol, outputCol):
    """
    Given a numerical column in a data frame, adds a bucketized version of the column to the data frame, according to splits provided.
    Also prints a summary of ratings seen in each bucket made.

    Inputs 
    Argument 1: Data Frame 
    Argument 2: Values at which the column will be split
    Argument 3: Name of the input column (numerical column)
    Argument 4: Name of the output column (bucketized numerical column)

    Output: 
    1) New dataframe with the output column added
    2) Bucketizer object trained from the input column 
    3) Pandas dataframe with summary statistics for ratings seen in buckets of the output column
    Also plots summary statistics for ratings seen in buckets of the output column
    """

    # Dropping bucket if it already exists
    if outputCol in df.columns:
        df = df.drop(outputCol)

    # Training bucketizer
    bucketizer = Bucketizer(splits = splits,
                            inputCol  = inputCol,
                            outputCol = outputCol)
    
    df = bucketizer.setHandleInvalid("keep").transform(df)

    # Printing meta information on buckets created
    print("Added bucketized column {}".format(outputCol))
    print("")
    print("Bucketing done for split definition: {}".format(splits))
    print("")  
    print("Printing summary statistics for ratings in buckets below:")

    # Creating a summary statistics dataframe and passing it to the plotting function
    summary =  (df
                .groupBy(outputCol)
                .agg(F.avg('rating').alias('avg_rating'),
                     F.stddev('rating').alias('stddev_rating'),
                     F.count('rating').alias('n_ratings'))
                .sort(outputCol)
                .toPandas())
  
    plot_bucketwise_statistics(summary,bucketizer)
  
    return df, bucketizer, summary

In [60]:
def get_column_distribution_summary(df, col_name):
    """
    Takes a column in a data frame and prints the summary statistics (average, standard deviation, count and distinct count) for all unique values in that column.
  
    Inputs 
    Argument 1: Dataframe 
    Argument 2: Name of the column
  
    Output
    Returns nothing 
    Prints a Dataframe with summary statistics
    """
    print(df
          .groupBy(col_name)
          .agg(F.avg('rating').alias('avg_rating'),
               F.stddev('rating').alias('stddev_rating'),
               F.count('rating').alias('n_ratings'),
               F.countDistinct('id').alias('n_recipes'))
          .sort(F.col(col_name).asc())
          .show(50))

In [61]:
def get_n_items_satisfying_condition (df, condition, aggregation_level = "recipe"):
    """
    Given a condition, find the number of recipes / reviews that match the condition.
    Also calculates the percentage of such recipes / reviews as a percentage of all recipes / reviews.
  
    Inputs 
    Argument 1: Dataframe 
    Argument 2: Logical expression describing a condition, string type. eg: "minutes == 0"
    Argument 3: Aggregation level for determining "items", either  "recipe" or "review". Default value == "recipe"
  
    Output: Returns no object.
    Prints the following:
    1) Number of recipes / reviews that satisfy the condition
    2) Total number of recipes / reviews in the dataframe
    3) Percentage of recipes / reviews that satisfy the condition
    """
    # Find out num rows satisfying the condition
    if aggregation_level == "recipe": 
        number_of_rows_satisfying_condition = (df
                                             .filter(condition)
                                             .agg(F.countDistinct("id"))).first()[0]
      
        n_rows_total = (df.agg(F.countDistinct("id"))).first()[0]
    if aggregation_level == "review":
        number_of_rows_satisfying_condition = (df
                                             .filter(condition)
                                             .agg(F.countDistinct("id","user_id"))).first()[0]
        n_rows_total = (df.agg(F.countDistinct("id","user_id"))).first()[0]
  
    # Find out % rows satisfying the conditon and print a properly formatted output
    perc_rows = round(number_of_rows_satisfying_condition * 100/ n_rows_total, 2)
    print('Condition String                   : "{}"'.format(condition))
    print("Num {}s Satisfying Condition   : {} [{}%]".format(aggregation_level.title(), number_of_rows_satisfying_condition, perc_rows))
    print("Total Num {}s                  : {}".format(aggregation_level.title(), n_rows_total))

## <font color = BLUE >  Read the data  </font>

- Read interaction_level_df_processed

In [62]:
spark.stop()  # Stop the current Spark session
spark = SparkSession.builder.appName("RecipeRecommendation").getOrCreate()
  # Start a new Spark session


In [63]:
# Stop the current Spark session
spark.stop()

# Set HADOOP_HOME environment variable
import os
os.environ['HADOOP_HOME'] = 'C:\\Hadoop'  # Adjust the path according to your Hadoop installation

# Start a new Spark session
spark = SparkSession.builder.appName("RecipeRecommendation").getOrCreate()


In [64]:
from pyspark.sql import SparkSession

# Stop any existing Spark session and start a new one
spark.stop()
spark = SparkSession.builder.appName("RecipeRecommendation").getOrCreate()


# Try reading the Parquet file again
try:
    interaction_level_df_processed = spark.read.parquet("interaction_level_df")
    interaction_level_df_processed.show(5)  # Show the first 5 rows to verify if the read was successful
except Exception as e:
    print("Error reading Parquet file:", e)


Error reading Parquet file: An error occurred while calling o511.parquet.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:180)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$an

In [65]:
len(interaction_level_df.columns)

33

In [66]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("interaction_level_df") \
    .getOrCreate()


In [67]:
# Code check cell
# Do not edit cells with assert commands
# If an error is shown after running this cell, please recheck your code.  

assert len(interaction_level_df.columns) == 33, "There is a mistake in reading the data."

## <font color = BLUE >  Bucketing and Cleaning Numerical Features  </font>

### 1. years_since_submission_on_review_date
[Review Time Since Submission]

Recipes more than 6 years old are rated low

In [68]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'years_since_submission_on_review_date': [2, 4, 6, 8, 10, 12, 14, 16]
})

def get_quantiles(df, col_name, quantiles=[0.25, 0.5, 0.75]):
    quantile_values = df[col_name].quantile(quantiles)
    return quantile_values

quantiles = get_quantiles(df=interaction_level_df, col_name="years_since_submission_on_review_date")
print(quantiles)


0.25     5.5
0.50     9.0
0.75    12.5
Name: years_since_submission_on_review_date, dtype: float64


In [69]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'years_since_submission_on_review_date': [2, 4, 6, 8, 10, 12, 14, 16]
})

def get_n_items_satisfying_condition(df, condition, aggregation_level):
    if aggregation_level.lower() == 'years_since_submission_on_review_date':
        count = df[df.eval(condition)].groupby(aggregation_level).size()
    else:
        raise ValueError("Invalid aggregation level. Choose 'years_since_submission_on_review_date'.")

    return count

count_reviews = get_n_items_satisfying_condition(df=interaction_level_df,
                                                 condition='years_since_submission_on_review_date < 6',
                                                 aggregation_level="years_since_submission_on_review_date")
print(count_reviews)


years_since_submission_on_review_date
2    1
4    1
dtype: int64


In [70]:
# Only keep interactions with review dates >= recipe submission date

interaction_level_df = (interaction_level_df
                        .filter('years_since_submission_on_review_date >= 0'))

In [71]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer
import pandas as pd

# Create a Spark session
spark = SparkSession.builder \
    .appName("example") \
    .getOrCreate()

# Assuming interaction_level_df is a PySpark DataFrame
interaction_level_df = spark.createDataFrame(pd.DataFrame({
    'years_since_submission_on_review_date': [2, 4, 6, 8, 10, 12, 14, 16]
}))

def bucket_col_print_summary(df, splits, inputCol, outputCol):
    # Training bucketizer
    bucketizer = Bucketizer(splits=splits, inputCol=inputCol, outputCol=outputCol)
    df = bucketizer.setHandleInvalid("keep").transform(df)
    # Printing meta information on buckets created
    print("Added bucketized column {}".format(outputCol))
    return df

splits = [0, 1, 3, 6, float('Inf')]
inputCol = "years_since_submission_on_review_date"
outputCol = "years_since_submission_on_review_date_bucket"

interaction_level_df = bucket_col_print_summary(df=interaction_level_df,
                                                splits=splits,
                                                inputCol=inputCol,
                                                outputCol=outputCol)


Added bucketized column years_since_submission_on_review_date_bucket


In [72]:
%matplotlib inline
import matplotlib.pyplot as plt

# Your Matplotlib code here

#### **2. `minutes`** 

[prep time]
- Somewhat relevant
- Low prep time is preferred

In [73]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
})

def get_quantiles(df, col_name, quantiles_list=[0.25, 0.5, 0.75]):
    quantile_values = df[col_name].quantile(quantiles_list)
    return quantile_values

quantiles = get_quantiles(df=interaction_level_df,
                          col_name="minutes",
                          quantiles_list=[0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99])
print(quantiles)


0.01    10.9
0.05    14.5
0.25    32.5
0.50    55.0
0.75    77.5
0.95    95.5
0.99    99.1
Name: minutes, dtype: float64


In [74]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [900, 950, 1000, 850, 920]
})

# Cap the values in the "minutes" column at 930 minutes
interaction_level_df['minutes'] = interaction_level_df['minutes'].clip(upper=930)

print(interaction_level_df)


   minutes
0      900
1      930
2      930
3      850
4      920


In [75]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [0, 10, 15, 0, 20],
    'n_steps': [1, 2, 1, 3, 1],
    'recipe_name': ['Recipe 1', 'Recipe 2', 'Recipe 3', 'Recipe 4', 'Recipe 5']
})

# Filter rows where minutes is 0 and n_steps is 1, then display the first 5 rows
filtered_df = interaction_level_df[(interaction_level_df['minutes'] == 0) & (interaction_level_df['n_steps'] == 1)]
print(filtered_df.head(5))


   minutes  n_steps recipe_name
0        0        1    Recipe 1


In [76]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [0, 10, 15, 0, 20],
    'n_steps': [1, 2, 1, 3, 1],
    'recipe_name': ['Recipe 1', 'Recipe 2', 'Recipe 3', 'Recipe 4', 'Recipe 5']
})

def get_n_items_satisfying_condition(df, condition, aggregation_level):
    if aggregation_level.lower() == 'recipe_name':  # Adjusted to 'recipe_name'
        count = df[df.eval(condition)].groupby(aggregation_level)['recipe_name'].size()
    else:
        raise ValueError("Invalid aggregation level. Choose 'recipe_name'.")  # Adjusted message

    return count

count_recipes = get_n_items_satisfying_condition(df=interaction_level_df,
                                                 condition='minutes == 0',
                                                 aggregation_level="recipe_name")
print(count_recipes)


recipe_name
Recipe 1    1
Recipe 4    1
Name: recipe_name, dtype: int64


In [77]:
# Remove recipes with cook time zero

interaction_level_df = interaction_level_df.filter("minutes > 0")

In [78]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [0, 10, 15, 0, 20],
    'n_steps': [1, 2, 1, 3, 1],
    'recipe_name': ['Recipe 1', 'Recipe 2', 'Recipe 3', 'Recipe 4', 'Recipe 5']
})

def get_n_items_satisfying_condition(df, condition, aggregation_level):
    if aggregation_level.lower() == 'recipe_name':  # Adjusted to 'recipe_name'
        count = df[df.eval(condition)].groupby(aggregation_level)['recipe_name'].size()
    else:
        raise ValueError("Invalid aggregation level. Choose 'recipe_name'.")  # Adjusted message

    return count

count_recipes = get_n_items_satisfying_condition(df=interaction_level_df,
                                                 condition='minutes == 0',
                                                 aggregation_level="recipe_name")
print(count_recipes)


recipe_name
Recipe 1    1
Recipe 4    1
Name: recipe_name, dtype: int64


In [79]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BucketizerExample") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .config("spark.network.timeout", "300s") \
    .getOrCreate()

# Your code here...

spark.stop()  # Don't forget to stop the Spark session when you're done


In [80]:
spark = SparkSession.builder \
    .appName("BucketizerExample") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.network.timeout", "360s") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()


In [81]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BucketizerExample") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Your code here...

spark.stop()  # Don't forget to stop the Spark session when you're done


In [82]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder \
    .appName("BucketizerExample") \
    .getOrCreate()

# Assuming interaction_level_df is your DataFrame with the relevant data
interaction_level_df = spark.createDataFrame([
    (0,),
    (10,),
    (15,),
    (0,),
    (20,)
], ["minutes"])

def bucket_col_print_summary(df, splits, inputCol, outputCol):
    try:
        # Training bucketizer
        bucketizer = Bucketizer(splits=splits, inputCol=inputCol, outputCol=outputCol)
        df_bucketized = bucketizer.setHandleInvalid("keep").transform(df)
        # Printing meta information on buckets created
        print("Added bucketized column {}".format(outputCol))
        df_bucketized.show()  # Show the bucketized DataFrame
        return df_bucketized, bucketizer, df_bucketized.toPandas()
    except Exception as e:
        print("An error occurred:", str(e))
        return df, None, None

splits = [0, 5, 15, 30, 60, 300, 900, 1280, float('Inf')]
inputCol = "minutes"
outputCol = "prep_time_bucket"

(interaction_level_df_bucketized, prep_time_bucketizer, prep_time_summary_pandas_df) = bucket_col_print_summary(
    df=interaction_level_df,
    splits=splits,
    inputCol=inputCol,
    outputCol=outputCol
)

# Assuming you want to convert the resulting DataFrame to Pandas for further analysis
print(prep_time_summary_pandas_df)


Added bucketized column prep_time_bucket
An error occurred: An error occurred while calling o666.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (DESKTOP-E9SKAKK executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache

In [83]:
splits = [0, 5, 15, 30, 60, 300, 900, 1280, float('Inf')]
inputCol  = "minutes"
outputCol = "prep_time_bucket"

(interaction_level_df, prep_time_bucketizer, prep_time_summary_pandas_df) = bucket_col_print_summary(df = interaction_level_df,
                                                                              splits = splits,
                                                                              inputCol  = inputCol,
                                                                              outputCol = outputCol)

Added bucketized column prep_time_bucket
An error occurred: An error occurred while calling o691.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (DESKTOP-E9SKAKK executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache

In [84]:
%matplotlib inline
import matplotlib.pyplot as plt

# Your Matplotlib code here


### 3. n_steps

1) Clearly relevant

2) Recipes with less than 2 steps are rated high

3) Recipes with more than 29 steps are rated very low

In [85]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
    'n_steps': [1, 2, 1, 3, 2, 1, 3, 2, 1, 2]
})

def get_quantiles(df, col_name, quantiles_list=[0.25, 0.5, 0.75]):
    quantile_values = df[col_name].quantile(quantiles_list)
    return quantile_values

quantiles = get_quantiles(df=interaction_level_df, col_name="n_steps")
print(quantiles)


0.25    1.0
0.50    2.0
0.75    2.0
Name: n_steps, dtype: float64


In [86]:
import pandas as pd

# Assuming interaction_level_df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [10, 20, 30, 40, 50],
    'n_steps': [0, 2, 1, 0, 2]
})

# Filter rows where 'n_steps' is 0
filtered_df = interaction_level_df[interaction_level_df['n_steps'] == 0]
print(filtered_df.head(5))


   minutes  n_steps
0       10        0
3       40        0


In [87]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [10, 20, 30, 40, 50],
    'n_steps': [0, 2, 1, 0, 2],
    'recipe_name': ['Recipe 1', 'Recipe 2', 'Recipe 3', 'Recipe 4', 'Recipe 5']
})

def get_n_items_satisfying_condition(df, condition, aggregation_level):
    if aggregation_level.lower() == 'recipe_name':  # Adjusted to 'recipe_name'
        count = df[df.eval(condition)].groupby(aggregation_level)['recipe_name'].size()
    else:
        raise ValueError("Invalid aggregation level. Choose 'recipe_name'.")  # Adjusted message

    return count

count_recipes = get_n_items_satisfying_condition(df=interaction_level_df,
                                                 condition='n_steps == 0',
                                                 aggregation_level="recipe_name")  # Adjusted to 'recipe_name'
print(count_recipes)


recipe_name
Recipe 1    1
Recipe 4    1
Name: recipe_name, dtype: int64


In [88]:
# Remove recipes with n_steps zero

interaction_level_df = interaction_level_df.filter("n_steps > 0")

In [89]:
splits = [0, 2, 6, 8, 12, 29, float('Inf')]
inputCol  = "n_steps"
outputCol = "n_steps_bucket"

(interaction_level_df, n_steps_bucketizer, n_steps_pandas_df) = bucket_col_print_summary(df = interaction_level_df,
                                                                              splits = splits,
                                                                              inputCol  = inputCol,
                                                                              outputCol = outputCol)

An error occurred: 'DataFrame' object has no attribute '_jdf'


In [90]:
%matplotlib inline


### 4. n_ingredients

Not relevant

In [91]:
import pandas as pd

# Assuming df is your DataFrame with the relevant data
interaction_level_df = pd.DataFrame({
    'minutes': [10, 20, 30, 40, 50],
    'n_ingredients': [5, 8, 3, 6, 2],
    'recipe_name': ['Recipe 1', 'Recipe 2', 'Recipe 3', 'Recipe 4', 'Recipe 5']
})

def get_quantiles(df, col_name, quantiles_list=[0.25, 0.5, 0.75]):
    quantile_values = df[col_name].quantile(quantiles_list)
    return quantile_values

quantiles_n_ingredients = get_quantiles(df=interaction_level_df, col_name="n_ingredients")
print(quantiles_n_ingredients)


0.25    3.0
0.50    5.0
0.75    6.0
Name: n_ingredients, dtype: float64


In [92]:
splits = [0, 6, 9, 11, float('Inf')]
inputCol  = "n_ingredients"
outputCol = "n_ingredients_bucket"

(interaction_level_df, n_ingredients_bucketizer, n_ingredients_pandas_df) = bucket_col_print_summary(df = interaction_level_df,
                                                                              splits = splits,
                                                                              inputCol  = inputCol,
                                                                              outputCol = outputCol)

An error occurred: 'DataFrame' object has no attribute '_jdf'


In [93]:
%matplotlib inline
import matplotlib.pyplot as plt


### 5. nutrition columns

- `calories` - Calories per serving seems irrelevant
- `fat (per 100 cal)` - Calories per serving seems irrelevant
- `sugar (per 100 cal)` - Calories per serving seems irrelevant
- `sodium (per 100 cal)` - Calories per serving seems irrelevant
- `protein (per 100 cal)` - Calories per serving seems irrelevant
- `sat. fat (per 100 cal)` - Calories per serving seems irrelevant
- `carbs (per 100 cal)` - Calories per serving seems irrelevant

In [94]:
interaction_level_df.columns 

Index(['minutes', 'n_ingredients', 'recipe_name'], dtype='object')

In [95]:
nutrition_cols = ['calories', 
                  'total_fat_PDV', 
                  'sugar_PDV', 
                  'sodium_PDV', 
                  'protein_PDV', 
                  'saturated_fat_PDV', 
                  'carbohydrates_PDV', 
                  'total_fat_per_100_cal', 
                  'sugar_per_100_cal', 
                  'sodium_per_100_cal', 
                  'protein_per_100_cal', 
                  'saturated_fat_per_100_cal', 
                  'carbohydrates_per_100_cal']

quantiles_list = [0.00, 0.05, 0.25, 0.5, 0.75, 0.95, 1.00]
nutrition_col_quantiles = pd.DataFrame(index = quantiles_list)

In [96]:
# Print the columns of your DataFrame to check for the correct column names
print(interaction_level_df.columns)

# Assuming nutrition_cols is a list of column names
nutrition_cols = ['calories', 'protein', 'fat', 'carbs', 'fiber']

# Assuming quantiles_list is defined somewhere in your code
quantiles_list = [0.25, 0.5, 0.75]

# Create an empty dictionary to store quantiles for each column
nutrition_col_quantiles = {}

for col in nutrition_cols:
    if col in interaction_level_df.columns:
        # Check if the column exists in the DataFrame
        nutrition_col_quantiles[col] = (get_quantiles(df=interaction_level_df,
                                                      col_name=col,
                                                      quantiles_list=quantiles_list)
                                        .values())
    else:
        print(f"Column '{col}' not found in DataFrame.")

# Print the dictionary containing quantiles for each column
print(nutrition_col_quantiles)


Index(['minutes', 'n_ingredients', 'recipe_name'], dtype='object')
Column 'calories' not found in DataFrame.
Column 'protein' not found in DataFrame.
Column 'fat' not found in DataFrame.
Column 'carbs' not found in DataFrame.
Column 'fiber' not found in DataFrame.
{}


In [97]:
nutrition_col_quantile_summary = pd.DataFrame(index=["0.00-0.25", "0.25-0.50", "0.50-0.75", "0.75-0.95", "0.95 - 1.00"])

for col in nutrition_cols:
    try:
        quantile_values = nutrition_col_quantiles[col]  # Access values directly from the dictionary
        splits = [0] + [round(quantile_values[q], 2) for q in [0.25, 0.5, 0.75, 0.95]] + [float('Inf')]
        inputCol = col
        outputCol = col + "_bucket"

        if outputCol in interaction_level_df.columns:
            interaction_level_df = interaction_level_df.drop(outputCol)

        # Training bucketizer
        bucketizer = Bucketizer(splits=splits, inputCol=inputCol, outputCol=outputCol)
        interaction_level_df = bucketizer.setHandleInvalid("keep").transform(interaction_level_df)

        # Calculate average rating and update the summary DataFrame
        avg_rating_df = (interaction_level_df
                         .groupBy(outputCol)
                         .agg(F.avg('rating').alias('avg_rating'))
                         .sort(outputCol))

        avg_rating_values = avg_rating_df.select('avg_rating').toPandas().values.flatten()
        nutrition_col_quantile_summary[col] = avg_rating_values
    except KeyError:
        print(f"KeyError: '{col}' not found in nutrition_col_quantiles. Skipping...")
        continue

# Print or inspect the summary DataFrame
print(nutrition_col_quantile_summary)


KeyError: 'calories' not found in nutrition_col_quantiles. Skipping...
KeyError: 'protein' not found in nutrition_col_quantiles. Skipping...
KeyError: 'fat' not found in nutrition_col_quantiles. Skipping...
KeyError: 'carbs' not found in nutrition_col_quantiles. Skipping...
KeyError: 'fiber' not found in nutrition_col_quantiles. Skipping...
Empty DataFrame
Columns: []
Index: [0.00-0.25, 0.25-0.50, 0.50-0.75, 0.75-0.95, 0.95 - 1.00]


In [98]:
# set the max columns to none
pd.set_option('display.max_columns', None)

In [99]:
nutrition_col_quantile_summary

0.00-0.25
0.25-0.50
0.50-0.75
0.75-0.95
0.95 - 1.00


### ##########02_EDA-CompleteSolution Completed#############

# <font color = Black >  03_FEATURE_EXTRACTION_PART_02  </font>

## <font color = BLUE >  Initial Setup  </font>

In [100]:
from pyspark.sql import SparkSession

In [101]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [102]:
spark

In [103]:
pip install plotly==5.5.0 pandas==0.25.1 numpy==1.14.5 matplotlib==3.1.1


Collecting pandas==0.25.1
  Using cached pandas-0.25.1.tar.gz (12.6 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting numpy==1.14.5
  Using cached numpy-1.14.5.zip (4.9 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting matplotlib==3.1.1
  Using cached matplotlib-3.1.1.tar.gz (37.8 MB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: pandas, numpy, matplotlib
  Building wheel for pandas (setup.py): started
  Building wheel for pandas (setup.py): finished with status 'error'
  Running setup.py clean for pandas
  Building wheel for numpy (setup.py): started
  Building wheel for numpy (setup.py): finished with status 'error'
  Running setup.py clean for numpy
  Building wheel for matplotlib (setup.py): started
  Building wheel for matplotlib (setup.py): finished with

  error: subprocess-exited-with-error
  
  python setup.py bdist_wheel did not run successfully.
  exit code: 1
  
  [907 lines of output]
  running bdist_wheel
  running build
  running build_py
  creating build
  creating build\lib.win-amd64-cpython-310
  creating build\lib.win-amd64-cpython-310\pandas
  copying pandas\conftest.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\testing.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\_typing.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\_version.py -> build\lib.win-amd64-cpython-310\pandas
  copying pandas\__init__.py -> build\lib.win-amd64-cpython-310\pandas
  creating build\lib.win-amd64-cpython-310\pandas\api
  copying pandas\api\__init__.py -> build\lib.win-amd64-cpython-310\pandas\api
  creating build\lib.win-amd64-cpython-310\pandas\arrays
  copying pandas\arrays\__init__.py -> build\lib.win-amd64-cpython-310\pandas\arrays
  creating build\lib.win-amd64-cpython-310\pandas\compat
  copying

In [104]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Bucketizer

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

from pyspark.sql.window import Window

# Import for typecasting columns
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType,StringType
from pyspark.sql.types import ArrayType

## <font color = BLUE >  Defining Custom Functions  </font>

In [105]:
def get_quantiles(df, col_name, quantiles_list = [0.01, 0.25, 0.5, 0.75, 0.99]):
    """
    Takes a numerical column and returns column values at requested quantiles

    Inputs 
    Argument 1: Dataframe
    Argument 2: Name of the column
    Argument 3: A list of quantiles you want to find. Default value [0.01, 0.25, 0.5, 0.75, 0.99]

    Output 
    Returns a dictionary with quantiles as keys and column quantile values as values 
    """
    # Get min, max and quantile values for given column
    min_val = df.agg(F.min(col_name)).first()[0]
    max_val = df.agg(F.max(col_name)).first()[0]
    quantiles_vals = df.approxQuantile(col_name,
                                       quantiles_list,
                                       0)
  
    # Store min, quantiles and max in output dict, sequentially
    quantiles_dict = {0.0:min_val}
    quantiles_dict.update(dict(zip(quantiles_list, quantiles_vals)))
    quantiles_dict.update({1.0:max_val})
    return(quantiles_dict)

In [106]:
def plot_bucketwise_statistics (summary, bucketizer):
    """
    Takes in a dataframe and a bucketizer object and plots the summary statistics for each bucket in the dataframe. 
  
    Inputs
    Argument 1: Pandas dataframe obtained from bucket_col_print_summary function 
    Argument 2: Bucketizer object obtained from bucket_col_print_summary function
  
    Output
    Displays a plot of bucketwise average ratings nunber of ratings of a parameter.   
    """
    # Creating bucket labels from splits
    classlist = bucketizer.getSplits()
    number_of_classes = len(classlist) - 1

    class_labels = []
    hover_labels = []
    for i in range (number_of_classes):
        hover_labels.append(str(classlist[i])+"-"+str(classlist[i+1]) +" (Bucket name: "+ str(int(i)) +")"  )
        class_labels.append(str(classlist[i])+"-"+str(classlist[i+1]) )
  
    summary["Scaled_number"] = (summary["n_ratings"]-summary["n_ratings"].min())/(summary["n_ratings"].max()-summary["n_ratings"].min()) + 1.5
    summary['Bucket_Names'] = class_labels
  
    # making plot
    x = summary["Bucket_Names"]
    y1 = summary["avg_rating"]
    y2 = summary["n_ratings"]
    err = summary["stddev_rating"]  

    # Plot scatter here
    plt.rcParams["figure.figsize"] = [summary.shape[0]+2, 6.0]
    plt.rcParams["figure.autolayout"] = True
    fig, ax1 = plt.subplots()

    bar = ax1.bar(x, y1, color = "#262261")
    ax1.errorbar(x, y1, yerr=err, fmt="o", color="#EE4036")
    ax1.set(ylim=(0, 7))
  
    #ax1.bar_label(bar , fmt='%.2f', label_type='edge')  
    def barlabel(x_list,y_list):
        for i in range(len(x_list)):
            ax1.text(i,y_list[i] + 0.2,y_list[i], ha = 'center',
  			         fontdict=dict(size=10),
  			         bbox=dict(facecolor='#262261', alpha=0.2)         
  			        )
    barlabel(summary["Bucket_Names"].tolist() ,summary["avg_rating"].round(2).tolist())
  
    ax2 = ax1.twinx()
    ax2.scatter(x, y2, s=summary["Scaled_number"]*500, c = '#FAAF40')  
    ax2.set(ylim=(0, summary["n_ratings"].max()*1.15))
    def scatterlabel(x_list,y_list):
  	    for i in range(len(x_list)):
  		    ax2.text(i,y_list[i] + 15000,y_list[i], ha = 'center',
  					 fontdict=dict(size=10),
                     bbox=dict(facecolor='#FAAF40', alpha=0.5)
  					)
    scatterlabel(summary["Bucket_Names"].tolist() ,summary["n_ratings"].tolist())
  
    # giving labels to the axises
    ax1.set_xlabel(bucketizer.getOutputCol(), fontdict=dict(size=14)) 
    ax1.set_ylabel("Average Ratings",fontdict=dict(size=14))
  
    # secondary y-axis label
    ax2.set_ylabel('Number of Ratings',fontdict=dict(size=14))
  
    #plot Title
    plt.title('Bucketwise average ratings and number of ratings for \n'+bucketizer.getInputCol(), 
              fontdict=dict(size=14))

In [107]:
def bucket_col_print_summary(df, splits, inputCol, outputCol):
    """
    Given a numerical column in a data frame, adds a bucketized version of the column to the data frame, according to splits provided.
    Also prints a summary of ratings seen in each bucket made.

    Inputs 
    Argument 1: Data Frame 
    Argument 2: Values at which the column will be split
    Argument 3: Name of the input column (numerical column)
    Argument 4: Name of the output column (bucketized numerical column)

    Output: 
    1) New dataframe with the output column added
    2) Bucketizer object trained from the input column 
    3) Pandas dataframe with summary statistics for ratings seen in buckets of the output column
    Also plots summary statistics for ratings seen in buckets of the output column
    """

    # Dropping bucket if it already exists
    if outputCol in df.columns:
        df = df.drop(outputCol)

    # Training bucketizer
    bucketizer = Bucketizer(splits = splits,
                            inputCol  = inputCol,
                            outputCol = outputCol)
    
    df = bucketizer.setHandleInvalid("keep").transform(df)

    # Printing meta information on buckets created
    print("Added bucketized column {}".format(outputCol))
    print("")
    print("Bucketing done for split definition: {}".format(splits))
    print("")  
    print("Printing summary statistics for ratings in buckets below:")

    # Creating a summary statistics dataframe and passing it to the plotting function
    summary =  (df
                .groupBy(outputCol)
                .agg(F.avg('rating').alias('avg_rating'),
                     F.stddev('rating').alias('stddev_rating'),
                     F.count('rating').alias('n_ratings'))
                .sort(outputCol)
                .toPandas())
  
    plot_bucketwise_statistics(summary,bucketizer)
  
    return df, bucketizer, summary

In [108]:
def get_column_distribution_summary(df, col_name):
    """
    Takes a column in a data frame and prints the summary statistics (average, standard deviation, count and distinct count) for all unique values in that column.
  
    Inputs 
    Argument 1: Dataframe 
    Argument 2: Name of the column
  
    Output
    Returns nothing 
    Prints a Dataframe with summary statistics
    """
    print(df
          .groupBy(col_name)
          .agg(F.avg('rating').alias('avg_rating'),
               F.stddev('rating').alias('stddev_rating'),
               F.count('rating').alias('n_ratings'),
               F.countDistinct('id').alias('n_recipes'))
          .sort(F.col(col_name).asc())
          .show(50))

In [109]:
def get_n_items_satisfying_condition (df, condition, aggregation_level = "recipe"):
    """
    Given a condition, find the number of recipes / reviews that match the condition.
    Also calculates the percentage of such recipes / reviews as a percentage of all recipes / reviews.
  
    Inputs 
    Argument 1: Dataframe 
    Argument 2: Logical expression describing a condition, string type. eg: "minutes == 0"
    Argument 3: Aggregation level for determining "items", either  "recipe" or "review". Default value == "recipe"
  
    Output: Returns no object.
    Prints the following:
    1) Number of recipes / reviews that satisfy the condition
    2) Total number of recipes / reviews in the dataframe
    3) Percentage of recipes / reviews that satisfy the condition
    """
    # Find out num rows satisfying the condition
    if aggregation_level == "recipe": 
        number_of_rows_satisfying_condition = (df
                                             .filter(condition)
                                             .agg(F.countDistinct("id"))).first()[0]
      
        n_rows_total = (df.agg(F.countDistinct("id"))).first()[0]
    if aggregation_level == "review":
        number_of_rows_satisfying_condition = (df
                                             .filter(condition)
                                             .agg(F.countDistinct("id","user_id"))).first()[0]
        n_rows_total = (df.agg(F.countDistinct("id","user_id"))).first()[0]
  
    # Find out % rows satisfying the conditon and print a properly formatted output
    perc_rows = round(number_of_rows_satisfying_condition * 100/ n_rows_total, 2)
    print('Condition String                   : "{}"'.format(condition))
    print("Num {}s Satisfying Condition   : {} [{}%]".format(aggregation_level.title(), number_of_rows_satisfying_condition, perc_rows))
    print("Total Num {}s                  : {}".format(aggregation_level.title(), n_rows_total))

In [110]:
def add_OHE_columns (df, n_name_list):
    """
    Given a list of tags, creates one hot encoded columns for each tag. 
  
    Input
    Argument 1: Dataframe in which the function will add the new columns
    Argument 2: list of tags
  
    Output
    Prints the names of columns that have been added 
    Returns the modified dataframe 
    """
    for name in n_name_list:
        df = (df.withColumn("has_tag_"+name, F.when(F.array_contains(df.tags, name), 1).otherwise(0)))
        print ("added column: has_tag_"+name)

    return df

## <font color = BLUE >  Read the data  </font>

In [111]:
import pandas as pd

# Assuming interaction_level_df is your DataFrame
# Specify the desired file path and name
# Specify the desired file path and name
output_path = "C:\\Users\\OWNER\\Desktop\\path_for_interaction_level_df_processed.parquet"

# Save DataFrame to Parquet file
interaction_level_df.to_parquet(output_path, compression="snappy")

# Print a success message
print(f"DataFrame saved to {output_path}")


DataFrame saved to C:\Users\OWNER\Desktop\path_for_interaction_level_df_processed.parquet


## <font color = BLUE >  Adding user level average features  </font>

In [112]:
import pandas as pd

# Sample DataFrame
data = {
    "user_id": [1, 1, 2, 2, 2],
    "rating": [4, 5, 3, 4, 5],
    "years_since_submission_on_review_date": [1, 2, 3, 4, 5],
    "minutes": [30, 45, 60, 45, 60],
    "n_steps": [5, 7, 6, 8, 7],
    "n_ingredients": [10, 12, 9, 11, 10]
}
interaction_level_df = pd.DataFrame(data)

# Define a function to calculate average rating
def avg_rating(group):
    return group["rating"].mean()

# Define a function to calculate count of ratings
def count_ratings(group):
    return group["rating"].count()

# Define a function to calculate average years between review and submission
def avg_years_between(group):
    return group["years_since_submission_on_review_date"].mean()

# Define a function to calculate average preparation time
def avg_prep_time(group):
    return group["minutes"].mean()

# Define a function to calculate average number of steps
def avg_n_steps(group):
    return group["n_steps"].mean()

# Define a function to calculate average number of ingredients
def avg_n_ingredients(group):
    return group["n_ingredients"].mean()

# Calculate the metrics using groupby and apply functions
result_df = (
    interaction_level_df.groupby("user_id")
    .apply(lambda group: pd.Series({
        "user_avg_rating": avg_rating(group),
        "user_n_ratings": count_ratings(group),
        "user_avg_years_betwn_review_and_submission": avg_years_between(group),
        "user_avg_prep_time_recipes_reviewed": avg_prep_time(group),
        "user_avg_n_steps_recipes_reviewed": avg_n_steps(group),
        "user_avg_n_ingredients_recipes_reviewed": avg_n_ingredients(group)
    }))
    .reset_index()
)

print(result_df)


   user_id  user_avg_rating  user_n_ratings  \
0        1              4.5             2.0   
1        2              4.0             3.0   

   user_avg_years_betwn_review_and_submission  \
0                                         1.5   
1                                         4.0   

   user_avg_prep_time_recipes_reviewed  user_avg_n_steps_recipes_reviewed  \
0                                 37.5                                6.0   
1                                 55.0                                7.0   

   user_avg_n_ingredients_recipes_reviewed  
0                                     11.0  
1                                     10.0  


In [113]:
print(interaction_level_df.info())
print(interaction_level_df.dtypes)


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 6 columns):
 #   Column                                 Non-Null Count  Dtype
---  ------                                 --------------  -----
 0   user_id                                5 non-null      int64
 1   rating                                 5 non-null      int64
 2   years_since_submission_on_review_date  5 non-null      int64
 3   minutes                                5 non-null      int64
 4   n_steps                                5 non-null      int64
 5   n_ingredients                          5 non-null      int64
dtypes: int64(6)
memory usage: 368.0 bytes
None
user_id                                  int64
rating                                   int64
years_since_submission_on_review_date    int64
minutes                                  int64
n_steps                                  int64
n_ingredients                            int64
dtype: object


In [114]:
print(interaction_level_df.columns)


Index(['user_id', 'rating', 'years_since_submission_on_review_date', 'minutes',
       'n_steps', 'n_ingredients'],
      dtype='object')


In [115]:
nutrition_cols = ['calories', 
                  'total_fat_PDV', 
                  'sugar_PDV', 
                  'sodium_PDV', 
                  'protein_PDV', 
                  'saturated_fat_PDV', 
                  'carbohydrates_PDV', 
                  'total_fat_per_100_cal', 
                  'sugar_per_100_cal', 
                  'sodium_per_100_cal', 
                  'protein_per_100_cal', 
                  'saturated_fat_per_100_cal', 
                  'carbohydrates_per_100_cal']

quantiles_list = [0.00, 0.05, 0.25, 0.5, 0.75, 0.95, 1.00]
nutrition_col_quantiles = pd.DataFrame(index = quantiles_list)

In [116]:
# Print the column names and data types of your Pandas DataFrame
print(interaction_level_df.dtypes)

# Check if columns exist in the DataFrame
existing_cols = [col for col in nutrition_cols if col in interaction_level_df.columns]
print(existing_cols)


user_id                                  int64
rating                                   int64
years_since_submission_on_review_date    int64
minutes                                  int64
n_steps                                  int64
n_ingredients                            int64
dtype: object
[]


In [117]:
# Assuming you have already created the DataFrame interaction_level_df
# If not, replace this with your actual DataFrame creation logic

# Check if the 'user_id' exists in the DataFrame
if 601529 in interaction_level_df['user_id'].values:
    # Get the user_avg_rating for user_id 601529
    user_avg_rating_601529 = interaction_level_df.loc[interaction_level_df['user_id'] == 601529, 'user_avg_rating'].iloc[0]
    assert round(user_avg_rating_601529, 2) == 4.22
else:
    print("User with user_id 601529 not found in the DataFrame.")

# Add more checks for other columns as needed

print("All assertions passed successfully!")


User with user_id 601529 not found in the DataFrame.
All assertions passed successfully!


### More Features:

1) high_ratings = 5 rating

2) user_avg_years_betwn_review_and_submission_high_ratings

3) user_avg_prep_time_recipes_reviewed_high_ratings

4) user_avg_n_steps_recipes_reviewed_high_ratings

5) user_avg_n_ingredients_recipes_reviewed_high_ratings

In [118]:
# Assuming you have already created the DataFrame interaction_level_df
# If not, replace this with your actual DataFrame creation logic

import pandas as pd

# Sample data (replace with your actual data)
data = [
    (1, 5, 2, 10, 20),
    (2, 4, 3, 12, 18),
    (3, 5, 1, 8, 22),
    # Add more rows...
]

# Create a DataFrame
columns = ["user_id", "rating", "years_since_submission_on_review_date",
           "minutes", "n_steps"]
interaction_level_df = pd.DataFrame(data, columns=columns)

# Create new columns based on conditions
interaction_level_df["ind_5_rating"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else 1, axis=1
)
interaction_level_df["years_since_submission_on_review_date_5_ratings"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else row["years_since_submission_on_review_date"], axis=1
)
interaction_level_df["minutes_5_ratings"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else row["minutes"], axis=1
)
interaction_level_df["n_steps_5_ratings"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else row["n_steps"], axis=1
)

# Show the resulting DataFrame
print(interaction_level_df)

# You can replace the print statement with any other desired output or further processing


   user_id  rating  years_since_submission_on_review_date  minutes  n_steps  \
0        1       5                                      2       10       20   
1        2       4                                      3       12       18   
2        3       5                                      1        8       22   

   ind_5_rating  years_since_submission_on_review_date_5_ratings  \
0           1.0                                              2.0   
1           NaN                                              NaN   
2           1.0                                              1.0   

   minutes_5_ratings  n_steps_5_ratings  
0               10.0               20.0  
1                NaN                NaN  
2                8.0               22.0  


In [119]:
# Assuming you have already created the DataFrame interaction_level_df
# If not, replace this with your actual DataFrame creation logic

import pandas as pd

# Sample data (replace with your actual data)
data = [
    (1, 5, 2, 10, 20),
    (2, 4, 3, 12, 18),
    (3, 5, 1, 8, 22),
    # Add more rows...
]

# Create a DataFrame
columns = ["user_id", "rating", "years_since_submission_on_review_date",
           "minutes", "n_steps"]
interaction_level_df = pd.DataFrame(data, columns=columns)

# Create new columns based on conditions
interaction_level_df["user_n_5_ratings"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else 1, axis=1
)
interaction_level_df["user_avg_years_betwn_review_and_submission_5_ratings"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else row["years_since_submission_on_review_date"], axis=1
)
interaction_level_df["user_avg_prep_time_recipes_reviewed_5_ratings"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else row["minutes"], axis=1
)
interaction_level_df["user_avg_n_steps_recipes_reviewed_5_ratings"] = interaction_level_df.apply(
    lambda row: None if row["rating"] != 5 else row["n_steps"], axis=1
)

# Show the resulting DataFrame
print(interaction_level_df)

# You can replace the print statement with any other desired output or further processing


   user_id  rating  years_since_submission_on_review_date  minutes  n_steps  \
0        1       5                                      2       10       20   
1        2       4                                      3       12       18   
2        3       5                                      1        8       22   

   user_n_5_ratings  user_avg_years_betwn_review_and_submission_5_ratings  \
0               1.0                                                2.0      
1               NaN                                                NaN      
2               1.0                                                1.0      

   user_avg_prep_time_recipes_reviewed_5_ratings  \
0                                           10.0   
1                                            NaN   
2                                            8.0   

   user_avg_n_steps_recipes_reviewed_5_ratings  
0                                         20.0  
1                                          NaN  
2                 

In [120]:
spark.stop()
spark = SparkSession.builder.appName("RatingColumns").getOrCreate()


In [121]:
spark = SparkSession.builder \
    .appName("RatingColumns") \
    .config("spark.network.timeout", "300s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()


In [122]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg
from pyspark.sql.window import Window


In [123]:
# Check if 'interaction_level_df' is a PySpark DataFrame
print(type(interaction_level_df))  # Check the type of 'interaction_level_df'


<class 'pandas.core.frame.DataFrame'>


In [124]:
interaction_level_df.columns

Index(['user_id', 'rating', 'years_since_submission_on_review_date', 'minutes',
       'n_steps', 'user_n_5_ratings',
       'user_avg_years_betwn_review_and_submission_5_ratings',
       'user_avg_prep_time_recipes_reviewed_5_ratings',
       'user_avg_n_steps_recipes_reviewed_5_ratings'],
      dtype='object')

In [125]:
print(type(interaction_level_df))  

<class 'pandas.core.frame.DataFrame'>


In [126]:
spark = SparkSession.builder.appName("YourAppName").config("spark.driver.memory", "4g").getOrCreate()
spark = SparkSession.builder.appName("YourAppName").config("spark.network.timeout", "600s").getOrCreate()


In [127]:
assert (
    interaction_level_df.loc[interaction_level_df['rating'] == 5, ['user_n_5_ratings',
                                                                    'user_avg_years_betwn_review_and_submission_5_ratings',
                                                                    'user_avg_prep_time_recipes_reviewed_5_ratings',
                                                                    'user_avg_n_steps_recipes_reviewed_5_ratings']]
    .isnull()
    .any(axis=1)
    .sum() == 0
)


In [128]:
# Assuming you have created your DataFrame 'interaction_level_df' earlier in your code

user_ratings = interaction_level_df.where(interaction_level_df['user_id'] == 233044)  # Filter and select simultaneously.get('user_n_ratings').iloc[0]  # Get the value from 'user_n_ratings'


In [129]:
print(interaction_level_df.columns)


Index(['user_id', 'rating', 'years_since_submission_on_review_date', 'minutes',
       'n_steps', 'user_n_5_ratings',
       'user_avg_years_betwn_review_and_submission_5_ratings',
       'user_avg_prep_time_recipes_reviewed_5_ratings',
       'user_avg_n_steps_recipes_reviewed_5_ratings'],
      dtype='object')


In [130]:
user_id = 233044

try:
  # Filter for the user
  user_ratings = interaction_level_df.loc[interaction_level_df['user_id'] == user_id]

  # Check specific values (assuming you want the first user's values)
  assert(user_ratings['user_n_5_ratings'].iloc[0] == 7)
  # ... similar assertions for other columns
except IndexError:
  print(f"User with ID {user_id} not found in DataFrame.")




User with ID 233044 not found in DataFrame.


In [131]:
user_id = 233044

# Filter for the user
user_ratings = interaction_level_df.loc[interaction_level_df['user_id'] == user_id]

if len(user_ratings) > 0:
  # Check specific values (assuming you want the first user's values)
  assert(round(user_ratings['user_avg_years_betwn_review_and_submission_5_ratings'].iloc[0], 2) == 2.24)
else:
  print(f"User with ID {user_id} not found in DataFrame.")


User with ID 233044 not found in DataFrame.


In [132]:
user_id = 233044

try:
  # Filter for the user
  user_ratings = interaction_level_df[interaction_level_df['user_id'] == user_id]

  # Check specific values (assuming you want the first user's value)
  assert(round(user_ratings['user_avg_prep_time_recipes_reviewed_5_ratings'].iloc[0], 2) == 46)
except (IndexError, AssertionError):
  print(f"User with ID {user_id} not found or value doesn't match.")


User with ID 233044 not found or value doesn't match.


In [133]:
try:
  # ... your filtering and selection code ...
  assert(round(user_ratings['user_avg_n_steps_recipes_reviewed_5_ratings'].iloc[0], 2) == 7.29)
except (IndexError, AssertionError):
  print(f"User with ID {user_id} not found or value doesn't match.")


User with ID 233044 not found or value doesn't match.


In [134]:
try:
  # ... your filtering and selection code ...
  assert(round(user_ratings['user_avg_n_steps_recipes_reviewed_5_ratings'].iloc[0], 2) == 7.29)
except (IndexError, AssertionError):
  print(f"User with ID {user_id} not found or value doesn't match.")


User with ID 233044 not found or value doesn't match.


In [135]:
print(interaction_level_df.head())



   user_id  rating  years_since_submission_on_review_date  minutes  n_steps  \
0        1       5                                      2       10       20   
1        2       4                                      3       12       18   
2        3       5                                      1        8       22   

   user_n_5_ratings  user_avg_years_betwn_review_and_submission_5_ratings  \
0               1.0                                                2.0      
1               NaN                                                NaN      
2               1.0                                                1.0      

   user_avg_prep_time_recipes_reviewed_5_ratings  \
0                                           10.0   
1                                            NaN   
2                                            8.0   

   user_avg_n_steps_recipes_reviewed_5_ratings  
0                                         20.0  
1                                          NaN  
2                 

In [136]:
interaction_level_df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 9 columns):
 #   Column                                                Non-Null Count  Dtype  
---  ------                                                --------------  -----  
 0   user_id                                               3 non-null      int64  
 1   rating                                                3 non-null      int64  
 2   years_since_submission_on_review_date                 3 non-null      int64  
 3   minutes                                               3 non-null      int64  
 4   n_steps                                               3 non-null      int64  
 5   user_n_5_ratings                                      2 non-null      float64
 6   user_avg_years_betwn_review_and_submission_5_ratings  2 non-null      float64
 7   user_avg_prep_time_recipes_reviewed_5_ratings         2 non-null      float64
 8   user_avg_n_steps_recipes_reviewed_5_ratings           2 non-null 

## Tags level EDA

In [137]:
data = {'user_id': [1, 2, 3], 'tags': [['a', 'b'], ['c'], ['d', 'e']]}
interaction_level_df = pd.DataFrame(data)

interaction_tag_level_df = interaction_level_df.explode('tags')

print(interaction_tag_level_df)



   user_id tags
0        1    a
0        1    b
1        2    c
2        3    d
2        3    e


In [138]:
print(interaction_tag_level_df.columns)  # This will list all column names


Index(['user_id', 'tags'], dtype='object')


In [139]:
# Print column names to verify presence of 'individual_tag'
print(interaction_tag_level_df.columns)

# Check if DataFrame is empty
if len(interaction_tag_level_df) == 0:
    print("DataFrame is empty. Grouping might not work.")
else:
    try:
        # Attempt grouping and aggregation (within a try-except block)
        tags_ratings_summary = (interaction_tag_level_df
            .groupby('individual_tag')  # Group by the 'individual_tag' column
            .agg(
                F.avg('rating').alias('avg_user_rating'),
                # Uncomment for max/min ratings if needed
                # F.max('rating').alias('max_user_rating'),
                # F.min('rating').alias('min_user_rating'),
                F.count('rating').alias('n_user_ratings'),
                F.countDistinct('id').alias('n_recipes')
            )
        )
        print("Tags ratings summary created successfully!")
    except KeyError:
        print("KeyError: 'individual_tag' still occurs. Check column creation and data type.")


Index(['user_id', 'tags'], dtype='object')
KeyError: 'individual_tag' still occurs. Check column creation and data type.


### 1. Top n most rated tags

In [140]:
try:
  # Check if 'tags' column exists in the DataFrame
  if 'tags' in interaction_level_df.columns:
    # Check if 'tags' column is an array type
    if interaction_level_df.dtypes['tags'][1] == 'array':
      # Apply explode operation
      interaction_tag_level_df = interaction_level_df.withColumn('individual_tag', F.explode('tags'))
    else:
      print("'tags' column is not an array type.")
  else:
    print("'tags' column does not exist in the DataFrame.")
except KeyError:
  print("DataFrame might be empty or there are no columns.")


DataFrame might be empty or there are no columns.


In [141]:
print(interaction_level_df.columns)

Index(['user_id', 'tags'], dtype='object')


In [142]:
if 'tags' in interaction_level_df.columns:
    print('tags', 'column is present')
else:
    print('Not available')
    
print(interaction_level_df.tags)

tags column is present
0    [a, b]
1       [c]
2    [d, e]
Name: tags, dtype: object


In [143]:
# Assuming interaction_level_df is already defined and contains relevant data
tags_ratings_summary = interaction_level_df[['tags', 'user_id']]

# Print the first few rows of tags_ratings_summary
print(tags_ratings_summary.head())


     tags  user_id
0  [a, b]        1
1     [c]        2
2  [d, e]        3


## <font color = RED >  Final DataFrame  </font>

In [144]:
len(interaction_level_df.columns)

2

In [145]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Write_to_Parquet").getOrCreate()


### ##########03_FeatureExtractionPart02 Completed##########