In [1]:
import duckdb

In [2]:
# Engine settings: cap DuckDB's memory use at 16GB and point its spill
# directory at '/temp_dir.tmp/'.
# NOTE(review): that path is absolute (filesystem root) -- confirm this is
# intended and writable in the environment where the notebook runs.
duckdb.sql("""SET memory_limit = '16GB';
            SET temp_directory = '/temp_dir.tmp/';""")

# Source relations consumed by every later cell. They are referenced by their
# Python variable names inside subsequent SQL strings.
movies_revenue = duckdb.sql("""select * from read_parquet('data/transformed/**/*.parquet')""")
movies_details = duckdb.sql("select * from read_parquet('data/transformed/movies_details')")

In [4]:
# --- Genre dimension, genre groups and the group<->genre bridge -------------
#
# dim_genre: one row per distinct, trimmed genre name.
# The window is ordered so that genre_id is deterministic: this relation is
# used both in the bridge join and in its own COPY below, and an unordered
# row_number() is not guaranteed to number rows the same way each time.
dim_genre = duckdb.sql("""select genre, row_number() over (order by genre) as genre_id
                        from (select distinct trim(unnest(genre)) as genre from movies_details)""")

# genre_groups: one row per distinct *sorted* genre list.
# The DISTINCT must run in a subquery before numbering; numbering first makes
# every (list, id) pair unique, so DISTINCT would remove nothing and identical
# lists would end up with different genre_group_id values.
genre_groups = duckdb.sql("""select genre_group, row_number() over (order by genre_group) as genre_group_id
                        from (select distinct list_sort(genre) as genre_group from movies_details)""")

# Explode each group back into its members so it can be matched to dim_genre.
genre_groups_unnested = duckdb.sql("select trim(unnest(genre_group)) as genre, genre_group_id, genre_group from genre_groups")

# Bridge: one row per (group, genre); genre_group is the '#'-joined sorted list.
genre_bridge = duckdb.sql("""select genre_group_id,
                            genre_id,
                            array_to_string(genre_group, '#') as genre_group
                            from genre_groups_unnested
                            LEFT JOIN dim_genre using (genre)""")

dim_genre_group = duckdb.sql("select distinct genre_group_id from genre_groups")

# Persist the three relations as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        dim_genre_group 
    TO 'data/aggregated/dim_genre_group' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        genre_bridge 
    TO 'data/aggregated/genre_bridge' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_genre 
    TO 'data/aggregated/dim_genre' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [None]:
# --- Actor dimension, actor groups and the group<->actor bridge -------------
#
# dim_actor: one row per distinct, trimmed actor name.
# The window is ordered so that actor_id is deterministic: this relation is
# used both in the bridge join and in its own COPY below, and an unordered
# row_number() is not guaranteed to number rows the same way each time.
dim_actor = duckdb.sql("""select actor, row_number() over (order by actor) as actor_id
                        from (select distinct trim(unnest(Actors)) as actor from movies_details)""")

# actor_groups: one row per distinct *sorted* cast list.
# DISTINCT runs in a subquery before numbering; numbering first makes every
# (list, id) pair unique, so identical casts would get different ids.
actor_groups = duckdb.sql("""select actor_group, row_number() over (order by actor_group) as actor_group_id
                        from (select distinct list_sort(Actors) as actor_group from movies_details)""")

# Explode each group back into its members so it can be matched to dim_actor.
actor_groups_unnested = duckdb.sql("select trim(unnest(actor_group)) as actor, actor_group_id, actor_group from actor_groups")

# Bridge: one row per (group, actor); actor_group is the '#'-joined sorted list.
actor_bridge = duckdb.sql("""select actor_group_id,
                            actor_id,
                            array_to_string(actor_group, '#') as actor_group
                            from actor_groups_unnested
                            LEFT JOIN dim_actor using (actor)""")

dim_actor_group = duckdb.sql("select distinct actor_group_id from actor_groups")

# Persist the three relations as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        dim_actor_group 
    TO 'data/aggregated/dim_actor_group' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        actor_bridge 
    TO 'data/aggregated/actor_bridge' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_actor 
    TO 'data/aggregated/dim_actor' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [5]:
# --- Language dimension, language groups and the group<->language bridge ----
#
# dim_language: one row per distinct, trimmed language name.
# The window is ordered so that language_id is deterministic: this relation is
# used both in the bridge join and in its own COPY below, and an unordered
# row_number() is not guaranteed to number rows the same way each time.
dim_language = duckdb.sql("""select language, row_number() over (order by language) as language_id
                        from (select distinct trim(unnest(Language)) as language from movies_details)""")

