### (optional) drop tables
Useful when testing, to see how the pipeline behaves in different scenarios (e.g. a first run that creates the tables vs. a later run that appends to them).

In [149]:
# spark.sql('DROP TABLE lakehouse_sandbox.dim_books')
# spark.sql('DROP TABLE lakehouse_sandbox.fact_reviews')
# spark.sql('DROP TABLE lakehouse_sandbox.dim_users')

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 151, Finished, Available, Finished)

### Read book_ratings

In [2]:
from pyspark.sql import functions as F

# Folder with the raw review CSVs (header row, every column read as a string;
# types are applied later, after the column names are cleaned up).
# Other test datasets referenced by this notebook — presumably sibling folders
# to swap in for "source" (TODO confirm):
#   books_rating1000
#   books_rating_newuser_newbook
#   books_rating_newuser_oldbook
#   books_rating_olduser_newbook
source_path = "Files/book_ratings/source/"

df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(source_path)
)

display(df)

StatementMeta(, d4e88c28-dd2b-4f78-985a-4303b737eb8f, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 34221dde-7b9c-4d57-b4ef-61380e3c47d4)

### Convert column names from CamelCase to snake_case

In [151]:
import re

# Split points for snake_case conversion (the first two alternatives are zero-width):
#   lower->Upper boundary              "profileName"  -> "profile_Name"
#   last capital of an acronym run     "XMLFile"      -> "XML_File"
#   a literal "/" separator            "review/score" -> "review_score"
pattern = re.compile(r"(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|/")


def to_snake_case(column_name):
    """Return `column_name` with '_' at each split point, lowercased."""
    return pattern.sub('_', column_name).lower()


# df.columns is materialised once, before any rename happens.
for original_name in list(df.columns):
    df = df.withColumnRenamed(original_name, to_snake_case(original_name))

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 153, Finished, Available, Finished)

### Rename some columns

In [152]:
# Give the generic snake_cased source columns domain-specific names
# (applied in insertion order).
renames = {
    'id': 'book_id',
    'profile_name': 'username',
}
for old_name, new_name in renames.items():
    df = df.withColumnRenamed(old_name, new_name)

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 154, Finished, Available, Finished)

### Add `review_id` column

In [153]:
# Prepend a surrogate key. uuid() produces a random UUID string per row, so the
# value differs on every run of the notebook and cannot be used to recognise
# reviews that were loaded by an earlier run.
review_id_col = F.expr("uuid()").alias('review_id')
df = df.select(review_id_col, "*")

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 155, Finished, Available, Finished)

### Add new "price_category" column
Assign a category (`cheap`, `average`, etc.) to each row based on the `price` column.

In [156]:
# Bucket each row's book price into a coarse, human-readable category.
# `price` is cast to double explicitly so the comparisons are numeric whether
# or not the "Apply schema" cell has already run. Comparing the raw string
# column with int literals would make Spark cast it to int, which truncates
# ("0.5" -> 0 -> treated like a zero price).
price_col = F.col("price").cast("double")

df = df.withColumn(
    "price_category",
    F.when(price_col == 0, "")            # a price of 0 gets no category, like a missing price
     .when(price_col < 10, "cheap")
     .when(price_col < 25, "average")
     .when(price_col < 100, "expensive")
     .when(price_col >= 100, "crazy expensive")
     .otherwise(""),                      # null or unparseable price: never echo the raw value
)

display(df)

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 158, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5a4b15f2-318d-4ea5-96c1-d78a49ca43f2)

### Apply schema
The schema is not applied when loading the CSV because I want to define it using the cleaned column names (`user_id` instead of `UserId`, etc.).

In [154]:
from pyspark.sql import types

# Target type for each cleaned (snake_case) column: (name, type, nullable).
# NOTE(review): the nullable flags are declarative only. Casting column by
# column with withColumn does not enforce them, so nulls can still appear in
# review_id / book_id / user_id.
# NOTE(review): FloatType is 32-bit, so prices such as 12.99 are stored
# approximately; review_time is presumably epoch seconds — TODO confirm.
schema_spec = [
    ("review_id", types.StringType(), False),
    ("book_id", types.StringType(), False),
    ("title", types.StringType(), True),
    ("price", types.FloatType(), True),
    ("user_id", types.StringType(), False),
    ("username", types.StringType(), True),
    ("review_helpfulness", types.StringType(), True),
    ("review_score", types.FloatType(), True),
    ("review_time", types.IntegerType(), True),
    ("review_summary", types.StringType(), True),
    ("review_text", types.StringType(), True),
    ("price_category", types.StringType(), True),
]
schema = types.StructType(
    [types.StructField(name, dtype, nullable) for name, dtype, nullable in schema_spec]
)

