In [1]:
import datetime as dt
import pandas as pd
#!pip install yfinance
import yfinance as yf

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import to_date
from pyspark.ml.evaluation import RegressionEvaluator


In [3]:
# SparkSession başlatma
spark = SparkSession.builder.appName("NVDA Price Prediction").getOrCreate()

24/03/28 02:16:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Veriyi yükleme ve DataFrame oluşturma
data = spark.read.csv("file:///home/hduser/Desktop/NVDA/NVDA_histrical_data.csv", header=True, inferSchema=True)

# 'Date' sütununu tarih tipine dönüştürme 
data = data.withColumn("Date", to_date(data["Date"]))

In [5]:
data.show(5)

+----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+
|      Date|               Open|               High|                Low|              Close|          Adj Close|  Volume|
+----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+
|1999-03-22| 0.4466150104999542|0.44791701436042786|0.42447900772094727|0.42447900772094727| 0.3893754184246063| 3667200|
|1999-03-23|0.42708298563957214|0.42708298563957214|           0.390625| 0.3984380066394806|0.36548805236816406|16396800|
|1999-03-24|0.39583298563957214| 0.3984380066394806|0.38020798563957214|0.39583298563957214|0.36309847235679626| 6086400|
|1999-03-25| 0.3945310115814209|0.41666701436042786|0.39322900772094727|0.40104201436042786|0.36787667870521545| 4032000|
|1999-03-26|            0.40625|             0.4375|            0.40625|0.43619799613952637| 0.4001253545284271| 8827200|
+----------+------------

In [6]:
spark.createDataFrame(data.tail(5)).show()

+----------+------------------+------------------+------------------+------------------+------------------+--------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|
+----------+------------------+------------------+------------------+------------------+------------------+--------+
|2023-03-15|237.61000061035156|242.86000061035156|233.60000610351562|242.27999877929688|242.20230102539062|52448600|
|2023-03-16|240.27000427246094| 255.8800048828125|238.94000244140625|255.41000366210938|255.32810974121094|58325300|
|2023-03-17|259.82000732421875|  263.989990234375|256.67999267578125|            257.25|257.16754150390625|84854700|
|2023-03-20| 256.1499938964844|  260.239990234375| 251.3000030517578|             259.0| 258.9169616699219|43274700|
|2023-03-21|261.79998779296875| 263.9200134277344|253.80999755859375|  261.989990234375| 261.9059753417969|54740800|
+----------+------------------+------------------+--------------

In [7]:
df= data

In [8]:
from pyspark.sql.functions import col, count, when

# Her bir sütun için eksik değer sayısını hesaplama
missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
missing_values.show()

+----+----+----+---+-----+---------+------+
|Date|Open|High|Low|Close|Adj Close|Volume|
+----+----+----+---+-----+---------+------+
|   0|   0|   0|  0|    0|        0|     0|
+----+----+----+---+-----+---------+------+



In [9]:
# 'Close' sütunu ve 'Date' sütunu ile sınırlı DataFrame'i CSV olarak kaydet
#df.write.csv("file:///home/hduser/Desktop/NVDA/NVDA.csv", header=True)


Uyarıları tamamen kapatmanın bir yolu olmasa da, günlükleri daha az ayrıntılı hale getirerek uyarıların gösterilmesini azaltabilirsiniz. Bu, Spark'ın günlük seviyesini ayarlayarak yapılabilir. Örneğin, günlük seviyesini "ERROR" olarak ayarlamak yalnızca hata mesajlarının gösterilmesini sağlayacaktır.

In [10]:
spark.sparkContext.setLogLevel("ERROR")

1. Veri Setinin Yüklenmesi ve Öznitelik Oluşturma

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag,col, to_date, dayofweek
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
# Spark oturumunu başlatma
spark = SparkSession.builder.appName("Stock Price Forecasting").getOrCreate()

# Gecikmeli öznitelikler ekleyin
for i in range(1, 6):
    df = df.withColumn(f"lag_{i}", lag(col("Close"), i).over(Window.orderBy("Date")))

