**Load the row data (Bronze layer)**

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("S3 CSV Loader") \
    .getOrCreate()

aws_access_key_id = ''
aws_secret_access_key = '' 


spark.conf.set("fs.s3a.access.key", aws_access_key_id)
spark.conf.set("fs.s3a.secret.key", aws_secret_access_key)
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")  

df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header",  "true") \
    .option("sep", ",") \
    .load("s3a://layer1-bronze-bucket/")


df.show()


output_path = "/mnt/datalake"
df.write.mode("overwrite").parquet(output_path)

print(f"DataFrame saved to {output_path} for further preprocessing.")

+--------------------+--------------+------+-------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+
|                 App|      Category|Rating|Reviews|Size|   Installs|Type|Price|Content Rating|              Genres|      Last Updated|       Current Ver| Android Ver|
+--------------------+--------------+------+-------+----+-----------+----+-----+--------------+--------------------+------------------+------------------+------------+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|    159| 19M|    10,000+|Free|    0|      Everyone|        Art & Design|   January 7, 2018|             1.0.0|4.0.3 and up|
| Coloring book moana|ART_AND_DESIGN|   3.9|    967| 14M|   500,000+|Free|    0|      Everyone|Art & Design;Pret...|  January 15, 2018|             2.0.0|4.0.3 and up|
|U Launcher Lite –...|ART_AND_DESIGN|   4.7|  87510|8.7M| 5,000,000+|Free|    0|      Everyone|        Art & Design|    August 1, 2018|             1.2.4|4.0.3 

**Normalizing the Data , clean it and  Store Proceesed code to Silver Bucket**

In [0]:
df.count()

Out[2]: 10841

In [0]:
df.show(1)

+--------------------+--------------+------+-------+----+--------+----+-----+--------------+------------+---------------+-----------+------------+
|                 App|      Category|Rating|Reviews|Size|Installs|Type|Price|Content Rating|      Genres|   Last Updated|Current Ver| Android Ver|
+--------------------+--------------+------+-------+----+--------+----+-----+--------------+------------+---------------+-----------+------------+
|Photo Editor & Ca...|ART_AND_DESIGN|   4.1|    159| 19M| 10,000+|Free|    0|      Everyone|Art & Design|January 7, 2018|      1.0.0|4.0.3 and up|
+--------------------+--------------+------+-------+----+--------+----+-----+--------------+------------+---------------+-----------+------------+
only showing top 1 row



In [0]:
df.printSchema()

root
 |-- App: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Reviews: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Installs: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Content Rating: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Last Updated: string (nullable = true)
 |-- Current Ver: string (nullable = true)
 |-- Android Ver: string (nullable = true)



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

df = df.drop("Size", "Content Rating", "Last Updated", "Current Ver", "Android Ver")

df = df.withColumn("Rating", df["Rating"].cast(IntegerType())) \
       .withColumn("Reviews", df["Reviews"].cast(IntegerType())) \
       .withColumn("Installs", F.regexp_replace(df["Installs"], "[^0-9]", "").cast(IntegerType())) \
       .withColumn("Price", F.regexp_replace(df["Price"], "[$]", "").cast(IntegerType()))

transformed_df = df.na.drop().dropDuplicates()
transformed_df.write.format("delta").mode("overwrite").save("s3a://layer2-silver-bucket/")

**Loading the Silver Data**  
Aggregate the data and perform some business-level transformations.


In [0]:
silver_df = spark.read.format("delta").load("s3a://layer2-silver-bucket/")

silver_df.show()


+--------------------+-------------------+------+-------+---------+----+-----+-----------------+
|                 App|           Category|Rating|Reviews| Installs|Type|Price|           Genres|
+--------------------+-------------------+------+-------+---------+----+-----+-----------------+
|350 Diy Room Deco...|     ART_AND_DESIGN|     4|     27|    10000|Free|    0|     Art & Design|
|  Google My Business|           BUSINESS|     4|  70991|  5000000|Free|    0|         Business|
|Mail1Click - Secu...|      COMMUNICATION|     4|    255|    10000|Free|    0|    Communication|
|SilverSingles: Th...|             DATING|     3|    149|    10000|Free|    0|           Dating|
|A&E - Watch Full ...|      ENTERTAINMENT|     4|  29706|  1000000|Free|    0|    Entertainment|
|  All Events in City|             EVENTS|     4|   3782|   100000|Free|    0|           Events|
|       HungerStation|     FOOD_AND_DRINK|     3|  22513|  1000000|Free|    0|     Food & Drink|
|Home Decor Showpi...|     HOU

In [0]:
from pyspark.sql.functions import col, when, avg, sum, expr

#Calculate Average Rating by Category
avg_rating_by_category = silver_df.groupBy("Category").agg(avg("Rating").alias("avg_rating"))
avg_rating_by_category.show()

