In [None]:
%md
# Extract

In [None]:
# Read the source sales table into a DataFrame and preview it.
SOURCE_TABLE = "default.apple_sales_2024_dataset"
df = spark.read.table(SOURCE_TABLE)
display(df)

In [None]:
# Print the column names and data types of the freshly loaded DataFrame.
df.printSchema()

In [None]:
# Print Spark's summary statistics for the loaded DataFrame as plain text.
df.describe().show()

In [None]:
%md
# Transform

In [None]:
from pyspark.sql.functions import col, sum

In [None]:
# Count NULLs per column: every isNull() flag is cast to 0/1 and summed,
# producing a single-row DataFrame with one count per original column.
# `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df.columns
]
df_nulls = df.select(null_count_exprs)

In [None]:
# Show the original column names before any renaming.
print("Current column names:", df.columns, sep="\n")

In [None]:
# Render the per-column NULL counts.
display(df_nulls)

In [None]:
# Despite its name, this frame holds `df` with exact duplicate rows REMOVED
# (distinct() keeps one copy of each row).
# NOTE(review): the aggregation cell further down groups `df`, not this
# de-duplicated frame - confirm whether duplicates should be summed.
df_duplicates = df.distinct()

In [None]:
# Compare row counts with and without exact duplicate rows.
rows_before = df.count()
rows_after = df_duplicates.count()
print(f"Number of rows before removing duplicates: {rows_before}")
print(f"Number of rows after removing duplicates: {rows_after}")

In [None]:
# Render the de-duplicated rows.
display(df_duplicates)

In [None]:
# Map each original column name to its snake_case replacement, then apply
# the renames one at a time on top of `df`.
COLUMN_RENAMES = {
    "State": "state",
    "Region": "region",
    "iPhone Sales (in million units)": "iphone_sales_million_units",
    "iPad Sales (in million units)": "ipad_sales_million_units",
    "Mac Sales (in million units)": "mac_sales_million_units",
    "Wearables (in million units)": "wearables_sales_million_units",
    "Services Revenue (in billion $)": "services_revenue_billion",
    "Total_Sales (in Million Units)": "total_sales_million_units",
}

df_renamed = df
for old_name, new_name in COLUMN_RENAMES.items():
    df_renamed = df_renamed.withColumnRenamed(old_name, new_name)

In [None]:
# Render the DataFrame with the snake_case column names.
display(df_renamed)

In [None]:
# Show the column names after renaming.
print("Updated column names:", df_renamed.columns, sep="\n")

In [None]:
# Verify the schema of the renamed frame. The previous version printed the
# schema of the un-renamed `df`, repeating the Extract-section output and
# showing none of the renames applied just above.
df_renamed.printSchema()

In [None]:
total_rows_after_aggregation = df_aggregated.count()
print(f"Total number of rows after aggregation: {total_rows_after_aggregation}")

In [None]:
from pyspark.sql.functions import sum

In [None]:
df_aggregated = df.groupBy("State", "Region").agg(
    sum("iPhone Sales (in million units)").alias("Total_iPhone_Sales (in million units)"),
    sum("iPad Sales (in million units)").alias("Total_iPad_Sales (in million units)"),
    sum("Mac Sales (in million units)").alias("Total_Mac_Sales (in million units)"),
    sum("Wearables (in million units)").alias("Total_Wearables_Sales (in million units)"),
    sum("Services Revenue (in billion $)").alias("Total_Services_Revenue (in billion $)"),
    sum("Total_Sales (in Million Units)").alias("Total_Sales (in million units)")
)

In [None]:
# Render the per-(State, Region) totals.
display(df_aggregated)

In [None]:
# Reorder the aggregated columns so the unit totals come first and the
# services revenue column comes last.
aggregated_column_order = [
    "State",
    "Region",
    "Total_iPhone_Sales (in million units)",
    "Total_iPad_Sales (in million units)",
    "Total_Mac_Sales (in million units)",
    "Total_Wearables_Sales (in million units)",
    "Total_Sales (in million units)",
    "Total_Services_Revenue (in billion $)",
]
df_aggregated = df_aggregated.select(*aggregated_column_order)

In [None]:
# Render the aggregated frame after the column reorder.
display(df_aggregated)

In [None]:
# Cell output: the list of column names of the aggregated frame.
df_aggregated.columns

In [None]:
# Plain alias: new_df refers to the same DataFrame object as df_aggregated.
new_df = df_aggregated

In [None]:
# Render the aggregated frame under its new name.
display(new_df)

In [None]:
# NOTE(review): this repeats the display(new_df) of the preceding cell.
display(new_df)

In [None]:
from pyspark.sql.functions import round

In [None]:
# Round every aggregated measure to two decimals, overwriting each column
# in place so the column order is unchanged.
# `round` is pyspark.sql.functions.round (imported above), not the builtin.
ROUNDED_COLUMNS = [
    "Total_iPhone_Sales (in million units)",
    "Total_iPad_Sales (in million units)",
    "Total_Mac_Sales (in million units)",
    "Total_Wearables_Sales (in million units)",
    "Total_Sales (in million units)",
    "Total_Services_Revenue (in billion $)",
]

rounded_new_df = new_df
for measure_name in ROUNDED_COLUMNS:
    rounded_new_df = rounded_new_df.withColumn(measure_name, round(new_df[measure_name], 2))

