# Spark Dataframes - Basic Operations and actions

### Importing Libraries and functions

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import (IntegerType, StringType, StructField, StructType)
from pyspark.sql.functions import (countDistinct, max, mean, stddev, format_number)
from pyspark.sql.functions import (month, year, hour, dayofmonth, dayofyear, date_format)

### Starting the spark session and creating a Spark application

In [2]:
## Build a SparkSession named 'spark_app_1' (getOrCreate reuses an existing one if present)
spark = (
    SparkSession.builder
    .appName('spark_app_1')
    .getOrCreate()
)

### Reading in a `JSON` file into a Spark dataframe

In [3]:
## Move from the notebook folder up to the project root so that 'data/...' paths resolve.
## The target is recorded only on the first run. Re-running this cell then stays put
## instead of climbing one more directory each time.
if 'PROJECT_ROOT' not in globals():
    PROJECT_ROOT = os.path.abspath(os.pardir)
os.chdir(PROJECT_ROOT)

#### Reading a file without a schema - Spark infers the DF's schema automatically

In [4]:
## Build the path with os.path.join rather than hard-coding '/' separators
people_data_file = os.path.join(os.getcwd(), 'data', 'people.json')
people_df = spark.read.json(people_data_file)  ## no schema given, so Spark infers one
people_df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Manually assigning a schema to the DF

In [5]:
## Build the schema by hand: one StructField(name, dataType, nullable) per column
data_schema = [
    StructField(name='age', dataType=IntegerType(), nullable=True),
    StructField(name='name', dataType=StringType(), nullable=True),
]

## StructType wraps the field list; the reader uses this schema instead of inferring one
final_struc = StructType(data_schema)
people_df = spark.read.json(people_data_file, schema=final_struc)
people_df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Print the structure of a Spark dataframe

In [6]:
people_df.printSchema() ## Prints the schema as a tree: each column's name, type and nullability

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



### Print the column names of a Spark dataframe

In [7]:
people_df.columns ## A property (list of column names), not a method, therefore, no ()

['age', 'name']

### Run summary statistics on a Spark dataframe

In [8]:
## describe() is lazy: evaluating it alone only displays the resulting DataFrame's schema, not the statistics
people_df.describe()

DataFrame[summary: string, age: string, name: string]

In [9]:
summary = people_df.describe() ## Returns a dataframe that can be stored in a variable
summary.show() ## show() computes and prints count/mean/stddev/min/max for each column

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



### Querying data from dataframes

#### Selecting columns

In [10]:
## Indexing a DataFrame by column name yields a Column expression (pyspark.sql.column.Column), not data
age_column = people_df['age']
print(age_column)

## select() instead returns a new DataFrame holding only the chosen column(s)
sub = people_df.select('age')
print(sub)
sub.show()

subset = people_df.select('age', 'name')  ## select also accepts several names as separate arguments
subset.show()

Column<b'age'>
DataFrame[age: int]
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Selecting Rows

In [11]:
top_2 = people_df.head(2)
print(top_2)  ## head(n) returns a plain Python list of Row objects

## To reach one value: index the list to get a Row, then index the Row by column name
second_row = top_2[1]
print(second_row['age'])

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]
30


#### Creating/adding new columns

In [12]:
## withColumn(new col name, expression on existing cols) returns a NEW DataFrame and
## leaves the original unchanged, so its result must be assigned to be of any use
new_df = people_df.withColumn('double_age', people_df['age'] * 2)
new_df.show()     ## has the extra double_age column (null age -> null double_age)
people_df.show()  ## the original still has only age and name

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Renaming a column

In [13]:
people_df.withColumnRenamed('age', 'customer_age') ## NOT in-place: withColumnRenamed(old name, new name) returns a new DF, which is discarded here
people_df.show() ## people_df still has the original 'age' column, as the output below shows

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Using SQL

In [14]:
## Create a new view, i.e, turn the DF to a table that SQL queries can reference
people_df.createOrReplaceTempView('people') ## Registers people_df under the view name 'people' for use in spark.sql()

