### Querying Dataframes with SQL

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Spark SQL Query Dataframes") \
    .getOrCreate()

23/07/15 10:57:32 WARN Utils: Your hostname, bhajji resolves to a loopback address: 127.0.1.1; using 192.168.2.24 instead (on interface wlp2s0)
23/07/15 10:57:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/15 10:57:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [4]:
json_df2_path = "./Data/utilization.json"
df = spark.read.format("json").load(json_df2_path)

                                                                                

In [5]:
df.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
|           0.32|03/05/2019 08:56:14| 

In [6]:
df.printSchema()

root
 |-- cpu_utilization: double (nullable = true)
 |-- event_datetime: string (nullable = true)
 |-- free_memory: double (nullable = true)
 |-- server_id: long (nullable = true)
 |-- session_count: long (nullable = true)



In [7]:
# table creation
df.createOrReplaceTempView("utilization")

In [8]:
df_sql = spark.sql("SELECT * FROM utilization LIMIT 10")
df_sql.show()

+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|
|           0.51|03/05/2019 08:51:14|        0.6|      100|           45|
+---------------+-------------------+-

In [9]:
df_sql.count()

10

In [10]:
df_sql = spark.sql("SELECT server_id, session_count FROM utilization LIMIT 10")
df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           47|
|      100|           43|
|      100|           62|
|      100|           50|
|      100|           43|
|      100|           48|
|      100|           58|
|      100|           58|
|      100|           62|
|      100|           45|
+---------+-------------+



In [11]:
df_sql = spark.sql("SELECT server_id as sid, session_count as sc FROM utilization")
df_sql.show()

+---+---+
|sid| sc|
+---+---+
|100| 47|
|100| 43|
|100| 62|
|100| 50|
|100| 43|
|100| 48|
|100| 58|
|100| 58|
|100| 62|
|100| 45|
|100| 47|
|100| 60|
|100| 57|
|100| 44|
|100| 47|
|100| 66|
|100| 65|
|100| 66|
|100| 42|
|100| 63|
+---+---+
only showing top 20 rows



### Filtering Dataframes with SQL

In [12]:
df_sql = spark.sql("SELECT * FROM utilization WHERE server_id = 120")
df_sql.show()



+---------------+-------------------+-----------+---------+-------------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|
+---------------+-------------------+-----------+---------+-------------+
|           0.66|03/05/2019 08:06:48|       0.31|      120|           54|
|           0.58|03/05/2019 08:11:48|       0.38|      120|           64|
|           0.55|03/05/2019 08:16:48|       0.61|      120|           54|
|            0.7|03/05/2019 08:21:48|       0.35|      120|           80|
|            0.6|03/05/2019 08:26:48|       0.39|      120|           71|
|           0.53|03/05/2019 08:31:48|       0.35|      120|           49|
|           0.73|03/05/2019 08:36:48|       0.42|      120|           73|
|           0.41|03/05/2019 08:41:48|        0.6|      120|           72|
|           0.62|03/05/2019 08:46:48|       0.57|      120|           57|
|           0.67|03/05/2019 08:51:48|       0.44|      120|           78|
|           0.67|03/05/2019 08:56:48| 

                                                                                

In [13]:
df_sql = spark.sql("SELECT server_id, session_count FROM utilization WHERE session_count > 70")
df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           72|
|      100|           72|
|      100|           71|
|      100|           71|
|      100|           71|
|      100|           72|
|      100|           71|
|      100|           72|
|      100|           71|
|      100|           71|
|      100|           72|
|      100|           71|
|      100|           72|
|      100|           72|
|      100|           71|
|      100|           71|
+---------+-------------+
only showing top 20 rows



In [14]:
df_sql.count()

                                                                                

239659

In [15]:
df_sql = spark.sql("SELECT server_id, session_count FROM utilization WHERE session_count > 70 AND server_id = 120")
df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           71|
|      120|           73|
|      120|           72|
|      120|           78|
|      120|           73|
|      120|           78|
|      120|           73|
|      120|           74|
|      120|           78|
|      120|           75|
|      120|           75|
|      120|           73|
|      120|           79|
|      120|           72|
|      120|           77|
|      120|           75|
|      120|           72|
|      120|           79|
|      120|           75|
+---------+-------------+
only showing top 20 rows



In [16]:
df_sql.count()

2733

In [17]:
df_sql = spark.sql("SELECT server_id, session_count \
                   FROM utilization \
                   WHERE session_count > 70 AND server_id = 120 \
                   ORDER BY session_count DESC")
df_sql.show()

+---------+-------------+
|server_id|session_count|
+---------+-------------+
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
|      120|           80|
+---------+-------------+
only showing top 20 rows



### Aggregating Dataframes with SQL

In [18]:
df_count = spark.sql("SELECT count(*) FROM utilization")
df_count.show()



