<a href="https://colab.research.google.com/github/JacekPardyak/vps/blob/master/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [4]:
!pip install -q findspark
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [6]:
!wget --continue https://raw.githubusercontent.com/JacekPardyak/vps/master/data/vps_churn_data_py.json -O /tmp/vps_churn_data_py.json
!wget --continue https://raw.githubusercontent.com/JacekPardyak/vps/master/data/vps_test_data_py.json -O /tmp/vps_test_data_py.json

--2021-09-04 21:24:57--  https://raw.githubusercontent.com/JacekPardyak/vps/master/data/vps_churn_data_py.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 416 Range Not Satisfiable

    The file is already fully retrieved; nothing to do.

--2021-09-04 21:24:57--  https://raw.githubusercontent.com/JacekPardyak/vps/master/data/vps_test_data_py.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 416 Range Not Satisfiable

    The file is already fully retrieved; nothing to do.



In [7]:
vps = spark.read.json("/tmp/vps_churn_data_py.json")
pro = spark.read.json("/tmp/vps_test_data_py.json")


In [8]:
vps.printSchema()

root
 |-- cpu_load_max_gradient: double (nullable = true)
 |-- cpu_load_mean_m_3: double (nullable = true)
 |-- cpu_load_monthly_mean_delta: double (nullable = true)
 |-- disk_octets_read_max_gradient: double (nullable = true)
 |-- disk_octets_read_mean_m_3: double (nullable = true)
 |-- disk_octets_read_monthly_mean_delta: double (nullable = true)
 |-- disk_octets_write_max_gradient: double (nullable = true)
 |-- disk_octets_write_mean_m_3: double (nullable = true)
 |-- disk_octets_write_monthly_mean_delta: double (nullable = true)
 |-- disk_ops_read_max_gradient: double (nullable = true)
 |-- disk_ops_read_mean_m_3: double (nullable = true)
 |-- disk_ops_read_monthly_mean_delta: double (nullable = true)
 |-- disk_ops_write_max_gradient: double (nullable = true)
 |-- disk_ops_write_mean_m_3: double (nullable = true)
 |-- disk_ops_write_monthly_mean_delta: double (nullable = true)
 |-- id: long (nullable = true)
 |-- is_churn: long (nullable = true)
 |-- network_rx_max_gradient: double

In [9]:
vps.show(4,False)

+---------------------+-----------------+---------------------------+-----------------------------+-------------------------+-----------------------------------+------------------------------+--------------------------+------------------------------------+--------------------------+----------------------+--------------------------------+---------------------------+-----------------------+---------------------------------+---+--------+-----------------------+-------------------+-----------------------------+-----------------------+-------------------+-----------------------------+
|cpu_load_max_gradient|cpu_load_mean_m_3|cpu_load_monthly_mean_delta|disk_octets_read_max_gradient|disk_octets_read_mean_m_3|disk_octets_read_monthly_mean_delta|disk_octets_write_max_gradient|disk_octets_write_mean_m_3|disk_octets_write_monthly_mean_delta|disk_ops_read_max_gradient|disk_ops_read_mean_m_3|disk_ops_read_monthly_mean_delta|disk_ops_write_max_gradient|disk_ops_write_mean_m_3|disk_ops_write_monthly

In [10]:
vps.count()

283

In [11]:
vps.select('id', 'cpu_load_max_gradient', 'is_churn').show(5)

+---+---------------------+--------+
| id|cpu_load_max_gradient|is_churn|
+---+---------------------+--------+
|100|           0.44565168|       1|
|101|          -0.00172695|       0|
|102|            0.3733529|       0|
|103|          -0.03158981|       1|
|104|          -0.09440948|       0|
+---+---------------------+--------+
only showing top 5 rows



In [12]:
vps.describe().show()

+-------+---------------------+------------------+---------------------------+-----------------------------+-------------------------+-----------------------------------+------------------------------+--------------------------+------------------------------------+--------------------------+----------------------+--------------------------------+---------------------------+-----------------------+---------------------------------+----------------+-------------------+-----------------------+-------------------+-----------------------------+-----------------------+-------------------+-----------------------------+
|summary|cpu_load_max_gradient| cpu_load_mean_m_3|cpu_load_monthly_mean_delta|disk_octets_read_max_gradient|disk_octets_read_mean_m_3|disk_octets_read_monthly_mean_delta|disk_octets_write_max_gradient|disk_octets_write_mean_m_3|disk_octets_write_monthly_mean_delta|disk_ops_read_max_gradient|disk_ops_read_mean_m_3|disk_ops_read_monthly_mean_delta|disk_ops_write_max_gradient|disk

In [13]:
vps.select("is_churn").distinct().show()

+--------+
|is_churn|
+--------+
|       0|
|       1|
+--------+



In [15]:
from pyspark.sql import functions as F
vps.groupBy("is_churn").agg(F.avg("cpu_load_mean_m_3")).show()

+--------+----------------------+
|is_churn|avg(cpu_load_mean_m_3)|
+--------+----------------------+
|       0|     34.67781949229732|
|       1|    28.083668920296297|
+--------+----------------------+



In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



In [17]:
data = spark.read.format("libsvm").load("/content/spark-3.1.2-bin-hadoop2.7/data/mllib/sample_lda_libsvm_data.txt")
data.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [85]:
data.describe().show()

+-------+-----------------+
|summary|            label|
+-------+-----------------+
|  count|               12|
|   mean|              5.5|
| stddev|3.605551275463989|
|    min|              0.0|
|    max|             11.0|
+-------+-----------------+



In [86]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           7.0|  2.0|(11,[0,1,2,5,6,8,...|
|           6.0|  3.0|(11,[0,1,3,6,8,9,...|
|           8.0|  5.0|(11,[0,1,3,4,5,6,...|
|           7.0| 10.0|(11,[0,1,2,3,5,6,...|
|           8.0| 11.0|(11,[0,1,4,5,6,7,...|
+--------------+-----+--------------------+

Test Error = 1
RandomForestClassificationModel: uid=RandomForestClassifier_047bf96574f7, numTrees=10, numClasses=12, numFeatures=11


In [81]:
# Load and parse the data file, converting it to a DataFrame.
data = vps

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
# labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
# featureIndexer =\
#    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])



In [18]:
from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("/content/spark-3.1.2-bin-hadoop2.7/data/mllib/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Coefficients: (692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.353983524188241e-05,-9.102738505589566e-05,-0.0001946743054690423,-0.00020300642473486603,-3.147618331486458e-05,-6.842977602660821e-05,1.5883626898236275e-05,1.4023497091368928e-05,0.0003543204752496838,0.00011443272898171099,0.00010016712383666487,0.0006014109303795511,0.0002840248179122765,-0.00011541084736508905,0.000385996886312906,0.0006350195574241097,-0.00011506412384575733,-0.0001527186586498689,0.0002804933808994214,0.0006070117471191665,-0.0002008459663247435,-0.00014210755792901347,0.0002739010341160883,0.0002773045624496811,-9.838027027269408e-05,-0.00038085224435175833,-0.00025315198008554285,0.0002774771477075434,-0.00024436197639191286,-0.0015394744687597679,-0.00023073328411330604])
Intercept: 0.22456315961250245
Multinomial coefficients: 2 X 692 CSRMatrix
(0,244) 0.0
(0,263) 0.0001
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0