In [None]:
! pip install --upgrade pyspark
! pip install scikit-learn --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=f049855bdeb6fc3fb55581f7daff5f610e3df8035116e7633b2a0ab9a3008272
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
import pyspark
import pyspark.sql.functions as sqlfunc

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator


from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

In [None]:
! wget https://mldataset0123.s3.amazonaws.com/original.csv

--2023-03-20 01:44:24--  https://mldataset0123.s3.amazonaws.com/original.csv
Resolving mldataset0123.s3.amazonaws.com (mldataset0123.s3.amazonaws.com)... 54.231.202.81, 54.231.233.177, 54.231.168.201, ...
Connecting to mldataset0123.s3.amazonaws.com (mldataset0123.s3.amazonaws.com)|54.231.202.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 86874 (85K) [text/csv]
Saving to: ‘original.csv’


2023-03-20 01:44:25 (1.49 MB/s) - ‘original.csv’ saved [86874/86874]



In [None]:
df = spark.read.csv('original.csv',header=True)

In [None]:
df = df.withColumn("Salary", sqlfunc.regexp_replace(sqlfunc.col("Salary"), "[\$,]", "").alias("Salary"))
df = df.withColumn("Salary", df.Salary.cast('float'))
df = df.withColumn("Latitude", df.Salary.cast('float'))
df = df.withColumn("Longitude", df.Salary.cast('float'))

In [None]:
columns_to_drop = ['id', 'first_name','last_name']
df = df.drop(*columns_to_drop)
df = df.na.drop()

In [None]:
df.show()

+------+---------------+--------------------+--------+--------+---------+
|gender|           City|            JobTitle|  Salary|Latitude|Longitude|
+------+---------------+--------------------+--------+--------+---------+
|Female|      Nowa Ruda| Assistant Professor|57438.18|57438.18| 57438.18|
|Female|         Bulgan|       Programmer II| 62846.6| 62846.6|  62846.6|
|  Male|  Divnomorskoye|Budget/Accounting...|61489.23|61489.23| 61489.23|
|  Male|      Mytishchi|            VP Sales|63863.09|63863.09| 63863.09|
|Female|Kinsealy-Drinan|      Civil Engineer|30101.16|30101.16| 30101.16|
|  Male|      Trélissac|Desktop Support T...|46116.36|46116.36| 46116.36|
|  Male|         Heitan|VP Product Manage...| 73697.1| 73697.1|  73697.1|
|  Male|       Arbeláez|Mechanical System...|68098.42|68098.42| 68098.42|
|Female|       El Cardo|Nuclear Power Eng...|13604.63|13604.63| 13604.63|
|Female|    Wangqingtuo|Systems Administr...| 73423.7| 73423.7|  73423.7|
|  Male|      Sułkowice|Compensation A

In [None]:
# Prepare the data for training by converting categorical variables to numeric using StringIndexer
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
job_title_indexer = StringIndexer(inputCol="JobTitle", outputCol="JobTitleIndex")
city_indexer = StringIndexer(inputCol="City", outputCol="CityIndex")

df = gender_indexer.fit(df).transform(df)
df = job_title_indexer.fit(df).transform(df)
df = city_indexer.fit(df).transform(df)


# Regression 

In [None]:
# Prepare the data for training by combining the features into a single vector
assembler = VectorAssembler(inputCols=["genderIndex", "CityIndex", "JobTitleIndex", "Latitude","Longitude"],
                            outputCol="features")
data = assembler.transform(df).select("features", "Salary")

# Split the data into training and testing sets
(trainingData, testData) = data.randomSplit([0.7, 0.3])


# Train a linear regression model on the training data
lr = LinearRegression(labelCol="Salary", featuresCol="features")
model = lr.fit(trainingData)

# Make predictions on the testing data
predictions = model.transform(testData)

