In [1]:
import os

from pyspark.sql import SparkSession

In [5]:
# Create the SparkSession used by every cell below, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Spark SQL Query Dataframes')
    .getOrCreate()
)

In [8]:
# Directory that holds the input files (utilization.json, server_name.csv).
# Set the SPARK_DATA_PATH environment variable to run the notebook on another
# machine; when it is unset or empty, the original local directory is used.
data_path = os.environ.get('SPARK_DATA_PATH') or 'C:/Users/pcmr/spark/data/'

# Later cells build file paths with `data_path + '<file name>'`, so the value
# must end with a path separator.
if not data_path.endswith(('/', '\\')):
    data_path += '/'

In [9]:
# Build the path of the utilization data file and read it into a DataFrame.
json_df2_path = f'{data_path}utilization.json'
df = spark.read.json(json_df2_path)

In [10]:
df.show(10)

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
+---------------+-------------------+-----------+---------+-------------+

In [12]:
df.count()

500000

In [20]:
## Querying DataFrames

In [13]:
# Register df as the temporary view 'utilization' so it can be queried by name through spark.sql().
df.createOrReplaceTempView('utilization')

In [14]:
# Fetch all columns of 10 rows from the 'utilization' view with a SQL query.
df_sql = spark.sql('SELECT * FROM utilization LIMIT 10')

In [16]:
df_sql.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
+---------------+-------------------+-----------+---------+-------------+

In [17]:
df_sql.count()

10

In [18]:
# Select two columns and rename them in the result with AS (server_id -> sid, session_count -> sc).
df_sql = spark.sql('SELECT server_id AS sid, session_count AS sc FROM utilization')

In [19]:
df_sql.show()

+---+---+
|sid| sc|
+---+---+
|100| 47|
|100| 43|
|100| 62|
|100| 50|
|100| 43|
|100| 48|
|100| 58|
|100| 58|
|100| 62|
|100| 45|
|100| 47|
|100| 60|
|100| 57|
|100| 44|
|100| 47|
|100| 66|
|100| 65|
|100| 66|
|100| 42|
|100| 63|
+---+---+
only showing top 20 rows



In [21]:
## Filtering DataFrames with SQL

In [22]:
# Re-register df as the temporary view 'utilization'.
# NOTE(review): the same view was already created from df earlier in this notebook,
# so this call only replaces it with an identical definition.
df.createOrReplaceTempView('utilization')

In [23]:
# Keep only the rows that belong to server 120.
df_sql = spark.sql('SELECT * FROM utilization WHERE server_id = 120')

In [24]:
df_sql.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.66|03/05/2019 08:06:48|       0.31|      120|           54|
|           0.58|03/05/2019 08:11:48|       0.38|      120|           64|
|           0.55|03/05/2019 08:16:48|       0.61|      120|           54|
|            0.7|03/05/2019 08:21:48|       0.35|      120|           80|
|            0.6|03/05/2019 08:26:48|       0.39|      120|           71|
|           0.53|03/05/2019 08:31:48|       0.35|      120|           49|
|           0.73|03/05/2019 08:36:48|       0.42|      120|           73|
|           0.41|03/05/2019 08:41:48|        0.6|      120|           72|
|           0.62|03/05/2019 08:46:48|       0.57|      120|           57|
|           0.67|03/05/2019 08:51:48|       0.44|      120|           78|
|           0.67|03/05/2019 08:56:48| 

In [25]:
df_sql.count()

10000

In [26]:
# Project two columns and keep only the rows with more than 70 sessions.
df_sql = spark.sql('SELECT server_id, session_count FROM utilization WHERE session_count > 70')

In [27]:
df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           72|
|      100|           72|
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           72|
|      100|           71|
|      100|           72|
|      100|           71|
|      100|           71|
|      100|           72|
|      100|           71|
|      100|           72|
|      100|           72|
|      100|           71|
|      100|           71|
+---------+-------------+
only showing top 20 rows



In [28]:
df_sql.count()

239659

In [29]:
## Selecting multiple conditions in the WHERE clause

In [31]:
# Combine two predicates with AND: more than 70 sessions, and server 120 only.
df_sql = spark.sql('SELECT server_id, session_count FROM utilization WHERE session_count > 70 AND server_id = 120')

