## Reading DataFrames for Spark from CSV


In [1]:
from pyspark.sql import SparkSession

In [2]:
# Reuse the active SparkSession if there is one; otherwise start a new session.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [3]:
# Display the session object to confirm it was created.
spark

In [4]:
import os

# Directory that holds the CSV files used throughout this notebook.
# Set the SPARK_DATA_DIR environment variable to point at your own copy of the
# data; the original author's local folder is kept as the fallback so existing
# setups keep working unchanged.
data_path = os.environ.get(
    "SPARK_DATA_DIR",
    r"C:\Users\monke\Documents\Springboard-DSC\25.2_Apache-Spark\Data",
)

In [5]:
# Full path of the location/temperature CSV (this file has a header row).
event_file = f"{data_path}/location_temp.csv"

In [6]:
# Read the CSV with its header row. inferSchema is enabled (as it is for the
# utilization file) so numeric columns such as temp_celcius are typed as numbers
# instead of strings, and later aggregations do not rely on implicit casts.
events = spark.read.load(event_file, format='csv', header='true', inferSchema='true')

In [7]:
# Preview the first five rows.
events.show(n=5)

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
+-------------------+-----------+------------+
only showing top 5 rows



In [8]:
# Number of rows read from the location/temperature CSV.
events.count()

500000

In [9]:
# The utilization file has no header row, so Spark assigns _c0.._c4 as column
# names; column types are inferred from the data.
file_path_no_header = f"{data_path}/utilization.csv"
df2 = (
    spark.read
    .format('csv')
    .options(header='false', inferSchema='true')
    .load(file_path_no_header)
)

In [10]:
# Preview the first five rows (columns still carry the default _cN names).
df2.show(n=5)

+-------------------+---+----+----+---+
|                _c0|_c1| _c2| _c3|_c4|
+-------------------+---+----+----+---+
|03/05/2019 08:06:14|100|0.57|0.51| 47|
|03/05/2019 08:11:14|100|0.47|0.62| 43|
|03/05/2019 08:16:14|100|0.56|0.57| 62|
|03/05/2019 08:21:14|100|0.57|0.56| 50|
|03/05/2019 08:26:14|100|0.35|0.46| 43|
+-------------------+---+----+----+---+
only showing top 5 rows



In [11]:
# Replace the auto-generated _cN column names with descriptive ones.
df2 = (
    df2
    .withColumnRenamed('_c0', 'event_datetime')
    .withColumnRenamed('_c1', 'server_id')
    .withColumnRenamed('_c2', 'cpu_utilization')
    .withColumnRenamed('_c3', 'free_memory')
    .withColumnRenamed('_c4', 'session_count')
)

In [12]:
# Confirm the new column names.
df2.show(n=5)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
+-------------------+---------+---------------+-----------+-------------+
only showing top 5 rows



## Basic Spark DataFrame Attributes and Methods

In [13]:
# List the column names of the renamed utilization DataFrame.
df2.columns

['event_datetime',
 'server_id',
 'cpu_utilization',
 'free_memory',
 'session_count']

In [14]:
# Draw roughly 10% of the rows without replacement. A fixed seed makes the
# sample (and the row count shown below) reproducible across kernel restarts;
# the count will differ from any earlier, unseeded run.
sampling = df2.sample(withReplacement=False, fraction=0.1, seed=1)

In [15]:
# Row count of the sample; `fraction` is approximate, so expect a value near
# (not exactly) 10% of the full data.
sampling.count()

50318

In [16]:
# Ascending order by event_datetime.
# NOTE(review): event_datetime appears to be a string column (describe() reports
# no mean for it), so this is a lexicographic ordering — confirm that is intended.
sorted_df = df2.orderBy('event_datetime')

