# DataFrame basic operations

In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook is not tied to one machine; fall back to the original local install path
# when the variable is not set.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/fernando/spark-2.4.6-bin-hadoop2.7')
findspark.init(SPARK_HOME)
# Imported after findspark.init(), matching the original cell order.
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
%%time
# Build the SparkSession used by the rest of the notebook (application name 'ops').
# getOrCreate() returns an already-running session if there is one instead of starting another.
spark = SparkSession.builder.appName('ops').getOrCreate()

CPU times: user 39.3 ms, sys: 11.9 ms, total: 51.2 ms
Wall time: 8.26 s


In [6]:
# Read the Apple stock CSV into a DataFrame: the first line supplies the column
# names (header) and column types are inferred from the data (inferSchema).
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('DATA/appl_stock.csv')
)

In [7]:
# Print the schema as a tree: each column's name, type and nullability.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [14]:
# Print the first rows of the DataFrame as a text table (show() prints; it does not return data).
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [17]:
%%time
# Number of rows in the DataFrame; timed to show the cost of the action.
df.count()

CPU times: user 1.6 ms, sys: 150 µs, total: 1.75 ms
Wall time: 406 ms


1762

In [18]:
%%time
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|              1762|              1762|              1762|             1762|               1762|              1762|
|   mean| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|              90.0|         90.699997|         89.470001|        90.279999|           11475900|         24.881912|
|    max|        702.409988|        705.070023|        699.569977|       702.100021|          470249500|127.96609099999999|
+-------

In [23]:
# head(3) returns a list with the first three Row objects; [0] picks the first of them.
df.head(3)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

## Filtering with SQL expression strings

In [26]:
# Rows whose Close is below 500 (condition written as a SQL expression string),
# projected down to the Open column and printed.
(
    df
    .where('Close < 500')
    .select('Open')
    .show()
)

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [27]:
# Same SQL-string filter as before, this time keeping both Open and Close.
(
    df
    .filter('Close < 500')
    .select('Open', 'Close')
    .show()
)

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



### More Pythonic: filtering with column expressions

In [28]:
# Filter with a Column comparison instead of a SQL string; all columns are kept.
(
    df
    .filter(df['Close'] < 500)
    .show()
)

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [29]:
# Column-expression filter followed by a projection onto Open and Close.
(
    df
    .filter(df['Close'] < 500)
    .select('Open', 'Close')
    .show()
)

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [32]:
# Combine two conditions with & (logical AND). Each comparison needs its own
# parentheses because & binds more tightly than < and > in Python.
df.filter(
    (df['Close'] < 200)
    & (df['Open'] > 200)
).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [33]:
# Days that opened above 200 but closed below it, showing only Open and Close.
(
    df
    .filter(
        (df['Close'] < 200)
        & (df['Open'] > 200)
    )
    .select('Open', 'Close')
    .show()
)

+------------------+----------+
|              Open|     Close|
+------------------+----------+
|206.78000600000001|    197.75|
|        204.930004|199.289995|
|        201.079996|192.060003|
+------------------+----------+



Negating a condition with `~`: rows where Close is below 200 and Open is NOT greater than 200.

In [34]:
# ~ negates a Column condition: keep rows with Close < 200 whose Open is NOT > 200.
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).select(['Open','Close']).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|192.36999699999998|        194.729998|
|        195.909998|        195.859997|
|        195.169994|        199.229994|
|        196.730003|        192.050003|
|192.63000300000002|        195.460001|
|        195.690006|194.11999699999998|
|        196.419996|196.19000400000002|
|        195.889997|195.12000700000002|
|        194.880001|        198.669994|
|        199.999998|        197.059998|
|         92.699997|         93.699997|
|         94.730003|             94.25|
|         94.129997|         93.860001|
|         94.040001|         92.290001|
|         92.199997|         91.279999|
|         91.510002|         92.199997|
|         92.309998| 92.08000200000001|
|         92.269997|             92.18|
|         92.290001|         91.860001|
|         91.849998|         90.910004|
+------------------+------------------+
only showing top 20 rows



