# Spark SQL

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .appName('Spark SQL query df') \
                    .getOrCreate()

In [6]:
data_path = '/home/lorenzo/spark-repo/1_spark_dataframes/data/california.csv'
df = spark.read.option('header', 'True') \
                .option('inferSchema', 'True') \
                .csv(data_path)

In [8]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [28]:
df.count()

20640

In order to run SQL queries we need to register a DataFrame as a temporary table, providing it with a name.

In [7]:
df.createOrReplaceTempView('california')

### Querying a spark dataframe
Standard SQL syntax applies.

In [12]:
spark.sql('SELECT total_rooms, population, median_house_value FROM california').show(5)

+-----------+----------+------------------+
|total_rooms|population|median_house_value|
+-----------+----------+------------------+
|      880.0|     322.0|          452600.0|
|     7099.0|    2401.0|          358500.0|
|     1467.0|     496.0|          352100.0|
|     1274.0|     558.0|          341300.0|
|     1627.0|     565.0|          342200.0|
+-----------+----------+------------------+
only showing top 5 rows



In [18]:
spark.sql('SELECT total_rooms as rooms, \
                  population, \
                  total_rooms/population as rooms_per_person, \
                  median_house_value as house_value \
           FROM california').show(5)

+------+----------+------------------+-----------+
| rooms|population|  rooms_per_person|house_value|
+------+----------+------------------+-----------+
| 880.0|     322.0| 2.732919254658385|   452600.0|
|7099.0|    2401.0|2.9566847147022073|   358500.0|
|1467.0|     496.0|2.9576612903225805|   352100.0|
|1274.0|     558.0| 2.283154121863799|   341300.0|
|1627.0|     565.0| 2.879646017699115|   342200.0|
+------+----------+------------------+-----------+
only showing top 5 rows



In [20]:
spark.sql('SELECT total_rooms as rooms, \
                  population, \
                  total_rooms/population as rooms_per_person, \
                  median_house_value as house_value \
           FROM california \
           WHERE median_house_value < 200000 \
           ORDER BY rooms_per_person').show(5)

+-----+----------+--------------------+-----------+
|rooms|population|    rooms_per_person|house_value|
+-----+----------+--------------------+-----------+
| 19.0|    7460.0|0.002546916890080429|   137500.0|
| 36.0|    4198.0| 0.00857551214864221|    67500.0|
|538.0|    8733.0| 0.06160540478644223|   154600.0|
|161.0|    1542.0| 0.10440985732814527|   162500.0|
| 19.0|     166.0|  0.1144578313253012|   162500.0|
+-----+----------+--------------------+-----------+
only showing top 5 rows



### Aggregating data

In [22]:
spark.sql('SELECT ocean_proximity, \
                  mean(total_rooms) as rooms, \
                  mean(population), \
                  mean(total_rooms/population) as rooms_per_person, \
                  mean(median_house_value) as house_value \
           FROM california \
           GROUP BY ocean_proximity').show(5)

+---------------+------------------+------------------+------------------+------------------+
|ocean_proximity|             rooms|   avg(population)|  rooms_per_person|       house_value|
+---------------+------------------+------------------+------------------+------------------+
|         ISLAND|            1574.6|             668.0|2.3830333305631446|          380440.0|
|     NEAR OCEAN| 2583.700902934537|1354.0086531226486|2.0127291274331847|249433.97742663656|
|       NEAR BAY| 2493.589519650655|1230.3174672489083|2.0810756094097096|259212.31179039303|
|      <1H OCEAN|2628.3435858143607|1520.2904991243433|1.8077601913423533|240084.28546409807|
|         INLAND|2717.7427873607085|1391.0462524805373| 2.161737594698368|124805.39200122119|
+---------------+------------------+------------------+------------------+------------------+



In [23]:
spark.sql('SELECT ocean_proximity, \
                  round(mean(total_rooms), 2) as rooms, \
                  round(mean(population), 2), \
                  round(mean(total_rooms/population), 2) as rooms_per_person, \
                  round(mean(median_house_value), 2) as house_value \
           FROM california \
           GROUP BY ocean_proximity').show(5)

