In [6]:
# findspark.init() has to run before pyspark is imported, so this cell
# must be executed ahead of the pyspark import cell.
import findspark
findspark.init()

In [36]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_timestamp
from pyspark.sql.types import StringType

In [8]:
# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [9]:
# define data path
# Directory holding the input files; the load cells append a file name to it.
# The path is relative, so it is resolved against the kernel's working directory.
data_path = "Exercise Files/data"

In [10]:
# Read the server utilization JSON file into df1.
# NOTE(review): the file name is spelled "utlization" (no second "i") —
# presumably matching the file on disk; verify before renaming.
file_path = f"{data_path}/utlization.json"
df1 = spark.read.json(file_path)

In [11]:
# preview the first 20 rows of df1
df1.show(n=20)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
|           0.32|03/05/2019 08:56:14| 

In [30]:
# load location_temp.csv
# header=true: the first row supplies the column names.
# inferSchema=true: without it every CSV column is read as a string, so an
# aggregate such as max over temp_celcius would compare text ("9" > "10")
# instead of numbers. Inference costs one extra pass over the file.
file_path = data_path + "/location_temp.csv"
df2 = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_path)
)

In [31]:
# preview the first 20 rows of df2
df2.show(n=20)

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [6]:
# list column names (a plain Python list of strings, in schema order)
df1.columns

['cpu_utilization',
 'event_datetime',
 'free_memory',
 'server_id',
 'session_count']

In [58]:
# print the schema: one line per column with its name, type and nullability
df1.printSchema()

root
 |-- cpu_utilization: double (nullable = true)
 |-- event_datetime: string (nullable = true)
 |-- free_memory: double (nullable = true)
 |-- server_id: long (nullable = true)
 |-- session_count: long (nullable = true)



In [None]:
# pandafy a DataFrame
# toPandas() collects every row of df1 into the driver's memory as a pandas
# DataFrame, so it is only appropriate when the data fits on one machine.
df_pandas = df1.toPandas()
df_pandas

In [60]:
# project the DataFrame down to two of its columns
df_select_cols = df1.select("server_id", "session_count")
df_select_cols.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           47|
|      100|           43|
|      100|           62|
|      100|           50|
|      100|           43|
|      100|           48|
|      100|           58|
|      100|           58|
|      100|           62|
|      100|           45|
|      100|           47|
|      100|           60|
|      100|           57|
|      100|           44|
|      100|           47|
|      100|           66|
|      100|           65|
|      100|           66|
|      100|           42|
|      100|           63|
+---------+-------------+
only showing top 20 rows



In [22]:
# extract sample of data, without replacement
# fraction is a per-row inclusion probability, so the sample size is only
# approximately 10% of the rows. A fixed seed makes the sample (and the
# count below) reproducible across re-runs on the same data.
df1_sample = df1.sample(withReplacement=False, fraction=0.1, seed=42)
df1_sample.count()

49889

In [8]:
# sort data chronologically by event time
# event_datetime is a string in MM/dd/yyyy HH:mm:ss form, so sorting the raw
# column compares text (month first, year last) and only agrees with time
# order while every row shares the same year. Sort on the parsed timestamp
# instead; the columns of the result are unchanged.
df1_sort = df1.sort(to_timestamp("event_datetime", "MM/dd/yyyy HH:mm:ss"))
df1_sort.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.86|03/05/2019 08:06:16|       0.09|      101|           93|
|           0.61|03/05/2019 08:06:17|       0.12|      102|           71|
|           0.64|03/05/2019 08:06:19|       0.32|      103|           96|
|           0.84|03/05/2019 08:06:21|       0.36|      104|           94|
|           0.54|03/05/2019 08:06:23|       0.38|      105|           74|
|            0.4|03/05/2019 08:06:24|       0.39|      106|           43|
|           0.64|03/05/2019 08:06:26|       0.55|      107|           56|
|           0.62|03/05/2019 08:06:28|       0.41|      108|           86|
|            0.7|03/05/2019 08:06:29|       0.59|      109|           77|
|           0.68|03/05/2019 08:06:31| 

In [13]:
# keep only the rows recorded for location "loc1"
df2_filter = df2.where(df2.location_id == 'loc1')
df2_filter.show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc1|          31|
|03/04/2019 19:53:06|       loc1|          26|
|03/04/2019 19:58:06|       loc1|          31|
|03/04/2019 20:03:06|       loc1|          26|
|03/04/2019 20:08:06|       loc1|          28|
|03/04/2019 20:13:06|       loc1|          27|
|03/04/2019 20:18:06|       loc1|          30|
|03/04/2019 20:23:06|       loc1|          28|
|03/04/2019 20:28:06|       loc1|          28|
|03/04/2019 20:33:06|       loc1|          27|
|03/04/2019 20:38:06|       loc1|          30|
|03/04/2019 20:43:06|       loc1|          32|
|03/04/2019 20:48:06|       loc1|          26|
|03/04/2019 20:53:06|       loc1|          30|
|03/04/2019 20:58:06|       loc1|          26|
|03/04/2019 21:03:06|       loc1|          28|
|03/04/2019 21:08:06|       loc1|          27|
|03/04/2019 21:13:06|       loc1|          28|
|03/04/2019 2

