In [0]:
# Load the table as a DataFrame
df = spark.table("default.bank")

# Show some rows
df.show()

# Print schema
df.printSchema()

In [0]:
display(df)

In [0]:
df = df.na.drop(subset=["age", "job"])  # drops rows where either age or job is null

In [0]:
df.count(), df.dropDuplicates().count()

In [0]:
from pyspark.sql.functions import col

df = (
    df.withColumn("age", col("age").cast("int"))
    .withColumn("balance", col("balance").cast("float"))
    .withColumn("day", col("day").cast("int"))
    .withColumn("duration", col("duration").cast("int"))
    .withColumn("campaign", col("campaign").cast("int"))
    .withColumn("pdays", col("pdays").cast("int"))
    .withColumn("previous", col("previous").cast("int"))
)

In [0]:
df.printSchema()

Customer profile table


In [0]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("customer_id", monotonically_increasing_id())

In [0]:
display(df)

In [0]:
Customer_df = df.select(
    "age",
    "job",
    "marital",
    "education",
    "default",
    "balance",
    "housing",
    "loan",
    "customer_id",
)
Customer_df.write.mode("overwrite").saveAsTable("Customer_profile")

In [0]:
campaign_df = df.select(
    "contact", "day", "month", "duration", "campaign", "customer_id"
)
campaign_df.write.mode("ignore").saveAsTable("contact_campaign")

### Customer Profile table

In [0]:
df_customer = spark.table("default.customer_profile")
df_customer.show(10)

### Contact Campaign Table

In [0]:
df_contact_campaign = spark.table("default.contact_campaign")
df_contact_campaign.show(5)

In [0]:
prev_df = df.select("pdays", "previous", "poutcome", "customer_id")
prev_df.write.mode("overwrite").saveAsTable("previous_campaign")

### Previous campaign outcome table


In [0]:
df_prev_campaign = spark.table("default.previous_campaign")

In [0]:
df_prev_campaign.show()

In [0]:
df_customer.show()

In [0]:
# Casting columns into proper data types

from pyspark.sql.functions import col


df_customer = (
    df_customer.withColumn("age", col("age").cast("int"))
    .withColumn("balance", col("balance").cast("float"))
    .withColumn("customer_id", col("customer_id").cast("long"))
)

In [0]:
df_customer.printSchema()

In [0]:
# df_customer = df_customer.withColumn(
#     "age_group",
#     when(df_customer.age < 25, "Youth")
#     .when((df_customer.age >= 25) & (df_customer.age < 60), "Adult")
#     .otherwise("Senior")
# )
from pyspark.sql.functions import when

df_customer = df_customer.withColumn(
    "age_group",
    when(col("age") < 25, "Youth")
    .when((col("age") >= 25) & (col("age") < 60), "Adult")
    .otherwise("Senior"),
)
df_customer.show()

In [0]:
display(df_customer)

In [0]:
# Balance category
df_customer = df_customer.withColumn(
    "balance_category",
    when(col("balance") < 0, "Low")
    .when((col("balance") >= 0) & (col("balance") < 1000), "Medium")
    .otherwise("High"),
)

In [0]:
df_customer.select(
    "customer_id", "age", "age_group", "balance", "balance_category"
).show(10)

In [0]:
# Customer Count by Age Group
from pyspark.sql.functions import desc

df_cust_age = df_customer.groupBy("age_group").count().orderBy(desc("count"))
df_cust_age.show()

In [0]:
from pyspark.sql.functions import avg, round

df_avg_balance = (
    df_customer.groupBy("job")
    .agg((round(avg("balance"), 2).alias("avg_balance")))
    .orderBy(desc("avg_balance"))
)
df_avg_balance.show()

In [0]:
df_edu_loans = (
    df_customer.groupBy("education", "loan", "housing").count().orderBy("education")
)

In [0]:
display(df_customer)

In [0]:
df_customer = (
    df_customer.withColumnRenamed("default", "Credit_default")
    .withColumnRenamed("loan", "personal_loan")
    .withColumnRenamed("housing", "housing_loan")
)
df_customer.show()

In [0]:
display(df_contact_campaign)

In [0]:
df_contact_campaign = df_contact_campaign.withColumnRenamed(
    "duration", "call_duration"
).withColumnRenamed("campaign", "current_campaign")
df_contact_campaign.show()

In [0]:
df_contact_campaign.printSchema()

In [0]:
# Only customers with long calls and there count
df_long_calls = df_contact_campaign.where("call_duration > 300")
df_long_calls.count()

In [0]:
# Top 5 Campaigns by call duration
from pyspark.sql.functions import sum, desc

