In [1]:
from pyspark.sql import SparkSession

In [7]:
spark =SparkSession.\
builder.\
appName("Sparksql").\
getOrCreate()

In [9]:
print(spark.version)

3.5.3


In [11]:
data=spark.read.format("csv").\
option("infer schema","true").\
option("Header","true").\
option("path","iris.csv").\
load()

In [15]:
data.printSchema()

root
 |-- sepal_length: string (nullable = true)
 |-- sepal_width: string (nullable = true)
 |-- petal_length: string (nullable = true)
 |-- petal_width: string (nullable = true)
 |-- species: string (nullable = true)



In [29]:
#dataframe api
from pyspark.sql.functions import col,desc

data_2=data.select("sepal_length","species").\
filter(col("sepal_length") >5.0).\
orderBy(desc("sepal_length"))

In [31]:
data_2.printSchema()

root
 |-- sepal_length: string (nullable = true)
 |-- species: string (nullable = true)



In [33]:
data_2.show()

+------------+----------+
|sepal_length|   species|
+------------+----------+
|         7.9| virginica|
|         7.7| virginica|
|         7.7| virginica|
|         7.7| virginica|
|         7.7| virginica|
|         7.6| virginica|
|         7.4| virginica|
|         7.3| virginica|
|         7.2| virginica|
|         7.2| virginica|
|         7.2| virginica|
|         7.1| virginica|
|         7.0|versicolor|
|         6.9|versicolor|
|         6.9| virginica|
|         6.9| virginica|
|         6.9| virginica|
|         6.8|versicolor|
|         6.8| virginica|
|         6.8| virginica|
+------------+----------+
only showing top 20 rows



In [41]:
#Spark sql
data_2.createOrReplaceTempView("data")
spark.sql("""
SELECT sepal_length,species From data 
WHERE sepal_length >5.0""").show()

+------------+----------+
|sepal_length|   species|
+------------+----------+
|         7.9| virginica|
|         7.7| virginica|
|         7.7| virginica|
|         7.7| virginica|
|         7.7| virginica|
|         7.6| virginica|
|         7.4| virginica|
|         7.3| virginica|
|         7.2| virginica|
|         7.2| virginica|
|         7.2| virginica|
|         7.1| virginica|
|         7.0|versicolor|
|         6.9|versicolor|
|         6.9| virginica|
|         6.9| virginica|
|         6.9| virginica|
|         6.8|versicolor|
|         6.8| virginica|
|         6.8| virginica|
+------------+----------+
only showing top 20 rows



In [51]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.types import DoubleType

# Convert columns to DoubleType if they are not already
data = data.withColumn("sepal_length", col("sepal_length").cast(DoubleType())) \
           .withColumn("sepal_width", col("sepal_width").cast(DoubleType())) \
           .withColumn("petal_length", col("petal_length").cast(DoubleType())) \
           .withColumn("petal_width", col("petal_width").cast(DoubleType()))

# Combine numerical features into a single vector column
assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol="features")
data_vector = assembler.transform(data)

# Compute the correlation matrix
correlation_matrix = Correlation.corr(data_vector, "features").head()
print("Correlation matrix:\n", correlation_matrix[0])


Correlation matrix:
 DenseMatrix([[ 1.        , -0.11756978,  0.87175378,  0.81794113],
             [-0.11756978,  1.        , -0.4284401 , -0.36612593],
             [ 0.87175378, -0.4284401 ,  1.        ,  0.96286543],
             [ 0.81794113, -0.36612593,  0.96286543,  1.        ]])


In [45]:
data.groupBy("species") \
    .agg(
        {"sepal_length": "avg", "petal_length": "avg", "sepal_width": "avg", "petal_width": "avg"}
    ).show()


