In [108]:
from pyspark.sql import SparkSession
import configparser

## Configuration

In [111]:
# Parse config.ini (a relative path, so it is resolved against the working directory).
config = configparser.ConfigParser()
# read() returns the list of files it parsed; files it cannot find are skipped silently
config.read('config.ini')

['config.ini']

In [115]:
# base location of the input data files, taken from the "path" key of the [PATH] section
data_path = config['PATH']['path']

In [62]:
# create spark session (getOrCreate returns the existing session if one is already active)
spark = SparkSession.builder.getOrCreate()

## CSV

In [113]:
# Load the location_temp dataset; the first line of the file supplies the column names.
# No schema is given or inferred, so every column is read as a string.
file_path = f"{data_path}/location_temp.csv"
df1 = spark.read.csv(file_path, header=True)

In [114]:
# first 10 rows, returned to the driver as a list of Row objects
df1.head(10)

[Row(event_date='03/04/2019 19:48:06', location_id='loc0', temp_celcius='29'),
 Row(event_date='03/04/2019 19:53:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 19:58:06', location_id='loc0', temp_celcius='28'),
 Row(event_date='03/04/2019 20:03:06', location_id='loc0', temp_celcius='30'),
 Row(event_date='03/04/2019 20:08:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 20:13:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 20:18:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 20:23:06', location_id='loc0', temp_celcius='29'),
 Row(event_date='03/04/2019 20:28:06', location_id='loc0', temp_celcius='32'),
 Row(event_date='03/04/2019 20:33:06', location_id='loc0', temp_celcius='35')]

In [66]:
# print the leading rows as a text table (show() prints 20 rows by default)
df1.show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [67]:
# total number of rows in the location_temp dataframe
df1.count()

500000

In [68]:
# utilization.csv has no header row, so header is disabled and Spark assigns the
# positional names _c0, _c1, ...; inferSchema asks Spark to guess each column's type.
file_path_no_header = f"{data_path}/utilization.csv"
df2 = spark.read.csv(file_path_no_header, header=False, inferSchema=True)

In [69]:
# preview the raw load; columns still carry the auto-generated _cN names
df2.show()

+-------------------+---+----+----+---+
|                _c0|_c1| _c2| _c3|_c4|
+-------------------+---+----+----+---+
|03/05/2019 08:06:14|100|0.57|0.51| 47|
|03/05/2019 08:11:14|100|0.47|0.62| 43|
|03/05/2019 08:16:14|100|0.56|0.57| 62|
|03/05/2019 08:21:14|100|0.57|0.56| 50|
|03/05/2019 08:26:14|100|0.35|0.46| 43|
|03/05/2019 08:31:14|100|0.41|0.58| 48|
|03/05/2019 08:36:14|100|0.57|0.35| 58|
|03/05/2019 08:41:14|100|0.41| 0.4| 58|
|03/05/2019 08:46:14|100|0.53|0.35| 62|
|03/05/2019 08:51:14|100|0.51| 0.6| 45|
|03/05/2019 08:56:14|100|0.32|0.37| 47|
|03/05/2019 09:01:14|100|0.62|0.59| 60|
|03/05/2019 09:06:14|100|0.66|0.72| 57|
|03/05/2019 09:11:14|100|0.54|0.54| 44|
|03/05/2019 09:16:14|100|0.29| 0.4| 47|
|03/05/2019 09:21:14|100|0.43|0.68| 66|
|03/05/2019 09:26:14|100|0.49|0.66| 65|
|03/05/2019 09:31:14|100|0.64|0.55| 66|
|03/05/2019 09:36:14|100|0.42| 0.6| 42|
|03/05/2019 09:41:14|100|0.55|0.59| 63|
+-------------------+---+----+----+---+
only showing top 20 rows



In [70]:
# total number of rows in the utilization dataframe
df2.count()

500000

In [71]:
# Replace the positional column names (_c0 ... _c4) with descriptive ones.
column_names = {
    "_c0": "event_datetime",
    "_c1": "server_id",
    "_c2": "cpu_utilization",
    "_c3": "free_memory",
    "_c4": "session_count",
}
# withColumnRenamed does nothing when the old name is absent, so re-running this cell is harmless
for old_name, new_name in column_names.items():
    df2 = df2.withColumnRenamed(old_name, new_name)

In [72]:
# confirm the new column names
df2.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
|03/05/2019 08:31:14|      100|           0.41|       0.58|           48|
|03/05/2019 08:36:14|      100|           0.57|       0.35|           58|
|03/05/2019 08:41:14|      100|           0.41|        0.4|           58|
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 08:51:14|      100|           0.51|        0.6|           45|
|03/05/2019 08:56:14|      100|       

## JSON

