In [26]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("../data/S&P500_data_analysis").getOrCreate()

25/07/30 03:07:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [27]:
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("delimiter", ",") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("multiLine", True) \
    .option("samplingRatio", "1.0") \
    .csv("../data/sp500_companies.csv")


In [28]:
print(df.columns)
print(len(df.columns))


['Exchange', 'Symbol', 'Shortname', 'Longname', 'Sector', 'Industry', 'Currentprice', 'Marketcap', 'Ebitda', 'Revenuegrowth', 'City', 'State', 'Country', 'Fulltimeemployees', 'Longbusinesssummary', 'Weight']
16


In [29]:
df.show(5)

+--------+------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+-------------+-------------+-----+-------------+-----------------+--------------------+--------------------+
|Exchange|Symbol|           Shortname|            Longname|              Sector|            Industry|Currentprice|    Marketcap|      Ebitda|Revenuegrowth|         City|State|      Country|Fulltimeemployees| Longbusinesssummary|              Weight|
+--------+------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+-------------+-------------+-----+-------------+-----------------+--------------------+--------------------+
|     NMS|  AAPL|          Apple Inc.|          Apple Inc.|          Technology|Consumer Electronics|      254.49|3846819807232|134660997120|        0.061|    Cupertino|   CA|United States|           164000|Apple Inc. design...| 0.06920915243972749|


In [30]:
num_rows = df.count()

In [31]:
print(num_rows)

502


In [32]:
df = df.drop("Shortname", "Longbusinesssummary")

In [33]:
df.show(1)


+--------+------+----------+----------+--------------------+------------+-------------+------------+-------------+---------+-----+-------------+-----------------+-------------------+
|Exchange|Symbol|  Longname|    Sector|            Industry|Currentprice|    Marketcap|      Ebitda|Revenuegrowth|     City|State|      Country|Fulltimeemployees|             Weight|
+--------+------+----------+----------+--------------------+------------+-------------+------------+-------------+---------+-----+-------------+-----------------+-------------------+
|     NMS|  AAPL|Apple Inc.|Technology|Consumer Electronics|      254.49|3846819807232|134660997120|        0.061|Cupertino|   CA|United States|           164000|0.06920915243972749|
+--------+------+----------+----------+--------------------+------------+-------------+------------+-------------+---------+-----+-------------+-----------------+-------------------+
only showing top 1 row


In [34]:
print(df.columns)

['Exchange', 'Symbol', 'Longname', 'Sector', 'Industry', 'Currentprice', 'Marketcap', 'Ebitda', 'Revenuegrowth', 'City', 'State', 'Country', 'Fulltimeemployees', 'Weight']


In [35]:
from pyspark.sql.functions import (
    min, max, mean, avg, sum, count, stddev, col
)

df.select(
    min("Currentprice").alias("Min_Currentprice"),
    max("Currentprice").alias("Max_Currentprice"),
    min("Marketcap").alias("Min_Marketcap"),
    max("Marketcap").alias("Max_Marketcap"),
    min("Ebitda").alias("Min_Ebitda"),
    max("Ebitda").alias("Max_Ebitda"),
).show()



+----------------+----------------+-------------+-------------+-----------+------------+
|Min_Currentprice|Max_Currentprice|Min_Marketcap|Max_Marketcap| Min_Ebitda|  Max_Ebitda|
+----------------+----------------+-------------+-------------+-----------+------------+
|             9.4|         8276.78|   4664099328|3846819807232|-3991000064|149547008000|
+----------------+----------------+-------------+-------------+-----------+------------+



In [36]:
df.select(min("Revenuegrowth").alias("Min_Revenuegrowth"),
    max("Revenuegrowth").alias("Max_Revenuegrowth"),
    min("Fulltimeemployees").alias("Min_FTemployees"),
    max("Fulltimeemployees").alias("Max_FTemployees"),
    min("Weight").alias("Min_Weight"),
    max("Weight").alias("Max_Weight")).show()

+-----------------+-----------------+---------------+---------------+-------------------+-------------------+
|Min_Revenuegrowth|Max_Revenuegrowth|Min_FTemployees|Max_FTemployees|         Min_Weight|         Max_Weight|
+-----------------+-----------------+---------------+---------------+-------------------+-------------------+
|           -0.602|            1.632|             28|        2100000|8.39130444266517E-5|0.06920915243972749|
+-----------------+-----------------+---------------+---------------+-------------------+-------------------+



In [37]:
df.printSchema()


from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler


df_clean = df.select("Symbol", "Industry", "Marketcap", "Ebitda", "Revenuegrowth", "Currentprice") \
             .dropna()

assembler = VectorAssembler(inputCols=["Marketcap", "Ebitda", "Revenuegrowth", "Currentprice"],
                            outputCol="assembled_features")
assembled_df = assembler.transform(df_clean)

scaler = StandardScaler(inputCol="assembled_features", outputCol="features",
                        withStd=True, withMean=True)
scaled_df = scaler.fit(assembled_df).transform(assembled_df)

kmeans = KMeans(k=5, seed=42, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(scaled_df)
clustered_df = model.transform(scaled_df)

clustered_df.select("Symbol", "Industry", "cluster").show()


clustered_df.select("Symbol", "Industry", "cluster").orderBy("cluster").show(50)

centers = model.clusterCenters()
for center in centers:
    print(center)



root
 |-- Exchange: string (nullable = true)
 |-- Symbol: string (nullable = true)
 |-- Longname: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Currentprice: double (nullable = true)
 |-- Marketcap: long (nullable = true)
 |-- Ebitda: long (nullable = true)
 |-- Revenuegrowth: double (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Fulltimeemployees: integer (nullable = true)
 |-- Weight: double (nullable = true)



25/07/30 03:08:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


+------+--------------------+-------+
|Symbol|            Industry|cluster|
+------+--------------------+-------+
|  AAPL|Consumer Electronics|      1|
|  NVDA|      Semiconductors|      4|
|  MSFT|Software - Infras...|      1|
|  AMZN|     Internet Retail|      4|
| GOOGL|Internet Content ...|      4|
|  GOOG|Internet Content ...|      4|
|  META|Internet Content ...|      4|
|  TSLA|  Auto Manufacturers|      0|
|  AVGO|      Semiconductors|      0|
| BRK-B|Insurance - Diver...|      4|
|   WMT|     Discount Stores|      0|
|   LLY|Drug Manufacturer...|      0|
|     V|     Credit Services|      0|
|    MA|     Credit Services|      0|
|  ORCL|Software - Infras...|      0|
|   XOM|Oil & Gas Integrated|      0|
|   UNH|    Healthcare Plans|      0|
|  COST|     Discount Stores|      0|
|    PG|Household & Perso...|      0|
|    HD|Home Improvement ...|      0|
+------+--------------------+-------+
only showing top 20 rows
+------+--------------------+-------+
|Symbol|            Indus

In [38]:
clustered_df.groupBy("cluster").agg(
    {"Marketcap": "avg", "Ebitda": "avg", "Revenuegrowth": "avg", "Currentprice": "avg"}
).show()


+-------+-------------------+-------------------+------------------+--------------------+
|cluster|        avg(Ebitda)| avg(Revenuegrowth)| avg(Currentprice)|      avg(Marketcap)|
+-------+-------------------+-------------------+------------------+--------------------+
|      1|    1.3560649728E11|             0.1105|           345.545|   3.546444201984E12|
|      3|4.530989098666667E9|0.07966666666666666|           5526.28|8.238051874133333E10|
|      4|   1.08077166592E11|0.30383333333333334| 297.0733333333333|2.137219803818666...|
|      2|4.674987370666667E9| 1.1681666666666666| 79.27833333333332|4.245643127466666...|
|      0|5.107223338684327E9|0.05213907284768215|188.41830022075044| 7.00590632406181E10|
+-------+-------------------+-------------------+------------------+--------------------+



In [39]:
from pyspark.sql import functions as F

# Add a new column 'risk_level' based on the cluster number
clustered_df_with_risk = clustered_df.withColumn(
    "risk_level",
    F.when(clustered_df["cluster"] == 1, "Low to Medium")  # Cluster 1 (Large, established, high profitability)
    .when(clustered_df["cluster"] == 3, "High")  # Cluster 3 (High stock prices, lower profitability)
    .when(clustered_df["cluster"] == 4, "Low to Medium")  # Cluster 4 (Profitable, fast-growing, large market cap)
    .when(clustered_df["cluster"] == 2, "Medium to High")  # Cluster 2 (High-growth, low-profit)
    .otherwise("Medium")  # Default risk level for cluster 0 (Low-growth, low-profit)
)

# Show the DataFrame with risk levels
clustered_df_with_risk.select("Symbol", "Industry", "cluster", "risk_level").orderBy("risk_level").show(30)


+------+--------------------+-------+-------------+
|Symbol|            Industry|cluster|   risk_level|
+------+--------------------+-------+-------------+
|  BKNG|     Travel Services|      3|         High|
|   AZO|    Specialty Retail|      3|         High|
|   NVR|Residential Const...|      3|         High|
|  AAPL|Consumer Electronics|      1|Low to Medium|
|  NVDA|      Semiconductors|      4|Low to Medium|
|  MSFT|Software - Infras...|      1|Low to Medium|
|  AMZN|     Internet Retail|      4|Low to Medium|
| GOOGL|Internet Content ...|      4|Low to Medium|
|  GOOG|Internet Content ...|      4|Low to Medium|
|  META|Internet Content ...|      4|Low to Medium|
| BRK-B|Insurance - Diver...|      4|Low to Medium|
|  TSLA|  Auto Manufacturers|      0|       Medium|
|  AVGO|      Semiconductors|      0|       Medium|
|   WMT|     Discount Stores|      0|       Medium|
|   LLY|Drug Manufacturer...|      0|       Medium|
|     V|     Credit Services|      0|       Medium|
|    MA|    

In [40]:

sector_counts = df.groupBy("Sector").count()

sector_counts.show()

+--------------------+-----+
|              Sector|count|
+--------------------+-----+
|  Consumer Defensive|   37|
|  Financial Services|   67|
|Communication Ser...|   22|
|              Energy|   22|
|          Healthcare|   62|
|         Real Estate|   31|
|           Utilities|   32|
|         Industrials|   70|
|          Technology|   82|
|   Consumer Cyclical|   55|
|     Basic Materials|   22|
+--------------------+-----+



In [41]:
sector_performance = df.groupBy("Sector").agg(
    avg("Marketcap").alias("Avg_Marketcap"),
    avg("Revenuegrowth").alias("Avg_Revenuegrowth"),
    avg("Ebitda").alias("Avg_Ebitda")
)
sector_performance.show()

+--------------------+--------------------+--------------------+--------------------+
|              Sector|       Avg_Marketcap|   Avg_Revenuegrowth|          Avg_Ebitda|
+--------------------+--------------------+--------------------+--------------------+
|  Consumer Defensive|8.794158331848648E10|0.010297297297297295| 6.398984998054054E9|
|  Financial Services|1.060919841853134...| 0.12295522388059699| 8.869586042947369E9|
|Communication Ser...|    3.54729383424E11| 0.05247619047619048|2.550577312872727...|
|              Energy|7.266690166690909E10|0.007727272727272729|1.170328989381818...|
|          Healthcare|8.371662572180646E10| 0.07709677419354836| 6.161792758580646E9|
|         Real Estate|3.488674026529032E10| 0.08126666666666667|          2.306212E9|
|           Utilities|      3.929280544E10|          0.04415625|       5.087529688E9|
|         Industrials|5.378002489051428E10| 0.04514285714285713|3.9021598340571427E9|
|          Technology|2.108172365674146...| 0.11582716

In [42]:

spark.stop()