In [17]:
# Preview the first five rows of the sorted data.
sorted_df.show(n=5)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:06:16|      101|           0.86|       0.09|           93|
|03/05/2019 08:06:17|      102|           0.61|       0.12|           71|
|03/05/2019 08:06:19|      103|           0.64|       0.32|           96|
|03/05/2019 08:06:21|      104|           0.84|       0.36|           94|
+-------------------+---------+---------------+-----------+-------------+
only showing top 5 rows



### Basic Filtering

In [18]:
# Keep only the readings for location "loc0" and display them.
events.where(events.location_id == 'loc0').show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [19]:
# Number of readings recorded for location "loc0".
events.where(events['location_id'] == 'loc0').count()

1000

In [20]:
# Number of readings recorded for location "loc1".
events.where(events['location_id'] == 'loc1').count()

1000

In [21]:
# Same "loc0" filter as earlier in this section, shown again.
events.filter(events.location_id == 'loc0').show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

### Basic Aggregations

In [22]:
# Number of readings per location.
(
    events
    .groupby('location_id')
    .count()
    .show()
)

+-----------+-----+
|location_id|count|
+-----------+-----+
|     loc196| 1000|
|     loc226| 1000|
|     loc463| 1000|
|     loc150| 1000|
|     loc292| 1000|
|     loc311| 1000|
|      loc22| 1000|
|     loc351| 1000|
|     loc370| 1000|
|     loc419| 1000|
|      loc31| 1000|
|     loc305| 1000|
|      loc82| 1000|
|      loc90| 1000|
|     loc118| 1000|
|     loc195| 1000|
|     loc208| 1000|
|      loc39| 1000|
|      loc75| 1000|
|     loc228| 1000|
+-----------+-----+
only showing top 20 rows



In [23]:
# Display the readings ordered by location_id (ascending).
events.sort('location_id').show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 21:23:06|       loc0|          28|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 21:18:06|       loc0|          33|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 2

In [24]:
# Average temperature per location.
(
    events
    .groupBy('location_id')
    .agg({'temp_celcius': 'mean'})
    .show()
)

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|     loc196|           29.225|
|     loc226|           25.306|
|     loc463|           23.317|
|     loc150|           32.188|
|     loc292|           29.159|
|     loc311|           24.308|
|      loc22|           28.251|
|     loc351|           28.194|
|     loc370|            29.14|
|     loc419|           29.141|
|      loc31|           25.196|
|     loc305|           27.314|
|      loc82|           27.355|
|      loc90|           23.216|
|     loc118|           24.219|
|     loc195|            27.25|
|     loc208|           26.206|
|      loc39|           25.199|
|      loc75|           23.209|
|     loc228|           27.295|
+-----------+-----------------+
only showing top 20 rows



In [25]:
# Draw roughly 10% of the events without replacement. A fixed seed makes the
# sample reproducible, so the per-location averages computed from it below can
# be regenerated exactly; values will differ from any earlier, unseeded run.
sampling = events.sample(withReplacement=False, fraction=0.1, seed=1)

In [26]:
# Average temperature per location, estimated from the 10% sample and ordered
# by location_id so it lines up with the full-data result.
(
    sampling
    .groupBy('location_id')
    .agg({'temp_celcius': 'mean'})
    .orderBy('location_id')
    .show()
)

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|       loc0|29.298387096774192|
|       loc1|             28.14|
|      loc10|25.393939393939394|
|     loc100|             27.32|
|     loc101| 25.24175824175824|
|     loc102|29.737864077669904|
|     loc103| 25.13953488372093|
|     loc104|26.346938775510203|
|     loc105|25.870967741935484|
|     loc106| 26.84269662921348|
|     loc107| 33.38775510204081|
|     loc108| 32.12871287128713|
|     loc109|24.339449541284402|
|      loc11| 25.17391304347826|
|     loc110| 26.43243243243243|
|     loc111|31.426966292134832|
|     loc112| 33.26373626373626|
|     loc113|30.565217391304348|
|     loc114| 28.90909090909091|
|     loc115|23.377777777777776|
+-----------+------------------+
only showing top 20 rows



