In [10]:
# -- Question 8

# We have data consisting of the following columns:
# - Row number
# - First name of student
# - Last name of student
# - Course number
# - Grade

# Write efficient Spark code to calculate:
# 1. Min grade of each student
# 2. Max grade of each student
# 3. GPA
# 4. Number of courses taken

# I created sample data because it makes the implementation easier to run and the results easier to check.



from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, when, count
# Spark's min/max are imported under aliases (as `sum` is aliased to
# `spark_sum` in the customer-orders cell) so they do not shadow Python's
# built-in min()/max() for every later cell run in this kernel.
from pyspark.sql.functions import max as spark_max, min as spark_min

spark = SparkSession.builder.appName("grades").getOrCreate()

# Sample data: (ID, first name, last name, course ID, numeric grade).
data = [
    (1, 'Magnus',  'Carlsen',      'CS101', 88),
    (1, 'Magnus',  'Carlsen',      'CS102', 90),
    (2, 'Hikaru',  'Nakamura',     'CS101', 85),
    (2, 'Hikaru',  'Nakamura',     'CS102', 89),
    (3, 'Fabiano', 'Caruana',      'CS101', 91),
    (3, 'Fabiano', 'Caruana',      'CS103', 87),
    (4, 'Ian',     'Nepomniatchi', 'CS102', 79),
    (4, 'Ian',     'Nepomniatchi', 'CS104', 81),
    (5, 'Wesley',  'So',           'CS103', 83),
    (5, 'Wesley',  'So',           'CS101', 80)
]

columns = ['ID', 'First Name', 'Last Name', 'Course ID', 'Grade']
df = spark.createDataFrame(data, columns)

# Convert each numeric grade to grade points on a 4.0 scale:
# >=90 -> 4.0, >=80 -> 3.0, >=70 -> 2.0, >=60 -> 1.0, otherwise 0.0.
df = df.withColumn("GPA Scale",
                   when(col('Grade') >= 90, 4.0)
                   .when(col('Grade') >= 80, 3.0)
                   .when(col('Grade') >= 70, 2.0)
                   .when(col('Grade') >= 60, 1.0)
                   .otherwise(0.0))

# Required metrics, one row per student:
# - GPA is the unweighted mean of grade points (no credit-hour weighting).
# - The course count counts rows with a non-null Course ID.
# NOTE(review): students are identified by (first name, last name), so two
# different students sharing a name would be merged; add a per-student key to
# the groupBy if the real data has one.
# groupBy() does not define an output order, so sort for a stable display.
results = df.groupBy('First Name', 'Last Name') \
    .agg(
        spark_min(col('Grade')).alias('Min Grade'),
        spark_max(col('Grade')).alias('Max Grade'),
        avg(col('GPA Scale')).alias('GPA (4.0 Scale)'),
        count(col('Course ID')).alias('Number of Courses Taken')
    ) \
    .orderBy('Last Name', 'First Name')

results.show()
spark.stop()




+----------+------------+---------+---------+---------------+-----------------------+
|First Name|   Last Name|Min Grade|Max Grade|GPA (4.0 Scale)|Number of Courses Taken|
+----------+------------+---------+---------+---------------+-----------------------+
|    Magnus|     Carlsen|       88|       90|            3.5|                      2|
|    Hikaru|    Nakamura|       85|       89|            3.0|                      2|
|   Fabiano|     Caruana|       87|       91|            3.5|                      2|
|       Ian|Nepomniatchi|       79|       81|            2.5|                      2|
|    Wesley|          So|       80|       83|            3.0|                      2|
+----------+------------+---------+---------+---------------+-----------------------+



In [9]:
# -- Question 9

# - Estimating the area of a circle:
# - Use Spark to estimate the area of the unit circle by "throwing darts" at it.
# - Assume you don't know the closed-form formula for the area of a circle,
# - but you do know how to calculate the area of a square.
# - Throw random darts/points into the 2-by-2 square from (-1, -1) to (1, 1)
# - and count how many fall inside the unit circle (a circle of radius one).
# - That fraction, multiplied by the square's area (4), estimates the area of the unit circle.


from pyspark.sql import SparkSession
import random

# Spark session for the dart-throwing simulation (getOrCreate reuses an
# already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("circle")
    .getOrCreate()
)


# generate a random x-cordinate and y-cordinate between -1 and +1. 
# generate the distance from the origin (0, 0) by squaring the co-ordinates.
# check the point computed is within the circle (i.e. distance from the origin is within 1 unit of distance). 
def isPointInCircle(_):
    try:
        x = random.uniform(-1, 1)        
        y = random.uniform(-1, 1)
        
        distance_squared = x**2 + y**2
        
        if distance_squared <= 1: return 1
        else: return 0
            
    except Exception as e:
        print(f"Error occurred while calculating point: {e}")
        return -1

darts = 1_000_000