In [32]:
df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           71|
|      120|           73|
|      120|           72|
|      120|           78|
|      120|           73|
|      120|           78|
|      120|           73|
|      120|           74|
|      120|           78|
|      120|           75|
|      120|           75|
|      120|           73|
|      120|           79|
|      120|           72|
|      120|           77|
|      120|           75|
|      120|           72|
|      120|           79|
|      120|           75|
+---------+-------------+
only showing top 20 rows



In [33]:
df_sql.count()

2733

In [34]:
## Adding ORDER BY clauses

In [37]:
# Rows for server 120 with more than 70 sessions, sorted by session_count from highest to lowest.
df_sql = spark.sql('SELECT server_id, session_count \
                    FROM utilization \
                    WHERE session_count > 70 AND server_id = 120 \
                    ORDER BY session_count DESC')

In [38]:
 df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
+---------+-------------+
only showing top 20 rows



In [39]:
## Aggregating data with SQL

In [40]:
# Count every row in the 'utilization' view; the result is a one-row DataFrame.
# No alias is given, so the column is displayed under the name count(1).
df_count = spark.sql('SELECT count(*) FROM utilization')

In [41]:
df_count.show()

+--------+
|count(1)|
+--------+
|  500000|
+--------+



In [42]:
# Count only the rows with more than 70 sessions.
df_sql = spark.sql('SELECT count(*) \
                    FROM utilization \
                    WHERE session_count > 70')

In [43]:
df_sql.show()

+--------+
|count(1)|
+--------+
|  239659|
+--------+



In [46]:
# Per-server count of rows with more than 70 sessions (one result row per server_id).
df_sql = spark.sql('SELECT server_id, count(*) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id')

In [47]:
df_sql.show()

+---------+--------+
|server_id|count(1)|
+---------+--------+
|      103|    8744|
|      104|    7366|
|      100|     391|
|      101|    9808|
|      102|    8586|
|      107|    5646|
|      105|    1110|
|      108|    8375|
|      112|    7425|
|      113|    9418|
|      110|    2826|
|      111|    3093|
|      109|    3129|
|      116|    1167|
|      114|    2128|
|      115|    5284|
|      117|    3605|
|      120|    2733|
|      118|    7913|
|      122|    4885|
+---------+--------+
only showing top 20 rows



In [48]:
# Per-server count of rows with more than 70 sessions, sorted by that count, largest first.
# NOTE(review): the saved output of the following show() cell lists the counts in
# ascending order, which does not match DESC. Re-run the cells or confirm the
# intended sort direction.
df_sql = spark.sql('SELECT server_id, count(*) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id \
                    ORDER BY count(*) DESC')

In [49]:
df_sql.show()

+---------+--------+
|server_id|count(1)|
+---------+--------+
|      143|     144|
|      100|     391|
|      105|    1110|
|      116|    1167|
|      135|    1654|
|      147|    1783|
|      132|    2048|
|      114|    2128|
|      134|    2147|
|      120|    2733|
|      110|    2826|
|      125|    2843|
|      130|    2891|
|      111|    3093|
|      141|    3097|
|      109|    3129|
|      129|    3222|
|      117|    3605|
|      128|    3719|
|      136|    4316|
+---------+--------+
only showing top 20 rows



In [54]:
# Per server: minimum, average (rounded to 2 decimals) and maximum session_count over
# the rows with more than 70 sessions. The result is sorted by how many such rows each
# server has, largest first; that count itself is not part of the selected columns.
df_sql = spark.sql('SELECT server_id, min(session_count), round(avg(session_count), 2), max(session_count) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id \
                    ORDER BY count(*) DESC')

In [57]:
# Show the aggregated result; the average column is rounded to 2 decimal places.
df_sql.show()

+---------+------------------+----------------------------+------------------+
|server_id|min(session_count)|round(avg(session_count), 2)|max(session_count)|
+---------+------------------+----------------------------+------------------+
|      101|                71|                       87.67|               105|
|      113|                71|                       86.96|               103|
|      145|                71|                       86.98|               103|
|      103|                71|                       85.76|               101|
|      102|                71|                       85.71|               101|
|      133|                71|                       85.47|               100|
|      108|                71|                       85.12|               100|
|      149|                71|                       84.96|                99|
|      137|                71|                       85.01|                99|
|      148|                71|                      

