In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/itv007008/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [2]:
spark

In [3]:
orders_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema", "true") \
.load("/public/trendytech/orders_wh/orders_wh.csv")

In [4]:
orders_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [4]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [5]:
transformed_df1 = orders_df.withColumnRenamed("order_status", "status")

In [6]:
transformed_df1.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|         status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [7]:
from pyspark.sql.functions import *
transformed_df2 = transformed_df1.withColumn("order_date_new", to_timestamp("order_date"))

In [8]:
transformed_df2.show()

+--------+--------------------+-----------+---------------+-------------------+
|order_id|          order_date|customer_id|         status|     order_date_new|
+--------+--------------------+-----------+---------------+-------------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|2013-07-25 00:00:00|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|2013-07-25 00:00:00|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|2013-07-25 00:00:00|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|2013-07-25 00:00:00|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|2013-07-25 00:00:00|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|2013-07-25 00:00:00|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|2013-07-25 00:00:00|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|2013-07-25 00:00:00|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|2013-07-25 00:00:00|
|      10|2013-07-25 00:00:...|       56

In [9]:
transformed_df2.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- order_date_new: timestamp (nullable = true)



In [10]:
csv_reader_df = spark.read.csv("/public/trendytech/orders_wh/orders_wh.csv", header = "true", inferSchema = "true")

In [11]:
csv_reader_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [12]:
csv_reader_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [13]:
json_reader_df=spark.read.json("/public/trendytech/datasets/orders.json")

In [14]:
json_reader_df.show()

+-----------+--------------------+--------+---------------+
|customer_id|          order_date|order_id|   order_status|
+-----------+--------------------+--------+---------------+
|      11599|2013-07-25 00:00:...|       1|         CLOSED|
|        256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|      12111|2013-07-25 00:00:...|       3|       COMPLETE|
|       8827|2013-07-25 00:00:...|       4|         CLOSED|
|      11318|2013-07-25 00:00:...|       5|       COMPLETE|
|       7130|2013-07-25 00:00:...|       6|       COMPLETE|
|       4530|2013-07-25 00:00:...|       7|       COMPLETE|
|       2911|2013-07-25 00:00:...|       8|     PROCESSING|
|       5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|       5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|        918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|       1837|2013-07-25 00:00:...|      12|         CLOSED|
|       9149|2013-07-25 00:00:...|      13|PENDING_PAYMENT|
|       9842|2013-07-25 00:00:...|      

In [15]:
json_reader_df.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [16]:
parquet_readers_df = spark.read.parquet("/public/trendytech/datasets/ordersparquet/*")

In [17]:
parquet_readers_df.show()

+-----------+--------------------+--------+---------------+
|customer_id|          order_date|order_id|   order_status|
+-----------+--------------------+--------+---------------+
|      11599|2013-07-25 00:00:...|       1|         CLOSED|
|        256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|      12111|2013-07-25 00:00:...|       3|       COMPLETE|
|       8827|2013-07-25 00:00:...|       4|         CLOSED|
|      11318|2013-07-25 00:00:...|       5|       COMPLETE|
|       7130|2013-07-25 00:00:...|       6|       COMPLETE|
|       4530|2013-07-25 00:00:...|       7|       COMPLETE|
|       2911|2013-07-25 00:00:...|       8|     PROCESSING|
|       5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|       5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|        918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|       1837|2013-07-25 00:00:...|      12|         CLOSED|
|       9149|2013-07-25 00:00:...|      13|PENDING_PAYMENT|
|       9842|2013-07-25 00:00:...|      

In [18]:
orc_readers_df = spark.read.orc("/public/trendytech/datasets/ordersorc/*")

In [19]:
orc_readers_df.show()

+-----------+--------------------+--------+---------------+
|customer_id|          order_date|order_id|   order_status|
+-----------+--------------------+--------+---------------+
|      11599|2013-07-25 00:00:...|       1|         CLOSED|
|        256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|      12111|2013-07-25 00:00:...|       3|       COMPLETE|
|       8827|2013-07-25 00:00:...|       4|         CLOSED|
|      11318|2013-07-25 00:00:...|       5|       COMPLETE|
|       7130|2013-07-25 00:00:...|       6|       COMPLETE|
|       4530|2013-07-25 00:00:...|       7|       COMPLETE|
|       2911|2013-07-25 00:00:...|       8|     PROCESSING|
|       5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|       5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|        918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|       1837|2013-07-25 00:00:...|      12|         CLOSED|
|       9149|2013-07-25 00:00:...|      13|PENDING_PAYMENT|
|       9842|2013-07-25 00:00:...|      