+-------------------+------------------+
|           Category|        avg_rating|
+-------------------+------------------+
|             EVENTS| 4.044444444444444|
|             SPORTS|3.8076923076923075|
|             COMICS| 3.706896551724138|
|            WEATHER|3.7866666666666666|
|      VIDEO_PLAYERS|           3.60625|
|  AUTO_AND_VEHICLES|3.7260273972602738|
|          PARENTING|              3.82|
|      ENTERTAINMENT|3.7387387387387387|
|    PERSONALIZATION| 3.925806451612903|
|   TRAVEL_AND_LOCAL| 3.707317073170732|
| HEALTH_AND_FITNESS| 3.793103448275862|
|BOOKS_AND_REFERENCE| 3.864406779661017|
|     FOOD_AND_DRINK|3.7264150943396226|
|        PHOTOGRAPHY|3.8092105263157894|
|           BUSINESS|3.7111111111111112|
|             FAMILY| 3.772409778812573|
|           SHOPPING|3.8606965174129355|
|     HOUSE_AND_HOME| 3.735294117647059|
|               GAME| 3.863128491620112|
|          EDUCATION| 3.953488372093023|
+-------------------+------------------+
only showing top

In [0]:
#Summarize Total Installs and Average Reviews per Category
installs_reviews_summary = silver_df.groupBy("Category").agg(
    sum("Installs").alias("total_installs"),
    avg("Reviews").alias("avg_reviews")
)
installs_reviews_summary.show()

+-------------------+--------------+------------------+
|           Category|total_installs|       avg_reviews|
+-------------------+--------------+------------------+
|             EVENTS|      15949410|3568.6666666666665|
|             SPORTS|    1528531465|228398.97552447554|
|             COMICS|      56036100| 58309.39655172414|
|            WEATHER|     426096500|194729.26666666666|
|      VIDEO_PLAYERS|    6221897200|        689873.075|
|  AUTO_AND_VEHICLES|      53129800| 15940.13698630137|
|          PARENTING|      31116110|          19072.18|
|      ENTERTAINMENT|    2455660000|  428565.009009009|
|    PERSONALIZATION|    2074352930|242557.54193548387|
|   TRAVEL_AND_LOCAL|    6361859300|  271048.843902439|
| HEALTH_AND_FITNESS|    1360006220|118170.71264367815|
|BOOKS_AND_REFERENCE|    1916291655|123575.24293785311|
|     FOOD_AND_DRINK|     257777750| 72370.33018867925|
|        PHOTOGRAPHY|    9721243130|       672030.8125|
|           BUSINESS|     863518120| 45769.27777

In [0]:
#Categorize Apps by Install Range
silver_df = silver_df.withColumn("Install_Range", when(col("Installs") < 50000, "Low")
                                      .when((col("Installs") >= 50000) & (col("Installs") < 500000), "Medium")
                                      .otherwise("High"))
silver_df.select("App", "Installs", "Install_Range").show()

+--------------------+---------+-------------+
|                 App| Installs|Install_Range|
+--------------------+---------+-------------+
|350 Diy Room Deco...|    10000|          Low|
|  Google My Business|  5000000|         High|
|Mail1Click - Secu...|    10000|          Low|
|SilverSingles: Th...|    10000|          Low|
|A&E - Watch Full ...|  1000000|         High|
|  All Events in City|   100000|       Medium|
|       HungerStation|  1000000|         High|
|Home Decor Showpi...|    50000|       Medium|
|           IHSS Help|     1000|          Low|
|Tapatalk - 100,00...| 10000000|         High|
|PagesJaunes - loc...| 10000000|         High|
|3D Live Neon Weed...|   100000|       Medium|
|CM Launcher 3D - ...|100000000|         High|
|          HD Widgets|  1000000|         High|
|Real City Car Driver| 10000000|         High|
|               Bible|100000000|         High|
|Maps & GPS Naviga...|   100000|       Medium|
|       How Do I Look|   500000|         High|
| POF Free Da

In [0]:
# Create a Free/Paid indicator column
silver_df = silver_df.withColumn("Price_Category", when(col("Price") == 0, "Free").otherwise("Paid"))
silver_df.select("App", "Price", "Price_Category").show()

+--------------------+-----+--------------+
|                 App|Price|Price_Category|
+--------------------+-----+--------------+
|350 Diy Room Deco...|    0|          Free|
|  Google My Business|    0|          Free|
|Mail1Click - Secu...|    0|          Free|
|SilverSingles: Th...|    0|          Free|
|A&E - Watch Full ...|    0|          Free|
|  All Events in City|    0|          Free|
|       HungerStation|    0|          Free|
|Home Decor Showpi...|    0|          Free|
|           IHSS Help|    0|          Free|
|Tapatalk - 100,00...|    0|          Free|
|PagesJaunes - loc...|    0|          Free|
|3D Live Neon Weed...|    0|          Free|
|CM Launcher 3D - ...|    0|          Free|
|          HD Widgets|    0|          Free|
|Real City Car Driver|    0|          Free|
|               Bible|    0|          Free|
|Maps & GPS Naviga...|    8|          Paid|
|       How Do I Look|    0|          Free|
| POF Free Dating App|    0|          Free|
|        Instachat 😜|    0|     

