In [None]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; used further down for the catalog read and the S3 sinks.
glueContext = GlueContext(sc)
# SparkSession exposed by the Glue context.
spark = glueContext.spark_session
# Job handle bound to this Glue context (no init/commit call appears in the visible cells).
job = Job(glueContext)

In [None]:
# Read the raw scraped table from the Glue Data Catalog as a DynamicFrame.
dyf = glueContext.create_dynamic_frame.from_catalog(database='sothebys-db', table_name='scraped_data_raw')
# Print the schema for inspection before any transformation.
dyf.printSchema()

In [None]:
# Convert the DynamicFrame into a Spark DataFrame; every later cell works on this frame.
df = dyf.toDF()
# Preview the first rows.
df.show()

In [None]:
# Collection-level (non-nested) columns kept from the raw frame
collection_columns = [
    "page_id",
    "collection_id",
    "category",
    "title",
    "details",
    "price",
    "link",
    "partition_0",
]
collection_df = df.select(*collection_columns)

print("Top-level DataFrame:")
collection_df.show()

In [None]:
from pyspark.sql.functions import split, udf, col
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, StructField

# Break 'details' into its pipe-delimited parts. The pattern is a regular
# expression, so the pipe has to be escaped; the raw string passes the backslash
# through explicitly instead of relying on Python leaving the unrecognised
# escape sequence '\|' untouched (which emits a deprecation warning).
collection_df = collection_df.withColumn("info", split(col("details"), r'\|'))


# Helper wrapped as a UDF below: picks the final entry of the split array
def get_last_element(arr):
    """Return the last item of ``arr``, or ``None`` when ``arr`` is empty or ``None``."""
    if not arr:
        return None
    return arr[-1]

# Expose the helper to Spark as a UDF producing a string column
get_last_element_udf = udf(f=get_last_element, returnType=StringType())

# Derive 'city' from the last entry of 'info', then discard the
# intermediate 'info' column since nothing else reads it
collection_df = (
    collection_df
    .withColumn("city", get_last_element_udf(col("info")))
    .drop("info")
)

# Preview the result
collection_df.show()

In [None]:
from datetime import date, datetime

# Lookup from full English month name to month number, used by get_date.
months = {
    "January": 1,
    "February": 2,
    "March": 3,
    "April": 4,
    "May": 5,
    "June": 6,
    "July": 7,
    "August": 8,
    "September": 9,
    "October": 10,
    "November": 11,
    "December": 12
}

def get_date(_date):
    """Parse whitespace-split date tokens into ``[start_date, end_date]``.

    Recognised shapes (the range separator may be an en dash or a hyphen):

    * ``['5', 'March', '2024']``                  - a single day
    * ``['5–7', 'March', '2024']``                - a range inside one month
    * ``['28', 'February–3', 'March', '2024']``   - a range spanning two months

    Args:
        _date: list of string tokens, as produced by ``str.split()``.

    Returns:
        A two-item list of ``datetime.date`` objects. ``[None, None]`` is
        returned when the tokens do not match a recognised shape or contain an
        unknown month name / non-numeric or out-of-range day or year, so that a
        single malformed row cannot raise inside a Spark UDF.
    """
    unparsed = [None, None]
    if not _date:
        return unparsed

    try:
        if len(_date) == 3:
            day_token, month_name, year_token = _date
            # Normalise a plain hyphen to the en dash before splitting the range.
            days = day_token.replace('-', '–').split('–')
            if len(days) == 1:
                start_day = end_day = int(days[0])
            elif len(days) == 2:
                start_day, end_day = int(days[0]), int(days[1])
            else:
                return unparsed

            month = months[month_name]
            year = int(year_token)
            return [date(year, month, start_day), date(year, month, end_day)]

        if len(_date) == 4:
            start_day_token, month_and_day, end_month_name, year_token = _date
            # Second token carries "<start month>–<end day>".
            pieces = month_and_day.replace('-', '–').split('–')
            if len(pieces) != 2:
                return unparsed

            year = int(year_token)
            start_date = date(year, months[pieces[0]], int(start_day_token))
            end_date = date(year, months[end_month_name], int(pieces[1]))
            # Only one year is given; a start after the end means the range
            # crosses New Year, so the start belongs to the previous year.
            if start_date > end_date:
                start_date = start_date.replace(year=year - 1)
            return [start_date, end_date]
    except (KeyError, ValueError):
        # Unknown month name, non-numeric token, or impossible calendar date.
        return unparsed

    return unparsed

