## List all files in bucket

In [25]:
import boto3
import yaml

def get_files_in_bucket(bucket_name="<bucket_name>"):
    """Return the keys of all objects in an S3 bucket.

    AWS credentials are read from the ``aws_access_key_id`` and
    ``aws_secret_access_key`` entries of ``config.yml`` in the current
    working directory. The session is created for region ``us-east-1``.

    Args:
        bucket_name: Name of the bucket to list.

    Returns:
        A list of object keys (strings), in the order the paginator
        yields them. Empty if the bucket has no objects.
    """
    # Load the AWS credentials from the local YAML config file.
    with open("config.yml") as config_file:
        settings = yaml.safe_load(config_file)

    access_key = settings["aws_access_key_id"]
    secret_key = settings["aws_secret_access_key"]

    # Build an S3 client from an explicitly-credentialed session.
    s3 = boto3.Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='us-east-1',
    ).client('s3')

    # Go through the paginator so that every page of the listing is
    # visited; pages without a 'Contents' entry contribute nothing.
    pages = s3.get_paginator('list_objects_v2').paginate(Bucket=bucket_name)
    return [
        entry['Key']
        for page in pages
        for entry in page.get('Contents', [])
    ]

### Pick today's file

In [26]:
from datetime import date
# Date stamp to look for in the object keys, formatted as YYYY-MM-DD.
today = date.today().strftime("%Y-%m-%d")

new_release_file = ''

file_keys = get_files_in_bucket()

for file_key in file_keys:
    # The 10 characters that precede the last 4 characters of the key are
    # compared with today's date.
    # NOTE(review): this assumes keys end in "<YYYY-MM-DD>" followed by a
    # 4-character suffix such as ".csv" - confirm against the bucket layout.
    # If several keys match, the last one listed wins.
    if file_key[-14:-4] == today:
        new_release_file = file_key

# Fail here rather than downstream: with an empty key the S3 URL built
# later would be just "s3a://<bucket>/" instead of a single file.
if not new_release_file:
    raise FileNotFoundError(
        f"No object key in the bucket carries today's date stamp ({today})."
    )

### Create DataFrames

In [29]:
from pyspark.sql import SparkSession
import yaml

def read_csv_file_into_df(s3_bucket = '<bucket_name>', s3_file_key=None):
    """Read one CSV object from S3 into a Spark DataFrame.

    AWS credentials are read from the ``aws_access_key_id`` and
    ``aws_secret_access_key`` entries of ``config.yml`` in the current
    working directory and handed to the S3A connector through the Spark
    session configuration.

    Args:
        s3_bucket: Name of the bucket that holds the file.
        s3_file_key: Key of the CSV object inside the bucket. When omitted,
            the notebook-level variable ``new_release_file`` is used, which
            keeps existing zero-argument calls working.

    Returns:
        The DataFrame produced by ``spark.read.csv`` with ``header=True``
        and ``inferSchema=True``.

    Raises:
        ValueError: If the resolved object key is empty.
    """
    # Fall back to the notebook-level selection for backward compatibility.
    if s3_file_key is None:
        s3_file_key = new_release_file

    # An empty key would turn the URL below into "s3a://<bucket>/", which is
    # not a single file.
    if not s3_file_key:
        raise ValueError("No S3 object key was provided for the CSV file to read.")

    # Load the AWS credentials from the local YAML config file.
    with open("config.yml") as f:
        config = yaml.safe_load(f)
        aws_access_key_id = config["aws_access_key_id"]
        aws_secret_access_key = config["aws_secret_access_key"]

    # Create (or reuse) a Spark session configured for S3A access.
    spark = SparkSession.builder \
        .appName('ReadCSVFromS3') \
        .config('spark.hadoop.fs.s3a.access.key', aws_access_key_id) \
        .config('spark.hadoop.fs.s3a.secret.key', aws_secret_access_key) \
        .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
        .config('spark.hadoop.fs.s3a.endpoint', 's3.amazonaws.com') \
        .getOrCreate()

    new_release_s3_url = f's3a://{s3_bucket}/{s3_file_key}'

    # Read CSV file into DataFrame; the first row is the header and column
    # types are inferred from the data.
    new_release_album_df = spark.read.csv(new_release_s3_url, header=True, inferSchema=True)

    return new_release_album_df



