In [1]:
from pyspark.sql import SparkSession

In [4]:
# Build (or reuse) the Spark session used by every cell below.
# The UI port is set to "0" (presumably so the UI binds to any free port - confirm for this cluster).
spark = (
    SparkSession.builder
    .appName("CCA175ExamPreparation")
    .master("yarn")
    .config("spark.ui.port", "0")
    .getOrCreate()
)

In [5]:
# Bare expression: lets the notebook render the session object created above.
spark

### Read the solar hot water heater data, which is stored as a JSON file

In [111]:
# Load the solar hot water heater permits from HDFS; the schema is inferred from the JSON.
df = spark.read.format("json").load(
    "/user/akashpatel/exam_preparation/dataset/json/solar_hot_water_heater_data.json"
)

### Understand the schema of the given dataset

In [112]:
# Print the inferred column names, types and nullability.
df.printSchema()

root
 |-- APPLICATION_DATE: string (nullable = true)
 |-- COMPLETED_DATE: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- ISSUED_DATE: string (nullable = true)
 |-- PERMIT_NUM: string (nullable = true)
 |-- PERMIT_TYPE: string (nullable = true)
 |-- POSTAL: string (nullable = true)
 |-- REVISION_NUM: long (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- STREET_DIRECTION: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- STREET_NUM: string (nullable = true)
 |-- STREET_TYPE: string (nullable = true)
 |-- STRUCTURE_TYPE: string (nullable = true)
 |-- WORK: string (nullable = true)
 |-- _id: long (nullable = true)



### View the first 5 records of the dataset

In [113]:
# Preview five rows, cutting any cell value longer than 120 characters.
df.show(n=5, truncate=120)

+----------------+--------------+------------------------------------------------------------------------------------------------------------------------+-----------+----------+--------------------------+------+------------+-------------+----------------+-----------+----------+-----------+-------------------+------------------------------+----+
|APPLICATION_DATE|COMPLETED_DATE|                                                                                                             DESCRIPTION|ISSUED_DATE|PERMIT_NUM|               PERMIT_TYPE|POSTAL|REVISION_NUM|       STATUS|STREET_DIRECTION|STREET_NAME|STREET_NUM|STREET_TYPE|     STRUCTURE_TYPE|                          WORK| _id|
+----------------+--------------+------------------------------------------------------------------------------------------------------------------------+-----------+----------+--------------------------+------+------------+-------------+----------------+-----------+----------+-----------+----------------

In [114]:
### Print names of the columns

In [115]:
# List the column names as a plain Python list.
df.columns

['APPLICATION_DATE',
 'COMPLETED_DATE',
 'DESCRIPTION',
 'ISSUED_DATE',
 'PERMIT_NUM',
 'PERMIT_TYPE',
 'POSTAL',
 'REVISION_NUM',
 'STATUS',
 'STREET_DIRECTION',
 'STREET_NAME',
 'STREET_NUM',
 'STREET_TYPE',
 'STRUCTURE_TYPE',
 'WORK',
 '_id']

### Drop Description column from the dataframe

In [116]:
# Discard the free-text DESCRIPTION column, then preview what is left.
# Note: `df` is rebound here, so every later cell works on the narrower frame.
df = df.drop("DESCRIPTION")
df.show(n=5, truncate=120)

+----------------+--------------+-----------+----------+--------------------------+------+------------+-------------+----------------+-----------+----------+-----------+-------------------+------------------------------+----+
|APPLICATION_DATE|COMPLETED_DATE|ISSUED_DATE|PERMIT_NUM|               PERMIT_TYPE|POSTAL|REVISION_NUM|       STATUS|STREET_DIRECTION|STREET_NAME|STREET_NUM|STREET_TYPE|     STRUCTURE_TYPE|                          WORK| _id|
+----------------+--------------+-----------+----------+--------------------------+------+------------+-------------+----------------+-----------+----------+-----------+-------------------+------------------------------+----+
|      2009-05-07|    2010-10-05| 2009-05-11| 09 135007|Small Residential Projects|   M4K|           0|       Closed|                |  GRANDVIEW|         4|        AVE|SFD - Semi-Detached|Solar Domestic Hot Water (Res)|6561|
|      2009-05-07|          null| 2009-05-11| 09 135023|Small Residential Projects|   M4M|      

### Count number of records 

In [117]:
# Total number of permit records in the dataset.
df.count()

164

### Count the unique values in each column

In [118]:
from pyspark.sql.functions import col, countDistinct

In [119]:
# One countDistinct aggregate per column, each aliased back to its column name,
# evaluated together in a single pass over the data.
distinct_counts = [countDistinct(col(name)).alias(name) for name in df.columns]
df.agg(*distinct_counts).show()

+----------------+--------------+-----------+----------+-----------+------+------------+------+----------------+-----------+----------+-----------+--------------+----+---+
|APPLICATION_DATE|COMPLETED_DATE|ISSUED_DATE|PERMIT_NUM|PERMIT_TYPE|POSTAL|REVISION_NUM|STATUS|STREET_DIRECTION|STREET_NAME|STREET_NUM|STREET_TYPE|STRUCTURE_TYPE|WORK|_id|
+----------------+--------------+-----------+----------+-----------+------+------------+------+----------------+-----------+----------+-----------+--------------+----+---+
|              72|            85|         69|       164|          1|    43|           1|     5|               4|        123|       127|          8|             4|   1|164|
+----------------+--------------+-----------+----------+-----------+------+------------+------+----------------+-----------+----------+-----------+--------------+----+---+



### How many types of STATUS values are there?

In [120]:
from pyspark.sql.functions import col,count, lit

In [121]:
# Number of applications for each distinct STATUS value.
applications_per_status = count(lit(1)).alias("numbers_of_applications")
(
    df
    .groupBy(col("STATUS"))
    .agg(applications_per_status)
    .show()
)

+----------------+-----------------------+
|          STATUS|numbers_of_applications|
+----------------+-----------------------+
|   Permit Issued|                     21|
|      Inspection|                     18|
|       Cancelled|                     12|
|Work Not Started|                      2|
|          Closed|                    111|
+----------------+-----------------------+



In [122]:
# Rows that carry an application date.
df.where(col("APPLICATION_DATE").isNotNull()).count()

164

In [123]:
# Rows that carry an issue date.
df.where(col("ISSUED_DATE").isNotNull()).count()

162

In [124]:
# Rows that carry a completion date.
df.where(col("COMPLETED_DATE").isNotNull()).count()

123

### Filter the applications which are processed

In [105]:
# Keep only the applications that have actually been issued.
# ISSUED_DATE contains real nulls (see the isNotNull() count above), so test for
# null explicitly. Comparing against the string 'null' only dropped those rows
# as a side effect of SQL three-valued logic (NULL != 'null' evaluates to NULL).
df_issued = df.filter(col("ISSUED_DATE").isNotNull())

df_issued.count()

162

### How many days did it take to process each application?
### Sort the results in descending order by the number of days taken to process the application


In [106]:
from pyspark.sql.functions import datediff

In [107]:
# Days between filing and issuing each permit, slowest applications first.
days_to_issue = datediff(col("ISSUED_DATE"), col("APPLICATION_DATE"))
(
    df_issued
    .withColumn("processing_days", days_to_issue)
    .select("_id", "APPLICATION_DATE", "ISSUED_DATE", "processing_days")
    .orderBy(col("processing_days").desc())
    .show()
)

+----+----------------+-----------+---------------+
| _id|APPLICATION_DATE|ISSUED_DATE|processing_days|
+----+----------------+-----------+---------------+
|6659|      2010-09-29| 2015-03-19|           1632|
|6619|      2009-12-01| 2014-04-07|           1588|
|6648|      2010-02-18| 2013-07-31|           1259|
|6615|      2009-11-26| 2010-07-30|            246|
|6567|      2009-06-19| 2010-02-03|            229|
|6620|      2009-12-01| 2010-04-26|            146|
|6621|      2009-12-01| 2010-04-26|            146|
|6622|      2009-12-01| 2010-04-26|            146|
|6712|      2011-12-15| 2012-04-03|            110|
|6568|      2009-08-04| 2009-11-18|            106|
|6714|      2012-06-19| 2012-09-26|             99|
|6575|      2009-08-31| 2009-12-03|             94|
|6565|      2009-06-11| 2009-08-17|             67|
|6566|      2009-06-11| 2009-08-17|             67|
|6564|      2009-06-11| 2009-08-17|             67|
|6710|      2011-09-27| 2011-11-02|             36|
|6713|      

### How many Water Heaters were issued in the year 2015?

In [108]:
from pyspark.sql.functions import to_date, year

# Interactive docstring lookup for to_date; the help text is printed as this cell's output.
help(to_date)

Help on function to_date in module pyspark.sql.functions:

to_date(col, format=None)
    Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`
    using the optionally specified format. Specify formats according to
    `SimpleDateFormats <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
    By default, it follows casting rules to :class:`pyspark.sql.types.DateType` if the format
    is omitted (equivalent to ``col.cast("date")``).
    
    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
    >>> df.select(to_date(df.t).alias('date')).collect()
    [Row(date=datetime.date(1997, 2, 28))]
    
    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
    >>> df.select(to_date(df.t, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
    [Row(date=datetime.date(1997, 2, 28))]
    
    .. versionadded:: 2.2



In [109]:
# Count the permits whose issue date falls in 2015.
issued_year = year(to_date(col("ISSUED_DATE"), format="yyyy-MM-dd"))
(
    df_issued
    .withColumn("ISSUED_YEAR", issued_year)
    .where("ISSUED_YEAR = 2015 ")
    .count()
)

3

### How many Water Heaters were issued per year?

In [110]:
# Number of issued permits per issue year, most recent year first.
issued_year = year(to_date(col("ISSUED_DATE"), format="yyyy-MM-dd"))
(
    df_issued
    .withColumn("ISSUED_YEAR", issued_year)
    .groupBy(col("ISSUED_YEAR"))
    .agg(count(lit(1)).alias("no of issued applications"))
    .orderBy(col("ISSUED_YEAR").desc())
    .show()
)

+-----------+-------------------------+
|ISSUED_YEAR|no of issued applications|
+-----------+-------------------------+
|       2017|                        1|
|       2016|                        2|
|       2015|                        3|
|       2014|                        3|
|       2013|                        2|
|       2012|                        4|
|       2011|                       10|
|       2010|                       59|
|       2009|                       78|
+-----------+-------------------------+



### Find the first 3 applicants of each year for the Solar Water Heater plant

In [139]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, to_date, year

In [140]:
# Window used to rank applications within each application year.
# Rows are ordered by APPLICATION_DATE (earliest first) so that row_number()
# really identifies the *first* applicants of the year; _id breaks ties between
# applications filed on the same day so the ranking is deterministic.
# Ordering by APPLICATION_YEAR (the partition key) gave every row in a partition
# the same sort value, which left the numbering arbitrary.
spec = (
    Window
    .partitionBy(col("APPLICATION_YEAR"))
    .orderBy(col("APPLICATION_DATE").asc(), col("_id").asc())
)

In [167]:
# Rank the applications inside each application year using `spec`, keep ranks 1-3,
# and list them newest year first.
application_year = year(to_date(col("APPLICATION_DATE"), format="yyyy-MM-dd"))
(
    df
    .withColumn("APPLICATION_YEAR", application_year)
    .withColumn("NUMBER", row_number().over(spec))
    .filter("NUMBER <= 3")
    .select(col("_id"), col("APPLICATION_YEAR"), col("NUMBER"))
    .orderBy(col("APPLICATION_YEAR").desc(), col("NUMBER"))
    .show(30)
)

+----+----------------+------+
| _id|APPLICATION_YEAR|NUMBER|
+----+----------------+------+
|6724|            2017|     1|
|6721|            2016|     1|
|6722|            2016|     2|
|6723|            2016|     3|
|6719|            2015|     1|
|6720|            2015|     2|
|6717|            2014|     1|
|6718|            2014|     2|
|6716|            2013|     1|
|6713|            2012|     1|
|6714|            2012|     2|
|6715|            2012|     3|
|6702|            2011|     1|
|6703|            2011|     2|
|6704|            2011|     3|
|6645|            2010|     1|
|6646|            2010|     2|
|6647|            2010|     3|
|6561|            2009|     1|
|6562|            2009|     2|
|6563|            2009|     3|
+----+----------------+------+



### Find the unique application dates and view them in ascending order

In [170]:
# Distinct application dates, earliest first (show() prints the first 20 rows).
(
    df
    .select("APPLICATION_DATE")
    .distinct()
    .orderBy(col("APPLICATION_DATE").asc())
    .show()
)

+----------------+
|APPLICATION_DATE|
+----------------+
|      2009-05-07|
|      2009-05-11|
|      2009-06-11|
|      2009-06-19|
|      2009-08-04|
|      2009-08-08|
|      2009-08-20|
|      2009-08-31|
|      2009-09-09|
|      2009-10-02|
|      2009-10-23|
|      2009-10-27|
|      2009-11-03|
|      2009-11-04|
|      2009-11-09|
|      2009-11-10|
|      2009-11-13|
|      2009-11-17|
|      2009-11-20|
|      2009-11-23|
+----------------+
only showing top 20 rows



### Save the data partitioned by year, with gzip compression, in parquet format

#### since most of our queries filter on the YEAR column

In [172]:
from pyspark.sql.functions import col, to_date, year

In [175]:
# Derive the application year and write one gzip-compressed parquet directory per
# YEAR value, replacing any previous output at the target path.
application_year = year(to_date(col("APPLICATION_DATE"), format="yyyy-MM-dd"))
(
    df
    .withColumn("YEAR", application_year)
    .write
    .mode("overwrite")
    .partitionBy("YEAR")
    .option("compression", "gzip")
    .parquet("/user/akashpatel/exam_preparation/solar_water_heater/output")
)

In [183]:
%%sh 

# List the parquet output directory; each YEAR=<value> entry is one partition.
hdfs dfs -ls /user/akashpatel/exam_preparation/solar_water_heater/output/

Found 10 items
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2009
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2010
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2011
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2012
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2013
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2014
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2015
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 21:22 /user/akashpatel/exam_p

### Read the partitioned data written above, which is in parquet format

In [178]:
%%sh 

# List the data files written for the YEAR=2009 partition.
hdfs dfs -ls /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2009

Found 1 items
-rw-r--r--   2 akashpatel hdfs       6520 2020-04-30 21:22 /user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2009/part-00000-200ddc0b-ba61-4f0f-9b13-661dc429a5db.c000.gz.parquet


In [179]:
# Read the whole partitioned dataset back from its root directory.
parquet_df = spark.read.format("parquet").load(
    "/user/akashpatel/exam_preparation/solar_water_heater/output"
)

In [180]:
# Reading from the dataset root brings the YEAR partition column back into the schema.
parquet_df.printSchema()

root
 |-- APPLICATION_DATE: string (nullable = true)
 |-- COMPLETED_DATE: string (nullable = true)
 |-- ISSUED_DATE: string (nullable = true)
 |-- PERMIT_NUM: string (nullable = true)
 |-- PERMIT_TYPE: string (nullable = true)
 |-- POSTAL: string (nullable = true)
 |-- REVISION_NUM: long (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- STREET_DIRECTION: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- STREET_NUM: string (nullable = true)
 |-- STREET_TYPE: string (nullable = true)
 |-- STRUCTURE_TYPE: string (nullable = true)
 |-- WORK: string (nullable = true)
 |-- _id: long (nullable = true)
 |-- YEAR: integer (nullable = true)



### Read specific data of year - 2009, from the partitioned data

In [181]:
# Read only the YEAR=2009 partition by pointing directly at its directory.
parquet_df_2009_year_specific = spark.read.parquet("/user/akashpatel/exam_preparation/solar_water_heater/output/YEAR=2009")

In [182]:
# Schema of the single-partition read. The frame is named
# parquet_df_2009_year_specific where it is created; the shorter name used
# here before was never defined and raises NameError on a fresh kernel.
parquet_df_2009_year_specific.printSchema()

root
 |-- APPLICATION_DATE: string (nullable = true)
 |-- COMPLETED_DATE: string (nullable = true)
 |-- ISSUED_DATE: string (nullable = true)
 |-- PERMIT_NUM: string (nullable = true)
 |-- PERMIT_TYPE: string (nullable = true)
 |-- POSTAL: string (nullable = true)
 |-- REVISION_NUM: long (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- STREET_DIRECTION: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- STREET_NUM: string (nullable = true)
 |-- STREET_TYPE: string (nullable = true)
 |-- STRUCTURE_TYPE: string (nullable = true)
 |-- WORK: string (nullable = true)
 |-- _id: long (nullable = true)



### Save data partition by year, with lzo compression and orc format

#### since we have more where query on YEAR column 

In [197]:
from pyspark.sql.functions import col, to_date, year

In [205]:
# Same partition-by-year export as the parquet one, this time as lzo-compressed ORC.
application_year = year(to_date("APPLICATION_DATE"))
(
    df
    .withColumn("YEAR", application_year)
    .write
    .mode("overwrite")
    .partitionBy("YEAR")
    .option("compression", "lzo")
    .orc("/user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc")
)

In [206]:
%%sh 

# List the ORC output directory; each YEAR=<value> entry is one partition.
hdfs dfs -ls /user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc

Found 10 items
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 22:01 /user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc/YEAR=2009
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 22:01 /user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc/YEAR=2010
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 22:01 /user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc/YEAR=2011
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 22:01 /user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc/YEAR=2012
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 22:01 /user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc/YEAR=2013
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 22:01 /user/akashpatel/exam_preparation/solar_water_heater/partition_output_orc/YEAR=2014
drwxr-xr-x   - akashpatel hdfs          0 2020-04-30 22:01 /user/akashpatel/exam_preparation/solar_water_heater/partition_outpu

In [203]:
%%sh
# If you want to remove a directory from HDFS.
# The %%sh cell magic has to be the first line of the cell, so this note lives
# below it as a shell comment.
# -skipTrash bypasses the HDFS trash, so the directory cannot be restored afterwards.

hdfs dfs -rm -R -skipTrash /user/akashpatel/exam_preparation/solar_water_heater/bucket_partition_output

Deleted /user/akashpatel/exam_preparation/solar_water_heater/bucket_partition_output


### BucketBy only works with tables (i.e. together with saveAsTable)

### Save data partition by year, as spark metastore - table

#### Also bucket the table into 5 buckets by the STATUS column

In [None]:
# Persist the data as a metastore table, partitioned by application year and
# bucketed into 5 buckets on STATUS. bucketBy is only supported together with
# saveAsTable, not with path-based writers.
# partitionBy("YEAR") was missing although the YEAR column is derived for exactly
# that purpose; mode("overwrite") lets the cell be re-run once the table exists,
# matching the other writes in this notebook.
df.withColumn("YEAR", year(to_date("APPLICATION_DATE"))). \
    write. \
    mode("overwrite"). \
    partitionBy("YEAR"). \
    bucketBy(5, "STATUS"). \
    saveAsTable('bucketed_table')

In [199]:
# Load the bucketed metastore table back as a DataFrame.
df_tbl = spark.table("bucketed_table")

In [200]:
# Confirm that the table schema includes the derived YEAR column.
df_tbl.printSchema()

root
 |-- APPLICATION_DATE: string (nullable = true)
 |-- COMPLETED_DATE: string (nullable = true)
 |-- ISSUED_DATE: string (nullable = true)
 |-- PERMIT_NUM: string (nullable = true)
 |-- PERMIT_TYPE: string (nullable = true)
 |-- POSTAL: string (nullable = true)
 |-- REVISION_NUM: long (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- STREET_DIRECTION: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- STREET_NUM: string (nullable = true)
 |-- STREET_TYPE: string (nullable = true)
 |-- STRUCTURE_TYPE: string (nullable = true)
 |-- WORK: string (nullable = true)
 |-- _id: long (nullable = true)
 |-- YEAR: integer (nullable = true)

