# Spark DataFrame basic operations
### Dr. Tirthajyoti Sarkar, Fremont, CA, July 2020 (updated)
In this notebook, we go through basic operations that can be performed with a Spark DataFrame object. We will use a .CSV file of stock prices to illustrate the various useful methods that are associated with a Spark DataFrame.
---
https://github.com/tirthajyoti/Spark-with-Python

In [2]:
from pyspark.sql import SparkSession

### A `SparkSession` app, reading CSV, and the Schema

In [3]:
spark1 = SparkSession.builder.appName('Ops').getOrCreate()

Read the Apple stock CSV file. Note we have an option of inferring the schema for CSV. We also have the option to set `header` to `True`.

In [4]:
df = spark1.read.csv('../datasets/appl_stock.csv',inferSchema=True,header=True)

Print the schema

In [5]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



Show the DataFrame

In [6]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

---

### Show the columns

The `columns` attribute returns a simple Python list of column names!

In [7]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

---

### Spark DataFrames have separate `Column` and `Row` types 
Unlike Pandas, where both rows and columns are represented by a generic `Series`, Spark DataFrames have separate `Column` and `Row` types. This is to ensure that they can handle special data files.

In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
type(df['High'])

pyspark.sql.column.Column

In [11]:
type(df.head(2)[0])

pyspark.sql.types.Row

---

### The `select` method to select particular columns

The `select` method returns a new DataFrame that contains only the requested columns. Note that we still have to call the `show` method to actually output the data.

In [12]:
df.select('High')

DataFrame[High: double]

In [13]:
df.select('High').show()

+------------------+
|              High|
+------------------+
|        214.499996|
|        215.589994|
|            215.23|
|        212.000006|
|        212.000006|
|        213.000002|
|209.76999500000002|
|210.92999500000002|
|210.45999700000002|
|211.59999700000003|
|215.18999900000003|
|        215.549994|
|213.30999599999998|
|        207.499996|
|        204.699999|
|        213.710005|
|            210.58|
|        205.500004|
|        202.199995|
|             196.0|
+------------------+
only showing top 20 rows



In [14]:
df.select(['High','Close']).show()

+------------------+------------------+
|              High|             Close|
+------------------+------------------+
|        214.499996|        214.009998|
|        215.589994|        214.379993|
|            215.23|        210.969995|
|        212.000006|            210.58|
|        212.000006|211.98000499999998|
|        213.000002|210.11000299999998|
|209.76999500000002|        207.720001|
|210.92999500000002|        210.650002|
|210.45999700000002|            209.43|
|211.59999700000003|            205.93|
|215.18999900000003|        215.039995|
|        215.549994|            211.73|
|213.30999599999998|        208.069996|
|        207.499996|            197.75|
|        204.699999|        203.070002|
|        213.710005|        205.940001|
|            210.58|        207.880005|
|        205.500004|        199.289995|
|        202.199995|        192.060003|
|             196.0|        194.729998|
+------------------+------------------+
only showing top 20 rows



---

### The `limit` method to take first few rows (without any collection)

Applying `limit()` to the DataFrame results in a new DataFrame. This is a transformation and does not collect any data. Similar methods such as `take` and `head`, by contrast, are actions: they return an array of `Row` objects, i.e. a Scala `Array` on the JVM side, which becomes a Python list when using the PySpark API.

In [None]:
df.limit(5)

In [None]:
df.limit(5).show()

---

### The `head()` and the `asDict()` methods

In [15]:
df.head(2)

[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.date(2010, 1, 5), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002)]

In [16]:
dict1=df.head(2)[0].asDict()

In [17]:
dict1

{'Date': datetime.date(2010, 1, 4),
 'Open': 213.429998,
 'High': 214.499996,
 'Low': 212.38000099999996,
 'Close': 214.009998,
 'Volume': 123432400,
 'Adj Close': 27.727039}

---

### The `count` method - number of rows

In [18]:
df.count()

1762

---

### Counting rows with distinct values - `distinct` method

To demonstrate this method, we read in another small CSV file.

In [21]:
df2 = spark1.read.csv('../datasets/sparkbyexamples/sales_info.csv',inferSchema=True,header=True)
df2.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [22]:
df2.select('Company').distinct().show()

+-------+
|Company|
+-------+
|   APPL|
|   GOOG|
|     FB|
|   MSFT|
+-------+



In [23]:
df2.select('Company').distinct().count()

4

---

### Random sampling using the `sample` method

In [24]:
df.sample(withReplacement=False,
          fraction = 0.005,
          seed = 101).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-03|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-05-05|         253.03001|258.14000699999997|248.72999199999998|255.98999799999999|220775800|         33.165949|
