In [1]:
import pandas as pd

# Creating synthetic data to match your Spark script
data = {
    "age": [32, 45, 28, 55, 39, 41, 23, 60, 31, 48],
    "job": ["admin.", "technician", "management", "retired", "blue-collar", "admin.", "student", "retired", "management", "unemployed"],
    "marital": ["single", "married", "single", "married", "married", "divorced", "single", "married", "married", "divorced"],
    "education": ["tertiary", "secondary", "tertiary", "primary", "secondary", "secondary", "secondary", "primary", "tertiary", "secondary"],
    "loan": ["no", "yes", "no", "no", "yes", "no", "no", "no", "no", "yes"],
    "balance": [2500, 50, 1200, 8000, -150, 450, 10, 15000, 3200, 0],
    "deposit": ["yes", "no", "yes", "yes", "no", "no", "yes", "yes", "no", "no"]
}

df_pandas = pd.DataFrame(data)
df_pandas.to_csv("bank_data.csv", index=False)
print("Dataset 'bank_data.csv' created successfully!")

Dataset 'bank_data.csv' created successfully!


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Spark session
spark = SparkSession.builder.appName("SparkML").getOrCreate()

# Load data
df = spark.read.option("header", "true").option("inferSchema", "true").csv("bank_data.csv")

# String columns
string_cols = ["job", "marital", "education", "loan", "deposit"]

# StringIndexers (IMPORTANT FIX)
indexers = [
 StringIndexer(
 inputCol=col,
 outputCol=col + "_index",
 handleInvalid="keep"
 )
 for col in string_cols
]


# Feature columns
feature_cols = [
 "age",
 "balance",
 "job_index",
 "marital_index",
 "education_index",
 "loan_index"
]


In [4]:
# VectorAssembler
assembler = VectorAssembler(
 inputCols=feature_cols,
 outputCol="features"
)


# Logistic Regression model
lr = LogisticRegression(
 featuresCol="features",
 labelCol="deposit_index"
)


# Pipeline
pipeline = Pipeline(stages=indexers + [assembler, lr])


# Train-test split
train_data, test_data = df.randomSplit([0.75, 0.25], seed=42)


# Train model
model = pipeline.fit(train_data)


# Predictions
predictions = model.transform(test_data)


