In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import DoubleType,IntegerType
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("S ICP14") \
    .getOrCreate()

In [8]:
from pyspark.ml.linalg import SparseVector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [9]:
adult_data_df = spark.read.load(r"adult.csv", format="csv", header=True, delimiter=",")
adult_data_df.show()

+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|fnlwgt|    education|education-num|      marital-status|        occupation|  relationship|               race|    sex|capital-gain|capital-loss|hours-per-week|native-country|     X|
+---+-----------------+------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516|    Bachelors|           13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311|    Bachelors|           13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|           0|           0|            1

In [10]:
adult_data_df = adult_data_df.withColumn("age", adult_data_df["age"].cast(IntegerType()))
adult_data_df = adult_data_df.withColumn("fnlwgt", adult_data_df["fnlwgt"].cast(IntegerType()))
adult_data_df = adult_data_df.withColumn("education-num", adult_data_df["education-num"].cast(IntegerType()))
adult_data_df = adult_data_df.withColumn("capital-gain", adult_data_df["capital-gain"].cast(IntegerType()))
adult_data_df = adult_data_df.withColumn("capital-loss", adult_data_df["capital-loss"].cast(IntegerType()))
adult_data_df = adult_data_df.withColumn("hours-per-week", adult_data_df["hours-per-week"].cast(IntegerType()))

In [11]:
adult_data_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- X: string (nullable = true)



In [12]:
adult_data_df = adult_data_df.withColumn("label", adult_data_df['hours-per-week'] - 0)
adult_data_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- X: string (nullable = true)
 |-- label: integer (nullable = true)



In [13]:
assem = VectorAssembler(inputCols=adult_data_df.columns[10:13], outputCol='features')
x = assem.transform(adult_data_df)
x.show(5)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+-----+-----------------+
|age|        workclass|fnlwgt| education|education-num|     marital-status|        occupation|  relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|     X|label|         features|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+-----+-----------------+
| 39|        State-gov| 77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White|   Male|        2174|           0|            40| United-States| <=50K|   40|[2174.0,0.0,40.0]|
| 50| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|           0|           0|  