# Evaluate the performance of the model using RMSE
evaluator = RegressionEvaluator(labelCol="Salary", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

# Print the RMSE of the model
print("Root Mean Squared Error (RMSE) = %g" % rmse)

# Show the coefficients and intercept of the linear regression model
print("Coefficients: %s" % str(model.coefficients))
print("Intercept: %s" % str(model.intercept))

Root Mean Squared Error (RMSE) = 0.000149844
Coefficients: [0.0002671763409683114,-2.7001771699159474e-07,-1.8263218471722112e-07,0.5000000000978084,0.5000000000978084]
Intercept: -2.847271534037752e-06


# Classifiction

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare the data for training by combining the features into a single vector
assembler = VectorAssembler(inputCols=["genderIndex", "CityIndex", "JobTitleIndex", "Latitude","Longitude"],
                            outputCol="features")
data = assembler.transform(df).select("features", "genderIndex")

# Split the data into training and testing sets
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a decision tree model on the training data
lr = LogisticRegression(labelCol="genderIndex", featuresCol="features")
model = lr.fit(trainingData)

# Make predictions on the testing data
predictions = model.transform(testData)

# Evaluate the performance of the model using AUC-ROC
evaluator = BinaryClassificationEvaluator(labelCol="genderIndex")
auc_roc = evaluator.evaluate(predictions)

# Print the AUC-ROC of the model
print("Area Under ROC = %g" % auc_roc)

# Show the coefficients and intercept of the logistic regression model
print("Coefficients: %s" % str(model.coefficients))
print("Intercept: %s" % str(model.intercept))

Area Under ROC = 1
Coefficients: [37.99244980169675,5.293395765998999e-07,0.00010444676682240483,-6.6830712682331e-08,-6.683071268078985e-08]
Intercept: -19.01148095500658


# Decision Tree

In [None]:
! wget https://mldataset0123.s3.amazonaws.com/iris.csv

--2023-03-20 03:23:36--  https://mldataset0123.s3.amazonaws.com/iris.csv
Resolving mldataset0123.s3.amazonaws.com (mldataset0123.s3.amazonaws.com)... 52.216.236.235, 3.5.10.23, 52.217.67.84, ...
Connecting to mldataset0123.s3.amazonaws.com (mldataset0123.s3.amazonaws.com)|52.216.236.235|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4615 (4.5K) [text/csv]
Saving to: ‘iris.csv’


2023-03-20 03:23:36 (258 MB/s) - ‘iris.csv’ saved [4615/4615]



In [None]:
# Load the Iris dataset from a CSV file
iris = spark.read.csv("iris.csv", header=True, inferSchema=True)


species_indexer = StringIndexer(inputCol="species", outputCol="speciesIndex")

iris = species_indexer.fit(iris).transform(iris)

In [None]:
iris.groupBy('speciesIndex').count().show()

+------------+-----+
|speciesIndex|count|
+------------+-----+
|         0.0|   50|
|         1.0|   50|
|         2.0|   50|
+------------+-----+



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Prepare the data for training by combining the features into a single vector
assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
                            outputCol="features")
data = assembler.transform(iris).select("features", "speciesIndex")

# Split the data into training and testing sets
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a decision tree model on the training data
dt = DecisionTreeClassifier(labelCol="speciesIndex", featuresCol="features")
model = dt.fit(trainingData)

# Make predictions on the testing data
predictions = model.transform(testData)

# Evaluate the accuracy of the model using the F1 score
evaluator = MulticlassClassificationEvaluator(labelCol="speciesIndex", predictionCol="prediction", metricName="f1")
accuracy = evaluator.evaluate(predictions)

# Print the accuracy of the model
print("Accuracy = %g" % accuracy)

# Show the decision tree rules
print("Learned classification tree model:")
print(model.toDebugString)

Accuracy = 0.908435
Learned classification tree model:
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_c988c4e1b25b, depth=4, numNodes=13, numClasses=3, numFeatures=4
  If (feature 2 <= 2.35)
   Predict: 0.0
  Else (feature 2 > 2.35)
   If (feature 2 <= 4.95)
    If (feature 3 <= 1.65)
     Predict: 1.0
    Else (feature 3 > 1.65)
     If (feature 1 <= 2.8499999999999996)
      Predict: 2.0
     Else (feature 1 > 2.8499999999999996)
      Predict: 1.0
   Else (feature 2 > 4.95)
    If (feature 3 <= 1.65)
     If (feature 3 <= 1.55)
      Predict: 2.0
     Else (feature 3 > 1.55)
      Predict: 1.0
    Else (feature 3 > 1.65)
     Predict: 2.0