In [73]:
# Load the utilization data from JSON; the column names come from the JSON keys.
# NOTE(review): the file name is spelled "utlization" — presumably it matches the file on disk; confirm.
json_df1_path = f"{data_path}/utlization.json"
df1_json = spark.read.json(json_df1_path)

In [74]:
# total number of rows loaded from the JSON file
df1_json.count()

500000

In [75]:
# preview the JSON-loaded rows
df1_json.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
|           0.32|03/05/2019 08:56:14| 

## Working with the data

In [76]:
# column names of the utilization dataframe, as a Python list
df2.columns

['event_datetime',
 'server_id',
 'cpu_utilization',
 'free_memory',
 'session_count']

In [77]:
# column names of the location_temp dataframe, as a Python list
df1.columns

['event_date', 'location_id', 'temp_celcius']

In [78]:
# Work with a roughly 10% random subset of df2.
# withReplacement=False means a row can appear at most once within this sample;
# it does not stop a later, separate sample() call from picking the same rows.
# The fixed seed keeps the sample repeatable for the same input and partitioning.
df2_sample = df2.sample(withReplacement=False, fraction=0.1, seed=42)

In [79]:
# preview the sampled rows
df2_sample.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 09:51:14|      100|           0.61|       0.34|           70|
|03/05/2019 10:01:14|      100|           0.62|       0.49|           37|
|03/05/2019 10:16:14|      100|           0.62|       0.69|           56|
|03/05/2019 11:46:14|      100|           0.65|       0.68|           59|
|03/05/2019 12:21:14|      100|           0.65|       0.37|           65|
|03/05/2019 12:36:14|      100|           0.63|       0.65|           45|
|03/05/2019 12:56:14|      100|           0.48|       0.38|           44|
|03/05/2019 15:01:14|      100|           0.39|       0.52|           48|
|03/05/2019 15:11:14|      100|           0.58|       0.54|           49|
|03/05/2019 16:26:14|      100|       

In [80]:
# Order the sampled rows by event_datetime (ascending).
# NOTE(review): if event_datetime is held as a string, this is a lexicographic ordering — confirm.
df2_sort = df2_sample.orderBy('event_datetime')

In [81]:
# preview the sorted sample
df2_sort.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:24|      106|            0.4|       0.39|           43|
|03/05/2019 08:07:21|      138|           0.24|       0.68|           59|
|03/05/2019 08:07:39|      147|           0.39|       0.63|           49|
|03/05/2019 08:07:44|      149|           0.74|       0.27|           66|
|03/05/2019 08:16:19|      103|           0.61|       0.27|           88|
|03/05/2019 08:16:21|      104|           0.57|       0.19|           90|
|03/05/2019 08:16:53|      123|           0.73|       0.46|           81|
|03/05/2019 08:16:56|      125|           0.41|       0.45|           53|
|03/05/2019 08:17:29|      142|           0.79|       0.49|           84|
|03/05/2019 08:17:33|      144|           0.65|        0.3|           86|
|03/05/2019 08:17:37|      146|       

## Use dataframe API to filter some of the rows in dataframe

In [82]:
# keep only the measurements taken at location loc0 and preview them
df1.where(df1["location_id"] == "loc0").show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [83]:
# number of measurements recorded for loc0
df1.where(df1["location_id"] == "loc0").count()

1000

In [84]:
# number of measurements recorded for loc1
df1.where(df1["location_id"] == "loc1").count()

1000

In [85]:
# keep only the measurements taken at location loc1 and preview them
df1.where(df1["location_id"] == "loc1").show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc1|          31|
|03/04/2019 19:53:06|       loc1|          26|
|03/04/2019 19:58:06|       loc1|          31|
|03/04/2019 20:03:06|       loc1|          26|
|03/04/2019 20:08:06|       loc1|          28|
|03/04/2019 20:13:06|       loc1|          27|
|03/04/2019 20:18:06|       loc1|          30|
|03/04/2019 20:23:06|       loc1|          28|
|03/04/2019 20:28:06|       loc1|          28|
|03/04/2019 20:33:06|       loc1|          27|
|03/04/2019 20:38:06|       loc1|          30|
|03/04/2019 20:43:06|       loc1|          32|
|03/04/2019 20:48:06|       loc1|          26|
|03/04/2019 20:53:06|       loc1|          30|
|03/04/2019 20:58:06|       loc1|          26|
|03/04/2019 21:03:06|       loc1|          28|
|03/04/2019 21:08:06|       loc1|          27|
|03/04/2019 21:13:06|       loc1|          28|
|03/04/2019 2

## Aggregating using dataframe API

In [86]:
# reminder of what the un-aggregated data looks like
df1.show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [87]:
# count how many measurements for each location in the data
# (groupBy alone does not order the result; add orderBy for a sorted listing)
df1.groupBy("location_id").count().show()

