# Transform from Bronze to Silver

## Perform Silver Operations on bronze_reviews to get silver_reviews

### Read bronze_reviews Parquet

In [9]:
# Load the raw Toronto reviews from the Bronze layer of the lakehouse.
bronze_reviews_path = "Files/Bronze/Toronto_Ontario_Canada/bronze_reviews.parquet"
df_bronze_reviews = spark.read.parquet(bronze_reviews_path)

# Optional inspection helpers while developing:
# df_bronze_reviews.printSchema()
# display(df_bronze_reviews.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 11, Finished, Available)

### Import the types and functions

In [10]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 12, Finished, Available)

### Remove unwanted columns

In [11]:
# Keep only the review attributes carried forward into the Silver layer.
review_columns = ['listing_id', 'id', 'date', 'reviewer_id', 'comments']
df_silver_reviews = df_bronze_reviews.select(*review_columns)

# Preview the first five rows of the reduced frame.
display(df_silver_reviews.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, abfc7149-7710-4d81-87ec-f61c7ce21e35)

### Change fields data type

In [12]:
# Target Spark type for every column of the Silver reviews table.
# NOTE(review): reviewer_id is cast to IntegerType while the listings' host_id is
# cast to LongType later in this notebook — confirm reviewer ids always fit in 32 bits.
review_coltype_map = {
    "listing_id": LongType(),
    "id": LongType(),
    "date": DateType(),
    "reviewer_id": IntegerType(),
    "comments": StringType(),
}

# Apply every cast in a single projection. Calling withColumn once per column in a
# loop adds one projection per call, which the Spark documentation warns can
# generate very large query plans. Column order is unchanged.
df_silver_reviews = df_silver_reviews.withColumns({
    col_name: df_silver_reviews[col_name].cast(col_type)
    for col_name, col_type in review_coltype_map.items()
})

# Print the schema to confirm the casts took effect.
df_silver_reviews.printSchema()

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 14, Finished, Available)

root
 |-- listing_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- comments: string (nullable = true)



### Check for nulls

In [13]:
# Display the rows in which a key identifier is null, one result per column.
# An empty result means that column has no nulls.
for key_column in ('listing_id', 'id', 'reviewer_id'):
    display(df_silver_reviews.filter(df_silver_reviews[key_column].isNull()))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, d1518f63-5f51-49e6-9ea8-9fec746cf8c1)

SynapseWidget(Synapse.DataFrame, 8269051b-675b-42f4-96da-96157d54af42)

SynapseWidget(Synapse.DataFrame, 41f712f4-694f-40ca-8a0c-113b880eeeff)

### Rename columns

In [14]:
# Inspect the current column names before renaming them.
df_silver_reviews.columns

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 16, Finished, Available)

['listing_id', 'id', 'date', 'reviewer_id', 'comments']

In [15]:
# Prefix the generic column names with "review_" so they stay unambiguous
# alongside other Silver tables; listing_id and reviewer_id keep their names.
review_column_renames = {
    'id': 'review_id',
    'date': 'review_date',
    'comments': 'review_comments',
}
df_silver_reviews = df_silver_reviews.withColumnsRenamed(review_column_renames)

# Show the resulting column names.
df_silver_reviews.columns

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 17, Finished, Available)

['listing_id', 'review_id', 'review_date', 'reviewer_id', 'review_comments']

### Trim review_comments column

In [16]:
# Strip leading and trailing whitespace from the review text.
df_silver_reviews = df_silver_reviews.withColumn(
    'review_comments',
    trim(df_silver_reviews['review_comments']),
)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 18, Finished, Available)

### Replace value `<br/>` with an empty string

In [17]:
# Sample up to five comments that contain a literal "<br/>" tag.
comments_with_br = df_silver_reviews.filter(
    df_silver_reviews['review_comments'].contains("<br/>")
)
display(comments_with_br.select('review_comments').limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, 36ca7327-df17-4ae7-b8a8-9446d9022642)

In [18]:
# Remove every literal "<br/>" tag from the review text. The result is trimmed
# again because deleting a tag at the start or end of a comment can leave behind
# whitespace that was not at the edge when the column was trimmed earlier.
df_silver_reviews = df_silver_reviews.withColumn(
    'review_comments',
    trim(regexp_replace('review_comments', '<br/>', '')),
)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 20, Finished, Available)

In [19]:
# Re-run the "<br/>" search; an empty result confirms the tags were removed.
remaining_br = df_silver_reviews.filter(
    df_silver_reviews['review_comments'].contains("<br/>")
)
display(remaining_br.select('review_comments').limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 21, Finished, Available)

SynapseWidget(Synapse.DataFrame, 6314bf91-c56b-4925-bd31-8c5073adfe00)

### Load silver_reviews to Delta Table

In [20]:
# Preview the first five rows of the cleaned reviews.
display(df_silver_reviews.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 22, Finished, Available)

SynapseWidget(Synapse.DataFrame, f0cc3730-9f86-4401-a41f-313347f7d693)

