In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("SparkDateTime").getOrCreate()

24/11/16 22:34:39 WARN Utils: Your hostname, ivan resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/11/16 22:34:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/16 22:34:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/16 22:34:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df = spark.read.csv("items_bought.csv",header=True,inferSchema=True)

In [4]:
df.show(3)

+----------+---------+----------+--------+----------+------------+
|      date|item_name|item_price|quantity|tax_amount|total_amount|
+----------+---------+----------+--------+----------+------------+
|11-10-2018|     Beer|     110.5|       2|     53.04|      163.54|
|14-02-2018|   Whisky|    1250.0|       1|     300.0|      1550.0|
|23-03-2020|   Whisky|    1300.5|       2|    624.24|     1924.74|
+----------+---------+----------+--------+----------+------------+
only showing top 3 rows



In [5]:
print("Schema with date as string datatype")
df.printSchema()

Schema with date as string datatype
root
 |-- date: string (nullable = true)
 |-- item_name: string (nullable = true)
 |-- item_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- tax_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [6]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date #importing necessary functions to convert date string to date type

In [7]:
updated_df = df.withColumn('formatted_date',to_date(unix_timestamp(df['date'],'dd-MM-yyyy').cast('timestamp')))

In [8]:
print("Schema with date column string datatype converted to date datatype")

updated_df.printSchema()

Schema with date column string datatype converted to date datatype
root
 |-- date: string (nullable = true)
 |-- item_name: string (nullable = true)
 |-- item_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- tax_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- formatted_date: date (nullable = true)



In [9]:
print("Data after dropping the date column which was of stringtype")
updated_df=updated_df.drop("date") #dropping the date column with string type
updated_df.show(2)

Data after dropping the date column which was of stringtype
+---------+----------+--------+----------+------------+--------------+
|item_name|item_price|quantity|tax_amount|total_amount|formatted_date|
+---------+----------+--------+----------+------------+--------------+
|     Beer|     110.5|       2|     53.04|      163.54|    2018-10-11|
|   Whisky|    1250.0|       1|     300.0|      1550.0|    2018-02-14|
+---------+----------+--------+----------+------------+--------------+
only showing top 2 rows



In [10]:
from pyspark.sql.functions import weekofyear,dayofmonth,month,year,date_format #extracting data from dates

In [11]:
print("Data Extraction from dates")
final_df = updated_df.select(updated_df["item_name"],
weekofyear(updated_df["formatted_date"]).alias("week_number"),
dayofmonth(updated_df["formatted_date"]).alias("day_number"),
month(updated_df["formatted_date"]).alias("month"),
year(updated_df["formatted_date"]).alias("year"))

Data Extraction from dates


In [12]:
final_df.show(3)
date_string_value = updated_df.select(df["item_name"],date_format(updated_df["formatted_date"],'MM/dd/yyyy')) #converting date type to a different date format string

+---------+-----------+----------+-----+----+
|item_name|week_number|day_number|month|year|
+---------+-----------+----------+-----+----+
|     Beer|         41|        11|   10|2018|
|   Whisky|          7|        14|    2|2018|
|   Whisky|         13|        23|    3|2020|
+---------+-----------+----------+-----+----+
only showing top 3 rows



In [13]:
date_string_value.show(2)


+---------+---------------------------------------+
|item_name|date_format(formatted_date, MM/dd/yyyy)|
+---------+---------------------------------------+
|     Beer|                             10/11/2018|
|   Whisky|                             02/14/2018|
+---------+---------------------------------------+
only showing top 2 rows



In [14]:
print("Usecase - Total amount of items purchased in that particular year")
final_format=final_df.groupBy("year").sum().select(["year","sum(year)"])

Usecase - Total amount of items purchased in that particular year


In [15]:
final_format.withColumnRenamed("sum(year)","Total Expenditure").show()

+----+-----------------+
|year|Total Expenditure|
+----+-----------------+
|2018|             6054|
|2019|             4038|
|2020|             8080|
+----+-----------------+