+-----------+-----+
|location_id|count|
+-----------+-----+
|      loc22| 1000|
|      loc31| 1000|
|      loc82| 1000|
|      loc90| 1000|
|     loc118| 1000|
|      loc39| 1000|
|      loc75| 1000|
|     loc122| 1000|
|      loc24| 1000|
|      loc30| 1000|
|     loc105| 1000|
|      loc96| 1000|
|     loc102| 1000|
|      loc18| 1000|
|      loc27| 1000|
|     loc143| 1000|
|      loc43| 1000|
|     loc123| 1000|
|      loc15| 1000|
|      loc48| 1000|
+-----------+-----+
only showing top 20 rows



In [88]:
# order the rows by location_id; rows that share a location_id are not ordered
# relative to each other (see event_date in the output)
df1.orderBy("location_id").show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 21:23:06|       loc0|          28|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 21:18:06|       loc0|          33|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 2

### Using aggregate functions
- use .agg() and pass a key-value pair (a dict) that maps the column name to the aggregate function
- e.g. .agg({"col1" : "mean"})

In [89]:
# find average temp by location; the result column is named avg(temp_celcius)
df1.groupBy("location_id").agg({"temp_celcius":"mean"}).show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|      loc22|           28.251|
|      loc31|           25.196|
|      loc82|           27.355|
|      loc90|           23.216|
|     loc118|           24.219|
|      loc39|           25.199|
|      loc75|           23.209|
|     loc122|            32.36|
|      loc24|           31.109|
|      loc30|           30.211|
|     loc105|           26.217|
|      loc96|           28.138|
|     loc102|           30.327|
|      loc18|           30.198|
|      loc27|           31.239|
|     loc143|           28.213|
|      loc43|           32.196|
|     loc123|           23.424|
|      loc15|           32.171|
|      loc48|           30.244|
+-----------+-----------------+
only showing top 20 rows



In [90]:
# find max temp in each location
# temp_celcius is read from the CSV as a string, and max() on strings compares
# lexicographically (e.g. "9" > "36"), so cast to int before aggregating.
# Values that are not valid integers become null and are ignored by max().
df1.withColumn("temp_celcius", df1["temp_celcius"].cast("int")) \
    .groupBy("location_id").agg({"temp_celcius":"max"}).show()

+-----------+-----------------+
|location_id|max(temp_celcius)|
+-----------+-----------------+
|       loc0|               36|
|       loc1|               35|
|      loc10|               32|
|     loc100|               34|
|     loc101|               32|
|     loc102|               37|
|     loc103|               32|
|     loc104|               33|
|     loc105|               33|
|     loc106|               34|
|     loc107|               40|
|     loc108|               39|
|     loc109|               31|
|      loc11|               32|
|     loc110|               33|
|     loc111|               38|
|     loc112|               40|
|     loc113|               37|
|     loc114|               36|
|     loc115|               30|
+-----------+-----------------+
only showing top 20 rows



## Sampling

In [91]:
# size of the full dataset, for comparison with the sample below
df1.count()

500000

In [92]:
# take a randomly selected sample/subset of df1
# fraction is a per-row probability, so the sample holds roughly (not exactly) 10% of the rows;
# the fixed seed keeps the sample repeatable for the same input and partitioning
df1_s1 = df1.sample(fraction=0.1, withReplacement=False, seed=42)

In [93]:
# number of rows that ended up in the sample (close to, but not exactly, 10% of df1)
df1_s1.count()

50003

In [94]:
# average temp by location, computed on the sample only
df1_s1.groupBy("location_id").agg({"temp_celcius":"mean"}).show()

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|      loc22|28.342105263157894|
|      loc31|25.347826086956523|
|      loc82|27.515463917525775|
|      loc90|22.903225806451612|
|     loc118|24.480392156862745|
|      loc39|25.376146788990827|
|      loc75| 23.38372093023256|
|     loc122|32.216981132075475|
|      loc24|30.785046728971963|
|      loc30|29.990654205607477|
|     loc105| 26.48148148148148|
|      loc96|27.933962264150942|
|     loc102|30.495495495495497|
|      loc18| 29.87962962962963|
|      loc27|31.021505376344088|
|     loc143|28.367924528301888|
|      loc43|             32.16|
|     loc123|23.861702127659573|
|      loc15| 32.13095238095238|
|      loc48|30.258928571428573|
+-----------+------------------+
only showing top 20 rows



In [95]:
# order the above data
# location_id is a string, so the ordering is lexicographic (loc0, loc1, loc10, loc100, ...)
df1_s1.groupBy("location_id").agg({"temp_celcius":"mean"}).orderBy("location_id").show()

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|       loc0|29.528301886792452|
|       loc1|28.089887640449437|
|      loc10|25.528846153846153|
|     loc100| 27.36842105263158|
|     loc101|25.217391304347824|
|     loc102|30.495495495495497|
|     loc103|             25.32|
|     loc104|26.276785714285715|
|     loc105| 26.48148148148148|
|     loc106| 27.19607843137255|
|     loc107| 32.62376237623762|
|     loc108| 32.15151515151515|
|     loc109|              24.1|
|      loc11|25.037037037037038|
|     loc110|26.336734693877553|
|     loc111|31.279661016949152|
|     loc112|33.326530612244895|
|     loc113|30.320388349514563|
|     loc114|              28.8|
|     loc115| 23.29824561403509|
+-----------+------------------+
only showing top 20 rows