In [15]:
query = "SELECT name FROM people WHERE age > 25"
results = spark.sql(query)  ## runs the SQL text against the registered 'people' view; returns a DataFrame
results.show()

+----+
|name|
+----+
|Andy|
+----+



### Basic operations on a Spark DataFrame

In [16]:
## Build the path with os.path.join rather than hard-coding '/' separators
stock_file = os.path.join(os.getcwd(), 'data', 'appl_stock.csv')
## header=True takes column names from the first line; inferSchema=True detects column types
stock_df = spark.read.csv(stock_file, inferSchema = True, header = True)
stock_df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [17]:
stock_df.show() ## Prints the first rows of the stock data (20 by default)

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

#### Using `filter`

In [18]:
## Columns kept in every example below
price_cols = ['Date', 'Open', 'Close']

## filter() accepts a SQL WHERE-clause string; select() after it prunes the columns further
sub = stock_df.filter("Close > 500").select(price_cols)
sub.show(5)

## Similarly, filter() also accepts a Column condition built with Python operators
sub = stock_df.filter(stock_df['Close'] > 500).select(price_cols)
sub.show(5)

## Multiple conditions combine with & (and), | (or), ~ (not). Each condition must be wrapped
## in (), because in Python & and | bind more tightly than comparisons like < and >
sub = stock_df.filter((stock_df['Open'] < 500) & (stock_df['Close'] > 500)).select(price_cols)
sub.show()

+-------------------+------------------+------------------+
|               Date|              Open|             Close|
+-------------------+------------------+------------------+
|2012-02-13 00:00:00|        499.529991|502.60002099999997|
|2012-02-14 00:00:00|        504.659988|        509.459991|
|2012-02-16 00:00:00|        491.500008|502.20999900000004|
|2012-02-17 00:00:00|        503.109993|         502.12001|
|2012-02-21 00:00:00|506.88001299999996|        514.850021|
+-------------------+------------------+------------------+
only showing top 5 rows

+-------------------+------------------+------------------+
|               Date|              Open|             Close|
+-------------------+------------------+------------------+
|2012-02-13 00:00:00|        499.529991|502.60002099999997|
|2012-02-14 00:00:00|        504.659988|        509.459991|
|2012-02-16 00:00:00|        491.500008|502.20999900000004|
|2012-02-17 00:00:00|        503.109993|         502.12001|
|2012-02-21 00:

#### Using `collect`

In [19]:
low_matches = stock_df['Low'] == 197.16

res = stock_df.filter(low_matches)  ## filter() alone gives back a DataFrame (one matching row here)
print(type(res))

## collect() executes the query and returns the rows as a Python list of Row objects
res = stock_df.filter(low_matches).collect()
print(type(res))
print(res)
print(res[0])  ## the first (and only) Row

## A Row converts to a dict keyed by column name
row_dict = res[0].asDict()
print(row_dict['Date'], row_dict['Volume'])

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'list'>
[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]
Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)
2010-01-22 00:00:00 220441900


### Spark DataFrames GroupBy & Aggregate

In [20]:
## Build the path with os.path.join rather than hard-coding '/' separators
sales_file = os.path.join(os.getcwd(), 'data', 'sales_info.csv')
sales_df = spark.read.csv(sales_file, inferSchema = True, header = True)
sales_df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [21]:
sales_df.show() ## One row per salesperson: Company, Person and Sales amount

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



#### GroupBy

In [22]:
by_company = sales_df.groupBy("Company")
print(by_company)         ## a GroupedData object, which still needs an aggregation
print(by_company.mean())  ## applying an aggregation (here mean) yields a DataFrame
by_company.count().show()  ## number of rows per company

<pyspark.sql.group.GroupedData object at 0x7ff09047ef28>
DataFrame[Company: string, avg(Sales): double]
+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [23]:
company_mean = sales_df.groupBy("Company").mean() ## With no args, mean() averages every numeric column (only Sales here); min, max, sum, count work similarly
company_mean.show() ## The result column is auto-named 'avg(Sales)'

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



#### Aggregate