In [21]:
# Persist the cleaned reviews to the silver_reviews Delta table.
# NOTE(review): mode("append") adds these rows on every run, so re-running the
# notebook against the same Bronze file would presumably duplicate reviews —
# confirm whether an overwrite or merge is intended.
(
    df_silver_reviews.write
    .format("delta")
    .mode("append")
    .save("Tables/silver_reviews")
)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 23, Finished, Available)

## Perform Silver Operations on bronze_listings to get silver_location and silver_listings

### Read bronze_listings Parquet without custom schema

In [22]:
# Load the raw Toronto listings from the Bronze layer, using the schema stored
# in the Parquet file rather than a custom one.
bronze_listings_path = "Files/Bronze/Toronto_Ontario_Canada/bronze_listings.parquet"
df_bronze_listings = spark.read.parquet(bronze_listings_path)

# Optional schema inspection while developing:
# df_bronze_listings.printSchema()

# Preview the first five rows.
display(df_bronze_listings.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 24, Finished, Available)

SynapseWidget(Synapse.DataFrame, 25e6744d-ae56-402a-afe9-b468ca9a3177)

### Import types and functions

In [23]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 25, Finished, Available)

#### Extract metadata information

In [25]:
# Derive location_key from the source file path: it is the folder name directly
# under /Files/Bronze/ (e.g. "Toronto_Ontario_Canada").
# The dot before "parquet" is escaped so it matches a literal "." instead of any
# character; a raw string keeps the backslash intact for the regex engine.
# NOTE(review): if the .parquet path is a folder of part files, input_file_name()
# would presumably return the part-file path and this pattern would yield an
# empty string — verify against the actual Bronze layout.
regex_str = r"/Files/Bronze/([^/]+)/[^/]+\.parquet$"
df_bronze_listings = df_bronze_listings.withColumn(
    "location_key",
    regexp_extract(input_file_name(), regex_str, 1),
)

# Preview the rows with the new location_key column.
display(df_bronze_listings.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 27, Finished, Available)

SynapseWidget(Synapse.DataFrame, 56022009-1a6f-489b-9a27-f3cec7ea30ff)

### silver_locations

##### Remove Unwanted Columns

In [26]:
# The location dimension starts from the location_key column alone.
df_silver_location = df_bronze_listings.select('location_key')

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 28, Finished, Available)

##### Remove duplicates

In [27]:
# Collapse to one row per distinct location_key.
df_silver_location = df_silver_location.distinct()

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 29, Finished, Available)

##### Extract city, province_state, country and a location_id

In [28]:
# Split the key once; it is expected to look like "<city>_<province_state>_<country>".
# NOTE(review): a city or province containing "_" would shift the parts — confirm
# the Bronze folder naming never does this.
key_parts = split(df_silver_location['location_key'], '_')

# Number the locations by ordering on location_key itself. Ordering on
# monotonically_increasing_id() is non-deterministic, and because this DataFrame
# is not persisted it is evaluated again by every action that uses it. The ids
# written to the table could therefore differ from the ids seen by a later join.
# Ordering on the key gives the same location_id on every evaluation.
window_spec = Window.orderBy('location_key')

df_silver_location = df_silver_location.select(
    row_number().over(window_spec).alias('location_id'),
    'location_key',
    key_parts[0].alias('city'),
    key_parts[1].alias('province_state'),
    key_parts[2].alias('country'),
)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 30, Finished, Available)

##### Load silver_locations to Delta Table

In [29]:
# Preview up to five rows of the location dimension.
display(df_silver_location.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 31, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3a2d4317-a4eb-44b9-bfee-a00a603bd89e)

In [31]:
# Persist the location dimension to the silver_locations Delta table.
# NOTE(review): mode("append") adds these rows on every run, and location_id
# numbering restarts at 1 for each run — confirm that re-runs or additional
# locations cannot produce duplicate or colliding ids in the table.
(
    df_silver_location.write
    .format("delta")
    .mode("append")
    .save("Tables/silver_locations")
)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 33, Finished, Available)

### silver_listings 

In [32]:
# Preview the bronze listings, which at this point also carry the derived location_key column.
display(df_bronze_listings.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 34, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0b9f5dc1-67fd-4ad2-95db-f29cd770d777)

##### Remove Unwanted Columns

In [33]:
# Columns carried over from Bronze under their original names, in output order.
listing_passthrough_columns = [
    'host_id',
    'host_since',
    'host_location',
    'host_response_time',
    'host_response_rate',
    'host_acceptance_rate',
    'host_is_superhost',
    'host_total_listings_count',
    'host_has_profile_pic',
    'host_identity_verified',
    'neighbourhood',
    'latitude',
    'longitude',
    'property_type',
    'room_type',
    'accommodates',
    'bedrooms',
    'amenities',
    'price',
    'minimum_nights',
    'maximum_nights',
    'first_review',
    'last_review',
    'review_scores_rating',
    'review_scores_accuracy',
    'review_scores_cleanliness',
    'review_scores_checkin',
    'review_scores_communication',
    'review_scores_location',
    'review_scores_value',
    'instant_bookable',
    'location_key',
]