In [96]:
# compare to original data: same aggregation and ordering, run on the full dataframe
df1.groupBy("location_id").agg({"temp_celcius":"mean"}).orderBy("location_id").show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|       loc0|           29.176|
|       loc1|           28.246|
|      loc10|           25.337|
|     loc100|           27.297|
|     loc101|           25.317|
|     loc102|           30.327|
|     loc103|           25.341|
|     loc104|           26.204|
|     loc105|           26.217|
|     loc106|           27.201|
|     loc107|           33.268|
|     loc108|           32.195|
|     loc109|           24.138|
|      loc11|           25.308|
|     loc110|           26.239|
|     loc111|           31.391|
|     loc112|           33.359|
|     loc113|           30.345|
|     loc114|           29.261|
|     loc115|           23.239|
+-----------+-----------------+
only showing top 20 rows



## Save the results

In [98]:
# Write df1 as CSV into the df1.csv directory (no header row is written).
# The default save mode raises an error when the path already exists, which makes this
# cell fail on a re-run; overwrite mode replaces the previous output instead.
df1.write.mode("overwrite").csv('df1.csv')

In [99]:
# list what the CSV write produced
! ls df1.csv

_SUCCESS
part-00000-97f342b9-7f2e-4809-b9db-b0b0afe8ae1d-c000.csv
part-00001-97f342b9-7f2e-4809-b9db-b0b0afe8ae1d-c000.csv
part-00002-97f342b9-7f2e-4809-b9db-b0b0afe8ae1d-c000.csv
part-00003-97f342b9-7f2e-4809-b9db-b0b0afe8ae1d-c000.csv


- the result is not a single csv file; it is a directory containing 4 part files (plus a `_SUCCESS` marker)
- Spark breaks up the data into partitions; in this case there were 4 partitions
- each partition is written to its own file

In [100]:
# look at contents of one of the files
# head command shows top few rows of file

! head df1.csv/part-00000-97f342b9-7f2e-4809-b9db-b0b0afe8ae1d-c000.csv

03/04/2019 19:48:06,loc0,29
03/04/2019 19:53:06,loc0,27
03/04/2019 19:58:06,loc0,28
03/04/2019 20:03:06,loc0,30
03/04/2019 20:08:06,loc0,27
03/04/2019 20:13:06,loc0,27
03/04/2019 20:18:06,loc0,27
03/04/2019 20:23:06,loc0,29
03/04/2019 20:28:06,loc0,32
03/04/2019 20:33:06,loc0,35


In [102]:
# save to json format
# overwrite mode lets the cell be re-run when df1.json already exists
# (the default save mode raises an error if the output path exists)

df1.write.mode("overwrite").json('df1.json')

In [103]:
# list what the JSON write produced
! ls df1.json

_SUCCESS
part-00000-7fdb0b84-2e87-4279-a8ac-d20f9c3d1300-c000.json
part-00001-7fdb0b84-2e87-4279-a8ac-d20f9c3d1300-c000.json
part-00002-7fdb0b84-2e87-4279-a8ac-d20f9c3d1300-c000.json
part-00003-7fdb0b84-2e87-4279-a8ac-d20f9c3d1300-c000.json


In [106]:
! head df1.json/part-00000-7fdb0b84-2e87-4279-a8ac-d20f9c3d1300-c000.json

{"event_date":"03/04/2019 19:48:06","location_id":"loc0","temp_celcius":"29"}
{"event_date":"03/04/2019 19:53:06","location_id":"loc0","temp_celcius":"27"}
{"event_date":"03/04/2019 19:58:06","location_id":"loc0","temp_celcius":"28"}
{"event_date":"03/04/2019 20:03:06","location_id":"loc0","temp_celcius":"30"}
{"event_date":"03/04/2019 20:08:06","location_id":"loc0","temp_celcius":"27"}
{"event_date":"03/04/2019 20:13:06","location_id":"loc0","temp_celcius":"27"}
{"event_date":"03/04/2019 20:18:06","location_id":"loc0","temp_celcius":"27"}
{"event_date":"03/04/2019 20:23:06","location_id":"loc0","temp_celcius":"29"}
{"event_date":"03/04/2019 20:28:06","location_id":"loc0","temp_celcius":"32"}
{"event_date":"03/04/2019 20:33:06","location_id":"loc0","temp_celcius":"35"}