# Cast every schema column in place. Column order and any columns that are
# not in the schema are left untouched.
for column_name, column_type in ((f.name, f.dataType) for f in schema):
    df = df.withColumn(column_name, df[column_name].cast(column_type))

display(df)

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 156, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3e431a2e-4a70-4953-b8cf-03777de128cc)

### Drop duplicates
Keep one review per book per person

In [157]:
from pyspark.sql import Window

# Keep one review per (book, user). dropDuplicates() keeps an arbitrary row,
# which can differ between runs. Instead, keep the most recent review (latest
# review_time) and break ties on the review content so the choice is
# reproducible. review_id is not used as a tie-breaker because it is random.
latest_review_first = Window.partitionBy("book_id", "user_id").orderBy(
    F.col("review_time").desc_nulls_last(),
    F.col("review_score").desc_nulls_last(),
    F.col("review_summary").asc_nulls_last(),
    F.col("review_text").asc_nulls_last(),
)

df = (
    df.withColumn("_review_rank", F.row_number().over(latest_review_first))
      .filter(F.col("_review_rank") == 1)
      .drop("_review_rank")
)

df.count()

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 159, Finished, Available, Finished)

1

### Create or append to table
If a table doesn't exist, create it.
If it already exists, append only the new records.

In [158]:
DATABASE = 'lakehouse_sandbox'

# Columns written to each target table (also used by the row-count cell below).
tables_columns = {
        'dim_users': ['user_id', 'username'],
        'dim_books': ['book_id', 'title', 'price'],
        'fact_reviews': ['review_id', 'book_id', 'user_id', 'review_helpfulness', 'review_score', 'review_time', 'review_summary', 'review_text']
        }

# Natural key used to recognise rows that are already in each table.
# fact_reviews must NOT be keyed on review_id: it is a fresh uuid() on every
# run, so an anti-join on it never matches and re-running the notebook would
# append the same reviews again. One review per book per person instead.
tables_keys = {
    'dim_users': ['user_id'],
    'dim_books': ['book_id'],
    'fact_reviews': ['book_id', 'user_id'],
}


def create_or_append(table, columns, keys):
    """Create DATABASE.`table` from `df`, or append only rows whose `keys` are new.

    Existing rows are never updated: a changed username/title/price for a key
    that is already present is ignored.
    """
    full_name = f'{DATABASE}.{table}'
    new_records_df = df.select(columns).dropDuplicates(keys)

    # Check existence explicitly instead of treating *any* exception as
    # "table missing" — that path overwrites, so an unrelated failure during
    # the append must never reach it.
    if not spark.catalog.tableExists(table, DATABASE):
        print(f'Creating table {table}...')
        new_records_df.write.mode("overwrite").format("delta").saveAsTable(full_name)
        print(f'Table {table} created. It has {new_records_df.count()} rows.\n')
        return

    existing_df = spark.table(full_name)
    print(f'Table {table} exists (with {existing_df.count()} rows). Appending data instead...')

    filtered_new_records_df = new_records_df.join(
        existing_df.select(keys), on=keys, how="left_anti"
    )
    # Fully qualified name: the original appended to the default lakehouse.
    filtered_new_records_df.write.mode("append").format("delta").saveAsTable(full_name)

    print(f'New records appended to table {table}.\n')


for table, columns in tables_columns.items():
    # Tables without an explicit key fall back to their first column.
    create_or_append(table, columns, tables_keys.get(table, columns[:1]))

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 160, Finished, Available, Finished)

Table dim_users exists (with 767 rows). Appending data instead...
New records appended to table dim_users.

Table dim_books exists (with 115 rows). Appending data instead...
New records appended to table dim_books.

Table fact_reviews exists (with 811 rows). Appending data instead...
New records appended to table fact_reviews.



In [159]:
# Report the row count of every target table after the load.
print('AFTER APPEND:\n')
for table_name in tables_columns:
    row_count = spark.table(f'lakehouse_sandbox.{table_name}').count()
    print(f'Table {table_name} exists (with {row_count} rows).')

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 161, Finished, Available, Finished)

AFTER APPEND:

Table dim_users exists (with 767 rows).
Table dim_books exists (with 115 rows).
Table fact_reviews exists (with 812 rows).


In [160]:
# query = spark.sql("select * from lakehouse_sandbox.dim_users where user_id = 'A9BXR6TI1KLBD'")
# display(query)

StatementMeta(, b104af5d-862b-47c3-96f9-592750aa28f1, 162, Finished, Available, Finished)