In [None]:
# Render the rounded totals.
display(rounded_new_df)

In [None]:
# Plain alias: final_df refers to the same DataFrame object as rounded_new_df.
final_df = rounded_new_df
# Print the frame as plain text (show() with its default arguments).
final_df.show()

In [None]:
# Render the frame that the Load step writes out.
display(final_df)

In [None]:
%md
# Load

In [None]:
# Persist the rounded aggregates as a Delta table, replacing both the data
# and the schema of any existing table with this name.
# NOTE(review): the Extract step reads "default.apple_sales_2024_dataset".
# If the current schema is `default`, this overwrites that same source table
# with the aggregated columns, and a second run of the notebook would no
# longer find the original column names - confirm the intended target.
(
    final_df.write
    .format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable("apple_sales_2024_dataset")
)

In [None]:
# Read the written table back through SQL and render it.
spark.sql("SELECT * FROM apple_sales_2024_dataset").display()

In [None]:
%md
# Queries

In [None]:
## Total Sales by Region
# The summed column holds unit sales (in million units), so the result is
# labelled as units rather than "revenue". It is rounded to two decimals to
# match the product-level queries and hide floating-point noise.
spark.sql("""
    SELECT Region, ROUND(SUM(`Total_Sales (in million units)`), 2) AS total_sales_million_units
    FROM apple_sales_2024_dataset
    GROUP BY Region
    ORDER BY total_sales_million_units DESC
""").display()

In [None]:
## Top 5 States by Sales
# The summed column holds unit sales (in million units), so the result is
# labelled as units rather than "revenue". It is rounded to two decimals to
# match the product-level queries and hide floating-point noise.
spark.sql("""
    SELECT State, ROUND(SUM(`Total_Sales (in million units)`), 2) AS total_sales_million_units
    FROM apple_sales_2024_dataset
    GROUP BY State
    ORDER BY total_sales_million_units DESC
    LIMIT 5
""").display()

In [None]:
## Most Sold Apple Product
# One row per product line with its total units across the whole table,
# highest first. The first SELECT names the output columns for the union.
most_sold_product_query = """
    SELECT 'iPhone' AS Product, ROUND(SUM(`Total_iPhone_Sales (in million units)`), 2) AS Total_Units_Sold FROM apple_sales_2024_dataset
    UNION ALL
    SELECT 'iPad', ROUND(SUM(`Total_iPad_Sales (in million units)`), 2) FROM apple_sales_2024_dataset
    UNION ALL
    SELECT 'Mac', ROUND(SUM(`Total_Mac_Sales (in million units)`), 2) FROM apple_sales_2024_dataset
    UNION ALL
    SELECT 'Wearables', ROUND(SUM(`Total_Wearables_Sales (in million units)`), 2) FROM apple_sales_2024_dataset
    ORDER BY Total_Units_Sold DESC
"""
most_sold_product_result = spark.sql(most_sold_product_query)
most_sold_product_result.display()

In [None]:
## Percentage Contribution of Each Product to Total Sales
# ProductTotals sums each product line over the whole table. TotalSales adds
# the combined total once, so the four branches below share one denominator
# instead of each repeating the four-term sum.
spark.sql("""
    WITH ProductTotals AS (
        SELECT
            SUM(`Total_iPhone_Sales (in million units)`) AS iPhone_Sales,
            SUM(`Total_iPad_Sales (in million units)`) AS iPad_Sales,
            SUM(`Total_Mac_Sales (in million units)`) AS Mac_Sales,
            SUM(`Total_Wearables_Sales (in million units)`) AS Wearables_Sales
        FROM apple_sales_2024_dataset
    ),
    TotalSales AS (
        SELECT
            iPhone_Sales,
            iPad_Sales,
            Mac_Sales,
            Wearables_Sales,
            iPhone_Sales + iPad_Sales + Mac_Sales + Wearables_Sales AS All_Products_Sales
        FROM ProductTotals
    )
    SELECT
        'iPhone' AS Product,
        ROUND((iPhone_Sales / All_Products_Sales) * 100, 2) AS Percentage_Contribution
    FROM TotalSales
    UNION ALL
    SELECT
        'iPad',
        ROUND((iPad_Sales / All_Products_Sales) * 100, 2)
    FROM TotalSales
    UNION ALL
    SELECT
        'Mac',
        ROUND((Mac_Sales / All_Products_Sales) * 100, 2)
    FROM TotalSales
    UNION ALL
    SELECT
        'Wearables',
        ROUND((Wearables_Sales / All_Products_Sales) * 100, 2)
    FROM TotalSales
    ORDER BY Percentage_Contribution DESC
""").display()

In [None]:
## Percentage Contribution of Each Region to Total Sales
# Per-region unit total plus its share of the table-wide total, which the
# scalar subquery computes; largest share first.
region_share_query = """
    SELECT Region,
           SUM(`Total_Sales (in million units)`) AS Total_Sales,
           ROUND(100 * SUM(`Total_Sales (in million units)`) / 
           (SELECT SUM(`Total_Sales (in million units)`) FROM apple_sales_2024_dataset), 2) AS Sales_Percentage
    FROM apple_sales_2024_dataset
    GROUP BY Region
    ORDER BY Sales_Percentage DESC
"""
region_share_result = spark.sql(region_share_query)
region_share_result.display()