In [24]:
sales_df.agg({'Sales' : 'sum'}).show() ## agg({column : aggregate function name}) over the whole DF (no grouping) returns a one-row DF

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



#### GroupBy and Aggregate

In [25]:
group_data = sales_df.groupBy('Company') ## Keep the GroupedData object so several aggregations below can reuse it
max_sales_per_company = group_data.agg({'Sales' : 'max'}) ## Result column is auto-named 'max(Sales)'
max_sales_per_company.show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [26]:
## Using agg returns a new DF with columns named by the operation performed, 
## we can provide our own name to that col with alias()
## NOTE: `max` here is pyspark.sql.functions.max (imported at the top), which shadows Python's builtin max
max_sales_per_company = group_data.agg(max('Sales').alias('max_sales'))
max_sales_per_company.show()

+-------+---------+
|Company|max_sales|
+-------+---------+
|   APPL|    750.0|
|   GOOG|    340.0|
|     FB|    870.0|
|   MSFT|    600.0|
+-------+---------+



In [27]:
## Standard deviation of Sales per company, then rendered as a string with 2 decimal places
sd_sales_per_company = (
    group_data
    .agg(stddev('Sales').alias('stdev_sales'))
    .select('Company', format_number('stdev_sales', 2).alias('stdev_sales'))
)
sd_sales_per_company.show()

+-------+-----------+
|Company|stdev_sales|
+-------+-----------+
|   APPL|     268.82|
|   GOOG|     111.36|
|     FB|     367.70|
|   MSFT|     247.72|
+-------+-----------+



#### Ordering the DataFrame

In [28]:
asc = sales_df.orderBy('Sales') ## Sorting/Ordering the DF in ascending order (the default)
asc.show()
desc = sales_df.orderBy(sales_df['Sales'].desc()) ## Sorting/Ordering the DF in descending order; .desc() is a method of the Column, so the Column (not the name string) is used here
desc.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



### Handling Missing Data in Spark DataFrames

In [29]:
## Build the path with os.path.join rather than hard-coding '/' separators
df_file = os.path.join(os.getcwd(), 'data', 'ContainsNull.csv')
df = spark.read.csv(df_file, inferSchema = True, header = True)
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [30]:
df.show() ## Note the null values in the Name and Sales columns

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



#### Drop every row that contains a `null`

In [31]:
df.na.drop().show() ## Returns a new DF without the rows that contain any missing data (df itself is unchanged)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



#### Using `thresh` to remove some rows

In [32]:
df.na.drop(thresh = 2).show() ## Keeps only rows with AT LEAST `thresh` non-null values (emp2 has just 1, so it is dropped; emp1 has exactly 2, so it stays)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



#### Using `how`

In [33]:
df.na.drop(how = 'all').show() ##Drops only rows where every value is null; no row here is entirely null, so nothing is dropped

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [34]:
df.na.drop(how = 'any').show() ##Drops rows where any value is null, i.e same result as df.na.drop() above

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



#### Using `subset`

In [35]:
df.na.drop(subset = ['Sales']).show() ##Only the listed columns are checked: drops rows where Sales is null, even if other columns are null too (emp3 keeps its null Name)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



#### Filling in Missing Values

