In [1]:
import os

from pyspark.sql import SparkSession, Row, functions

In [None]:
### Configure the Spark session and read the data

In [2]:
# Location of the stock CSV. Defaults to the original local file, but can be
# redirected without editing the notebook by setting the STOCK_CSV_PATH
# environment variable before this cell runs.
data_location = os.environ.get(
    "STOCK_CSV_PATH",
    "file:///home/hatef/courses/term-4/hw3/Datasets/3/stock.csv",
)
# Build (or reuse) a Spark session named "three" that runs locally on all cores.
spark = (
    SparkSession.builder
    .appName("three")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Load the CSV: the first line supplies the column names and Spark infers
# each column's type from the data.
stockData = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_location)
)

In [4]:
# Print the column names and the types that were inferred while reading the CSV.
stockData.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [None]:
### First 3 rows of the DataFrame

In [5]:
# Return the first three rows as a list of Row objects.
stockData.head(n=3)

[Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)]

In [6]:
# Preview the leading rows of the DataFrame as a formatted table.
stockData.show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000

In [7]:
### Part 1: add an HV column (High divided by Volume)

In [8]:
# Derive a new DataFrame with an extra "HV" column: the day's High price
# divided by its traded Volume. The source DataFrame is left untouched.
stockData_with_HV = stockData.withColumn(
    "HV",
    functions.col("High") / functions.col("Volume"),
)

In [9]:
# Show the new ratio next to the two columns it was computed from.
stockData_with_HV.select("High", "Volume", "HV").show()

+------------------+--------+--------------------+
|              High|  Volume|                  HV|
+------------------+--------+--------------------+
|         61.060001|12668800|4.819714653321546E-6|
|         60.349998| 9593300|6.290848613094555E-6|
|         59.619999|12768200|4.669412994783916E-6|
|         59.450001| 8069400|7.367338463826307E-6|
|         59.549999| 6679300|8.915604778943901E-6|
|59.709998999999996| 6907300|8.644477436914568E-6|
|         59.529999| 6365600|9.351828421515645E-6|
|              60.0| 7236400| 8.29141562102703E-6|
|59.610001000000004| 7729300|7.712212102001476E-6|
|60.110001000000004| 8500000|7.071764823529412E-6|
|         60.029999| 5911400|1.015495466386981E-5|
|             60.73| 9234600|6.576354146362592...|
|             61.25|10378800| 5.90145296180676E-6|
|             60.98| 7134100|8.547679455011844E-6|
|              62.0| 7362800|8.420709512685392E-6|
|61.610001000000004| 5915800|1.041448341728929...|
|             61.84| 7436200|8.

In [10]:
### Part 2: date of the highest High price

In [11]:
# DataFrame API: sort by High in descending order, keep only the top row,
# and display its Date together with the High value.
(
    stockData
    .orderBy(functions.desc("High"))
    .limit(1)
    .select("Date", "High")
    .show()
)

+-------------------+---------+
|               Date|     High|
+-------------------+---------+
|2015-01-13 00:00:00|90.970001|
+-------------------+---------+



In [12]:
# Register the DataFrame as the temporary view "stock" so that the
# spark.sql(...) queries in the following cells can refer to it by name.
stockData.createOrReplaceTempView("stock")

In [13]:
# Spark SQL version of the previous query, run against the "stock" view.
max_high_row = spark.sql("select Date, High from stock order by High desc limit 1")
max_high_row.show()

+-------------------+---------+
|               Date|     High|
+-------------------+---------+
|2015-01-13 00:00:00|90.970001|
+-------------------+---------+



In [14]:
### Part 3: average Close price

In [15]:
# DataFrame API: mean of the Close column over the whole dataset.
stockData.agg(
    functions.avg("Close").alias("average close")
).show()

+-----------------+
|    average close|
+-----------------+
|72.38844998012726|
+-----------------+



In [16]:
# Spark SQL version of the average Close computation.
# NOTE(review): the alias is spelled "averag_close" (sic). It is kept as-is so
# the stored cell output still matches; rename it when the cell is re-run.
avg_close = spark.sql("select avg(Close) as averag_close from stock")
avg_close.show()

+-----------------+
|     averag_close|
+-----------------+
|72.38844998012726|
+-----------------+



In [17]:
### Part 4: minimum and maximum Volume

In [18]:
# DataFrame API: smallest and largest Volume in a single aggregation.
stockData.agg(
    functions.min("Volume").alias("min volume"),
    functions.max("Volume").alias("max volume"),
).show()

+----------+----------+
|min volume|max volume|
+----------+----------+
|   2094900|  80898100|
+----------+----------+



In [19]:
# Spark SQL version of the Volume range query.
volume_range = spark.sql("select min(Volume) as min_volume, max(Volume) as max_volume from stock")
volume_range.show()

+----------+----------+
|min_volume|max_volume|
+----------+----------+
|   2094900|  80898100|
+----------+----------+



In [20]:
### Part 5: number of rows with Close below 60

In [21]:
# DataFrame API: count the rows whose Close is strictly below 60.
stockData.where(functions.col("Close") < 60).count()

81

In [22]:
# Spark SQL version of the "Close below 60" row count.
low_close_count = spark.sql("select count(*) as cnt from stock where Close < 60")
low_close_count.show()

+---+
|cnt|
+---+
| 81|
+---+



In [23]:
### Part 6: correlation between High and Volume

In [24]:
# DataFrame API: correlation coefficient between High and Volume, returned
# as a plain Python float (DataFrame.corr is the alias of stat.corr).
stockData.corr("High", "Volume")

-0.3384326061737161

In [25]:
# Spark SQL version of the High/Volume correlation.
high_volume_corr = spark.sql("select corr(High, Volume) as corr  from stock")
high_volume_corr.show()

+-------------------+
|               corr|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [26]:
### Part 7: maximum High per year

In [27]:
# DataFrame API: group the rows by the calendar year taken from Date and
# report the largest High seen in each year. The output columns are named
# directly through aliases instead of being renamed afterwards.
(
    stockData
    .groupBy(functions.year(functions.col("Date")).alias("year"))
    .agg(functions.max("High").alias("High"))
    .show()
)

+----+---------+
|year|     High|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



In [28]:
# Spark SQL: largest High per calendar year.
# The GROUP BY clause repeats the year(Date) expression rather than naming the
# select-list alias `year`. Grouping by an alias only works while Spark's
# group-by-alias resolution (spark.sql.groupByAliases) is enabled, whereas
# grouping by the expression is always valid and yields the same result.
spark.sql("select year(Date) as year, max(High) as high from stock group by year(Date)").show()

+----+---------+
|year|     high|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+