In [20]:
filtered_df = orc_readers_df.where("customer_id=11599")

In [21]:
filtered_df.show(truncate=False)

+-----------+---------------------+--------+------------+
|customer_id|order_date           |order_id|order_status|
+-----------+---------------------+--------+------------+
|11599      |2013-07-25 00:00:00.0|1       |CLOSED      |
|11599      |2013-10-03 00:00:00.0|11397   |COMPLETE    |
|11599      |2013-12-20 00:00:00.0|23908   |COMPLETE    |
|11599      |2014-06-27 00:00:00.0|53545   |PENDING     |
|11599      |2013-10-17 00:00:00.0|59911   |PROCESSING  |
+-----------+---------------------+--------+------------+



In [22]:
filtered_df = orc_readers_df.filter("customer_id=11599")

In [23]:
filtered_df.show()

+-----------+--------------------+--------+------------+
|customer_id|          order_date|order_id|order_status|
+-----------+--------------------+--------+------------+
|      11599|2013-07-25 00:00:...|       1|      CLOSED|
|      11599|2013-10-03 00:00:...|   11397|    COMPLETE|
|      11599|2013-12-20 00:00:...|   23908|    COMPLETE|
|      11599|2014-06-27 00:00:...|   53545|     PENDING|
|      11599|2013-10-17 00:00:...|   59911|  PROCESSING|
+-----------+--------------------+--------+------------+



In [24]:
orders_df.createOrReplaceTempView("orders")

In [25]:
filtered_df = spark.sql("Select * from orders where order_status = 'CLOSED'")

In [26]:
filtered_df.show()

+--------+--------------------+-----------+------------+
|order_id|          order_date|customer_id|order_status|
+--------+--------------------+-----------+------------+
|       1|2013-07-25 00:00:...|      11599|      CLOSED|
|       4|2013-07-25 00:00:...|       8827|      CLOSED|
|      12|2013-07-25 00:00:...|       1837|      CLOSED|
|      18|2013-07-25 00:00:...|       1205|      CLOSED|
|      24|2013-07-25 00:00:...|      11441|      CLOSED|
|      25|2013-07-25 00:00:...|       9503|      CLOSED|
|      37|2013-07-25 00:00:...|       5863|      CLOSED|
|      51|2013-07-25 00:00:...|      12271|      CLOSED|
|      57|2013-07-25 00:00:...|       7073|      CLOSED|
|      61|2013-07-25 00:00:...|       4791|      CLOSED|
|      62|2013-07-25 00:00:...|       9111|      CLOSED|
|      87|2013-07-25 00:00:...|       3065|      CLOSED|
|      90|2013-07-25 00:00:...|       9131|      CLOSED|
|     101|2013-07-25 00:00:...|       5116|      CLOSED|
|     116|2013-07-26 00:00:...|

In [27]:
ordersdf = spark.read.table("orders")

In [28]:
ordersdf.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [29]:
## How to create a spark table ###
spark.sql("CREATE DATABASE IF NOT EXISTS itv007008_retail")

In [30]:
spark.sql("show databases").show()

+--------------------+
|           namespace|
+--------------------+
|   0001_av_ivy_tesco|
|        003402_hive1|
|    005198_ivy_tesco|
|    005212_ivy_tesco|
| 005222_ivy_practice|
| 005260_ivy_database|
|    005302_retail_db|
|005876_week5_assi...|
|       005933_retail|
|     006586_database|
|     006608_database|
|006866_week5_assi...|
|         00ivy_tesco|
|          00ivy_test|
|       07172021_nyse|
|     07172021_retail|
|        07172021_sms|
|         1230_trendy|
|     1230_trendytech|
|       1540retail_db|
+--------------------+
only showing top 20 rows



In [31]:
spark.sql("show databases").filter("namespace like 'itv007008%'").show()

+----------------+
|       namespace|
+----------------+
|itv007008_retail|
+----------------+



In [32]:
spark.sql("show tables").show()

+--------+-----------------+-----------+
|database|        tableName|isTemporary|
+--------+-----------------+-----------+
| default|            1htab|      false|
| default|   41group_movies|      false|
| default|    4group_movies|      false|
| default|             4tab|      false|
| default|    6_flags_simon|      false|
| default|               aa|      false|
| default|              abc|      false|
| default|             acid|      false|
| default|            acid1|      false|
| default|     acid_example|      false|
| default|    acid_example1|      false|
| default|    acid_example2|      false|
| default|           adata1|      false|
| default|        adata_ell|      false|
| default|         adata_vr|      false|
| default|    ad_earthquake|      false|
| default|ad_earthquake_par|      false|
| default|           adelta|      false|
| default|       adeltapart|      false|
| default|   adeltapartbuck|      false|
+--------+-----------------+-----------+
only showing top