df_top_5 = (
    df_contact_campaign.groupBy("current_campaign")
    .agg(sum("call_duration").alias("total_duration"))
    .orderBy(desc("total_duration"))
    .limit(5)
)

df_top_5.show()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Rank campaigns by call duration
windowSpec = Window.partitionBy("month").orderBy(col("call_duration").desc())
df_with_rank = df_contact_campaign.withColumn("duration_rank", rank().over(windowSpec))
df_with_rank.show(40)

In [0]:
# Number of calls per day
df_calls_per_day = (
    df_contact_campaign.groupBy("day").count().orderBy(col("count").desc())
)
df_calls_per_day.show()

In [0]:
display(df_prev_campaign)

In [0]:
df_prev_campaign.printSchema()

In [0]:
df_prev_campaign = df_prev_campaign = df_prev_campaign.selectExpr(
    "pdays as days_since_last_contact",
    "previous as previous_contact_count",
    "poutcome as previous_outcome",
    "customer_id",
)

In [0]:
df_prev_campaign.printSchema()

In [0]:
df_prev_campaign.show()

In [0]:
from pyspark.sql.functions import when, col

df_prev_campaign = df_prev_campaign.withColumn(
    "contact_recency",
    when(col("days_since_last_contact") == -1, "Never Contacted")
    .when((col("days_since_last_contact") <= 7), "Last Week")
    .when((col("days_since_last_contact") <= 30), "Last Month")
    .when((col("days_since_last_contact") <= 90), "1-3 Months Ago")
    .otherwise("3+ Months Ago"),
)

In [0]:
df_prev_campaign.show()

In [0]:
df_prev_campaign = df_prev_campaign.withColumn(
    "activity_level",
    when(col("previous_contact_count") == 0, "Inactive")
    .when(col("previous_contact_count") <= 2, "Low")
    .when(col("previous_contact_count") <= 5, "Medium")
    .otherwise("High"),
)

In [0]:
df_customer.createOrReplaceTempView("customer")
df_contact_campaign.createOrReplaceTempView("contact_camp")
df_prev_campaign.createOrReplaceTempView("prev_camp")

In [0]:
df_customer.printSchema()

In [0]:
df_contact_campaign.printSchema()

In [0]:
df_prev_campaign.printSchema()

In [0]:
%sql
select
  *
from
  customer;

### Total Customers by Marital Status

In [0]:
%sql
-- select distinct age,marital,count(customer_id) as total_customer from customer
-- group by age,marital order by total_customer desc;
select distinct
  marital,
  count(customer_id) as total_customers
from
  customer
group by
  marital
order by
  total_customers desc;

#### Average Balance per Education Level


In [0]:
%sql
select
  education,
  round(avg(balance), 2) as avg_balance_per_edu
from
  customer
group by
  1
order by
  2 desc;

### Which Month Had the Most Contact Attempts?


In [0]:
%sql
select
  *
from
  contact_campaign;

In [0]:
%sql
select
  month,
  count(campaign) as no_of_times_contacted
from
  contact_campaign
group by
  month
order by
  no_of_times_contacted desc
limit 1;

###  Top 5 Jobs by Personal Loan Count

In [0]:
%sql
select
  *
from
  customer;

In [0]:
%sql
select
  job,
  count(*) as cnt
from
  customer
where
  personal_loan = 'yes'
group by
  job
order by
  cnt desc
limit 5;

###  Customers Contacted Multiple Times in Current Campaign

In [0]:
%sql
select
  sum(current_campaign) as contacted_times,
  customer_id
from
  contact_camp
where
  current_campaign > 1
group by
  customer_id
order by
  contacted_times desc;

### Housing Loan vs Personal Loan Cross-tab

In [0]:
%sql
SELECT
  housing_loan,
  personal_loan,
  COUNT(*) AS count
FROM
  customer
GROUP BY
  housing_loan,
  personal_loan
ORDER BY
  count DESC;

### What is the average call duration for each job role? Show top 5 jobs with highest average call duration.

In [0]:
%sql
select
  c.job,
  round(avg(co.call_duration), 2) as avg_call_duration
from
  customer c
    inner join contact_camp co
      on c.customer_id = co.customer_id
group by
  job
order by
  avg_call_duration desc
limit 5;

In [0]:
%sql
select
  *
from
  contact_campaign;

###  Find customers who were contacted more than 3 times in both previous and current campaigns.

In [0]:
%sql
select
  c.customer_id,
  current_campaign,
  previous_contact_count
from
  customer c
    inner join contact_camp co
      on c.customer_id = co.customer_id
    join prev_camp pc
      on c.customer_id = pc.customer_id
where
  current_campaign > 3
  and previous_contact_count > 3
order by
  1 asc;

### Use a CTE to calculate average balance by education level, and then show only those above 8000.