# Haftanın günü gibi tarih özniteliklerini ekleyin
df = df.withColumn("DayOfWeek", dayofweek(col("Date")))

# Eğitim için kullanılacak özniteliklerin listesini oluştur
input_cols = [f"lag_{i}" for i in range(1, 6)] + ['Open', 'High', 'Low', 'Volume', 'DayOfWeek']

In [12]:
from pyspark.sql import functions as F

for i in range(1, 6):
    # 'lag' sütunlarını mevcut 'Close' değeriyle doldurun
    df = df.withColumn(f"lag_{i}", F.coalesce(df[f"lag_{i}"], df["Close"]))

In [13]:
#df.show(6)

In [14]:
from pyspark.sql.functions import dayofweek
# 'DayOfWeek' sütunu için eksik değerleri doldurun
#df = df.withColumn('DayOfWeek', dayofweek(df['Date']))
# 'DayOfWeek' sütunu için `null` değerleri kontrol edin ve doldurun
#df = df.na.fill({'DayOfWeek': 0})

# VectorAssembler'ı tekrar çalıştırın
vectorAssembler = VectorAssembler(inputCols=input_cols, outputCol='features')
df = vectorAssembler.transform(df)

# Veri setini eğitim ve test olarak ayırma
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)


2. Model Eğitimi ve Katsayıların Gözlemlenmesi

In [15]:
from pyspark.ml.regression import LinearRegression

# Modeli tanımla ve eğit
lr = LinearRegression(featuresCol="features", labelCol="Close")
model = lr.fit(train_df)

# Eğitim ve test setleri üzerinde tahminler yap
train_predictions = model.transform(train_df)
test_predictions = model.transform(test_df)

# Model katsayılarını ve intercept değerini yazdır
print("Intercept:", model.intercept)
print("Coefficients:")
for i, coeff in enumerate(model.coefficients):
    print(f"lag_{i+1} coefficient: {coeff}")


Intercept: 0.04888933354776447
Coefficients:
lag_1 coefficient: -0.07215812355408208
lag_2 coefficient: -0.01639787411257297
lag_3 coefficient: -0.006880200372128203
lag_4 coefficient: 0.0033984379254951946
lag_5 coefficient: 0.017200398278379554
lag_6 coefficient: -0.5719389608544221
lag_7 coefficient: 0.7894061625420268
lag_8 coefficient: 0.859678723134829
lag_9 coefficient: 9.72973647944246e-11
lag_10 coefficient: -0.014206453313706149


3. Model Değerlendirme (RMSE ve MSE)

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE ve MSE değerlendirici tanımla ve hesapla
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Close")

train_rmse = evaluator.evaluate(train_predictions, {evaluator.metricName: "rmse"})
test_rmse = evaluator.evaluate(test_predictions, {evaluator.metricName: "rmse"})
train_mse = evaluator.evaluate(train_predictions, {evaluator.metricName: "mse"})
test_mse = evaluator.evaluate(test_predictions, {evaluator.metricName: "mse"})

print(f"Train RMSE: {train_rmse}, Train MSE: {train_mse}")
print(f"Test RMSE: {test_rmse}, Test MSE: {test_mse}")


Train RMSE: 0.8257366129326283, Train MSE: 0.6818409539374493
Test RMSE: 0.8891719836317359, Test MSE: 0.7906268164755961


4. Gelecekteki 5 Gün İçin Tahminlerin Yapılması

In [17]:
from datetime import datetime, timedelta

# Burada son bilinen verileri kullanarak gelecekteki 5 gün için tahminler yapacağız.
# Kodun bu kısmı, son kapanış fiyatlarından yola çıkarak yeni öznitelikler oluşturur ve tahminleri yapar.

# Modelin en son tahminlerinden ve bilinen değerlerden yeni gecikmeli öznitelikler yaratma
last_row = train_df.orderBy(F.desc("Date")).first()
last_known_values = [last_row["Close"]] + [last_row[f"lag_{i}"] for i in range(1, 5)]

