# Implementing Data Transformation Pipelines

We’ll implement data pipelines using PySpark to:
- Filter and transform the data.
- Enrich it for BI requirements (e.g., calculate aggregates, derive new features).
- Optimize the pipeline for scalability and performance.

In [0]:
# Import necessary libraries
from pyspark.sql.functions import avg, count, desc, col

# Root folder of the star-schema BI tables (S3 data exposed through the DBFS
# mount). Every table is stored as Parquet in its own sub-folder of this root.
bi_path = "dbfs:/mnt/de-yelpdataset-raw/yelpdataset/star-schema-bidata/"

# Dimension tables of the star schema.
dim_business = spark.read.parquet(f"{bi_path}dim_business")
dim_user = spark.read.parquet(f"{bi_path}dim_user")
dim_time = spark.read.parquet(f"{bi_path}dim_time")

# Fact table; the pipelines in this notebook read its review_id, rating,
# business_id, user_id and date columns.
fact_reviews = spark.read.parquet(f"{bi_path}fact_reviews")

# Business Insights Pipeline
Identifying the top businesses by review count and average rating.

In [0]:
# Business insights pipeline:
#   1. inner-join every review in fact_reviews to its business in dim_business,
#   2. per business, count the reviews and average their ratings,
#   3. keep businesses with more than 50 reviews AND an average rating >= 4.0,
#   4. rank by average rating, then by review count (both descending).
# The thresholds are applied after aggregation, so they act on per-business
# totals rather than on individual reviews.
business_insights = (
    fact_reviews
    .join(dim_business, "business_id")
    .groupBy("business_id", "name", "categories", "city", "state")
    .agg(
        count("review_id").alias("total_reviews"),
        avg("rating").alias("average_rating"),
    )
    .where((col("total_reviews") > 50) & (col("average_rating") >= 4.0))
    .orderBy(col("average_rating").desc(), col("total_reviews").desc())
)

# Displaying the top businesses by rating and reviews

In [0]:
# Display the top businesses by average rating and review count.
# `.display()` renders the DataFrame as a notebook table. As a Spark action, it
# executes the whole business_insights plan (join, aggregation, filter, sort).
business_insights.display()

business_id,name,categories,city,state,total_reviews,average_rating
1RqfozJoosHAsKZhc5PY7w,Walls Jewelry Repairing,"Watch Repair, Local Services, Shopping, Jewelry",Nashville,TN,114,5.0
-siOxQQcGKtb-04dX0Cxnw,ella & louie flowers,"Flowers & Gifts, Florists, Shopping, Event Planning & Services, Floral Designers",Santa Barbara,CA,104,5.0
4-P4Bzqd01YvKX9tp7IGfQ,Drink & Learn,"Beer, Wine & Spirits, Hotels & Travel, Food, Tours",New Orleans,LA,90,5.0
dhLARBhUnJloLa8xZ1Stpw,Steves iPhone Repair,"Professional Services, Local Services, Electronics Repair, Mobile Phone Repair, IT Services & Computer Repair",Cherry Hill,NJ,78,5.0
LTqm4uY4GIYHfzuh5pVZNQ,Jeramie Lu Photography,"Event Photography, Event Planning & Services, Photographers, Session Photography",Reno,NV,77,5.0
x-SCuOwghy4GlZdVOKjt4g,Twisted Twig Fine Florals,"Florists, Event Planning & Services, Shopping, Flowers & Gifts, Party & Event Planning",Santa Barbara,CA,74,5.0
2FQoAp9w0G_NhuZMqo9bfA,ByCherry Photography,"Session Photography, Wedding Planning, Event Planning & Services, Event Photography, Photographers, Photo Booth Rentals",Santa Barbara,CA,73,5.0
DnkpXhc5DKdeBT-0jG13JQ,Carpinteria Lock & Key,"Security Systems, Keys & Locksmiths, Home Services",Carpinteria,CA,70,5.0
NLx84Im37HcZkcFBO2cxlw,"Jack & Melody Cote - Chase International, Reno Real Estate","Real Estate Agents, Home Services, Real Estate, Real Estate Services",Reno,NV,69,5.0
v8I7yA-t1onNs2Ce81J2MQ,A Beloved Friends Pet Crematory Of Northern Nevada,"Local Services, Pet Services, Funeral Services & Cemeteries, Pets, Pet Cremation Services",Reno,NV,68,5.0


# User Insights Pipeline
Identifying the top users by total review count.

In [0]:
# User insights pipeline:
#   - inner-join each review in fact_reviews to its author in dim_user,
#   - per user, count the reviews written and average the ratings given,
#   - list users from most to least active.
# No minimum-activity threshold is applied, so every user with at least one
# joined review row appears in the result.
user_insights = (
    fact_reviews
    .join(dim_user, on="user_id")
    .groupBy("user_id", "name")
    .agg(
        count("review_id").alias("total_reviews"),
        avg("rating").alias("average_rating_given"),
    )
    .orderBy(col("total_reviews").desc())
)

