
# Basic Operations

This activity will cover some basic operations with Spark DataFrames.

I will use the Alpha Vantage API key 57BHZCU2AZ7PIIKX; you can get a free API key here: https://www.alphavantage.co/support/#api-key

Through the Alpha Vantage Time Series endpoints, it is possible to obtain historical equities and currency rate data for individual symbols. For daily, weekly, and monthly frequencies, 20+ years of historical data is available. The past 3-5 days of intraday data is also available.

The following endpoints are available:

* av-intraday - Intraday Time Series
* av-daily - Daily Time Series
* av-daily-adjusted - Daily Time Series (Adjusted)
* av-weekly - Weekly Time Series
* av-weekly-adjusted - Weekly Time Series (Adjusted)
* av-monthly - Monthly Time Series
* av-monthly-adjusted - Monthly Time Series (Adjusted)
* av-forex-daily - Daily Time Series

The cells below fetch daily data for two symbols: AAPL (Apple Inc.) and MSFT (Microsoft Corporation).



In [43]:
from datetime import datetime
import pandas_datareader.data as web
pandas_df = web.DataReader("AAPL", "av-daily", start=datetime(2020, 2, 9),
                   end=datetime(2020, 11, 14),api_key='57BHZCU2AZ7PIIKX')


In [45]:
panda_df_msf =  web.DataReader("MSFT", "av-daily", start=datetime(2020, 2, 9),
                   end=datetime(2020, 11, 14),api_key='57BHZCU2AZ7PIIKX')

In [4]:
!wget -q https://mirrors.netix.net/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xzf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark
# Define the environment variables Spark needs directly in Python, using the os module
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/default-java"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession

In [6]:
# May take a while locally
spark = SparkSession.builder.appName("Operations").getOrCreate()

In [46]:
# Convert the pandas DataFrames to Spark DataFrames; Spark infers the schema from the pandas column types
df = spark.createDataFrame(pandas_df)
df_msf = spark.createDataFrame(panda_df_msf)

In [9]:
df.printSchema()

root
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: long (nullable = true)



In [47]:
df_msf.show()

+------+------+------+------+--------+
|  open|  high|   low| close|  volume|
+------+------+------+------+--------+
|183.58|188.84|183.25| 188.7|35844267|
|190.65| 190.7| 183.5|184.44|53159906|
|185.58|185.85|181.85|184.71|47062921|
|183.08|186.23|182.87|183.71|35295834|
|183.25|185.41|182.65|185.35|23149516|
|185.61| 187.7| 185.5|187.23|27853113|
|188.06|188.18|186.47|187.28|29997471|
|186.95|187.25| 181.1|184.42|36862376|
|183.17| 183.5|177.25|178.59|48600385|
|167.77|174.55|163.23|170.89|68311066|
| 174.2|174.84|167.65|168.07|68073295|
|169.71|173.26|168.21|170.17|56387148|
|163.32|167.03|157.98|158.18|93174906|
|152.41|163.71| 152.0|162.01|97073557|
|165.31|172.92|162.31|172.79|71030810|
| 173.8| 175.0|162.26|164.51|71677019|
|168.49| 170.7|165.62|170.55|49814383|
|166.05|170.87|165.69|166.27|47817251|
|162.61|163.11| 156.0|161.57|72821057|
| 151.0|157.75| 150.0|150.62|70419274|
+------+------+------+------+--------+
only showing top 20 rows



In [26]:
df.show()

+------+------+------+------+---------+
|  open|  high|   low| close|   volume|
+------+------+------+------+---------+
|314.18|321.55|313.85|321.55| 27337215|
| 323.6| 323.9|318.71|319.61| 23580780|
|321.47|327.22|321.47| 327.2| 28432573|
|324.19|326.22|323.35|324.87| 23686892|
|324.74|325.98|322.85|324.95| 20028447|
|315.36|319.75|314.61| 319.0| 38190545|
| 320.0|324.57| 320.0|323.62| 23495991|
|322.63|324.65|318.21| 320.3| 25141489|
|318.62|320.45| 310.5|313.05| 32426415|
|297.26|304.18|289.23|298.18| 55548828|
|300.95|302.53|286.13|288.08| 57668364|
|286.53|297.88| 286.5|292.65| 49678431|
| 281.1| 286.0|272.96|273.52| 80151381|
|257.26|278.41|256.37|273.36|106721230|
|282.28|301.44|277.72|298.81| 85349339|
|303.67| 304.0| 285.8|289.32| 79868852|
|296.44| 303.4|293.13|302.74| 54794568|
|295.52|299.55|291.41|292.92| 46893219|
| 282.0|290.82|281.23|289.03| 56544246|
|263.75|278.09| 263.0|266.17| 71686208|
+------+------+------+------+---------+
only showing top 20 rows



## Filtering Data

A large part of working with DataFrames is the ability to quickly filter data based on conditions. Spark DataFrames are built on top of the Spark SQL platform, which means that if you already know SQL, you can quickly and easily grab that data using SQL commands, or using the DataFrame methods (which is what we focus on in this course).
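To make the SQL connection concrete, here is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in for Spark SQL (an assumption made so the snippet runs without a Spark session). The table name `stock` is hypothetical, and the five rows are copied from the AAPL `df.show()` output above. The condition string passed to `df.filter()` plays the same role as the `WHERE` clause here.

```python
import sqlite3

# Five AAPL rows copied from the df.show() output: (open, high, low, close, volume)
rows = [
    (314.18, 321.55, 313.85, 321.55, 27337215),
    (323.60, 323.90, 318.71, 319.61, 23580780),
    (297.26, 304.18, 289.23, 298.18, 55548828),
    (300.95, 302.53, 286.13, 288.08, 57668364),
    (286.53, 297.88, 286.50, 292.65, 49678431),
]

# In-memory database with a hypothetical table named "stock"
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stock (open REAL, high REAL, low REAL, close REAL, volume INTEGER)"
)
conn.executemany("INSERT INTO stock VALUES (?, ?, ?, ?, ?)", rows)

# Comparable in spirit to: df.filter("Close<300").select(['Open', 'Close'])
below_300 = conn.execute(
    "SELECT open, close FROM stock WHERE close < 300"
).fetchall()
print(below_300)
conn.close()
```

Only the three rows whose close is under 300 come back, each reduced to its open and close values.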

In [27]:
# Using SQL
df.filter("Close<300").show()

+------+------+------+------+---------+
|  open|  high|   low| close|   volume|
+------+------+------+------+---------+
|297.26|304.18|289.23|298.18| 55548828|
|300.95|302.53|286.13|288.08| 57668364|
|286.53|297.88| 286.5|292.65| 49678431|
| 281.1| 286.0|272.96|273.52| 80151381|
|257.26|278.41|256.37|273.36|106721230|
|282.28|301.44|277.72|298.81| 85349339|
|303.67| 304.0| 285.8|289.32| 79868852|
|295.52|299.55|291.41|292.92| 46893219|
| 282.0|290.82|281.23|289.03| 56544246|
|263.75|278.09| 263.0|266.17| 71686208|
|277.14|286.44|269.37|285.34| 71322520|
|277.39|281.22|271.86|275.43| 64094970|
|255.94| 270.0| 248.0|248.23|104618517|
|264.89|279.92|252.95|277.97| 92683032|
|241.95|259.08| 240.0|242.21| 80605865|
|247.51|257.61| 238.4|252.86| 81013965|
|239.77| 250.0|237.12|246.67| 75058406|
|247.39|252.84|242.61|244.78| 67964255|
|247.18|251.83| 228.0|229.24|100423346|
|228.08| 228.5|212.61|224.37| 84188208|
+------+------+------+------+---------+
only showing top 20 rows



In [21]:
# Using SQL with .select()
df.filter("Close<300").select('Open').show()

+------+
|  Open|
+------+
|171.05|
| 170.1|
|171.39|
|169.71|
|171.25|
|169.71|
| 171.8|
+------+



In [29]:
# Using SQL with .select() and a list of several columns
df.filter("Close<300").select(['Open','Close']).show()

+------+------+
|  Open| Close|
+------+------+
|297.26|298.18|
|300.95|288.08|
|286.53|292.65|
| 281.1|273.52|
|257.26|273.36|
|282.28|298.81|
|303.67|289.32|
|295.52|292.92|
| 282.0|289.03|
|263.75|266.17|
|277.14|285.34|
|277.39|275.43|
|255.94|248.23|
|264.89|277.97|
|241.95|242.21|
|247.51|252.86|
|239.77|246.67|
|247.39|244.78|
|247.18|229.24|
|228.08|224.37|
+------+------+
only showing top 20 rows



Another way to do this is to use normal Python comparison operators. They look very similar to SQL operators, except that you need to reference the column through the DataFrame itself, using the format: df["column name"]

Let's see some examples:

In [28]:
df.filter(df["Close"] < 300).show()

+------+------+------+------+---------+
|  open|  high|   low| close|   volume|
+------+------+------+------+---------+
|297.26|304.18|289.23|298.18| 55548828|
|300.95|302.53|286.13|288.08| 57668364|
|286.53|297.88| 286.5|292.65| 49678431|
| 281.1| 286.0|272.96|273.52| 80151381|
|257.26|278.41|256.37|273.36|106721230|
|282.28|301.44|277.72|298.81| 85349339|
|303.67| 304.0| 285.8|289.32| 79868852|
|295.52|299.55|291.41|292.92| 46893219|
| 282.0|290.82|281.23|289.03| 56544246|
|263.75|278.09| 263.0|266.17| 71686208|
|277.14|286.44|269.37|285.34| 71322520|
|277.39|281.22|271.86|275.43| 64094970|
|255.94| 270.0| 248.0|248.23|104618517|
|264.89|279.92|252.95|277.97| 92683032|
|241.95|259.08| 240.0|242.21| 80605865|
|247.51|257.61| 238.4|252.86| 81013965|
|239.77| 250.0|237.12|246.67| 75058406|
|247.39|252.84|242.61|244.78| 67964255|
|247.18|251.83| 228.0|229.24|100423346|
|228.08| 228.5|212.61|224.37| 84188208|
+------+------+------+------+---------+
only showing top 20 rows



In [30]:
# Will produce an error because Python's `and` cannot combine column conditions; make sure to read the error!
df.filter(df["Close"] < 300 and df['Open'] > 200).show()

ValueError: ignored
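The error comes from how Python evaluates `and`: it asks the left operand for a single True/False value, but a column comparison describes a condition over every row and has no single truth value. The sketch below reproduces the mechanism in plain Python. The `Expr` class is a hypothetical stand-in for a Spark column, not PySpark's actual implementation, and its error message is only illustrative.

```python
class Expr:
    """Hypothetical stand-in for a Spark column: records an expression instead of a value."""

    def __init__(self, text):
        self.text = text

    def __lt__(self, other):
        return Expr(f"({self.text} < {other})")

    def __gt__(self, other):
        return Expr(f"({self.text} > {other})")

    def __and__(self, other):
        # The & operator can be overloaded, so it can build a combined expression.
        return Expr(f"({self.text} AND {other.text})")

    def __bool__(self):
        # `and`, `or`, `not`, and `if` all call bool(), which cannot be answered here.
        raise ValueError("Cannot convert expression into bool; use & instead of and")


close, open_ = Expr("close"), Expr("open")

# & builds one combined expression
combined = (close < 300) & (open_ > 200)
print(combined.text)

# `and` forces bool() on the left operand and fails
try:
    (close < 300) and (open_ > 200)
except ValueError as err:
    error_message = str(err)
    print("ValueError:", error_message)
```

The same reasoning applies to `or` and `not`, which is why the DataFrame syntax uses `|` and `~` in their place.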

In [32]:
# Use & for AND, and make sure to wrap each condition in parentheses!
df.filter( (df["Close"] < 300) & (df['Open'] > 200) ).show()

+------+------+------+------+---------+
|  open|  high|   low| close|   volume|
+------+------+------+------+---------+
|297.26|304.18|289.23|298.18| 55548828|
|300.95|302.53|286.13|288.08| 57668364|
|286.53|297.88| 286.5|292.65| 49678431|
| 281.1| 286.0|272.96|273.52| 80151381|
|257.26|278.41|256.37|273.36|106721230|
|282.28|301.44|277.72|298.81| 85349339|
|303.67| 304.0| 285.8|289.32| 79868852|
|295.52|299.55|291.41|292.92| 46893219|
| 282.0|290.82|281.23|289.03| 56544246|
|263.75|278.09| 263.0|266.17| 71686208|
|277.14|286.44|269.37|285.34| 71322520|
|277.39|281.22|271.86|275.43| 64094970|
|255.94| 270.0| 248.0|248.23|104618517|
|264.89|279.92|252.95|277.97| 92683032|
|241.95|259.08| 240.0|242.21| 80605865|
|247.51|257.61| 238.4|252.86| 81013965|
|239.77| 250.0|237.12|246.67| 75058406|
|247.39|252.84|242.61|244.78| 67964255|
|247.18|251.83| 228.0|229.24|100423346|
|228.08| 228.5|212.61|224.37| 84188208|
+------+------+------+------+---------+
only showing top 20 rows
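The parentheses matter because `&` binds more tightly than `<` and `>` in Python. As a small sketch, the standard `ast` module can show how each form is parsed. The names `close` and `open_` are placeholder variable names used only for parsing; nothing is evaluated.

```python
import ast

# Without parentheses, & is applied first: close < (300 & open_) > 200
unparenthesized = ast.parse("close < 300 & open_ > 200", mode="eval").body

# With parentheses, two separate comparisons are joined by &
parenthesized = ast.parse("(close < 300) & (open_ > 200)", mode="eval").body

print(type(unparenthesized).__name__)                 # Compare (a chained comparison)
print(type(unparenthesized.comparators[0]).__name__)  # BinOp (the 300 & open_ part)
print(type(parenthesized).__name__)                   # BinOp (comparison & comparison)
```

In the unparenthesized form, the top-level node is a chained comparison whose middle operand is `300 & open_`, which is not the intended condition.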



In [33]:
# Use | for OR, and keep the parentheses around each condition!
df.filter( (df["Close"] < 200) | (df['Open'] > 200) ).show()

+------+------+------+------+---------+
|  open|  high|   low| close|   volume|
+------+------+------+------+---------+
|314.18|321.55|313.85|321.55| 27337215|
| 323.6| 323.9|318.71|319.61| 23580780|
|321.47|327.22|321.47| 327.2| 28432573|
|324.19|326.22|323.35|324.87| 23686892|
|324.74|325.98|322.85|324.95| 20028447|
|315.36|319.75|314.61| 319.0| 38190545|
| 320.0|324.57| 320.0|323.62| 23495991|
|322.63|324.65|318.21| 320.3| 25141489|
|318.62|320.45| 310.5|313.05| 32426415|
|297.26|304.18|289.23|298.18| 55548828|
|300.95|302.53|286.13|288.08| 57668364|
|286.53|297.88| 286.5|292.65| 49678431|
| 281.1| 286.0|272.96|273.52| 80151381|
|257.26|278.41|256.37|273.36|106721230|
|282.28|301.44|277.72|298.81| 85349339|
|303.67| 304.0| 285.8|289.32| 79868852|
|296.44| 303.4|293.13|302.74| 54794568|
|295.52|299.55|291.41|292.92| 46893219|
| 282.0|290.82|281.23|289.03| 56544246|
|263.75|278.09| 263.0|266.17| 71686208|
+------+------+------+------+---------+
only showing top 20 rows



In [None]:
# Use ~ for NOT, and keep the parentheses around each condition!
df.filter( (df["Close"] < 200) & ~(df['Open'] < 200) ).show()

+--------------------+------------------+----------+----------+----------+---------+------------------+
|                Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+--------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:...|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:...|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:...|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+--------------------+------------------+----------+----------+----------+---------+------------------+



In [35]:
df.filter(df["Low"] == 318.21).show()

+------+------+------+-----+--------+
|  open|  high|   low|close|  volume|
+------+------+------+-----+--------+
|322.63|324.65|318.21|320.3|25141489|
+------+------+------+-----+--------+



In [36]:
# Collecting results as Python objects
df.filter(df["Low"] == 318.21).collect()

[Row(open=322.63, high=324.65, low=318.21, close=320.3, volume=25141489)]

In [37]:
result = df.filter(df["Low"] == 318.21).collect()

In [38]:
# collect() returns a list of Row objects, so result[0] is a single Row
type(result[0])

pyspark.sql.types.Row

In [39]:
row = result[0]

A Row can be converted to a dictionary by calling asDict():

In [40]:
row.asDict()

{'close': 320.3,
 'high': 324.65,
 'low': 318.21,
 'open': 322.63,
 'volume': 25141489}

In [41]:
for item in result[0]:
    print(item)

322.63
324.65
318.21
320.3
25141489
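A Row behaves much like a named tuple: it can be iterated, indexed by position, and read by field name. The sketch below uses `collections.namedtuple` as a stand-in (an assumption, so that it runs without Spark); `StockRow` is a hypothetical name, and the values are copied from the collected row above. Note that `_asdict()` is the namedtuple counterpart of `Row.asDict()`.

```python
from collections import namedtuple

# Hypothetical stand-in for the Row returned by collect()
StockRow = namedtuple("StockRow", ["open", "high", "low", "close", "volume"])
row = StockRow(322.63, 324.65, 318.21, 320.3, 25141489)

as_dict = row._asdict()   # counterpart of Row.asDict()
values = list(row)        # iteration yields the field values in order

print(as_dict["close"])
print(values)
print(row.low, row[2])    # access by field name or by position
```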


That is all for now. Great job!