In [None]:
pip install pyspark

In [2]:
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
# creating a spark sesssion

spark = SparkSession.builder.appName('dates_and_timestamps').getOrCreate()

In [4]:
df = spark.read.csv('/content/drive/MyDrive/Spark_DataFrames/appl_stock.csv',inferSchema=True,header=True)

In [5]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [7]:
df.show(10)

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

Grabbing parts of the timestamp data

In [8]:
from pyspark.sql.functions import format_number,hour,dayofmonth,dayofyear,month,year,weekofyear,date_format

In [9]:
# dayofmonth() function function takes a date or datetime column as input and returns the day of the month as an integer

df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [15]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [16]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



So for example, let's say we wanted to know the average closing price per year. Easy! With a groupby and the year() function call:

In [17]:
df.show(5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



In [20]:
# creating a new column year, which will contain the year from the Date column

df.withColumn("Year",year(df['Date'])).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
|2010-01-11|212.

In [22]:
year_min = df.withColumn("Year",year(df['Date']))
year_min.groupby('Year').mean().show()

+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|Year|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|         avg(Volume)|    avg(Adj Close)|avg(Year)|
+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|2015|120.17575393253965|121.24452385714291| 118.8630954325397|120.03999980555547|  5.18378869047619E7|115.96740080555561|   2015.0|
|2013| 473.1281355634922| 477.6389272301587|468.24710264682557| 472.6348802857143|          1.016087E8| 62.61798788492063|   2013.0|
|2014| 295.1426195357143|297.56103184523823| 292.9949599801587| 295.4023416507935| 6.315273055555555E7| 87.63583323809523|   2014.0|
|2012|     576.652720788| 581.8254008040001| 569.9211606079999| 576.0497195640002|       1.319642044E8| 74.81383696800002|   2012.0|
|2016|104.50777772619044| 105.4271825436508|103.69027771825397|104.60

In [26]:
year_min.groupby('Year').mean()[['avg(Close)']].show()

+------------------+
|        avg(Close)|
+------------------+
|120.03999980555547|
| 472.6348802857143|
| 295.4023416507935|
| 576.0497195640002|
|104.60400786904763|
| 259.8424600000002|
|364.00432532142867|
+------------------+



In [29]:
from pyspark.sql.functions import countDistinct, avg, stddev, format_number,mean

In [39]:
new = year_min.groupby('Year')
new.agg(mean('Close'),mean('Open')).show()

+----+------------------+------------------+
|Year|        avg(Close)|         avg(Open)|
+----+------------------+------------------+
|2015|120.03999980555547|120.17575393253965|
|2013| 472.6348802857143| 473.1281355634922|
|2014| 295.4023416507935| 295.1426195357143|
|2012| 576.0497195640002|     576.652720788|
|2016|104.60400786904763|104.50777772619044|
|2010| 259.8424600000002| 259.9576190992064|
|2011|364.00432532142867|364.06142773412705|
+----+------------------+------------------+



In [52]:
new_1 = new.agg(mean('Close'),mean('Open'))
new_1 = new_1.withColumnRenamed("avg(Close)","Average_closing_price")
new_1 = new_1.withColumnRenamed("avg(Open)","Average_opening_price")
new_1.show()

+----+---------------------+---------------------+
|Year|Average_closing_price|Average_opening_price|
+----+---------------------+---------------------+
|2015|   120.03999980555547|   120.17575393253965|
|2013|    472.6348802857143|    473.1281355634922|
|2014|    295.4023416507935|    295.1426195357143|
|2012|    576.0497195640002|        576.652720788|
|2016|   104.60400786904763|   104.50777772619044|
|2010|    259.8424600000002|    259.9576190992064|
|2011|   364.00432532142867|   364.06142773412705|
+----+---------------------+---------------------+



In [53]:
res = new_1.select(format_number('Average_closing_price',2))
res = new_1.select(format_number('Average_opening_price',2))
res.show()

+---------------------------------------+
|format_number(Average_opening_price, 2)|
+---------------------------------------+
|                                 120.18|
|                                 473.13|
|                                 295.14|
|                                 576.65|
|                                 104.51|
|                                 259.96|
|                                 364.06|
+---------------------------------------+



In [54]:
result = new.agg(mean('Close'),mean('Open'))
result = result.select('Year',format_number('avg(Close)',2).alias("Mean Close"))
result.show()

+----+----------+
|Year|Mean Close|
+----+----------+
|2015|    120.04|
|2013|    472.63|
|2014|    295.40|
|2012|    576.05|
|2016|    104.60|
|2010|    259.84|
|2011|    364.00|
+----+----------+

