In [9]:
import sqlite3
import os
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
import numpy as np
import pandas as pd
# Source database for the pipeline; every bronze table is read through this connection.
DB_PATH = "database.sqlite"
conn = sqlite3.connect(DB_PATH)


In [22]:
# Pull each source table in full from SQLite into its own DataFrame.
# The queries run in the listed order and are unpacked positionally.
user_df, grade_df, method_df, ascent_df = (
    pd.read_sql_query(select_all_sql, conn)
    for select_all_sql in (
        "select * from user;",
        "select * from grade;",
        "select * from method;",
        "select * from ascent;",
    )
)

In [23]:
# Bronze layer: persist each raw source frame as its own Delta table.
# mode="overwrite" is used so a re-run does not append to an existing table.
for bronze_path, bronze_frame in (
    ("climbing_project/data/bronze/user", user_df),
    ("climbing_project/data/bronze/grade", grade_df),
    ("climbing_project/data/bronze/method", method_df),
    ("climbing_project/data/bronze/ascent", ascent_df),
):
    write_deltalake(bronze_path, bronze_frame, mode="overwrite")

In [29]:
# Load the bronze Delta tables back into pandas.
# user and ascent are treated as fact tables, grade and method as dimensions.
(
    fact_user_bronze_df,
    fact_ascent_bronze_df,
    dim_grade_bronze_df,
    dim_method_bronze_df,
) = (
    DeltaTable(bronze_path).to_pandas()
    for bronze_path in (
        "climbing_project/data/bronze/user",
        "climbing_project/data/bronze/ascent",
        "climbing_project/data/bronze/grade",
        "climbing_project/data/bronze/method",
    )
)


In [None]:
# Apply transformations for silver table

In [69]:
# METHOD: expose the `shorthand` column under the name `climb_type`;
# every other column of the bronze method table is passed through unchanged.
dim_method_silver_df = dim_method_bronze_df.rename({'shorthand': 'climb_type'}, axis='columns')

In [70]:
# GRADE: keep only the identifier, the score and the two French grade labels.
dim_grade_columns = ['id', 'score', 'fra_routes', 'fra_boulders']
dim_grade_silver_df = dim_grade_bronze_df.loc[:, dim_grade_columns]

In [71]:
# USER: keep rows whose `deactivated` flag equals 0, projected onto the silver columns.
user_silver_columns = ['id', 'country', 'sex', 'height', 'weight', 'started']
is_active_user = fact_user_bronze_df['deactivated'] == 0
fact_user_silver_df = fact_user_bronze_df.loc[is_active_user, user_silver_columns]

In [90]:
# ASCENT: convert the unix-seconds `date` column to a timestamp, keep the modelling
# columns, and restrict to ascents whose `year` lies in 1980..2017 (both inclusive).
# NOTE(review): `datetime` is written onto the bronze frame but is not among the
# columns selected below, so it never reaches silver - confirm whether that is intended.
fact_ascent_bronze_df['datetime'] = pd.to_datetime(fact_ascent_bronze_df['date'], unit='s')
ascent_silver_columns = ['id','user_id', 'grade_id', 'method_id', 'climb_type', 'year', 'chipped']
year_in_range = fact_ascent_bronze_df['year'].between(1980, 2017)
fact_ascent_silver_df = fact_ascent_bronze_df.loc[year_in_range, ascent_silver_columns]

# Join to grade and method tables (default inner joins: ascents without a matching
# grade or method row are dropped).
fact_ascent_silver_grade_df = fact_ascent_silver_df.merge(
    dim_grade_silver_df,
    left_on='grade_id',
    right_on='id',
    suffixes=('_ascent', '_grade'),
)
# The final column names depend on the merge suffixes: `id` collides in the first merge
# (-> `id_ascent`), `climb_type` collides in the second (-> `climb_type_method`).
# NOTE(review): `score_ascent` presumably arises because the method table also has a
# `score` column, making it the grade's score under an `_ascent` suffix - verify.
fact_ascent_silver_final_columns = ['id_ascent', 'user_id', 'year', 'chipped', 'score_ascent', 'fra_routes', 'fra_boulders', 'climb_type_method']
fact_ascent_silver_final_df = (
    fact_ascent_silver_grade_df
    .merge(
        dim_method_silver_df,
        left_on='method_id',
        right_on='id',
        suffixes=('_ascent', '_method'),
    )
    .dropna(subset=['fra_routes'])  # discard rows with no route grade label
    .loc[:, fact_ascent_silver_final_columns]
)

In [91]:
# Silver layer: persist the cleaned user, grade, method and ascent frames as Delta tables.
# mode="overwrite" is used so a re-run does not append to an existing table.
for silver_path, silver_frame in (
    ("climbing_project/data/silver/user", fact_user_silver_df),
    ("climbing_project/data/silver/grade", dim_grade_silver_df),
    ("climbing_project/data/silver/method", dim_method_silver_df),
    ("climbing_project/data/silver/ascent", fact_ascent_silver_final_df),
):
    write_deltalake(silver_path, silver_frame, mode="overwrite")

In [92]:
# Load the two silver tables needed for the gold aggregates back into pandas.
fact_user_silver_df_2, fact_ascent_silver_df_2 = (
    DeltaTable(silver_path).to_pandas()
    for silver_path in (
        "climbing_project/data/silver/user",
        "climbing_project/data/silver/ascent",
    )
)


In [115]:
# Create gold tables

# Average height by grade.
# Users are joined to their ascents first, so the mean is taken over user-ascent rows:
# a user contributes once for every ascent they logged at a given `fra_routes` grade.
merged_df = fact_user_silver_df_2.merge(fact_ascent_silver_df_2, left_on='id', right_on='user_id')
avg_height_by_grade_df = merged_df.groupby(['fra_routes'])['height'].mean().reset_index()

# Users grouped by country, most frequent country first.
# The index and value columns are named explicitly so the table schema is always
# (`country`, `count`); a bare `reset_index()` yields (`index`, `country`) on pandas < 2.0.
users_grouped_by_country_df = (
    fact_user_silver_df_2['country']
    .value_counts()
    .rename_axis('country')
    .reset_index(name='count')
)

In [116]:
# Gold layer: persist both aggregates as Delta tables.
# mode="overwrite" is used so a re-run does not append to an existing table.
# NOTE(review): the first path ends in `_year`, but the frame written there is grouped
# by grade only - confirm the intended table name.
for gold_path, gold_frame in (
    ("climbing_project/data/gold/avg_height_by_grade_year", avg_height_by_grade_df),
    ("climbing_project/data/gold/users_grouped_by_country", users_grouped_by_country_df),
):
    write_deltalake(gold_path, gold_frame, mode="overwrite")


In [114]:
# Display the per-country user counts.
# NOTE(review): the saved output below is a Series repr (`Name: count ... dtype: int64`)
# from execution 114, whereas the gold cell (execution 115) builds a DataFrame via
# `.reset_index()` - the output looks stale; re-run the notebook top to bottom to refresh it.
users_grouped_by_country_df

country
USA    11471
ESP     6590
DEU     3505
SWE     3434
ITA     3380
       ...  
AIA        1
MCO        1
FJI        1
BFA        1
SGS        1
Name: count, Length: 212, dtype: int64