# Spark DataFrame Operations

___

In [1]:
# Build the Spark session for this notebook (getOrCreate reuses one if it already exists)
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('DF_Ops')
         .getOrCreate())

### Load CSV data with schema inference and print the schema

In [2]:
# Read the Apple stock CSV: first row is the header, column types are inferred from the data
df = spark.read.csv('dataset/appl_stock.csv',
                    header=True,
                    inferSchema=True)

In [3]:
# Print the column names and the types Spark inferred, as a tree
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [4]:
# Preview only the first three rows in tabular form
df.show(n=3)

+-------------------+----------+----------+------------------+----------+---------+------------------+
|               Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|
+-------------------+----------+----------+------------------+----------+---------+------------------+
only showing top 3 rows



In [5]:
# vertical=True prints one "RECORD" block per row, handy for wide tables
df.show(n=2, vertical=True)

-RECORD 0------------------------
 Date      | 2010-01-04 00:00:00 
 Open      | 213.429998          
 High      | 214.499996          
 Low       | 212.38000099999996  
 Close     | 214.009998          
 Volume    | 123432400           
 Adj Close | 27.727039           
-RECORD 1------------------------
 Date      | 2010-01-05 00:00:00 
 Open      | 214.599998          
 High      | 215.589994          
 Low       | 213.249994          
 Close     | 214.379993          
 Volume    | 150476200           
 Adj Close | 27.774976000000002  
only showing top 2 rows



___
## Filter DataFrame

### Filter using statement

In [6]:
# filter() accepts the condition as a SQL expression string
(df.filter("Close < 500")
   .select('Open', 'Close')
   .show())

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



### Filter using python DataFrame Syntax

In [7]:
# Same kind of filter, but the condition is built from a Column object
(df.filter(df['Close'] < 500)
   .select('Open', 'Volume')
   .show())

+------------------+---------+
|              Open|   Volume|
+------------------+---------+
|        213.429998|123432400|
|        214.599998|150476200|
|        214.379993|138040000|
|            211.75|119282800|
|        210.299994|111902700|
|212.79999700000002|115557400|
|209.18999499999998|148614900|
|        207.870005|151473000|
|210.11000299999998|108223500|
|210.92999500000002|148516900|
|        208.330002|182501900|
|        214.910006|153038200|
|        212.079994|152038600|
|206.78000600000001|220441900|
|202.51000200000001|266424900|
|205.95000100000001|466777500|
|        206.849995|430642100|
|        204.930004|293375600|
|        201.079996|311488100|
|192.36999699999998|187469100|
+------------------+---------+
only showing top 20 rows



### Filter Multiple Condition
Combine conditions with `&` (and) or `|` (or), wrapping each condition in parentheses: `df.filter((condition 1) & (condition 2)).show()`