def get_time(_time):
    """Parse ``[clock, AM/PM, time zone]`` tokens into ``[time, time_zone]``.

    Args:
        _time: list of string tokens such as ``['10:30', 'AM', 'EST']``.

    Returns:
        A two-item list: a ``datetime.time`` and the time-zone token. When the
        input does not have exactly three tokens, or the clock part cannot be
        parsed, midnight and an empty time zone are returned so the result can
        always be passed to ``datetime.combine``.
    """
    # Accept "10:30 AM" as well as the minute-less "10 AM".
    time_formats = ("%I:%M %p", "%I %p")
    # A real time value; the bare attribute ``datetime.time`` is a method
    # object and cannot be combined with a date.
    fallback = [datetime.min.time(), '']

    if not _time or len(_time) != 3:
        return fallback

    time_of_day, am_pm, time_zone = _time
    for time_format in time_formats:
        try:
            parsed = datetime.strptime(time_of_day + ' ' + am_pm, time_format).time()
        except ValueError:
            continue
        return [parsed, time_zone]
    return fallback

def get_date_time(_date_time):
    """Turn the date/time parts of a details string into a 3-tuple.

    Args:
        _date_time: list with one part (date only) or two parts (date, then
            time with zone); each part is a string that is stripped and split
            on whitespace before being handed to ``get_date`` / ``get_time``.

    Returns:
        ``(start, end, time_zone)``. With one part, ``start``/``end`` are the
        dates from ``get_date`` and ``time_zone`` is ``''``. With two parts,
        they are the dates combined with the parsed time. ``(None, None, '')``
        is returned when the number of parts is anything else or the date
        cannot be parsed, so the result always has three items.
    """
    no_match = (None, None, '')
    if not _date_time or len(_date_time) not in (1, 2):
        return no_match

    parsed_dates = get_date(_date_time[0].strip().split())
    # get_date may yield nothing usable for an unrecognised date layout.
    if not parsed_dates or parsed_dates[0] is None or parsed_dates[1] is None:
        return no_match
    start_date, end_date = parsed_dates

    if len(_date_time) == 1:
        return start_date, end_date, ''

    _time, time_zone = get_time(_date_time[1].strip().split())
    try:
        return datetime.combine(start_date, _time), datetime.combine(end_date, _time), time_zone
    except TypeError:
        # The time part is not a usable time value; keep the dates alone.
        return start_date, end_date, time_zone

In [None]:
from datetime import date, datetime

# Register the UDFs
def _parse_details(details):
    """Parse the date/time parts of a pipe-delimited 'details' string.

    The last pipe-separated part is dropped before parsing (the earlier cell
    uses it as the city). A null 'details' yields a null struct instead of
    raising inside the UDF.
    """
    if details is None:
        return None
    return get_date_time(details.split('|')[:-1])


get_date_time_udf = udf(
    _parse_details,
    StructType(
        [
            StructField("start_date", DateType(), True),
            StructField("end_date", DateType(), True),
            StructField("time_zone", StringType(), True)
        ]
    )
)

collection_df = collection_df.withColumn("date_time_struct", get_date_time_udf(col("details")))

# Split the struct into separate columns, then drop the struct and the
# now fully parsed 'details' column
collection_df = collection_df.withColumn(
    "start_date",
    col("date_time_struct.start_date")
).withColumn(
    "end_date",
    col("date_time_struct.end_date")
).withColumn(
    "time_zone",
    col("date_time_struct.time_zone")
).drop("date_time_struct", "details")

collection_df.show()

In [None]:
from pyspark.sql.types import IntegerType

def get_price(_price_str):
    """Parse a ``"<label>: <amount> <currency>"`` string into ``(amount, currency)``.

    Args:
        _price_str: raw price text; ``'n/a'`` marks a missing price.

    Returns:
        ``(int_amount, currency)`` with thousands separators removed from the
        amount. ``(-1, '')`` is returned for ``'n/a'``, for ``None`` / empty
        input, and for any text that does not have a colon followed by exactly
        an amount and a currency, so malformed rows do not raise inside a UDF.
    """
    missing = (-1, '')
    if not _price_str or _price_str == 'n/a':
        return missing

    sections = _price_str.split(':')
    if len(sections) < 2:
        return missing

    # The part right after the first colon holds "<amount> <currency>".
    tokens = sections[1].split()
    if len(tokens) != 2:
        return missing

    _price, _currency = tokens
    try:
        return int(_price.replace(',', '')), _currency
    except ValueError:
        return missing

# Shape of the value produced by the price UDF: numeric amount plus currency
price_currency_schema = StructType([
    StructField("price", IntegerType(), True),
    StructField("currency", StringType(), True)
])
get_price_udf = udf(lambda dt: get_price(dt), price_currency_schema)

