In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     -------------------------------------- 316.9/316.9 MB 1.2 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     -------------------------------------- 200.5/200.5 kB 6.1 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): still running...
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425388 sha256=e12766d11060c9a5370b552697aa4755956d28fee317636a1c71b085f260f115
  Stored in directory: c:\users\abc\appdata\local\pip\cache\wheels\72\3c\32\f0f20da5a933f8c6c5a1a2184a9e516652ed3eebeb49f5ddd0
Successfully built pyspark
Installing collected packages: py4j, pyspark
Succes

In [95]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator

In [96]:
from pyspark.sql import SparkSession

try:
    # If a SparkContext already exists, use it
    spark = SparkSession.builder.getOrCreate()
except:
    # If not, create a new SparkSession
    spark = SparkSession.builder.appName("StockAnalysis").getOrCreate()


In [97]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Stop existing SparkContext if it exists
if 'sc' in locals() or 'sc' in globals():
    SparkContext.stop()

# Create a new SparkSession
spark = SparkSession.builder.appName("StockAnalysis").getOrCreate()

# Load the dataset
file_path = "C:/dataset/HistoricalQuotes.csv"
stock = spark.read.csv(file_path, header=True, inferSchema=True)


In [98]:
stock.printSchema()

root
 |-- Date: string (nullable = true)
 |--  Close/Last: string (nullable = true)
 |--  Volume: double (nullable = true)
 |--  Open: string (nullable = true)
 |--  High: string (nullable = true)
 |--  Low: string (nullable = true)



In [99]:
stock.show(5)

+----------+-----------+----------+--------+--------+--------+
|      Date| Close/Last|    Volume|    Open|    High|     Low|
+----------+-----------+----------+--------+--------+--------+
|02/28/2020|    $273.36|1.067212E8| $257.26| $278.41| $256.37|
|02/27/2020|    $273.52|8.015138E7|  $281.1|    $286| $272.96|
|02/26/2020|    $292.65|4.967843E7| $286.53| $297.88|  $286.5|
|02/25/2020|    $288.08|5.766836E7| $300.95| $302.53| $286.13|
|02/24/2020|    $298.18|5.554883E7| $297.26| $304.18| $289.23|
+----------+-----------+----------+--------+--------+--------+
only showing top 5 rows



In [100]:
# Check for null values in the DataFrame
null_check_columns = stock.columns  

for column in null_check_columns:
    # Enclose the column name with backticks
    null_count = stock.filter(col(f"`{column}`").isNull()).count()
    print(f"Null count in column '{column}': {null_count}")

# Alternatively, you can use isNotNull() to check for non-null values
non_null_check_columns = stock.columns

for column in non_null_check_columns:
    # Enclose the column name with backticks
    non_null_count = stock.filter(col(f"`{column}`").isNotNull()).count()
    print(f"Non-null count in column '{column}': {non_null_count}")

# Stop the Spark session
spark.stop()

Null count in column 'Date': 0
Null count in column ' Close/Last': 0
Null count in column ' Volume': 0
Null count in column ' Open': 0
Null count in column ' High': 0
Null count in column ' Low': 0
Non-null count in column 'Date': 2518
Non-null count in column ' Close/Last': 2518
Non-null count in column ' Volume': 2518
Non-null count in column ' Open': 2518
Non-null count in column ' High': 2518
Non-null count in column ' Low': 2518


In [101]:



# Assuming 'Date' is a string, convert it to a numerical format
indexer = StringIndexer(inputCol="Date", outputCol="DateIndex")
stock = indexer.fit(stock).transform(stock)

# Define the feature columns manually (excluding the target 'Close' column)
feature_cols = ["DateIndex", "Open", "High", "Low", "Volume"]

# Create a new DataFrame with the selected columns
selected_data = stock.select(["Close"] + feature_cols)

# Define the linear regression model
lr = LinearRegression(featuresCol='features', labelCol='Close')

# Create a feature vector manually without using VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
pipeline = Pipeline(stages=[assembler, lr])

# Split the data into training and testing sets
train_data, test_data = selected_data.randomSplit([0.8, 0.2], seed=123)

# Fit the model
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Show the predictions
predictions.select("Close", "prediction", *feature_cols).show()


AssertionError: 