+--------+
|count(1)|
+--------+
|  500000|
+--------+



                                                                                

In [19]:
df_sql = spark.sql("SELECT count(*) \
                    FROM utilization \
                    WHERE session_count > 70")
df_sql.show()

+--------+
|count(1)|
+--------+
|  239659|
+--------+



In [20]:
df_sql = spark.sql("SELECT server_id, count(*) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id")
df_sql.show()



+---------+--------+
|server_id|count(1)|
+---------+--------+
|      112|    7425|
|      113|    9418|
|      110|    2826|
|      107|    5646|
|      103|    8744|
|      104|    7366|
|      111|    3093|
|      100|     391|
|      105|    1110|
|      108|    8375|
|      101|    9808|
|      102|    8586|
|      109|    3129|
|      126|    6365|
|      116|    1167|
|      114|    2128|
|      115|    5284|
|      120|    2733|
|      123|    7918|
|      118|    7913|
+---------+--------+
only showing top 20 rows



                                                                                

In [21]:
df_sql = spark.sql("SELECT server_id, count(*) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id \
                    ORDER BY count(*) DESC")
df_sql.show()

+---------+--------+
|server_id|count(1)|
+---------+--------+
|      101|    9808|
|      113|    9418|
|      145|    9304|
|      103|    8744|
|      102|    8586|
|      133|    8583|
|      108|    8375|
|      149|    8288|
|      137|    8248|
|      148|    8027|
|      123|    7918|
|      118|    7913|
|      112|    7425|
|      139|    7383|
|      104|    7366|
|      142|    7084|
|      121|    7084|
|      146|    7072|
|      126|    6365|
|      144|    6220|
+---------+--------+
only showing top 20 rows





In [22]:
df_sql = spark.sql("SELECT server_id, min(session_count), avg(session_count), max(session_count) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id \
                    ORDER BY count(*) DESC")
df_sql.show()



+---------+------------------+------------------+------------------+
|server_id|min(session_count)|avg(session_count)|max(session_count)|
+---------+------------------+------------------+------------------+
|      101|                71| 87.66557911908646|               105|
|      113|                71| 86.96262476109577|               103|
|      145|                71| 86.97732158211522|               103|
|      103|                71| 85.76372369624886|               101|
|      102|                71| 85.71150710458886|               101|
|      133|                71| 85.46720260981009|               100|
|      108|                71|  85.1219104477612|               100|
|      149|                71|  84.9612693050193|                99|
|      137|                71|  85.0061833171678|                99|
|      148|                71| 84.70350068518749|                99|
|      123|                71| 84.53220510229856|                98|
|      118|                71| 84.

                                                                                

In [23]:
df_sql = spark.sql("SELECT server_id, min(session_count), round(avg(session_count),2), max(session_count) \
                    FROM utilization \
                    WHERE session_count > 70 \
                    GROUP BY server_id \
                    ORDER BY count(*) DESC")
df_sql.show()



+---------+------------------+----------------------------+------------------+
|server_id|min(session_count)|round(avg(session_count), 2)|max(session_count)|
+---------+------------------+----------------------------+------------------+
|      101|                71|                       87.67|               105|
|      113|                71|                       86.96|               103|
|      145|                71|                       86.98|               103|
|      103|                71|                       85.76|               101|
|      102|                71|                       85.71|               101|
|      133|                71|                       85.47|               100|
|      108|                71|                       85.12|               100|
|      149|                71|                       84.96|                99|
|      137|                71|                       85.01|                99|
|      148|                71|                      

                                                                                

### Joining Dataframes with SQL

In [24]:
csv_df_path = "./Data/server_name.csv"
df_server = spark.read.csv(csv_df_path,header=True)

In [25]:
df_server.show()

+---------+-----------+
|server_id|server_name|
+---------+-----------+
|      100| 100 Server|
|      101| 101 Server|
|      102| 102 Server|
|      103| 103 Server|
|      104| 104 Server|
|      105| 105 Server|
|      106| 106 Server|
|      107| 107 Server|
|      108| 108 Server|
|      109| 109 Server|
|      110| 110 Server|
|      111| 111 Server|
|      112| 112 Server|
|      113| 113 Server|
|      114| 114 Server|
|      115| 115 Server|
|      116| 116 Server|
|      117| 117 Server|
|      118| 118 Server|
|      119| 119 Server|
+---------+-----------+
only showing top 20 rows



In [26]:
#table creation
df_server.createOrReplaceTempView("server_name")

In [27]:
df_count = spark.sql("SELECT DISTINCT server_id FROM utilization ORDER BY server_id")
df_count.show()



+---------+
|server_id|
+---------+
|      100|
|      101|
|      102|
|      103|
|      104|
|      105|
|      106|
|      107|
|      108|
|      109|
|      110|
|      111|
|      112|
|      113|
|      114|
|      115|
|      116|
|      117|
|      118|
|      119|
+---------+
only showing top 20 rows



                                                                                