# language_groups: one row per distinct *sorted* language list.
# DISTINCT runs in a subquery before numbering; numbering first makes every
# (list, id) pair unique, so identical lists would get different ids.
language_groups = duckdb.sql("""select language_group, row_number() over (order by language_group) as language_group_id
                        from (select distinct list_sort(Language) as language_group from movies_details)""")

# Explode each group back into its members so it can be matched to dim_language.
language_groups_unnested = duckdb.sql("select trim(unnest(language_group)) as language, language_group_id, language_group from language_groups")

# Bridge: one row per (group, language); language_group is the '#'-joined sorted list.
language_bridge = duckdb.sql("""select language_group_id,
                            language_id,
                            array_to_string(language_group, '#') as language_group
                            from language_groups_unnested
                            LEFT JOIN dim_language using (language)""")


dim_language_group = duckdb.sql("select distinct language_group_id from language_groups")

# Persist the three relations as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        dim_language_group 
    TO 'data/aggregated/dim_language_group' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        language_bridge 
    TO 'data/aggregated/language_bridge' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_language 
    TO 'data/aggregated/dim_language' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [6]:
# --- Director dimension, director groups and the group<->director bridge ----
#
# dim_director: one row per distinct, trimmed director name.
# The window is ordered so that director_id is deterministic: this relation is
# used both in the bridge join and in its own COPY below, and an unordered
# row_number() is not guaranteed to number rows the same way each time.
# The outer column is spelled `director` to match the subquery alias; DuckDB
# identifiers are case-insensitive, so this selects the same column as before.
dim_director = duckdb.sql("""select director, row_number() over (order by director) as director_id
                        from (select distinct trim(unnest(Director)) as director from movies_details)""")

# director_groups: one row per distinct *sorted* director list.
# DISTINCT runs in a subquery before numbering; numbering first makes every
# (list, id) pair unique, so identical lists would get different ids.
director_groups = duckdb.sql("""select director_group, row_number() over (order by director_group) as director_group_id
                        from (select distinct list_sort(Director) as director_group from movies_details)""")

# Explode each group back into its members so it can be matched to dim_director.
director_groups_unnested = duckdb.sql("select trim(unnest(director_group)) as director, director_group_id, director_group from director_groups")

# Bridge: one row per (group, director); director_group is the '#'-joined sorted list.
director_bridge = duckdb.sql("""select director_group_id,
                            director_id,
                            array_to_string(director_group, '#') as director_group
                            from director_groups_unnested
                            LEFT JOIN dim_director using (director)""")

dim_director_group = duckdb.sql("select distinct director_group_id from director_groups")

# Persist the three relations as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        dim_director_group 
    TO 'data/aggregated/dim_director_group' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        director_bridge 
    TO 'data/aggregated/director_bridge' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_director 
    TO 'data/aggregated/dim_director' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [7]:
# --- Writer dimension, writer groups and the group<->writer bridge ----------
#
# dim_writer: one row per distinct, trimmed writer name.
# The window is ordered so that writer_id is deterministic: this relation is
# used both in the bridge join and in its own COPY below, and an unordered
# row_number() is not guaranteed to number rows the same way each time.
# The outer column is spelled `writer` to match the subquery alias; DuckDB
# identifiers are case-insensitive, so this selects the same column as before.
dim_writer = duckdb.sql("""select writer, row_number() over (order by writer) as writer_id
                        from (select distinct trim(unnest(Writer)) as writer from movies_details)""")

# writer_groups: one row per distinct *sorted* writer list.
# DISTINCT runs in a subquery before numbering; numbering first makes every
# (list, id) pair unique, so identical lists would get different ids.
writer_groups = duckdb.sql("""select writer_group, row_number() over (order by writer_group) as writer_group_id
                        from (select distinct list_sort(Writer) as writer_group from movies_details)""")

# Explode each group back into its members so it can be matched to dim_writer.
writer_groups_unnested = duckdb.sql("select trim(unnest(writer_group)) as writer, writer_group_id, writer_group from writer_groups")

# Bridge: one row per (group, writer); writer_group is the '#'-joined sorted list.
writer_bridge = duckdb.sql("""select writer_group_id,
                            writer_id,
                            array_to_string(writer_group, '#') as writer_group
                            from writer_groups_unnested
                            LEFT JOIN dim_writer using (writer)""")