In [27]:
# Average temperature per location on the full data, ordered by location_id
# for comparison with the sampled estimate.
(
    events
    .groupBy('location_id')
    .agg({'temp_celcius': 'mean'})
    .orderBy('location_id')
    .show()
)

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|       loc0|           29.176|
|       loc1|           28.246|
|      loc10|           25.337|
|     loc100|           27.297|
|     loc101|           25.317|
|     loc102|           30.327|
|     loc103|           25.341|
|     loc104|           26.204|
|     loc105|           26.217|
|     loc106|           27.201|
|     loc107|           33.268|
|     loc108|           32.195|
|     loc109|           24.138|
|      loc11|           25.308|
|     loc110|           26.239|
|     loc111|           31.391|
|     loc112|           33.359|
|     loc113|           30.345|
|     loc114|           29.261|
|     loc115|           23.239|
+-----------+-----------------+
only showing top 20 rows



## Spark w/ SQL

In [28]:
# Register df2 as a temporary view named "utilization" so it can be queried
# by name from spark.sql().
df2.createOrReplaceTempView("utilization")

In [29]:
# Query the "utilization" view with SQL; the result is a regular DataFrame.
df_sql = spark.sql('Select * from utilization limit 10')

In [30]:
# Display the rows returned by the query.
df_sql.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
|03/05/2019 08:31:14|      100|           0.41|       0.58|           48|
|03/05/2019 08:36:14|      100|           0.57|       0.35|           58|
|03/05/2019 08:41:14|      100|           0.41|        0.4|           58|
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 08:51:14|      100|           0.51|        0.6|           45|
+-------------------+---------+-------

**Spark SQL accepts standard SQL syntax, so queries against a temporary view read like ordinary SQL.**

In [31]:
# Select two columns and rename them with SQL aliases (sid, sc).
df_sql = spark.sql('select server_id as sid,session_count as sc from utilization limit 10')

In [32]:
# Display the aliased columns.
df_sql.show()

+---+---+
|sid| sc|
+---+---+
|100| 47|
|100| 43|
|100| 62|
|100| 50|
|100| 43|
|100| 48|
|100| 58|
|100| 58|
|100| 62|
|100| 45|
+---+---+



## Handle Duplicates

In [33]:
from pyspark.sql import Row

In [34]:
# Small demo DataFrame containing one exact duplicate row ('102 Server') and one
# partial duplicate on server_name ('101 Server').
# Built through the notebook's SparkSession: the bare name `sc` is not defined by
# any cell in this notebook, so relying on it fails on a fresh kernel.
df_dup = spark.createDataFrame([
    Row(server_name='101 Server', cpu_utilization=85, session_count=80),
    Row(server_name='101 Server', cpu_utilization=80, session_count=90),
    Row(server_name='102 Server', cpu_utilization=85, session_count=80),
    Row(server_name='102 Server', cpu_utilization=85, session_count=80),
])

In [35]:
# Display the demo rows, duplicates included.
df_dup.show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 Server|             85|           80|
| 101 Server|             80|           90|
| 102 Server|             85|           80|
| 102 Server|             85|           80|
+-----------+---------------+-------------+



In [36]:
# With no arguments, only rows identical in every column are collapsed.
df_dup.dropDuplicates().show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 Server|             85|           80|
| 101 Server|             80|           90|
| 102 Server|             85|           80|
+-----------+---------------+-------------+



In [37]:
# With a column subset, rows count as duplicates when they match on those
# columns only; one row per server_name is kept.
df_dup.dropDuplicates(subset=['server_name']).show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 102 Server|             85|           80|
| 101 Server|             85|           80|
+-----------+---------------+-------------+



### Missing Values

In [38]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