|2010-09-20|         276.07999|        283.780006|275.84999799999997|        283.230007|164669400|36.695153000000005|
|2010-10-19|303.40000200000003|        313.770012|300.02000400000003|309.48999399999997|308196000|40.097384999999996|
|2010-10-21|312.35999300000003|314.73999399999997|        306.799999|        309.520008|137865000|         40.101273|
|2011-03-22|342.55998999999997|        342.619991|      

---

### Inserting new column using `withColumn` method

We can insert a new column into the DataFrame using the `withColumn()` method. It is mostly used when we are doing some transformation of the data using one or more columns. The following is a simple example where we calculate the range of the stock price (the difference between the `High` and `Low` columns).

Also note how we are **chaining the methods** - a `withColumn`, followed by a `limit`, followed by a `select` to show only the columns impacted (in a certain order), and finally followed by a `show` to see the final result of only first five rows.

In [25]:
df.withColumn('Range',df['High']-df['Low']).limit(5).select(['High','Low','Range']).show()

+----------+------------------+------------------+
|      High|               Low|             Range|
+----------+------------------+------------------+
|214.499996|212.38000099999996|2.1199950000000456|
|215.589994|        213.249994|2.3400000000000034|
|    215.23|        210.750004|          4.479996|
|212.000006|        209.050005|2.9500010000000145|
|212.000006|209.06000500000002| 2.940000999999995|
+----------+------------------+------------------+



This is **not an _in-place_ operation**, i.e. the original DataFrame remains unchanged.

---

### Renaming a column using `withColumnRenamed` method

In [26]:
df3 = df.withColumn('Range',df['High']-df['Low']).limit(5).select(['High','Low','Range']) 

In [27]:
df3.show()

+----------+------------------+------------------+
|      High|               Low|             Range|
+----------+------------------+------------------+
|214.499996|212.38000099999996|2.1199950000000456|
|215.589994|        213.249994|2.3400000000000034|
|    215.23|        210.750004|          4.479996|
|212.000006|        209.050005|2.9500010000000145|
|212.000006|209.06000500000002| 2.940000999999995|
+----------+------------------+------------------+



In [29]:
# Just pass the old and new column names
df3 = df3.withColumnRenamed('Range','Min-to-Max')
df3.show()

+----------+------------------+------------------+
|      High|               Low|        Min-to-Max|
+----------+------------------+------------------+
|214.499996|212.38000099999996|2.1199950000000456|
|215.589994|        213.249994|2.3400000000000034|
|    215.23|        210.750004|          4.479996|
|212.000006|        209.050005|2.9500010000000145|
|212.000006|209.06000500000002| 2.940000999999995|
+----------+------------------+------------------+



---

### Filtering Operations - using either `filter()` or `where()` method

Filtering can be done with **SQL-like syntax** or **Pythonic** way. We show both examples.

Pass on a SQL syntax to the `filter()` method