In [14]:
# count the rows recorded for each location (groups come back unordered)
df2_agg_count = (
    df2
    .groupBy("location_id")
    .count()
)
df2_agg_count.show()

+-----------+-----+
|location_id|count|
+-----------+-----+
|      loc22| 1000|
|      loc31| 1000|
|      loc82| 1000|
|      loc90| 1000|
|     loc118| 1000|
|      loc39| 1000|
|      loc75| 1000|
|     loc122| 1000|
|      loc24| 1000|
|      loc30| 1000|
|     loc105| 1000|
|      loc96| 1000|
|     loc102| 1000|
|      loc18| 1000|
|      loc27| 1000|
|     loc143| 1000|
|      loc43| 1000|
|     loc123| 1000|
|      loc15| 1000|
|      loc48| 1000|
+-----------+-----+
only showing top 20 rows



In [16]:
# count rows per location, sorted by location_id
# location_id is a string, so the order is lexicographic (loc1, loc10, loc100, ...)
df2_agg_count = (
    df2
    .groupBy("location_id")
    .count()
    .sort("location_id")
)
df2_agg_count.show()

+-----------+-----+
|location_id|count|
+-----------+-----+
|       loc0| 1000|
|       loc1| 1000|
|      loc10| 1000|
|     loc100| 1000|
|     loc101| 1000|
|     loc102| 1000|
|     loc103| 1000|
|     loc104| 1000|
|     loc105| 1000|
|     loc106| 1000|
|     loc107| 1000|
|     loc108| 1000|
|     loc109| 1000|
|      loc11| 1000|
|     loc110| 1000|
|     loc111| 1000|
|     loc112| 1000|
|     loc113| 1000|
|     loc114| 1000|
|     loc115| 1000|
+-----------+-----+
only showing top 20 rows



In [18]:
# average temperature per location; the result column is named avg(temp_celcius)
df2_agg_mean = (
    df2
    .groupBy("location_id")
    .agg({"temp_celcius": "mean"})
)
df2_agg_mean.show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|      loc22|           28.251|
|      loc31|           25.196|
|      loc82|           27.355|
|      loc90|           23.216|
|     loc118|           24.219|
|      loc39|           25.199|
|      loc75|           23.209|
|     loc122|            32.36|
|      loc24|           31.109|
|      loc30|           30.211|
|     loc105|           26.217|
|      loc96|           28.138|
|     loc102|           30.327|
|      loc18|           30.198|
|      loc27|           31.239|
|     loc143|           28.213|
|      loc43|           32.196|
|     loc123|           23.424|
|      loc15|           32.171|
|      loc48|           30.244|
+-----------+-----------------+
only showing top 20 rows



In [19]:
# aggregate data and get max
# Highest temperature per location; the result column is named max(temp_celcius).
# Bound to its own name so the per-location means in df2_agg_mean are not
# overwritten with maxima.
df2_agg_max = df2.groupBy("location_id").agg({"temp_celcius": "max"})
df2_agg_max.show()

+-----------+-----------------+
|location_id|max(temp_celcius)|
+-----------+-----------------+
|       loc0|               36|
|       loc1|               35|
|      loc10|               32|
|     loc100|               34|
|     loc101|               32|
|     loc102|               37|
|     loc103|               32|
|     loc104|               33|
|     loc105|               33|
|     loc106|               34|
|     loc107|               40|
|     loc108|               39|
|     loc109|               31|
|      loc11|               32|
|     loc110|               33|
|     loc111|               38|
|     loc112|               40|
|     loc113|               37|
|     loc114|               36|
|     loc115|               30|
+-----------+-----------------+
only showing top 20 rows



In [23]:
# same per-location row count, expressed through agg(); the result column is
# named count(1) rather than count
df2_agg_count = (
    df2
    .groupBy("location_id")
    .agg({"*": "count"})
    .sort("location_id")
)
df2_agg_count.show()

+-----------+--------+
|location_id|count(1)|
+-----------+--------+
|       loc0|    1000|
|       loc1|    1000|
|      loc10|    1000|
|     loc100|    1000|
|     loc101|    1000|
|     loc102|    1000|
|     loc103|    1000|
|     loc104|    1000|
|     loc105|    1000|
|     loc106|    1000|
|     loc107|    1000|
|     loc108|    1000|
|     loc109|    1000|
|      loc11|    1000|
|     loc110|    1000|
|     loc111|    1000|
|     loc112|    1000|
|     loc113|    1000|
|     loc114|    1000|
|     loc115|    1000|
+-----------+--------+
only showing top 20 rows