# Gelecekteki tarihler için tahminler yapma
future_dates = [last_row["Date"] + timedelta(days=i) for i in range(1, 6)]
future_predictions = []

In [18]:
df.show(5)

+----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+-------------------+-------------------+-------------------+-------------------+-------------------+---------+--------------------+
|      Date|               Open|               High|                Low|              Close|          Adj Close|  Volume|              lag_1|              lag_2|              lag_3|              lag_4|              lag_5|DayOfWeek|            features|
+----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+-------------------+-------------------+-------------------+-------------------+-------------------+---------+--------------------+
|1999-03-22| 0.4466150104999542|0.44791701436042786|0.42447900772094727|0.42447900772094727| 0.3893754184246063| 3667200|0.42447900772094727|0.42447900772094727|0.42447900772094727|0.42447900772094727|0.42447900772094727|        2|[0.4244790

In [None]:
future_dates

In [None]:
input_cols = [f"lag_{i}" for i in range(1, 6)] + ['Open', 'High', 'Low', 'Volume', 'DayOfWeek']


In [None]:
vectorAssembler = VectorAssembler(inputCols=input_cols, outputCol='features')


In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Modeli tanımla ve eğit
lr = LinearRegression(featuresCol="features", labelCol="Close")
model = lr.fit(train_df)

# Eğitim ve test setleri üzerinde tahminler yap
train_predictions = model.transform(train_df)
test_predictions = model.transform(test_df)

# RMSE değerlendiricisini tanımla ve hesapla
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Close", metricName="rmse")
train_rmse = evaluator.evaluate(train_predictions)
test_rmse = evaluator.evaluate(test_predictions)

print(f"Train RMSE: {train_rmse}")
print(f"Test RMSE: {test_rmse}")


In [None]:
from datetime import timedelta

# İleri tarihli 5 gün için tahmin yapma
last_row = train_df.orderBy(F.desc("Date")).first()
last_known_values = [last_row["Close"]] + [last_row[f"lag_{i}"] for i in range(1, 5)]

future_predictions = []

for i in range(5):
    # Yeni bir giriş özellik vektörü oluşturma
    future_features = last_known_values[1:] + [last_known_values[0]]  # Özelliklerin sırası önemli olabilir
    
    # VectorAssembler kullanmadan giriş özelliklerini birleştirme
    future_features_vector = spark.createDataFrame([(*future_features, last_row["DayOfWeek"])], input_cols)
    
    # Modeli kullanarak tahmin yapma
    prediction = model.transform(future_features_vector).collect()[0]['prediction']
    
    # Tahmini listeye ekleme
    future_predictions.append(prediction)
    
    # Son bilinen değerleri güncelleme
    last_known_values = [prediction] + last_known_values[:-1]

    # Tarih bilgisini bir sonraki güne kaydırma
    last_row["Date"] += timedelta(days=1)

# Tahminleri yazdırma
for i, prediction in enumerate(future_predictions):
    future_date = last_row["Date"] + timedelta(days=i)
    print(f"Tarih: {future_date}, Tahmin: {prediction}")


In [19]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

# Gelecekteki tarihler için tahminler yapma
future_predictions = []

for i in range(5):
    # Yeni bir giriş özellik vektörü oluşturma
    future_features = last_known_values[1:] + [last_known_values[0]]  # Özelliklerin sırası önemli olabilir
    
    # Özellikleri bir vektöre dönüştürme
    features_vector = Vectors.dense(future_features)
    
    # Vektörü bir PySpark DataFrame içindeki vektör sütununa benzer bir şekilde işleme
    row = Row(features=features_vector)
    df_future = spark.createDataFrame([row])
    
    # Modeli kullanarak tahmin yapma
    prediction = model.transform(df_future).select('prediction').collect()[0]['prediction']
    
    # Tahmini listeye ekleme
    future_predictions.append(prediction)
    
    # Son bilinen değerleri güncelleme
    last_known_values = [prediction] + last_known_values[:-1]

    # Tarih bilgisini bir sonraki güne kaydırma
    last_row["Date"] += timedelta(days=1)

# Tahminleri yazdırma
for i, prediction in enumerate(future_predictions):
    future_date = last_row["Date"] + timedelta(days=i)
    print(f"Tarih: {future_date}, Tahmin: {prediction}")


24/03/28 02:16:52 ERROR Executor: Exception in task 31.0 in stage 36.0 (TID 86)
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (PredictionModel$$Lambda$3969/1820683712: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => double).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPa

Py4JJavaError: An error occurred while calling o569.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 36.0 failed 1 times, most recent failure: Lost task 31.0 in stage 36.0 (TID 86) (192.168.125.213 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (PredictionModel$$Lambda$3969/1820683712: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => double).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 5, y.size = 10
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:123)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:746)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:696)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1(Predictor.scala:204)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1$adapted(Predictor.scala:203)
	... 18 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4036)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4206)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4204)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4204)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4033)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (PredictionModel$$Lambda$3969/1820683712: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => double).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching sizes: x.size = 5, y.size = 10
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:123)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:746)
	at org.apache.spark.ml.regression.LinearRegressionModel.predict(LinearRegression.scala:696)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1(Predictor.scala:204)
	at org.apache.spark.ml.PredictionModel.$anonfun$transformImpl$1$adapted(Predictor.scala:203)
	... 18 more


