In [10]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import DoubleType
from pyspark.sql import SparkSession
import re

spark=SparkSession.builder.appName("udf").getOrCreate()

df=spark.read.csv("saas.csv",header=True,inferSchema=True)

def convert_currency(val):
  if val in ["N/A","UNKNOWN","","NA"]:
    return None

  if val is None:
    return None
  val=val.replace("$","").strip()
  val = re.sub(r"\(.*?\)", "", val).strip()
  if val.endswith("B"):
    return float(val.replace("B","").strip())*1000000000
  elif val.endswith("M"):
    return float(val.replace("M","").strip())*1000000
  elif val.endswith("T"):
    return float(val.replace("T","").strip())*1000000000000
  elif val.endswith("K"):
    return float(val.replace("K","").strip())*1000
  else:
    return float(val)

convert_currency_udf = udf(convert_currency, DoubleType())

df=df.withColumn("Valuation_Num",convert_currency_udf(col("Valuation")))\
     .withColumn("ARR_Num",convert_currency_udf(col("ARR")))\
     .withColumn("Funding_Num",convert_currency_udf(col("Total Funding")))
df.show()



+------------+------------+--------------------+--------------------+-------------+------+-------------------+---------+--------------------+--------------------+---------+-------------+--------------------+--------------------+
|Company Name|Founded Year|                  HQ|            Industry|Total Funding|   ARR|          Valuation|Employees|       Top Investors|             Product|G2 Rating|Valuation_Num|             ARR_Num|         Funding_Num|
+------------+------------+--------------------+--------------------+-------------+------+-------------------+---------+--------------------+--------------------+---------+-------------+--------------------+--------------------+
|   Microsoft|        1975|    Redmond, WA, USA| Enterprise Software|          $1B| $270B|                $3T|  221,000|Bill Gates, Paul ...|Azure, Office 365...|      4.4|       3.0E12|              2.7E11|               1.0E9|
|  Salesforce|        1999|San Francisco, CA...|                 CRM|       $65.4M|$

In [14]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank,col

performers_window=Window.partitionBy("Industry").orderBy(col("Valuation_Num").desc())
ranked_df=df.withColumn("Rank",rank().over(performers_window)).filter(col("Rank")<=2)
ranked_df.select("Industry","Company Name","Valuation","Valuation_Num","Rank").show()

+--------------------+------------+--------------+-------------+--------------+
|            Industry|Company Name|     Valuation|Valuation_Num|Valuation_Rank|
+--------------------+------------+--------------+-------------+--------------+
|                 APM| AppDynamics| $3.7B (Cisco)|        3.7E9|             1|
|                BNPL|      Affirm|          $12B|       1.2E10|             1|
|Business Intellig...|      Looker|$2.6B (Google)|        2.6E9|             1|
|               CI/CD|    CircleCI|         $1.7B|        1.7E9|             1|
|                 CRM|  Salesforce|       $227.8B|     2.278E11|             1|
|        Card Issuing|     Marqeta|         $4.3B|        4.3E9|             1|
|      Cloud Security|     Zscaler|          $30B|       3.0E10|             1|
|      Cloud Security|    Netskope|         $7.5B|        7.5E9|             2|
|       Cloud Storage|     Dropbox|         $8.5B|        8.5E9|             1|
|       Cloud Storage|         Box|     

In [20]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

arr_window=Window.partitionBy("Industry").orderBy(col("ARR_NUM").desc())

lag_df=df.withColumn("Previous_ARR",lag("ARR_NUM").over(arr_window))\
         .withColumn("ARR_difference",col("Previous_ARR")-col("ARR_NUM"))\
         .filter(col("ARR_difference")>1000000000)

lag_df.select("Industry","Company Name","ARR","ARR_Num","Previous_ARR","ARR_difference").show()



+-------------------+------------+------+-------+--------------------+--------------------+
|           Industry|Company Name|   ARR|ARR_Num|        Previous_ARR|      ARR_difference|
+-------------------+------------+------+-------+--------------------+--------------------+
|     Cloud Security|    Netskope| $500M|  5.0E8|               1.6E9|               1.1E9|
|      Cloud Storage|         Box|   $1B|  1.0E9|               2.5E9|               1.5E9|
|     Communications| RingCentral| $2.2B|  2.2E9|4.0999999999999995E9|1.8999999999999995E9|
|      Cybersecurity| CrowdStrike| $3.1B|  3.1E9|               7.5E9|               4.4E9|
|     Data Analytics|    Palantir| $2.2B|  2.2E9|               3.7E9|               1.5E9|
|           Database|       Redis| $100M|  1.0E8|               1.7E9|               1.6E9|
|             Design|       Figma| $600M|  6.0E8|               2.0E9|               1.4E9|
|Enterprise Software|         SAP|$32.5B|3.25E10|              2.7E11|          

In [21]:
from pyspark.sql.functions import col,when

df=df.withColumn("Rating",when(col("G2 Rating")>=4.7,"Excellent")\
                          .when((col("G2 Rating")>=4.3)& (col("G2 Rating")<4.7),"Very Good")\
                          .when((col("G2 Rating")>=4.0) & (col("G2 Rating")<4.3),"Good")\
                          .otherwise("Average")
              )

