# CSCI461 - Fall 2023 - Lab #10 (DataFrames, SQL functionality in Spark, MLlib (Machine Learning library in Spark))

## Installing Pyspark

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session that runs on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Eager evaluation lets a bare DataFrame expression render as a table in the notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

## Data Frame Operations

In [None]:
# Read the Auto MPG CSV; the first line supplies the column names.
# No schema is supplied, so every column is loaded as a string.
auto_df = spark.read.csv("/content/auto-mpg.csv", header=True)

In [None]:
auto_df

mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin,carname
18.0,8,307.0,130.0,3504.0,12.0,70,1,chevrolet chevell...
15.0,8,350.0,165.0,3693.0,11.5,70,1,buick skylark 320
18.0,8,318.0,150.0,3436.0,11.0,70,1,plymouth satellite
16.0,8,304.0,150.0,3433.0,12.0,70,1,amc rebel sst
17.0,8,302.0,140.0,3449.0,10.5,70,1,ford torino
15.0,8,429.0,198.0,4341.0,10.0,70,1,ford galaxie 500
14.0,8,454.0,220.0,4354.0,9.0,70,1,chevrolet impala
14.0,8,440.0,215.0,4312.0,8.5,70,1,plymouth fury iii
14.0,8,455.0,225.0,4425.0,10.0,70,1,pontiac catalina
15.0,8,390.0,190.0,3850.0,8.5,70,1,amc ambassador dpl


### Inserting a new field to a Data Frame

In [None]:
from pyspark.sql.functions import lower, upper

# Add two derived columns holding the car name in upper and lower case.
auto_df = (
    auto_df
    .withColumn("upper", upper(auto_df["carname"]))
    .withColumn("lower", lower(auto_df["carname"]))
)

In [None]:
auto_df

mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin,carname,upper,lower
18.0,8,307.0,130.0,3504.0,12.0,70,1,chevrolet chevell...,CHEVROLET CHEVELL...,chevrolet chevell...
15.0,8,350.0,165.0,3693.0,11.5,70,1,buick skylark 320,BUICK SKYLARK 320,buick skylark 320
18.0,8,318.0,150.0,3436.0,11.0,70,1,plymouth satellite,PLYMOUTH SATELLITE,plymouth satellite
16.0,8,304.0,150.0,3433.0,12.0,70,1,amc rebel sst,AMC REBEL SST,amc rebel sst
17.0,8,302.0,140.0,3449.0,10.5,70,1,ford torino,FORD TORINO,ford torino
15.0,8,429.0,198.0,4341.0,10.0,70,1,ford galaxie 500,FORD GALAXIE 500,ford galaxie 500
14.0,8,454.0,220.0,4354.0,9.0,70,1,chevrolet impala,CHEVROLET IMPALA,chevrolet impala
14.0,8,440.0,215.0,4312.0,8.5,70,1,plymouth fury iii,PLYMOUTH FURY III,plymouth fury iii
14.0,8,455.0,225.0,4425.0,10.0,70,1,pontiac catalina,PONTIAC CATALINA,pontiac catalina
15.0,8,390.0,190.0,3850.0,8.5,70,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl


### Alter a Data Frame field

In [None]:
from pyspark.sql.functions import col, concat, length, lit, when

# Expand the two-digit model year ("70") into a four-digit year ("1970").
# The length guard keeps this cell idempotent: re-running it leaves values
# that were already expanded untouched instead of producing "191970".
auto_df = auto_df.withColumn(
    "modelyear",
    when(length(col("modelyear")) == 2, concat(lit("19"), col("modelyear")))
    .otherwise(col("modelyear")),
)

In [None]:
auto_df

mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin,carname,upper,lower
18.0,8,307.0,130.0,3504.0,12.0,1970,1,chevrolet chevell...,CHEVROLET CHEVELL...,chevrolet chevell...
15.0,8,350.0,165.0,3693.0,11.5,1970,1,buick skylark 320,BUICK SKYLARK 320,buick skylark 320
18.0,8,318.0,150.0,3436.0,11.0,1970,1,plymouth satellite,PLYMOUTH SATELLITE,plymouth satellite
16.0,8,304.0,150.0,3433.0,12.0,1970,1,amc rebel sst,AMC REBEL SST,amc rebel sst
17.0,8,302.0,140.0,3449.0,10.5,1970,1,ford torino,FORD TORINO,ford torino
15.0,8,429.0,198.0,4341.0,10.0,1970,1,ford galaxie 500,FORD GALAXIE 500,ford galaxie 500
14.0,8,454.0,220.0,4354.0,9.0,1970,1,chevrolet impala,CHEVROLET IMPALA,chevrolet impala
14.0,8,440.0,215.0,4312.0,8.5,1970,1,plymouth fury iii,PLYMOUTH FURY III,plymouth fury iii
14.0,8,455.0,225.0,4425.0,10.0,1970,1,pontiac catalina,PONTIAC CATALINA,pontiac catalina
15.0,8,390.0,190.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl


### Inserting a field with multiple conditions

In [None]:
from pyspark.sql.functions import col, when

# mpg was read from the CSV as a string. Comparing the string column directly
# with an integer literal makes Spark coerce it to an integer, which drops the
# fractional part (e.g. 30.7 was treated as 30 and labelled "mid").
# Cast to double so the thresholds are applied to the real value.
mpg_value = col("mpg").cast("double")

# Bucket fuel economy: <=20 low, <=30 mid, <=40 high, everything else very high.
auto_df = auto_df.withColumn(
    "mpg_class",
    when(mpg_value <= 20, "low")
    .when(mpg_value <= 30, "mid")
    .when(mpg_value <= 40, "high")
    .otherwise("very high"),
)

In [None]:
auto_df

mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class
18.0,8,307.0,130.0,3504.0,12.0,1970,1,chevrolet chevell...,CHEVROLET CHEVELL...,chevrolet chevell...,low
15.0,8,350.0,165.0,3693.0,11.5,1970,1,buick skylark 320,BUICK SKYLARK 320,buick skylark 320,low
18.0,8,318.0,150.0,3436.0,11.0,1970,1,plymouth satellite,PLYMOUTH SATELLITE,plymouth satellite,low
16.0,8,304.0,150.0,3433.0,12.0,1970,1,amc rebel sst,AMC REBEL SST,amc rebel sst,low
17.0,8,302.0,140.0,3449.0,10.5,1970,1,ford torino,FORD TORINO,ford torino,low
15.0,8,429.0,198.0,4341.0,10.0,1970,1,ford galaxie 500,FORD GALAXIE 500,ford galaxie 500,low
14.0,8,454.0,220.0,4354.0,9.0,1970,1,chevrolet impala,CHEVROLET IMPALA,chevrolet impala,low
14.0,8,440.0,215.0,4312.0,8.5,1970,1,plymouth fury iii,PLYMOUTH FURY III,plymouth fury iii,low
14.0,8,455.0,225.0,4425.0,10.0,1970,1,pontiac catalina,PONTIAC CATALINA,pontiac catalina,low
15.0,8,390.0,190.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl,low


### Removing a field

In [None]:
# Remove the horsepower column. auto_df is reassigned, so every later cell
# works with the frame that no longer has this column.
auto_df = auto_df.drop("horsepower")

In [None]:
auto_df

mpg,cylinders,displacement,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class
18.0,8,307.0,3504.0,12.0,1970,1,chevrolet chevell...,CHEVROLET CHEVELL...,chevrolet chevell...,low
15.0,8,350.0,3693.0,11.5,1970,1,buick skylark 320,BUICK SKYLARK 320,buick skylark 320,low
18.0,8,318.0,3436.0,11.0,1970,1,plymouth satellite,PLYMOUTH SATELLITE,plymouth satellite,low
16.0,8,304.0,3433.0,12.0,1970,1,amc rebel sst,AMC REBEL SST,amc rebel sst,low
17.0,8,302.0,3449.0,10.5,1970,1,ford torino,FORD TORINO,ford torino,low
15.0,8,429.0,4341.0,10.0,1970,1,ford galaxie 500,FORD GALAXIE 500,ford galaxie 500,low
14.0,8,454.0,4354.0,9.0,1970,1,chevrolet impala,CHEVROLET IMPALA,chevrolet impala,low
14.0,8,440.0,4312.0,8.5,1970,1,plymouth fury iii,PLYMOUTH FURY III,plymouth fury iii,low
14.0,8,455.0,4425.0,10.0,1970,1,pontiac catalina,PONTIAC CATALINA,pontiac catalina,low
15.0,8,390.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl,low


