# PySpark DataFrames
## 1.) Create a Session
* Spark needs an active session before any of its dataframe functionality can be used
* The session is created via the libraries/code below
* Note that we've also used **findspark**, which lets us use Spark from any working directory, not just the directory where the Spark files are kept

In [1]:
# load findspark
import findspark

# tell code where spark files are kept
findspark.init('/home/matt/spark-3.0.2-bin-hadoop3.2/')

# load libraries for spark session
from pyspark.sql import SparkSession

# create session
spark = SparkSession.builder.appName('Basics').getOrCreate()

## 2.) Read Data
* We can use Spark to directly read a range of input data formats
* Here we are loading JSON, but you can press **shift + tab** after 'read.' to see the other available data formats
* **.show()** lets us look at the dataframe content
* We can also use **printSchema()** to show the dataframe schema

In [2]:
# read data into df
df = spark.read.json('people.json')

# peek at data
# note that missing values are read in as null without any error
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [3]:
# show df schema
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [4]:
# show column names as list
df.columns

['age', 'name']

In [5]:
# show statistical summary of df
# note that it also covers non-numeric variables
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



## 3.) Schemas
* Dataframes in Spark require a schema to define the column names, types and ability to handle nulls
* There are a number of other properties they can have too which we will come onto
* The code below shows how to manually define a schema for a dataframe to ensure data types and missing data are handled correctly

In [6]:
# load data types for schema definition
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

# create list of structure fields
# each StructField takes a column name, a data type and a nullable flag (True allows nulls)
data_schema = [StructField('age', IntegerType(), True),
               StructField('name', StringType(), True)]

# create dataframe schema based on above column definitions
final_struct = StructType(fields=data_schema)

# load data into structure
df = spark.read.json('people.json', schema=final_struct)

# show schema
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



## 4.) Manipulating DataFrames
* As in pandas, Spark dataframes offer you a wide range of functionality
* Below are a few basic examples; we will look at more complex ones later on
* Simple column and row extraction, creation of new columns, renaming, selection of specific data etc. are covered below

In [7]:
# indexing returns a Column object (a reference to the column, not its data)
df['age']

# select specific column using Spark select
df.select('age')

# this actually returns a dataframe of your column data
# this is more useful when wanting to actually do something with the data
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [8]:
# show top n rows
df.head(2)

# extract specific row
df.head(2)[0] # get first row

Row(age=None, name='Michael')

In [9]:
# select multiple columns and show all selected data
df.select(['age', 'name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [10]:
# create new column
# duplicate age column and double
df.withColumn('double_age', df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [11]:
# note that the above creation of columns does not
# overwrite data in the original dataframe (see below)
# dataframes are immutable, so to keep the new column you
# would have to assign the result to a variable (new or existing)
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [12]:
# rename column
df.withColumnRenamed('age', 'renamed_age').show()

+-----------+-------+
|renamed_age|   name|
+-----------+-------+
|       null|Michael|
|         30|   Andy|
|         19| Justin|
+-----------+-------+



## 5.) SQL with Dataframes
* You can use SQL queries to interact with and extract specific data from your dataframes
* The Spark module we imported above is called **pyspark.sql** because it can run SQL queries against Spark dataframes
* The code above uses the PySpark dataframe API, whilst the code below shows the SQL equivalent
* You can choose which you use, or use a combination of the two
* The benefit is flexibility: queries written in SQL and in Python operate on the same dataframes, so either can be used wherever it reads best

In [13]:
# register df as temporary SQL view
df.createOrReplaceTempView('people')

# directly query df with SQL
results = spark.sql('SELECT * FROM people WHERE age=30')

# show results
results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



## 6.) DataFrame Operations
* The main purpose of storing data in Spark dataframes is so that you can extract exactly what you're after
* Below are a few of the more common operations such as filtering rows with conditional logic, selecting specific columns and so on
* Note that you can either use SQL based syntax or pythonic syntax depending on preference
* If you want to simply view filtered data then a combination of filter, select etc. followed by show() will do this
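* If you actually want to store your filtered data as Python objects then **collect()** should be used in conjunction with variable assignment; it returns a list of Row objects held in local memory, so it is best kept for small results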

In [14]:
# read data into df
# guess schema automatically and read first row as header
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

# peek at df schema
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [15]:
# peek at data
df.show(2)

+----------+----------+----------+------------------+----------+---------+------------------+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
+----------+----------+----------+------------------+----------+---------+------------------+
only showing top 2 rows



In [17]:
# use SQL based code to filter df
df.filter("Close < 500").select(['Open', 'Low']).show(2)

+----------+------------------+
|      Open|               Low|
+----------+------------------+
|213.429998|212.38000099999996|
|214.599998|        213.249994|
+----------+------------------+
only showing top 2 rows



In [21]:
# this is the more pythonic way of performing the same operation as above
df.filter(df['Close'] < 500).select(df['Open'], df['Low']).show(2)

+----------+------------------+
|      Open|               Low|
+----------+------------------+
|213.429998|212.38000099999996|
|214.599998|        213.249994|
+----------+------------------+
only showing top 2 rows



In [24]:
# wrap each condition in brackets and combine them
# with &, | or ~ (the keywords and, or and not won't work)
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).show(2)

+----------+------------------+----------+------------------+----------+---------+------------------+
|      Date|              Open|      High|               Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+------------------+----------+---------+------------------+
|2010-02-01|192.36999699999998|     196.0|191.29999899999999|194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|196.319994|193.37999299999998|195.859997|174585600|25.375532999999997|
+----------+------------------+----------+------------------+----------+---------+------------------+
only showing top 2 rows



In [27]:
# find exact values and collect into var
# returns a list of all matching row objects
result = df.filter(df['Low'] == 197.16).collect()

# extract first row from result
row = result[0]

# store row data in dictionary
row_dict = row.asDict()

# show dict
row_dict

{'Date': '2010-01-22',
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [29]:
# extract specific value
row_dict['Volume']

220441900

## 7.) Group By and Aggregate
* In order to summarize your data you can use group by and aggregate functions
* Group by summarizes your data by specific fields (e.g. group by company name)
* Aggregate functions allow you to summarize your data in specific ways (e.g. count, sum, max, min, avg...)

In [30]:
# read data into df
df = spark.read.csv('sales_info.csv', inferSchema=True, header=True)

# check schema
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [32]:
# peek at data
df.show(5)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
+-------+-------+-----+
only showing top 5 rows



In [36]:
# group by specific criteria
#df.groupBy('Company').mean().show()
#df.groupBy('Company').max().show()
#df.groupBy('Company').min().show()
#df.groupBy('Company').count().show()
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [40]:
# aggregate across all rows
#df.agg({'Sales':'max'}).show()
df.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [41]:
# group rows by company
group_data = df.groupBy('Company')

# apply aggregate function to each group
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



Notes:
* As well as the built-in methods above, you can also import more advanced functions from Spark
* There are many functions available in **pyspark.sql.functions**; a few samples are shown below
* These functions are commonly used within a select call
* You can pass an alias to define the name of your output column/data
* Formatting can be applied to tidy up your outputs using the **format_number** function

In [47]:
# load specific aggregate functions
from pyspark.sql.functions import countDistinct, avg, stddev

# apply function to specific df column
df.select(countDistinct('Sales').alias('Distinct Sales')).show()

+--------------+
|Distinct Sales|
+--------------+
|            11|
+--------------+



In [46]:
# apply function to specific df column
df.select(avg('Sales').alias('Avg Sales')).show()

+-----------------+
|        Avg Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [51]:
# load function to format numeric output
from pyspark.sql.functions import format_number

# apply function to specific df column
sales_std = df.select(stddev('Sales').alias('STD Sales'))

# show output data with 2 d.p.
sales_std.select(format_number('STD Sales', 2).alias('STD Sales')).show()

+---------+
|STD Sales|
+---------+
|   250.09|
+---------+



Notes:
* You can also order and sort your data using the below code
* You are able to specify asc/desc order as desired

In [53]:
# order by specific column, asc by default
df.orderBy('Sales').show(3)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
+-------+-------+-----+
only showing top 3 rows



In [54]:
# order by in descending order using a column expression
# (orderBy('Sales', ascending=False) also works)
df.orderBy(df['Sales'].desc()).show(3)

+-------+------+-----+
|Company|Person|Sales|
+-------+------+-----+
|     FB|  Carl|870.0|
|   APPL|  Mike|750.0|
|   MSFT|  Tina|600.0|
+-------+------+-----+
only showing top 3 rows



## 8.) Missing Data
* Often, data contains missing values in specific areas
* Many processes will break when computing on missing data, so it normally has to be handled
* 3 basic options:
    * Keep missing data
    * Drop missing data
    * Impute replacement data (based on e.g. mean, mode etc.)
    
### a.) Drop Nulls

In [56]:
# read data into df
df = spark.read.csv('ContainsNull.csv', inferSchema=True, header=True)

# check schema
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [57]:
# peek at data
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [58]:
# drop all null rows
# use shift + tab after na. to show options
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [60]:
# keep only rows that have at least 2 non-null values
# (rows with fewer than 2 non-null values are dropped)
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [61]:
# drop rows if all row values are null
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [62]:
# drop rows if sales are null
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### b.) Fill Nulls

In [63]:
# fill nulls with a string value
# only string columns are filled because the value is a string
df.na.fill('FILL VALUE').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [64]:
# fill nulls with a numeric value
# only numeric columns are filled because the value is a number
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [65]:
# explicitly fill selected column only
df.na.fill('No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [66]:
# load function to impute missing data
from pyspark.sql.functions import mean

# get mean of column
mean_val = df.select(mean(df['Sales'])).collect()

# get mean value itself
# collect() returns a list of rows, so index the first row and then its first field
mean_sales = mean_val[0][0]

# fill null numeric column values with mean of column
df.na.fill(mean_sales, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [67]:
# do the exact same as above except in one line
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## 9.) Dates and Timestamps
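* Spark provides functions for extracting components such as the day, month, year and hour from date and timestamp columns
* Note that the Date column below is read in as a string, yet the functions still work because Spark casts it to a date implicitly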

In [68]:
# read data into df
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

# show schema
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [72]:
# peek at data
df.show(2)

+----------+----------+----------+------------------+----------+---------+------------------+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
+----------+----------+----------+------------------+----------+---------+------------------+
only showing top 2 rows



In [78]:
# load functions for handling datetime
from pyspark.sql.functions import (dayofmonth, hour, dayofyear,
                                   month, year, weekofyear,
                                   format_number, date_format)

# apply function to specific df column
df.select(dayofmonth(df['Date'])).show(2)

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
+----------------+
only showing top 2 rows



In [75]:
# show hour at which time was recorded
# (0 here because Date holds no time component)
df.select(hour(df['Date'])).show(2)

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
+----------+
only showing top 2 rows



In [79]:
# show the month recorded
df.select(month(df['Date'])).show(2)

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
+-----------+
only showing top 2 rows



Notes:
* The above code is fairly simple; often you'll need to combine date handling with aggregation
* Below is a potential use case: we extract the year from the date, group by it and average a numeric column to show the average closing price by year

In [93]:
# create new column in new df containing year (extracted from date)
new_df = df.withColumn('Year', year(df['Date']))

# get average closing price by year and sort by year
result = new_df.groupBy('Year').mean().select(['Year', 'avg(Close)']).orderBy('Year')

# rename column
new = result.withColumnRenamed('avg(Close)', 'Average Closing Price')

# format values
new.select(['Year', format_number('Average Closing Price', 2).alias('Avg Close')]).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2010|   259.84|
|2011|   364.00|
|2012|   576.05|
|2013|   472.63|
|2014|   295.40|
|2015|   120.04|
|2016|   104.60|
+----+---------+