In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# Öznitelikler ve hedef değişkeni uygun formatta düzenleme
X_train = train_df[['lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5', 'Open', 'High', 'Low', 'Volume', 'DayOfWeek']]
y_train = train_df['Close']
X_test = test_df[['lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5', 'Open', 'High', 'Low', 'Volume', 'DayOfWeek']]
y_test = test_df['Close']

# Modeli tanımla ve eğit
model = LinearRegression()
model.fit(X_train, y_train)

# Tahminler yap
train_predictions = model.predict(X_train)
test_predictions = model.predict(X_test)

# Hata metriklerini hesapla
train_rmse = mean_squared_error(y_train, train_predictions, squared=False)
test_rmse = mean_squared_error(y_test, test_predictions, squared=False)

print(f"Train RMSE: {train_rmse}")
print(f"Test RMSE: {test_rmse}")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag,to_date, year, month, dayofmonth, dayofweek, dayofyear
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Spark session başlatma
spark = SparkSession.builder.appName("Time Series Forecasting").getOrCreate()

# Veri setini yükleme ve tarih sütununu düzeltme
df = df.withColumn("Date", to_date(df["Date"], "yyyy-MM-dd"))

# Zaman damgası öznitelikleri ekleyin
df = df.withColumn("Year", year("Date"))
df = df.withColumn("Month", month("Date"))
df = df.withColumn("DayOfMonth", dayofmonth("Date"))
df = df.withColumn("DayOfWeek", dayofweek("Date"))
df = df.withColumn("DayOfYear", dayofyear("Date"))

# Gecikmeli öznitelikler oluştur
windowSpec = Window.orderBy("Date")
for i in range(1, 6):
    df = df.withColumn(f"lag_{i}", lag("Close", i).over(windowSpec))

# Null değerlerden kurtulun (ilk 5 satırda olabilir)
df = df.na.drop()

# Öznitelikleri bir vektör haline getirme
feature_cols = [f"lag_{i}" for i in range(1, 6)] + ["Year", "Month", "DayOfMonth", "DayOfWeek", "DayOfYear"]
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = vectorAssembler.transform(df)

# Veri setini eğitim ve test olarak ayırma
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Lineer regresyon modelini tanımlayın ve eğitin
lr = LinearRegression(featuresCol="features", labelCol="Close")
lr_model = lr.fit(train_df)

# Eğitim veri setinde tahminleri hesaplama
train_predictions = lr_model.transform(train_df)
test_predictions = lr_model.transform(test_df)

# MSE ve RMSE değerlerini hesaplama
evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
train_rmse = evaluator.evaluate(train_predictions)
test_rmse = evaluator.evaluate(test_predictions)
print(f"Train RMSE: {train_rmse}, Test RMSE: {test_rmse}")

