## Different ways of Dataframe creation



In [58]:
from pyspark.sql import SparkSession
import getpass
username =getpass.getuser()
spark = SparkSession.\
builder.\
config('spark.ui.port','0').\
config("spark.sql.warehouse.dir",f"/user/itv011212/warehouse").\
enableHiveSupport().\
master('yarn').\
getOrCreate()

In [59]:
spark

### 1. Using Default Read Method

In [7]:
order_df = spark.read \
.format("csv") \
.option("inferSchema","true") \
.load("/public/trendytech/retail_db/orders")

In [8]:
order_df.show()

+---+--------------------+-----+---------------+
|_c0|                 _c1|  _c2|            _c3|
+---+--------------------+-----+---------------+
|  1|2013-07-25 00:00:...|11599|         CLOSED|
|  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|12111|       COMPLETE|
|  4|2013-07-25 00:00:...| 8827|         CLOSED|
|  5|2013-07-25 00:00:...|11318|       COMPLETE|
|  6|2013-07-25 00:00:...| 7130|       COMPLETE|
|  7|2013-07-25 00:00:...| 4530|       COMPLETE|
|  8|2013-07-25 00:00:...| 2911|     PROCESSING|
|  9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|
| 11|2013-07-25 00:00:...|  918| PAYMENT_REVIEW|
| 12|2013-07-25 00:00:...| 1837|         CLOSED|
| 13|2013-07-25 00:00:...| 9149|PENDING_PAYMENT|
| 14|2013-07-25 00:00:...| 9842|     PROCESSING|
| 15|2013-07-25 00:00:...| 2568|       COMPLETE|
| 16|2013-07-25 00:00:...| 7276|PENDING_PAYMENT|
| 17|2013-07-25 00:00:...| 2667|       COMPLETE|
| 18|2013-07-25 00:0

In [9]:
order_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)



In [11]:
col_names = ['order_id','order_date','customer_id','order_status']

In [28]:
order_df.toDF(*col_names).show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows



In [14]:
order_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)



###  2. Using spark.sql

In [22]:
spark.sql("create database if not exists itv0153_retail")

In [23]:
spark.sql("show databases").filter("namespace = 'itv0153_retail'").show()

+--------------+
|     namespace|
+--------------+
|itv0153_retail|
+--------------+



In [24]:
spark.sql("use itv0153_retail")

In [25]:
spark.sql("show tables")

database,tableName,isTemporary
itv0153_retail,orders,False
itv0153_retail,orders_ext,False


In [32]:
orders_df  = spark.sql("select * from orders")

In [33]:
orders_df.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows



In [34]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



###  3. Using spark.table

In [36]:
spark.sql("show tables")

database,tableName,isTemporary
itv0153_retail,orders,False
itv0153_retail,orders_ext,False


In [39]:
df = spark.table("itv0153_retail.orders_ext")

In [40]:
df.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+
only showing top 5 rows



In [41]:
df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)



### 4. Creating from a local list : Using createDataFrame and .toDF transformations


In [60]:
orders_list = [(1,'2013-07-25 00:00:00.0',11599,'CLOSED'),
(2,'2013-07-25 00:00:00.0',256,'PENDING_PAYMENT'),
(3,'2013-07-25 00:00:00.0',12111, 'COMPLETE'),
(4,'2013-07-25 00:00:00.0',8827, 'CLOSED'),
(5,'2013-07-25 00:00:00.0',11318, 'COMPLETE')

]

In [61]:
orders_df = spark.createDataFrame(orders_list)

In [62]:
orders_df.show(5)

+---+--------------------+-----+---------------+
| _1|                  _2|   _3|             _4|
+---+--------------------+-----+---------------+
|  1|2013-07-25 00:00:...|11599|         CLOSED|
|  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|12111|       COMPLETE|
|  4|2013-07-25 00:00:...| 8827|         CLOSED|
|  5|2013-07-25 00:00:...|11318|       COMPLETE|
+---+--------------------+-----+---------------+



In [63]:
orders_df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: string (nullable = true)



In [None]:
## Fixing the column names

In [65]:
orders_new_df = spark.createDataFrame(orders_list).toDF('order_id','order_date','customer_id','order_status')

In [66]:
orders_new_df.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+



In [67]:
orders_new_df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



## Here we loaded date as a string type . want to convert this to timestamp

In [72]:
new_df = orders_new_df.withColumn("order_date",to_timestamp('order_date'))

In [71]:
from pyspark.sql.functions import to_timestamp

In [73]:
new_df.show(4)

+--------+-------------------+-----------+---------------+
|order_id|         order_date|customer_id|   order_status|
+--------+-------------------+-----------+---------------+
|       1|2013-07-25 00:00:00|      11599|         CLOSED|
|       2|2013-07-25 00:00:00|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|      12111|       COMPLETE|
|       4|2013-07-25 00:00:00|       8827|         CLOSED|
+--------+-------------------+-----------+---------------+
only showing top 4 rows



In [74]:
new_df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



### 4.1 Single step process :

#### Defining schema upfront

In [83]:
orders_schema = 'order_id long,order_date string,customer_id int,order_status string'

In [84]:
orders_final = spark.createDataFrame(orders_list,orders_schema)

In [85]:
orders_final.show(5)

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
+--------+--------------------+-----------+---------------+



In [86]:
orders_df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: string (nullable = true)



In [87]:
spark.stop()