# Keep only the Silver columns; id and name are renamed to listing_id and
# listing_name, everything else keeps its Bronze name.
df_silver_listing = df_bronze_listings.select(
    df_bronze_listings['id'].alias('listing_id'),
    df_bronze_listings['name'].alias('listing_name'),
    *listing_passthrough_columns,
)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 35, Finished, Available)

#### Change Data Type

In [34]:
# Target Spark type for every column of the Silver listings table.
# NOTE(review): if the Bronze price column is a formatted string (e.g. with a
# currency symbol), casting it to DoubleType would yield nulls — verify against
# the Bronze schema.
listing_coltype_map = {
    'listing_id': LongType(),
    'host_id': LongType(),
    'listing_name': StringType(),
    'host_since': DateType(),
    'host_location': StringType(),
    'host_response_time': StringType(),
    'host_response_rate': StringType(),
    'host_acceptance_rate': StringType(),
    'host_is_superhost': StringType(),
    'host_total_listings_count': IntegerType(),
    'host_has_profile_pic': StringType(),
    'host_identity_verified': StringType(),
    'neighbourhood': StringType(),
    'latitude': DecimalType(8,6),
    'longitude': DecimalType(9,6),
    'property_type': StringType(),
    'room_type': StringType(),
    'accommodates': IntegerType(),
    'bedrooms': IntegerType(),
    'amenities': StringType(),
    'price': DoubleType(),
    'minimum_nights': IntegerType(),
    'maximum_nights': IntegerType(),
    'first_review': DateType(),
    'last_review': DateType(),
    'review_scores_rating': DoubleType(),
    'review_scores_accuracy': DoubleType(),
    'review_scores_cleanliness': DoubleType(),
    'review_scores_checkin': DoubleType(),
    'review_scores_communication': DoubleType(),
    'review_scores_location': DoubleType(),
    'review_scores_value': DoubleType(),
    'instant_bookable': StringType(),
    'location_key': StringType(),
}

# Apply all casts in a single projection. Calling withColumn once per column in a
# loop would stack 34 projections here, which the Spark documentation warns can
# generate very large query plans. Column order is unchanged.
df_silver_listing = df_silver_listing.withColumns({
    col_name: df_silver_listing[col_name].cast(col_type)
    for col_name, col_type in listing_coltype_map.items()
})

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 36, Finished, Available)

##### Check schema for data type change

In [35]:
# Print the schema to confirm the casts were applied.
df_silver_listing.printSchema()

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 37, Finished, Available)

root
 |-- listing_id: long (nullable = true)
 |-- listing_name: string (nullable = true)
 |-- host_id: long (nullable = true)
 |-- host_since: date (nullable = true)
 |-- host_location: string (nullable = true)
 |-- host_response_time: string (nullable = true)
 |-- host_response_rate: string (nullable = true)
 |-- host_acceptance_rate: string (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_total_listings_count: integer (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: decimal(8,6) (nullable = true)
 |-- longitude: decimal(9,6) (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- amenities: string (nullable = true)
 |-- price: double (nullable = true)
 |-- minimum_nights: integer (nullab

### Join with silver_location to get location id for FK to location

In [36]:
# Match each listing to its location row through the shared location_key.
location_key_matches = df_silver_listing.location_key == df_silver_location.location_key
df_joined = df_silver_listing.join(df_silver_location, location_key_matches)

# Final Silver listing columns: every listing attribute except location_key,
# followed by location_id as the reference to the location dimension.
silver_listing_columns = [
    'listing_id',
    'listing_name',
    'host_id',
    'host_since',
    'host_location',
    'host_response_time',
    'host_response_rate',
    'host_acceptance_rate',
    'host_is_superhost',
    'host_total_listings_count',
    'host_has_profile_pic',
    'host_identity_verified',
    'neighbourhood',
    'latitude',
    'longitude',
    'property_type',
    'room_type',
    'accommodates',
    'bedrooms',
    'amenities',
    'price',
    'minimum_nights',
    'maximum_nights',
    'first_review',
    'last_review',
    'review_scores_rating',
    'review_scores_accuracy',
    'review_scores_cleanliness',
    'review_scores_checkin',
    'review_scores_communication',
    'review_scores_location',
    'review_scores_value',
    'instant_bookable',
    'location_id',
]

df_silver_listing = df_joined.select(*silver_listing_columns)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 38, Finished, Available)

##### Load silver_listings to Delta Table

In [37]:
# Preview the final listings, which now carry location_id instead of location_key.
display(df_silver_listing.limit(5))

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 39, Finished, Available)

SynapseWidget(Synapse.DataFrame, a858bed9-3bbc-49fc-807c-469435720ff2)

In [38]:
# Persist the listings to the silver_listings Delta table.
# NOTE(review): mode("append") adds these rows on every run, so re-running the
# notebook against the same Bronze file would presumably duplicate listings —
# confirm whether an overwrite or merge is intended.
(
    df_silver_listing.write
    .format("delta")
    .mode("append")
    .save("Tables/silver_listings")
)

StatementMeta(, 4ce83310-6cbe-4818-9a0b-8e8fb2069b10, 40, Finished, Available)