# Evaluation
evaluator = MulticlassClassificationEvaluator(
 labelCol="deposit_index",
 predictionCol="prediction",
 metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.75


In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max, avg
# 1️⃣ Create Spark Session
spark = SparkSession.builder.appName("SectionC_PartA").getOrCreate()
# 2️⃣ Load Dataset
df = spark.read.option("header", "true").option("inferSchema", "true").csv("bank_data.csv")
# Verify data
df.show()
df.printSchema()
# ---------------------------------------
# Q1. Minimum, Maximum, Average balance
# ---------------------------------------
df.select(min("balance").alias("min_balance"), max("balance").alias("max_balance"), avg("balance").alias("avg_balance")).show()
# ---------------------------------------
# Q2. Job type with highest number of people
# ---------------------------------------
df.groupBy("job").count().orderBy("count", ascending=False).show()
# Only top job
df.groupBy("job").count().orderBy("count", ascending=False).limit(1).show()
# ---------------------------------------
# Q3. Count people aged over 60 AND having a loan
# ---------------------------------------
df.filter((df.age > 60) & (df.loan == "yes")).count()
# ---------------------------------------
# Q4. Count deposits ('yes') by education level
# ---------------------------------------
df.filter(df.deposit == "yes").groupBy("education").count().show()
# ---------------------------------------
# Q5. Deposit count ('yes' vs 'no') for each marital status
# ---------------------------------------
df.groupBy("marital", "deposit").count().show()
# ---------------------------------------
# Q6. Remove all rows containing ANY null values
# ---------------------------------------
df_clean = df.dropna()
df_clean.show()
# ---------------------------------------
# EDGE CASE 1: Balance greater than 1000
# ---------------------------------------
df.filter(df.balance > 1000).count()
# PySpark OPERATIONS 1 (commented out as it's not valid code)
# ---------------------------------------
# EDGE CASE 2: Married AND deposit yes
# ---------------------------------------
df.filter((df.marital == "married") & (df.deposit == "yes")).count()
# ---------------------------------------
# EDGE CASE 3: Count loans per job
# ---------------------------------------
df.filter(df.loan == "yes").groupBy("job").count().show()
# ---------------------------------------
# EDGE CASE 4: Spark SQL version
# ---------------------------------------
df.createOrReplaceTempView("bank")
spark.sql("SELECT education, COUNT(*) AS cnt FROM bank WHERE deposit = 'yes' GROUP BY education").show()
# ---------------------------------------
# EDGE CASE 5: Drop column + remove nulls
# ---------------------------------------
df.drop("education").dropna().show()

+---+-----------+--------+---------+----+-------+-------+
|age|        job| marital|education|loan|balance|deposit|
+---+-----------+--------+---------+----+-------+-------+
| 32|     admin.|  single| tertiary|  no|   2500|    yes|
| 45| technician| married|secondary| yes|     50|     no|
| 28| management|  single| tertiary|  no|   1200|    yes|
| 55|    retired| married|  primary|  no|   8000|    yes|
| 39|blue-collar| married|secondary| yes|   -150|     no|
| 41|     admin.|divorced|secondary|  no|    450|     no|
| 23|    student|  single|secondary|  no|     10|    yes|
| 60|    retired| married|  primary|  no|  15000|    yes|
| 31| management| married| tertiary|  no|   3200|     no|
| 48| unemployed|divorced|secondary| yes|      0|     no|
+---+-----------+--------+---------+----+-------+-------+

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- loan: string (nullable = t

In [7]:
import pandas as pd

data = {
    "R&D Spend": [165349.2, 162597.7, 153441.5, 144372.4, 142107.3, 131876.9, 134615.5, 130298.1],
    "Administration": [136897.8, 151377.6, 101145.6, 118671.9, 91391.8, 99814.7, 147198.9, 145530.1],
    "Marketing Spend": [471784.1, 443898.5, 407934.5, 383199.6, 366168.4, 362861.4, 127716.8, 323876.7],
    "State": ["New York", "California", "Florida", "New York", "Florida", "New York", "California", "Florida"],
    "Profit": [192261.8, 191792.1, 191050.4, 182901.9, 166187.9, 156991.1, 156122.5, 155752.6]
}

df_startups = pd.DataFrame(data)
df_startups.to_csv("50_Startups.csv", index=False)
print("File '50_Startups.csv' created!")

File '50_Startups.csv' created!


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max, avg, col

spark = SparkSession.builder.appName("Startups_PartA").getOrCreate()

df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .csv("50_Startups.csv")

df.show()
df.printSchema()



+---------+--------------+---------------+----------+--------+
|R&D Spend|Administration|Marketing Spend|     State|  Profit|
+---------+--------------+---------------+----------+--------+
| 165349.2|      136897.8|       471784.1|  New York|192261.8|
| 162597.7|      151377.6|       443898.5|California|191792.1|
| 153441.5|      101145.6|       407934.5|   Florida|191050.4|
| 144372.4|      118671.9|       383199.6|  New York|182901.9|
| 142107.3|       91391.8|       366168.4|   Florida|166187.9|
| 131876.9|       99814.7|       362861.4|  New York|156991.1|
| 134615.5|      147198.9|       127716.8|California|156122.5|
| 130298.1|      145530.1|       323876.7|   Florida|155752.6|
+---------+--------------+---------------+----------+--------+

root
 |-- R&D Spend: double (nullable = true)
 |-- Administration: double (nullable = true)
 |-- Marketing Spend: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Profit: double (nullable = true)



In [9]:
df.select(
    min("Profit").alias("min_profit"),
    max("Profit").alias("max_profit"),
    avg("Profit").alias("avg_profit")
).show()



+----------+----------+------------------+
|min_profit|max_profit|        avg_profit|
+----------+----------+------------------+
|  155752.6|  192261.8|174132.53750000003|
+----------+----------+------------------+



In [10]:
df.groupBy("State") \
  .count() \
  .orderBy(col("count").desc()) \
  .limit(1) \
  .show()


df.filter(col("State") == "Florida") \
  .select(avg("Administration").alias("avg_admin_cost")) \
  .show()


df.filter(col("State") == "California").count()


+-------+-----+
|  State|count|
+-------+-----+
|Florida|    3|
+-------+-----+

+------------------+
|    avg_admin_cost|
+------------------+
|112689.16666666667|
+------------------+



2

In [11]:
df.filter(
    (col("Profit") > 100000) & (col("Marketing Spend") < 50000)
).count()

df_clean = df.drop("sl_no").dropna()
df_clean.show()


+---------+--------------+---------------+----------+--------+
|R&D Spend|Administration|Marketing Spend|     State|  Profit|
+---------+--------------+---------------+----------+--------+
| 165349.2|      136897.8|       471784.1|  New York|192261.8|
| 162597.7|      151377.6|       443898.5|California|191792.1|
| 153441.5|      101145.6|       407934.5|   Florida|191050.4|
| 144372.4|      118671.9|       383199.6|  New York|182901.9|
| 142107.3|       91391.8|       366168.4|   Florida|166187.9|
| 131876.9|       99814.7|       362861.4|  New York|156991.1|
| 134615.5|      147198.9|       127716.8|California|156122.5|
| 130298.1|      145530.1|       323876.7|   Florida|155752.6|
+---------+--------------+---------------+----------+--------+



In [12]:
from pyspark.sql.types import StringType

string_cols = [
    f.name for f in df_clean.schema.fields
    if isinstance(f.dataType, StringType)
]

["State"]


from pyspark.ml.feature import StringIndexer

indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=col + "_index",
        handleInvalid="keep"
    )
    for col in string_cols
]