perform some analysis with SQL

In [0]:
silver_df.createOrReplaceTempView("Google_store_table")


In [0]:
silver_df.columns

Out[14]: ['App',
 'Category',
 'Rating',
 'Reviews',
 'Installs',
 'Type',
 'Price',
 'Genres',
 'Install_Range',
 'Price_Category']

In [0]:
#Top 10 reviews given to the app
top_10_reviews = spark.sql("""
SELECT 
    App,
    SUM(Reviews) AS total_reviews 
FROM 
    Google_store_table 
GROUP BY 
    App
ORDER BY 
    total_reviews DESC
LIMIT 10
""")

top_10_reviews.show()


+--------------------+-------------+
|                 App|total_reviews|
+--------------------+-------------+
|           Instagram|    199664676|
|            Facebook|    156286514|
|      Subway Surfers|    138606606|
|  WhatsApp Messenger|    138228988|
|      Clash of Clans|    134667058|
|Messenger – Text ...|    113289425|
|    Candy Crush Saga|    112134492|
|         8 Ball Pool|     99386198|
|        Clash Royale|     92530298|
|            Snapchat|     68045010|
+--------------------+-------------+



In [0]:
#Top 10 installed apps and distribution of type (free/paid)
top_10_installs = spark.sql("""
SELECT 
    App,
    Install_Range,
    Price_Category,
    SUM(Installs) AS total_installs
FROM 
    Google_store_table
WHERE 
    Install_Range = 'High'
GROUP BY 
    App, Install_Range, Price_Category
ORDER BY 
    total_installs DESC
LIMIT 10
""")


top_10_installs.show()

+--------------------+-------------+--------------+--------------+
|                 App|Install_Range|Price_Category|total_installs|
+--------------------+-------------+--------------+--------------+
|      Subway Surfers|         High|          Free|    5000000000|
|       Google Photos|         High|          Free|    4000000000|
|            Hangouts|         High|          Free|    4000000000|
|           Instagram|         High|          Free|    3000000000|
|Google Chrome: Fa...|         High|          Free|    3000000000|
|         Google News|         High|          Free|    3000000000|
|        Google Drive|         High|          Free|    3000000000|
|Maps - Navigate &...|         High|          Free|    3000000000|
|    Candy Crush Saga|         High|          Free|    2500000000|
|        Temple Run 2|         High|          Free|    2500000000|
+--------------------+-------------+--------------+--------------+



In [0]:
#Top 10 categories most installed 
top_10_categories = spark.sql("""
SELECT 
    Category,
    SUM(Installs) AS total_Installs
FROM 
    Google_store_table 
GROUP BY 
    Category
ORDER BY 
    total_Installs DESC
LIMIT 10
""")

top_10_categories.show()

+------------------+--------------+
|          Category|total_Installs|
+------------------+--------------+
|              GAME|   31543862717|
|     COMMUNICATION|   24152241530|
|            SOCIAL|   12513841475|
|      PRODUCTIVITY|   12463070180|
|             TOOLS|   11440724500|
|            FAMILY|   10041130590|
|       PHOTOGRAPHY|    9721243130|
|  TRAVEL_AND_LOCAL|    6361859300|
|     VIDEO_PLAYERS|    6221897200|
|NEWS_AND_MAGAZINES|    5393110650|
+------------------+--------------+



In [0]:
#Top paid apps 
top_Paid_apps = spark.sql("""
SELECT 
    App,
    SUM(Price)
FROM 
    Google_store_table
WHERE 
    Price_Category = 'Paid'
GROUP BY 
    App
ORDER BY 
    SUM(Price) DESC
LIMIT 10
""")
top_Paid_apps.show()

+--------------------+----------+
|                 App|sum(Price)|
+--------------------+----------+
|I'm Rich - Trump ...|       400|
|  I am rich(premium)|       399|
|   I Am Rich Premium|       399|
|           I am Rich|       399|
|  I AM RICH PRO PLUS|       399|
|          I am Rich!|       399|
|      I am Rich Plus|       399|
|         💎 I'm rich|       399|
|I am rich (Most e...|       399|
|           I am rich|       399|
+--------------------+----------+



In [0]:

silver_df.write.format("delta").mode("overwrite").save("s3a://layer3-gold-bucket/")
