In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=f53126af89a6dee5f3beda3d8ce2af16bda7a6b9282c2582f76b2f700e9865c7
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.appName("RDD Operations").getOrCreate()
sc = SparkContext.getOrCreate()

In [None]:
sc

In [None]:
# Loading the datasets as RDDs
orders_rdd = sc.textFile("/content/orders_1gb.csv")
order_items_rdd = sc.textFile("/content/Order_items.csv")
customers_rdd = sc.textFile("/content/Customers.csv")

In [None]:
customers_rdd.take(5)
orders_rdd.take(5)
order_items_rdd.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

In [None]:
# Map data into key-value pairs
orders_mapped = orders_rdd.map(lambda x: (x.split(",")[0], x))  # key is order id
order_items_mapped = order_items_rdd.map(lambda x: (x.split(",")[1], x))  # key is order id
customers_mapped = customers_rdd.map(lambda x: (x.split(",")[0], x))

In [None]:
#Joining Orders and Order Items
orders_joined = orders_mapped.join(order_items_mapped)

In [None]:
#Find Top 10 Customers by Total Orders
orders_customers = orders_rdd.map(lambda x: (x.split(",")[2], x))  # customer_id as key
customer_orders_joined = orders_customers.join(customers_mapped)

# Map customer IDs to orders and reduce by customer_id to get order count
customer_order_count = customer_orders_joined.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)

# Get top 10 customers by order count
top_10_customers = customer_order_count.takeOrdered(10, key=lambda x: -x[1])
print(top_10_customers)

[('6316', 443), ('12431', 442), ('5897', 441), ('569', 440), ('5654', 414), ('4320', 413), ('221', 413), ('5283', 412), ('12284', 410), ('5624', 410)]


In [None]:
# Top 10 products by quantity sold
product_quantities = order_items_rdd.map(lambda x: (x.split(",")[0], int(x.split(",")[3])))

# Reduce by product_id to get total quantities sold
product_quantity_count = product_quantities.reduceByKey(lambda x, y: x + y)

# Get top 10 product IDs with the highest quantities sold
top_10_products = product_quantity_count.takeOrdered(10, key=lambda x: -x[1])
print(top_10_products)

[('10', 5), ('16', 5), ('33', 5), ('60', 5), ('63', 5), ('84', 5), ('107', 5), ('113', 5), ('115', 5), ('128', 5)]


In [None]:
# Customers from Caguas City
caguas_customers = customers_rdd.filter(lambda x: "Caguas" in x.split(",")[6])

# Count
caguas_count = caguas_customers.count()
print(caguas_count)

4584


In [None]:
#Top 3 States with most Customers
customers_by_state = customers_rdd.map(lambda x: (x.split(",")[7], 1)).reduceByKey(lambda x, y: x + y)

# Get top 3
top_3_states = customers_by_state.takeOrdered(3, key=lambda x: -x[1])
print(top_3_states)

[('PR', 4771), ('CA', 2012), ('NY', 775)]


In [None]:
#Number of customer spendign 1000$
customer_spent = order_items_rdd.map(lambda x: (x.split(",")[1], float(x.split(",")[4]))).reduceByKey(lambda x, y: x + y)

# Filter customers who spent more than $1000
customers_over_1000 = customer_spent.filter(lambda x: x[1] > 1000)

# Count
over_1000_count = customers_over_1000.count()
print(over_1000_count)

7519


In [None]:
# Filter orders to get only those with "CLOSED" status - state with max closed orders
closed_orders = orders_rdd.filter(lambda x: "CLOSED" in x.split(",")[3])

#  Map closed orders to (cust_id, order_row)
closed_orders_mapped = closed_orders.map(lambda x: (x.split(",")[2], x))  # cust_id is at index 2 in orders

# Join closed orders with customers on cust_id (which is index 0 in customers)
closed_orders_with_customers = closed_orders_mapped.join(customers_mapped)

# Map to (state, 1) where state is at index 7 in customers and count by state
closed_orders_by_state = closed_orders_with_customers.map(lambda x: (x[1][1].split(",")[7], 1)).reduceByKey(lambda x, y: x + y)

state_with_most_closed = closed_orders_by_state.takeOrdered(1, key=lambda x: -x[1])
print(state_with_most_closed)

[('PR', 79869)]


In [None]:
# Count active customers
active_customers = orders_rdd.map(lambda x: x.split(",")[1]).distinct()

# Count active customers (those who placed at least one order)
active_customers_count = active_customers.count()
print(active_customers_count)


364


In [None]:
#Sorted States by Revenue

# Join orders and customers based on cust_id (customers_mapped uses cust_id as key)
revenue_by_state = orders_mapped.join(customers_mapped).map(
    lambda x: (x[1][1].split(",")[7], float(x[1][0].split(",")[2])))  # Extract state from customers (index 7) and order value from orders (index 2)

# Reduce by state to get total revenue and sort by revenue
total_revenue_by_state = revenue_by_state.reduceByKey(lambda x, y: x + y).sortBy(lambda x: -x[1])


revenue_by_state_sorted = total_revenue_by_state.collect()
print(revenue_by_state_sorted)


[('PR', 825192144.0), ('CA', 346220588.0), ('NY', 134419656.0), ('TX', 110311712.0), ('IL', 94453380.0), ('FL', 65122260.0), ('OH', 47191872.0), ('MI', 44348640.0), ('PA', 44327864.0), ('AZ', 36887228.0), ('NJ', 36533420.0), ('GA', 30157484.0), ('MD', 28832524.0), ('NC', 26465824.0), ('CO', 23130324.0), ('VA', 22643012.0), ('OR', 20076196.0), ('MA', 19923148.0), ('NV', 18337900.0), ('MO', 17587584.0), ('TN', 17415076.0), ('HI', 13846308.0), ('NM', 13764604.0), ('CT', 12801628.0), ('UT', 12761896.0), ('WI', 12414248.0), ('WA', 12200776.0), ('LA', 11068512.0), ('DC', 8469244.0), ('MN', 7426552.0), ('SC', 6961024.0), ('KY', 6368124.0), ('IN', 6246324.0), ('KS', 5167400.0), ('DE', 4069128.0), ('OK', 3130428.0), ('RI', 3049844.0), ('WV', 2757048.0), ('AR', 2365804.0), ('ND', 1929536.0), ('ID', 1610280.0), ('MT', 1373092.0), ('AL', 851144.0), ('IA', 625940.0)]