df.select("Company Name","G2 Rating","Rating").orderBy(col("G2 Rating")).show()

+------------+---------+---------+
|Company Name|G2 Rating|   Rating|
+------------+---------+---------+
|      Oracle|      4.0|     Good|
|    Palantir|      4.0|     Good|
|    SendGrid|      4.0|     Good|
|         SAP|      4.1|     Good|
|      Affirm|      4.1|     Good|
|      Square|      4.2|     Good|
|     Workday|      4.2|     Good|
|         Box|      4.2|     Good|
|  Sumo Logic|      4.2|     Good|
| RingCentral|      4.2|     Good|
|         Wix|      4.2|     Good|
|      Tanium|      4.2|     Good|
|ServiceTitan|      4.2|     Good|
|       Vimeo|      4.2|     Good|
|  Salesforce|      4.3|Very Good|
|   Atlassian|      4.3|Very Good|
|      Twilio|      4.3|Very Good|
|   Mailchimp|      4.3|Very Good|
|       Asana|      4.3|Very Good|
|     Zendesk|      4.3|Very Good|
+------------+---------+---------+
only showing top 20 rows



In [23]:
from pyspark.sql import Row

investor_tiers = spark.createDataFrame([
    ("Accel", "Tier 1"),
    ("Sequoia", "Tier 1"),
    ("Andreessen Horowitz", "Tier 1"),
    ("SoftBank", "Tier 2"),
    ("Lightspeed", "Tier 2"),
    ("Unknown", "Tier 3")
], ["Investor", "Tier"])

investor_tiers.show()


+-------------------+------+
|           Investor|  Tier|
+-------------------+------+
|              Accel|Tier 1|
|            Sequoia|Tier 1|
|Andreessen Horowitz|Tier 1|
|           SoftBank|Tier 2|
|         Lightspeed|Tier 2|
|            Unknown|Tier 3|
+-------------------+------+



In [36]:
from pyspark.sql.functions import split,col

df_new=df.withColumn("First_Investor",split(col("Top Investors"),",").getItem(0))

df_newjoin=df_new.join(investor_tiers,df_new["First_Investor"]==investor_tiers["Investor"],how="inner")\
              .filter(col("Tier").isin("Tier 1","Tier 2"))

df_newjoin.select("Company Name","First_Investor","Tier","Valuation").show()


+------------------+-------------------+------+--------------------+
|      Company Name|     First_Investor|  Tier|           Valuation|
+------------------+-------------------+------+--------------------+
|           Algolia|              Accel|Tier 1|               $2.3B|
|        Sumo Logic|              Accel|Tier 1|               $2.3B|
|           Segment|              Accel|Tier 1|      $3.2B (Twilio)|
|        Freshworks|              Accel|Tier 1|               $5.2B|
|             Slack|              Accel|Tier 1| $27.7B (Salesforce)|
|          Netskope|            Sequoia|Tier 1|               $7.5B|
|           Verkada|            Sequoia|Tier 1|               $3.2B|
|              Gong|            Sequoia|Tier 1|               $7.3B|
|         Qualtrics|            Sequoia|Tier 1|$12.5B (Silver Lake)|
|       RingCentral|            Sequoia|Tier 1|                 $5B|
|Palo Alto Networks|            Sequoia|Tier 1|                $95B|
|         Amplitude|            Se

In [25]:
industry_medians = spark.createDataFrame([
    ("Enterprise Software", 150_000_000_000),
    ("CRM", 100_000_000_000),
    ("AI", 70_000_000_000),
    ("HRTech", 50_000_000_000),
], ["Industry", "Median_Valuation"])

industry_medians.show()


+-------------------+----------------+
|           Industry|Median_Valuation|
+-------------------+----------------+
|Enterprise Software|    150000000000|
|                CRM|    100000000000|
|                 AI|     70000000000|
|             HRTech|     50000000000|
+-------------------+----------------+



In [28]:
from pyspark.sql.functions import when

df_joined=df.join(industry_medians, on="Industry", how="left")

df_median=df_joined.withColumn("Valuation_position",
                               when (col("Valuation_Num") > col("Median_valuation"),"Above Median")\
                               .otherwise("Below Median")
                               )

df_median.select("Industry","Company Name","Valuation","Valuation_Num","Median_Valuation","Valuation_position").show()

+--------------------+------------+-------------------+-------------+----------------+------------------+
|            Industry|Company Name|          Valuation|Valuation_Num|Median_Valuation|Valuation_position|
+--------------------+------------+-------------------+-------------+----------------+------------------+
|Video Communications|        Zoom|               $85B|       8.5E10|            NULL|      Below Median|
|            Payments|      Stripe|               $65B|       6.5E10|            NULL|      Below Median|
|Collaboration Sof...|   Atlassian|               $55B|       5.5E10|            NULL|      Below Median|
|Database & Enterp...|      Oracle|              $350B|       3.5E11|            NULL|      Below Median|
|        HR & Finance|     Workday|               $65B|       6.5E10|            NULL|      Below Median|
|  Team Communication|       Slack|$27.7B (Salesforce)|      2.77E10|            NULL|      Below Median|
|   Creative Software|       Adobe|           