In [39]:
# Small demo DataFrame used to illustrate missing-value handling.
# Built through the notebook's SparkSession: the bare name `sc` is not defined by
# any cell in this notebook, so relying on it fails on a fresh kernel.
df_na = spark.createDataFrame([
    Row(server_name='101 Server', cpu_utilization=85, session_count=80),
    Row(server_name='101 Server', cpu_utilization=80, session_count=90),
    Row(server_name='102 Server', cpu_utilization=85, session_count=40),
    Row(server_name='103 Server', cpu_utilization=70, session_count=80),
    Row(server_name='104 Server', cpu_utilization=60, session_count=80),
])

In [40]:
# Display the demo rows before a null column is added.
df_na.show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 Server|             85|           80|
| 101 Server|             80|           90|
| 102 Server|             85|           40|
| 103 Server|             70|           80|
| 104 Server|             60|           80|
+-----------+---------------+-------------+



In [41]:
# Add a string-typed column filled entirely with nulls; the cast gives the
# null literal a concrete type.
df_na = df_na.withColumn('na_col', lit(None).cast('string'))

In [42]:
# Display the DataFrame with the new all-null column.
df_na.show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|  null|
| 101 Server|             80|           90|  null|
| 102 Server|             85|           40|  null|
| 103 Server|             70|           80|  null|
| 104 Server|             60|           80|  null|
+-----------+---------------+-------------+------+



In [43]:
# Replace nulls with 'A'. The fill value is a string, so only string columns
# (here na_col) are affected; df_na itself is left unchanged.
df_na.na.fill('A').show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|     A|
| 101 Server|             80|           90|     A|
| 102 Server|             85|           40|     A|
| 103 Server|             70|           80|     A|
| 104 Server|             60|           80|     A|
+-----------+---------------+-------------+------+



### EDA

In [44]:
# Load the header-less utilization CSV again for the EDA section, letting
# Spark infer the column types.
file_path_no_header = "".join((data_path, '/utilization.csv'))
util = spark.read.csv(file_path_no_header, header='false', inferSchema='true')

In [45]:
# Give the auto-generated _cN columns descriptive names.
util = (
    util
    .withColumnRenamed('_c0', 'event_datetime')
    .withColumnRenamed('_c1', 'server_id')
    .withColumnRenamed('_c2', 'cpu_utilization')
    .withColumnRenamed('_c3', 'free_memory')
    .withColumnRenamed('_c4', 'session_count')
)

In [46]:
# Preview the first ten temperature readings.
events.show(n=10)

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
+-------------------+-----------+------------+
only showing top 10 rows



In [47]:
# Preview the first ten utilization rows.
util.show(n=10)

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
|03/05/2019 08:31:14|      100|           0.41|       0.58|           48|
|03/05/2019 08:36:14|      100|           0.57|       0.35|           58|
|03/05/2019 08:41:14|      100|           0.41|        0.4|           58|
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 08:51:14|      100|           0.51|        0.6|           45|
+-------------------+---------+-------

In [48]:
# Register util under the view name 'utilization'; this replaces the view of
# the same name created earlier from df2.
util.createOrReplaceTempView('utilization')

In [49]:
# Summary statistics (count, mean, stddev, min, max) for each column.
util.describe().show()

+-------+-------------------+------------------+-------------------+-------------------+------------------+
|summary|     event_datetime|         server_id|    cpu_utilization|        free_memory|     session_count|
+-------+-------------------+------------------+-------------------+-------------------+------------------+
|  count|             500000|            500000|             500000|             500000|            500000|
|   mean|               null|             124.5| 0.6205177400000115|0.37912809999999625|          69.59616|
| stddev|               null|14.430884120553204|0.15875173872912818|0.15830931278376212|14.850676696352831|
|    min|03/05/2019 08:06:14|               100|               0.22|                0.0|                32|
|    max|04/09/2019 01:22:46|               149|                1.0|               0.78|               105|
+-------+-------------------+------------------+-------------------+-------------------+------------------+



