In [1]:
import findspark

In [2]:
findspark.init('/home/ubuntu/spark-2.4.5-bin-hadoop2.7')

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('apple').getOrCreate()

In [5]:
df = spark.read.csv('appl_stock.csv',inferSchema=True, header=True)

In [6]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [7]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [8]:
for row in df.head(5):
    print(row)

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)
Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002)
Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)
Row(Date=datetime.datetime(2010, 1, 7, 0, 0), Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265)
Row(Date=datetime.datetime(2010, 1, 8, 0, 0), Open=210.299994, High=212.000006, Low=209.06000500000002, Close=211.98000499999998, Volume=111902700, Adj Close=27.464034)


In [9]:
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|              1762|              1762|              1762|             1762|               1762|              1762|
|   mean| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|              90.0|         90.699997|         89.470001|        90.279999|           11475900|         24.881912|
|    max|        702.409988|        705.070023|        699.569977|       702.100021|          470249500|127.96609099999999|
+-------

In [10]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [11]:
from pyspark.sql.functions import format_number

In [12]:
result = df.describe()

In [13]:
result.select(result['summary'],
              format_number(result['Open'].cast('float'),2).alias('Open'),
              format_number(result['High'].cast('float'),2).alias('High'),
              format_number(result['Low'].cast('float'),2).alias('Low'),
              format_number(result['Close'].cast('float'),2).alias('Close'),
              result['Volume'].cast('int').alias('Volume')).show()

+-------+--------+--------+--------+--------+---------+
|summary|    Open|    High|     Low|   Close|   Volume|
+-------+--------+--------+--------+--------+---------+
|  count|1,762.00|1,762.00|1,762.00|1,762.00|     1762|
|   mean|  313.08|  315.91|  309.83|  312.93|     null|
| stddev|  185.30|  186.90|  183.38|  185.15|     null|
|    min|   90.00|   90.70|   89.47|   90.28| 11475900|
|    max|  702.41|  705.07|  699.57|  702.10|470249500|
+-------+--------+--------+--------+--------+---------+



In [14]:
result.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [15]:
df2 = df.withColumn("HV ration", df['High']/df['Volume'])

In [16]:
df2.select('HV ration').show()

+--------------------+
|           HV ration|
+--------------------+
|1.737793286041590...|
|1.432718223878593...|
|1.559185743262822...|
|1.777288980473295...|
|1.894503045949740...|
|1.843239827133528...|
|1.411500428288146...|
|1.392525367557254...|
|1.944679270213955...|
|1.424753661031169E-6|
|1.179111006515548...|
|1.408471832522860...|
|1.402998948951121...|
|9.412910884908904E-7|
|7.683215757986584E-7|
|4.578412734118504E-7|
|4.889907419641508E-7|
|7.004672644896166E-7|
|6.491419575900331E-7|
|1.045505632661596E-6|
+--------------------+
only showing top 20 rows



In [17]:
df.head(11)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004),
 Row(Date=datetime.datetime(2010, 1, 7, 0, 0), Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265),
 Row(Date=datetime.datetime(2010, 1, 8, 0, 0), Open=210.299994, High=212.000006, Low=209.06000500000002, Close=211.98000499999998, Volume=111902700, Adj Close=27.464034),
 Row(Date=datetime.datetime(2010, 1, 11, 0, 0), Open=212.79999700000002, High=213.000002, Low=208.450005, Close=210.11000299999998, Volume=115557400, Adj Close=27.221758),
 Row(Date=datetime.datet

In [18]:
df.orderBy(df['High'].desc()).head(1)

[Row(Date=datetime.datetime(2012, 9, 21, 0, 0), Open=702.409988, High=705.070023, Low=699.3599849999999, Close=700.089989, Volume=142897300, Adj Close=91.09278)]

In [19]:
df.orderBy(df['High'].desc()).head(1)[0][0]

datetime.datetime(2012, 9, 21, 0, 0)

In [20]:
from pyspark.sql.functions import mean

In [21]:
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|312.9270656379113|
+-----------------+



In [22]:
from pyspark.sql.functions import max,min

In [23]:
df.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|  470249500|   11475900|
+-----------+-----------+



In [24]:
df.filter("Close < 100").count()

163

In [25]:
df.filter(df['Close']<100).count()

163

In [26]:
from pyspark.sql.functions import count

In [27]:
result = df.filter(df['Close']<100)

In [28]:
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|         163|
+------------+



In [29]:
(df.filter(df['High']>120).count()*1.0/df.count())*100

70.71509648127127

In [30]:
from pyspark.sql.functions import corr

In [31]:
df.select(corr('High','Volume')).show()

+------------------+
|corr(High, Volume)|
+------------------+
|0.4144024424820674|
+------------------+



In [32]:
from pyspark.sql.functions import year

In [33]:
yeardf = df.withColumn('Year',year(df['Date']))

In [34]:
max_df = yeardf.groupBy('Year').max()

In [35]:
max_df.select('Year','max(Low)','max(High)').show()

+----+-----------------+------------------+
|Year|         max(Low)|         max(High)|
+----+-----------------+------------------+
|2015|       131.399994|134.53999299999998|
|2013|566.4100269999999|        575.139999|
|2014|       644.470024|        651.259979|
|2012|       699.569977|        705.070023|
|2016|       117.449997|        118.690002|
|2010|       325.099991|            326.66|
|2011|       415.990002|426.69999299999995|
+----+-----------------+------------------+



In [36]:
from pyspark.sql.functions import month

In [37]:
monthdf = df.withColumn('Month',month('Date'))

In [38]:
monthavga = monthdf.select('Month','Open','Close').groupBy('Month').mean()

In [39]:
monthavga.select('Month','avg(Open)','avg(Close)').orderBy('Month').show()

+-----+------------------+------------------+
|Month|         avg(Open)|        avg(Close)|
+-----+------------------+------------------+
|    1|322.90628572142856|322.20971425714276|
|    2| 320.7831106888889| 321.3595563037038|
|    3| 332.8893468300655|332.91156731372547|
|    4| 341.0048631506845|340.51041081506827|
|    5|351.59870752380965|351.62102085714304|
|    6|288.75166685333335|      288.12546566|
|    7| 281.2487831148649|281.72216211486483|
|    8| 300.2057422967742| 300.4385809612901|
|    9| 301.3590970763887| 301.0763195902777|
|   10| 308.5571721973687|308.30552563157886|
|   11|  306.612237797203| 306.2725174895104|
|   12|302.76953076510085|302.35053626845644|
+-----+------------------+------------------+

