In [0]:
from pyspark.sql.functions import col, udf, year, to_date,expr, when, avg,expr
from pyspark.sql.types import DoubleType
import pandas as pd
from pyspark.sql import SparkSession

Initializes a Spark session, which is essential for executing Spark code

In [0]:
spark = SparkSession.builder.appName("ClientAnalysis").getOrCreate()

Loads datasets from Databricks Catalog into Spark DataFrames

In [0]:
# Load Spark tables directly from Databricks Catalog
payment_df = spark.table("default.payment_information")
subscription_df = spark.table("default.subscription_information")
industry_df = spark.table("default.industry_client_details")
financial_df = spark.table("default.finanical_information")

# Display first 5 rows from each DataFrame
print("Payment Information Table:")
payment_df.show(5, truncate=False)

print("Subscription Information Table:")
subscription_df.show(5, truncate=False)

print("Industry Client Details Table:")
industry_df.show(5, truncate=False)

print("Financial Information Table:")
financial_df.show(5, truncate=False)


Payment Information Table:
+----------+------------+-----------+--------------+
|client_id |payment_date|amount_paid|payment_method|
+----------+------------+-----------+--------------+
|6292156167|2019-09-16  |447.0      |Bank Transfer |
|7462725203|2018-05-21  |379.7      |Bank Transfer |
|4698004907|2021-09-11  |435.1      |Check         |
|3510240337|2020-12-07  |413.1      |Check         |
|7501599785|2019-03-04  |61.1       |Bank Transfer |
+----------+------------+-----------+--------------+
only showing top 5 rows
Subscription Information Table:
+----------+-----------------+----------+----------+-------+
|client_id |subscription_type|start_date|end_date  |renewed|
+----------+-----------------+----------+----------+-------+
|1131383004|Yearly           |2020-11-11|2021-11-11|false  |
|4309371709|Monthly          |2021-05-24|2021-06-23|true   |
|3183675157|Yearly           |2021-12-25|2022-12-25|true   |
|5371694837|Monthly          |2020-03-14|2020-04-13|true   |
|5157113076|M

In [0]:
# QUESTION 1: Count finance lending and blockchain clients
# Answer: Filters rows by industry ("Finance Lending", "Block Chain") and counts unique client IDs.
finance_clients = industry_df.filter(col("industry") == "Finance Lending").select("client_id").distinct().count()
blockchain_clients = industry_df.filter(col("industry") == "Block Chain").select("client_id").distinct().count()
print("Finance Lending clients: " + str(finance_clients))
print("Blockchain clients: " + str(blockchain_clients))

Finance Lending clients: 22
Blockchain clients: 25


In [0]:
# QUESTION 2: Industry with the highest renewal rate
# Answer : Joins subscription and industry data on client ID to combine subscription renewal data with industry information
subscription_industry_df = subscription_df.join(industry_df, "client_id", "inner")

# Spark can't directly average boolean values; converting True/False to 1/0 allows numerical averaging.

subscription_industry_df = subscription_industry_df.withColumn(
    "renewed_numeric",
    when(col("renewed") == True, 1).otherwise(0)
)

# Compute renewal rates correctly
#Groups by industry and calculates average renewal rate, ordering results descendingly.
renewal_rates = subscription_industry_df.groupBy("industry") \
    .agg({"renewed_numeric": "avg"}) \
    .withColumnRenamed("avg(renewed_numeric)", "renewal_rate") \
    .orderBy(col("renewal_rate").desc())

highest_renewal_industry = renewal_rates.first()['industry']
print(f"Industry with Highest Renewal Rate: {highest_renewal_industry}")


Industry with Highest Renewal Rate: Gaming


In [0]:
## QUESTION 3: Average inflation rate during renewals
# Ensure dates in correct format
subscription_df = subscription_df.withColumn("end_date", to_date(col("end_date")))
financial_df = financial_df.withColumn("start_date", to_date(col("start_date"))) \
                             .withColumn("end_date", to_date(col("end_date")))




In [0]:
# Joins subscription data with financial data based on whether the subscription renewal date (end_date) falls within a financial period (start_date to end_date in financial data).
joined_df = subscription_df.filter(col("renewed") == True) \
    .join(
        financial_df,
        (subscription_df.end_date >= financial_df.start_date) & 
        (subscription_df.end_date <= financial_df.end_date),
        "left"
    )



In [0]:
# Computes the average inflation rate across all renewal events
average_inflation_rate = joined_df.agg(avg("inflation_rate").alias("avg_inflation_rate")) \
                                  .collect()[0]["avg_inflation_rate"]

print(f"Average Inflation Rate at Renewal: {average_inflation_rate:.2f}%")

Average Inflation Rate at Renewal: 4.31%


In [0]:
# QUESTION 4: Median amount paid each year for all payment methods
#Converts payment_date from a string to a date format, then extracts the year into a separate column
payment_df = payment_df.withColumn("payment_date", to_date(col("payment_date"), "M/d/yyyy")) \
                       .withColumn("year", year(col("payment_date")))

# Groups payments by year and calculates the median (percentile_approx with 0.5 as the percentile) amount paid each year.
#percentile_approx(amount_paid, 0.5) calculates the approximate median (50th percentile).
median_payments_df = payment_df.groupBy("year") \
    .agg(expr("percentile_approx(amount_paid, 0.5)").alias("median_amount")) \
    .orderBy("year")

print("Median Amount Paid Each Year:")
median_payments_df.show()

# Ends the Spark session, releasing resources after the analysis.
spark.stop()

Median Amount Paid Each Year:
+----+-------------+
|year|median_amount|
+----+-------------+
|2018|        235.7|
|2019|        348.1|
|2020|        284.5|
|2021|        300.7|
|2022|        285.0|
+----+-------------+



	status = StatusCode.INTERNAL
	details = "[INVALID_HANDLE.SESSION_CLOSED] The handle b8253de7-b88f-443b-a746-90e57d55f5f3 is invalid. Session was closed. SQLSTATE: HY000"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"[INVALID_HANDLE.SESSION_CLOSED] The handle b8253de7-b88f-443b-a746-90e57d55f5f3 is invalid. Session was closed. SQLSTATE: HY000", grpc_status:13, created_time:"2025-03-30T09:31:47.416722626+00:00"}"
>.


3.11.10 (main, Sep  7 2024, 18:35:41) [GCC 11.4.0]