# Displaying the top users by total reviews and average rating given

In [0]:
# Render per-user review counts and average rating given, most active first.
user_insights.display()

user_id,name,total_reviews,average_rating_given
_BcWyKQL16ndpBdggh2kNA,Karen,3048,3.637795275590552
Xw7ZjaGfr0WNVt6s_5KZfA,Marielle,1840,4.072826086956522
0Igx-a1wAstiBDerGxXk2A,Jen,1747,3.990269032627361
-G7Zkl1wIWBBmD0KRy_sCw,Gerald,1682,3.652794292508918
ET8n-r7glWYqZhuR6GcdNw,Michelle,1653,4.046581972171809
bYENop4BuQepBjM1-BI3fA,Steven,1578,3.85361216730038
1HM81n6n4iPIFU5d2Lokhw,Shannon,1554,3.045045045045045
fr1Hz2acAb3OaL3l6DyKNg,Boon,1447,3.946786454733932
wXdbkFZsfDR7utJvbWElyA,Ken,1396,4.210601719197708
Um5bfs5DH6eizgjH3xZsvg,Dianna,1391,3.804457225017973


# Time-Based Aggregates
Aggregating reviews by year and month.

In [0]:
# Time-based aggregates:
#   - match each review's `date` to the dim_time row whose `full_date` equals
#     it (inner join, so reviews with no matching dim_time row are dropped),
#   - per (year, month), count the reviews and average their ratings,
#   - order the periods chronologically.
# The join uses an explicit condition because the key columns have different
# names on the two sides; both `date` and `full_date` remain in the joined rows.
# NOTE(review): assumes `date` and `full_date` share a type. If one is a
# timestamp and the other a date, Spark casts the date to a timestamp and only
# midnight values would match. Confirm against the schema.
time_aggregates = (
    fact_reviews
    .join(dim_time, on=fact_reviews["date"] == dim_time["full_date"])
    .groupBy("year", "month")
    .agg(
        count("review_id").alias("total_reviews"),
        avg("rating").alias("average_rating"),
    )
    .orderBy(col("year"), col("month"))
)

# Displaying total reviews and average ratings by year and month

In [0]:
# Render monthly review counts and average ratings in chronological order.
time_aggregates.display()

year,month,total_reviews,average_rating
2005,2,3,4.333333333333333
2005,3,74,3.6486486486486487
2005,4,26,4.153846153846154
2005,5,108,4.083333333333333
2005,6,38,4.026315789473684
2005,7,259,3.671814671814672
2005,8,66,4.166666666666667
2005,9,69,3.8840579710144927
2005,10,39,4.205128205128205
2005,11,70,3.842857142857143


# Writing transformed data back to AWS S3 storage in the transformed-data folder

In [0]:
# Destination root for the transformed BI outputs (same DBFS mount as the
# inputs). Each DataFrame is written to its own sub-folder of this root.
output_path = "dbfs:/mnt/de-yelpdataset-raw/yelpdataset/transformed-data/"

# (sub-folder name, DataFrame, verification banner) for every output, listed
# in the order they are written and then checked.
_transformed_outputs = [
    ("business_insights", business_insights, "\nSample Data from Business Insights:"),
    ("user_insights", user_insights, "\nSample Data from User Insights:"),
    ("time_aggregates", time_aggregates, "\nSample Data from Time-Based Aggregates:"),
]

# Persist every output as Parquet, replacing whatever a previous run wrote.
for _folder, _frame, _banner in _transformed_outputs:
    _frame.write.mode("overwrite").parquet(output_path + _folder)

# Now to verify the transformed data: read each folder back from storage and
# show a few rows. Spark does not guarantee row order when reading files back,
# so these are sample rows, not necessarily the top-ranked ones.
for _folder, _frame, _banner in _transformed_outputs:
    print(_banner)
    spark.read.parquet(output_path + _folder).show(5)


Sample Data from Business Insights:
+--------------------+--------------------+--------------------+-------------+-----+-------------+--------------+
|         business_id|                name|          categories|         city|state|total_reviews|average_rating|
+--------------------+--------------------+--------------------+-------------+-----+-------------+--------------+
|1RqfozJoosHAsKZhc...|Walls Jewelry Rep...|Watch Repair, Loc...|    Nashville|   TN|          114|           5.0|
|-siOxQQcGKtb-04dX...|ella & louie flowers|Flowers & Gifts, ...|Santa Barbara|   CA|          104|           5.0|
|4-P4Bzqd01YvKX9tp...|       Drink & Learn|Beer, Wine & Spir...|  New Orleans|   LA|           90|           5.0|
|dhLARBhUnJloLa8xZ...|Steves iPhone Repair|Professional Serv...|  Cherry Hill|   NJ|           78|           5.0|
|LTqm4uY4GIYHfzuh5...|Jeramie Lu Photog...|Event Photography...|         Reno|   NV|           77|           5.0|
+--------------------+--------------------+--------