In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler,CountVectorizer,OneHotEncoder
from pyspark.ml.regression import LinearRegression
import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.preprocessing import OneHotEncoder
from pyspark.sql.functions import col, count, when, isnull
from pyspark.sql import Row
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.functions import split
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor

In [0]:
# Load every column of the cleaned unicorn-companies table into a Spark
# DataFrame and render it with the notebook's rich table display.
unicorn_query = "SELECT * FROM unicorn_companies_clean"
df = spark.sql(unicorn_query)
display(df)


In [0]:
# Show the column names and types of the loaded frame.
df.printSchema()

# Build one aggregate per column that counts its null entries: `when` yields a
# non-null literal (the column name) only for null rows, and `count` ignores
# the nulls produced for every other row.
null_counters = []
for column_name in df.columns:
    null_marker = when(isnull(column_name), column_name)
    null_counters.append(count(null_marker).alias(column_name))

missing_values = df.select(null_counters)
display(missing_values)

In [0]:
# Basic cleaning: drop the Portfolio_Exits column, discard rows that lack a
# founding year or a total-raised figure, and treat a missing investor count
# as zero.
df = (
    df.drop("Portfolio_Exits")
    .na.drop(subset=["Founded_Year", "Total_Raised"])
    .na.fill({"Investors_Count": 0})
)
# Select_Investors is left as a raw string here; it is split into a list of
# names in the modelling cell further down.

# Column groups by how they are encoded for modelling.
num_col = ["Founded_Year", "Total_Raised", "Investors_Count"]
cat_col = ["Country", "City", "Industry", "Financial_Stage", "Deal_Terms"]
multi_col = ["Select_Investors"]

# Full list of raw input columns: numeric first, then categorical, then multi-valued.
features = [*num_col, *cat_col, *multi_col]

In [0]:
# The first cell imports sklearn's OneHotEncoder after Spark's, which shadows
# the Spark class; the Spark ML names are re-imported here so this cell always
# uses the Spark implementations.
from pyspark.ml.feature import StringIndexer, OneHotEncoder, CountVectorizer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

label_col = "Valuation_B"

# --- Clean and prepare data ---
# All three steps are idempotent, so re-running this cell is safe. Rows without
# a label are dropped as well because they can be neither trained on nor scored.
df = (
    df.drop("Portfolio_Exits")
    .dropna(subset=["Founded_Year", "Total_Raised", label_col])
    .fillna({"Investors_Count": 0})
)

# Turn the comma-separated investor string into a list of names. The split is
# applied only while the column is still a string, so running this cell a
# second time does not fail on an already-split array column. Null values are
# replaced with an empty list so CountVectorizer never receives a null document.
investors = F.col("Select_Investors")
if isinstance(df.schema["Select_Investors"].dataType, StringType):
    investors = split(investors, ", ")
df = df.withColumn(
    "Select_Investors",
    F.coalesce(investors, F.array().cast("array<string>")),
)

num_col = ["Founded_Year", "Total_Raised", "Investors_Count"]
cat_col = ["Country", "City", "Industry", "Financial_Stage", "Deal_Terms"]

# --- Encode categorical columns ---
# handleInvalid="keep" routes categories unseen during fitting to an extra
# index instead of raising at transform time.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_indexed", handleInvalid="keep")
    for name in cat_col
]

encoders = [
    OneHotEncoder(inputCols=[f"{name}_indexed"], outputCols=[f"{name}_encoded"])
    for name in cat_col
]

# --- Encode multi-value investor column ---
vectorizer = CountVectorizer(inputCol="Select_Investors", outputCol="Select_Investors_vec")

# --- Combine features ---
assembler_inputs = [f"{name}_encoded" for name in cat_col] + num_col + ["Select_Investors_vec"]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# --- Model ---
# The seed is fixed (matching the split below) so repeated runs build the same forest.
rf = RandomForestRegressor(featuresCol="features", labelCol=label_col, seed=42)

# --- Pipeline ---
pipeline = Pipeline(stages=indexers + encoders + [vectorizer, assembler, rf])

# --- Train/test split ---
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# --- Train ---
model = pipeline.fit(train_df)

# --- Predict ---
predictions = model.transform(test_df)
predictions.select(label_col, "prediction").show(10)



In [0]:

# --- Evaluate ---
# Score the held-out predictions against the Valuation_B label with the three
# regression metrics reported below. One evaluator is built per metric name
# instead of repeating the constructor call three times.
metric_values = {
    metric: RegressionEvaluator(
        labelCol="Valuation_B", predictionCol="prediction", metricName=metric
    ).evaluate(predictions)
    for metric in ("rmse", "mae", "r2")
}
rmse = metric_values["rmse"]
mae = metric_values["mae"]
r2 = metric_values["r2"]

# "\u00b2" is the superscript-two character; writing it as an escape keeps the
# label intact even if the file is re-saved with a different text encoding
# (the previous literal had been corrupted into "RÂ²").
print(f"MAE: {mae:.2f}, R\u00b2: {r2:.2f}")
print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")

In [0]:
# Area under the ROC curve is a binary-classification metric and does not
# apply to this regression pipeline; the previous version of this cell also
# called an `evaluator` object that is never defined in the notebook. Report
# the explained variance of the held-out predictions instead.
explained_variance = RegressionEvaluator(
    labelCol="Valuation_B", predictionCol="prediction", metricName="var"
).evaluate(predictions)
print(f"Explained variance: {explained_variance:.2f}")

In [0]:

# --- What-if scoring for future founding years ---
# The previous version of this cell passed a two-column frame straight to the
# assembler (which needs the encoded pipeline columns) and scored it with an
# undefined `lr_model`. The fitted pipeline `model` needs every raw input
# column, so each held-out company is re-scored as if it had been founded in
# each future year, and the predictions are averaged per year.
# NOTE(review): this is one interpretation of the intended forecast - confirm
# that averaging over the held-out companies is the scenario you want.
from pyspark.sql import functions as F

future_years = spark.createDataFrame([
    Row(Founded_Year=2025),
    Row(Founded_Year=2026),
])

# Replace the real founding year of every held-out row with each future year.
future_data = test_df.drop("Founded_Year").crossJoin(future_years)

# Run the full fitted pipeline (indexing, encoding, vectorising, assembling,
# random forest) on the scenario rows.
future_scored = model.transform(future_data)

# One row per future year, exposing the `founded_year` / `prediction` columns
# that the plotting cell reads.
predictions = (
    future_scored
    .groupBy(F.col("Founded_Year").alias("founded_year"))
    .agg(F.avg("prediction").alias("prediction"))
    .orderBy("founded_year")
)
predictions.select("founded_year", "prediction").show()


In [0]:


# Bring the per-year predictions to pandas and order them by year so the line
# is drawn left to right regardless of the order Spark returns the rows in.
# Expects `predictions` to expose `founded_year` and `prediction` columns.
preds_pd = predictions.toPandas().sort_values("founded_year")

# The model in this notebook is trained with Valuation_B as its label, so the
# plotted quantity is a predicted valuation; the previous title and y-label
# described unicorn counts, which no model in this notebook predicts.
fig, ax = plt.subplots()
ax.plot(preds_pd["founded_year"], preds_pd["prediction"], marker="o")
ax.set_xticks(preds_pd["founded_year"])  # whole years only, no fractional ticks
ax.set_title("Predicted Valuation_B by Founding Year")
ax.set_xlabel("Year")
ax.set_ylabel("Predicted Valuation_B")
plt.show()
