In [55]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("UDF and Window Functions").getOrCreate()
sc = spark.sparkContext

In [56]:
from pyspark.sql.functions import col,asc,desc,udf,rank,row_number,lag,lead,when,split,trim
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

In [57]:
df = spark.read.option("header",True).option("inferSchema",True).csv(r"C:\Users\Admin\Downloads\csv_files\top_100_saas_companies_2025.csv")

In [None]:
df.dtypes

[('Company Name', 'string'),
 ('Founded Year', 'int'),
 ('HQ', 'string'),
 ('Industry', 'string'),
 ('Total Funding', 'string'),
 ('ARR', 'string'),
 ('Valuation', 'string'),
 ('Employees', 'string'),
 ('Top Investors', 'string'),
 ('Product', 'string'),
 ('G2 Rating', 'double')]

In [59]:
# The columns we need to clean

df.select("ARR","Valuation","Total Funding").show()

+------+-------------------+-------------+
|   ARR|          Valuation|Total Funding|
+------+-------------------+-------------+
| $270B|                $3T|          $1B|
|$37.9B|            $227.8B|       $65.4M|
|$19.4B|              $240B|        $2.5M|
|$52.9B|              $350B|          $2K|
|$32.5B|              $215B|          N/A|
|$14.4B|              $180B|        $273M|
| $8.9B|              $147B|       $82.5M|
| $7.3B|               $65B|      $249.9M|
| $4.5B|               $85B|      $145.5M|
| $7.1B|               $95B|      $122.3M|
| $3.5B|               $55B|         $60M|
| $2.8B|               $75B|        $1.4B|
| $2.2B|               $32B|      $100.5M|
| $2.5B|               $10B|      $514.3M|
| $1.7B|$27.7B (Salesforce)|        $1.4B|
| $400M|               $10B|        $353M|
| $2.1B|               $44B|      $147.9M|
| $1.7B|               $26B|      $311.2M|
| $2.2B|               $25B|      $230.5M|
| $4.1B|               $12B|      $261.3M|
+------+---

In [60]:
# Problem 1: UDF – Standardize Currency Columns
# Title: Preparing Financial Data for Analytics
# Scenario: Your financial model requires Valuation, ARR, and Total Funding to be in numeric format, 
# but they’re stored as strings with “$” and “M”/“B” suffixes.
# Task:
# Create a UDF that converts values like "$1.2B" to 1_200_000_000.0
# Apply it to create Valuation_Num, ARR_Num, and Funding_Num columns
# Use This For: Enabling numeric analysis, ranking, and aggregation on these fields.


def clean_numeric_cols(val):
    if val is None or val == "N/A":
        return 0.0
    val = val.replace("$","")
    val = val.split("(")[0].strip()
    if "T" in val:
        return float(val.replace("T","")) * 1_000_000_000_000
    if "B" in val:
        return float(val.replace("B","")) * 1_000_000_000 
    if "M" in val:
        return float(val.replace("M","")) * 1_000_000
    if "K" in val:
        return float(val.replace("K","")) * 1_000

clean = udf(clean_numeric_cols,DoubleType())

df = df.withColumn("Valuation_Num", clean(col("Valuation")))\
           .withColumn("ARR_Num", clean(col("ARR")))\
           .withColumn("Total_Funding_Num", clean(col("Total Funding")))

In [61]:
# We can also clean in this way too...!!!

# def clean_numeric_cols(val):
#     if val is None or val == "N/A":
#         return 0.0
#     val = val.replace("$", "").strip()
#     val = val.split("(")[0].strip()
#     multiplier = 1
#     if val.endswith("T"):
#         multiplier = 1_000_000_000_000
#         val = val.replace("T", "")
#     elif val.endswith("B"):
#         multiplier = 1_000_000_000
#         val = val.replace("B", "")
#     elif val.endswith("M"):
#         multiplier = 1_000_000
#         val = val.replace("M", "")
#     elif val.endswith("K"):
#         multiplier = 1_000
#         val = val.replace("K", "")
#     return float(val) * multiplier
    
# clean = udf(clean_numeric_cols,DoubleType())

# df = df.withColumn("Valuation_Num", clean(col("Valuation")))\
    #        .withColumn("ARR_Num", clean(col("ARR")))\
    #        .withColumn("Total_Funding_Num", clean(col("Total Funding")))
# df.show()

In [62]:
df.select("Valuation_Num","ARR_Num","Total_Funding_Num").show(3)

+-------------+-------+-------------------+
|Valuation_Num|ARR_Num|  Total_Funding_Num|
+-------------+-------+-------------------+
|       3.0E12| 2.7E11|              1.0E9|
|     2.278E11|3.79E10|6.540000000000001E7|
|       2.4E11|1.94E10|          2500000.0|
+-------------+-------+-------------------+
only showing top 3 rows



In [63]:
# Problem 2: Window Function – Identify Top Performers per Industry
# Title: Top 2 Companies by Valuation Within Each Industry
# Scenario: Your dashboard highlights the highest-valued SaaS companies within each industry segment.
# Task:
# Partition by Industry
# Order by Valuation_Num (desc)
# Use rank() to assign rankings
# Return only top 2 ranked companies per industry