# Bu kısımda gelecekteki tarihler için tahminleri nasıl yapacağınıza bağlı olarak devam edebilirsiniz.
# Örneğin, df'deki son tarih ve gerekli önceki kapanış fiyatları kullanılarak gelecekteki tarihler için tahmin yapılabilir.


In [None]:
# Model katsayılarını ve intercept değerini görüntüleme
coefficients = model.coefficients
intercept = model.intercept

print(f"Intercept (β0): {intercept}")
print("Coefficients (β1, β2, ..., βn):")
for i, coeff in enumerate(coefficients):
    print(f"β{i+1}: {coeff}")

In [None]:
from pyspark.sql import functions as F
from datetime import datetime, timedelta

# Son bilinen kapanış fiyatlarını ve tarih bilgilerini al
last_known_close_prices = df.select("Close").orderBy(F.desc("Date")).limit(5).collect()
last_known_date = df.select(F.max("Date")).collect()[0][0]

# Gelecekteki tarihler için bir liste hazırlayın
future_dates = [last_known_date + timedelta(days=i) for i in range(1, 6)]

# Gelecekteki her bir tarih için tahminleri saklayacak bir liste
future_predictions = []

# Gelecekteki her bir tarih için tahmin yapmak için döngü
for i in range(1, 6):
    # Yeni bir DataFrame oluşturun
    new_row = spark.createDataFrame([(future_dates[i-1],)], ["Date"])
    
    # Gelecekteki her bir tarih için gerekli gecikmeli öznitelikleri ekleyin
    for j in range(5):
        lag_col_name = f"lag_{j+1}"
        if j < i:
            lag_value = float(last_known_close_prices[j][0])
        else:
            # future_predictions listesine güvenli bir şekilde erişim
            # İlk iterasyonda, listeye henüz tahmin eklenmemiş olacağından dolayı bir kontrol ekleniyor
            lag_value = float(future_predictions[j-i][1]) if len(future_predictions) > j-i else None
        
        if lag_value is not None:
            new_row = new_row.withColumn(lag_col_name, F.lit(lag_value))

    # Öznitelik vektörünü oluşturun ve tahmin yapın
    # Eğer lag_value None ise, bu satırda bir hata oluşacaktır, bu nedenle tüm lag değerleri doldurulmalıdır
    new_row = vectorAssembler.transform(new_row)
    prediction = lr_model.transform(new_row).select("prediction").collect()[0][0]
    
    # Gelecekteki tahminleri listeye ekleyin
    if None not in [lag_value for lag_value in lag_values]:
        future_predictions.append((future_dates[i-1], prediction))
        
# Tahminleri DataFrame olarak dönüştürün ve gösterin
future_predictions_df = spark.createDataFrame(future_predictions, ["Date", "Prediction"])
future_predictions_df.show()


1. Veri Setini Yükleme ve İlk İşleme

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date

# Spark session başlatma
spark = SparkSession.builder.appName("Time Series Forecasting").getOrCreate()

# Veri setinin, 'Date' sütununu tarih tipine çevirme
df = df.withColumn("Date", to_date(df["Date"], "yyyy-MM-dd"))


2. Öznitelik Mühendisliği ve Veri Setini Bölme

In [None]:
from pyspark.sql.functions import lag
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler

# Window tanımı yapma
windowSpec = Window.orderBy("Date")

# Geçmiş kapanış fiyatlarından öznitelikler oluşturma (lag features)
for i in range(1, 6):
    df = df.withColumn(f"lag_{i}", lag("Close", i).over(windowSpec))

# Null değerleri kaldırma
df = df.na.drop()

# Öznitelikleri bir vektör haline getirme
vectorAssembler = VectorAssembler(inputCols=[f"lag_{i}" for i in range(1, 6)], outputCol="features")

# Veri setini öznitelikler ve etiket (label) olarak ayarlama
df = vectorAssembler.transform(df)


3. Model Eğitimi ve Değerlendirme

In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Modeli tanımlama ve eğitim veri seti üzerinde eğitme
lr = LinearRegression(featuresCol="features", labelCol="Close")
model = lr.fit(df)

