Installing

In [0]:
# install java jdk
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# install spark
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
# unzip the spark file
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
# install findspark package
!pip install -q findspark

In [17]:
# install pyarroe
!pip install -U pyarrow

Requirement already up-to-date: pyarrow in /usr/local/lib/python3.6/dist-packages (0.16.0)


Setup

In [0]:
# set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
# initialize the Spark instance
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.memory.fraction", "0.9")

Load Dataset

In [20]:
# load dataset in spark dataframe
spark_df = spark.read.csv(path='/content/sample_data/california_housing_train.csv', inferSchema=True, header=True)
spark_df

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [21]:
type(spark_df)

pyspark.sql.dataframe.DataFrame

Analysis On Dataset

In [22]:
# diaplay the dataset
spark_df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [23]:
# display the schema of the dataset
spark_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [24]:
# show datatypes of columns in dataset
spark_df.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]

In [25]:
# display column names
spark_df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [26]:
# count the number od rows in our dataset
spark_df.count()

17000

In [27]:
# describe specific columns
spark_df.describe('total_rooms').show()

+-------+-----------------+
|summary|      total_rooms|
+-------+-----------------+
|  count|            17000|
|   mean|2643.664411764706|
| stddev|2179.947071452777|
|    min|              2.0|
|    max|          37937.0|
+-------+-----------------+



In [29]:
# rename columns
spark_df = spark_df.withColumnRenamed('median_house_value', 'median_house_value_target')
spark_df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value_target']

In [31]:
# select multiple columns
spark_df.select('longitude', 'latitude', 'median_income', 'median_house_value_target').show(5)

+---------+--------+-------------+-------------------------+
|longitude|latitude|median_income|median_house_value_target|
+---------+--------+-------------+-------------------------+
|  -114.31|   34.19|       1.4936|                  66900.0|
|  -114.47|    34.4|         1.82|                  80100.0|
|  -114.56|   33.69|       1.6509|                  85700.0|
|  -114.57|   33.64|       3.1917|                  73400.0|
|  -114.57|   33.57|        1.925|                  65500.0|
+---------+--------+-------------+-------------------------+
only showing top 5 rows



In [37]:
# drop columns
spark_df_target = spark_df.select('median_house_value_target')
spark_df_target.show(5)

+-------------------------+
|median_house_value_target|
+-------------------------+
|                  66900.0|
|                  80100.0|
|                  85700.0|
|                  73400.0|
|                  65500.0|
+-------------------------+
only showing top 5 rows



In [38]:
spark_df_features = spark_df.drop('median_house_value_target')
spark_df_features.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
only showing top 5 rows



In [39]:
# perform filtering 
spark_df_features.filter(spark_df_features.housing_median_age == "20.0").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|
|  -114.68|   33.49|              20.0|     1491.0|         360.0|    1135.0|     303.0|       1.6395|
|  -114.94|   34.55|              20.0|      350.0|          95.0|     119.0|      58.0|        1.625|
|  -115.51|   32.99|              20.0|     1402.0|         287.0|    1104.0|     317.0|       1.9088|
|  -115.56|   32.79|              20.0|     2372.0|         835.0|    2283.0|     767.0|       1.1707|
|  -115.57|   32.78|              20.0|     1534.0|         235.0|     871.0|     222.0|       6.2715|
|  -115.59|   32.85|              20.0|     1608.0|         274.0|     86

In [40]:
# perform filtering
spark_df_features[spark_df_features.population < "1000.0"].count()

6689

In [42]:
# perform filtering
spark_df_features.filter((spark_df_features.total_rooms <= "300.0") & (spark_df_features.total_bedrooms <= "300.0")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|  -114.65|   32.79|              21.0|       44.0|          33.0|      64.0|      27.0|       0.8571|
|  -114.67|   33.92|              17.0|       97.0|          24.0|      29.0|      15.0|       1.2656|
|   -115.8|   33.26|               2.0|       96.0|          18.0|      30.0|      16.0|       5.3374|
|  -115.88|   32.93|              15.0|      208.0|          49.0|      51.0|      20.0|       4.0208|
|  -115.94|   33.38|               5.0|      186.0|          43.0|      41.0|      21.0|          2.7|
|  -115.95|   33.28|              12.0|       99.0|          25.0|      37.0|      17.0|       1.8958|
|  -115.98|   33.32|               8.0|      240.0|          46.0|      6

In [43]:
# sorting dataset
spark_df_features.orderBy(spark_df_features.population).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|  -118.44|   34.04|              16.0|       18.0|           6.0|       3.0|       4.0|        0.536|
|  -117.79|   35.21|               4.0|        2.0|           2.0|       6.0|       2.0|        2.375|
|  -117.76|   35.22|               4.0|       18.0|           3.0|       8.0|       6.0|        1.625|
|  -122.06|   37.39|              26.0|       18.0|           4.0|       8.0|       4.0|         3.75|
|  -120.85|   37.75|              26.0|       28.0|           4.0|       9.0|       5.0|        1.625|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
only showing top 5 rows



In [44]:
# perform grouping
spark_df_features.groupby("housing_median_age").count().show(5)

+------------------+-----+
|housing_median_age|count|
+------------------+-----+
|               8.0|  178|
|               7.0|  151|
|              49.0|  111|
|              29.0|  374|
|              47.0|  175|
+------------------+-----+
only showing top 5 rows



In [45]:
# filling null values in dataset
spark_df_features.fillna(0)
spark_df_features.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)



In [48]:
# perform aggregation
spark_df_features.groupby(['total_rooms', 'total_bedrooms']).agg({'population': 'mean', 'households': 'min'}).show(5)

+-----------+--------------+---------------+---------------+
|total_rooms|total_bedrooms|min(households)|avg(population)|
+-----------+--------------+---------------+---------------+
|     5799.0|        1527.0|          262.0|          713.0|
|     1262.0|         294.0|          275.0|         5176.0|
|     1796.0|         428.0|          424.0|          918.0|
|     3999.0|        1182.0|         1130.0|         2051.0|
|     1417.0|         373.0|          348.0|          814.0|
+-----------+--------------+---------------+---------------+
only showing top 5 rows



In [50]:
# adding a new column
spark_df_features.withColumn('1/total_rooms', 1 / spark_df_features.total_rooms).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|       1/total_rooms|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|1.781895937277263E-4|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|1.307189542483660...|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|0.001388888888888889|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|6.662225183211193E-4|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925| 6.87757909215956E-4|
+-------

In [54]:
# using sql queries
spark_df_features.createOrReplaceTempView('sql_df')
spark.sql('select population from sql_df').show(5)

+----------+
|population|
+----------+
|    1015.0|
|    1129.0|
|     333.0|
|     515.0|
|     624.0|
+----------+
only showing top 5 rows



In [55]:
# using sql queries
spark.sql('select max(housing_median_age) from sql_df').show()

+-----------------------+
|max(housing_median_age)|
+-----------------------+
|                   52.0|
+-----------------------+

