In [8]:
# Load the Apple stock-price CSV into a Spark DataFrame.
# The first CSV line supplies column names (header) and Spark infers the
# column types from the data (inferSchema); note from the printed schema
# that Date is still inferred as a plain string.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Ops")
    .getOrCreate()
)

df = (
    spark.read
    .option("inferSchema", True)
    .option("header", True)
    .csv("appl_stock.csv")
)

df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [9]:
df.show(n=20, truncate=True)  # first 20 rows, long cell values truncated (the defaults)

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [10]:
# Display the second row (index 1) as a Row object. Only two rows are needed,
# so fetch two to the driver rather than five.
# NOTE(review): row order is file order here only because no shuffle has
# happened; add an orderBy("Date") if a specific row must be guaranteed.
df.head(2)[1]

Row(Date='2010-01-05', Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002)

In [16]:
# Filter using a SQL expression string.
# Reference the column by its exact name ("Close") so the query does not
# depend on Spark's case-insensitive name resolution (spark.sql.caseSensitive).
df.filter("Close < 92").show()

+----------+---------+-----------------+---------+-----------------+---------+-----------------+
|      Date|     Open|             High|      Low|            Close|   Volume|        Adj Close|
+----------+---------+-----------------+---------+-----------------+---------+-----------------+
|2014-06-13|92.199997|        92.440002|90.879997|        91.279999| 54525000|        86.610132|
|2014-06-19|92.290001|        92.300003|91.339996|        91.860001| 35528000|        87.160461|
|2014-06-20|91.849998|        92.550003|90.900002|        90.910004|100898000|        86.259066|
|2014-06-23|    91.32|        91.620003|90.599998|90.83000200000001| 43694000|        86.183157|
|2014-06-24|    90.75|        91.739998|90.190002|        90.279999| 39036000|        85.661292|
|2014-06-25|90.209999|        90.699997|89.650002|        90.360001| 36869000|        85.737201|
|2014-06-26|90.370003|        91.050003|89.800003|        90.900002| 32629000|        86.249576|
|2014-06-27|    90.82|        

In [19]:
# Same SQL-string filter, keeping only the Open and Close columns; the exact
# column name "Close" avoids relying on case-insensitive resolution.
df.filter("Close < 92").select("Open", "Close").show()

+---------+-----------------+
|     Open|            Close|
+---------+-----------------+
|92.199997|        91.279999|
|92.290001|        91.860001|
|91.849998|        90.910004|
|    91.32|90.83000200000001|
|    90.75|        90.279999|
|90.209999|        90.360001|
|90.370003|        90.900002|
|    90.82|        91.980003|
|92.720001|        90.339996|
|     90.0|        90.519997|
+---------+-----------------+



In [20]:
# The same filter written with a Column expression instead of a SQL string
# (where() is an alias of filter()).

df.where(df.Close < 92).select("Open", "Close").show()

+---------+-----------------+
|     Open|            Close|
+---------+-----------------+
|92.199997|        91.279999|
|92.290001|        91.860001|
|91.849998|        90.910004|
|    91.32|90.83000200000001|
|    90.75|        90.279999|
|90.209999|        90.360001|
|90.370003|        90.900002|
|    90.82|        91.980003|
|92.720001|        90.339996|
|     90.0|        90.519997|
+---------+-----------------+



In [24]:
# Combine Column conditions with & (and), | (or) and ~ (not).
# Wrap each comparison in its own parentheses: Python's & / | / ~ bind more
# tightly than < and >, so without them the expression is parsed wrongly.
# Here: days that opened above 200 but closed below 200.

df.where(
    (df.Close < 200) & (df.Open > 200)
).show()


+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [26]:
# collect() pulls the matching rows to the driver as a Python list of Row objects.
result = df.where(df.Low == 197.16).collect()
result  # list of Row (a single match here)

[Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [30]:
# Take the first collected Row and convert it into a plain dict keyed by column name.
row = result[0]
row_dict = row.asDict(recursive=False)
row_dict

{'Date': '2010-01-22',
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [31]:
row_dict["Volume"]  # look up a single field by column name in row_dict

220441900