In [1]:
import pandas as pd
import os
import io
import sys
import threading

# Make the local helper modules imported in this cell (presumably
# postgres_functions, parallel_process_functions, sql_queries live in ../src)
# importable. The path is made absolute so the sys.path entry does not depend
# on the working directory at import time, and so the membership check below
# compares like with like (no duplicate relative/absolute entries on re-run).
src_dir = os.path.abspath(os.path.join('..', 'src'))
if src_dir not in sys.path:
    sys.path.append(src_dir)
from postgres_functions import *


from dotenv import load_dotenv
# Load variables from a .env file into os.environ. By default, variables that
# are already set in the environment are not overridden. DWH_USER / DWH_PWD,
# used for the JDBC connection further down, are read from os.environ.
load_dotenv()
import psycopg2
import pyarrow.parquet as pq
from parallel_process_functions import *
from sql_queries import *
from pyspark.sql import SparkSession

In [2]:
# Recreate the warehouse schema from scratch: drop any existing tables, then
# create them again (helpers come from postgres_functions).
cur, conn = create_database()
try:
    drop_tables(cur, conn)
    create_tables(cur, conn)
finally:
    # Always release the connection, even if a DDL statement fails midway;
    # previously an exception here left `conn` open for the kernel's lifetime.
    # Closing the connection also makes `cur` unusable.
    conn.close()

In [3]:
# Spark session used for the JDBC writes below. The PostgreSQL JDBC driver jar
# is expected in the current working directory and is shipped via `spark.jars`.
POSTGRES_JDBC_JAR = os.path.join(os.getcwd(), "postgresql-42.7.1.jar")
SPARK_DRIVER_MEMORY = "4g"
SPARK_EXECUTOR_MEMORY = "4g"

# Spark typically only logs a missing jar at startup, so the problem would
# otherwise surface later as a driver error on the first JDBC write.
if not os.path.isfile(POSTGRES_JDBC_JAR):
    raise FileNotFoundError(f"PostgreSQL JDBC driver jar not found: {POSTGRES_JDBC_JAR}")

# NOTE: getOrCreate() reuses an already-running session. Static settings such as
# jars and memory only take effect when a new session/JVM is started (fresh kernel).
spark = (
    SparkSession.builder
    .appName("EtlPostgres")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .config("spark.driver.memory", SPARK_DRIVER_MEMORY)
    .config("spark.executor.memory", SPARK_EXECUTOR_MEMORY)
    .getOrCreate()
)

In [4]:
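# Earlier pandas + psycopg2 load path; appears to be superseded by the Spark JDBC
# cells below and is kept for reference only. If re-enabled, it needs a fresh
# `cur, conn`: the connection opened in cell 2 has already been closed.
# parquet_path = os.path.join(os.getcwd(), 'tracks_data.parquet')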
# df = read_parquet_to_df(parquet_path)
# df.head()
# insert_execute(df, insert_artists, conn, cur)
# insert_execute(df, insert_albums, conn, cur)
# insert_execute(df, insert_tracks, conn, cur)
# insert_execute(df, insert_playlists, conn, cur)
# insert_execute(df, insert_playlist_tracks, conn, cur)

In [5]:
# Read the raw playlist/track rows from the parquet file in the current
# working directory into a Spark DataFrame, then preview a few rows.
working_dir = os.getcwd()
parquet_path = os.path.join(working_dir, 'tracks_data.parquet')
spark_df = read_parquet_to_spark_df(spark, parquet_path)
spark_df.show(n=5)  # preview only; long values are truncated in the printout

+---+-----------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------------+--------------------+
|pos|      artist_name|           track_uri|          artist_uri|          track_name|           album_uri|duration_ms|          album_name|playlist_pid|playlist_name|playlist_description|
+---+-----------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------------+--------------------+
|  0|    Missy Elliott|spotify:track:0Ua...|spotify:artist:2w...|Lose Control feat...|spotify:album:6vV...|     226863|        The Cookbook|           0|   Throwbacks|                NULL|
|  1|   Britney Spears|spotify:track:6I9...|spotify:artist:26...|               Toxic|spotify:album:0z7...|     198800|         In The Zone|           0|   Throwbacks|                NULL|
|  2|           Beyonc|spotify:track:0Wq...|spotify:art

In [6]:
# JDBC target and connection properties for the Spark writes below.
jdbc_url = "jdbc:postgresql://127.0.0.1:5432/music_dwh"

# Maps JDBC property name -> environment variable holding its value
# (presumably populated from .env by load_dotenv() in the imports cell).
_credential_env = {"user": "DWH_USER", "password": "DWH_PWD"}
properties = {prop: os.getenv(env_var) for prop, env_var in _credential_env.items()}

# os.getenv returns None for unset variables; fail here with a clear message
# instead of letting the JDBC write fail later on a None user/password.
# Empty strings are still accepted (e.g. passwordless local setups).
_missing_env = [env_var for prop, env_var in _credential_env.items() if properties[prop] is None]
if _missing_env:
    raise RuntimeError(
        f"Missing environment variable(s) {', '.join(_missing_env)}; set them (e.g. in .env) before writing to the warehouse"
    )

# Name the PgJDBC driver class explicitly rather than relying on
# DriverManager to infer it from the URL.
properties["driver"] = "org.postgresql.Driver"

def _distinct_projection(columns, key_columns=None):
    """Select `columns` from spark_df and drop duplicate rows.

    With `key_columns`, rows are de-duplicated on those columns only;
    without it, on all selected columns.
    NOTE(review): when keying on a subset, which row survives per key is left
    to Spark -- confirm names/durations are consistent per URI.
    """
    return spark_df.select(*columns).dropDuplicates(key_columns)


# One frame per target table; the key column is the one used for de-duplication.
artists_spark_df = _distinct_projection(['artist_uri', 'artist_name'], ['artist_uri'])
albums_spark_df = _distinct_projection(['album_uri', 'album_name'], ['album_uri'])
tracks_spark_df = _distinct_projection(
    ['track_uri', 'track_name', 'artist_uri', 'album_uri', 'duration_ms'],
    ['track_uri'],
)
playlists_spark_df = _distinct_projection(
    ['playlist_pid', 'playlist_name', 'playlist_description'],
    ['playlist_pid'],
)
# Link table: de-duplicate on the full (playlist, track, position) row.
playlist_tracks_spark_df = _distinct_projection(['playlist_pid', 'track_uri', 'pos'])

In [7]:
# Write every table in one pass instead of five copy-pasted cells.
# Order matters: `tracks` carries artist_uri/album_uri and `playlist_tracks`
# carries playlist_pid/track_uri, so parent tables are written before the
# tables that reference them.
# NOTE(review): assumes FK constraints are defined in sql_queries -- confirm.
tables_in_load_order = [
    (artists_spark_df, 'artists'),
    (albums_spark_df, 'albums'),
    (tracks_spark_df, 'tracks'),
    (playlists_spark_df, 'playlists'),
    (playlist_tracks_spark_df, 'playlist_tracks'),
]

for table_df, table_name in tables_in_load_order:
    write_to_db(table_df, table_name, jdbc_url, properties)