# Dates and Timestamps

You will often find yourself working with Time and Date information, let's walk through some ways you can deal with it!

In [3]:
import findspark
findspark.init('/home/bahmansheikh/spark-3.1.2-bin-hadoop3.2')

In [4]:
from pyspark.sql import SparkSession
# May take a little while on a local computer
spark = SparkSession.builder.appName("dates").getOrCreate()

21/12/20 08:09:57 WARN Utils: Your hostname, 1PNG6D3-XPS15 resolves to a loopback address: 127.0.1.1; using 192.168.1.140 instead (on interface wlp108s0)
21/12/20 08:09:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/20 08:09:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/20 08:09:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/12/20 08:09:58 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
21/12/20 08:09:58 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [5]:
df = spark.read.csv("appl_stock.csv",header=True,inferSchema=True)

In [6]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

Let's walk through how to grab parts of the timestamp data

In [7]:
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format

In [8]:
df.select(df['Date']).show()

+----------+
|      Date|
+----------+
|2010-01-04|
|2010-01-05|
|2010-01-06|
|2010-01-07|
|2010-01-08|
|2010-01-11|
|2010-01-12|
|2010-01-13|
|2010-01-14|
|2010-01-15|
|2010-01-19|
|2010-01-20|
|2010-01-21|
|2010-01-22|
|2010-01-25|
|2010-01-26|
|2010-01-27|
|2010-01-28|
|2010-01-29|
|2010-02-01|
+----------+
only showing top 20 rows



In [45]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [46]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [8]:
df.select(dayofyear(df['Date'])).show()

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
|             11|
|             12|
|             13|
|             14|
|             15|
|             19|
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
+---------------+
only showing top 20 rows



In [11]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



So for example, let's say we wanted to know the average closing price per year. Easy! With a groupby and the year() function call:

In [15]:
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [19]:
df.withColumn("Year",year(df['Date'])).show()

+--------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|                Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+--------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04 00:00:...|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:...|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06 00:00:...|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07 00:00:...|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08 00:00:...|        210.299994|        212.000006|209.06000

In [9]:
newdf = df.withColumn("Year",year(df['Date']))
newdf.groupBy("Year").mean()[['Year','avg(Close)']].show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [10]:
newdf.groupBy("Year").mean().show()

+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|Year|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|         avg(Volume)|    avg(Adj Close)|avg(Year)|
+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|2015|120.17575393253965|121.24452385714291| 118.8630954325397|120.03999980555547|  5.18378869047619E7|115.96740080555561|   2015.0|
|2013| 473.1281355634922| 477.6389272301587|468.24710264682557| 472.6348802857143|          1.016087E8| 62.61798788492063|   2013.0|
|2014| 295.1426195357143|297.56103184523823| 292.9949599801587| 295.4023416507935| 6.315273055555555E7| 87.63583323809523|   2014.0|
|2012|     576.652720788| 581.8254008040001| 569.9211606079999| 576.0497195640002|       1.319642044E8| 74.81383696800002|   2012.0|
|2016|104.50777772619044| 105.4271825436508|103.69027771825397|104.60

21/12/20 20:35:47 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 12057261 ms exceeds timeout 120000 ms
21/12/20 20:35:47 WARN SparkContext: Killing executors is not supported by current scheduler.


Still not quite presentable! Let's use the .alias method as well as round() to clean this up!

In [43]:
result = newdf.groupBy("Year").mean()[['avg(Year)','avg(Close)']]
result = result.withColumnRenamed("avg(Year)","Year")
result = result.select('Year',format_number('avg(Close)',2).alias("Mean Close")).show()

+------+----------+
|  Year|Mean Close|
+------+----------+
|2015.0|    120.04|
|2013.0|    472.63|
|2014.0|    295.40|
|2012.0|    576.05|
|2016.0|    104.60|
|2010.0|    259.84|
|2011.0|    364.00|
+------+----------+



Perfect! Now you know how to work with Date and Timestamp information!