In [58]:
## Joining DataFrames with SQL

In [65]:
# Read the utilization JSON file again into its own DataFrame for the join examples.
df_util = spark.read.json(json_df2_path)

In [67]:
# Point the temporary view 'utilization' at df_util, replacing any existing view of that name.
df_util.createOrReplaceTempView('utilization')

In [68]:
# Location of the CSV file that maps server ids to server names.
csv_df_path = f'{data_path}server_name.csv'

In [69]:
# Load the server_id -> server_name lookup table from CSV; the first line is the header.
# Without a schema the CSV reader types every column as string. inferSchema=True lets
# Spark derive the column types from the data, so server_id is read as a number and
# the join on server_id further down does not depend on an implicit string cast.
# NOTE(review): assumes server_id in utilization.json is numeric as well; confirm
# with df.printSchema().
df_server = spark.read.csv(csv_df_path, header=True, inferSchema=True)

In [71]:
df_server.show()

+---------+-----------+
|server_id|server_name|
+---------+-----------+
|      100| 100 Server|
|      101| 101 Server|
|      102| 102 Server|
|      103| 103 Server|
|      104| 104 Server|
|      105| 105 Server|
|      106| 106 Server|
|      107| 107 Server|
|      108| 108 Server|
|      109| 109 Server|
|      110| 110 Server|
|      111| 111 Server|
|      112| 112 Server|
|      113| 113 Server|
|      114| 114 Server|
|      115| 115 Server|
|      116| 116 Server|
|      117| 117 Server|
|      118| 118 Server|
|      119| 119 Server|
+---------+-----------+
only showing top 20 rows



In [72]:
# Register the lookup table as the temporary view 'server_name' so SQL queries can reference it.
df_server.createOrReplaceTempView('server_name')

In [73]:
# List each server_id that occurs in the utilization data once, in ascending order.
# NOTE: despite its name, df_count holds the distinct ids here, not a count.
df_count = spark.sql('SELECT DISTINCT server_id FROM utilization ORDER BY server_id')

In [74]:
df_count.show()

+---------+
|server_id|
+---------+
|      100|
|      101|
|      102|
|      103|
|      104|
|      105|
|      106|
|      107|
|      108|
|      109|
|      110|
|      111|
|      112|
|      113|
|      114|
|      115|
|      116|
|      117|
|      118|
|      119|
+---------+
only showing top 20 rows



In [75]:
# Smallest and largest server_id in the utilization data, displayed directly without keeping the DataFrame.
spark.sql('SELECT min(server_id), max(server_id) FROM utilization').show()

+--------------+--------------+
|min(server_id)|max(server_id)|
+--------------+--------------+
|           100|           149|
+--------------+--------------+



In [82]:
## Joining DataFrames
# Inner join of the 'utilization' view (alias u) with the 'server_name' view (alias sn)
# on server_id: every utilization row gets its server_name, and rows without a matching
# server_id in the other view are dropped.
df_join = spark.sql('SELECT u.server_id, sn.server_name, u.session_count \
                     FROM utilization u \
                     INNER JOIN server_name sn \
                     ON sn.server_id = u.server_id')

In [83]:
df_join.show()

+---------+-----------+-------------+
|server_id|server_name|session_count|
+---------+-----------+-------------+
|      100| 100 Server|           47|
|      100| 100 Server|           43|
|      100| 100 Server|           62|
|      100| 100 Server|           50|
|      100| 100 Server|           43|
|      100| 100 Server|           48|
|      100| 100 Server|           58|
|      100| 100 Server|           58|
|      100| 100 Server|           62|
|      100| 100 Server|           45|
|      100| 100 Server|           47|
|      100| 100 Server|           60|
|      100| 100 Server|           57|
|      100| 100 Server|           44|
|      100| 100 Server|           47|
|      100| 100 Server|           66|
|      100| 100 Server|           65|
|      100| 100 Server|           66|
|      100| 100 Server|           42|
|      100| 100 Server|           63|
+---------+-----------+-------------+
only showing top 20 rows

