In [35]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
	builder. \
	config('spark.ui.port', '0'). \
	config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
	enableHiveSupport(). \
	master('yarn'). \
	getOrCreate()

In [36]:
spark

In [37]:
orders_df = spark.read \
.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.load("/public/trendytech/orders_wh/*")

In [38]:
orders_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

### Show the schema

In [39]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



### change order status to status

In [40]:
renamed_orders_df = orders_df.withColumnRenamed("order_status","status")

In [41]:
renamed_orders_df.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|         status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows



### change datatype of order date

In [42]:
from pyspark.sql.functions import to_timestamp
changed_datatype_df=renamed_orders_df.withColumn("orders_date", to_timestamp("order_date"))

In [43]:
changed_datatype_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- orders_date: timestamp (nullable = true)



### short cut methods to read different file formats

In [44]:
orders_csv_df = spark.read \
.csv("/public/trendytech/orders_wh/*", \
     header = "true", \
     inferSchema="true")

In [45]:
orders_csv_df.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows



### Read Json

In [46]:
orders_json_df = spark.read.json("/public/trendytech/datasets/orders.json")

In [47]:
orders_json_df.show(5)

+-----------+--------------------+--------+---------------+
|customer_id|          order_date|order_id|   order_status|
+-----------+--------------------+--------+---------------+
|      11599|2013-07-25 00:00:...|       1|         CLOSED|
|        256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|      12111|2013-07-25 00:00:...|       3|       COMPLETE|
|       8827|2013-07-25 00:00:...|       4|         CLOSED|
|      11318|2013-07-25 00:00:...|       5|       COMPLETE|
+-----------+--------------------+--------+---------------+
only showing top 5 rows



In [68]:
orders_json_df.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



### Read Parquet file

In [48]:
orders_parquet_df = spark.read.parquet("/public/trendytech/datasets/ordersparquet")

In [50]:
orders_parquet_df.show(5)

+-----------+--------------------+--------+---------------+
|customer_id|          order_date|order_id|   order_status|
+-----------+--------------------+--------+---------------+
|      11599|2013-07-25 00:00:...|       1|         CLOSED|
|        256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|      12111|2013-07-25 00:00:...|       3|       COMPLETE|
|       8827|2013-07-25 00:00:...|       4|         CLOSED|
|      11318|2013-07-25 00:00:...|       5|       COMPLETE|
+-----------+--------------------+--------+---------------+
only showing top 5 rows



In [49]:
orders_parquet_df.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



### Read Orc file

In [53]:
orders_orc_df = spark.read.orc("/public/trendytech/datasets/ordersorc")

In [54]:
orders_orc_df.show(5)

+-----------+--------------------+--------+---------------+
|customer_id|          order_date|order_id|   order_status|
+-----------+--------------------+--------+---------------+
|      11599|2013-07-25 00:00:...|       1|         CLOSED|
|        256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|      12111|2013-07-25 00:00:...|       3|       COMPLETE|
|       8827|2013-07-25 00:00:...|       4|         CLOSED|
|      11318|2013-07-25 00:00:...|       5|       COMPLETE|
+-----------+--------------------+--------+---------------+
only showing top 5 rows



In [55]:
orders_orc_df.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



### Filter Data

In [56]:
filtered_data = orders_df.where("customer_id = 11599")

In [57]:
filtered_data.show()

+--------+--------------------+-----------+------------+
|order_id|          order_date|customer_id|order_status|
+--------+--------------------+-----------+------------+
|       1|2013-07-25 00:00:...|      11599|      CLOSED|
|   11397|2013-10-03 00:00:...|      11599|    COMPLETE|
|   23908|2013-12-20 00:00:...|      11599|    COMPLETE|
|   53545|2014-06-27 00:00:...|      11599|     PENDING|
|   59911|2013-10-17 00:00:...|      11599|  PROCESSING|
+--------+--------------------+-----------+------------+



In [58]:
filtered_data.show(truncate = False)

+--------+---------------------+-----------+------------+
|order_id|order_date           |customer_id|order_status|
+--------+---------------------+-----------+------------+
|1       |2013-07-25 00:00:00.0|11599      |CLOSED      |
|11397   |2013-10-03 00:00:00.0|11599      |COMPLETE    |
|23908   |2013-12-20 00:00:00.0|11599      |COMPLETE    |
|53545   |2014-06-27 00:00:00.0|11599      |PENDING     |
|59911   |2013-10-17 00:00:00.0|11599      |PROCESSING  |
+--------+---------------------+-----------+------------+



In [59]:
filtered_data_using_filter = orders_df.filter("customer_id = 11599")

In [60]:
filtered_data_using_filter.show()

+--------+--------------------+-----------+------------+
|order_id|          order_date|customer_id|order_status|
+--------+--------------------+-----------+------------+
|       1|2013-07-25 00:00:...|      11599|      CLOSED|
|   11397|2013-10-03 00:00:...|      11599|    COMPLETE|
|   23908|2013-12-20 00:00:...|      11599|    COMPLETE|
|   53545|2014-06-27 00:00:...|      11599|     PENDING|
|   59911|2013-10-17 00:00:...|      11599|  PROCESSING|
+--------+--------------------+-----------+------------+



### Spark Table

In [61]:
orders_df.createOrReplaceTempView("orders")

In [62]:
filtered_using_spark_sql_df = spark.sql("select * from orders where order_status = 'CLOSED'")

In [64]:
filtered_using_spark_sql_df.show(5, truncate=False)

+--------+---------------------+-----------+------------+
|order_id|order_date           |customer_id|order_status|
+--------+---------------------+-----------+------------+
|1       |2013-07-25 00:00:00.0|11599      |CLOSED      |
|4       |2013-07-25 00:00:00.0|8827       |CLOSED      |
|12      |2013-07-25 00:00:00.0|1837       |CLOSED      |
|18      |2013-07-25 00:00:00.0|1205       |CLOSED      |
|24      |2013-07-25 00:00:00.0|11441      |CLOSED      |
+--------+---------------------+-----------+------------+
only showing top 5 rows



### Table to Data frame

In [66]:
orders_df_using_spark_sql = spark.read.table("orders")

In [67]:
orders_df_using_spark_sql.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows

