In [1]:
import findspark

In [2]:
import os

# Prefer an already-configured SPARK_HOME so the notebook is not tied to one
# user's home directory; fall back to the original local install path when
# the variable is not set.
findspark.init(os.environ.get('SPARK_HOME', '/home/gympass/spark-3.1.2-bin-hadoop3.2'))

In [4]:
from pyspark.sql import SparkSession

In [12]:
# Get the active SparkSession, or create one whose application name is 'dates'.
spark = (
    SparkSession.builder
    .appName('dates')
    .getOrCreate()
)

In [32]:
from pyspark.sql.types import (StructField, 
                               StringType, 
                               IntegerType,
                               StructType,
                               TimestampType,
                               DoubleType)

In [33]:
# (column name, Spark type) pairs, in the order the schema declares them.
column_types = [
    ('Date', TimestampType()),
    ('Open', DoubleType()),
    ('High', DoubleType()),
    ('Low', DoubleType()),
    ('Close', DoubleType()),
    ('Volume', IntegerType()),
    ('Adj Close', DoubleType()),
]

# Every field is declared nullable (third StructField argument is True).
data_schema = [StructField(name, dtype, True) for name, dtype in column_types]
final_struct = StructType(fields=data_schema)

In [34]:
# Read the CSV with the explicit schema, treating its first line as a header.
df = (
    spark.read
    .schema(final_struct)
    .option('header', True)
    .csv('appl_stock.csv')
)

In [35]:
# Print df's column names, data types and nullability as a tree.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [36]:
# Fetch the first row; with an explicit count the result is a list of Row objects.
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [37]:
# Display only the Date and Open columns.
df.select('Date', 'Open').show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top 20 rows

In [39]:
from pyspark.sql.functions import (dayofmonth,hour,dayofyear,
                                   month,year,weekofyear,
                                   format_number,date_format)

In [44]:
# Display the day-of-month component extracted from the Date column.
df.select(dayofmonth('Date')).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [57]:
# Keep rows whose Date falls after May and display their month number.
# The selected column is month(Date), so it is labelled 'Month'; the previous
# alias 'Day' mislabelled the values.
(
    df.filter('month(Date) > 5')
    .select(month(df['Date']).alias('Month'))
    .show()
)

+---+
|Day|
+---+
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
|  6|
+---+
only showing top 20 rows



In [65]:
# Add a 'Year' column holding the year component of each row's Date.
newdf = df.withColumn('Year', year('Date'))

In [70]:
# Average Close per Year. Passing the column name restricts the aggregation
# to Close, instead of averaging every numeric column (including Year itself)
# and then discarding all but one. The result columns are still
# 'Year' and 'avg(Close)'.
result = newdf.groupBy('Year').mean('Close')

In [76]:
# Give the aggregated column a readable name, keeping Year alongside it.
new = result.select(
    'Year',
    result['avg(Close)'].alias('Average Closing Price'),
)

In [80]:
# Format the yearly average to two decimal places and display it next to Year.
avg_close_2dp = format_number('Average Closing Price', 2).alias('Avg Close')
new.select('Year', avg_close_2dp).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+

