In [1]:
from pyspark.sql import SparkSession
import getpass

# Look up the current OS user; the warehouse directory below is placed
# under /user/<username>/warehouse.
username = getpass.getuser()

# Build (or reuse, via getOrCreate) a Hive-enabled Spark session with the
# master set to YARN.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')  # presumably lets Spark pick any free UI port — confirm
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [18]:
# Peek at the beginning of the raw orders file to see its header row and
# column layout (the output is cut off partway through a row).
! hdfs dfs -head /public/trendytech/orders_wh/orders_wh.csv

order_id,order_date,customer_id,order_status
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
11,2013-07-25 00:00:00.0,918,PAYMENT_REVIEW
12,2013-07-25 00:00:00.0,1837,CLOSED
13,2013-07-25 00:00:00.0,9149,PENDING_PAYMENT
14,2013-07-25 00:00:00.0,9842,PROCESSING
15,2013-07-25 00:00:00.0,2568,COMPLETE
16,2013-07-25 00:00:00.0,7276,PENDING_PAYMENT
17,2013-07-25 00:00:00.0,2667,COMPLETE
18,2013-07-25 00:00:00.0,1205,CLOSED
19,2013-07-25 00:00:00.0,9488,PENDING_PAYMENT
20,2013-07-25 00:00:00.0,9198,PROCESSING
21,2013-07-25 00:00:00.0,2711,PENDING
22,2013-07-25 00:00:00.0,333,COMPLETE
23,2013-07-25 00:00:00.0,4367,PENDING_PAYMENT
24,2013-07-25 0

# Standard way to create a DataFrame from a file

In [7]:
# Read everything matching the orders_wh/* glob as CSV, treating the first
# line as the header and letting Spark infer the column types.
order_df = (
    spark.read
    .format("csv")
    .options(inferschema="true", header="true")
    .load("/public/trendytech/orders_wh/*")
)

In [11]:
# Preview the first five rows without truncating long cell values.
order_df.show(n=5, truncate=False)

+--------+---------------------+-----------+---------------+
|order_id|order_date           |customer_id|order_status   |
+--------+---------------------+-----------+---------------+
|1       |2013-07-25 00:00:00.0|11599      |CLOSED         |
|2       |2013-07-25 00:00:00.0|256        |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00.0|12111      |COMPLETE       |
|4       |2013-07-25 00:00:00.0|8827       |CLOSED         |
|5       |2013-07-25 00:00:00.0|11318      |COMPLETE       |
+--------+---------------------+-----------+---------------+
only showing top 5 rows



In [10]:
# Print the column names and the types produced by schema inference.
order_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



# Renaming an existing column

In [13]:
# Produce a new DataFrame in which the order_status column is called status;
# order_df itself is left unchanged.
order_col_renamed_df = order_df.withColumnRenamed(
    existing="order_status",
    new="status",
)
order_col_renamed_df.show(n=5, truncate=False)

+--------+---------------------+-----------+---------------+
|order_id|order_date           |customer_id|status         |
+--------+---------------------+-----------+---------------+
|1       |2013-07-25 00:00:00.0|11599      |CLOSED         |
|2       |2013-07-25 00:00:00.0|256        |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00.0|12111      |COMPLETE       |
|4       |2013-07-25 00:00:00.0|8827       |CLOSED         |
|5       |2013-07-25 00:00:00.0|11318      |COMPLETE       |
+--------+---------------------+-----------+---------------+
only showing top 5 rows



# Converting a string column to a timestamp

In [15]:
# Import only the function this notebook uses. A wildcard import of
# pyspark.sql.functions shadows Python builtins such as sum, min and max
# and hides where names come from.
from pyspark.sql.functions import to_timestamp

In [16]:
# Add a timestamp-typed column derived from the string order_date column.
# No format is passed to to_timestamp, so Spark's default parsing applies;
# the original string column is kept alongside the new one.
order_date_timestamp_df = order_col_renamed_df.withColumn(
    colName="order_date_timestamp",
    col=to_timestamp("order_date"),
)
order_date_timestamp_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- order_date_timestamp: timestamp (nullable = true)



In [17]:
# Preview five rows, untruncated, to compare the string and timestamp columns.
order_date_timestamp_df.show(n=5, truncate=False)

+--------+---------------------+-----------+---------------+--------------------+
|order_id|order_date           |customer_id|status         |order_date_timestamp|
+--------+---------------------+-----------+---------------+--------------------+
|1       |2013-07-25 00:00:00.0|11599      |CLOSED         |2013-07-25 00:00:00 |
|2       |2013-07-25 00:00:00.0|256        |PENDING_PAYMENT|2013-07-25 00:00:00 |
|3       |2013-07-25 00:00:00.0|12111      |COMPLETE       |2013-07-25 00:00:00 |
|4       |2013-07-25 00:00:00.0|8827       |CLOSED         |2013-07-25 00:00:00 |
|5       |2013-07-25 00:00:00.0|11318      |COMPLETE       |2013-07-25 00:00:00 |
+--------+---------------------+-----------+---------------+--------------------+
only showing top 5 rows



In [19]:
# Stop the Spark session now that the walkthrough is finished.
spark.stop()