In [28]:
df.filter("Close < 500").show(5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



We can chain methods to see only desired columns

In [31]:
df.filter("Close < 500 AND Open > 500").select(['Date','Open','Close']).show()

+----------+----------+------------------+
|      Date|      Open|             Close|
+----------+----------+------------------+
|2012-02-15|514.259995|        497.669975|
|2013-09-05|500.250008|495.26997400000005|
|2013-09-10|506.199997|494.63999900000005|
|2014-01-30|502.539993|        499.779984|
+----------+----------+------------------+



In [32]:
df.where("Open < 500 AND (Open-Close)> 10").select(['Date','Open','Close']).show(5)

+----------+------------------+------------------+
|      Date|              Open|             Close|
+----------+------------------+------------------+
|2010-07-21|        265.089993|254.23999799999999|
|2011-03-16|        342.000004|         330.01001|
|2011-08-04|        389.410007|        377.369999|
|2011-09-29|        401.919987|        390.570007|
|2011-11-10|397.02999500000004|385.22000499999996|
+----------+------------------+------------------+
only showing top 5 rows



---

### Now we use DataFrame syntax to achieve the same output

In [30]:
df.filter(df['Close']<500).show(5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



**If we need to chain multiple conditions together, use `&` for AND and `|` for OR, and clearly separate the conditions by putting each of them inside parentheses.**

In [33]:
df.filter((df['Close']<500) & (df['Open']>500)).select(['Date','Open','Close']).show(5)

+----------+----------+------------------+
|      Date|      Open|             Close|
+----------+----------+------------------+
|2012-02-15|514.259995|        497.669975|
|2013-09-05|500.250008|495.26997400000005|
|2013-09-10|506.199997|494.63999900000005|
|2014-01-30|502.539993|        499.779984|
+----------+----------+------------------+



In [34]:
df.filter((df['Open']<500) & (df['Open']-df['Close']>10)).select(['Date','Open','Close']).show(5)

+----------+------------------+------------------+
|      Date|              Open|             Close|
+----------+------------------+------------------+
|2010-07-21|        265.089993|254.23999799999999|
|2011-03-16|        342.000004|         330.01001|
|2011-08-04|        389.410007|        377.369999|
|2011-09-29|        401.919987|        390.570007|
|2011-11-10|397.02999500000004|385.22000499999996|
+----------+------------------+------------------+
only showing top 5 rows



We can use `==` to test for an exact value and `~` as the NOT operator.

In [35]:
df.filter(df['Low']==197.16).show()

+----------+------------------+----------+------+------+---------+---------+
|      Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+----------+------------------+----------+------+------+---------+---------+
|2010-01-22|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+----------+------------------+----------+------+------+---------+---------+



In [None]:
df.filter((df['Open']<200) & ~(df['Close']>200)).show(5)

---

### Use the `collect` method instead of `show`, to collect the actual data

In [36]:
low_data = df.filter(df['Low']==197.16).collect()

In [37]:
low_data

[Row(Date=datetime.date(2010, 1, 22), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

#### It is still a list. So, grab the 0-index element as a Row object and convert it to a dictionary using `asDict` method

In [38]:
dt = low_data[0]

In [39]:
dt.asDict()

{'Date': datetime.date(2010, 1, 22),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

Now, you can do whatever processing you want to do with the dictionary object!

---

### Sorting DataFrame - `sort` and `orderBy` methods

We create a smaller DataFrame from the original one.

In [None]:
df4 = df.select('High','Low').limit(10)
df4.show()

In [None]:
df4.sort("High").show()

By default, it sorts by ascending order. You can change it by passing `ascending=False`

In [None]:
df4.sort("High", ascending=False).show()

You can also use the **`desc` function** imported from the `pyspark.sql.functions` module

In [None]:
from pyspark.sql.functions import desc
df4.sort(desc("High")).show()

You can pass a list to the `ascending` parameter to create custom sorting order on various columns. In the following example, we are sorting in descending order for the `Company` column, while doing an ascending sort on the `Sales` column. We are using the `orderBy` method for this one.

In [None]:
df2.show()

In [None]:
df2.orderBy(["Company","Sales"],ascending=[0,1]).show()

---

### Using functions from the `pyspark.sql.functions` module

We can utilize various utility functions from the `pyspark.sql.functions` module to do transformation on the data and extract insights, look for patterns, build statistical models, etc.

We will just show a couple of examples here, but you are encouraged to explore others. Here is the detailed documentation link: 

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [None]:
from pyspark.sql.functions import corr, dayofweek, month

Suppose, we want to check ***if there is any correlation between the trade volumes and the range (Max - Min) i.e. intra-day volatility of the stock***.

Here is how we can do that computation. Note the use of two new methods: `agg`, which is nothing but an aggregation over the entire DataFrame without forming any specific groups, and `alias`, which is just a way to specify a descriptive name for the column resulting from the application of the `corr` function. We need `agg` here because we want to aggregate, i.e. throw all the data of the DataFrame at the `corr` function, to calculate the [Pearson correlation coefficient](https://en.wikipedia.org/wiki/Pearson_correlation_coefficient) between the `Volume` and `Range` data columns.

In [None]:
df.withColumn('Range',df['High']-df['Low']).agg(corr("Volume","Range").alias('Pearson-Corr')).collect()

So, we do have a fairly strong positive correlation between the `Volume` and `Range` data.

Here is another fun observation! If you stare at the code above a bit longer, you will notice that **we did not produce a standalone `Range` column at all**. We computed it in an ephemeral manner - just enough to calculate the correlation coefficient. The original DataFrame remains unchanged while we got our answer about the positive correlation!

In [None]:
df.show(5)

Next, we apply the `dayofweek` and `month` functions on the `Date` column to produce new columns `Day-of-Week` and `Month`. Note how we chained two `withColumn` methods together for this transformation.

In [None]:
df5=df.withColumn('Day-of-Week',dayofweek('Date')).\
withColumn('Month',month('Date')).\
select(['Day-of-Week','Month','Open','Close'])

df5.show(5)

Now, we have not shown the `groupby()` and aggregation (like average or `mean`) methods yet, but if you have a fair understanding of what a `groupby` (followed by an aggregation like `mean`) does, then you can appreciate the following example, where we look at the average `Volume` grouped by `Day-of-Week` and `Month` and try to gauge ***which day of the week or month has the highest and lowest average trade volumes***.

The example below is certainly a complex one, but you can think of it as an illustration of using some of the most important DataFrame methods we have examined in this Notebook (and some future ones we will look at) in a combined fashion.

- `withColumn`
- `select`
- `orderBy`
- `groupBy`
- `mean` (aggregation)

In [None]:
df5=df.withColumn('Day-of-Week',dayofweek('Date')).\
withColumn('Month',month('Date')).\
select(['Day-of-Week','Month','Volume']).\
groupby(['Day-of-Week','Month']).\
mean().\
select(['Day-of-Week','Month','avg(Volume)']).\
orderBy(['Day-of-Week','Month'])

df5.show(60)