In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
spark.range(8)

id
0
1
2
3
4
5
6
7


In [3]:
spark.range(0,8)

id
0
1
2
3
4
5
6
7


In [4]:
spark.range(0,8,2)

id
0
2
4
6


In [8]:
lst=[]
for i in range(0,2000):
    lst.append(i)

In [10]:
rdd=spark.sparkContext.parallelize(lst)

In [None]:
df=spark.createDataFrame(lst)

In [2]:
! hdfs dfs -cat /public/trendytech/datasets/orders_sample2.csv

1,07-25-2013,11599,CLOSED
2,07-25-2013,256,PENDING_PAYMENT
3,07-25-2013,12111,COMPLETE
4,07-25-2013,8827,CLOSED
5,07-25-2013,11318,COMPLETE
6,07-25-2013,7130,COMPLETE
7,07-25-2013,4530,COMPLETE
8,07-25-2013,2911,PROCESSING
9,07-25-2013,5657,PENDING_PAYMENT
10,07-25-2013,5648,PENDING_PAYMENT


In [4]:
orders_list=[(1,'07-25-2013',11599,'CLOSED'),
(2,'07-25-2013',256,'PENDING_PAYMENT'),
(3,'07-25-2013',12111,'COMPLETE')]

In [5]:
orders_raw_df=spark.createDataFrame(orders_list)

In [6]:
orders_raw_df.show()

+---+----------+-----+---------------+
| _1|        _2|   _3|             _4|
+---+----------+-----+---------------+
|  1|07-25-2013|11599|         CLOSED|
|  2|07-25-2013|  256|PENDING_PAYMENT|
|  3|07-25-2013|12111|       COMPLETE|
+---+----------+-----+---------------+



In [7]:
orders_raw_df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: string (nullable = true)



In [None]:
#how to fix the column names

In [8]:
#way-1 2 step process
orders_raw_df_fixed_name=spark.createDataFrame(orders_list).toDF('order_id','order_date','customer_id','order_status')

In [9]:
orders_raw_df_fixed_name.show()

+--------+----------+-----------+---------------+
|order_id|order_date|customer_id|   order_status|
+--------+----------+-----------+---------------+
|       1|07-25-2013|      11599|         CLOSED|
|       2|07-25-2013|        256|PENDING_PAYMENT|
|       3|07-25-2013|      12111|       COMPLETE|
+--------+----------+-----------+---------------+



In [10]:
orders_raw_df_fixed_name.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [12]:
#way-2 2 step process
orders_raw_df_fixed_name_2=orders_raw_df.toDF('order_id','order_date','customer_id','order_status')

In [14]:
#way-3
ord_schema=['order_id','order_date','customer_id','order_status']

In [15]:
ord_df=spark.createDataFrame(orders_list,ord_schema)

In [16]:
ord_df.show()

+--------+----------+-----------+---------------+
|order_id|order_date|customer_id|   order_status|
+--------+----------+-----------+---------------+
|       1|07-25-2013|      11599|         CLOSED|
|       2|07-25-2013|        256|PENDING_PAYMENT|
|       3|07-25-2013|      12111|       COMPLETE|
+--------+----------+-----------+---------------+



#how to enforce our own schema 

In [19]:
#way-1
order_schema='order_id long,order_date string,customer_id long,order_status string' 

In [26]:
order_df=spark.createDataFrame(orders_list,order_schema)

In [21]:
order_df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [22]:
#change order_date from string to datetime
from pyspark.sql.functions import to_timestamp

In [29]:
new_order_df=order_df.withColumn('order_date',to_timestamp('order_date','mm-dd-yyyy'))
#old/existing column and to_timestamp(new_colname)

In [30]:
new_order_df.show()

+--------+-------------------+-----------+---------------+
|order_id|         order_date|customer_id|   order_status|
+--------+-------------------+-----------+---------------+
|       1|2013-01-25 00:07:00|      11599|         CLOSED|
|       2|2013-01-25 00:07:00|        256|PENDING_PAYMENT|
|       3|2013-01-25 00:07:00|      12111|       COMPLETE|
+--------+-------------------+-----------+---------------+



In [31]:
new_order_df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_status: string (nullable = true)