In [32]:
# remove rows that are exact duplicates across all columns
df_no_dups = df2.dropDuplicates()
df_no_dups.show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/05/2019 02:33:06|       loc0|          29|
|03/05/2019 04:28:06|       loc0|          32|
|03/05/2019 07:18:06|       loc0|          29|
|03/05/2019 20:03:06|       loc0|          29|
|03/06/2019 21:48:06|       loc0|          32|
|03/07/2019 16:18:06|       loc0|          28|
|03/05/2019 07:38:06|       loc1|          29|
|03/05/2019 08:33:06|       loc1|          29|
|03/06/2019 12:08:06|       loc1|          31|
|03/07/2019 09:18:06|       loc1|          31|
|03/07/2019 13:03:06|       loc1|          28|
|03/07/2019 21:18:06|       loc1|          32|
|03/04/2019 21:13:06|       loc2|          28|
|03/06/2019 20:18:06|       loc2|          31|
|03/07/2019 23:03:06|       loc2|          30|
|03/04/2019 21:38:06|       loc3|          25|
|03/06/2019 07:43:06|       loc3|          22|
|03/06/2019 16:13:06|       loc3|          23|
|03/06/2019 2

In [34]:
# keep a single row per distinct location_id
# NOTE(review): which of a location's rows survives is presumably not
# guaranteed by Spark — do not rely on it being the earliest reading.
df_no_dups = df2.dropDuplicates(subset=["location_id"])
df_no_dups.show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:48:06|       loc1|          31|
|03/04/2019 19:48:08|      loc10|          26|
|03/04/2019 19:48:25|     loc100|          32|
|03/04/2019 19:48:25|     loc101|          24|
|03/04/2019 19:48:25|     loc102|          29|
|03/04/2019 19:48:25|     loc103|          23|
|03/04/2019 19:48:26|     loc104|          24|
|03/04/2019 19:48:26|     loc105|          30|
|03/04/2019 19:48:26|     loc106|          25|
|03/04/2019 19:48:26|     loc107|          35|
|03/04/2019 19:48:26|     loc108|          39|
|03/04/2019 19:48:27|     loc109|          24|
|03/04/2019 19:48:08|      loc11|          28|
|03/04/2019 19:48:27|     loc110|          26|
|03/04/2019 19:48:27|     loc111|          35|
|03/04/2019 19:48:27|     loc112|          32|
|03/04/2019 19:48:27|     loc113|          30|
|03/04/2019 1

In [63]:
# add a derived column: half of session_count (division yields a double)
df_new_col = df1.withColumn("new_col", df1.session_count / 2)
df_new_col.show()

+---------------+-------------------+-----------+---------+-------------+-------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|new_col|
+---------------+-------------------+-----------+---------+-------------+-------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|   23.5|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|   21.5|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|   31.0|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|   25.0|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|   21.5|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|   24.0|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|   29.0|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|   29.0|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|   31.0|
|           0.51

In [47]:
# add a string-typed column that is null in every row
null_string = lit(None).cast("string")
df_na = df1.withColumn("na_col", null_string)
df_na.show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|  null|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|  null|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|  null|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|  null|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|  null|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|  null|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|  null|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|  null|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|  null|
|           0.51|03/05/2019 

In [48]:
# substitute "A" wherever a null appears
df_no_na = df_na.na.fill("A")
df_no_na.show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|     A|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|     A|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|     A|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|     A|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|     A|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|     A|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|     A|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|     A|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|     A|
|           0.51|03/05/2019 

In [54]:
# stack the filled rows on top of the null rows (columns matched by position)
# NOTE: this rebinds df_na from its own previous value, so running the cell
# a second time appends the filled rows again.
df_na = df_no_na.union(other=df_na)
df_na.show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|     A|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|     A|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|     A|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|     A|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|     A|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|     A|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|     A|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|     A|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|     A|
|           0.51|03/05/2019 

In [56]:
# discard every row that has a null in any column
df_no_na = df_na.dropna()
df_no_na.show()

1000000

In [None]:
# save DataFrame as CSV
# Spark writes a directory named "df1.csv" containing part files.
# mode="overwrite" replaces a previous export so the cell can be re-run (the
# default mode raises if the path already exists); header=True writes the
# column names as the first line so they survive a round trip.
df1.write.csv("df1.csv", mode="overwrite", header=True)

In [None]:
# save DataFrame as JSON
# Spark writes a directory named "df1.json" containing part files.
# mode="overwrite" replaces a previous export so the cell can be re-run (the
# default mode raises if the path already exists).
df1.write.json("df1.json", mode="overwrite")