# Eğitim veri setindeki tahminleri hesaplama
predictions = model.transform(df)

# MSE ve RMSE değerlerini hesaplama
evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
print(f"MSE: {mse}, RMSE: {rmse}")


4. Geleceğe Yönelik Tahminlerin Yapılması

Bu adım, mevcut veri setinizin sonundaki tarihler için 22-26 Mart tarihleri arasında 5 gün ekleyerek geleceğe yönelik tahminler yapılmasını içerir. Eğer bu tarihler veri setinizin dışındaysa, bu tahminleri yapabilmek için ekstra öznitelikler oluşturmanız gerekecektir.

In [None]:
from datetime import datetime, timedelta
import pandas as pd

# En son tarihi al ve geleceğe dönük tarihler oluştur
last_date = df.select("Date").rdd.max()[0]
future_dates = [last_date + timedelta(days=x) for x in range(1, 6)]
future_df = pd.DataFrame(future_dates, columns=["Date"])

# Spark DataFrame'e dönüştür ve öznitelikler oluştur
future_sdf = spark.createDataFrame(future_df)
# Burada geleceğe yönelik öznitelikler oluşturulacak ve tahminler yapılacak


In [None]:
future_sdf.show()

In [None]:
future_predictions.show()

In [None]:
# Önceki adımlarda oluşturduğunuz modeli kullanarak tahmin yapma
# 'future_sdf' içindeki gecikmeli özniteliklerin adları: 'lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5'
# Bu özniteliklerin nasıl hesaplanacağı modelinize ve veri setinize bağlıdır

# Örneğin, son 5 günün kapanış fiyatlarını kullanarak bu öznitelikleri oluşturun:
# Bu örnekte, 'df' mevcut veri setinizdir ve son günün kapanış fiyatlarına sahip olduğunuzu varsayıyoruz

from pyspark.sql import functions as F

# 'future_sdf' için gerekli öznitelikleri oluşturma
for i in range(1, 6):
    # Son günün kapanış fiyatını 'lag_1' olarak kullanarak geriye doğru ilerleyin
    # Burada veri setinizden uygun bir şekilde öznitelikleri doldurmanız gerekmekte
    future_sdf = future_sdf.withColumn(f"lag_{i}", F.lit(df.select(F.col("Close")).collect()[-i][0]))

# Öznitelik vektörünü oluşturun
future_sdf = vectorAssembler.transform(future_sdf)

# Tahmin yapma
future_predictions = model.transform(future_sdf)

# Tahmin edilen değerleri gösterme
future_predictions.select("Date", "prediction").show()


In [None]:
# En son gerçek veriyi veya önceki tahmini kullanarak lag özniteliklerini güncelleme işlevi
def update_lag_features(last_values, new_prediction):
    # Son tahmin değerini ilk gecikme olarak ekleyin ve diğerlerini bir adım öteleme
    updated_values = [new_prediction] + last_values[:-1]
    return updated_values

# İlk gecikme değerleri olarak mevcut bilinen son kapanış fiyatlarını kullanma
last_known_values = [df.select(F.col("Close")).collect()[-i][0] for i in range(1, 6)]

# Gelecekteki her bir tarih için tahminler yapma
for future_date in future_dates:
    # Yeni lag özniteliklerini oluşturma
    new_row = [future_date] + last_known_values
    new_sdf = spark.createDataFrame([new_row], schema=future_sdf.schema)
    
    # Öznitelik vektörünü oluştur
    new_sdf = vectorAssembler.transform(new_sdf)
    
    # Model ile tahmin yapma
    new_prediction = model.transform(new_sdf).select("prediction").collect()[0][0]
    
    # Tahminleri saklama ve sonraki adım için lag özniteliklerini güncelleme
    last_known_values = update_lag_features(last_known_values, new_prediction)

# Son tahminleri içeren DataFrame'i gösterme


In [None]:
from pyspark.sql import Row

