In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
orders_df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/public/trendytech/orders_wh/*")

In [3]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [4]:
df = spark.read \
.format("csv") \
.load("/public/trendytech/datasets/orders_sample.csv")

In [5]:
df.show()

+---+--------------------+-----+---------------+
|_c0|                 _c1|  _c2|            _c3|
+---+--------------------+-----+---------------+
|  1|2013-07-25 00:00:...|11599|         CLOSED|
|  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|12111|       COMPLETE|
|  4|2013-07-25 00:00:...| 8827|         CLOSED|
|  5|2013-07-25 00:00:...|11318|       COMPLETE|
|  6|2013-07-25 00:00:...| 7130|       COMPLETE|
|  7|2013-07-25 00:00:...| 4530|       COMPLETE|
|  8|2013-07-25 00:00:...| 2911|     PROCESSING|
|  9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|
| 11|2013-07-25 00:00:...|  918| PAYMENT_REVIEW|
| 12|2013-07-25 00:00:...| 1837|         CLOSED|
| 13|2013-07-25 00:00:...| 9149|PENDING_PAYMENT|
| 14|2013-07-25 00:00:...| 9842|     PROCESSING|
| 15|2013-07-25 00:00:...| 2568|       COMPLETE|
| 16|2013-07-25 00:00:...| 7276|PENDING_PAYMENT|
| 17|2013-07-25 00:00:...| 2667|       COMPLETE|
| 18|2013-07-25 00:0

In [6]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [7]:
#2 ways to enforce the schema instead of using inferschema

#1.define schema manually (schema DDL)
orders_schema = 'order_id int, order_date date, cust_id long, order_status string'

df1 = spark.read.format("csv").schema(orders_schema).load("/public/trendytech/datasets/orders_sample.csv")
df1.show()
df1.printSchema()

#2.Using StructType and StructField
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,DateType
schema = StructType([StructField("order_id",IntegerType()),
                      StructField("order_date",DateType()),
                      StructField("cust_id",IntegerType()),
                      StructField("order_status",StringType())])
df2 = spark.read.format("csv").schema(schema).load("/public/trendytech/datasets/orders_sample.csv")
df2.show()
df2.printSchema()                                
                                  

+--------+----------+-------+---------------+
|order_id|order_date|cust_id|   order_status|
+--------+----------+-------+---------------+
|       1|2013-07-25|  11599|         CLOSED|
|       2|2013-07-25|    256|PENDING_PAYMENT|
|       3|2013-07-25|  12111|       COMPLETE|
|       4|2013-07-25|   8827|         CLOSED|
|       5|2013-07-25|  11318|       COMPLETE|
|       6|2013-07-25|   7130|       COMPLETE|
|       7|2013-07-25|   4530|       COMPLETE|
|       8|2013-07-25|   2911|     PROCESSING|
|       9|2013-07-25|   5657|PENDING_PAYMENT|
|      10|2013-07-25|   5648|PENDING_PAYMENT|
|      11|2013-07-25|    918| PAYMENT_REVIEW|
|      12|2013-07-25|   1837|         CLOSED|
|      13|2013-07-25|   9149|PENDING_PAYMENT|
|      14|2013-07-25|   9842|     PROCESSING|
|      15|2013-07-25|   2568|       COMPLETE|
|      16|2013-07-25|   7276|PENDING_PAYMENT|
|      17|2013-07-25|   2667|       COMPLETE|
|      18|2013-07-25|   1205|         CLOSED|
|      19|2013-07-25|   2667|PENDI

In [8]:
#Dealing with date datatype

#Sometime date can be in diff format than yyyy-mm-dd, Spark sometimes will not be able to parse it and give error.
#1.use dateFormat
orders_schema = 'order_id int, order_date date, cust_id long, order_status string'

df3 = spark.read.format("csv").schema(orders_schema).option("dateFormat","mm-dd-yyyy")\
.load("/public/trendytech/datasets/orders_sample2.csv")
df3.show()
df3.printSchema()

#2.change the  datatype format using withColumn
from pyspark.sql.functions import to_date
newdf = df3.withColumn("order_date_new",to_date("order_date","mm-dd-yyyy"))
newdf.show()

+--------+----------+-------+---------------+
|order_id|order_date|cust_id|   order_status|
+--------+----------+-------+---------------+
|       1|2013-01-25|  11599|         CLOSED|
|       2|2013-01-25|    256|PENDING_PAYMENT|
|       3|2013-01-25|  12111|       COMPLETE|
|       4|2013-01-25|   8827|         CLOSED|
|       5|2013-01-25|  11318|       COMPLETE|
|       6|2013-01-25|   7130|       COMPLETE|
|       7|2013-01-25|   4530|       COMPLETE|
|       8|2013-01-25|   2911|     PROCESSING|
|       9|2013-01-25|   5657|PENDING_PAYMENT|
|      10|2013-01-25|   5648|PENDING_PAYMENT|
+--------+----------+-------+---------------+

root
 |-- order_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- cust_id: long (nullable = true)
 |-- order_status: string (nullable = true)

+--------+----------+-------+---------------+--------------+
|order_id|order_date|cust_id|   order_status|order_date_new|
+--------+----------+-------+---------------+--------------+
|   