In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
data_path = '/Users/ashle/Springboard/Spark SQL/Data'

In [4]:
json_df_path = data_path + '/utilization.json'
df_util = spark.read.format('json').load(json_df_path)

In [5]:
df_util.show(10)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
+---------------+-------------------+-

**EDA with Dataframes**

In [6]:
df_util.createOrReplaceTempView('utilization')

In [7]:
df_util.count()

500000

In [8]:
df_util.describe().show()

+-------+-------------------+-------------------+-------------------+------------------+------------------+
|summary|    cpu_utilization|     event_datetime|        free_memory|         server_id|     session_count|
+-------+-------------------+-------------------+-------------------+------------------+------------------+
|  count|             500000|             500000|             500000|            500000|            500000|
|   mean| 0.6205177399999977|               null|0.37912809999999963|             124.5|          69.59616|
| stddev|0.15875173872912898|               null| 0.1583093127837621|14.430884120553216|14.850676696352755|
|    min|               0.22|03/05/2019 08:06:14|                0.0|               100|                32|
|    max|                1.0|04/09/2019 01:22:46|               0.78|               149|               105|
+-------+-------------------+-------------------+-------------------+------------------+------------------+



In [9]:
df_util.stat.corr('cpu_utilization', 'free_memory')

-0.4704771573080734

In [10]:
df_util.stat.corr('session_count', 'free_memory')

-0.5008320848876565

In [11]:
df_util.stat.freqItems(('server_id', 'session_count')).show()

