# Basic Operations

This lecture will cover some basic operations with Spark DataFrames.

We will play around with some stock data from Apple.

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("Operations").getOrCreate()

In [25]:
# Let Spark know about the header and infer the Schema types!
df = spark.read.csv('appl_stock.csv',inferSchema=True,header=True)

In [26]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [27]:
df.show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
o

## Filtering Data

A large part of working with DataFrames is the ability to quickly filter out data based on conditions. Spark DataFrames are built on top of the Spark SQL platform, which means that is you already know SQL, you can quickly and easily grab that data using SQL commands, or using the DataFram methods (which is what we focus on in this course).

In [41]:
# Using SQL -- all give the same result
df.filter("Close<500").show(5)
# df.filter(df["Close"]<500).show(5)
# df.filter(df.Close<500).show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
o

In [43]:
# Using SQL with .select()
df.filter("Close<500").select('Open').show(5)

+----------+
|      Open|
+----------+
|213.429998|
|214.599998|
|214.379993|
|    211.75|
|210.299994|
+----------+
only showing top 5 rows



In [44]:
# Using SQL with .select()
df.filter("Close<500").select(['Open','Close']).show(5)

+----------+------------------+
|      Open|             Close|
+----------+------------------+
|213.429998|        214.009998|
|214.599998|        214.379993|
|214.379993|        210.969995|
|    211.75|            210.58|
|210.299994|211.98000499999998|
+----------+------------------+
only showing top 5 rows



Using normal python comparison operators is another way to do this, they will look very similar to SQL operators, except you need to make sure you are calling the entire column within the dataframe, using the format: df["column name"]

Let's see some examples:

In [45]:
df.filter(df["Close"] < 200).show(5)

+-------------------+------------------+----------+------------------+----------+---------+------------------+
|               Date|              Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|            197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|        198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|        190.250002|192.060003|311488100|         24.883208|
|2010-02-01 00:00:00|192.36999699999998|     196.0|191.29999899999999|194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|196.319994|193.37999299999998|195.859997|174585600|25.375532999999997|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
o

In [46]:
# Will produce an error, make sure to read the error!
df.filter(df["Close"] < 200 and df['Open'] > 200).show()

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [49]:
# Make sure to add in the parenthesis separating the statements!
df.filter( (df["Close"] < 200) & (df['Open'] > 200) ).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [50]:
# Make sure to add in the parenthesis separating the statements!
df.filter( (df["Close"] < 200) | (df['Open'] > 200) ).show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
o

In [51]:
# Make sure to add in the parenthesis separating the statements!
df.filter( (df["Close"] < 200) & ~(df['Open'] < 200) ).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [52]:
df.filter(df["Low"] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [53]:
# Collecting results as Python objects
df.filter(df["Low"] == 197.16).collect()

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [54]:
result = df.filter(df["Low"] == 197.16).collect()

In [55]:
# Note the nested structure returns a nested row object
type(result[0])

pyspark.sql.types.Row

In [56]:
row = result[0]

In [74]:
type(result[0])

pyspark.sql.types.Row

Rows can be called to turn into dictionaries

In [67]:
row

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [69]:
row.count(197.75)

1

In [68]:
row.index(197.75)

4

In [70]:
type(result[0])

pyspark.sql.types.Row

In [85]:
row.asDict().values()

dict_values([207.499996, 197.16, 25.620401, 220441900, 197.75, 206.78000600000001, datetime.datetime(2010, 1, 22, 0, 0)])

In [89]:
for item in result[0]:
#     print(type(item))
    print(item)

2010-01-22 00:00:00
206.78000600000001
207.499996
197.16
197.75
220441900
25.620401


That is all for now Great Job!

In [80]:
for item in row.asDict():
#     print(type(item))
    print(item)

High
Low
Adj Close
Volume
Close
Open
Date


In [87]:
for item in row.asDict().keys():
#     print(type(item))
    print(item)

High
Low
Adj Close
Volume
Close
Open
Date


In [86]:
for item in row.asDict().values():
#     print(type(item))
    print(item)

207.499996
197.16
25.620401
220441900
197.75
206.78000600000001
2010-01-22 00:00:00
