In [None]:
!pip install yfinance


StatementMeta(, , , Waiting, )





In [None]:
# Required Libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
import yfinance as yf

# Define the tickers
ticker_symbols = 'MSFT AAPL 2222.SR GOOGL AMZN NVDA META BRK-A LLY TSM PEP SAP TMO LIN 601857.SS ROG.SW NVS MCD ABT DIS'

# Fetch data for all tickers
tickers = yf.Tickers(ticker_symbols)

# Initialize an empty list to hold data for all tickers
all_data = []

# Iterate over each ticker, fetch the historical data, and append to the list
for ticker in ticker_symbols.split():
    data = tickers.tickers[ticker].history(start="2010-01-01").reset_index()
    data['Ticker'] = ticker  # Add a column for the ticker symbol
    all_data.append(data[['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Ticker']])  # Exclude 'Dividends', 'Stock Splits'


# Concatenate all the dataframes into a single dataframe
combined_data = pd.concat(all_data, ignore_index=True)

# Feature Engineering
combined_data['Prev Close'] = combined_data.groupby('Ticker')['Close'].shift(1)
combined_data['Price Change %'] = ((combined_data['Close'] - combined_data['Prev Close']) / combined_data['Prev Close']) * 100
combined_data['Volume Change %'] = combined_data.groupby('Ticker')['Volume'].pct_change() * 100
combined_data.dropna(inplace=True)

# Replace infinite values with NaN
combined_data.replace([np.inf, -np.inf], np.nan, inplace=True)

# Fill NaN values with the mean of the column
for column in ['Price Change %', 'Volume Change %']:
    combined_data[column].fillna(combined_data[column].mean(), inplace=True)

# Selecting features and target variable
features = ['Open', 'High', 'Low', 'Volume', 'Price Change %', 'Volume Change %']
X = combined_data[features]
y = combined_data['Close']

# Splitting the dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Model Training
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)

# Making predictions for the whole dataset to integrate with historical data
combined_data['Predicted Close'] = rf_model.predict(X)

# Evaluation (optional, since we're predicting on all data, this step is just for insight)
test_predictions = rf_model.predict(X_test)
mae = mean_absolute_error(y_test, test_predictions)
print(f"Mean Absolute Error on Test Set: {mae}")

# The final DataFrame
results_df = combined_data.copy()




StatementMeta(, , , Waiting, )





Mean Absolute Error on Test Set: 46.873355199058565


In [None]:
results_df

StatementMeta(, , , Waiting, )

Unnamed: 0,Date,Open,High,Low,Close,Volume,Ticker,Prev Close,Price Change %,Volume Change %,Predicted Close
1,2010-01-05 00:00:00-05:00,23.355882,23.545151,23.196894,23.439159,49749600,MSFT,23.431591,0.032300,29.525555,23.417340
2,2010-01-06 00:00:00-05:00,23.378587,23.530003,23.106039,23.295309,58182400,MSFT,23.439159,-0.613718,16.950488,23.287348
3,2010-01-07 00:00:00-05:00,23.189324,23.242321,22.856210,23.053051,50559700,MSFT,23.295309,-1.039944,-13.101385,23.045292
4,2010-01-08 00:00:00-05:00,22.924349,23.378595,22.894065,23.212038,51197400,MSFT,23.053051,0.689657,1.261281,23.186197
5,2010-01-11 00:00:00-05:00,23.249884,23.287739,22.803209,22.916771,68754700,MSFT,23.212038,-1.272043,34.293343,22.883548
...,...,...,...,...,...,...,...,...,...,...,...
68042,2024-02-26 00:00:00-05:00,107.519997,108.760002,107.379997,107.680000,8694200,DIS,107.739998,-0.055687,-25.842716,107.724393
68043,2024-02-27 00:00:00-05:00,107.779999,109.580002,107.779999,109.419998,7595000,DIS,107.680000,1.615897,-12.642911,109.285142
68044,2024-02-28 00:00:00-05:00,108.779999,111.040001,108.660004,110.800003,10773000,DIS,109.419998,1.261200,41.843318,110.721372
68045,2024-02-29 00:00:00-05:00,111.139999,112.750000,110.970001,111.580002,11658300,DIS,110.800003,0.703970,8.217767,111.883944


In [None]:
# Convert the combined pandas DataFrame to a Spark DataFrame
results_spark_df = spark.createDataFrame(results_df)

# Show the Spark DataFrame to verify the structure
results_spark_df.show()

StatementMeta(, , , Waiting, )

+-------------------+------------------+------------------+------------------+------------------+---------+------+------------------+--------------------+-------------------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|Ticker|        Prev Close|      Price Change %|    Volume Change %|   Predicted Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------+------------------+--------------------+-------------------+------------------+
|2010-01-05 05:00:00|23.355881705681796|23.545151409222147|23.196894403821016|23.439159393310547| 49749600|  MSFT|23.431591033935547| 0.03229980996185399| 29.525555141880446|23.417340126037598|
|2010-01-06 05:00:00|23.378586731552307|23.530003030445773|23.106039415162048| 23.29530906677246| 58182400|  MSFT|23.439159393310547| -0.6137179415194399| 16.950488044124977| 23.28734800338745|
|2010-01-07 05:00:00|23.189323

In [None]:
from pyspark.sql.functions import col

results_spark_df = results_spark_df.select([col(c).alias(
        c.replace( '(', '')
        .replace( ')', '')
        .replace( ',', '')
        .replace( ';', '')
        .replace( '{', '')
        .replace( '}', '')
        .replace( '\n', '')
        .replace( '\t', '')
        .replace( ' ', '_')
    ) for c in results_spark_df.columns])

StatementMeta(, , , Waiting, )

In [None]:
results_spark_df.write.mode("overwrite").format("delta").save("Tables/" + 'StocksData')

StatementMeta(, , , Waiting, )