dim_writer_group = duckdb.sql("select distinct writer_group_id from writer_groups")


# Persist the three relations as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        writer_bridge 
    TO 'data/aggregated/writer_bridge' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_writer 
    TO 'data/aggregated/dim_writer' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_writer_group 
    TO 'data/aggregated/dim_writer_group' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [8]:
# --- Country dimension, country groups and the group<->country bridge -------
#
# dim_country: one row per distinct, trimmed country name.
# The window is ordered so that country_id is deterministic: this relation is
# used both in the bridge join and in its own COPY below, and an unordered
# row_number() is not guaranteed to number rows the same way each time.
dim_country = duckdb.sql("""select country, row_number() over (order by country) as country_id
                        from (select distinct trim(unnest(Country)) as country from movies_details)""")

# country_groups: one row per distinct *sorted* country list.
# DISTINCT runs in a subquery before numbering; numbering first makes every
# (list, id) pair unique, so identical lists would get different ids.
country_groups = duckdb.sql("""select country_group, row_number() over (order by country_group) as country_group_id
                        from (select distinct list_sort(Country) as country_group from movies_details)""")

# Explode each group back into its members so it can be matched to dim_country.
country_groups_unnested = duckdb.sql("select trim(unnest(country_group)) as country, country_group_id, country_group from country_groups")

# Bridge: one row per (group, country); country_group is the '#'-joined sorted list.
country_bridge = duckdb.sql("""select 
                                country_group_id,
                                country_id,
                                array_to_string(country_group, '#') as country_group
                            from country_groups_unnested
                            LEFT JOIN dim_country using (country)""")
dim_country_group = duckdb.sql("select distinct country_group_id from country_groups")


# Persist the three relations as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        country_bridge 
    TO 'data/aggregated/country_bridge' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_country 
    TO 'data/aggregated/dim_country' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")
duckdb.sql("""
    COPY 
        dim_country_group 
    TO 'data/aggregated/dim_country_group' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [None]:
# Inclusive calendar bounds of the date dimension. from_date is also the
# epoch that date_id is counted from (date_id = days since from_date).
from_date = '1990-01-01'
to_date = '2030-01-01'

# Single-row relation holding both bounds as DATE values.
date_range_query = f"select CAST('{from_date}' AS date) as start_date, CAST('{to_date}' AS date) as end_date"
date_range = duckdb.sql(date_range_query)

# One row per day between the two bounds.
dates_query = "select unnest(generate_series(start_date, end_date, interval '1 day')) as date from date_range"
dates = duckdb.sql(dates_query)

# Final dimension: surrogate key plus the date itself and its month/day names.
dim_date_query = f"""
    SELECT
        date_diff('day', DATE '{from_date}', CAST(date AS date)) as date_id,
        CAST(date AS date) as DATE,
        monthname(date) as month_name,
        dayname(date) as day_name
    FROM dates
"""
dim_date = duckdb.sql(dim_date_query)


# Persist the dimension as gzip-compressed Parquet.
copy_dim_date_statement = """
    COPY 
        dim_date 
    TO 'data/aggregated/dim_date' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)"""
duckdb.sql(copy_dim_date_statement)

                      

In [24]:
# Distributor dimension: one row per distinct distributor name.
# DISTINCT runs in a subquery *before* numbering. Numbering first (as a
# sibling column of DISTINCT) makes every (distributor, id) pair unique, which
# would yield one dimension row per revenue row and multiply rows in any later
# join on `distributor`. Ordering the window keeps the ids reproducible.
dim_distributor = duckdb.sql("""select distributor, row_number() over (order by distributor) as distributor_id
                        from (select distinct distributor from movies_revenue)""")

# Persist the dimension as gzip-compressed Parquet.
duckdb.sql("""
    COPY  
        dim_distributor 
    TO 'data/aggregated/dim_distributor' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [None]:
# Flatten every list-valued attribute into a '#'-joined lookup key.
# The lists are sorted first because the bridge tables build their *_group
# strings from list_sort(...); a key built from the unsorted list would fail
# to match whenever the source order differs from the sorted order.
movierating_arrays_to_string = duckdb.sql("""
    SELECT *,
    array_to_string(list_sort(Genre), '#') AS genre_group,
    array_to_string(list_sort(Country), '#') AS country_group,
    array_to_string(list_sort(Actors), '#') AS actor_group,
    array_to_string(list_sort(Language), '#') AS language_group,
    array_to_string(list_sort(Director), '#') AS director_group,
    array_to_string(list_sort(Writer), '#') AS writer_group
    from movies_details movies
""")