+--------------------+-----------------------+
| server_id_freqItems|session_count_freqItems|
+--------------------+-----------------------+
|[146, 137, 101, 1...|   [92, 101, 83, 104...|
+--------------------+-----------------------+



In [14]:
df_util_sample = df_util.sample(fraction = 0.05, withReplacement = False)

In [16]:
df_util_sample.count()

25115

**EDA with Spark SQL**

In [18]:
spark.sql('SELECT min(cpu_utilization), max(cpu_utilization), stddev(cpu_utilization) FROM utilization').show()

+--------------------+--------------------+-----------------------+
|min(cpu_utilization)|max(cpu_utilization)|stddev(cpu_utilization)|
+--------------------+--------------------+-----------------------+
|                0.22|                 1.0|    0.15875173872912898|
+--------------------+--------------------+-----------------------+



In [20]:
spark.sql('SELECT server_id, min(cpu_utilization), max(cpu_utilization), stddev(cpu_utilization) \
          FROM utilization \
          GROUP BY server_id').show()

+---------+--------------------+--------------------+-----------------------+
|server_id|min(cpu_utilization)|max(cpu_utilization)|stddev(cpu_utilization)|
+---------+--------------------+--------------------+-----------------------+
|      103|                0.56|                0.96|    0.11617507884178263|
|      100|                0.27|                0.67|     0.1152264191787964|
|      101|                 0.6|                 1.0|    0.11651726263197697|
|      102|                0.56|                0.96|    0.11549678751286807|
|      107|                0.45|                0.85|    0.11597417369783877|
|      104|                0.51|                0.91|    0.11521679513850511|
|      106|                0.22|                0.62|    0.11531539914568233|
|      105|                0.29|                0.69|    0.11510721467869486|
|      110|                0.35|                0.75|    0.11533251724450223|
|      108|                0.55|                0.95|    0.11563

In [21]:
spark.sql('SELECT server_id, FLOOR(cpu_utilization*100/10) AS bucket FROM utilization').show()

+---------+------+
|server_id|bucket|
+---------+------+
|      100|     5|
|      100|     4|
|      100|     5|
|      100|     5|
|      100|     3|
|      100|     4|
|      100|     5|
|      100|     4|
|      100|     5|
|      100|     5|
|      100|     3|
|      100|     6|
|      100|     6|
|      100|     5|
|      100|     2|
|      100|     4|
|      100|     4|
|      100|     6|
|      100|     4|
|      100|     5|
+---------+------+
only showing top 20 rows



In [22]:
spark.sql('SELECT count(*), FLOOR(cpu_utilization*100/10) AS bucket FROM utilization GROUP BY bucket ORDER BY bucket').show()

+--------+------+
|count(1)|bucket|
+--------+------+
|    8186|     2|
|   37029|     3|
|   68046|     4|
|  104910|     5|
|  116725|     6|
|   88242|     7|
|   56598|     8|
|   20207|     9|
|      57|    10|
+--------+------+



**Timeseries Analysis**

In [23]:
df_util.createOrReplaceTempView('utilization')

In [25]:
spark.sql('SELECT server_id, min(cpu_utilization), max(cpu_utilization), stddev(cpu_utilization) \
          FROM utilization \
          GROUP BY server_id').show()

+---------+--------------------+--------------------+-----------------------+
|server_id|min(cpu_utilization)|max(cpu_utilization)|stddev(cpu_utilization)|
+---------+--------------------+--------------------+-----------------------+
|      103|                0.56|                0.96|    0.11617507884178263|
|      100|                0.27|                0.67|     0.1152264191787964|
|      101|                 0.6|                 1.0|    0.11651726263197697|
|      102|                0.56|                0.96|    0.11549678751286807|
|      107|                0.45|                0.85|    0.11597417369783877|
|      104|                0.51|                0.91|    0.11521679513850511|
|      106|                0.22|                0.62|    0.11531539914568233|
|      105|                0.29|                0.69|    0.11510721467869486|
|      110|                0.35|                0.75|    0.11533251724450223|
|      108|                0.55|                0.95|    0.11563

In [28]:
sql_window = spark.sql('SELECT event_datetime, server_id, cpu_utilization,  \
            avg(cpu_utilization) OVER (PARTITION BY server_id) AS avg_server_util \
            FROM utilization')

In [29]:
sql_window.show()

+-------------------+---------+---------------+------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|
+-------------------+---------+---------------+------------------+
|03/05/2019 08:06:31|      110|           0.68|0.5537749999999892|
|03/05/2019 08:11:31|      110|           0.58|0.5537749999999892|
|03/05/2019 08:16:31|      110|           0.55|0.5537749999999892|
|03/05/2019 08:21:31|      110|           0.63|0.5537749999999892|
|03/05/2019 08:26:31|      110|           0.63|0.5537749999999892|
|03/05/2019 08:31:31|      110|           0.71|0.5537749999999892|
|03/05/2019 08:36:31|      110|           0.67|0.5537749999999892|
|03/05/2019 08:41:31|      110|           0.55|0.5537749999999892|
|03/05/2019 08:46:31|      110|           0.37|0.5537749999999892|
|03/05/2019 08:51:31|      110|            0.7|0.5537749999999892|
|03/05/2019 08:56:31|      110|           0.67|0.5537749999999892|
|03/05/2019 09:01:31|      110|           0.56|0.5537749999999

In [30]:
sql_window2 = spark.sql('SELECT event_datetime, server_id, cpu_utilization,  \
            avg(cpu_utilization) OVER (PARTITION BY server_id) AS avg_server_util, \
            cpu_utilization - avg(cpu_utilization) OVER (PARTITION BY server_id) AS delta_server_util \
            FROM utilization')
sql_window2.show()

+-------------------+---------+---------------+------------------+--------------------+
|     event_datetime|server_id|cpu_utilization|   avg_server_util|   delta_server_util|
+-------------------+---------+---------------+------------------+--------------------+
|03/05/2019 08:06:31|      110|           0.68|0.5537749999999892|  0.1262250000000108|
|03/05/2019 08:11:31|      110|           0.58|0.5537749999999892|0.026225000000010712|
|03/05/2019 08:16:31|      110|           0.55|0.5537749999999892|-0.00377499999998...|
|03/05/2019 08:21:31|      110|           0.63|0.5537749999999892| 0.07622500000001076|
|03/05/2019 08:26:31|      110|           0.63|0.5537749999999892| 0.07622500000001076|
|03/05/2019 08:31:31|      110|           0.71|0.5537749999999892| 0.15622500000001072|
|03/05/2019 08:36:31|      110|           0.67|0.5537749999999892| 0.11622500000001079|
|03/05/2019 08:41:31|      110|           0.55|0.5537749999999892|-0.00377499999998...|
|03/05/2019 08:46:31|      110| 

In [31]:
sql_window3 = spark.sql('SELECT event_datetime, server_id, cpu_utilization,  \
            avg(cpu_utilization) OVER (PARTITION BY server_id ORDER BY event_datetime \
                                        ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS avg_server_util \
            FROM utilization')
sql_window3.show()

+-------------------+---------+---------------+-------------------+
|     event_datetime|server_id|cpu_utilization|    avg_server_util|
+-------------------+---------+---------------+-------------------+
|03/05/2019 08:06:31|      110|           0.68|               0.63|
|03/05/2019 08:11:31|      110|           0.58| 0.6033333333333334|
|03/05/2019 08:16:31|      110|           0.55| 0.5866666666666666|
|03/05/2019 08:21:31|      110|           0.63| 0.6033333333333334|
|03/05/2019 08:26:31|      110|           0.63| 0.6566666666666666|
|03/05/2019 08:31:31|      110|           0.71| 0.6699999999999999|
|03/05/2019 08:36:31|      110|           0.67| 0.6433333333333333|
|03/05/2019 08:41:31|      110|           0.55| 0.5300000000000001|
|03/05/2019 08:46:31|      110|           0.37|               0.54|
|03/05/2019 08:51:31|      110|            0.7|               0.58|
|03/05/2019 08:56:31|      110|           0.67| 0.6433333333333334|
|03/05/2019 09:01:31|      110|           0.56| 

**Basic Machine Learning**

In [34]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [35]:
df_util.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
|           0.32|03/05/2019 08:56:14| 

In [36]:
#combines columns into one vector
vectorAssembler = VectorAssembler(inputCols = ['cpu_utilization', 'free_memory', 'session_count'], outputCol = 'features')

In [37]:
vcluster_df = vectorAssembler.transform(df_util)

In [38]:
vcluster_df.show()

+---------------+-------------------+-----------+---------+-------------+----------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|        features|
+---------------+-------------------+-----------+---------+-------------+----------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|[0.57,0.51,47.0]|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|[0.47,0.62,43.0]|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|[0.56,0.57,62.0]|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|[0.57,0.56,50.0]|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|[0.35,0.46,43.0]|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|[0.41,0.58,48.0]|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|[0.57,0.35,58.0]|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58| [0.41,0.4,58.0]|

In [39]:
kmeans = KMeans().setK(3)

In [40]:
kmeans = kmeans.setSeed(1)

In [41]:
#by default looks for data in column called 'features', so we only have to specify dataframe here
kmodel = kmeans.fit(vcluster_df)

In [42]:
kmodel.clusterCenters()

[array([ 0.52047775,  0.47836303, 51.79927162]),
 array([ 0.62881549,  0.37094643, 70.43030159]),
 array([ 0.71931575,  0.28104316, 88.23965784])]

**-part2-**

In [43]:
from pyspark.ml.regression import LinearRegression

In [44]:
vectorAssembler = VectorAssembler(inputCols = ['cpu_utilization'], outputCol = 'features')

In [45]:
df_vutil = vectorAssembler.transform(df_util)

In [46]:
df_vutil.show()

+---------------+-------------------+-----------+---------+-------------+--------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|features|
+---------------+-------------------+-----------+---------+-------------+--------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|  [0.57]|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|  [0.47]|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|  [0.56]|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|  [0.57]|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|  [0.35]|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|  [0.41]|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|  [0.57]|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|  [0.41]|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|  [0.53]|
|   

In [47]:
lr = LinearRegression(featuresCol = 'features', labelCol = 'session_count')

In [48]:
lrModel = lr.fit(df_vutil)

In [49]:
lrModel.coefficients

DenseVector([47.024])

In [50]:
lrModel.intercept

40.41695103552787

In [51]:
lrModel.summary.rootMeanSquaredError

12.837990225931918