In [33]:
spark.sql("use itv007008_retail")

In [34]:
spark.sql("show tables").show()

+----------------+----------+-----------+
|        database| tableName|isTemporary|
+----------------+----------+-----------+
|itv007008_retail|    orders|      false|
|itv007008_retail|orders_ext|      false|
|                |    orders|       true|
+----------------+----------+-----------+



In [36]:
spark.sql("CREATE TABLE itv007008_retail.ORDERS(order_id integer, order_date string, customer_id integer, order_status string)")

In [37]:
spark.sql("show tables").show()

+----------------+---------+-----------+
|        database|tableName|isTemporary|
+----------------+---------+-----------+
|itv007008_retail|   orders|      false|
|                |   orders|       true|
+----------------+---------+-----------+



In [38]:
spark.sql("insert into itv007008_retail.ORDERS select * from orders")

In [39]:
spark.sql("Select * from itv007008_retail.ORDERS where order_status='CLOSED'").show()

+--------+--------------------+-----------+------------+
|order_id|          order_date|customer_id|order_status|
+--------+--------------------+-----------+------------+
|       1|2013-07-25 00:00:...|      11599|      CLOSED|
|       4|2013-07-25 00:00:...|       8827|      CLOSED|
|      12|2013-07-25 00:00:...|       1837|      CLOSED|
|      18|2013-07-25 00:00:...|       1205|      CLOSED|
|      24|2013-07-25 00:00:...|      11441|      CLOSED|
|      25|2013-07-25 00:00:...|       9503|      CLOSED|
|      37|2013-07-25 00:00:...|       5863|      CLOSED|
|      51|2013-07-25 00:00:...|      12271|      CLOSED|
|      57|2013-07-25 00:00:...|       7073|      CLOSED|
|      61|2013-07-25 00:00:...|       4791|      CLOSED|
|      62|2013-07-25 00:00:...|       9111|      CLOSED|
|      87|2013-07-25 00:00:...|       3065|      CLOSED|
|      90|2013-07-25 00:00:...|       9131|      CLOSED|
|     101|2013-07-25 00:00:...|       5116|      CLOSED|
|     116|2013-07-26 00:00:...|

In [40]:
spark.sql("use itv007008_retail")

In [41]:
spark.sql("show tables").show()

+----------------+---------+-----------+
|        database|tableName|isTemporary|
+----------------+---------+-----------+
|itv007008_retail|   orders|      false|
|                |   orders|       true|
+----------------+---------+-----------+



In [42]:
spark.sql("describe table itv007008_retail.orders").show()

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|    order_id|      int|   null|
|  order_date|   string|   null|
| customer_id|      int|   null|
|order_status|   string|   null|
+------------+---------+-------+



In [35]:
spark.sql("describe extended itv007008_retail.orders").show(truncate = False)

+----------------------------+---------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                        |comment|
+----------------------------+---------------------------------------------------------------------------------+-------+
|order_id                    |int                                                                              |null   |
|order_date                  |string                                                                           |null   |
|customer_id                 |int                                                                              |null   |
|order_status                |string                                                                           |null   |
|                            |                                                                                 |       |
|# Detailed Table Information|  

In [None]:
spark.sql("drop table itv007008_retail.orders")

In [45]:
spark.sql("create table itv007008_retail.orders_ext(order_id integer, order_date string, customer_id integer, order_status string) using csv location '/public/trendytech/retail_db/orders'")

In [36]:
spark.sql("select * from itv007008_retail.orders_ext").show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [37]:
spark.sql("describe extended itv007008_retail.orders_ext").show(truncate=False)

+----------------------------+----------------------------------------------------------------+-------+
|col_name                    |data_type                                                       |comment|
+----------------------------+----------------------------------------------------------------+-------+
|order_id                    |int                                                             |null   |
|order_date                  |string                                                          |null   |
|customer_id                 |int                                                             |null   |
|order_status                |string                                                          |null   |
|                            |                                                                |       |
|# Detailed Table Information|                                                                |       |
|Database                    |itv007008_retail                  

In [38]:
spark.sql("truncate table itv007008_retail.orders_ext")

AnalysisException: Operation not allowed: TRUNCATE TABLE on external tables: `itv007008_retail`.`orders_ext`;

In [39]:
spark.sql("insert into table itv007008_retail.orders_ext values(1111, '12-02-2023', 2222, 'CLOSED')")

