In [None]:
import pyspark

In [28]:
import pandas as pd
df_train = pd.DataFrame(columns=['rice',"pork","eggs","oil","price"])
df_train['rice'] = [90,80,50,80,90]
df_train['pork'] = [100,120,110,90,100]
df_train['eggs'] = [70,80,90,67,65]
df_train['oil'] = [100,100,100,120,90]
df_train['price'] = [60,70,65,75,55]

In [29]:
df_train

Unnamed: 0,rice,pork,eggs,oil,price
0,90,100,70,100,60
1,80,120,80,100,70
2,50,110,90,100,65
3,80,90,67,120,75
4,90,100,65,90,55


In [30]:
import pandas as pd
df_test = pd.DataFrame(columns=['rice',"pork","eggs","oil","price"])
df_test['rice'] = [100,110,120]
df_test['pork'] = [100,120,130]
df_test['eggs'] = [100,99,150]
df_test['oil'] = [150,220,100]
df_test['price'] = [70,90,100]

In [31]:
df_test.to_csv("test.csv")

In [32]:
df_train.to_csv("train.csv")

In [35]:
df_final = pd.concat([df_train,df_test],axis=0)

In [37]:
df_final.to_csv("data.csv")

In [38]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL_T").getOrCreate()

In [39]:
spark

In [40]:
# train = spark.read.csv("train.csv",header=True , inferSchema=True)
# test = spark.read.csv("test.csv",header=True , inferSchema=True)
data = spark.read.csv("data.csv",header=True , inferSchema=True)

In [41]:
data.show()

+---+----+----+----+---+-----+
|_c0|rice|pork|eggs|oil|price|
+---+----+----+----+---+-----+
|  0|  90| 100|  70|100|   60|
|  1|  80| 120|  80|100|   70|
|  2|  50| 110|  90|100|   65|
|  3|  80|  90|  67|120|   75|
|  4|  90| 100|  65| 90|   55|
|  0| 100| 100| 100|150|   70|
|  1| 110| 120|  99|220|   90|
|  2| 120| 130| 150|100|  100|
+---+----+----+----+---+-----+



In [42]:
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- rice: integer (nullable = true)
 |-- pork: integer (nullable = true)
 |-- eggs: integer (nullable = true)
 |-- oil: integer (nullable = true)
 |-- price: integer (nullable = true)



In [43]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [44]:
assembler = VectorAssembler(
    inputCols=["rice","pork","eggs","oil"],
    outputCol="features"
)

In [45]:
output = assembler.transform(data)
final_data = output.select("features","price")

In [46]:
train , test = final_data.randomSplit([0.7,0.3])


In [47]:
lr = LinearRegression(featuresCol="features", labelCol="price")
lr_model = lr.fit(train)

In [48]:
print("Coef : " + str(lr_model.coefficients))
print("Intercept : " + str(lr_model.intercept))

Coef : [0.1616142919296813,0.170648153024401,0.2546522715170378,0.06763102557386212]
Intercept : 9.934345054228276


In [49]:
test_pred = lr_model.transform(test)
test_pred.show()

+--------------------+-----+-----------------+
|            features|price|       prediction|
+--------------------+-----+-----------------+
|[90.0,100.0,65.0,...|   55|64.18363658059474|
+--------------------+-----+-----------------+



In [50]:
evalutor = RegressionEvaluator(predictionCol="prediction",labelCol="price",metricName="rmse")
rmse = evalutor.evaluate(test_pred)
print("RMSE : ", rmse)

RMSE :  9.183636580594737


In [51]:
evalutor = RegressionEvaluator(predictionCol="prediction",labelCol="price",metricName="r2")
r2 = evalutor.evaluate(test_pred)
print("RMSE : ", r2)

RMSE :  -inf


In [52]:
spark.stop()