window_spec = Window.partitionBy("Industry").orderBy(col("Valuation_Num").desc())
df = df.withColumn("ranking",rank().over(window_spec))
df.filter(col("ranking") <= 2).select("Industry","ranking","Company Name").show()



+--------------------+-------+------------+
|            Industry|ranking|Company Name|
+--------------------+-------+------------+
|                 APM|      1| AppDynamics|
|                BNPL|      1|      Affirm|
|Business Intellig...|      1|      Looker|
|               CI/CD|      1|    CircleCI|
|                 CRM|      1|  Salesforce|
|        Card Issuing|      1|     Marqeta|
|      Cloud Security|      1|     Zscaler|
|      Cloud Security|      2|    Netskope|
|       Cloud Storage|      1|     Dropbox|
|       Cloud Storage|      2|         Box|
|       Collaboration|      1|        Miro|
|Collaboration Sof...|      1|   Atlassian|
|      Communications|      1|      Twilio|
|      Communications|      2| RingCentral|
|        Construction|      1|     Procore|
|      Contact Center|      1|       Five9|
|     Corporate Cards|      1|        Brex|
|   Creative Software|      1|       Adobe|
|       Customer Data|      1|     Segment|
| Customer Engagement|      1|  

In [64]:
# Problem 3: Window Function – ARR Growth Gaps
# Title: Understand Revenue Distribution Among Competitors
# Scenario: You’re analyzing how ARR drops off between market leaders and followers in each industry.
# Task:
# Partition by Industry and order by ARR_Num (desc)
# Use lag() to compute ARR_Difference with the previous company
# Show companies where this drop exceeds 1 billion USD

window_arr = Window.partitionBy("Industry").orderBy(col("ARR_Num").desc())
df = df.withColumn("ARR_Difference", col("ARR_Num") - lag(col("ARR_Num")).over(window_arr))
df.filter(col("ARR_Difference") <= 1000000000).select("Industry","Company Name","ARR_Num","ARR_Difference",).show()


+-------------------+------------+-------+--------------------+
|           Industry|Company Name|ARR_Num|      ARR_Difference|
+-------------------+------------+-------+--------------------+
|     Cloud Security|    Netskope|  5.0E8|              -1.1E9|
|      Cloud Storage|         Box|  1.0E9|              -1.5E9|
|     Communications| RingCentral|  2.2E9|-1.89999999999999...|
|Customer Engagement|       Braze|  4.2E8|             -1.76E8|
|      Cybersecurity| CrowdStrike|  3.1E9|              -4.4E9|
|     Data Analytics|    Palantir|  2.2E9|              -1.5E9|
|           Database|       Redis|  1.0E8|              -1.6E9|
|             Design|       Figma|  6.0E8|              -1.4E9|
|             DevOps|       JFrog| 2.65E8|             -3.15E8|
|Enterprise Software|         SAP|3.25E10|           -2.375E11|
|           Payments|      Stripe| 1.4E10|              -5.7E9|
|  Product Analytics|    Mixpanel|  1.0E8|             -1.33E8|
|   Sales Engagement|   Salesloft|  2.0E

In [65]:
# Problem 4: CASE WHEN – Label Companies by G2 Rating
# Title: Classify Companies Based on User Sentiment
# Scenario: Your company wants to group products into buckets based on their G2 ratings.
# Task:
# Create a column Rating_Tier with:
# Excellent (≥ 4.7)
# Very Good (4.3 to < 4.7)
# Good (4.0 to < 4.3)
# Average (< 4.0)


df = df.withColumn("Rating Tier",when(col("G2 Rating") >= 4.7, "Excellent")\
              .when((col("G2 Rating") >= 4.3) & (col("G2 Rating") < 4.7),"Very Good")\
              .when((col("G2 Rating") >= 4.0) & (col("G2 rating") < 4.3),"Good")\
              .otherwise("Average"))

df.select("Industry","Company Name","G2 Rating","Rating Tier").show()

+--------------------+------------+---------+-----------+
|            Industry|Company Name|G2 Rating|Rating Tier|
+--------------------+------------+---------+-----------+
| Enterprise Software|   Microsoft|      4.4|  Very Good|
|                 CRM|  Salesforce|      4.3|  Very Good|
|   Creative Software|       Adobe|      4.5|  Very Good|
|Database & Enterp...|      Oracle|      4.0|       Good|
| Enterprise Software|         SAP|      4.1|       Good|
|  Financial Software|      Intuit|      4.4|  Very Good|
|IT Service Manage...|  ServiceNow|      4.4|  Very Good|
|        HR & Finance|     Workday|      4.2|       Good|
|Video Communications|        Zoom|      4.5|  Very Good|
|          E-commerce|     Shopify|      4.4|  Very Good|
|Collaboration Sof...|   Atlassian|      4.3|  Very Good|
|    Data Warehousing|   Snowflake|      4.4|  Very Good|
|   Marketing & Sales|     HubSpot|      4.4|  Very Good|
|  Digital Agreements|    DocuSign|      4.5|  Very Good|
|  Team Commun