# Parse 'price' into a temporary struct, overwrite 'price' with the numeric
# part, append 'currency', drop the struct, and cast 'price' to integer
collection_df = (
    collection_df
    .withColumn("price_currency_struct", get_price_udf(col("price")))
    .withColumn("price", col("price_currency_struct.price"))
    .withColumn("currency", col("price_currency_struct.currency"))
    .drop("price_currency_struct")
    .withColumn("price", col("price").cast("integer"))
)
collection_df.show()

In [None]:
# Extract the nested items: one output row per element of the 'items' array,
# with the element's fields promoted to top-level columns
from pyspark.sql.functions import explode

items_df = df.withColumn("items", explode("items")).select('items.*')
print("Items DataFrame:")
items_df.show()

In [None]:
# Copy each item's page_id into 'partition_0', the partition key used by the S3 sink
items_df = items_df.withColumn('partition_0', items_df['page_id'])
items_df.show()

In [None]:
from pyspark.sql.functions import split, col, lit, when

# Fill missing values for 'author' and 'price_sold' with sentinels that still
# split cleanly into two parts below
items_df = items_df.withColumn(
    'author',
    when(col('author').isNull(), lit('-1.-1'))
    .otherwise(col('author'))
)
items_df = items_df.withColumn(
    'price_sold',
    when(col('price_sold').isNull(), lit('-1 -1'))
    .otherwise(col('price_sold'))
)

# Split 'author' into 'item_id' and 'author' at the FIRST period only
# (limit=2), so names that themselves contain periods, e.g. initials,
# are kept whole instead of being cut at their first period
author_parts = split(col('author'), '\\.', limit=2)
items_df = items_df.withColumn('item_id', author_parts.getItem(0))
items_df = items_df.withColumn('author', author_parts.getItem(1))

# Split 'price_sold' into 'price_sold' and 'currency'; 'currency' is taken
# first because the next step overwrites 'price_sold'
price_parts = split(col('price_sold'), ' ')
items_df = items_df.withColumn('currency', price_parts.getItem(1))
items_df = items_df.withColumn('price_sold', price_parts.getItem(0))

# Split 'estimated_price' into 'low_estimate' and 'high_estimate'.
# The text after the '-' is split on spaces and its second piece is used
# (presumably the layout is "<low> - <high> <currency>" — TODO confirm)
estimate_parts = split(col('estimated_price'), '-')
items_df = items_df.withColumn('low_estimate', estimate_parts.getItem(0))
items_df = items_df.withColumn(
    'high_estimate',
    split(estimate_parts.getItem(1), ' ').getItem(1)
)

# Drop the 'estimated_price' column
items_df = items_df.drop('estimated_price')

# Show the transformed DataFrame
items_df.show()

In [None]:
from pyspark.sql.functions import regexp_replace

# Cast the numeric text columns to integer. Thousands separators are removed
# first: a string such as "12,500" would otherwise cast to null. Values without
# commas are unaffected.
for numeric_column in ("price_sold", "item_id", "low_estimate", "high_estimate"):
    items_df = items_df.withColumn(
        numeric_column,
        regexp_replace(col(numeric_column), ",", "").cast("integer"),
    )
items_df.printSchema()

In [None]:
from awsglue.dynamicframe import DynamicFrame

# Wrap the cleaned collections DataFrame back into a Glue DynamicFrame for the sink.
collection_dyf = DynamicFrame.fromDF(collection_df, glueContext, "collections")

# S3 sink for the collections data, partitioned by 'partition_0', with Data Catalog
# updates enabled (enableUpdateCatalog / updateBehavior).
s3output = glueContext.getSink(
  path="s3://scraped-data-clean/collections",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=['partition_0'],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)

# Catalog table that the sink targets.
s3output.setCatalogInfo(
  catalogDatabase="sothebys-db", catalogTableName="collections"
)

# Output format; must be configured before the frame is written.
s3output.setFormat("glueparquet")

# Perform the write.
# NOTE(review): nothing in this cell clears the target path first, so re-running
# the cell presumably writes the data again — confirm.
s3output.writeFrame(collection_dyf)

In [None]:
# Wrap the cleaned items DataFrame into a Glue DynamicFrame for the sink.
items_dyf = DynamicFrame.fromDF(items_df, glueContext, "items")

# S3 sink for the items data, partitioned by 'partition_0', with Data Catalog
# updates enabled (enableUpdateCatalog / updateBehavior).
# NOTE(review): transformation_ctx is the same string ("s3output") as in the
# collections sink cell — confirm whether each sink should use its own value.
s3output = glueContext.getSink(
  path="s3://scraped-data-clean/items",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=['partition_0'],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)

# Catalog table that the sink targets.
s3output.setCatalogInfo(
  catalogDatabase="sothebys-db", catalogTableName="items"
)

# Output format; must be configured before the frame is written.
s3output.setFormat("glueparquet")

# Perform the write.
s3output.writeFrame(items_dyf)