### Choosing particular fields from a Data Frame

In [None]:
# Project the frame down to three columns of interest.
df = auto_df.select("mpg", "cylinders", "displacement")

In [None]:
df

mpg,cylinders,displacement
18.0,8,307.0
15.0,8,350.0
18.0,8,318.0
16.0,8,304.0
17.0,8,302.0
15.0,8,429.0
14.0,8,454.0
14.0,8,440.0
14.0,8,455.0
15.0,8,390.0


### Determine the dimensions of a Data Frame

In [None]:
auto_df.count()

398

### Get the data types of the fields

In [None]:
auto_df.dtypes

[('mpg', 'string'),
 ('cylinders', 'string'),
 ('displacement', 'string'),
 ('weight', 'string'),
 ('acceleration', 'string'),
 ('modelyear', 'string'),
 ('origin', 'string'),
 ('carname', 'string'),
 ('upper', 'string'),
 ('lower', 'string'),
 ('mpg_class', 'string')]

### Implementing a Map-type transformation on a Data Frame

In [None]:
def map_function(row):
    """Scale a row's ``acceleration`` by ten.

    Args:
        row: Any object exposing an ``acceleration`` attribute (here a Spark
            Row whose value is a numeric string or None).

    Returns:
        A one-element list holding ``float(acceleration) * 10``, or ``[None]``
        when the acceleration is missing.
    """
    accel = row.acceleration
    return [None] if accel is None else [float(accel) * 10]

# Apply map_function to every row through the RDD API, then convert the result
# back to a DataFrame. No column names are passed to toDF(), so the single
# resulting column is auto-named "_1".
df = auto_df.rdd.map(map_function).toDF()

In [None]:
df

_1
120.0
115.0
110.0
120.0
105.0
100.0
90.0
85.0
100.0
85.0


### Applying a custom function (UDF) to a Data Frame

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType


def first_word(text):
    """Return the first whitespace-delimited word of ``text``.

    Returns None when ``text`` is None, empty or only whitespace, so the UDF
    does not raise on null or blank car names.
    """
    words = text.split() if text else []
    return words[0] if words else None


# Wrap the Python function as a Spark UDF that yields a string column.
first_word_udf = udf(first_word, StringType())

# The manufacturer is taken to be the first word of the car name.
df = auto_df.withColumn("manufacturer", first_word_udf(col("carname")))
df

mpg,cylinders,displacement,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class,manufacturer
18.0,8,307.0,3504.0,12.0,1970,1,chevrolet chevell...,CHEVROLET CHEVELL...,chevrolet chevell...,low,chevrolet
15.0,8,350.0,3693.0,11.5,1970,1,buick skylark 320,BUICK SKYLARK 320,buick skylark 320,low,buick
18.0,8,318.0,3436.0,11.0,1970,1,plymouth satellite,PLYMOUTH SATELLITE,plymouth satellite,low,plymouth
16.0,8,304.0,3433.0,12.0,1970,1,amc rebel sst,AMC REBEL SST,amc rebel sst,low,amc
17.0,8,302.0,3449.0,10.5,1970,1,ford torino,FORD TORINO,ford torino,low,ford
15.0,8,429.0,4341.0,10.0,1970,1,ford galaxie 500,FORD GALAXIE 500,ford galaxie 500,low,ford
14.0,8,454.0,4354.0,9.0,1970,1,chevrolet impala,CHEVROLET IMPALA,chevrolet impala,low,chevrolet
14.0,8,440.0,4312.0,8.5,1970,1,plymouth fury iii,PLYMOUTH FURY III,plymouth fury iii,low,plymouth
14.0,8,455.0,4425.0,10.0,1970,1,pontiac catalina,PONTIAC CATALINA,pontiac catalina,low,pontiac
15.0,8,390.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl,low,amc


In [None]:
from pyspark.sql.functions import col

# Sort the rows alphabetically (ascending) by car name.
df = auto_df.orderBy(col("carname").asc())
df