# Gelecekteki her bir tarih için tahmin yapmak için döngü
for i in range(1, 6):
    # Yeni tahmin için gerekli gecikmeleri hazırla
    # Burada son bilinen gerçek değerleri kullanıyorsunuz
    lag_values = [float(df.select(F.col("Close")).collect()[-j][0]) for j in range(i, i+5)]
    future_date = last_date + timedelta(days=i)
    
    # Yeni bir satır oluştur. Features sütunu daha sonra doldurulacak
    new_row = Row(Date=future_date, lag_1=lag_values[0], lag_2=lag_values[1], 
                  lag_3=lag_values[2], lag_4=lag_values[3], lag_5=lag_values[4])
    
    # Spark DataFrame'ine dönüştür ve öznitelik vektörünü oluştur
    new_sdf = spark.createDataFrame([new_row])
    new_sdf = vectorAssembler.transform(new_sdf)
    
    # Tahmin yap
    new_prediction = model.transform(new_sdf).select("prediction").collect()[0][0]
    
    # İleri tahminler için yeni gecikmeli değerler oluştur
    lag_values.pop(0) # En eski lag değerini çıkar
    lag_values.append(new_prediction) # Yeni tahmini ekle
    
    # Tahmini kaydet
    future_predictions.append((future_date, new_prediction))


In [None]:
# Gelecekteki tahminleri saklayacak bir liste oluştur
future_predictions_list = []

# Gelecekteki her bir tarih için tahmin yapmak için döngü
for i in range(1, 6):
    # Yeni tahmin için gerekli gecikmeleri hazırla
    # Burada son bilinen gerçek değerleri kullanıyorsunuz
    lag_values = [float(df.select(F.col("Close")).collect()[-j][0]) for j in range(i, i+5)]
    future_date = last_date + timedelta(days=i)
    
    # Yeni bir satır oluştur. Features sütunu daha sonra doldurulacak
    new_row = Row(Date=future_date, lag_1=lag_values[0], lag_2=lag_values[1], 
                  lag_3=lag_values[2], lag_4=lag_values[3], lag_5=lag_values[4])
    
    # Spark DataFrame'ine dönüştür ve öznitelik vektörünü oluştur
    new_sdf = spark.createDataFrame([new_row])
    new_sdf = vectorAssembler.transform(new_sdf)
    
    # Tahmin yap
    new_prediction = model.transform(new_sdf).select("prediction").collect()[0][0]
    
    # İleri tahminler için yeni gecikmeli değerler oluştur
    lag_values.pop(0) # En eski lag değerini çıkar
    lag_values.append(new_prediction) # Yeni tahmini ekle
    
    # Tahmini ve tarihi listeye ekle
    future_predictions_list.append((future_date, new_prediction))

# Listeyi DataFrame'e dönüştür
future_predictions_df = spark.createDataFrame(future_predictions_list, ["Date", "Prediction"])

# Son tahminleri göster
future_predictions_df.show()


Adım 1: Öznitelik Mühendisliği
Veri setinizi Spark DataFrame'e dönüştürün.
lag fonksiyonunu kullanarak, her bir tarih için önceki günlerin kapanış fiyatlarını içeren yeni sütunlar ekleyin. Bu sütunlar, modelin öğrenme sürecinde kullanılacak öznitelikler olacak.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Time Series Forecasting").getOrCreate()

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col

window = Window.orderBy("Date")
for i in range(1, 6):  # 5 günlük gecikmeler
    df = df.withColumn(f"lag_{i}", lag("Close", i).over(window))
df.show(5)

Adım 2: Model Eğitimi
Öznitelikler hazır olduğunda, bir makine öğrenimi modeli seçin ve eğitin. Örneğin, RandomForestRegressor kullanabilirsiniz.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

assembler = VectorAssembler(inputCols=[f"lag_{i}" for i in range(1, 6)], outputCol="features")
rf = RandomForestRegressor(featuresCol="features", labelCol="Close")

pipeline = Pipeline(stages=[assembler, rf])

# Eğitim veri setinde null değerler içeren satırlar kaldırılmalıdır
df = df.na.drop()
model = pipeline.fit(df)


