In [1]:
# Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=22baab5c4461f0f01003589f869058ec2490d80469bf1634bce81ac5668e2402
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pandas as pd
import urllib.request

# Session used by the cells below; getOrCreate() hands back an already
# running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("LogisticRegressionExample")
    .getOrCreate()
)

def get_irish_data(url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"):

  """
  Download the Iris dataset and load it into a Spark DataFrame.

  The file is expected to be a header-less CSV with five columns, so the
  column names are assigned here rather than read from the file.

  Args:
    url: Location of the raw Iris CSV. Defaults to the copy hosted by the
      UCI Machine Learning Repository.
  Returns:
    A Spark DataFrame with the columns sepal_length, sepal_width,
    petal_length, petal_width and class.
  """

  column_names = ["sepal_length", "sepal_width", "petal_length", "petal_width", "class"]

  # Local path the raw (header-less) download is saved to
  file_path = "/tmp/iris.csv"

  # Download the file using urllib
  urllib.request.urlretrieve(url, file_path)

  # Keep producing the headered copy in the working directory, as this
  # function always has. It is no longer fed back into Spark (see below).
  df = pd.read_csv(file_path, header=None, names=column_names)
  df.to_csv('iris.csv', index=False)

  # Reuse the running session, or start a new one if none is active
  spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

  # Read the raw download, which really has no header. Reading the headered
  # pandas copy with header=False turned the header line into a data row and
  # made every column a string; it also relied on the working directory
  # being /content.
  data = spark.read.csv(file_path, header=False, inferSchema=True)

  # Replace Spark's positional names (_c0 ... _c4) with the real column names
  return data.select([col(f"_c{position}").alias(name)
                      for position, name in enumerate(column_names)])



In [3]:
def lr_model_build():

  """
  Train a multinomial logistic regression model on the Iris data.

  The data is loaded by executing get_irish_data, cleaned, indexed,
  assembled into feature vectors and split into train and test sets.

  Args:
    None
  Returns:
    A tuple (lrModel, testData): the fitted logistic regression model and
    the held-out split (weight 0.3) carrying "label" and "features"
    columns for evaluation.
  """

  feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

  data = get_irish_data()

  # Cast the measurement columns to double (done once; the second,
  # post-assembly cast in the old code was a no-op).
  for name in feature_cols:
    data = data.withColumn(name, col(name).cast("double"))

  # Drop incomplete rows BEFORE fitting the indexer. Previously the indexer
  # was fitted first, so a row that was about to be discarded (e.g. a stray
  # header line whose measurements cast to null) still contributed its
  # "class" value as an extra label, giving the model a phantom class.
  data = data.na.drop()

  # Convert class labels to numeric values. Alphabetical ordering pins the
  # mapping to 0=Iris-setosa, 1=Iris-versicolor, 2=Iris-virginica, which is
  # what write_out_3_2 hard-codes, independent of class frequencies.
  indexer = StringIndexer(inputCol="class", outputCol="label", stringOrderType="alphabetAsc")
  data = indexer.fit(data).transform(data)

  # Create feature vector
  assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
  data = assembler.transform(data)

  # Split the data into training and test sets (fixed seed for repeatability)
  trainData, testData = data.randomSplit([0.7, 0.3], seed=123)

  # Train the logistic regression model
  lr = LogisticRegression(featuresCol="features", labelCol="label", family="multinomial", maxIter=100, regParam=0.0, elasticNetParam=0.0)
  lrModel = lr.fit(trainData)
  return lrModel, testData



def evaluation():

  """
  Train the model by executing lr_model_build and score it on the test data.

  Args:
    None
  Returns:
    The accuracy of the model's predictions on the held-out test data.
  """

  model, held_out = lr_model_build()

  # Accuracy compares the "prediction" column against the "label" column
  scorer = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

  # Predict on the held-out rows and score the result in one step
  return scorer.evaluate(model.transform(held_out))

# Capture and report the accuracy: a bare call that is not the cell's last
# expression is evaluated but never displayed.
accuracy = evaluation()
print(f"Test accuracy: {accuracy:.4f}")

spark.stop()

In [7]:
from pyspark.ml.linalg import Vectors
import csv
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# The previous cell ends with spark.stop(), so obtain a session again here;
# getOrCreate() starts one when none is running.
spark = (
    SparkSession.builder
    .appName("LogisticRegressionExample")
    .getOrCreate()
)


def out_3_2(li_val_1=[5.1, 3.5, 1.4, 0.2], li_val_2=[6.2, 3.4, 5.4, 2.3]):
    """
    Predict the class index for two custom samples.

    A model is trained by executing lr_model_build and then applied to the
    two samples.

    Args:
      li_val_1: Feature values of the first sample, in the order the model
        was trained on (sepal_length, sepal_width, petal_length, petal_width).
      li_val_2: Feature values of the second sample, same order.
    Returns:
      A list holding the model's prediction for each sample.
    """
    # Only the model is needed here; the test split is ignored.
    model, _unused_test_split = lr_model_build()

    # One single-column row per sample, each wrapping a dense feature vector
    rows = [(Vectors.dense(values),) for values in (li_val_1, li_val_2)]
    samples = spark.createDataFrame(rows, ["features"])

    # Score the samples and pull the predictions back to the driver
    scored = model.transform(samples)
    return [record.prediction for record in scored.collect()]


def write_out_3_2():
    """
    Write the class names predicted by out_3_2 to 'out_3_2.txt'.

    The file starts with a 'class' header line, followed by one line per
    prediction equal to 0.0, 1.0 or 2.0. Any other prediction is skipped.

    Args:
      None
    Returns:
      None. The result is the file written relative to the working directory.
    """
    # Label index -> class name
    class_names = {
        0.0: 'Iris-setosa',
        1.0: 'Iris-versicolor',
        2.0: 'Iris-virginica',
    }

    # Header first, then one row for every prediction that has a known name
    rows = [['class']]
    for predicted in out_3_2():
        name = class_names.get(predicted)
        if name is not None:
            rows.append([name])

    with open('out_3_2.txt', 'w', newline='') as handle:
        csv.writer(handle).writerows(rows)


# Produce out_3_2.txt from the default sample inputs of out_3_2.
write_out_3_2()

# Shut the Spark session down once the output file has been written.
spark.stop()
