# Spark DataFrame: basic operations

In [4]:
import os
import findspark
# Locate the Spark installation via the SPARK_HOME environment variable so the
# notebook is portable; fall back to the original author's local install path.
SPARK_HOME_FALLBACK = '/home/appzop2/spark-2.4.0-bin-hadoop2.7/'
findspark.init(os.environ.get('SPARK_HOME', SPARK_HOME_FALLBACK))

In [6]:
from pyspark.sql import SparkSession

In [7]:
# Reuse an active SparkSession if there is one, otherwise start one named 'ops'.
spark = (SparkSession.builder
         .appName('ops')
         .getOrCreate())

## Read from CSV
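`header=True` uses the first row of the CSV as the column names; `inferSchema=True` asks Spark to infer each column's type (e.g. `Date` becomes a timestamp, as the schema below shows).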

In [8]:
# Load apple.csv: take column names from the header row and infer column types.
df = (spark.read
      .option('header', True)
      .option('inferSchema', True)
      .csv('apple.csv'))

In [10]:
# Print the column names and their inferred types as a tree.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [11]:
# Print the first 20 rows as a table; long cell values are truncated.
df.show(n=20, truncate=True)

+-------------------+------+------+------+------+--------+---------+
|               Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+-------------------+------+------+------+------+--------+---------+
|2008-10-14 00:00:00|116.26| 116.4|103.14|104.08|70749800|   104.08|
|2008-10-13 00:00:00|104.55|110.53|101.02|110.26|54967000|   110.26|
|2008-10-10 00:00:00|  85.7| 100.0|  85.0|  96.8|79260700|     96.8|
|2008-10-09 00:00:00| 93.35|  95.8|  86.6| 88.74|57763700|    88.74|
|2008-10-08 00:00:00| 85.91| 96.33| 85.68| 89.79|78847900|    89.79|
|2008-10-07 00:00:00|100.48| 101.5| 88.95| 89.16|67099000|    89.16|
|2008-10-06 00:00:00| 91.96| 98.78| 87.54| 98.14|75264900|    98.14|
|2008-10-03 00:00:00| 104.0| 106.5| 94.65| 97.07|81942800|    97.07|
|2008-10-02 00:00:00|108.01|108.79| 100.0| 100.1|57477300|    100.1|
|2008-10-01 00:00:00|111.92|112.36|107.39|109.12|46303000|   109.12|
|2008-09-30 00:00:00|108.25| 115.0| 106.3|113.66|58095800|   113.66|
|2008-09-29 00:00:00|119.62|119.68

## Read the first 3 rows as a list of `Row` objects

In [39]:
# take(3) returns the same list of the first 3 Row objects as head(3).
df.take(3)

[Row(Date=datetime.datetime(2008, 10, 14, 0, 0), Open=116.26, High=116.4, Low=103.14, Close=104.08, Volume=70749800, Adj Close=104.08),
 Row(Date=datetime.datetime(2008, 10, 13, 0, 0), Open=104.55, High=110.53, Low=101.02, Close=110.26, Volume=54967000, Adj Close=110.26),
 Row(Date=datetime.datetime(2008, 10, 10, 0, 0), Open=85.7, High=100.0, Low=85.0, Close=96.8, Volume=79260700, Adj Close=96.8)]

In [40]:
# Index into the returned list to get a single Row.
df.take(3)[0]

Row(Date=datetime.datetime(2008, 10, 14, 0, 0), Open=116.26, High=116.4, Low=103.14, Close=104.08, Volume=70749800, Adj Close=104.08)

## filter
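`filter` keeps only the rows that satisfy a condition. The condition can be written as a SQL expression string or as a Column expression.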

In [23]:
# Column-expression equivalent of the SQL string filter "Close < 100".
df.filter(df['Close'] < 100).select('Open', 'Close').show()

+------+-----+
|  Open|Close|
+------+-----+
|  85.7| 96.8|
| 93.35|88.74|
| 85.91|89.79|
|100.48|89.16|
| 91.96|98.14|
| 104.0|97.07|
| 99.59|99.47|
|100.09| 99.8|
| 98.18|99.92|
|101.58|98.84|
| 94.23|95.35|
| 93.96|93.24|
| 91.59|93.51|
| 90.89|90.97|
| 90.19|90.27|
| 90.16| 90.4|
|  92.0|90.35|
| 90.57|91.43|
|  90.9|90.24|
| 92.04|92.19|
+------+-----+
only showing top 20 rows



In [25]:
# Combine conditions with & (and), | (or) and ~ (not). Wrap each comparison in
# parentheses: in Python these operators bind tighter than < and >.
df.filter( (df['Close'] < 200) & (df['Open'] > 200) ).show()

+-------------------+------+------+------+------+--------+---------+
|               Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+-------------------+------+------+------+------+--------+---------+
|2007-12-28 00:00:00|200.59|201.56|196.88|199.83|24987400|   199.83|
+-------------------+------+------+------+------+--------+---------+



In [26]:
# where() is an alias of filter(); keep rows whose Low is exactly 100.0.
df.where(df.Low == 100.0).show()

+-------------------+------+------+-----+------+--------+---------+
|               Date|  Open|  High|  Low| Close|  Volume|Adj Close|
+-------------------+------+------+-----+------+--------+---------+
|2008-10-02 00:00:00|108.01|108.79|100.0| 100.1|57477300|    100.1|
|2000-02-01 00:00:00| 104.0| 105.0|100.0|100.25|11380000|    25.06|
+-------------------+------+------+-----+------+--------+---------+



## collect

`collect()` (an action) returns all the rows of the DataFrame to the driver program as a list of `Row` objects. <b>Collect is usually useful after a filter or other operation that returns a sufficiently small subset of the data.</b>

<b>show()</b> only prints a limited number of rows (20 by default) to the screen, while <b>collect() returns all the rows of the dataset</b> to the driver. That is dangerous for a large dataset, so use collect() only on small results, e.g. after a filter.


In [27]:
# Bring the matching rows back to the driver as a Python list of Row objects.
result = df.where(df.Low == 100.0).collect()

In [28]:
# Display the list of Row objects returned by collect().
result

[Row(Date=datetime.datetime(2008, 10, 2, 0, 0), Open=108.01, High=108.79, Low=100.0, Close=100.1, Volume=57477300, Adj Close=100.1),
 Row(Date=datetime.datetime(2000, 2, 1, 0, 0), Open=104.0, High=105.0, Low=100.0, Close=100.25, Volume=11380000, Adj Close=25.06)]

In [29]:
# Index into the collected list to get a single Row.
result[0]

Row(Date=datetime.datetime(2008, 10, 2, 0, 0), Open=108.01, High=108.79, Low=100.0, Close=100.1, Volume=57477300, Adj Close=100.1)

In [31]:
# Row fields can be read as attributes; columns whose names are not valid
# identifiers (e.g. 'Adj Close') still need bracket syntax.
result[0].Volume

57477300

## Convert a Row to a dictionary

In [35]:
# Turn the first collected Row into a plain dict keyed by column name.
row = result[0].asDict(recursive=False)
row

{'Date': datetime.datetime(2008, 10, 2, 0, 0),
 'Open': 108.01,
 'High': 108.79,
 'Low': 100.0,
 'Close': 100.1,
 'Volume': 57477300,
 'Adj Close': 100.1}

In [38]:
# The dict from asDict() is indexed by column name, just like the Row itself.
row['Volume']

57477300