# Distribute one element per dart, classify each dart as a hit (1) or miss (0),
# and add up the hits. RDD.sum() replaces a hand-written reduce and returns 0
# instead of raising if the RDD is ever empty.
rdd = spark.sparkContext.parallelize(range(darts))
points = rdd.map(isPointInCircle).sum()

# The square [-1, 1] x [-1, 1] has area 4, so the circle's area is
# approximately 4 * (fraction of darts that landed inside the circle).
area = 4 * (points / darts)
print(f"Area of Circle Estimate: {area}")

spark.stop()


Area of Circle Estimate: 3.141976


In [20]:
# -- Question 10
# -- Load the data from a file called “Assignment2_customer-orders.csv”.
# -- Write a PySpark program that reports:
    # -- 5 top Customers who spent the most.
    # -- If you consider top 10 customers who spent the most, which item has been purchased the most.


# -- This question is done in two parts: 
    # -- Without Spark, to check the dataframe for null values and data types.
    # -- With Spark, as specified by the assignment. 

    
# This part is without Spark - just to check the dataset.     

import os

import pandas as pd

# Directory holding the assignment data. Override it with the
# ASSIGNMENT2_DATA_DIR environment variable instead of editing a hard-coded
# absolute path; the default is the original author's local directory.
DATA_DIR = os.environ.get(
    "ASSIGNMENT2_DATA_DIR",
    "/Users/keshavsaraogi/Desktop/BU/sem3/big-data/ASSIGNMENTS/ASSIGNMENT_2",
)
# `file_path` is also read by the Spark cell that follows.
file_path = os.path.join(DATA_DIR, "customer-orders.csv")
df = pd.read_csv(file_path)

# Fail fast with a clear message if this is not the expected orders file.
# ("PurchacePrice" is the column's actual spelling in the CSV.)
expected_columns = {"CustomerId", "Item purchased", "PurchacePrice"}
missing_columns = expected_columns - set(df.columns)
if missing_columns:
    raise ValueError(f"missing expected columns: {sorted(missing_columns)}")

null_values = df.isnull().sum()
data_types = df.dtypes

unique_customers = df['CustomerId'].nunique()
unique_items = df['Item purchased'].nunique()
descriptive_stats = df.describe()

print("Null values in each column:\n", null_values)
print("\nData types of each column:\n", data_types)
print(f"\nNumber of unique customers: {unique_customers}")
print(f"Number of unique items: {unique_items}")
print("\nDescriptive statistics:\n", descriptive_stats)



Null values in each column:
 CustomerId        0
Item purchased    0
PurchacePrice     0
dtype: int64

Data types of each column:
 CustomerId          int64
Item purchased      int64
PurchacePrice     float64
dtype: object

Number of unique customers: 100
Number of unique items: 1000

Descriptive statistics:
           CustomerId  Item purchased  PurchacePrice
count  597589.000000   597589.000000  597589.000000
mean       49.507382     1499.369100      50.009081
std        28.868750      288.774443      28.882208
min         0.000000     1000.000000       0.000000
25%        24.000000     1249.000000      25.000000
50%        50.000000     1499.000000      50.000000
75%        75.000000     1749.000000      75.030000
max        99.000000     1999.000000     100.000000


In [21]:
# Question 10 - With Spark


from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum, desc

spark = SparkSession.builder.appName("customer-order").getOrCreate()
# `file_path` is defined in the pandas inspection cell above.
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Total spend per customer, highest first, computed once and reused for both
# the top-5 and top-10 rankings. CustomerId breaks ties so that limit()
# selects the same customers on every run.
customerSpending = df\
    .groupBy("CustomerId")\
    .agg(spark_sum("PurchacePrice").alias("TotalSpent"))\
    .orderBy(desc("TotalSpent"), "CustomerId")

topFiveSpendingCustomers = customerSpending.limit(5)
topTenSpendingCustomers = customerSpending.limit(10)

# Only ten IDs, so collecting them to the driver for an isin() filter is cheap.
topTenSpendingCustomersID = [row["CustomerId"] for row in topTenSpendingCustomers.collect()]

filteredTopTenCustomers = df\
    .filter(df\
        .CustomerId\
        .isin(topTenSpendingCustomersID))

# Most frequently purchased item among the top-10 customers' orders, counted
# by number of purchase rows. count() names its output column "count"; the
# item id breaks ties so the result is deterministic.
mostPurchasedItem = filteredTopTenCustomers\
    .groupBy("Item purchased")\
    .count()\
    .orderBy(desc("count"), "Item purchased")\
    .limit(1)

topFiveSpendingCustomers.show()
mostPurchasedItem.show()

+----------+------------------+
|CustomerId|        TotalSpent|
+----------+------------------+
|        28| 309717.5700000002|
|        73| 308126.8399999998|
|        52| 307093.3200000001|
|        33| 306820.3300000001|
|        58|305663.10999999975|
+----------+------------------+

+--------------+-----+
|Item purchased|count|
+--------------+-----+
|          1198|   94|
+--------------+-----+