In [0]:
%sql
with avg_bal as (
  select
    education,
    avg(balance) as avg_b
  from
    customer
  group by
    education
)
select
  *
from
  avg_bal
where
  avg_b > 1000;

### For each job, rank customers by highest balance using window function. Show top 2 per job.

In [0]:
%sql
SELECT
  *
FROM
  (
    SELECT
      c.customer_id,
      c.job,
      c.balance,
      RANK() OVER (PARTITION BY job ORDER BY balance DESC) AS rank
    FROM
      customer c
  ) ranked
WHERE
  rank <= 2;

####  Create a CTE that categorizes customers into risk levels based on balance, and find how many in each category own housing loans.

In [0]:
%sql
WITH risk_cte AS (
  SELECT *,
    CASE 
      WHEN balance < 0 THEN 'High Risk'
      WHEN balance BETWEEN 0 AND 5000 THEN 'Medium Risk'
      ELSE 'Low Risk'
    END AS risk_level
  FROM customer
)
SELECT risk_level, housing_loan, COUNT(*) AS total
FROM risk_cte
GROUP BY risk_level, housing_loan;


In [0]:
display(df_customer)

In [0]:
df.show()

In [0]:
type(df_customer)


In [0]:
pandas_contact = df_contact_campaign.toPandas()
pandas_prev = df_prev_campaign.toPandas()

In [0]:
plt.figure(figsize=(8,6))
sns.countplot(data=pandas_prev, x="previous_outcome", order=pandas_prev["previous_outcome"].value_counts().index)
plt.title("Previous Contact Outcomes")
plt.show()


In [0]:
plt.figure(figsize=(10,6))
sns.countplot(data=df_customer, x="age_group", hue="balance_category")
plt.title("Balance Category across Age Groups")
plt.xticks(rotation=45)
plt.show()


In [0]:
plt.figure(figsize=(10,6))
sns.histplot(pandas_contact['call_duration'], bins=30, kde=True, color='skyblue')
plt.title("Distribution of Call Duration")
plt.xlabel("Call Duration (seconds)")
plt.ylabel("Count")
plt.show()


In [0]:
import matplotlib.pyplot as plt

df_customer['job'].value_counts().plot.pie(autopct='%1.1f%%', figsize=(8,8))
plt.title("Job Distribution")
plt.ylabel("")  # Hides the y-label
plt.show()


In [0]:
import matplotlib.pyplot as plt

df_customer['age_group'].value_counts().sort_index().plot(kind='bar', color='lightcoral', edgecolor='black')
plt.title("Number of Customers by Age Group")
plt.xlabel("Age Group")
plt.ylabel("Number of Customers")
plt.xticks(rotation=0)
plt.grid(axis='y', linestyle='--', alpha=0.6)
plt.show()


In [0]:
pandas_df = df.toPandas()


In [0]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Use your pandas_df with 'deposit' and 'campaign'
deposit_yes = pandas_df[pandas_df['deposit'] == 'yes']

plt.figure(figsize=(8,5))
sns.countplot(data=deposit_yes, x='campaign', color='mediumseagreen')
plt.title("Times Contacted in Current Campaign (Customers Who Subscribed)")
plt.xlabel("Times Contacted (campaign)")
plt.ylabel("Number of Customers")
plt.grid(axis='y', linestyle='--', alpha=0.6)
plt.show()


In [0]:
grouped = pandas_df.groupby(['previous', 'deposit']).size().unstack().fillna(0)
grouped['success_rate'] = (grouped['yes'] / grouped.sum(axis=1)) * 100

grouped[['success_rate']].plot(kind='bar', color='purple', legend=False)
plt.title("Success Rate by Previous Contact Count")
plt.ylabel("Success Rate (%)")
plt.xlabel("Previous Contact Count")
plt.grid(axis='y', linestyle='--', alpha=0.6)
plt.show()


In [0]:
import seaborn as sns
import matplotlib.pyplot as plt

sns.barplot(data=pandas_df, x='job', y=(pandas_df['deposit'] == 'yes').astype(int), ci=None, palette='Blues')
plt.title("Subscription Rate by Job")
plt.xlabel("Job")
plt.ylabel("Deposit Subscription Rate")
plt.xticks(rotation=45)
plt.show()


In [0]:
sns.countplot(data=pandas_df, x='loan', hue='deposit', palette='Set1')
plt.title("Loan Status vs Deposit Subscription")
plt.show()


In [0]:
order = pandas_df['month'].value_counts().index
sns.countplot(data=pandas_df, x='month', hue='deposit', order=order, palette='Set3')
plt.title("Month-wise Subscription Outcome")
plt.xticks(rotation=45)
plt.show()
