In [1]:
from pyspark.sql import SparkSession
import getpass
# Resolve the current OS user so the warehouse directory below is per-user
# instead of being tied to one hard-coded account.
username = getpass.getuser()

# Build (or reuse) the Hive-enabled SparkSession running on YARN.
# - spark.ui.port is set to '0' (presumably so Spark picks any free UI port
#   on a shared gateway — confirm against cluster conventions).
# - spark.sql.warehouse.dir now actually interpolates `username`; previously
#   the f-string had no placeholder and always pointed at one fixed user.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

#### Different methods for creating DataFrames

#### spark.read()

In [4]:
# Read the sample orders file with the generic reader API. No header or
# schema options are supplied, so the reader's defaults apply.
df = (
    spark.read
    .format("csv")
    .load(path="/public/trendytech/datasets/orders_sample1.csv")
)

In [5]:
# Preview the CSV-backed DataFrame. The load set no header/schema options,
# and the columns come back with positional names (_c0 .. _c3).
df.show()

+---+----------+-----+---------------+
|_c0|       _c1|  _c2|            _c3|
+---+----------+-----+---------------+
|  1|2013-07-25|11599|         CLOSED|
|  2|2013-07-25|  256|PENDING_PAYMENT|
|  3|2013-07-25|12111|       COMPLETE|
|  4|2013-07-25| 8827|         CLOSED|
|  5|2013-07-25|11318|       COMPLETE|
|  6|2013-07-25| 7130|       COMPLETE|
|  7|2013-07-25| 4530|       COMPLETE|
|  8|2013-07-25| 2911|     PROCESSING|
|  9|2013-07-25| 5657|PENDING_PAYMENT|
| 10|2013-07-25| 5648|PENDING_PAYMENT|
+---+----------+-----+---------------+



In [6]:
# spark.sql(): build a DataFrame from the result of a SQL query.
df = spark.sql(
    sqlQuery="select * from itv005857_retail.orders_ext"
)

#### spark.sql()

In [7]:
# Preview the query result. Long order_date values are shortened with "..."
# in the printed table.
df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

#### spark.table()

In [8]:
# spark.table(): load a table by its qualified name, with no SQL text needed.
df = spark.table(
    tableName="itv005857_retail.orders_ext"
)

In [9]:
# Preview the table-backed DataFrame (columns: order_id, order_date,
# customer_id, order_status).
df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

#### spark.range()

In [10]:
# Single-argument form: produces ids 0, 1, 2, 3, 4 (the argument itself is
# excluded).
df = spark.range(5)

In [11]:
# The generated DataFrame has a single column named `id`.
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [12]:
# Two-argument form: ids start at 1 and stop before 8 (1 .. 7).
df = spark.range(start=1, end=8)

In [13]:
# Show the generated ids; the end value (8) is not included.
df.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
+---+



In [14]:
# Three-argument form: start at 1, stop before 8, advance by 3 (1, 4, 7).
df = spark.range(start=1, end=8, step=3)

In [15]:
# Show the stepped ids (1, 4, 7).
df.show()

+---+
| id|
+---+
|  1|
|  4|
|  7|
+---+



#### spark.createDataFrame()

In [17]:
! hadoop fs -cat /public/trendytech/retail_db/orders/part-00000 | head

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT
cat: Unable to write to output stream.


In [29]:
# Three hand-written order rows as plain tuples:
# (order_id, order_date as text, customer_id, order_status).
df_list = [
    (1, '2013-07-25 00:00:00.000', 11599, 'CLOSED'),
    (2, '2013-07-25 00:00:00.000', 256, 'PENDING_PAYMENT'),
    (3, '2013-07-25 00:00:00.000', 12111, 'COMPLETE'),
]

In [30]:
# createDataFrame() from a local list of tuples, with no schema supplied.
df_1 = spark.createDataFrame(data=df_list)

In [31]:
# No column names were supplied when df_1 was created; the columns are
# displayed as _1 .. _4.
df_1.show()

+---+--------------------+-----+---------------+
| _1|                  _2|   _3|             _4|
+---+--------------------+-----+---------------+
|  1|2013-07-25 00:00:...|11599|         CLOSED|
|  2|2013-07-25 00:00:...|  256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|12111|       COMPLETE|
+---+--------------------+-----+---------------+



In [32]:
# Option 1 for naming columns: toDF() returns a new DataFrame whose columns
# carry the given names, in positional order.
df1_1 = df_1.toDF(
    'order_id',
    'order_date',
    'customer_id',
    'order_status',
)

In [33]:
# Same rows as df_1, now with the names passed to toDF().
df1_1.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
+--------+--------------------+-----------+---------------+



In [34]:
# Column names only (no types), in the same order as the tuple fields.
orders_schema = [
    'order_id',
    'order_date',
    'customer_id',
    'order_status',
]

In [35]:
# Option 2 for naming columns: pass the list of names as the schema when
# creating the DataFrame.
df1_2 = spark.createDataFrame(data=df_list, schema=orders_schema)

In [36]:
# Columns carry the names from orders_schema.
df1_2.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
+--------+--------------------+-----------+---------------+



In [37]:
# Option 3: give both names and types through a DDL-style schema string.
# order_date is declared as STRING here.
df_2 = spark.createDataFrame(
    data=df_list,
    schema=(
        "order_id LONG, "
        "order_date STRING, "
        "customer_id LONG, "
        "order_status STRING"
    ),
)

In [38]:
# Preview the rows created with the explicit DDL schema.
df_2.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
+--------+--------------------+-----------+---------------+



In [39]:
# Confirm the declared types: order_id/customer_id are long, and order_date
# is still a string at this point.
df_2.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [40]:
from pyspark.sql.functions import to_timestamp

In [41]:
# Replace the string order_date column with a timestamp column of the same
# name; the other columns are carried over unchanged.
df_3 = df_2.withColumn(
    colName="order_date",
    col=to_timestamp('order_date'),
)

In [42]:
# Verify the conversion: order_date should now be reported as timestamp.
df_3.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [43]:
# Preview the converted rows; order_date now prints as a full
# "yyyy-MM-dd HH:mm:ss" value instead of a shortened string.
df_3.show()

+--------+-------------------+-----------+---------------+
|order_id|         order_date|customer_id|   order_status|
+--------+-------------------+-----------+---------------+
|       1|2013-07-25 00:00:00|      11599|         CLOSED|
|       2|2013-07-25 00:00:00|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|      12111|       COMPLETE|
+--------+-------------------+-----------+---------------+

