In [1]:
import pandas as pd

In [2]:
# Both raw CSV exports are read from the same Datasets folder, two levels up.
datasets_dir = '../../Datasets'
df_recipes = pd.read_csv(f'{datasets_dir}/RAW_recipes.csv')
df_interactions = pd.read_csv(f'{datasets_dir}/RAW_interactions.csv')

In [3]:
# Mean rating per recipe. Grouping on recipe_id already produces one row per
# recipe, so the key is recovered from the group index instead of being
# aggregated a second time.
df_grouped = df_interactions.groupby('recipe_id', group_keys=True)
df_ratings = (
    df_grouped['rating']
    .mean()
    .reset_index()
    .rename(columns={"recipe_id": "id", "rating": "score"})
)

# pd.merge defaults to an inner join, so only recipes that have at least one
# interaction (and therefore a score) are kept.
df_merged = pd.merge(df_recipes, df_ratings, on='id')

# Difficulty proxy: n_steps * minutes, bucketed into four labels.
difficulty_raw = df_merged.n_steps * df_merged.minutes
bins = [difficulty_raw.min(), 150, 350, 750, difficulty_raw.max()]
labels = ['easy', 'medium', 'hard', 'very hard']
# include_lowest=True closes the first interval on the left. Without it the
# rows whose value equals the minimum (the lowest bin edge) fall outside every
# bin and end up with a NaN difficulty.
df_merged['difficulty'] = pd.cut(
    difficulty_raw, bins=bins, labels=labels, include_lowest=True
)

# Remove the row at position 3381 of the merged frame. The position depends on
# the merge order above, so this line must run right after the merge.
df_merged = df_merged.drop(df_merged.index[[3381]]) #Error in line formatting

In [16]:
import re
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent

# Open a session against the Cassandra node listening on 127.0.0.1.
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# Translation tables for str.translate().
set_mapping_table = str.maketrans('[]', '{}')            # '[' -> '{' and ']' -> '}'
description_mapping_table = str.maketrans('', '', "'")   # removes every apostrophe
apostrophe_mapping_table = str.maketrans({"'": "''"})    # doubles every apostrophe

# One prepared INSERT per target table.
query_insert_recipes_by_month_submitted = session.prepare(
    "INSERT INTO recipe.recipes_by_month_submitted "
    "(month_submitted,score,id,name) "
    "VALUES (?,?,?,?)"
)
query_insert_recipes_by_difficulty = session.prepare(
    "INSERT INTO recipe.recipes_by_difficulty "
    "(difficulty,score,id,name,date_submitted) "
    "VALUES (?,?,?,?,?)"
)
query_insert_recipes = session.prepare(
    "INSERT INTO recipe.recipes "
    "(name,tags,date_submitted,score,id,minutes,contributor_id,nutrition,steps,number_of_steps,description,ingredients,number_of_ingredients,difficulty) "
    "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
)
query_insert_recipes_by_tag = session.prepare(
    "INSERT INTO recipe.recipes_by_tag "
    "(tag,date_submitted,score,id,name) "
    "VALUES (?,?,?,?,?)"
)
# No insert is prepared for recipes_by_tag_by_score: per the original author's
# note it is a materialized view and receives its data from its base table.

# Buffers of (prepared statement, parameters) pairs, one buffer per table.
data_insert_recipes_by_month_submitted = []
data_insert_recipes_by_difficulty = []
data_insert_recipes = []
data_insert_recipes_by_tag = []

def convertToListOfFloat(text):
    """Parse a bracketed string of numbers into a list of floats.

    Args:
        text: String such as "[51.5, 0.0, 13.0]". Square brackets are stripped
            from both ends; values may be separated by commas, asterisks or
            newlines.

    Returns:
        list[float]: The parsed values in input order. Pieces that are empty
        after trimming whitespace are skipped.

    Raises:
        ValueError: If a non-empty piece is not accepted by float().
    """
    # Raw string so that \* reaches the regex engine verbatim instead of being
    # an invalid Python string escape.
    pieces = re.split(r',|\*|\n', text.strip('[]'))
    trimmed = (piece.strip() for piece in pieces)
    return [float(piece) for piece in trimmed if piece]

def convertToSetOrListOfText(text, typeSelector):
    """Split the string form of a list of quoted strings into its items.

    Args:
        text: String such as "['a', 'b']".
        typeSelector: With 'set', every '[' / ']' in ``text`` is first rewritten
            to '{' / '}' and curly braces are then stripped from both ends.
            Any other value strips square brackets from both ends instead.

    Returns:
        list[str]: The items in input order, with surrounding quotes removed
        and every apostrophe doubled. Items that are empty after unquoting are
        dropped. A list is returned for both selectors.
    """
    if typeSelector == 'set':
        text = text.replace('[', '{').replace(']', '}')
        enclosing = '{}'
    else:
        enclosing = '[]'
    text = text.strip(enclosing)

    result = []
    # Items are separated by a closing quote followed by ", ", by an asterisk,
    # or by a newline. Raw string so that \* is a regex escape rather than an
    # invalid Python string escape.
    for kv in re.split(r"""', |", |\*|\n""", text):
        kv = kv.strip()
        if not kv:
            continue
        # Remove the quote character the item starts with (from both ends).
        if kv[0] == '"':
            kv = kv.strip('"')
        elif kv[0] == "'":
            kv = kv.strip("'")
        # NOTE(review): apostrophes are doubled as for an inline CQL string
        # literal. The values are bound to prepared statements elsewhere in
        # this file, so the doubled form may be stored verbatim - confirm this
        # escaping is still wanted.
        kv = kv.replace("'", "''")
        # Skip items that became empty once their quotes were removed
        # (e.g. an element written as '').
        if kv:
            result.append(kv)
    return result
 