In [50]:
# Correlation between CPU utilization and free memory.
util.corr('cpu_utilization', 'free_memory')

-0.47047715730807216

In [51]:
# Correlation between session count and free memory.
util.corr('session_count', 'free_memory')

-0.5008320848876588

In [52]:
# Frequently occurring values in the server_id and session_count columns.
util.freqItems(['server_id', 'session_count']).show()

+--------------------+-----------------------+
| server_id_freqItems|session_count_freqItems|
+--------------------+-----------------------+
|[146, 137, 101, 1...|   [92, 101, 83, 104...|
+--------------------+-----------------------+



### Machine Learning - KMeans

In [74]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.regression import LinearRegression

In [61]:
# Combine the three numeric columns into one vector column named 'features'.
# NOTE(review): session_count is on a much larger scale than the other two
# inputs (see describe() above); consider scaling before clustering.
v_assembler = (
    VectorAssembler()
    .setInputCols(['cpu_utilization', 'free_memory', 'session_count'])
    .setOutputCol('features')
)

In [62]:
# Append the assembled 'features' vector column to the utilization data.
vcluster_df = v_assembler.transform(util)

In [63]:
# Preview the rows together with their assembled feature vectors.
vcluster_df.show(n=5)

+-------------------+---------+---------------+-----------+-------------+----------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|        features|
+-------------------+---------+---------------+-----------+-------------+----------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|[0.57,0.51,47.0]|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|[0.47,0.62,43.0]|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|[0.56,0.57,62.0]|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|[0.57,0.56,50.0]|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|[0.35,0.46,43.0]|
+-------------------+---------+---------------+-----------+-------------+----------------+
only showing top 5 rows



The machine learning algorithms in Spark expect all input features to be combined into a single vector column, which is what `VectorAssembler` produces above.

In [64]:
# K-means estimator configured for three clusters.
kmeans = KMeans(k=3)

In [65]:
# Fix the random seed so the clustering result is reproducible.
kmeans = kmeans.setParams(seed=1)

In [67]:
# Fit the K-means model on the assembled feature vectors.
kmodel = kmeans.fit(dataset=vcluster_df)

In [68]:
# Cluster centres, one array per cluster, in the order of the assembler's
# inputCols: cpu_utilization, free_memory, session_count.
kmodel.clusterCenters()

[array([ 0.71174897,  0.28808911, 86.87510507]),
 array([ 0.61918113,  0.38080285, 68.75004716]),
 array([ 0.51439668,  0.48445202, 50.49452021])]

### Machine Learning - Linear Regression

In [71]:
# Single-feature assembler for the regression: cpu_utilization is the only
# predictor. Note that this rebinds v_assembler from the clustering section.
v_assembler = VectorAssembler().setParams(inputCols=['cpu_utilization'], outputCol='features')
vutil = v_assembler.transform(dataset=util)

In [73]:
# Preview the rows with the one-element feature vector.
vutil.show(n=5)

+-------------------+---------+---------------+-----------+-------------+--------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|features|
+-------------------+---------+---------------+-----------+-------------+--------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|  [0.57]|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|  [0.47]|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|  [0.56]|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|  [0.57]|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|  [0.35]|
+-------------------+---------+---------------+-----------+-------------+--------+
only showing top 5 rows



In [75]:
# Linear regression that predicts session_count from the 'features' vector.
lr = LinearRegression(
    labelCol='session_count',
    featuresCol='features',
)

In [77]:
# Fit the regression on the full utilization data.
lrModel = lr.fit(dataset=vutil)

In [78]:
# Fitted slope (one coefficient per feature) and intercept.
(lrModel.coefficients, lrModel.intercept)

(DenseVector([47.024]), 40.41695103553606)

In [79]:
# Root mean squared error from the model's summary, i.e. measured on the same
# data the model was fitted on (no held-out test set is used here).
lrModel.summary.rootMeanSquaredError

12.837990225931877