In [36]:
## Consider the DF schema: Id and Name are strings, Sales is a double (this decides which columns fill() touches)
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [37]:
## 2 string cols and 1 numeric col
df.na.fill("val").show() ## fill() only applies the value to columns whose type matches the value's type
## i.e, a string value fills nulls in string columns only (Sales stays null in the output)
## similarly ...
df.na.fill(0).show()
## a numeric value fills nulls in numeric columns only (Name stays null in the output)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2|  val| null|
|emp3|  val|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [38]:
## Column-centric filling: restrict the fill to the columns listed in subset
## (DataFrame.fillna is the same operation as df.na.fill)
df.fillna(0, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [39]:
## We can also use standard DS conventions to handle missing values, 
## for example: fill in Nulls using the mean (numeric columns)
## select(mean(...)) aggregates to one row; collect() returns it as a list holding a single Row.
## mean() skips nulls: (345.0 + 456.0) / 2 = 400.5
df_sales_mean = df.select(mean('Sales')).collect()
df_sales_mean

[Row(avg(Sales)=400.5)]

In [40]:
mean_sales = df_sales_mean[0][0]  ## first Row, first field -> the mean as a plain Python number
df.na.fill(mean_sales, subset=['Sales']).show()

## The same thing in one expression: first() fetches the single aggregated Row directly
df.na.fill(df.select(mean('Sales')).first()[0], subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Handling Dates and Timestamps

#### `datetime` object

In [41]:
first_row = stock_df.first()  ## the first Row of the DataFrame
date_sample = first_row['Date']
date_sample ## a Python datetime.datetime (year, month, day, hour, minute)

datetime.datetime(2010, 1, 4, 0, 0)

#### PySpark functions for handling `datetime` elements

In [42]:
## Spark's date functions extract parts of a timestamp column.
## Example: the day, month and year of every trading day whose closing price was below 200
sub = stock_df.filter(stock_df['Close'] < 200).select(['Date', 'Open', 'Close'])
date_col = sub['Date']
sub.select(
    'Close',
    dayofmonth(date_col).alias('day'),
    month(date_col).alias('month'),
    year(date_col).alias('year'),
).show()

+------------------+---+-----+----+
|             Close|day|month|year|
+------------------+---+-----+----+
|            197.75| 22|    1|2010|
|        199.289995| 28|    1|2010|
|        192.060003| 29|    1|2010|
|        194.729998|  1|    2|2010|
|        195.859997|  2|    2|2010|
|        199.229994|  3|    2|2010|
|        192.050003|  4|    2|2010|
|        195.460001|  5|    2|2010|
|194.11999699999998|  8|    2|2010|
|196.19000400000002|  9|    2|2010|
|195.12000700000002| 10|    2|2010|
|        198.669994| 11|    2|2010|
|        197.059998| 23|    2|2010|
|         93.699997|  9|    6|2014|
|             94.25| 10|    6|2014|
|         93.860001| 11|    6|2014|
|         92.290001| 12|    6|2014|
|         91.279999| 13|    6|2014|
|         92.199997| 16|    6|2014|
| 92.08000200000001| 17|    6|2014|
+------------------+---+-----+----+
only showing top 20 rows



#### Advanced Usage

In [43]:
## Date functions also work inside larger queries, e.g. the average Close price per year
new_df = stock_df.select(['Date', 'Close'])
new_df = new_df.withColumn('Year', year(new_df['Date']))  ## derive the year from each timestamp
new_df.show()

## Group by year, average Close, then format the average to 2 decimal places
avg_year_close = (
    new_df.groupBy('Year')
    .mean('Close')
    .select('Year', format_number('avg(Close)', 2).alias('avg_close'))
)
avg_year_close.orderBy('Year').show()

+-------------------+------------------+----+
|               Date|             Close|Year|
+-------------------+------------------+----+
|2010-01-04 00:00:00|        214.009998|2010|
|2010-01-05 00:00:00|        214.379993|2010|
|2010-01-06 00:00:00|        210.969995|2010|
|2010-01-07 00:00:00|            210.58|2010|
|2010-01-08 00:00:00|211.98000499999998|2010|
|2010-01-11 00:00:00|210.11000299999998|2010|
|2010-01-12 00:00:00|        207.720001|2010|
|2010-01-13 00:00:00|        210.650002|2010|
|2010-01-14 00:00:00|            209.43|2010|
|2010-01-15 00:00:00|            205.93|2010|
|2010-01-19 00:00:00|        215.039995|2010|
|2010-01-20 00:00:00|            211.73|2010|
|2010-01-21 00:00:00|        208.069996|2010|
|2010-01-22 00:00:00|            197.75|2010|
|2010-01-25 00:00:00|        203.070002|2010|
|2010-01-26 00:00:00|        205.940001|2010|
|2010-01-27 00:00:00|        207.880005|2010|
|2010-01-28 00:00:00|        199.289995|2010|
|2010-01-29 00:00:00|        192.0