In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=46331e91cac2c62747a8f6c89d0fbe93d6086190bd5ac7a58a7c68dc76d08dfd
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [2]:
import pyspark
import pyspark.sql  as pyspark_sql
import pyspark.sql.types as pyspark_types
import pyspark.sql.functions  as F
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import row_number, desc

# Spark configuration: serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Create the SparkContext, or reuse the active one. A bare SparkContext(conf=conf)
# raises "Cannot run multiple SparkContexts at once" if this cell is re-run.
sc = SparkContext.getOrCreate(conf=conf)

# The SparkSession is built on top of the context created above.
spark = pyspark_sql.SparkSession.builder.getOrCreate()

# Classification

In [9]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris

# Iris: a numeric feature matrix plus an integer class target.
data = load_iris()

# Tidy sklearn's feature names (drop the "(cm)" unit, snake_case the rest)
# and append a name for the target column.
cols = []
for raw_name in data.feature_names:
    cols.append(raw_name.replace('(cm)', '').strip().replace(' ', '_'))
cols.append('label')

# np.c_ appends the target to the feature matrix as a trailing column; the
# combined array is float, so labels come out as 0.0 / 1.0 / 2.0.
pdf = pd.DataFrame(np.c_[data.data, data.target], columns=cols)

# NOTE: the explicit name list passed here replaces the pandas column names,
# so the Spark frame exposes feature1..feature4 and label, not `cols`.
df = spark.createDataFrame(
    pdf,
    ["feature1", "feature2", "feature3", "feature4", "label"],
)
df.show()

+--------+--------+--------+--------+-----+
|feature1|feature2|feature3|feature4|label|
+--------+--------+--------+--------+-----+
|     5.1|     3.5|     1.4|     0.2|  0.0|
|     4.9|     3.0|     1.4|     0.2|  0.0|
|     4.7|     3.2|     1.3|     0.2|  0.0|
|     4.6|     3.1|     1.5|     0.2|  0.0|
|     5.0|     3.6|     1.4|     0.2|  0.0|
|     5.4|     3.9|     1.7|     0.4|  0.0|
|     4.6|     3.4|     1.4|     0.3|  0.0|
|     5.0|     3.4|     1.5|     0.2|  0.0|
|     4.4|     2.9|     1.4|     0.2|  0.0|
|     4.9|     3.1|     1.5|     0.1|  0.0|
|     5.4|     3.7|     1.5|     0.2|  0.0|
|     4.8|     3.4|     1.6|     0.2|  0.0|
|     4.8|     3.0|     1.4|     0.1|  0.0|
|     4.3|     3.0|     1.1|     0.1|  0.0|
|     5.8|     4.0|     1.2|     0.2|  0.0|
|     5.7|     4.4|     1.5|     0.4|  0.0|
|     5.4|     3.9|     1.3|     0.4|  0.0|
|     5.1|     3.5|     1.4|     0.3|  0.0|
|     5.7|     3.8|     1.7|     0.3|  0.0|
|     5.1|     3.8|     1.5|    

In [10]:
# Index the class label into a new `indexedLabel` column for the classifier.
# NOTE: `label` is already numeric here (0.0/1.0/2.0 from the iris target), not a
# string; StringIndexer casts it to string and assigns indices by its default
# ordering (frequency-based per the Spark docs).
from pyspark.ml.feature import StringIndexer

# Drop any previous output first. Without this, re-running the cell fails because
# `indexedLabel` already exists. On a first run the drop is a no-op.
df = df.drop("indexedLabel")
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)
df = labelIndexer.transform(df)
df.show()

+--------+--------+--------+--------+-----+------------+
|feature1|feature2|feature3|feature4|label|indexedLabel|
+--------+--------+--------+--------+-----+------------+
|     5.1|     3.5|     1.4|     0.2|  0.0|         0.0|
|     4.9|     3.0|     1.4|     0.2|  0.0|         0.0|
|     4.7|     3.2|     1.3|     0.2|  0.0|         0.0|
|     4.6|     3.1|     1.5|     0.2|  0.0|         0.0|
|     5.0|     3.6|     1.4|     0.2|  0.0|         0.0|
|     5.4|     3.9|     1.7|     0.4|  0.0|         0.0|
|     4.6|     3.4|     1.4|     0.3|  0.0|         0.0|
|     5.0|     3.4|     1.5|     0.2|  0.0|         0.0|
|     4.4|     2.9|     1.4|     0.2|  0.0|         0.0|
|     4.9|     3.1|     1.5|     0.1|  0.0|         0.0|
|     5.4|     3.7|     1.5|     0.2|  0.0|         0.0|
|     4.8|     3.4|     1.6|     0.2|  0.0|         0.0|
|     4.8|     3.0|     1.4|     0.1|  0.0|         0.0|
|     4.3|     3.0|     1.1|     0.1|  0.0|         0.0|
|     5.8|     4.0|     1.2|   

In [11]:
# Assemble the four measurements into a single `features` vector column,
# then split the rows into training (70%) and test (30%) sets.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Fixed seed so the split, and the test accuracy reported later, is reproducible.
SPLIT_SEED = 42

assembler = VectorAssembler(inputCols=["feature1", "feature2", "feature3", "feature4"], outputCol="features")

# Drop any previous `features` column first. VectorAssembler will not overwrite an
# existing output column, so a re-run of this cell would otherwise fail.
df = assembler.transform(df.drop("features"))
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

df.show()

+--------+--------+--------+--------+-----+------------+-----------------+
|feature1|feature2|feature3|feature4|label|indexedLabel|         features|
+--------+--------+--------+--------+-----+------------+-----------------+
|     5.1|     3.5|     1.4|     0.2|  0.0|         0.0|[5.1,3.5,1.4,0.2]|
|     4.9|     3.0|     1.4|     0.2|  0.0|         0.0|[4.9,3.0,1.4,0.2]|
|     4.7|     3.2|     1.3|     0.2|  0.0|         0.0|[4.7,3.2,1.3,0.2]|
|     4.6|     3.1|     1.5|     0.2|  0.0|         0.0|[4.6,3.1,1.5,0.2]|
|     5.0|     3.6|     1.4|     0.2|  0.0|         0.0|[5.0,3.6,1.4,0.2]|
|     5.4|     3.9|     1.7|     0.4|  0.0|         0.0|[5.4,3.9,1.7,0.4]|
|     4.6|     3.4|     1.4|     0.3|  0.0|         0.0|[4.6,3.4,1.4,0.3]|
|     5.0|     3.4|     1.5|     0.2|  0.0|         0.0|[5.0,3.4,1.5,0.2]|
|     4.4|     2.9|     1.4|     0.2|  0.0|         0.0|[4.4,2.9,1.4,0.2]|
|     4.9|     3.1|     1.5|     0.1|  0.0|         0.0|[4.9,3.1,1.5,0.1]|
|     5.4|     3.7|     1

In [12]:
# Train a decision tree on the training split and measure accuracy on the test split.
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train on `indexedLabel`, the column produced by the StringIndexer step, so the
# indexer actually feeds the model. Left at defaults, the classifier would
# read the raw `label` column instead.
LABEL_COL = "indexedLabel"

dt = DecisionTreeClassifier(labelCol=LABEL_COL, featuresCol="features")

# Train the model on the training data
model = dt.fit(trainingData)

# Score the held-out rows; predictions are appended as a `prediction` column.
predictions = model.transform(testData)

# The evaluator must compare against the same label column the model was trained on.
evaluator = MulticlassClassificationEvaluator(
    labelCol=LABEL_COL, predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

print("Test Accuracy:", accuracy)

# Stop the SparkSession; later sections create their own session.
spark.stop()

Test Accuracy: 0.9069767441860465


# Regression

In [26]:
import pandas as pd
from pyspark.sql import SparkSession
from sklearn.datasets import fetch_california_housing

# With as_frame=True, `.frame` is a single pandas DataFrame holding the
# housing features followed by the target column.
housing = fetch_california_housing(as_frame=True)
data_df = housing.frame

# Start (or reuse) a Spark session for the regression example.
spark = (
    SparkSession.builder
    .appName("CaliforniaHousing")
    .getOrCreate()
)

# Rename positionally: x1..x8 for the eight features, y for the target.
spark_df = spark.createDataFrame(
    data_df,
    ["x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8",
     "y"],
)

# Preview the first rows of the Spark frame.
spark_df.show()

+------+----+------------------+------------------+------+------------------+-----+-------+-----+
|    x1|  x2|                x3|                x4|    x5|                x6|   x7|     x8|    y|
+------+----+------------------+------------------+------+------------------+-----+-------+-----+
|8.3252|41.0| 6.984126984126984|1.0238095238095237| 322.0|2.5555555555555554|37.88|-122.23|4.526|
|8.3014|21.0| 6.238137082601054|0.9718804920913884|2401.0| 2.109841827768014|37.86|-122.22|3.585|
|7.2574|52.0| 8.288135593220339| 1.073446327683616| 496.0|2.8022598870056497|37.85|-122.24|3.521|
|5.6431|52.0|5.8173515981735155|1.0730593607305936| 558.0| 2.547945205479452|37.85|-122.25|3.413|
|3.8462|52.0| 6.281853281853282|1.0810810810810811| 565.0|2.1814671814671813|37.85|-122.25|3.422|
|4.0368|52.0| 4.761658031088083|1.1036269430051813| 413.0| 2.139896373056995|37.85|-122.25|2.697|
|3.6591|52.0|4.9319066147859925|0.9513618677042801|1094.0|2.1284046692607004|37.84|-122.25|2.992|
|  3.12|52.0| 4.7975

In [28]:
# Assemble the eight housing features into one `features` vector column and expose
# the target as `label`, the column names LinearRegression() reads by default.
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8"],
    outputCol="features",
)