Py4JJavaError: An error occurred while calling o45.sql.
: org.apache.hadoop.security.AccessControlException: Permission denied: user=itv007008, access=WRITE, inode="/public/trendytech/retail_db/orders":itv005857:supergroup:drwxr-xr-x
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:496)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:336)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermissionWithContext(FSPermissionChecker.java:360)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:239)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1909)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1893)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1852)
	at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3407)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1161)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:739)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:532)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1020)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:948)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2952)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
	at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2426)
	at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2400)
	at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1324)
	at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1321)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1338)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1313)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:168)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=itv007008, access=WRITE, inode="/public/trendytech/retail_db/orders":itv005857:supergroup:drwxr-xr-x
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:496)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:336)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermissionWithContext(FSPermissionChecker.java:360)
	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:239)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1909)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1893)
	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1852)
	at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3407)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1161)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:739)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:532)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1020)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:948)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2952)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1511)
	at org.apache.hadoop.ipc.Client.call(Client.java:1457)
	at org.apache.hadoop.ipc.Client.call(Client.java:1367)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
	at com.sun.proxy.$Proxy18.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:656)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy19.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2424)
	... 40 more


In [40]:
orders_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [41]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



## Higher Level API's Demo 

In [5]:
orders_df.createOrReplaceTempView("orders")

#### 1. Top 15 customers who placed most number of orders

In [6]:
result1 = orders_df.groupby("customer_id").count().sort("count", ascending = False).show(15)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       5897|   16|
|        569|   16|
|      12431|   16|
|       6316|   16|
|        221|   15|
|       5654|   15|
|       4320|   15|
|       5283|   15|
|      12284|   15|
|       5624|   15|
|      11689|   14|
|       4249|   14|
|       6248|   14|
|       4517|   14|
|       3710|   14|
+-----------+-----+
only showing top 15 rows



In [7]:
spark.sql("select customer_id, count(order_id) as count from orders group by customer_id order by count DESC limit 15").show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       5897|   16|
|      12431|   16|
|        569|   16|
|       6316|   16|
|      12284|   15|
|       5624|   15|
|       5654|   15|
|       4320|   15|
|       5283|   15|
|        221|   15|
|       4517|   14|
|       3708|   14|
|       4116|   14|
|      11689|   14|
|       6248|   14|
+-----------+-----+



#### 2. Find the number of orders under each order status

In [8]:
orders_df.groupby("order_status").count().show()

+---------------+-----+
|   order_status|count|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



In [9]:
spark.sql("select order_status, count(order_id) as count from orders group by order_status").show()

+---------------+-----+
|   order_status|count|
+---------------+-----+
|PENDING_PAYMENT|15030|
|       COMPLETE|22899|
|        ON_HOLD| 3798|
| PAYMENT_REVIEW|  729|
|     PROCESSING| 8275|
|         CLOSED| 7556|
|SUSPECTED_FRAUD| 1558|
|        PENDING| 7610|
|       CANCELED| 1428|
+---------------+-----+



#### 3. Number of active customers ( who placed atleast one order)

In [19]:
spark.sql("Select count(DISTINCT customer_id) as count  from orders").show()

+-----+
|count|
+-----+
|12405|
+-----+



In [17]:
orders_df.select("customer_id").distinct().count()

12405

#### 4. Customers with most number of closed orders.

In [10]:
spark.sql("select customer_id, count(order_id) as count from orders where order_status = 'CLOSED' group by customer_id").sort("count", ascending = False).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1833|    6|
|       1687|    5|
|       1363|    5|
|       5493|    5|
|       2403|    4|
|       2768|    4|
|      10263|    4|
|       2236|    4|
|      10111|    4|
|        437|    4|
|       4573|    4|
|       3631|    4|
|      12431|    4|
|       4588|    4|
|       1521|    4|
|       7948|    4|
|      10018|    4|
|       5319|    4|
|       2774|    4|
|       7879|    4|
+-----------+-----+
only showing top 20 rows



In [20]:
orders_df.filter("order_status='CLOSED'").groupby("customer_id").count().sort("count", ascending=False).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1833|    6|
|       1363|    5|
|       1687|    5|
|       5493|    5|
|       7948|    4|
|       2768|    4|
|      10263|    4|
|       2236|    4|
|       2403|    4|
|       7879|    4|
|       4573|    4|
|       7850|    4|
|      12431|    4|
|       1521|    4|
|      10111|    4|
|        437|    4|
|      10018|    4|
|       5319|    4|
|       2774|    4|
|       3631|    4|
+-----------+-----+
only showing top 20 rows



In [21]:
spark.stop()