In [66]:
# Problem 5: Join – Investor Tier Enrichment
# Title: Understand Impact of Tier-1 Investors
# Extra Table:
# investor_tiers = spark.createDataFrame([
#     ("Accel", "Tier 1"),
#     ("Sequoia", "Tier 1"),
#     ("Andreessen Horowitz", "Tier 1"),
#     ("SoftBank", "Tier 2"),
#     ("Lightspeed", "Tier 2"),
#     ("Unknown", "Tier 3")
# ], ["Investor", "Tier"])
# Scenario: You want to analyze companies backed by top-tier investors.
# Task:
# Extract the first investor from Top Investors
# Join with investor_tiers on investor name
# Show companies with Tier 1 and Tier 2 investors, and their valuation

data_a =[
    ("Accel", "Tier 1"),
    ("Sequoia", "Tier 1"),
    ("Andreessen Horowitz", "Tier 1"),
    ("SoftBank", "Tier 2"),
    ("Lightspeed", "Tier 2"),
    ("Unknown", "Tier 3")
]
columns_a = ["Investor","Tier"]

df_investor_tiers = spark.createDataFrame(data_a,columns_a)
df_investor_tiers.show()

df = df.withColumn("First_Investor",trim(split(col("Top Investors"),",").getItem(0)))
df_joined = df.join(df_investor_tiers, on=df["First_Investor"] == df_investor_tiers["Investor"], how="inner")
df_top_tier_investors = df_joined.filter(col("Tier").isin("Tier 1", "Tier 2")).select("Top Investors", "Valuation_Num")
df_top_tier_investors.show(20,truncate=False)




+-------------------+------+
|           Investor|  Tier|
+-------------------+------+
|              Accel|Tier 1|
|            Sequoia|Tier 1|
|Andreessen Horowitz|Tier 1|
|           SoftBank|Tier 2|
|         Lightspeed|Tier 2|
|            Unknown|Tier 3|
+-------------------+------+

+----------------------------+-------------+
|Top Investors               |Valuation_Num|
+----------------------------+-------------+
|Accel, Salesforce Ventures  |2.3E9        |
|Accel, DFJ, Greylock        |2.3E9        |
|Accel, GV, Kleiner Perkins  |3.2E9        |
|Accel, Sequoia Capital India|5.2E9        |
|Accel, Andreessen Horowitz  |2.77E10      |
|Sequoia, Lightspeed         |7.5E9        |
|Sequoia, Felicis            |3.2E9        |
|Sequoia, Coatue, Thrive     |7.3E9        |
|Sequoia, Accel, Insight     |1.25E10      |
|Sequoia, Khosla, Scale      |5.0E9        |
|Sequoia, Greylock           |9.5E10       |
|Sequoia, IVP, Battery       |4.0E9        |
|Sequoia, Benchmark, Index   |9.1E

In [67]:
# Problem 6: Join – Compare with Industry Median
# Title: Classify Companies Based on Valuation Position
# Extra Table:
# industry_medians = spark.createDataFrame([
#     ("Enterprise Software", 150_000_000_000),
#     ("CRM", 100_000_000_000),
#     ("AI", 70_000_000_000),
#     ("HRTech", 50_000_000_000),
# ], ["Industry", "Median_Valuation"])
# Scenario: The board wants to see which companies are outperforming or underperforming relative to their industry’s median valuation.
# Task:
# - Join with industry_medians on Industry
# - Create a column Valuation_Position with:
# -	Above Median if Valuation > median
# -	Below Median otherwise


data_b = [
    ("Enterprise Software", 150_000_000_000),
    ("CRM", 100_000_000_000),
    ("AI", 70_000_000_000),
    ("HRTech", 50_000_000_000),
]
columns_b = ["Industry_key", "Median_Valuation"]

df_industry_medians = spark.createDataFrame(data_b, columns_b)
df_industry_medians.show()

df_joined_i_m = df.join(df_industry_medians,on=df["Industry"] == df_industry_medians["Industry_Key"],how="inner")
df = df_joined_i_m.withColumn("Valuation_Position",when(col("Valuation_Num") > col("Median_Valuation"),"Above Median")\
                                                .otherwise("Below_Median"))
df.select("Company Name", "Industry", "Valuation_Num", "Median_Valuation", "Valuation_Position").show()


+-------------------+----------------+
|       Industry_key|Median_Valuation|
+-------------------+----------------+
|Enterprise Software|    150000000000|
|                CRM|    100000000000|
|                 AI|     70000000000|
|             HRTech|     50000000000|
+-------------------+----------------+

+------------+-------------------+-------------+----------------+------------------+
|Company Name|           Industry|Valuation_Num|Median_Valuation|Valuation_Position|
+------------+-------------------+-------------+----------------+------------------+
|         SAP|Enterprise Software|      2.15E11|    150000000000|      Above Median|
|   Microsoft|Enterprise Software|       3.0E12|    150000000000|      Above Median|
|  Salesforce|                CRM|     2.278E11|    100000000000|      Above Median|
+------------+-------------------+-------------+----------------+------------------+



In [68]:
sc.stop()