# Drop any previous `features` column so re-running this cell does not fail.
# VectorAssembler will not overwrite an existing output column.
spark_df = assembler.transform(spark_df.drop("features"))

# After the first run "y" is already gone, so this rename is a no-op on re-runs.
spark_df = spark_df.withColumnRenamed("y", "label")

spark_df.show()

+------+----+------------------+------------------+------+------------------+-----+-------+-----+--------------------+
|    x1|  x2|                x3|                x4|    x5|                x6|   x7|     x8|label|            features|
+------+----+------------------+------------------+------+------------------+-----+-------+-----+--------------------+
|8.3252|41.0| 6.984126984126984|1.0238095238095237| 322.0|2.5555555555555554|37.88|-122.23|4.526|[8.3252,41.0,6.98...|
|8.3014|21.0| 6.238137082601054|0.9718804920913884|2401.0| 2.109841827768014|37.86|-122.22|3.585|[8.3014,21.0,6.23...|
|7.2574|52.0| 8.288135593220339| 1.073446327683616| 496.0|2.8022598870056497|37.85|-122.24|3.521|[7.2574,52.0,8.28...|
|5.6431|52.0|5.8173515981735155|1.0730593607305936| 558.0| 2.547945205479452|37.85|-122.25|3.413|[5.6431,52.0,5.81...|
|3.8462|52.0| 6.281853281853282|1.0810810810810811| 565.0|2.1814671814671813|37.85|-122.25|3.422|[3.8462,52.0,6.28...|
|4.0368|52.0| 4.761658031088083|1.10362694300518

In [30]:
from pyspark.ml.regression import LinearRegression

# Fit a linear regression with default parameters: it reads the `features`
# vector and the `label` target. The whole housing frame is used for fitting,
# so there is no held-out split.
lr = LinearRegression()
model = lr.fit(spark_df)


In [31]:
# Score the same rows the model was fit on. These are in-sample predictions, not
# predictions on new data, so they overstate how well the model generalizes.
predictions = model.transform(spark_df)
predictions.select("features", "prediction").show()

# Print the model coefficients and intercept
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

# Fit quality on the training rows, from the summary Spark records during fit().
training_summary = model.summary
print("Training RMSE:", training_summary.rootMeanSquaredError)
print("Training R^2:", training_summary.r2)

# Stop the SparkSession
spark.stop()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[8.3252,41.0,6.98...| 4.131649827074092|
|[8.3014,21.0,6.23...|3.9766064386950646|
|[7.2574,52.0,8.28...| 3.676570941048361|
|[5.6431,52.0,5.81...|3.2415984958222523|
|[3.8462,52.0,6.28...| 2.413587434987811|
|[4.0368,52.0,4.76...|2.6752770171934017|
|[3.6591,52.0,4.93...|2.3953942948275326|
|[3.12,52.0,4.7975...|2.2466875187591526|
|[2.0804,42.0,4.29...|1.7916266674291492|
|[3.6912,52.0,4.97...| 2.428328072362106|
|[3.2031,52.0,5.47...|2.2207706646639096|
|[3.2705,52.0,4.77...|2.2888013257793673|
|[3.075,52.0,5.322...|2.1373228456041247|
|[2.6736,52.0,4.0,...|2.1673205731827423|
|[1.9167,52.0,4.26...|1.7442360667615304|
|[2.125,50.0,4.242...|1.8581610556018404|
|[2.775,52.0,5.939...|1.9683863539533917|
|[2.1202,52.0,4.05...|1.8340113729812018|
|[1.9911,50.0,5.34...|1.6945958066558475|
|[2.6033,52.0,5.46...|1.9712530540102122|
+--------------------+------------

# Clustering

In [75]:
from pyspark.ml.clustering import KMeans
from sklearn.datasets import load_wine

# Wine dataset: `data` is the numeric feature matrix and `target` holds the
# known wine type of each sample. The clustering below works on `data` only.
wine = load_wine()
data, target = wine.data, wine.target

In [76]:
from pyspark.sql import SparkSession
# Imported here so the clustering section does not depend on an import made in
# an earlier section of the notebook.
from pyspark.ml.feature import VectorAssembler

# Start (or reuse) a Spark session for the clustering example.
spark = SparkSession.builder.appName("WineKMeans").getOrCreate()

# Positional column names _1, _2, ... with one name per column of the feature
# matrix. Derived from the data's width rather than typed out by hand.
wine_cols = [f"_{i}" for i in range(1, data.shape[1] + 1)]

# Build the Spark DataFrame from the NumPy feature matrix.
wine_data = spark.createDataFrame(data, wine_cols)

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=wine_cols, outputCol="features")
df = assembler.transform(wine_data)
df.show()

+-----+----+----+----+-----+----+----+----+----+----+----+----+------+--------------------+
|   _1|  _2|  _3|  _4|   _5|  _6|  _7|  _8|  _9| _10| _11| _12|   _13|            features|
+-----+----+----+----+-----+----+----+----+----+----+----+----+------+--------------------+
|14.23|1.71|2.43|15.6|127.0| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065.0|[14.23,1.71,2.43,...|
| 13.2|1.78|2.14|11.2|100.0|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050.0|[13.2,1.78,2.14,1...|
|13.16|2.36|2.67|18.6|101.0| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185.0|[13.16,2.36,2.67,...|
|14.37|1.95| 2.5|16.8|113.0|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480.0|[14.37,1.95,2.5,1...|
|13.24|2.59|2.87|21.0|118.0| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735.0|[13.24,2.59,2.87,...|
| 14.2|1.76|2.45|15.2|112.0|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450.0|[14.2,1.76,2.45,1...|
|14.39|1.87|2.45|14.6| 96.0| 2.5|2.52| 0.3|1.98|5.25|1.02|3.58|1290.0|[14.39,1.87,2.45,...|
|14.06|2.15|2.61|17.6|121.0| 2.6|2.51|0.31|1.25|5.05|1.06|3.58|1295.0|[14.06,2.1

In [77]:
# Cluster the wines into 3 groups with k-means on the assembled `features` column.
# NOTE(review): the features are on very different scales (e.g. _13 is in the
# hundreds-to-thousands while _8 is below 1 in the rows shown), so distances are
# dominated by the large columns. Consider a StandardScaler step before KMeans.
N_CLUSTERS = 3
# Fix the seed: without one, the k-means initialization (and hence the cluster
# ids) may differ between runs.
KMEANS_SEED = 42
kmeans = KMeans(k=N_CLUSTERS, seed=KMEANS_SEED)

# Train the model on the data
model = kmeans.fit(df)

# Assign each row to a cluster; the cluster id is appended as `prediction`.
predictions = model.transform(df)

# Print the predictions
predictions.show()

# Stop the SparkSession
spark.stop()

+-----+----+----+----+-----+----+----+----+----+----+----+----+------+--------------------+----------+
|   _1|  _2|  _3|  _4|   _5|  _6|  _7|  _8|  _9| _10| _11| _12|   _13|            features|prediction|
+-----+----+----+----+-----+----+----+----+----+----+----+----+------+--------------------+----------+
|14.23|1.71|2.43|15.6|127.0| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065.0|[14.23,1.71,2.43,...|         0|
| 13.2|1.78|2.14|11.2|100.0|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050.0|[13.2,1.78,2.14,1...|         0|
|13.16|2.36|2.67|18.6|101.0| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185.0|[13.16,2.36,2.67,...|         0|
|14.37|1.95| 2.5|16.8|113.0|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480.0|[14.37,1.95,2.5,1...|         0|
|13.24|2.59|2.87|21.0|118.0| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735.0|[13.24,2.59,2.87,...|         1|
| 14.2|1.76|2.45|15.2|112.0|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450.0|[14.2,1.76,2.45,1...|         0|
|14.39|1.87|2.45|14.6| 96.0| 2.5|2.52| 0.3|1.98|5.25|1.02|3.58|1290.0|[14