i = 0
# For every merged recipe row, queue one (prepared statement, parameters) pair
# per target table, plus one pair per non-empty tag for recipes_by_tag.
for index, row in df_merged.iterrows():
    # Collection-like columns are passed through str() and parsed into lists.
    tags = convertToSetOrListOfText(str(row['tags']), 'set')
    ingredients = convertToSetOrListOfText(str(row['ingredients']), 'set')
    steps = convertToSetOrListOfText(str(row['steps']), 'list')
    nutrition = convertToListOfFloat(str(row['nutrition']))

    # Values shared by several of the inserts below.
    recipe_id = row['id']
    score = row['score']
    name = str(row['name'])
    submitted = str(row['submitted'])
    difficulty = str(row['difficulty'])
    # 'submitted' with its last three characters removed.
    month_submitted = str(row['submitted'][:-3])
    # Description with apostrophes removed by description_mapping_table.
    description = str(row['description']).translate(description_mapping_table)

    data_insert_recipes_by_month_submitted.append(
        (query_insert_recipes_by_month_submitted,
         (month_submitted, score, recipe_id, name))
    )
    data_insert_recipes_by_difficulty.append(
        (query_insert_recipes_by_difficulty,
         (difficulty, score, recipe_id, name, submitted))
    )
    data_insert_recipes.append(
        (query_insert_recipes,
         (name, tags, submitted, score, recipe_id, row['minutes'],
          row['contributor_id'], nutrition, steps, row['n_steps'],
          description, ingredients, row['n_ingredients'], difficulty))
    )
    data_insert_recipes_by_tag.extend(
        (query_insert_recipes_by_tag, (tag, submitted, score, recipe_id, name))
        for tag in tags
        if tag != ''
    )

In [19]:
import time

def split(a, n):
    """Lazily cut the sequence ``a`` into ``n`` consecutive slices.

    The slice lengths differ by at most one; the remainder of
    ``len(a) / n`` is spread over the leading slices.

    Args:
        a: Sliceable sequence with a length.
        n: Number of slices to produce.

    Returns:
        A generator yielding the ``n`` slices in order.
    """
    base, extra = divmod(len(a), n)

    def boundary(pos):
        # Start offset of slice number `pos`; the first `extra` slices each
        # hold one element more than `base`.
        return pos * base + min(pos, extra)

    return (a[boundary(pos):boundary(pos + 1)] for pos in range(n))

# perf_counter() is monotonic, so the measured duration is not affected by
# system clock adjustments during the run.
starting_time = time.perf_counter()

try:
    # Tables whose queued statements are sent in a single execute_concurrent call.
    for table_name, statements in (
        ("recipe.recipes_by_month_submitted", data_insert_recipes_by_month_submitted),
        ("recipe.recipes_by_difficulty", data_insert_recipes_by_difficulty),
        ("recipe.recipes", data_insert_recipes),
    ):
        print(f"Inserting into {table_name}")
        execute_concurrent(session, statements, raise_on_first_error=True)

    # recipes_by_tag is sent in 10 slices to reduce RAM consumption: the value
    # returned by each execute_concurrent call can be garbage collected before
    # the next slice is sent. The slices are taken lazily from split() so that
    # they are not all copied into memory up front.
    for i, data in enumerate(split(data_insert_recipes_by_tag, 10), start=1):
        print(f"Inserting into recipe.recipes_by_tag_{i}")
        execute_concurrent(session, data, raise_on_first_error=True)

    elapsed_time = time.perf_counter() - starting_time
    print('Execution time:', time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))
finally:
    # Release the driver's connections even when an insert raised.
    cluster.shutdown()

Inserting into recipe.recipes_by_month_submitted
Inserting into recipe.recipes_by_difficulty
Inserting into recipe.recipes
Inserting into recipe.recipes_by_tag_1
Inserting into recipe.recipes_by_tag_2
Inserting into recipe.recipes_by_tag_3
Inserting into recipe.recipes_by_tag_4
Inserting into recipe.recipes_by_tag_5
Inserting into recipe.recipes_by_tag_6
Inserting into recipe.recipes_by_tag_7
Inserting into recipe.recipes_by_tag_8
Inserting into recipe.recipes_by_tag_9
Inserting into recipe.recipes_by_tag_10
Execution time: 00:16:46