mpg,cylinders,displacement,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class
13.0,8,360.0,3821.0,11.0,1973,1,amc ambassador br...,AMC AMBASSADOR BR...,amc ambassador br...,low
15.0,8,390.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl,low
17.0,8,304.0,3672.0,11.5,1972,1,amc ambassador sst,AMC AMBASSADOR SST,amc ambassador sst,low
19.4,6,232.0,3210.0,17.2,1978,1,amc concord,AMC CONCORD,amc concord,low
24.3,4,151.0,3003.0,20.1,1980,1,amc concord,AMC CONCORD,amc concord,mid
18.1,6,258.0,3410.0,15.1,1978,1,amc concord d/l,AMC CONCORD D/L,amc concord d/l,low
23.0,4,151.0,3035.0,20.5,1982,1,amc concord dl,AMC CONCORD DL,amc concord dl,mid
20.2,6,232.0,3265.0,18.2,1979,1,amc concord dl 6,AMC CONCORD DL 6,amc concord dl 6,low
21.0,6,199.0,2648.0,15.0,1970,1,amc gremlin,AMC GREMLIN,amc gremlin,mid
19.0,6,232.0,2634.0,13.0,1971,1,amc gremlin,AMC GREMLIN,amc gremlin,low


In [None]:
from pyspark.sql.functions import col

# Sort the rows by car name in reverse alphabetical order.
df = auto_df.sort(col("carname").desc())
df

mpg,cylinders,displacement,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class
31.9,4,89.0,1925.0,14.0,1979,2,vw rabbit custom,VW RABBIT CUSTOM,vw rabbit custom,high
44.3,4,90.0,2085.0,21.7,1980,2,vw rabbit c (diesel),VW RABBIT C (DIESEL),vw rabbit c (diesel),very high
29.0,4,90.0,1937.0,14.2,1976,2,vw rabbit,VW RABBIT,vw rabbit,mid
41.5,4,98.0,2144.0,14.7,1980,2,vw rabbit,VW RABBIT,vw rabbit,very high
44.0,4,97.0,2130.0,24.6,1982,2,vw pickup,VW PICKUP,vw pickup,very high
43.4,4,90.0,2335.0,23.7,1980,2,vw dasher (diesel),VW DASHER (DIESEL),vw dasher (diesel),very high
30.7,6,145.0,3160.0,19.6,1981,2,volvo diesel,VOLVO DIESEL,volvo diesel,mid
17.0,6,163.0,3140.0,13.6,1978,2,volvo 264gl,VOLVO 264GL,volvo 264gl,low
20.0,4,130.0,3150.0,15.7,1976,2,volvo 245,VOLVO 245,volvo 245,low
22.0,4,121.0,2945.0,14.5,1975,2,volvo 244dl,VOLVO 244DL,volvo 244dl,mid


In [None]:
# Unique values that appear in the cylinders column.
df = auto_df.select("cylinders").dropDuplicates()
df

cylinders
3
8
5
6
4


In [None]:
# Keep a single row per distinct car name.
df = auto_df.drop_duplicates(subset=["carname"])
df

mpg,cylinders,displacement,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class
13.0,8,360.0,3821.0,11.0,1973,1,amc ambassador br...,AMC AMBASSADOR BR...,amc ambassador br...,low
15.0,8,390.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl,low
17.0,8,304.0,3672.0,11.5,1972,1,amc ambassador sst,AMC AMBASSADOR SST,amc ambassador sst,low
19.4,6,232.0,3210.0,17.2,1978,1,amc concord,AMC CONCORD,amc concord,low
18.1,6,258.0,3410.0,15.1,1978,1,amc concord d/l,AMC CONCORD D/L,amc concord d/l,low
23.0,4,151.0,3035.0,20.5,1982,1,amc concord dl,AMC CONCORD DL,amc concord dl,mid
20.2,6,232.0,3265.0,18.2,1979,1,amc concord dl 6,AMC CONCORD DL 6,amc concord dl 6,low
21.0,6,199.0,2648.0,15.0,1970,1,amc gremlin,AMC GREMLIN,amc gremlin,mid
18.0,6,199.0,2774.0,15.5,1970,1,amc hornet,AMC HORNET,amc hornet,low
18.0,6,258.0,2962.0,13.5,1971,1,amc hornet sporta...,AMC HORNET SPORTA...,amc hornet sporta...,low


In [None]:
df.count()

305

In [None]:
from pyspark.sql.functions import desc

# Number of cars per cylinders value, in whatever order Spark returns the groups.
df = auto_df.groupby("cylinders").count()
df

cylinders,count
3,4
8,103
5,3
6,84
4,204


In [None]:
from pyspark.sql.functions import desc