Adım 3: Tahmin
Modeli eğittikten sonra, gelecek değerleri tahmin etmek için kullanabilirsiniz.

In [None]:
predictions = model.transform(df)
predictions.select("Date", "Close", "prediction").show(5)

In [None]:
spark.createDataFrame(predictions.select("Date", "Close", "prediction").tail(5)).show()

In [None]:
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

# Spark session başlatma
spark = SparkSession.builder.appName("Future Dates").getOrCreate()

# Veri setinizin son tarihini string olarak alın
last_date_str = '2023-03-21'
last_date = datetime.strptime(last_date_str, "%Y-%m-%d")

# Gelecekteki 5 gün için tarih listesi oluştur
future_dates = [last_date + timedelta(days=x) for x in range(1, 6)]

# Gelecekteki tarihler için pandas DataFrame oluştur
future_df = pd.DataFrame(future_dates, columns=['Date'])

# Pandas DataFrame'ini Spark DataFrame'ine dönüştür
future_sdf = spark.createDataFrame(future_df)

# Sonuçları göster
future_sdf.show()


In [None]:
from pyspark.ml.regression import LinearRegression

# Öznitelikleri hazırlama ve vektör haline getirme
vectorAssembler = VectorAssembler(inputCols=["lag_1", "lag_2", "lag_3", "lag_4", "lag_5"], outputCol="features")

# Lineer regresyon modelini tanımlama
lr = LinearRegression(featuresCol="features", labelCol="Close")

# Pipeline oluşturma
pipeline = Pipeline(stages=[vectorAssembler, lr])

# Modeli eğitme
model = pipeline.fit(train_data)

# Son 5 günlük tahminler için özniteliklerin oluşturulması
# Bu kısım, mevcut verinizin son gününden itibaren ileri tarihler için yapılmalıdır
# Örneğin, son gün 2023-03-21 ise, 2023-03-22 için öznitelikler oluşturup model.transform kullanarak tahmin yapmalısınız

# Not: Bu örnekte, 'future_dates' adında ileri tarihleri içeren bir DataFrame'iniz olduğunu varsayıyoruz
# Bu DataFrame, tahmin etmek istediğiniz tarihler için gerekli öznitelikleri (gecikmeleri vb.) içermelidir

# Tahmin yapma
predictions = model.transform(future_sdf)
predictions.select("Date", "prediction").show()


In [None]:
from pyspark.ml.feature import StandardScaler


In [None]:

# Özellikleri bir vektör haline getirme
assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol="features")
df_assembled = assembler.transform(df)

# Ölçeklendirme için StandardScaler kullanma
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scalerModel = scaler.fit(df_assembled)
df_scaled = scalerModel.transform(df_assembled)

df_scaled.show(5)

# ozellik muhendisligi

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = Window.orderBy('Date')
df = df.withColumn('Prev_Close', lag(df['Close']).over(windowSpec))
df.show(5)

In [None]:
from pyspark.sql.functions import avg

short_window = Window.orderBy('Date').rowsBetween(-30, 0)
long_window = Window.orderBy('Date').rowsBetween(-90, 0)

df = df.withColumn('Short_Average', avg('Close').over(short_window))
df = df.withColumn('Long_Average', avg('Close').over(long_window))
df.show(5)

In [None]:
from pyspark.sql.functions import col

df = df.withColumn('Day_Pct_Change', (col('Close') - col('Prev_Close')) / col('Prev_Close'))
df.show(5)

In [None]:
from pyspark.sql.functions import log

df = df.withColumn('Log_Volume', log(col('Volume')))
df.show(5)

In [None]:
# Specifying properties and target variable
feature_columns = ['Open', 'High', 'Low', 'Volume']  # Örnek özellikler
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df)

In [None]:
# Target variable
data = data.withColumn("label", data["Close"])

In [None]:
# Separate training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Model creation and training
lr = LinearRegression(featuresCol='features', labelCol='label')
model = lr.fit(train_data)

In [None]:
# Prediction on the test set
predictions = model.transform(test_df)

In [None]:
predictions.show(5)

In [None]:
# Evaluating performance
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")