In [14]:
train,test = x.randomSplit([0.6, 0.4], 1234)
nb1 = NaiveBayes(smoothing=1.0, modelType="multinomial")
model1 = nb1.fit(train)
predictions = model1.transform(test)
predictions.show(3)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|     X|label|      features|       rawPrediction|         probability|prediction|
+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
| 17|        ?| 34019|     10th|            6| Never-married|         ?|   Own-child| White|   Male|           0|           0|            20| United-States| <=50K|   20|[0.0,0.0,20.0]|[-99.699068497548...|[1.07468075898575...|      48.0|
| 17|        ?| 34088|     12th|            8| N

In [15]:
nb2 = NaiveBayes(smoothing=10.0, modelType="multinomial")

# train the model
model2 = nb2.fit(train)

# select example rows to display.
predictions = model2.transform(test)
predictions.show(3)
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|     X|label|      features|       rawPrediction|         probability|prediction|
+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
| 17|        ?| 34019|     10th|            6| Never-married|         ?|   Own-child| White|   Male|           0|           0|            20| United-States| <=50K|   20|[0.0,0.0,20.0]|[-89.690334789396...|[3.96471113735323...|      48.0|
| 17|        ?| 34088|     12th|            8| N

In [16]:
from pyspark.ml.classification import DecisionTreeClassifier
nb3 = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show(3)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|     X|label|      features|       rawPrediction|         probability|prediction|
+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
| 17|        ?| 34019|     10th|            6| Never-married|         ?|   Own-child| White|   Male|           0|           0|            20| United-States| <=50K|   20|[0.0,0.0,20.0]|[0.0,13.0,24.0,22...|[0.0,0.0066666666...|      20.0|
| 17|        ?| 34088|     12th|            8| N

In [17]:
from pyspark.ml.classification import RandomForestClassifier

# create the trainer and set its parameters
nb3 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show(3)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
|age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native-country|     X|label|      features|       rawPrediction|         probability|prediction|
+---+---------+------+---------+-------------+--------------+----------+------------+------+-------+------------+------------+--------------+--------------+------+-----+--------------+--------------------+--------------------+----------+
| 17|        ?| 34019|     10th|            6| Never-married|         ?|   Own-child| White|   Male|           0|           0|            20| United-States| <=50K|   20|[0.0,0.0,20.0]|[0.0,0.4679580176...|[0.0,0.0046795801...|      20.0|
| 17|        ?| 34088|     12th|            8| N

In [19]:
#clustering
from pyspark.ml.clustering import KMeans
dataset_diabetes_df = spark.read.format("csv").option("header", True).option("inferSchema", True).option("delimiter", ",").load(r"C:\Users\sarik\diabetic_data.csv")
dataset_diabetes_df = dataset_diabetes_df.select("admission_type_id", "discharge_disposition_id", "admission_source_id", "time_in_hospital", "num_lab_procedures")

# vector assembler for feature columns
assembler = VectorAssembler(inputCols=dataset_diabetes_df.columns, outputCol="features")
data = assembler.transform(dataset_diabetes_df)

#k-means model.
kmeans = KMeans().setK(2).setSeed(1)

model = kmeans.fit(data)

# Make predictions
predictions = model.transform(data)

# Shows the result.
centers = model.clusterCenters()
for center in centers:
    print(center)

[ 1.83677572  3.79369994  5.86290976  4.99109951 55.71866995]
[ 2.31211955  3.5955244   5.58751622  3.48021655 23.67109071]


In [21]:
linear_data_df = spark.read.format("csv").option("header", True)\
.option("inferSchema", True).option("delimiter", ",")\
.load(r"C:\Users\sarik\imports-85.csv")
linear_data_df1 = linear_data_df.withColumnRenamed("wheel-base", "label").select("label", "length", "width", "height")

In [22]:
from pyspark.ml.regression import LinearRegression

assembler = VectorAssembler(inputCols=linear_data_df1.columns[1:], outputCol="features")
y = assembler.transform(linear_data_df1)

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(y)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(model.coefficients))
print("Intercept: %s" % str(model.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [0.22836801258821893,0.8223218915856468,0.580595102043434]
Intercept: -26.380531957157498
numIterations: 11
objectiveHistory: [0.5, 0.38579526656819896, 0.13000842393266873, 0.12985504772567413, 0.12963704261349218, 0.12947103310674205, 0.1294164378448031, 0.1294050846483987, 0.12940508261516015, 0.1294050824628613, 0.12940508245526855]
+--------------------+
|           residuals|
+--------------------+
|  -4.611862798093398|
|  -4.611862798093398|
|  -2.501339043881387|
|-0.11328232985025011|
| -0.6777467081673763|
|  0.3413419946315486|
|  -2.878914311626758|
|  -2.878914311626758|
| -2.9950333320354474|
| -0.8412496309870932|
|  2.3922947158520174|
|  2.3922947158520174|
|  2.3922947158520174|
|  2.3922947158520174|
| -0.6335041529149237|
| -0.6335041529149237|
| -1.3908023008371515|
|  0.4019071188106693|
|   2.084135889634638|
|   2.787341183548463|
+--------------------+
only showing top 20 rows

RMSE: 2.517190
r2: 0.824407


In [23]:
from pyspark.sql.functions import col, when
logistic_df = linear_data_df.withColumn("label", when(col("num-of-doors") == "four", 1).otherwise(0)).select("label", "length", "width", "height")

In [25]:
from pyspark.ml.classification import LogisticRegression
assembler = VectorAssembler(inputCols=logistic_df.columns[1:], outputCol="features")
z = assembler.transform(logistic_df)

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
model = lr.fit(z)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coefficients: [0.0,0.0,0.000100509510875788]
Intercept: 0.22531532410664368


In [26]:

mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlr_model = mlr.fit(z)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlr_model.coefficientMatrix))
print("Multinomial intercepts: " + str(mlr_model.interceptVector))

Multinomial coefficients: DenseMatrix([[ 0.00000000e+00,  0.00000000e+00, -7.35292649e-05],
             [ 0.00000000e+00,  0.00000000e+00,  7.35292649e-05]])
Multinomial intercepts: [-0.11156262444620539,0.11156262444620539]