In [28]:
spark.sql("SELECT min(server_id), max(server_id) FROM utilization").show()

[Stage 47:>                                                         (0 + 4) / 4]

+--------------+--------------+
|min(server_id)|max(server_id)|
+--------------+--------------+
|           100|           149|
+--------------+--------------+



                                                                                

In [29]:
spark.sql("SELECT * FROM server_name").show()

+---------+-----------+
|server_id|server_name|
+---------+-----------+
|      100| 100 Server|
|      101| 101 Server|
|      102| 102 Server|
|      103| 103 Server|
|      104| 104 Server|
|      105| 105 Server|
|      106| 106 Server|
|      107| 107 Server|
|      108| 108 Server|
|      109| 109 Server|
|      110| 110 Server|
|      111| 111 Server|
|      112| 112 Server|
|      113| 113 Server|
|      114| 114 Server|
|      115| 115 Server|
|      116| 116 Server|
|      117| 117 Server|
|      118| 118 Server|
|      119| 119 Server|
+---------+-----------+
only showing top 20 rows



In [30]:
df_join = spark.sql("SELECT u.server_id, sn.server_name, u.session_count \
                     FROM utilization u \
                     INNER JOIN server_name sn \
                     ON sn.server_id = u.server_id")
df_join.show()   

+---------+-----------+-------------+
|server_id|server_name|session_count|
+---------+-----------+-------------+
|      100| 100 Server|           47|
|      100| 100 Server|           43|
|      100| 100 Server|           62|
|      100| 100 Server|           50|
|      100| 100 Server|           43|
|      100| 100 Server|           48|
|      100| 100 Server|           58|
|      100| 100 Server|           58|
|      100| 100 Server|           62|
|      100| 100 Server|           45|
|      100| 100 Server|           47|
|      100| 100 Server|           60|
|      100| 100 Server|           57|
|      100| 100 Server|           44|
|      100| 100 Server|           47|
|      100| 100 Server|           66|
|      100| 100 Server|           65|
|      100| 100 Server|           66|
|      100| 100 Server|           42|
|      100| 100 Server|           63|
+---------+-----------+-------------+
only showing top 20 rows



### Working with NA

In [31]:
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

df_na = df.withColumn('na_col', lit(None).cast(StringType()))

In [32]:
df_na.show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|  null|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|  null|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|  null|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|  null|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|  null|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|  null|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|  null|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|  null|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|  null|
|           0.51|03/05/2019 

In [33]:
df_na.fillna('A').show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|     A|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|     A|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|     A|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|     A|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|     A|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|     A|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|     A|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|     A|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|     A|
|           0.51|03/05/2019 

In [34]:
df2 = df_na.fillna('A').union(df_na)

In [35]:
df2.show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|     A|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|     A|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|     A|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|     A|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|     A|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|     A|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|     A|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|     A|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|     A|
|           0.51|03/05/2019 

In [36]:
df2.na.drop().show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|     A|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|     A|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|     A|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|     A|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|     A|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|     A|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|     A|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|     A|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|     A|
|           0.51|03/05/2019 

In [37]:
df2.createOrReplaceTempView("na_table")

In [38]:
spark.sql("SELECT * FROM na_table").show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|     A|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|     A|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|     A|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|     A|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|     A|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|     A|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|     A|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|     A|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|     A|
|           0.51|03/05/2019 

In [39]:
spark.sql("SELECT * FROM na_table WHERE na_col IS NULL").show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|  null|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|  null|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|  null|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|  null|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|  null|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|  null|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|  null|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|  null|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|  null|
|           0.51|03/05/2019 

In [40]:
spark.sql("SELECT * FROM na_table WHERE na_col IS NOT NULL").show()

+---------------+-------------------+-----------+---------+-------------+------+
|cpu_utilization|     event_datetime|free_memory|server_id|session_count|na_col|
+---------------+-------------------+-----------+---------+-------------+------+
|           0.57|03/05/2019 08:06:14|       0.51|      100|           47|     A|
|           0.47|03/05/2019 08:11:14|       0.62|      100|           43|     A|
|           0.56|03/05/2019 08:16:14|       0.57|      100|           62|     A|
|           0.57|03/05/2019 08:21:14|       0.56|      100|           50|     A|
|           0.35|03/05/2019 08:26:14|       0.46|      100|           43|     A|
|           0.41|03/05/2019 08:31:14|       0.58|      100|           48|     A|
|           0.57|03/05/2019 08:36:14|       0.35|      100|           58|     A|
|           0.41|03/05/2019 08:41:14|        0.4|      100|           58|     A|
|           0.53|03/05/2019 08:46:14|       0.35|      100|           62|     A|
|           0.51|03/05/2019 