In [8]:
# Wrap each condition in parentheses: in Python, & and | bind more tightly than < and >,
# so without them `200 & df['Open']` would be evaluated first and the expression would fail
df.filter((df['Close'] < 200) & (df['Open'] > 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [9]:
# ~ is the logical NOT for a Column condition (Python's `not` cannot be used on Columns):
# keep rows where Close < 200 AND Open is NOT greater than 200
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).show(5)

+-------------------+------------------+----------+------------------+----------+---------+------------------+
|               Date|              Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
|2010-02-01 00:00:00|192.36999699999998|     196.0|191.29999899999999|194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|196.319994|193.37999299999998|195.859997|174585600|25.375532999999997|
|2010-02-03 00:00:00|        195.169994|200.200003|        194.420004|199.229994|153832000|25.812148999999998|
|2010-02-04 00:00:00|        196.730003|198.370001|        191.570005|192.050003|189413000|         24.881912|
|2010-02-05 00:00:00|192.63000300000002|     196.0|        190.850002|195.460001|212576700|25.323710000000002|
+-------------------+------------------+----------+------------------+----------+---------+------------------+
only showing top 5 rows

___
## Collecting Data

**`.show()`** only prints the data. To actually bring the rows back into Python, use **`.collect()`**, which returns them as a list.

In [10]:
# .show() just prints the matching rows; its return value (None) is what gets stored in `test`
test = df.filter(df.Low == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [11]:
# show() returned nothing, so `test` holds None
print(test)

None


### Use `.collect()`

In [12]:
# collect() executes the filter and returns the matching rows to Python as a list of Row objects
collect_data = df.filter(df.Close > 700).collect()

In [13]:
# Check the Python type of the collected result: an ordinary list
type(collect_data)

list

In [14]:
# Number of rows that passed the filter (Close > 700)
len(collect_data)

3

In [15]:
# Display the collected rows; each list element is a Row
collect_data

[Row(Date=datetime.datetime(2012, 9, 18, 0, 0), Open=699.879997, High=702.329987, Low=696.4199980000001, Close=701.910004, Volume=93375800, Adj Close=91.329593),
 Row(Date=datetime.datetime(2012, 9, 19, 0, 0), Open=700.259979, High=703.989998, Low=699.569977, Close=702.100021, Volume=81718700, Adj Close=91.35431700000001),
 Row(Date=datetime.datetime(2012, 9, 21, 0, 0), Open=702.409988, High=705.070023, Low=699.3599849999999, Close=700.089989, Volume=142897300, Adj Close=91.09278)]

In [16]:
# Take the first collected Row, keep it as `row_data`, and display it
row_data = collect_data[0]

row_data

Row(Date=datetime.datetime(2012, 9, 18, 0, 0), Open=699.879997, High=702.329987, Low=696.4199980000001, Close=701.910004, Volume=93375800, Adj Close=91.329593)

In [17]:
# Convert the Row into a dict keyed by column name
row_data.asDict()

{'Date': datetime.datetime(2012, 9, 18, 0, 0),
 'Open': 699.879997,
 'High': 702.329987,
 'Low': 696.4199980000001,
 'Close': 701.910004,
 'Volume': 93375800,
 'Adj Close': 91.329593}

In [18]:
# Turn the Row into a dict, then read one column's value by its key
row_dict = row_data.asDict()
row_dict['Volume']

93375800

___
# Spark Functions, Aggregation and GroupBy

In [19]:
# Reuse the existing `spark` session. Calling SparkSession.builder.appName('aggregate').getOrCreate()
# here would just return that same session, so the 'aggregate' app name would never take effect.
df = spark.read.csv('dataset/sales_info.csv',
                    inferSchema=True, header=True)
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



## Spark Built-in Function
Import from `pyspark.sql.functions`

In [20]:
import pyspark.sql.functions as F

In [21]:
# How many different companies appear in the sales data
df.select(F.countDistinct(df['Company'])).show()

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                      4|
+-----------------------+



## Aggregation Function

### Using `.select()` method

In [22]:
# An aggregate function can be passed straight to select() to reduce a column to one value
df.select(F.sum(df['Sales'])).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



### Using `.agg()` Function

In [23]:
# agg() is the dedicated aggregation method; the same call also works after groupBy()
df.agg(F.avg(df['Sales'])).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [24]:
# agg() also takes a {column: aggregate-function-name} mapping
df.agg(dict(Sales='max')).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



### Column Alias

In [25]:
# alias() gives the aggregated column a readable name instead of "avg(Sales)"
average_sales = F.avg('Sales').alias('Average Sales')
df.agg(average_sales).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



### Number Formatting
Using **`format_number()`** Function from _`pyspark.sql.functions`_

In [26]:
# One-row DataFrame holding the standard deviation of Sales under the column name 'std'
sales_std = df.agg(F.stddev(df['Sales']).alias('std'))

In [27]:
# Display the unformatted standard deviation
sales_std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [28]:
# format_number renders the value rounded to 2 decimal places (for display); keep the 'std' name
sales_std.select(F.format_number('std', 2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



___
## Summarizing a DataFrame with Group By

In [29]:
# groupBy() alone returns a GroupedData object (see output), not a DataFrame;
# an aggregation such as .sum(), .count() or .agg() is needed to get results
df.groupBy('Company')

<pyspark.sql.group.GroupedData at 0x7f64cbfeda90>

### Combine GroupBy with built-in aggregation methods (`.sum()`, `.count()`)

In [30]:
# With no arguments, sum() totals every numeric column for each company
df.groupBy(df['Company']).sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [31]:
# Number of rows belonging to each company
df.groupBy(df['Company']).count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



## Group By with dict-style `.agg()`

In [32]:
# Keep the grouped object so several aggregations can be run on it
group_company = df.groupBy(df['Company'])

In [33]:
# Highest single sale per company, using the {column: function-name} form
group_company.agg(dict(Sales='max')).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



___
## Order By
We can sort a DataFrame with its `orderBy` method, which orders the rows by the column(s) passed as arguments.

### Ascending

In [34]:
# orderBy sorts ascending unless told otherwise; .asc() just makes that explicit
df.orderBy(df['Sales'].asc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



### Descending `.desc()` method

In [35]:
# Largest sales first, using the functions-module form of a descending sort key
df.orderBy(F.desc('Sales')).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



### Descending by arguments

In [36]:
# Same descending order, requested through the ascending keyword
df.orderBy(['Sales'], ascending=False).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



___
## Combination Operation of Aggregation & Ordering 
### GroupBy & OrderBy

In [37]:
# Import Sum Function from pyspark.sql.functions
from pyspark.sql.functions import sum as sparksum

In [38]:
# Total sales per company, largest first.
# Uses F.sum (pyspark.sql.functions imported as F) like the rest of the notebook; the
# module prefix already avoids clashing with Python's built-in sum, so no extra alias is needed.
(df.groupBy('Company')
   .agg(F.sum('Sales').alias('Total_Sales'))
   .orderBy('Total_Sales', ascending=False)
   .show())

+-------+-----------+
|Company|Total_Sales|
+-------+-----------+
|   APPL|     1480.0|
|     FB|     1220.0|
|   MSFT|      967.0|
|   GOOG|      660.0|
+-------+-----------+



### Spark SQL Group By Order By

In [39]:
# Register df as the temporary view 'sales' so spark.sql queries can refer to it by name
df.createOrReplaceTempView('sales')

In [40]:
# Same per-company totals as the DataFrame version, written in Spark SQL.
# The column is spelled `Sales` to match the schema (and the other query in this notebook).
spark.sql("""SELECT Company, SUM(Sales) AS Total_Sales
             FROM sales
             GROUP BY Company
             ORDER BY Total_Sales DESC
             """).show()

+-------+-----------+
|Company|Total_Sales|
+-------+-----------+
|   APPL|     1480.0|
|     FB|     1220.0|
|   MSFT|      967.0|
|   GOOG|      660.0|
+-------+-----------+



## Multiple Group By
### Spark DataFrame Syntax

In [41]:
import pyspark.sql.functions as F

In [42]:
# Several aggregates in one agg() call, then rank companies by total sales
(df.groupBy('Company')
   .agg(F.sum('Sales').alias('Total_Sales'),
        F.avg('Sales').alias('Average_Sales'))
   .orderBy('Total_Sales', ascending=False)
   .show())

+-------+-----------+-----------------+
|Company|Total_Sales|    Average_Sales|
+-------+-----------+-----------------+
|   APPL|     1480.0|            370.0|
|     FB|     1220.0|            610.0|
|   MSFT|      967.0|322.3333333333333|
|   GOOG|      660.0|            220.0|
+-------+-----------+-----------------+



### Spark SQL Syntax

In [43]:
# (Re)register df as the temporary view 'sales'; it was already registered earlier,
# and createOrReplaceTempView simply replaces it, so this keeps the section self-contained
df.createOrReplaceTempView('sales')

In [44]:
# Total and average sales per company in Spark SQL, highest total first
company_summary_sql = """SELECT Company, SUM(Sales) AS Total_Sales, AVG(Sales) AS Average_Sales
             FROM sales
             GROUP BY Company
             ORDER BY Total_Sales DESC
          """
spark.sql(company_summary_sql).show()

+-------+-----------+-----------------+
|Company|Total_Sales|    Average_Sales|
+-------+-----------+-----------------+
|   APPL|     1480.0|            370.0|
|     FB|     1220.0|            610.0|
|   MSFT|      967.0|322.3333333333333|
|   GOOG|      660.0|            220.0|
+-------+-----------+-----------------+



___
# Handling Missing Data in Spark

In [45]:
# Load a small table that contains null values in its Name and Sales columns
df = spark.read.csv('dataset/ContainsNull.csv',
                    inferSchema=True,
                    header=True)
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [46]:
# Show the inferred column types; Sales is numeric (double), Id and Name are strings
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



## Drop `Null` data 
### Drop all missing data using `.na.drop()`

In [47]:
# how='any' (the default): drop every row that has at least one null
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



### Drop Null using Threshold value

In [48]:
# thresh=2 keeps only rows with at least 2 NON-null values.
# With 3 columns, that drops rows with 2 or more nulls (emp2 here).
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Drop using the `how` condition

In [49]:
# how='all': a row is removed only when every one of its values is null
df.na.drop('all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Drop using subset of columns

In [50]:
# Only the Sales column is checked for nulls; nulls elsewhere are ignored
df.na.drop(how='any', subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



___
## Fill NA value
### Fill NA, with the target columns inferred from the fill value's type

In [51]:
# A string fill value only replaces nulls in string-typed columns (Sales is left untouched)
df.na.fill(value='No Name').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [52]:
# A numeric fill value only replaces nulls in numeric columns (Name nulls are left untouched);
# the integer 20 is stored as 20.0 in the double-typed Sales column
df.na.fill(20).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| 20.0|
|emp2| null| 20.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Fillna with Explicit column

In [53]:
# Restrict the fill to the listed column(s) only
df.na.fill(value='No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



### Filling Multiple Columns

In [54]:
# A {column: replacement} mapping fills each column with its own value
df.na.fill(dict(Name='No Name', Sales=500)).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John|500.0|
|emp2|No Name|500.0|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



### Filling with Computational Value (Mean)

In [55]:
# Compute the average of Sales (nulls are ignored) and pull the single result row back to Python
mean = df.select(F.avg(df['Sales'])).collect()
mean

[Row(avg(Sales)=400.5)]

In [56]:
# The collected list has one Row; its first (and only) field is the numeric mean
first_row = mean[0]
mean_value = first_row[0]
mean_value

400.5

In [57]:
# Impute missing Sales with the mean computed above
df.na.fill({'Sales': mean_value}).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### One Line Imputer

In [58]:
# Compute the Sales mean and use it as the fill value in a single expression;
# first() returns the one aggregate Row, whose first field is the mean
df.na.fill(df.select(F.avg('Sales')).first()[0],
           subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



___
# Dates and Timestamps

In [59]:
# Reload the stock data to work with its Date timestamp column
df = spark.read.csv('dataset/appl_stock.csv',
                    inferSchema=True,
                    header=True)
df.show(n=2, vertical=True)

-RECORD 0------------------------
 Date      | 2010-01-04 00:00:00 
 Open      | 213.429998          
 High      | 214.499996          
 Low       | 212.38000099999996  
 Close     | 214.009998          
 Volume    | 123432400           
 Adj Close | 27.727039           
-RECORD 1------------------------
 Date      | 2010-01-05 00:00:00 
 Open      | 214.599998          
 High      | 215.589994          
 Low       | 213.249994          
 Close     | 214.379993          
 Volume    | 150476200           
 Adj Close | 27.774976000000002  
only showing top 2 rows



## Extract Information from Datetime
`pyspark.sql.functions` provides several datetime functions, such as **`dayofmonth, dayofyear, hour, month, year, weekofyear, date_format`**

### Extract Day of the month

In [60]:
# Day-of-month number for each Date, first seven rows
df.select(F.dayofmonth(df['Date'])).show(n=7)

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
+----------------+
only showing top 7 rows



### Create new Column From Datetime

In [61]:
# Add a 'Year' column derived from the Date timestamp
year_df = df.withColumn('Year', F.year(df['Date']))
year_df.show(n=2)

+-------------------+----------+----------+------------------+----------+---------+------------------+----+
|               Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|Year|
+-------------------+----------+----------+------------------+----------+---------+------------------+----+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|2010|
+-------------------+----------+----------+------------------+----------+---------+------------------+----+
only showing top 2 rows



### Aggregating Based on Date year

In [62]:
# Average closing price per year (result column is named "avg(Close)")
year_groupby = year_df.groupBy('Year').agg({'Close': 'avg'})
year_groupby.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [63]:
# Average close per year, highest first, displayed with 2 decimals.
# Sort on the numeric average BEFORE formatting: format_number produces display text, and
# ordering that text compares it as strings (e.g. "99.50" would rank above "576.05").
year_groupby = (year_df.groupBy('Year')
                .agg(F.avg('Close').alias('avg_close'))
                .orderBy('avg_close', ascending=False)
                .select('Year', F.format_number('avg_close', 2).alias('Average Closed')))
year_groupby.show()

+----+--------------+
|Year|Average Closed|
+----+--------------+
|2012|        576.05|
|2013|        472.63|
|2011|        364.00|
|2014|        295.40|
|2010|        259.84|
|2015|        120.04|
|2016|        104.60|
+----+--------------+



### Extract Datetime using Spark SQL

In [64]:
# Register the stock DataFrame as the temporary view 'appl' for the Spark SQL query
df.createOrReplaceTempView('appl')

In [65]:
# Yearly average close in Spark SQL. ROUND keeps the value numeric, so ORDER BY sorts numbers,
# but trailing zeros are not shown (364.0 rather than format_number's 364.00)
yearly_avg_sql = """SELECT EXTRACT(YEAR FROM Date) AS Year, ROUND(AVG(Close),2) AS `Average Closed`
             FROM appl
             GROUP BY Year
             ORDER BY `Average Closed` DESC
             """
spark.sql(yearly_avg_sql).show()

+----+--------------+
|Year|Average Closed|
+----+--------------+
|2012|        576.05|
|2013|        472.63|
|2011|         364.0|
|2014|         295.4|
|2010|        259.84|
|2015|        120.04|
|2016|         104.6|
+----+--------------+