+---------------+-------+-------------------------+----------------+-----------+
|ocean_proximity|  rooms|round(avg(population), 2)|rooms_per_person|house_value|
+---------------+-------+-------------------------+----------------+-----------+
|         ISLAND| 1574.6|                    668.0|            2.38|   380440.0|
|     NEAR OCEAN| 2583.7|                  1354.01|            2.01|  249433.98|
|       NEAR BAY|2493.59|                  1230.32|            2.08|  259212.31|
|      <1H OCEAN|2628.34|                  1520.29|            1.81|  240084.29|
|         INLAND|2717.74|                  1391.05|            2.16|  124805.39|
+---------------+-------+-------------------------+----------------+-----------+



### Joining tables

In [41]:
df1 = df.withColumn('key', df.latitude + df.longitude +  df.housing_median_age +  df.total_rooms +  df.total_bedrooms +  df.population +  df.households +  df.median_income +  df.median_house_value)
print(f"Distinct key values: {df1.select('key').distinct().count()}")

Distinct key values: 20434


In [37]:
df1.createOrReplaceTempView('california')
df1.createOrReplaceTempView('california_copy')

In [43]:
joined_df = spark.sql('SELECT a.total_rooms as rooms, \
                              a.population, \
                              a.median_house_value as a_house_value, \
                              b.median_house_value as b_house_value \
                       FROM california as a \
                       INNER JOIN california_copy as b \
                       on a.key = b.key')

In [44]:
joined_df.show(5)

+------+----------+-------------+-------------+
| rooms|population|a_house_value|b_house_value|
+------+----------+-------------+-------------+
| 880.0|     322.0|     452600.0|     452600.0|
|7099.0|    2401.0|     358500.0|     358500.0|
|1467.0|     496.0|     352100.0|     352100.0|
|1274.0|     558.0|     341300.0|     341300.0|
|1627.0|     565.0|     342200.0|     342200.0|
+------+----------+-------------+-------------+
only showing top 5 rows



In [45]:
joined_df.count()

20433

### Duplicates and NAs

In [53]:
# Drop rows that are duplicate on ALL columns
df1.drop_duplicates().count()

20640

In [48]:
# Drop rows that are duplicate on specified columns
df1.drop_duplicates(['key']).count()

20434

In [50]:
from pyspark.sql.functions import isnan, when, count, col

df1.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [52]:
df1.na.drop().count()

20433

In [56]:
spark.sql('SELECT key, \
                  longitude, \
                  latitude, \
                  total_rooms as rooms, \
                  population, \
                  median_house_value as a_house_value \
           FROM california \
           WHERE key is null').show(10)

+----+---------+--------+------+----------+-------------+
| key|longitude|latitude| rooms|population|a_house_value|
+----+---------+--------+------+----------+-------------+
|null|  -122.16|   37.77|1256.0|     570.0|     161900.0|
|null|  -122.17|   37.75| 992.0|     732.0|      85100.0|
|null|  -122.28|   37.78|5154.0|    3741.0|     173400.0|
|null|  -122.24|   37.75| 891.0|     384.0|     247100.0|
|null|   -122.1|   37.69| 746.0|     387.0|     178400.0|
|null|  -122.14|   37.67|3342.0|    1635.0|     186900.0|
|null|  -121.77|   39.66|3759.0|    1705.0|     158600.0|
|null|  -121.95|   38.03|5526.0|    3207.0|     143100.0|
|null|  -121.98|   37.96|2987.0|    1420.0|     204100.0|
|null|  -122.01|   37.94|3741.0|    1339.0|     322300.0|
+----+---------+--------+------+----------+-------------+
only showing top 10 rows



In [57]:
spark.sql('SELECT count(*) \
           FROM california \
           WHERE key is null').show(10)

+--------+
|count(1)|
+--------+
|     207|
+--------+