# Number of cars per cylinders value, largest group first.
df = auto_df.groupby("cylinders").count().sort(desc("count"))
df

cylinders,count
4,204
8,103
6,84
3,4
5,3


In [None]:
from pyspark.sql.functions import col

# Keep only the cylinder groups that contain more than 100 cars.
df = auto_df.groupby("cylinders").count().filter(col("count") > 100)
df

cylinders,count
8,103
4,204


## SQL functionality in Spark

In [None]:
# createOrReplaceTempView() returns None, so chaining it onto the read left
# sql_df bound to None. Keep the DataFrame first, then register it as the
# temporary view "mpgTable" so it can be queried with spark.sql().
sql_df = spark.read.option("header", True).csv("/content/auto-mpg.csv")
sql_df.createOrReplaceTempView("mpgTable")

In [None]:
# Query the registered temp view with plain SQL; the result is a regular DataFrame.
df = spark.sql("SELECT carname, weight FROM mpgTable")

In [None]:
df

carname,weight
chevrolet chevell...,3504.0
buick skylark 320,3693.0
plymouth satellite,3436.0
amc rebel sst,3433.0
ford torino,3449.0
ford galaxie 500,4341.0
chevrolet impala,4354.0
plymouth fury iii,4312.0
pontiac catalina,4425.0
amc ambassador dpl,3850.0


In [None]:
# Filter the view to cars with more than four cylinders.
# NOTE(review): the view was loaded without a schema, so cylinders is presumably
# a string column and this comparison relies on Spark's implicit cast - confirm.
df_0 = spark.sql("SELECT * FROM mpgTable WHERE cylinders > 4")
df_0

mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin,carname
18.0,8,307.0,130.0,3504.0,12.0,70,1,chevrolet chevell...
15.0,8,350.0,165.0,3693.0,11.5,70,1,buick skylark 320
18.0,8,318.0,150.0,3436.0,11.0,70,1,plymouth satellite
16.0,8,304.0,150.0,3433.0,12.0,70,1,amc rebel sst
17.0,8,302.0,140.0,3449.0,10.5,70,1,ford torino
15.0,8,429.0,198.0,4341.0,10.0,70,1,ford galaxie 500
14.0,8,454.0,220.0,4354.0,9.0,70,1,chevrolet impala
14.0,8,440.0,215.0,4312.0,8.5,70,1,plymouth fury iii
14.0,8,455.0,225.0,4425.0,10.0,70,1,pontiac catalina
15.0,8,390.0,190.0,3850.0,8.5,70,1,amc ambassador dpl


## MLlib (Machine Learning library in Spark)

In [None]:
auto_df

mpg,cylinders,displacement,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class
18.0,8,307.0,3504.0,12.0,1970,1,chevrolet chevell...,CHEVROLET CHEVELL...,chevrolet chevell...,low
15.0,8,350.0,3693.0,11.5,1970,1,buick skylark 320,BUICK SKYLARK 320,buick skylark 320,low
18.0,8,318.0,3436.0,11.0,1970,1,plymouth satellite,PLYMOUTH SATELLITE,plymouth satellite,low
16.0,8,304.0,3433.0,12.0,1970,1,amc rebel sst,AMC REBEL SST,amc rebel sst,low
17.0,8,302.0,3449.0,10.5,1970,1,ford torino,FORD TORINO,ford torino,low
15.0,8,429.0,4341.0,10.0,1970,1,ford galaxie 500,FORD GALAXIE 500,ford galaxie 500,low
14.0,8,454.0,4354.0,9.0,1970,1,chevrolet impala,CHEVROLET IMPALA,chevrolet impala,low
14.0,8,440.0,4312.0,8.5,1970,1,plymouth fury iii,PLYMOUTH FURY III,plymouth fury iii,low
14.0,8,455.0,4425.0,10.0,1970,1,pontiac catalina,PONTIAC CATALINA,pontiac catalina,low
15.0,8,390.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl,low


In [None]:
auto_df.cache()

mpg,cylinders,displacement,weight,acceleration,modelyear,origin,carname,upper,lower,mpg_class
18.0,8,307.0,3504.0,12.0,1970,1,chevrolet chevell...,CHEVROLET CHEVELL...,chevrolet chevell...,low
15.0,8,350.0,3693.0,11.5,1970,1,buick skylark 320,BUICK SKYLARK 320,buick skylark 320,low
18.0,8,318.0,3436.0,11.0,1970,1,plymouth satellite,PLYMOUTH SATELLITE,plymouth satellite,low
16.0,8,304.0,3433.0,12.0,1970,1,amc rebel sst,AMC REBEL SST,amc rebel sst,low
17.0,8,302.0,3449.0,10.5,1970,1,ford torino,FORD TORINO,ford torino,low
15.0,8,429.0,4341.0,10.0,1970,1,ford galaxie 500,FORD GALAXIE 500,ford galaxie 500,low
14.0,8,454.0,4354.0,9.0,1970,1,chevrolet impala,CHEVROLET IMPALA,chevrolet impala,low
14.0,8,440.0,4312.0,8.5,1970,1,plymouth fury iii,PLYMOUTH FURY III,plymouth fury iii,low
14.0,8,455.0,4425.0,10.0,1970,1,pontiac catalina,PONTIAC CATALINA,pontiac catalina,low
15.0,8,390.0,3850.0,8.5,1970,1,amc ambassador dpl,AMC AMBASSADOR DPL,amc ambassador dpl,low


In [None]:
# Convert the string columns to numeric types so MLlib can use them.
# mpg is the regression label and holds fractional values (e.g. 19.4, 24.3);
# it is cast to double because an int cast would truncate the label.
numeric_casts = {
    "cylinders": "int",
    "displacement": "double",
    "weight": "double",
    "acceleration": "double",
    "modelyear": "int",
    "origin": "int",
    "mpg": "double",
}
for column_name, target_type in numeric_casts.items():
    auto_df = auto_df.withColumn(column_name, col(column_name).cast(target_type))

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Gather the predictor columns into one vector column named "features".
feature_columns = [
    "cylinders",
    "displacement",
    "weight",
    "acceleration",
    "modelyear",
    "origin",
]
vector_assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

In [None]:
# Define the linear regression estimator: it reads the assembled "features"
# vector and learns to predict the "mpg" label column.
lr = LinearRegression(featuresCol="features", labelCol="mpg")

In [None]:
# Chain the stages so the assembler runs first and its output feeds the
# linear regression estimator.
pipeline = Pipeline(stages=[vector_assembler, lr])

In [None]:
# Randomly split the rows into roughly 80% training / 20% test data.
# The fixed seed makes the split repeatable across runs.
train_data, test_data = auto_df.randomSplit([0.8, 0.2], seed=123)

In [None]:
# Fit every pipeline stage on the training split; the result is a fitted
# pipeline model that can transform new data.
model = pipeline.fit(train_data)

In [None]:
# Run the fitted pipeline on the held-out test split to obtain predictions
# (read back below from the "prediction" column by the evaluator).
predictions = model.transform(test_data)

In [None]:
# Score the test-set predictions with root mean squared error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="mpg",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 3.689063239788798


In [None]:
# The fitted linear regression is the last stage of the pipeline model;
# print its learned weights (one per feature column) and intercept.
lr_model = model.stages[-1]
coefficients = lr_model.coefficients
intercept = lr_model.intercept
print(f"Coefficients: {coefficients}")
print(f"Intercept: {intercept}")

Coefficients: [-0.2113766306979264,0.018979940192397262,-0.007299472545952028,0.16686018705675226,0.7464019017720807,1.677023007539527]
Intercept: -1437.6573615456782


# END OF Lab #10

In [None]:
# NOTE(review): KMeans, indexers, encoders, assembler and scaler are not
# imported or defined anywhere in the visible part of this notebook, so this
# cell raises NameError on a fresh run unless they are defined elsewhere.
# It looks like a leftover from another exercise - confirm or remove.
kmeans = KMeans(featuresCol="scaledFeatures", k=3)  # Adjust k as needed

# Pipeline: indexer and encoder stages first, then assembler, scaler and k-means.
# This rebinds `pipeline`, replacing the regression pipeline defined earlier.
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, kmeans])

# Cache the DataFrame
auto_df.cache()

# Train the model (rebinds `model`, replacing the fitted regression pipeline)
model = pipeline.fit(auto_df)

# Save the model
# NOTE(review): "/path/to/save/model" is a placeholder path; any existing
# model at that location is overwritten.
model_path = "/path/to/save/model"
model.write().overwrite().save(model_path)