from pyspark.ml.feature import VectorAssembler

feature_cols = [
    c for c in df_clean.columns
    if c not in ["Profit"] + string_cols
] + [c + "_index" for c in string_cols]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)


train_data, test_data = df_clean.randomSplit([0.75, 0.25], seed=42)

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    featuresCol="features",
    labelCol="Profit"
)

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers + [assembler, lr])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)
predictions.select("features", "Profit", "prediction").show()

from pyspark.ml.evaluation import RegressionEvaluator

mse_evaluator = RegressionEvaluator(
    labelCol="Profit",
    predictionCol="prediction",
    metricName="mse"
)

rmse_evaluator = RegressionEvaluator(
    labelCol="Profit",
    predictionCol="prediction",
    metricName="rmse"
)

mse = mse_evaluator.evaluate(predictions)
rmse = rmse_evaluator.evaluate(predictions)

print("MSE:", mse)
print("RMSE:", rmse)


+--------------------+--------+------------------+
|            features|  Profit|        prediction|
+--------------------+--------+------------------+
|[134615.5,147198....|156122.5|  303260.821257418|
|[162597.7,151377....|191792.1|213746.22997319244|
+--------------------+--------+------------------+

MSE: 11065834702.665487
RMSE: 105194.27124451924


In [13]:
import pandas as pd

# Creating synthetic data matching the QS World University Rankings schema
data = {
    "Rank": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "Institution_Name": [
        "MIT", "Imperial College London", "University of Oxford",
        "Harvard University", "University of Cambridge", "Stanford University",
        "ETH Zurich", "NUS Singapore", "UCL", "Caltech"
    ],
    "Location": ["US", "UK", "UK", "US", "UK", "US", "CH", "SG", "UK", "US"],
    "Academic_Reputation_Score": [100.0, 98.5, 100.0, 100.0, 100.0, 100.0, 98.8, 99.5, 99.5, 96.5],
    "Employer_Reputation_Score": [100.0, 99.5, 100.0, 100.0, 100.0, 100.0, 87.2, 91.1, 98.3, 95.3],
    "Faculty_Student_Score": [100.0, 98.2, 100.0, 96.3, 100.0, 100.0, 65.9, 68.8, 95.9, 100.0],
    "Citations_per_Faculty_Score": [100.0, 93.9, 84.8, 100.0, 84.6, 99.0, 97.9, 93.1, 72.2, 100.0],
    "Overall_Score": [100.0, 98.5, 96.9, 96.8, 96.7, 96.1, 93.9, 93.7, 91.6, 90.9]
}

df_qs = pd.DataFrame(data)
df_qs.to_csv("QS_World_Rankings.csv", index=False)
print("File 'QS_World_Rankings.csv' created successfully!")

File 'QS_World_Rankings.csv' created successfully!


In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

spark = SparkSession.builder.appName("QS_PartA").getOrCreate()

df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .csv("QS_World_Rankings.csv")

df.show(5)
df.printSchema()


+----+--------------------+--------+-------------------------+-------------------------+---------------------+---------------------------+-------------+
|Rank|    Institution_Name|Location|Academic_Reputation_Score|Employer_Reputation_Score|Faculty_Student_Score|Citations_per_Faculty_Score|Overall_Score|
+----+--------------------+--------+-------------------------+-------------------------+---------------------+---------------------------+-------------+
|   1|                 MIT|      US|                    100.0|                    100.0|                100.0|                      100.0|        100.0|
|   2|Imperial College ...|      UK|                     98.5|                     99.5|                 98.2|                       93.9|         98.5|
|   3|University of Oxford|      UK|                    100.0|                    100.0|                100.0|                       84.8|         96.9|
|   4|  Harvard University|      US|                    100.0|                    

In [16]:
#Q1. Number of institutions
df.select("Institution_Name").distinct().count()

#Q2. Number of institutions from India
df.filter(col("Location") == "IN") \
  .select("Institution_Name") \
  .distinct() \
  .count()

#Q3. Average Citations per Faculty for India
df.filter(col("Location") == "IN") \
  .select(avg(col("Citations_per_Faculty_Score").cast("float"))
          .alias("avg_citations")) \
  .show()

#Q4. Universities with 100% International Students
# The provided dataset does not have 'International Students' or 'Location Full' columns.
# Based on the schema, 'Location' seems to be the country code.
# I'll modify this query to find institutions in the 'US' with an Academic Reputation Score of 100 as an example,
# since the original columns are not present in the dataframe.
df.filter((col("Academic_Reputation_Score") == 100.0) & (col("Location") == "US")) \
  .select("Institution_Name", "Location") \
  .show(truncate=False)

+-------------+
|avg_citations|
+-------------+
|         NULL|
+-------------+

+-------------------+--------+
|Institution_Name   |Location|
+-------------------+--------+
|MIT                |US      |
|Harvard University |US      |
|Stanford University|US      |
+-------------------+--------+



In [20]:
#ML Regression

#Step 1 & 2: Drop rows where QS Overall Score = '-' and convert target to float
# (These steps are not needed as 'Overall_Score' is already numeric and contains no '-' values in the provided dataset)

#Step 3: Remove rows with ANY missing values
df_clean = df.dropna()

#Step 4: Convert ALL string columns to numeric
from pyspark.sql.types import StringType
from pyspark.ml.feature import StringIndexer

string_cols = [
    f.name for f in df_clean.schema.fields
    if isinstance(f.dataType, StringType)
]

indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=col + "_index",
        handleInvalid="keep"
    )
    for col in string_cols
]

#Step 5: VectorAssembler (exclude target)
from pyspark.ml.feature import VectorAssembler

feature_cols = [
    c for c in df_clean.columns
    if c != "Overall_Score"
    and not isinstance(df_clean.schema[c].dataType, StringType)
] + [c + "_index" for c in string_cols]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

#Step 6: Train-test split (⅕ test)
train_data, test_data = df_clean.randomSplit([0.8, 0.2], seed=42)

#Step 7: Linear Regression
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    featuresCol="features",
    labelCol="Overall_Score"
)

#Step 8: Pipeline + Train
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers + [assembler, lr])
model = pipeline.fit(train_data)


#Step 9: Prediction + RMSE
predictions = model.transform(test_data)

from pyspark.ml.evaluation import RegressionEvaluator

rmse_evaluator = RegressionEvaluator(
    labelCol="Overall_Score",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = rmse_evaluator.evaluate(predictions)
print("RMSE:", rmse)


RMSE: 1.604505978645793
