<a href="https://colab.research.google.com/github/Ajith-rajput/Stock-Analysis-project/blob/main/Stockanalysis_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=504f31ebdd5fefa3ba85481b4c1b96290f05226728f6b61ee36b3962225034fa
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

def initialize_spark():
    return SparkSession.builder.appName("StockAnalysisProgram").getOrCreate()

def load_csv(spark, file_path):
    return spark.read.option("header", "true").csv(file_path)

def add_symbol_column(df, file_name):
    return df.withColumn("Symbol", F.lit(os.path.splitext(file_name)[0]))

def generate_summary_report(df):
    return df.groupBy("Sector").agg(
        F.avg("open").alias("Avg Open Price"),
        F.avg("close").alias("Avg Close Price"),
        F.max("high").alias("Max High Price"),
        F.min("low").alias("Min Low Price"),
        F.avg("volume").alias("Avg Volume")
    )

# Define the folder path
folder_path = "/content/Data_source" # Replace with path

# Initialize SparkSession
spark = initialize_spark()

# Get list of file paths using list comprehension
file_paths = [os.path.join(folder_path, file_name) for file_name in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, file_name))]


In [None]:
# Load CSV files into DataFrames and add a column
main_df = None
dfs = []

for file_path in file_paths:
    file_name = os.path.basename(file_path)

    # Set "symbol_metadata.csv" as default main_df assuming it has the details of all company
    if file_name == "symbol_metadata.csv":
        main_df = load_csv(spark, file_path)
    else:
        df = load_csv(spark, file_path)
        df = add_symbol_column(df, file_name)
        dfs.append(df)

for df in dfs:
    common_column = "Symbol"  # common column if needed can be changed
    joined_df = main_df.join(df, common_column, "inner")
    dfs[dfs.index(df)] = joined_df

# Initialize merged_df with the first DataFrame
merged_df = dfs[0]

# Union all DataFrames in dfs_to_merge
for df in dfs[1:]:
     merged_df = merged_df.unionAll(df)

In [None]:
# Generate summary reports ALL TIME
summary_report = generate_summary_report(merged_df)

# Show the summary report
summary_report.show()


+--------------------+------------------+------------------+--------------+-------------+-------------------+
|              Sector|    Avg Open Price|   Avg Close Price|Max High Price|Min Low Price|         Avg Volume|
+--------------------+------------------+------------------+--------------+-------------+-------------------+
|ENERGY & TRANSPOR...| 17.56280019904608|17.556030487850713|       99.9900|       0.1980|  4669611.431967552|
|             FINANCE|19.604653790169127|19.610372925764274|        9.9900|      10.0000| 10065.089239726794|
|       LIFE SCIENCES| 45.30218615437486|45.309679145671055|       99.9300|      10.2900| 1772380.0983737975|
|       MANUFACTURING|32.826966348273345| 32.84054371785455|       92.3200|      10.0850|  5396276.952240999|
|REAL ESTATE & CON...|10.176505082417581|10.168689285714288|        9.9900|      10.0000|  212976.1813186813|
|          TECHNOLOGY| 72.61458867796587| 72.61408919774018|       99.9900|       0.0900|1.149873897677966E7|
|    TRADE

In [None]:
# Accept user input for start date, end date, and sectors
start_date = input("Enter start date (yyyy-mm-dd): ")
end_date = input("Enter end date (yyyy-mm-dd): ")
sectors = [sector.strip().upper() for sector in input("Enter sectors (comma-separated, e.g., Sector1,Sector2): ").split(",")]

# Filter the DataFrame based on the user-provided date range and sectors
filtered_df = merged_df.filter((merged_df["timestamp"] >= start_date) &
                               (merged_df["timestamp"] <= end_date) &
                               (merged_df["Sector"].isin(sectors)))

# Generate summary reports for the selected sectors
summary_report_for_given_time = generate_summary_report(filtered_df)

# Show the summary report
summary_report_for_given_time.show()



Enter start date (yyyy-mm-dd): 2022-01-01
Enter end date (yyyy-mm-dd): 2022-02-20
Enter sectors (comma-separated, e.g., Sector1,Sector2): technology
+----------+-----------------+-----------------+--------------+-------------+-------------------+
|    Sector|   Avg Open Price|  Avg Close Price|Max High Price|Min Low Price|         Avg Volume|
+----------+-----------------+-----------------+--------------+-------------+-------------------+
|TECHNOLOGY|81.04880147058823|80.94974264705881|       96.8200|     154.7000|2.531880049264706E7|
+----------+-----------------+-----------------+--------------+-------------+-------------------+



In [None]:
# Accept user input for start date, end date, and sector
start_date = input("Enter start date (yyyy-mm-dd): ")
end_date = input("Enter end date (yyyy-mm-dd): ")
sector = input("Enter sector: ").strip().upper()  # Removing trailing spaces

# Filter the DataFrame based on the user-provided sector
filtered_df_symbol = merged_df.filter((merged_df["Sector"] == sector))

# Generate summary reports for all symbols in the selected sector
summary_report_symbol = filtered_df_symbol.groupBy("Symbol", "Name").agg(
    F.avg("open").alias("Avg Open Price"),
    F.avg("close").alias("Avg Close Price"),
    F.max("high").alias("Max High Price"),
    F.min("low").alias("Min Low Price"),
    F.avg("volume").alias("Avg Volume")
)

# Show the summary report for symbols in the selected sector
summary_report_symbol.show()


Enter start date (yyyy-mm-dd): 2022-01-01
Enter end date (yyyy-mm-dd): 2022-02-20
Enter sector: technology
+------+--------------------+------------------+------------------+--------------+-------------+--------------------+
|Symbol|                Name|    Avg Open Price|   Avg Close Price|Max High Price|Min Low Price|          Avg Volume|
+------+--------------------+------------------+------------------+--------------+-------------+--------------------+
|  AAPL|           Apple Inc| 174.2094461716107|174.19959467045692|       99.9900|     100.0000|2.9121742522472907E7|
|  DELL|Dell Technologies...| 65.07967503075035| 65.10261574415749|       99.9000|     100.1100|   2790933.971709717|
|  NTAP|          NetApp Inc|40.921400248711926| 40.92299943151524|       99.0000|      10.0000|  5492976.7871735655|
|  QMCO| Quantum Corporation|3.8011919168591386|3.8045603126665557|        9.9900|       0.0900|  1139171.3583229703|
+------+--------------------+------------------+------------------+