In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, unix_timestamp, lag, row_number
from pyspark.sql.functions import year, to_date
from pyspark.sql.functions import date_format
from pyspark.sql.functions import sum
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [47]:
spark = SparkSession.builder.appName("Apple Sales Analysis").getOrCreate()

In [48]:
customers_df = spark.read.csv("Customer_Updated1.csv", header=True, inferSchema=True)
products_df = spark.read.csv("Products_updated.csv", header=True, inferSchema=True)
transactions_df = spark.read.csv("Transaction_Updated1.csv", header=True, inferSchema=True)

In [49]:
# Goal 1: Identifying customers who purchased both iPhones and AirPods
iphone_and_airpods = transactions_df.filter(
    (col('product_name').like('%iPhone%')) | (col('product_name').like('%AirPod%'))
).groupBy('customer_id').agg(
    F.countDistinct('product_name').alias('distinct_purchases')
).filter(
    col('distinct_purchases') == 2  # Ensure both iPhone and AirPods are purchased
).select('customer_id')

# Join with customers_df to get customer names
customer_info = customers_df.join(iphone_and_airpods, on='customer_id', how='inner') \
    .select('customer_id', 'customer_name')

# Show the result
customer_info.show()

+-----------+-------------------+
|customer_id|      customer_name|
+-----------+-------------------+
|        100|  Elizabeth Daniels|
|        101|Jeffrey Walters PhD|
|        102|       Daniel Young|
|        103|        Kevin Grant|
|        104|  Hunter Williamson|
|        105|       Gerald Moran|
|        106|          Toni Hill|
|        107|     Felicia Gordon|
|        109|   Kimberly Kaufman|
|        111|      Richard Smith|
|        113|       Carol Torres|
|        114|     Richard Golden|
|        115|    Angela Richards|
|        116|  Christopher White|
|        117|      Paul Ferguson|
|        118|        Robert Peck|
|        119|        Peter Zhang|
|        120|         Juan Lopez|
|        121|      Shirley Lucas|
|        122|      Tracy Mcguire|
+-----------+-------------------+
only showing top 20 rows



In [50]:
# Goal 2: Finding customers who bought only iPhones and AirPods and no other products
iphone_airpods_only = transactions_df.filter(
    (col('product_name').like('%iPhone%')) | (col('product_name').like('%AirPod%'))
).groupBy('customer_id').agg(
    F.countDistinct('product_name').alias('distinct_products')
).filter(
    col('distinct_products') == 2  # Ensure only iPhone and AirPod are bought, no other products
).select('customer_id')

# Join with customers_df to get customer names
customer_info = customers_df.join(iphone_airpods_only, on='customer_id', how='inner') \
    .select('customer_id', 'customer_name')

# Show the result
customer_info.show()

+-----------+-------------------+
|customer_id|      customer_name|
+-----------+-------------------+
|        100|  Elizabeth Daniels|
|        101|Jeffrey Walters PhD|
|        102|       Daniel Young|
|        103|        Kevin Grant|
|        104|  Hunter Williamson|
|        105|       Gerald Moran|
|        106|          Toni Hill|
|        107|     Felicia Gordon|
|        109|   Kimberly Kaufman|
|        111|      Richard Smith|
|        113|       Carol Torres|
|        114|     Richard Golden|
|        115|    Angela Richards|
|        116|  Christopher White|
|        117|      Paul Ferguson|
|        118|        Robert Peck|
|        119|        Peter Zhang|
|        120|         Juan Lopez|
|        121|      Shirley Lucas|
|        122|      Tracy Mcguire|
+-----------+-------------------+
only showing top 20 rows



In [52]:
# Query to compute average price by category
avg_price_by_category = products_df.groupBy("category").agg(avg("price").alias("avg_price"))
avg_price_by_category.show()

+----------+------------------+
|  category|         avg_price|
+----------+------------------+
| Accessory|1008.4539939332659|
|    Laptop| 979.5555555555555|
|    Tablet| 993.6969696969697|
|Smartphone|  986.046696472926|
+----------+------------------+



In [53]:
# Query to count products by category
product_count_by_category = products_df.groupBy("category").agg(count("product_id").alias("product_count"))
product_count_by_category.show()

+----------+-------------+
|  category|product_count|
+----------+-------------+
| Accessory|         1978|
|    Laptop|         2016|
|    Tablet|         1980|
|Smartphone|         4026|
+----------+-------------+



In [54]:
# Query to count customers by location
customers_by_location = customers_df.groupBy("location").agg(count("customer_id").alias("customer_count"))
customers_by_location.show()

