In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pymongo import MongoClient
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

In [2]:
conf = pyspark.SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0") \
    .setMaster("local") \
    .setAppName("BDA_Ass") \
    .setAll([("spark.driver.memory", "40g"), ("spark.executor.memory", "50g")])

In [3]:
client = MongoClient('mongodb://localhost:27017/')
db = client['College_DB']

In [4]:
sc = SparkContext(conf=conf)

24/09/26 21:54:40 WARN Utils: Your hostname, scorched-one-HP-Pavilion-Laptop-15-eh1xxx resolves to a loopback address: 127.0.1.1; using 192.168.46.68 instead (on interface wlo1)
24/09/26 21:54:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/scorched-one/anaconda3/envs/ML/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/scorched-one/.ivy2/cache
The jars for the packages stored in: /home/scorched-one/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3a1af8a0-bb51-4301-978e-00b895ab463b;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;10.4.0 in central
	found org.mongodb#mongodb-driver-sync;5.1.4 in central
	[5.1.4] org.mongodb#mongodb-driver-sync;[5.1.1,5.1.99)
	found org.mongodb#bson;5.1.4 in central
	found org.mongodb#mongodb-driver-core;5.1.4 in central
	found org.mongodb#bson-record-codec;5.1.4 in central
:: resolution report :: resolve 11423ms :: artifacts dl 7ms
	:: modules in use:
	org.mongodb#bson;5.1.4 from central in [default]
	org.mongodb#bson-record-codec;5.1.4 from central in [default]
	org.mongodb#mongodb-driver-core;5.1.4 from central in [default]
	org.mongodb#mongodb-driver-sync;5.1.4 from central in [default]
	org.mongodb.spark#mongo-spa

In [5]:
sqlC = SQLContext(sc)



In [6]:
mongo_uri = "mongodb://localhost:27017/College_DB"

In [7]:
departments_df = sqlC.read.format("mongo")
instructors_df = sqlC.read.format("mongo")
students_df = sqlC.read.format("mongo")
courses_df = sqlC.read.format("mongo")
enrollments_df = sqlC.read.format("mongo")

In [8]:
def fetch_students_in_course(db):
    return list(db.Students.aggregate([
        {
            "$lookup": {
                "from": "Enrollments",
                "localField": "_id",
                "foreignField": "student_id",
                "as": "enrollment"
            }
        },
        {
            "$unwind": "$enrollment"
        },
        {
            "$lookup": {
                "from": "Courses",
                "localField": "enrollment.course_id",
                "foreignField": "_id",
                "as": "course"
            }
        },
        {
            "$unwind": "$course"
        },
        {
            "$match": {
                "course.name": "Data Structures"
            }
        },
        {
            "$project": {
                "name": 1,
                "student_type": 1
            }
        }
    ]))

def average_students_per_instructor(db):
    return list(db.Courses.aggregate([
        {
            "$lookup": {
                "from": "Enrollments",
                "localField": "_id",
                "foreignField": "course_id",
                "as": "enrollment"
            }
        },
        {
            "$group": {
                "_id": "$instructor_id",
                "enrollment_count": {"$sum": {"$size": {"$ifNull": ["$enrollment", []]}}}
            }
        },
        {
            "$group": {
                "_id": None,
                "average": {"$avg": "$enrollment_count"}
            }
        }
    ]))

def list_courses_by_department(db):
    return list(db.Courses.aggregate([
        {
            "$lookup": {
                "from": "Departments",
                "localField": "department_id",
                "foreignField": "_id",
                "as": "department"
            }
        },
        {
            "$unwind": "$department"
        },
        {
            "$match": {
                "department.name": "Computer Science"
            }
        },
        {
            "$project": {
                "name": 1
            }
        }
    ]))

def total_students_per_department(db):
    return list(db.Students.aggregate([
        {
            "$lookup": {
                "from": "Departments",
                "localField": "department_id",
                "foreignField": "_id",
                "as": "department"
            }
        },
        {
            "$unwind": "$department"
        },
        {
            "$group": {
                "_id": "department.name",
                "total_students": {"$sum": 1}
            }
        }
    ]))