In [30]:
# Load the CSV selected in the "Pick today's file" cell from the default bucket.
new_release_album_df = read_csv_file_into_df()

In [31]:
# Strip white spaces for all string columns
from pyspark.sql.functions import col, trim
from pyspark.sql.types import StringType

# field.dataType is a DataType instance, so it has to be checked with
# isinstance; comparing it to the string 'StringType' never matches.
string_columns = [
    field.name
    for field in new_release_album_df.schema.fields
    if isinstance(field.dataType, StringType)
]

# Write the trimmed columns back to new_release_album_df so that the cells
# that derive `df` from it work on the cleaned data.
for column in string_columns:
    new_release_album_df = new_release_album_df.withColumn(column, trim(col(column)))

In [8]:
# Keep only the listed album, artist and track columns, in this order.
df = new_release_album_df.select(
    'album_id',
    'album_name',
    'album_type',
    'album_url',
    'album_release_date',
    'artist_id',
    'artist_name',
    'artist_url',
    'track_id',
    'track_url',
    'track_number',
    'track_duration',
    'track_name',
    'track_explicit',
    'pull_date',
    'artist_type',
)

In [9]:
# Rename the 'track_explicit' column to 'track_explicit_flag'.
# NOTE(review): presumably this matches the column name in the target table - confirm.
df = df.withColumnRenamed('track_explicit','track_explicit_flag')

In [10]:
# Print the first 10 rows as a quick sanity check of the selected columns.
df.show(10)

+--------------------+--------------------+-----------+--------------------+------------------+--------------------+----------------+--------------------+--------------------+--------------------+------------+--------------+--------------------+-------------------+----------+-----------+
|            album_id|          album_name| album_type|           album_url|album_release_date|           artist_id|     artist_name|          artist_url|            track_id|           track_url|track_number|track_duration|          track_name|track_explicit_flag| pull_date|artist_type|
+--------------------+--------------------+-----------+--------------------+------------------+--------------------+----------------+--------------------+--------------------+--------------------+------------+--------------+--------------------+-------------------+----------+-----------+
|4ZfTvGloipTYCjdwd...|Live on Mountain ...|compilation|https://open.spot...|        2024-04-19|2QoU3awHVdcHS8LrZ...|           Wilco|

In [24]:
from pyspark.sql import SparkSession

# Load the settings used below. `config` is otherwise only a local variable
# inside the helper functions, so on a fresh kernel it would be undefined here.
# NOTE(review): assumes the RDS settings live in the same config.yml as the
# AWS keys - confirm.
import yaml

with open("config.yml") as f:
    config = yaml.safe_load(f)

# Create a Spark session
# NOTE(review): getOrCreate() returns the already-running session when one
# exists (e.g. the one created while reading the CSV), in which case the
# 'spark.jars' setting below may not be applied - confirm that the PostgreSQL
# driver jar is actually on the classpath.
spark = SparkSession.builder \
    .appName('WriteToPostgres') \
    .config('spark.jars', r'C:\Spark\spark-3.4.3-bin-hadoop3\postgresql-42.7.3.jar') \
    .getOrCreate()


# PostgreSQL connection properties
rds_endpoint = "<rds_instance>.us-east-1.rds.amazonaws.com"
rds_port = config["rds_port"]
rds_db_name = config["rds_db_name"]
rds_user = config["rds_user_name"]
rds_password = config["rds_password"]
table_name = config["rds_table_name"]

# JDBC URL for PostgreSQL
jdbc_url = f"jdbc:postgresql://{rds_endpoint}:{rds_port}/{rds_db_name}"

# Connection properties
connection_properties = {
    "user": rds_user,
    "password": rds_password,
    "driver": "org.postgresql.Driver"
}

# Write DataFrame to PostgreSQL table.
# mode='append' adds rows to the existing table, so re-running this cell
# writes the same rows again.
df.write.jdbc(url=jdbc_url, table=table_name, mode='append', properties=connection_properties)

print("Data written to PostgreSQL successfully!")


Data written to PostgreSQL successfully!
