In [18]:
# Notebook-wide imports: the Spark session entry point, the column helpers used
# for casting and null checks, and the ML feature/regression classes.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnull, when  # `col` was listed twice
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Bare expression: echoes the installed PySpark version as this cell's output.
pyspark.__version__

'3.1.1'

In [3]:
# Build (or fetch the already-running) SparkSession for this notebook,
# registered under the application name "AutoMPG".
spark = (
    SparkSession.builder
    .appName("AutoMPG")
    .getOrCreate()
)

In [4]:
# Bare expression: the notebook renders the SparkSession object as this cell's output.
spark

In [7]:
# Load the Auto MPG CSV, treating the first line as the header row.
# No schema is supplied and schema inference is not enabled, so every column
# is read as a string (see the dtypes printed below); numeric casting is done
# in a separate step.
df = spark.read.format("csv").option('header', 'true').load('../data/auto-mpg.csv')

# DataFrame.show() prints the table itself and returns None, so it must not be
# wrapped in print() -- doing so emits a stray "None" line after the table.
df.show(5)

# Row count, column names, and (name, type) pairs for a quick sanity check.
print(df.count())
print(df.columns)
print(df.dtypes)

+---+---------+------------+----------+------+------------+----------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model-year|
+---+---------+------------+----------+------+------------+----------+
| 18|        8|         307|       130|  3504|          12|        70|
| 15|        8|         350|       165|  3693|        11.5|        70|
| 18|        8|         318|       150|  3436|          11|        70|
| 16|        8|         304|       150|  3433|          12|        70|
| 17|        8|         302|       140|  3449|        10.5|        70|
+---+---------+------------+----------+------+------------+----------+
only showing top 5 rows

None
398
['mpg', 'cylinders', 'displacement', 'horsepower', 'weight', 'acceleration', 'model-year']
[('mpg', 'string'), ('cylinders', 'string'), ('displacement', 'string'), ('horsepower', 'string'), ('weight', 'string'), ('acceleration', 'string'), ('model-year', 'string')]


In [11]:
# The CSV reader produced string columns; cast each one to float, keeping the
# original column names and their order.
dataset = df.select([
    col(name).cast('float')
    for name in (
        'mpg',
        'cylinders',
        'displacement',
        'horsepower',
        'weight',
        'acceleration',
        'model-year',
    )
])
dataset.show(5)

+----+---------+------------+----------+------+------------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model-year|
+----+---------+------------+----------+------+------------+----------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|      70.0|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|      70.0|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|      70.0|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|      70.0|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|      70.0|
+----+---------+------------+----------+------+------------+----------+
only showing top 5 rows



In [13]:
# Per-column null tally. For each column, when() yields the column name as a
# literal on rows where the value is null (and null otherwise); count() skips
# nulls, so each aggregate equals the number of null entries in that column.
dataset.select(
    *(
        count(when(col(c).isNull(), c)).alias(c)
        for c in dataset.columns
    )
).show()

+---+---------+------------+----------+------+------------+----------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model-year|
+---+---------+------------+----------+------+------------+----------+
|  0|        0|           0|         0|     0|           0|         0|
+---+---------+------------+----------+------+------------+----------+



In [15]:
# Predictor columns: every column of `dataset` except the 'mpg' label.
required_features = [
    'cylinders',
    'displacement',
    'horsepower',
    'weight',
    'acceleration',
    'model-year',
]

# Pack the predictors into a single vector column named 'features'.
assembler = VectorAssembler(outputCol='features', inputCols=required_features)
transformed_data = assembler.transform(dataset)
transformed_data.show(5)

+----+---------+------------+----------+------+------------+----------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model-year|            features|
+----+---------+------------+----------+------+------------+----------+--------------------+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|      70.0|[8.0,307.0,130.0,...|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|      70.0|[8.0,350.0,165.0,...|
|18.0|      8.0|       318.0|     150.0|3436.0|        11.0|      70.0|[8.0,318.0,150.0,...|
|16.0|      8.0|       304.0|     150.0|3433.0|        12.0|      70.0|[8.0,304.0,150.0,...|
|17.0|      8.0|       302.0|     140.0|3449.0|        10.5|      70.0|[8.0,302.0,140.0,...|
+----+---------+------------+----------+------+------------+----------+--------------------+
only showing top 5 rows



In [24]:
# Fit a linear regression of 'mpg' on the assembled 'features' vector.
# `lm` holds the fitted model.
lm = LinearRegression(featuresCol='features', labelCol='mpg').fit(transformed_data)

# Report the learned parameters and R^2. The summary comes from the fit itself,
# i.e. it is measured on the same rows the model was trained on.
print(f"Coefs = {lm.coefficients}")
print(f"Intercept = {lm.intercept}")
print(f"r2 = {lm.summary.r2}")

Coefs = [-0.2582414241943512,0.007248834118746308,0.00014607550879027937,-0.006928776824311312,0.08101327338155707,0.7554346102698345]
Intercept = -14.594543226140484
r2 = 0.8086877083987737