def instructors_taught_btech_cse_core(db):
    return list(db.Instructors.aggregate([
        {
            "$lookup": {
                "from": "Courses",
                "localField": "_id",
                "foreignField": "instructor_id",
                "as": "courses"
            }
        },
        {
            "$unwind": "$courses"
        },
        {
            "$match": {
                "courses.is_core": True,
                "courses.department_id": db.Departments.find_one({"name": "Computer Science"})["_id"]
            }
        },
        {
            "$group": {
                "_id": "$_id",
                "count": {"$sum": 1}
            }
        },
        {
            "$match": {
                "count": db.Courses.count_documents({"is_core": True, "department_id": db.Departments.find_one({"name": "Computer Science"})["_id"]})
            }
        }
    ]))

def top_courses_by_enrollment(db):
    return list(db.Courses.aggregate([
        {
            "$lookup": {
                "from": "Enrollments",
                "localField": "_id",
                "foreignField": "course_id",
                "as": "enrollment"
            }
        },
        {
            "$project": {
                "name": 1,
                "total_enrollments": {"$size": {"$ifNull": ["$enrollment", []]}}
            }
        },
        {
            "$sort": {"total_enrollments": -1}
        },
        {
            "$limit": 10
        }
    ]))

In [9]:
fetch_students_in_course(db)

[{'_id': ObjectId('66f58527375b3e4c47395fa4'),
  'name': 'Arjun Patil',
  'student_type': 'BTech'},
 {'_id': ObjectId('66f58527375b3e4c47395fa4'),
  'name': 'Arjun Patil',
  'student_type': 'BTech'},
 {'_id': ObjectId('66f58527375b3e4c47395fa5'),
  'name': 'Priya Singh',
  'student_type': 'BTech'},
 {'_id': ObjectId('66f58527375b3e4c47395fa5'),
  'name': 'Priya Singh',
  'student_type': 'BTech'},
 {'_id': ObjectId('66f58527375b3e4c47395fa8'),
  'name': 'Neha Iyer',
  'student_type': 'BTech'},
 {'_id': ObjectId('66f58527375b3e4c47395fa8'),
  'name': 'Neha Iyer',
  'student_type': 'BTech'}]

In [10]:
average_students_per_instructor(db)

[{'_id': None, 'average': 4.0}]

In [11]:
list_courses_by_department(db)

[{'_id': ObjectId('66f58527375b3e4c47395fae'), 'name': 'Data Structures'},
 {'_id': ObjectId('66f58527375b3e4c47395faf'), 'name': 'Algorithms'},
 {'_id': ObjectId('66f58527375b3e4c47395fb2'), 'name': 'Database Systems'},
 {'_id': ObjectId('66f58527375b3e4c47395fb3'), 'name': 'Data Structures'},
 {'_id': ObjectId('66f58527375b3e4c47395fb4'), 'name': 'Algorithms'},
 {'_id': ObjectId('66f58527375b3e4c47395fb7'), 'name': 'Database Systems'}]

In [12]:
total_students_per_department(db)

[{'_id': 'department.name', 'total_students': 10}]

In [13]:
instructors_taught_btech_cse_core(db)

[]

In [14]:
top_courses_by_enrollment(db)

[{'_id': ObjectId('66f58527375b3e4c47395fae'),
  'name': 'Data Structures',
  'total_enrollments': 6},
 {'_id': ObjectId('66f58527375b3e4c47395faf'),
  'name': 'Algorithms',
  'total_enrollments': 4},
 {'_id': ObjectId('66f58527375b3e4c47395fb1'),
  'name': 'Thermodynamics',
  'total_enrollments': 2},
 {'_id': ObjectId('66f58527375b3e4c47395fb2'),
  'name': 'Database Systems',
  'total_enrollments': 2},
 {'_id': ObjectId('66f58527375b3e4c47395fb0'),
  'name': 'Circuit Analysis',
  'total_enrollments': 2},
 {'_id': ObjectId('66f58527375b3e4c47395fb5'),
  'name': 'Circuit Analysis',
  'total_enrollments': 0},
 {'_id': ObjectId('66f58527375b3e4c47395fb6'),
  'name': 'Thermodynamics',
  'total_enrollments': 0},
 {'_id': ObjectId('66f58527375b3e4c47395fb7'),
  'name': 'Database Systems',
  'total_enrollments': 0},
 {'_id': ObjectId('66f58527375b3e4c47395fb3'),
  'name': 'Data Structures',
  'total_enrollments': 0},
 {'_id': ObjectId('66f58527375b3e4c47395fb4'),
  'name': 'Algorithms',
  'to

In [15]:
sc.stop()