# Rating fact: one row per movie, carrying the measures plus one group id per
# multi-valued attribute.
#
# The bridge files hold one row per (group, member), so joining them directly
# would multiply each movie by the member count of every group. Each bridge is
# therefore collapsed to a single (group string -> group id) row first; min()
# picks one id deterministically if a group string maps to several ids.
#
# release_date_id counts days from 1990-01-01, which must stay equal to the
# from_date used when building dim_date.
fact_movie_raiting = duckdb.sql(""" 
    with genre_lookup as (
        select genre_group, min(genre_group_id) as genre_group_id
        from read_parquet('data/aggregated/genre_bridge')
        group by genre_group
    ),
    country_lookup as (
        select country_group, min(country_group_id) as country_group_id
        from read_parquet('data/aggregated/country_bridge')
        group by country_group
    ),
    actor_lookup as (
        select actor_group, min(actor_group_id) as actor_group_id
        from read_parquet('data/aggregated/actor_bridge')
        group by actor_group
    ),
    language_lookup as (
        select language_group, min(language_group_id) as language_group_id
        from read_parquet('data/aggregated/language_bridge')
        group by language_group
    ),
    director_lookup as (
        select director_group, min(director_group_id) as director_group_id
        from read_parquet('data/aggregated/director_bridge')
        group by director_group
    ),
    writer_lookup as (
        select writer_group, min(writer_group_id) as writer_group_id
        from read_parquet('data/aggregated/writer_bridge')
        group by writer_group
    )
    select 
        Title, 
        Rated,
        date_diff('day', DATE '1990-01-01', CAST(Released AS date)) as release_date_id,
        Runtime, Metascore, imdbRating, imdbVotes, BoxOffice,
        genre_group_id,
        actor_group_id,
        country_group_id,
        language_group_id,
        director_group_id,
        writer_group_id
    from movierating_arrays_to_string movies
    left join genre_lookup using (genre_group)
    left join country_lookup using (country_group)
    left join actor_lookup using (actor_group)
    left join language_lookup using (language_group)
    left join director_lookup using (director_group)
    left join writer_lookup using (writer_group)
""")

# Persist the fact table as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        fact_movie_raiting 
    TO 'data/aggregated/fact_movie_raiting' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [None]:
# Re-read the persisted rating fact and derive p_title (title with '/'
# replaced by '_'), the key used to join it to the revenue rows.
# NOTE(review): presumably p_title on movies_revenue is a partition/path-safe
# title column -- confirm against the transform step.
fact_movie_raiting_part_added = duckdb.sql("select *, replace(title, '/', '_') as p_title from read_parquet('data/aggregated/fact_movie_raiting')")

# Revenue fact: revenue measures keyed by date, distributor and the group ids
# inherited from the matching rating row.
#
# The distributor dimension is collapsed to one id per distributor name before
# joining, so duplicate names in the dimension file cannot multiply fact rows;
# min() picks one id deterministically.
#
# date_id counts days from 1990-01-01, which must stay equal to the from_date
# used when building dim_date.
# NOTE(review): if several rating rows share a p_title, the rating join still
# yields one output row per match -- verify titles are unique.
fact_movie_revenue = duckdb.sql(""" 
    with distributor_lookup as (
        select distributor, min(distributor_id) as distributor_id
        from read_parquet('data/aggregated/dim_distributor')
        group by distributor
    )
    select 
        id,
        date_diff('day', DATE '1990-01-01', CAST(date AS date)) as date_id,
        theaters,
        revenue,
        distributor_id,
        genre_group_id,
        actor_group_id,
        country_group_id,
        language_group_id,
        director_group_id,
        writer_group_id
    from movies_revenue revenue
    left join fact_movie_raiting_part_added raiting using (p_title)
    left join distributor_lookup using (distributor)
    
""")

# Persist the fact table as gzip-compressed Parquet.
duckdb.sql("""
    COPY 
        fact_movie_revenue 
    TO 'data/aggregated/fact_movie_revenue' 
    (FORMAT PARQUET, overwrite_or_ignore, COMPRESSION GZIP)""")

In [None]:
# Print a preview of the rating fact relation as a quick visual sanity check.
fact_movie_raiting.show()