+----------+------------------+------------------+-----------------+------------------+
|   species|  avg(sepal_width)|  avg(petal_width)|avg(sepal_length)| avg(petal_length)|
+----------+------------------+------------------+-----------------+------------------+
| virginica|2.9739999999999998|             2.026|6.587999999999998|             5.552|
|versicolor|2.7700000000000005|1.3259999999999998|            5.936|              4.26|
|    setosa| 3.428000000000001|0.2459999999999999|5.005999999999999|1.4620000000000002|
+----------+------------------+------------------+-----------------+------------------+



In [47]:
data.filter((col("sepal_length") > 6) & (col("petal_width") > 1.5)).show()


+------------+-----------+------------+-----------+---------+
|sepal_length|sepal_width|petal_length|petal_width|  species|
+------------+-----------+------------+-----------+---------+
|         7.1|        3.0|         5.9|        2.1|virginica|
|         7.6|        3.0|         6.6|        2.1|virginica|
|         7.3|        2.9|         6.3|        1.8|virginica|
|         7.2|        3.6|         6.1|        2.5|virginica|
|         7.7|        3.8|         6.7|        2.2|virginica|
|         7.7|        2.6|         6.9|        2.3|virginica|
|         7.7|        2.8|         6.7|        2.0|virginica|
|         7.2|        3.2|         6.0|        1.8|virginica|
|         7.2|        3.0|         5.8|        1.6|virginica|
|         7.4|        2.8|         6.1|        1.9|virginica|
|         7.9|        3.8|         6.4|        2.0|virginica|
|         7.7|        3.0|         6.1|        2.3|virginica|
+------------+-----------+------------+-----------+---------+



In [49]:
data.groupBy("species").agg(
    {"sepal_length": "stddev", "sepal_width": "stddev", "petal_length": "stddev", "petal_width": "stddev"}
).show()


+----------+-------------------+-------------------+--------------------+--------------------+
|   species|stddev(sepal_width)|stddev(petal_width)|stddev(sepal_length)|stddev(petal_length)|
+----------+-------------------+-------------------+--------------------+--------------------+
| virginica| 0.3224966381726375| 0.2746500556366674|   0.635879593274432|  0.5518946956639833|
|versicolor| 0.3137983233784114|0.19775268000454407|  0.5161711470638635|  0.4699109772399579|
|    setosa| 0.3790643690962886|0.10538558938004565|  0.3524896872134513|  0.1736639964801841|
+----------+-------------------+-------------------+--------------------+--------------------+



In [53]:
from pyspark.sql.functions import expr

# Example: Calculate ratio of petal length to petal width as a new feature
data = data.withColumn("petal_length_to_width_ratio", expr("petal_length / petal_width"))


In [55]:
data

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, petal_length_to_width_ratio: double]

In [57]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Assemble features into a vector
assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol="features")
data_vector = assembler.transform(data)

# Apply KMeans clustering
kmeans = KMeans(k=3, seed=1)  # k=3 for the three Iris species
model = kmeans.fit(data_vector)
predictions = model.transform(data_vector)
predictions.select("species", "prediction").show(10)  # View cluster assignments


+-------+----------+
|species|prediction|
+-------+----------+
| setosa|         1|
| setosa|         1|
| setosa|         1|
| setosa|         1|
| setosa|         1|
| setosa|         1|
| setosa|         1|
| setosa|         1|
| setosa|         1|
| setosa|         1|
+-------+----------+
only showing top 10 rows



In [59]:
from pyspark.ml.feature import PCA

pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
pca_model = pca.fit(data_vector)
result = pca_model.transform(data_vector)
result.select("pcaFeatures").show(5)


+--------------------+
|         pcaFeatures|
+--------------------+
|[-2.8182395066394...|
|[-2.7882234453146...|
|[-2.6133745635497...|
|[-2.7570222769675...|
|[-2.7736485960544...|
+--------------------+
only showing top 5 rows



In [61]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Prepare the data by encoding labels
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="species", outputCol="label")
data_indexed = indexer.fit(data_vector).transform(data_vector)

# Split data into training and test sets
train, test = data_indexed.randomSplit([0.8, 0.2], seed=1234)

# Train logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train)
predictions = lr_model.transform(test)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")


Accuracy: 1.0