In [36]:
# Not so nice: show() only prints a formatted table, so the matching values cannot be
# used by later code (presumably why the following cells switch to collect()).
# NOTE(review): this also relies on exact float equality, which is fragile in general.
df.filter(df['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [38]:
# collect() returns the matching rows as a Python list of Row objects instead of printing them.
result = df.filter(df['Low'] == 197.16).collect()
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [41]:
# Take the first (here: only) Row from the collected list.
row = result[0]
row

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [42]:
# Convert the Row into a plain dict keyed by column name.
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [43]:
# Look up a single field (Volume) through the dict form of the Row.
row.asDict()['Volume']

220441900

In [48]:
# Collect every (Open, Close) pair with Close < 500 into a Python list of Row objects.
# NOTE(review): collect() brings all matching rows into the notebook process and the
# bare `res` prints the whole list; consider limiting what is displayed.
res = df.filter('Close < 500').select(['Open','Close']).collect()
res

[Row(Open=213.429998, Close=214.009998),
 Row(Open=214.599998, Close=214.379993),
 Row(Open=214.379993, Close=210.969995),
 Row(Open=211.75, Close=210.58),
 Row(Open=210.299994, Close=211.98000499999998),
 Row(Open=212.79999700000002, Close=210.11000299999998),
 Row(Open=209.18999499999998, Close=207.720001),
 Row(Open=207.870005, Close=210.650002),
 Row(Open=210.11000299999998, Close=209.43),
 Row(Open=210.92999500000002, Close=205.93),
 Row(Open=208.330002, Close=215.039995),
 Row(Open=214.910006, Close=211.73),
 Row(Open=212.079994, Close=208.069996),
 Row(Open=206.78000600000001, Close=197.75),
 Row(Open=202.51000200000001, Close=203.070002),
 Row(Open=205.95000100000001, Close=205.940001),
 Row(Open=206.849995, Close=207.880005),
 Row(Open=204.930004, Close=199.289995),
 Row(Open=201.079996, Close=192.060003),
 Row(Open=192.36999699999998, Close=194.729998),
 Row(Open=195.909998, Close=195.859997),
 Row(Open=195.169994, Close=199.229994),
 Row(Open=196.730003, Close=192.050003),
 

In [49]:
# First collected Row (res is a list at this point).
res[0]

Row(Open=213.429998, Close=214.009998)

In [50]:
# The same Row as a dict keyed by column name.
res[0].asDict()

{'Open': 213.429998, 'Close': 214.009998}

In [57]:
# Same filter and projection, but without collect(): res is now a DataFrame.
# NOTE(review): this rebinds `res`, which an earlier cell bound to a list of Rows.
res = df.filter('Close < 500').select(['Open','Close'])
res

DataFrame[Open: double, Close: double]

# As Dictionary
Reference: [Stack Overflow: How to convert rows into a dictionary in PySpark](https://stackoverflow.com/questions/49432167/how-to-convert-rows-into-dictionary-in-pyspark)

In [58]:
# Turn each row into a single-entry dict mapping its Open value to its Close value,
# then collect the dicts into a Python list.
res_dict = (
    res.rdd
    .map(lambda record: {record['Open']: record['Close']})
    .collect()
)

In [59]:
# Display the collected list of {Open: Close} dicts.
res_dict

[{213.429998: 214.009998},
 {214.599998: 214.379993},
 {214.379993: 210.969995},
 {211.75: 210.58},
 {210.299994: 211.98000499999998},
 {212.79999700000002: 210.11000299999998},
 {209.18999499999998: 207.720001},
 {207.870005: 210.650002},
 {210.11000299999998: 209.43},
 {210.92999500000002: 205.93},
 {208.330002: 215.039995},
 {214.910006: 211.73},
 {212.079994: 208.069996},
 {206.78000600000001: 197.75},
 {202.51000200000001: 203.070002},
 {205.95000100000001: 205.940001},
 {206.849995: 207.880005},
 {204.930004: 199.289995},
 {201.079996: 192.060003},
 {192.36999699999998: 194.729998},
 {195.909998: 195.859997},
 {195.169994: 199.229994},
 {196.730003: 192.050003},
 {192.63000300000002: 195.460001},
 {195.690006: 194.11999699999998},
 {196.419996: 196.19000400000002},
 {195.889997: 195.12000700000002},
 {194.880001: 198.669994},
 {198.109995: 200.37999299999998},
 {201.940002: 203.399996},
 {204.190001: 202.550003},
 {201.629995: 202.929998},
 {201.860001: 201.669996},
 {202.339998: