# Spark DataFrame Basic Operations

In [1]:
import os

import findspark

# Location of the Spark installation. Prefer the SPARK_HOME environment
# variable so the notebook runs on machines other than the author's; fall back
# to the original local path. The raw string matters: in a plain literal '\S'
# and '\s' are invalid escape sequences (SyntaxWarning on Python 3.12+).
SPARK_HOME = os.environ.get('SPARK_HOME', r'C:\Spark\spark-3.0.1-bin-hadoop2.7')

# findspark must initialise before pyspark is imported so that pyspark can be
# found on sys.path.
findspark.init(SPARK_HOME)

from pyspark.sql import SparkSession

In [2]:
# Create (or reuse, if one is already running) the SparkSession used by every
# later cell.
spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)

In [3]:
# Load the Apple stock CSV. The first line supplies the column names, and
# inferSchema makes an extra pass over the data to pick each column's type.
df = spark.read.csv(
    'appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Print the column names and the types inferred from the CSV
# (note that Date is inferred as a string, not a date).
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [5]:
# Print the first 20 rows (Spark's default), truncating long cell values.
df.show(n=20, truncate=True)

+----------+------+------+------+------+--------+---------+
|      Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+----------+------+------+------+------+--------+---------+
|2012-03-30|608.77|610.56|597.94|599.55|26050900|   599.55|
|2012-03-29|612.78|616.56|607.23|609.86|21668300|   609.86|
|2012-03-28|618.38|621.45|610.31|617.62|23385200|   617.62|
|2012-03-27|606.18|616.28|606.06|614.48|21628200|   614.48|
|2012-03-26|599.79|607.15|595.26|606.98|21259900|   606.98|
|2012-03-23|600.49| 601.8| 594.4|596.05|15359900|   596.05|
|2012-03-22|597.78| 604.5|595.53|599.34|22281100|   599.34|
|2012-03-21|602.74|609.65|601.41| 602.5|22958200|    602.5|
|2012-03-20|599.51| 606.9|591.48|605.96|29166500|   605.96|
|2012-03-19|598.37|601.77|589.05| 601.1|32187000|    601.1|
|2012-03-16|584.72| 589.2| 578.0|585.57|29481700|   585.57|
|2012-03-15|599.61|600.01|578.55|585.56|41418500|   585.56|
|2012-03-14|578.05|594.72| 575.4|589.58|50570100|   589.58|
|2012-03-13|557.54|568.18|555.75| 568.1|

In [6]:
# Peek at the first row. first() fetches a single Row to the driver
# (and returns None rather than raising if df is empty).
df.first()

Row(Date='2012-03-30', Open=608.77, High=610.56, Low=597.94, Close=599.55, Volume=26050900, Adj Close=599.55)

In [7]:
# Rows whose closing price is below 500; only the first five are shown.
df.where(df['Close'] < 500).show(5)

+----------+------+------+------+------+--------+---------+
|      Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+----------+------+------+------+------+--------+---------+
|2012-02-15|514.26|526.29|496.89|497.67|53706600|   497.67|
|2012-02-10|490.96|497.62|488.55|493.42|22523900|   493.42|
|2012-02-09|480.76|496.75|480.56|493.17|31527700|   493.17|
|2012-02-08| 470.5|476.79| 469.7|476.68|14544700|   476.68|
|2012-02-07|465.25|469.75|464.58|468.83|11280600|   468.83|
+----------+------+------+------+------+--------+---------+
only showing top 5 rows



In [8]:
# Same filter as above, projected down to the opening and closing prices.
df.where(df['Close'] < 500).select('Open', 'Close').show(5)

+------+------+
|  Open| Close|
+------+------+
|514.26|497.67|
|490.96|493.42|
|480.76|493.17|
| 470.5|476.68|
|465.25|468.83|
+------+------+
only showing top 5 rows



In [9]:
# Closed below 200 AND did not open above 200 (an open of exactly 200.0 is kept).
df.filter('Close < 200 AND NOT (Open > 200)').show(5)

+----------+------+------+------+------+--------+---------+
|      Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+----------+------+------+------+------+--------+---------+
|2010-02-23| 200.0|201.33|195.71|197.06|20539100|   197.06|
|2010-02-11|194.88|199.75|194.06|198.67|19655200|   198.67|
|2010-02-10|195.89| 196.6|194.26|195.12|13227200|   195.12|
|2010-02-09|196.42| 197.5|194.75|196.19|22603100|   196.19|
|2010-02-08|195.69|197.88| 194.0|194.12|17081100|   194.12|
+----------+------+------+------+------+--------+---------+
only showing top 5 rows



In [10]:
# Register df as a temporary view named 'temp' so the SQL query in the next
# cell can refer to it by name (replacing any existing view of that name).
df.createOrReplaceTempView('temp')

In [11]:
# Look up the row(s) whose daily low was exactly 197.16 using Spark SQL
# against the 'temp' view. The result is a lazy DataFrame; nothing runs
# until an action such as show() is called.
# NOTE(review): exact equality on a floating-point column — works for values
# read straight from the CSV, but a tolerance (BETWEEN) is safer in general.
results = spark.sql('''
                SELECT * 
                FROM temp
                WHERE Low = 197.16
''')

In [12]:
# Display the rows returned by the SQL query (up to 20, long values truncated).
results.show(n=20, truncate=True)

+----------+------+-----+------+------+--------+---------+
|      Date|  Open| High|   Low| Close|  Volume|Adj Close|
+----------+------+-----+------+------+--------+---------+
|2010-01-22|206.78|207.5|197.16|197.75|31491700|   197.75|
+----------+------+-----+------+------+--------+---------+



In [13]:
# The same lookup expressed with the DataFrame API instead of SQL.
df.where(df.Low == 197.16).show()

+----------+------+-----+------+------+--------+---------+
|      Date|  Open| High|   Low| Close|  Volume|Adj Close|
+----------+------+-----+------+------+--------+---------+
|2010-01-22|206.78|207.5|197.16|197.75|31491700|   197.75|
+----------+------+-----+------+------+--------+---------+



In [14]:
# Collect the matching rows to the driver as a list of Row objects. The list
# is displayed from the variable rather than by re-running the same Spark job.
result = df.filter(df['Low'] == 197.16).collect()
result

[Row(Date='2010-01-22', Open=206.78, High=207.5, Low=197.16, Close=197.75, Volume=31491700, Adj Close=197.75)]

In [15]:
# Display the collected list of Row objects.
result

[Row(Date='2010-01-22', Open=206.78, High=207.5, Low=197.16, Close=197.75, Volume=31491700, Adj Close=197.75)]

In [16]:
# Take the first matching Row (raises IndexError if the filter matched nothing).
row = result[0]

In [17]:
# Convert the Row to a plain Python dict keyed by column name.
row.asDict()

{'Date': '2010-01-22',
 'Open': 206.78,
 'High': 207.5,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 31491700,
 'Adj Close': 197.75}

In [18]:
# Row supports lookup by column name directly, so there is no need to build
# the whole dict just to read a single field.
row['Volume']

31491700