# Part 1: PySpark

### Assignment Overview

This notebook implements a **PySpark-based data analysis workflow** in 3 steps:

1. **Step 1**: Load and explore the dataset using PySpark DataFrames
2. **Step 2**: Perform SQL-style queries and aggregations using PySpark SQL
3. **Step 3**: Analyze results and extract insights

### Tools & Frameworks

* **PySpark**: Distributed data processing
* **Spark SQL**: SQL-style querying
* **Python**: Data analysis and scripting

In [15]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

print("Libraries imported successfully!")
print(f"PySpark version: {pyspark.__version__}")

spark = SparkSession.builder.appName("FINC612").getOrCreate()

data_path = "sp500_constituents.json"
sp500_df = spark.read.json(data_path)
sp500_df.createOrReplaceTempView("sp500")
print("Dataset Loaded Successfully.")

Libraries imported successfully!
PySpark version: 3.5.1
Dataset Loaded Successfully.


## Step 1: Company Count by Sector in California

### Objective  
Load S&P 500 constituents and count the number of companies by sector located in California.  

### Implementation  
1. Initialize SparkSession  
2. Load `sp500_constituents.json` into PySpark DataFrame  
3. Register DataFrame as SQL view  
4. Run SQL query to filter companies in California (`state = 'CA'`)  
5. Group by `sector` and count companies  
6. Display sector-wise counts in descending order  
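
The query logic in steps 4 to 6 can be checked without starting Spark. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, with a handful of hypothetical rows (the symbols, sectors, and states are made up for illustration and are not taken from `sp500_constituents.json`). It also adds `sector` as a secondary sort key, an assumption that is not part of the original query, so that sectors with equal counts come back in a stable order.

```python
import sqlite3

# Hypothetical sample rows: (symbol, sector, state). Not real dataset values.
rows = [
    ("AAA", "Technology", "CA"),
    ("BBB", "Technology", "CA"),
    ("CCC", "Healthcare", "CA"),
    ("DDD", "Energy", "CA"),
    ("EEE", "Technology", "NY"),
]

# In-memory table named like the Spark temp view.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sp500 (symbol TEXT, sector TEXT, state TEXT)")
conn.executemany("INSERT INTO sp500 VALUES (?, ?, ?)", rows)

# Same filter, grouping, and ordering as the notebook query,
# plus a tie-breaker on sector (an addition for this sketch).
query = """
SELECT sector, COUNT(*) AS company_count
FROM sp500
WHERE state = 'CA'
GROUP BY sector
ORDER BY company_count DESC, sector ASC
"""
sector_counts = conn.execute(query).fetchall()
conn.close()

for sector, company_count in sector_counts:
    print(f"{sector}: {company_count}")
```

The New York row is filtered out before grouping, so only California companies contribute to the counts.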


In [1]:
def company_count_by_sector_in_california(file_path="sp500_constituents.json"):
    """
    Task 1: Company Count by Sector in California
    ------------------------------------------------
    Load the S&P 500 constituents JSON file as a PySpark DataFrame and
    display the number of companies by sector located in California.

    Args:
        file_path (str): Path to the JSON file containing S&P500 data.

    Returns:
        DataFrame: PySpark DataFrame showing sector-wise company counts in California.
    """
    print("=" * 60)
    print("Task 1: Company Count by Sector in California")
    print("=" * 60)

    try:
        # Initialize Spark session
        spark = SparkSession.builder.appName("SP500 Analysis").getOrCreate()
        print("SparkSession started successfully.")

        # Load dataset
        print(f"Loading dataset from {file_path} ...")
        df = spark.read.json(file_path)
        df.createOrReplaceTempView("sp500")
        print("Dataset loaded and registered as SQL view 'sp500'.")

        # Perform SQL query
        print("Running SQL query to count companies by sector in California...")
        query = """
        SELECT sector, COUNT(*) AS company_count
        FROM sp500
        WHERE state = 'CA'
        GROUP BY sector
        ORDER BY company_count DESC
        """
        result_df = spark.sql(query)

        # Show results
        print("\nCompany count by sector in California:")
        result_df.show(truncate=False)

        return result_df

    except AnalysisException as e:
        print(f"SQL/Schema error: {e}")
        return None

    except Exception as e:
        print(f"Error during analysis: {e}")
        return None


In [2]:
# Execute Task 1
result_df = company_count_by_sector_in_california("sp500_constituents.json")

if result_df is not None:
    print("\nTask 1 completed! Successfully retrieved company counts by sector in California.")
    print("=" * 60)
else:
    print("Error: Unable to perform Task 1. Program terminated.")
    raise SystemExit


Task 1: Company Count by Sector in California
SparkSession started successfully.
Loading dataset from sp500_constituents.json ...
Dataset loaded and registered as SQL view 'sp500'.
Running SQL query to count companies by sector in California...

Company count by sector in California:
+----------------------+-------------+
|sector                |company_count|
+----------------------+-------------+
|Technology            |31           |
|Healthcare            |9            |
|Communication Services|7            |
|Real Estate           |6            |
|Consumer Cyclical     |5            |
|Financial Services    |4            |
|Utilities             |3            |
|Consumer Defensive    |2            |
|Energy                |1            |
+----------------------+-------------+


Task 1 completed! Successfully retrieved company counts by sector in California.


## Step 2.1: Financial Ratios of Healthcare Companies in California

### Objective  
Extract financial ratios of Healthcare companies in California from the S&P 500 dataset.  

### Implementation  
1. Load S&P 500 JSON into PySpark DataFrame  
2. Register as SQL view (`sp500`)  
3. Run SQL query filtering Healthcare sector in CA  
4. Select symbol, current ratio, quick ratio, profit margin, ROE, beta, P/B  
5. Display results  


In [18]:
def financial_ratios_healthcare_california(file_path="sp500_constituents.json"):
    """
    Task 2: Financial Ratios of Healthcare Companies in California
    ----------------------------------------------------------------
    Load the S&P 500 constituents JSON file as a PySpark DataFrame and
    display symbol, current ratio, quick ratio, profit margin, return on equity,
    beta, and price-to-book ratio of Healthcare companies in California.

    Args:
        file_path (str): Path to the JSON file containing S&P500 data.

    Returns:
        DataFrame: PySpark DataFrame with selected financial ratios.
    """
    print("=" * 60)
    print("Task 2: Financial Ratios of Healthcare Companies in California")
    print("=" * 60)

    try:
        # Initialize Spark session
        spark = SparkSession.builder.appName("SP500 Analysis").getOrCreate()
        print("SparkSession started successfully.")

        # Load dataset
        print(f"Loading dataset from {file_path} ...")
        df = spark.read.json(file_path)
        df.createOrReplaceTempView("sp500")
        print("Dataset loaded and registered as SQL view 'sp500'.")

        # Perform SQL query
        print("Running SQL query to extract Healthcare companies' financial ratios in California...")
        query = """
        SELECT
            symbol,
            currentRatio       AS current_ratio,
            quickRatio         AS quick_ratio,
            profitMargins      AS profit_margin,
            returnOnEquity     AS roe,
            beta,
            priceToBook        AS price_to_book
        FROM sp500
        WHERE sector = 'Healthcare'
          AND state IN ('CA', 'California')
        ORDER BY symbol
        """
        result_df = spark.sql(query)

        # Show results
        print("\nHealthcare Companies in California - Financial Ratios:")
        result_df.show(truncate=False)

        return result_df

    except Exception as e:
        print(f"Error during analysis: {e}")
        return None


In [19]:
# Execute Task 2
healthcare_df = financial_ratios_healthcare_california("sp500_constituents.json")

if healthcare_df is not None:
    print("\nTask 2 completed! Successfully retrieved financial ratios of Healthcare companies in California.")
    print("=" * 60)
else:
    print("Error: Unable to perform Task 2. Program terminated.")
    raise SystemExit


Task 2: Financial Ratios of Healthcare Companies in California
SparkSession started successfully.
Loading dataset from sp500_constituents.json ...
Dataset loaded and registered as SQL view 'sp500'.
Running SQL query to extract Healthcare companies' financial ratios in California...

Healthcare Companies in California - Financial Ratios:
+------+-------------+-----------+-------------+-----------+-----+-------------+
|symbol|current_ratio|quick_ratio|profit_margin|roe        |beta |price_to_book|
+------+-------------+-----------+-------------+-----------+-----+-------------+
|A     |2.089        |1.478      |0.198        |0.21954    |1.071|6.54127      |
|AMGN  |1.257        |0.843      |0.122370005  |0.67553    |0.557|28.295267    |
|COO   |1.908        |0.806      |0.100710005  |0.050180003|0.998|2.2238133    |
|DXCM  |1.467        |1.232      |0.14287      |0.27628002 |1.142|16.861761    |
|EW    |NULL         |NULL       |0.76746005   |0.17097001 |1.104|4.4957986    |
|GILD  |1.5  
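
The `NULL` entries for EW in the output above are easy to miss. As a rough sketch, the rows that are fully visible in that output can be scanned for missing ratios in plain Python. `find_missing` is a hypothetical helper, only the first three ratio columns are copied here, and the truncated GILD row is left out.

```python
# Rows copied from the visible part of the output above (None stands for NULL).
visible_rows = [
    {"symbol": "A", "current_ratio": 2.089, "quick_ratio": 1.478, "profit_margin": 0.198},
    {"symbol": "AMGN", "current_ratio": 1.257, "quick_ratio": 0.843, "profit_margin": 0.122370005},
    {"symbol": "COO", "current_ratio": 1.908, "quick_ratio": 0.806, "profit_margin": 0.100710005},
    {"symbol": "DXCM", "current_ratio": 1.467, "quick_ratio": 1.232, "profit_margin": 0.14287},
    {"symbol": "EW", "current_ratio": None, "quick_ratio": None, "profit_margin": 0.76746005},
]


def find_missing(rows, columns):
    """Map each column to the symbols whose value is missing (None)."""
    return {
        col: [row["symbol"] for row in rows if row[col] is None]
        for col in columns
    }


missing = find_missing(visible_rows, ["current_ratio", "quick_ratio", "profit_margin"])
print(missing)
```

In Spark itself, a filter such as `WHERE currentRatio IS NULL` on the `sp500` view would serve the same purpose.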

### Task 3: Analyst Recommendations for Healthcare Companies in California
- Initialize Spark session  
- Load `sp500_constituents.json` into PySpark DataFrame  
- Register DataFrame as SQL view `sp500`  
- Run SQL query to filter:
  - Sector = 'Healthcare'
  - State = 'CA' or 'California'
- Select required columns:
  - symbol, analyst_opinions, recommendationKey, recommendationMean  
  - current_price, target_low/mean/median/high_price  
  - Calculate `target_median_profit = (targetMedianPrice / currentPrice) - 1`  
- Order results by `recommendationMean ASC` and `analyst_opinions DESC`  
- Display results using `.show()`  
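
As a worked example of the `target_median_profit` formula, the sketch below applies it to the RMD values that appear in this task's output further down (current price 229.65, target median price 277.5). The function is a hypothetical plain-Python stand-in for the SQL expression. Its guard is meant to mirror the `CASE` clause in the query, and it additionally returns `None` when the target price is missing.

```python
def target_median_profit(target_median_price, current_price):
    """Implied return if the price moved to the analysts' median target.

    Returns None when either price is missing or the current price is zero.
    """
    if target_median_price is None or current_price is None or current_price == 0:
        return None
    return target_median_price / current_price - 1


rmd_profit = target_median_profit(277.5, 229.65)
print(f"RMD target median profit: {rmd_profit:.1%}")  # RMD target median profit: 20.8%
print(target_median_profit(100.0, 0))                 # None (zero current price)
```

A value of about 0.208 means the median analyst target sits roughly 20.8% above the current price.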


In [11]:
def analyst_recommendations_healthcare_california(file_path="sp500_constituents.json"):
    """
    Task 3: Analyst Recommendations for Healthcare Companies in California
    ----------------------------------------------------------------------
    Show analyst opinions, recommendations, and price targets for all Healthcare
    companies in California. Also calculate target median profit
    (target_median_price / current_price - 1).

    Args:
        file_path (str): Path to the JSON file containing S&P500 data.

    Returns:
        DataFrame: PySpark DataFrame with analyst recommendations and target prices.
    """
    print("=" * 60)
    print("Task 3: Analyst Recommendations for Healthcare Companies in California")
    print("=" * 60)

    try:
        # Initialize Spark session
        spark = SparkSession.builder.appName("SP500 Analysis").getOrCreate()
        print("SparkSession started successfully.")

        # Load dataset
        print(f"Loading dataset from {file_path} ...")
        df = spark.read.json(file_path)
        df.createOrReplaceTempView("sp500")
        print("Dataset loaded and registered as SQL view 'sp500'.")

        # Perform SQL query
        print("Running SQL query to extract analyst recommendations for Healthcare companies in California...")
        query = """
        SELECT
          symbol,
          numberOfAnalystOpinions AS analyst_opinions,
          recommendationKey,
          recommendationMean,
          currentPrice                      AS current_price,
          targetLowPrice                    AS target_low_price,
          targetMeanPrice                   AS target_mean_price,
          targetMedianPrice                 AS target_median_price,
          targetHighPrice                   AS target_high_price,
          CASE
            WHEN currentPrice IS NOT NULL AND currentPrice <> 0
              THEN (targetMedianPrice / currentPrice) - 1
            ELSE NULL
          END                               AS target_median_profit
        FROM sp500
        WHERE sector = 'Healthcare'
          AND state IN ('CA', 'California')
        ORDER BY recommendationMean ASC, analyst_opinions DESC
        """
        result_df = spark.sql(query)

        # Show results
        print("\nAnalyst Recommendations — Healthcare Companies in California:")
        result_df.show(truncate=False)

        return result_df

    except Exception as e:
        print(f"Error during analysis: {e}")
        return None


In [12]:
# Execute Task 3
analyst_df = analyst_recommendations_healthcare_california("sp500_constituents.json")

if analyst_df is not None:
    print("\nTask 3 completed! Successfully retrieved analyst recommendations for Healthcare companies in California.")
    print("=" * 60)
else:
    print("Error: Unable to perform Task 3. Program terminated.")
    raise SystemExit


Task 3: Analyst Recommendations for Healthcare Companies in California
SparkSession started successfully.
Loading dataset from sp500_constituents.json ...
Dataset loaded and registered as SQL view 'sp500'.
Running SQL query to extract analyst recommendations for Healthcare companies in California...

Analyst Recommendations — Healthcare Companies in California:
+------+----------------+-----------------+------------------+-------------+----------------+-----------------+-------------------+-----------------+--------------------+
|symbol|analyst_opinions|recommendationKey|recommendationMean|current_price|target_low_price|target_mean_price|target_median_price|target_high_price|target_median_profit|
+------+----------------+-----------------+------------------+-------------+----------------+-----------------+-------------------+-----------------+--------------------+
|RMD   |14              |none             |NULL              |229.65       |195.0           |269.54144        |277.5       
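
RMD appears first in the output above even though its `recommendationMean` is `NULL`: Spark SQL places `NULL` values first in ascending order unless told otherwise. Writing the clause as `ORDER BY recommendationMean ASC NULLS LAST, analyst_opinions DESC` would move such rows to the end. The plain-Python sketch below imitates that ordering on hypothetical rows; the symbols and numbers are made up for illustration.

```python
# Hypothetical rows: (symbol, recommendation_mean, analyst_opinions).
rows = [
    ("XXX", None, 14),
    ("YYY", 2.1, 20),
    ("ZZZ", 1.5, 25),
    ("WWW", 1.5, 30),
]


def nulls_last_key(row):
    _symbol, rec_mean, opinions = row
    # Tuples compare element by element:
    #   1. rows with a missing mean sort after all others,
    #   2. then ascending recommendation mean,
    #   3. then descending analyst count (hence the negation).
    return (rec_mean is None, rec_mean if rec_mean is not None else 0.0, -opinions)


ordered = sorted(rows, key=nulls_last_key)
print([symbol for symbol, _, _ in ordered])
```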

## Summary and Conclusions

### Key Findings

1. **Financial Ratios**:

   * Strong liquidity (high current & quick ratios) seen in **ISRG, RMD, and A**, indicating good short-term solvency.
   * **Profit margins** are highly dispersed, with **EW** showing an extremely high margin (0.77), while **GILD** and **MOH** remain very low.
   * **Return on Equity (ROE)** varies widely, with **AMGN** demonstrating the highest (0.68) and **COO** among the weakest (0.05).
   * **Valuations** differ significantly — **AMGN** trades at a very high P/B (28.3), while **COO** remains relatively low (2.2).

2. **Analyst Recommendations**:

   * Most California Healthcare stocks carry **“buy”** consensus, with **DXCM** getting a **strong buy**.
   * Price targets suggest meaningful **upside potential** for **COO (+27.7%)**, **RMD (+20.8%)**, and **MOH (+16.9%)**.
   * **GILD** is the only stock with a **negative target median profit (-2.7%)**, indicating limited optimism.

### Investment Implications

* **Attractive Opportunities**: ISRG, COO, RMD, and MOH stand out as favorable opportunities given solid ratios and positive analyst outlook.
* **Caution Required**: GILD presents valuation and profitability concerns despite strong analyst coverage.
* **Risk-Reward Balance**: AMGN offers stability (high ROE) but limited upside at current valuations.

### Technical Insights

* **Data Integration**: Reading financial ratios alongside analyst recommendations provides a more complete company-level view.
* **Profitability vs. Valuation**: Firms with strong profitability are not always attractively valued, requiring a balanced analysis.
* **Analyst Signals**: Target median profit metric helps quantify market sentiment and expected performance.

### Future Considerations

* **Comparative Benchmarking**: Extend analysis to other sectors/states for broader context.
* **Time-Series Monitoring**: Track quarterly changes in ratios and recommendation trends.
* **Risk Assessment**: Consider beta exposure for portfolio construction, especially ISRG and DXCM with higher volatility.
* **Deep Dive on Outliers**: Investigate why **EW** has extremely high profit margins and why **GILD** faces persistent analyst skepticism.
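
For the risk-assessment point, one common way to summarize beta exposure is the weighted average of the holdings' betas. The sketch below uses the beta values visible in the Step 2.1 output; the equal weights are purely hypothetical and `portfolio_beta` is an illustrative helper, not part of the notebook.

```python
# Betas copied from the visible rows of the Step 2.1 output.
betas = {"A": 1.071, "AMGN": 0.557, "COO": 0.998, "DXCM": 1.142, "EW": 1.104}

# Hypothetical equal-weight portfolio (weights must sum to 1).
weights = {symbol: 1 / len(betas) for symbol in betas}


def portfolio_beta(weights, betas):
    """Weighted average beta of the holdings."""
    if abs(sum(weights.values()) - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    return sum(weights[symbol] * betas[symbol] for symbol in weights)


beta_p = portfolio_beta(weights, betas)
print(f"Portfolio beta: {beta_p:.4f}")
```

A result below 1 would suggest this hypothetical basket moves slightly less than the market on average, even though DXCM and EW individually sit above 1.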