+--------------------+--------------+
|            location|customer_count|
+--------------------+--------------+
|     Patrickbury, CO|             1|
|     Port George, HI|             1|
|   West Meredith, UT|             1|
|   Lake Jennifer, IN|             1|
|      Sandraview, CT|             1|
|        Markfort, NH|             1|
|     Kennethfort, MN|             1|
|       Jamesview, RI|             1|
|    New Markview, AZ|             1|
| East Sarahburgh, AL|             1|
|      Torresberg, FL|             1|
|North Michelefort...|             1|
|        Goodbury, MA|             1|
|South Kennethfurt...|             1|
|       Calebstad, IA|             1|
|    South Joseph, WA|             1|
|   Mitchellhaven, NH|             1|
| South Maryhaven, FL|             1|
|Lake Russellville...|             1|
|      Port Scott, VA|             1|
+--------------------+--------------+
only showing top 20 rows



In [55]:
# Query to count customers by location and sort in descending order
top_locations = (
    customers_df.groupBy("location")
    .agg(count("customer_id").alias("customer_count"))
    .orderBy("customer_count", ascending=False)
    .limit(10)
)

top_locations.show()

+------------------+--------------+
|          location|customer_count|
+------------------+--------------+
|   South Sarah, ME|             3|
|Stephaniemouth, NY|             2|
|  East Anthony, CA|             2|
|   Kristinside, WA|             2|
|     Julieview, UT|             2|
|   New Melissa, AR|             2|
|     Brownbury, AR|             2|
|    East David, HI|             2|
| West Kimberly, OH|             2|
|   Johnsontown, VA|             2|
+------------------+--------------+



In [56]:
# Group by product_name and count transactions
top_products = transactions_df.groupBy("product_name") \
    .agg(count("transaction_id").alias("sales_count")) \
    .orderBy(col("sales_count").desc())

top_products.show(10)  # Display top 10 products

+------------+-----------+
|product_name|sales_count|
+------------+-----------+
|      iPhone|       2047|
| Apple Watch|       2038|
|     AirPods|       1989|
|     MacBook|       1987|
|        iPad|       1939|
+------------+-----------+



In [59]:
# Group by customer_id and count transactions to find most frequent customers
most_frequent_customers = transactions_df.groupBy("customer_id") \
    .agg(count("transaction_id").alias("transaction_count")) \
    .orderBy(col("transaction_count").desc())

most_frequent_customers.show(10)

+-----------+-----------------+
|customer_id|transaction_count|
+-----------+-----------------+
|        876|               24|
|        253|               24|
|        816|               24|
|        605|               23|
|        365|               21|
|        384|               20|
|        437|               20|
|        994|               20|
|        708|               20|
|        468|               20|
+-----------+-----------------+
only showing top 10 rows



In [60]:
# Perform an inner join between customer and transaction datasets on customer_id
customer_transactions = customers_df.join(
    transactions_df,
    on="customer_id",
    how="inner"
)

# Group by customer details and count the number of transactions per customer
customer_transaction_summary = customer_transactions.groupBy(
    "customer_id", "customer_name", "join_date", "location"
).agg(
    count("transaction_id").alias("transaction_count")
)

customer_transaction_summary.show()

+-----------+-------------------+----------+--------------------+-----------------+
|customer_id|      customer_name| join_date|            location|transaction_count|
+-----------+-------------------+----------+--------------------+-----------------+
|        596|     Joseph Stevens| 2/22/2019|     Rachelshire, RI|               13|
|        673|    Crystal Beltran|11/12/2024|East Kristinhaven...|               14|
|        586|Mr. Daniel King Jr.|11/14/2023|       Wuchester, AL|               12|
|        182|     Savannah Johns| 7/21/2019|        Tinafort, MD|                4|
|        217|         Tonya Kemp| 1/10/2021|      Martinside, VT|               11|
|        311|        Curtis Rice|10/22/2015|    New Annatown, SD|                8|
|        381|     Frank Franklin|  3/9/2015|Lake Michaelchest...|               15|
|        495|      Mario Maxwell|10/20/2023|      Hermantown, WI|               10|
|        621|        Amanda Hess|  3/8/2016|   Howardborough, NJ|           

In [61]:
# Assume your DataFrame is named 'transactions'
# Step 1: Filter by customer_id
filtered_data = transactions_df.filter(col("customer_id") == 863)
 
# Step 2: Group by product_name and count
product_counts = (
    filtered_data.groupBy("product_name")
    .agg(count("*").alias("order_count"))
    .orderBy(col("order_count").desc())
)
 
# Step 3: Show results
product_counts.show()

+------------+-----------+
|product_name|order_count|
+------------+-----------+
|      iPhone|          3|
|        iPad|          2|
|     MacBook|          2|
| Apple Watch|          2|
|     AirPods|          2|
+------------+-----------+

