## DS164 Spark DataFrame Continuation:
- Prepared by: Van Julius Leander G. Lopez, MSDS

#### Quick Recap on our last learning checkpoint:
- We learned how to show the schema of a Spark DataFrame
```python
finpop_spark_df.printSchema()
root
 |-- pop: string (nullable = true)
 |-- tot_local_sources: string (nullable = true)
 |-- tot_tax_revenue: string (nullable = true)
 |-- tot_current_oper_income: string (nullable = true)
 |-- total_oper_expenses: string (nullable = true)
 |-- net_oper_income: string (nullable = true)
 |-- total_non_income_receipts: string (nullable = true)
 |-- capital_expenditure: string (nullable = true)
 |-- total_non_oper_expenditures: string (nullable = true)
 |-- cash_balance_end: string (nullable = true)
 |-- shp_province: string (nullable = true)
 |-- shp_municipality: string (nullable = true)
```

- We learned how to list the columns of a Spark DataFrame
```python
finpop_spark_df.columns

['pop',
 'tot_local_sources',
 'tot_tax_revenue',
 'tot_current_oper_income',
 'total_oper_expenses',
 'net_oper_income',
 'total_non_income_receipts',
 'capital_expenditure',
 'total_non_oper_expenditures',
 'cash_balance_end',
 'shp_province',
 'shp_municipality']
```

- We learned how to compute summary statistics for specific columns with `describe`
```python
finpop_spark_df.describe('pop', 'tot_local_sources').show()

+-------+------------------+-----------------+
|summary|               pop|tot_local_sources|
+-------+------------------+-----------------+
|  count|              1590|             1627|
|   mean| 62196.46037735849|96.88792622731867|
| stddev|131565.85954168398|637.2402417874315|
|    min|            1297.0|              0.0|
|    max|         2936116.0|        14534.717|
+-------+------------------+-----------------+
```

In [1]:
# Importing our Last Checkpoint.
# Initiate a spark session
# read the finpop toy dataset

import os
import sys
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from numpy.testing import assert_equal, assert_allclose

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = (
    SparkSession
        .builder
        .master('local[*]')
        .getOrCreate()
)

finpop_spark_df = spark.read.csv(
    'financial_pop.csv',
    sep=',', header=True
)

finpop_df = pd.read_csv("financial_pop.csv")

In [2]:
type(finpop_df)

pandas.core.frame.DataFrame

In [3]:
type(finpop_spark_df)

pyspark.sql.dataframe.DataFrame

#### Working with PySpark DataFrames:
***

To learn PySpark faster and internalize its fundamentals, we will compare Pandas and PySpark code side by side.

#### Viewing Data

In [4]:
# Pandas Code
finpop_df.head(5)

| | pop | tot_local_sources | tot_tax_revenue | tot_current_oper_income | total_oper_expenses | net_oper_income | total_non_income_receipts | capital_expenditure | total_non_oper_expenditures | cash_balance_end | shp_province | shp_municipality |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 48163.0 | 74.04592 | 36.556294 | 230.577076 | 206.848717 | 23.728359 | 0.0 | 16.202464 | 21.181116 | 15.758681 | Abra | Bangued |
| 1 | 3573.0 | 0.115667 | 0.035633 | 56.689883 | 54.40991 | 2.279973 | 0.0 | 0.0 | 0.03 | 20.369743 | Abra | Boliney |
| 2 | 17115.0 | 1.736411 | 1.019565 | 93.647242 | 66.222389 | 27.424853 | 0.0 | 19.560034 | 20.792182 | 6.632671 | Abra | Bucay |
| 3 | 2501.0 | 0.273689 | 0.12915 | 44.789104 | 29.633181 | 15.155923 | 0.0 | 0.479569 | 0.656569 | 24.348356 | Abra | Bucloc |
| 4 | 2088.0 | 0.643801 | 0.27766 | 51.242322 | 47.848486 | 3.393836 | 0.0 | 0.0 | 0.0 | 6.511575 | Abra | Daguioman |


In [5]:
# Spark Code
finpop_spark_df.limit(5).show()

+-----+-----------------+---------------+-----------------------+-------------------+---------------+-------------------------+-------------------+---------------------------+----------------+------------+----------------+
|  pop|tot_local_sources|tot_tax_revenue|tot_current_oper_income|total_oper_expenses|net_oper_income|total_non_income_receipts|capital_expenditure|total_non_oper_expenditures|cash_balance_end|shp_province|shp_municipality|
+-----+-----------------+---------------+-----------------------+-------------------+---------------+-------------------------+-------------------+---------------------------+----------------+------------+----------------+
|48163|      74.04592008|    36.55629364|            230.5770761|        206.8487167|    23.72835939|                        0|          16.202464|                21.18111613|      15.7586811|        Abra|         Bangued|
| 3573|       0.11566694|     0.03563294|            56.68988294|        54.40990988|     2.27997306|       

#### Selecting Specific Columns

In [6]:
# Pandas Selecting a single column
finpop_df["pop"].head(5)

0    48163.0
1     3573.0
2    17115.0
3     2501.0
4     2088.0
Name: pop, dtype: float64

In [7]:
# Pyspark Selecting a single column 
finpop_spark_df.select("pop").limit(5).show()

+-----+
|  pop|
+-----+
|48163|
| 3573|
|17115|
| 2501|
| 2088|
+-----+



In [8]:
finpop_df.columns

Index(['pop', 'tot_local_sources', 'tot_tax_revenue',
       'tot_current_oper_income', 'total_oper_expenses', 'net_oper_income',
       'total_non_income_receipts', 'capital_expenditure',
       'total_non_oper_expenditures', 'cash_balance_end', 'shp_province',
       'shp_municipality'],
      dtype='object')

In [9]:
# Pandas Selecting multiple columns
finpop_df[["pop", "tot_local_sources"]].head(5)

| | pop | tot_local_sources |
|---|---|---|
| 0 | 48163.0 | 74.04592 |
| 1 | 3573.0 | 0.115667 |
| 2 | 17115.0 | 1.736411 |
| 3 | 2501.0 | 0.273689 |
| 4 | 2088.0 | 0.643801 |


In [10]:
# Pyspark Selecting multiple columns
finpop_spark_df.select("pop", "tot_local_sources").limit(5).show()

+-----+-----------------+
|  pop|tot_local_sources|
+-----+-----------------+
|48163|      74.04592008|
| 3573|       0.11566694|
|17115|       1.73641072|
| 2501|       0.27368935|
| 2088|       0.64380135|
+-----+-----------------+



#### Filtering

In [11]:
finpop_df.shape

(1627, 12)

In [12]:
# Pandas filtering by One Condition
finpop_df[finpop_df["pop"] > 10_000].shape

(1490, 12)

In [13]:
# Pyspark filtering by One Condition
finpop_spark_df.filter(finpop_spark_df.pop > 10_000).count()

1490

In [14]:
# Pandas filtering by Multiple Conditions
finpop_df[(finpop_df["pop"] > 10_000) & ((finpop_df["shp_province"] == "Abra") | (finpop_df["shp_province"] == "Iloilo"))].shape

(53, 12)

In [15]:
# Pyspark filtering by Multiple Conditions
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.col.html
from pyspark.sql.functions import col

finpop_spark_df.filter(
    (col("pop") > 10_000) &
    ((col("shp_province") == "Abra") | (col("shp_province") == "Iloilo"))
).count()

53

#### Null Handling

In [16]:
# Pandas Filter rows where any specified columns have NaN values
finpop_df[finpop_df[["pop", "tot_local_sources"]].isnull().any(axis=1)].shape

(37, 12)

In [17]:
# Pyspark: count the null values in each column
for column in finpop_spark_df.columns:
    print(finpop_spark_df.filter(col(column).isNull()).count())

37
0
0
0
0
0
0
0
0
0
0
0


In [18]:
# Pandas filling nulls in the dataframe
finpop_df = finpop_df.fillna(0)

In [19]:
# Pyspark filling nulls in the dataframe
finpop_spark_df = finpop_spark_df.fillna({
    "pop": 0,
    "tot_local_sources": 0
})

In [20]:
# Pandas dropping nulls in the dataframe
print(finpop_df.shape)
finpop_df.dropna(inplace=True)
print(finpop_df.shape)

(1627, 12)
(1627, 12)


In [21]:
# Pyspark dropping nulls in the dataframe
finpop_spark_df = finpop_spark_df.dropna()
finpop_spark_df.count()

1627

#### Data Aggregation

In [22]:
finpop_df

| | pop | tot_local_sources | tot_tax_revenue | tot_current_oper_income | total_oper_expenses | net_oper_income | total_non_income_receipts | capital_expenditure | total_non_oper_expenditures | cash_balance_end | shp_province | shp_municipality |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 48163.0 | 74.045920 | 36.556294 | 230.577076 | 206.848717 | 23.728359 | 0.0 | 16.202464 | 21.181116 | 15.758681 | Abra | Bangued |
| 1 | 3573.0 | 0.115667 | 0.035633 | 56.689883 | 54.409910 | 2.279973 | 0.0 | 0.000000 | 0.030000 | 20.369743 | Abra | Boliney |
| 2 | 17115.0 | 1.736411 | 1.019565 | 93.647242 | 66.222389 | 27.424853 | 0.0 | 19.560034 | 20.792182 | 6.632671 | Abra | Bucay |
| 3 | 2501.0 | 0.273689 | 0.129150 | 44.789104 | 29.633181 | 15.155923 | 0.0 | 0.479569 | 0.656569 | 24.348356 | Abra | Bucloc |
| 4 | 2088.0 | 0.643801 | 0.277660 | 51.242322 | 47.848486 | 3.393836 | 0.0 | 0.000000 | 0.000000 | 6.511575 | Abra | Daguioman |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1622 | 0.0 | 15.448920 | 5.801291 | 165.833472 | 116.742464 | 49.091008 | 0.0 | 30.375550 | 40.505854 | 29.068270 | Camarines Sur | Tinambac |
| 1623 | 0.0 | 3.881869 | 2.064453 | 59.176489 | 50.055428 | 9.121061 | 0.0 | 0.000000 | 0.000000 | 33.135557 | Camarines Sur | Camaligan |
| 1624 | 0.0 | 4.867135 | 2.615213 | 144.692431 | 100.076373 | 44.616059 | 0.0 | 0.502352 | 0.502352 | 106.534443 | Camarines Sur | Lagonoy |
| 1625 | 0.0 | 5.261260 | 1.360630 | 95.936120 | 61.682604 | 34.253516 | 0.0 | 13.592182 | 13.949405 | 40.976700 | Camarines Sur | Balatan |


In [23]:
# Pandas Group by a Single Column and Calculate the Mean
finpop_df.groupby(["shp_province"])["pop"].mean().head(5)

shp_province
Abra                 8931.851852
Agusan del Norte    57630.500000
Agusan del Sur      50046.642857
Aklan               33813.117647
Albay               73045.888889
Name: pop, dtype: float64

In [24]:
# Pyspark Group by a Single Column and Calculate the Mean
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.mean.html
from pyspark.sql.functions import mean

finpop_spark_df.groupBy("shp_province").agg(mean("pop")).limit(5).show()

+-------------------+------------------+
|       shp_province|          avg(pop)|
+-------------------+------------------+
|   Misamis Oriental|           60171.5|
|      Eastern Samar|20311.304347826088|
|    Dinagat Islands|18164.571428571428|
|Zamboanga del Norte|           37459.0|
|   Oriental Mindoro|           56270.6|
+-------------------+------------------+



In [25]:
# Ordered Pyspark
(
    finpop_spark_df.groupBy("shp_province")
    .agg(mean("pop"))
    .orderBy("shp_province", ascending=True)
    .limit(5)
    .show()
)

+----------------+------------------+
|    shp_province|          avg(pop)|
+----------------+------------------+
|            Abra| 8931.851851851852|
|Agusan del Norte|           57630.5|
|  Agusan del Sur|50046.642857142855|
|           Aklan|33813.117647058825|
|           Albay| 73045.88888888889|
+----------------+------------------+



In [26]:
# Pandas Group by Multiple Columns and Calculate the Mean
finpop_df.groupby(["shp_province", "shp_municipality"])["pop"].mean().head(5)

shp_province  shp_municipality
Abra          Bangued             48163.0
              Boliney              3573.0
              Bucay               17115.0
              Bucloc               2501.0
              Daguioman            2088.0
Name: pop, dtype: float64

In [27]:
# Pyspark Group by Multiple Columns and Calculate the Mean
(
    finpop_spark_df.groupBy("shp_province", "shp_municipality")
    .agg(mean("pop"))
    .orderBy("shp_province", "shp_municipality", ascending=[True, True])
    .limit(5)
    .show()
)

+------------+----------------+--------+
|shp_province|shp_municipality|avg(pop)|
+------------+----------------+--------+
|        Abra|         Bangued| 48163.0|
|        Abra|         Boliney|  3573.0|
|        Abra|           Bucay| 17115.0|
|        Abra|          Bucloc|  2501.0|
|        Abra|       Daguioman|  2088.0|
+------------+----------------+--------+



In [28]:
# Pandas Applying Multiple Aggregation Functions
finpop_df.groupby(["shp_province"]).agg({"pop":"mean", "tot_local_sources":"sum"}).head(5)

| shp_province | pop | tot_local_sources |
|---|---|---|
| Abra | 8931.851852 | 101.405983 |
| Agusan del Norte | 57630.5 | 690.178614 |
| Agusan del Sur | 50046.642857 | 453.74995 |
| Aklan | 33813.117647 | 644.480904 |
| Albay | 73045.888889 | 891.561918 |


In [29]:
# Pyspark Applying Multiple Aggregation Functions
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.sum.html
from pyspark.sql.functions import mean, sum

(
    finpop_spark_df.groupBy("shp_province")
    .agg(mean("pop"), sum("tot_local_sources"))
    .orderBy("shp_province", ascending=[True])
    .limit(5)
    .show()
)

+----------------+------------------+----------------------+
|    shp_province|          avg(pop)|sum(tot_local_sources)|
+----------------+------------------+----------------------+
|            Abra| 8931.851851851852|          101.40598313|
|Agusan del Norte|           57630.5|          690.17861372|
|  Agusan del Sur|50046.642857142855|          453.74995047|
|           Aklan|33813.117647058825|          644.48090356|
|           Albay| 73045.88888888889|     891.5619181399999|
+----------------+------------------+----------------------+



In [30]:
# Pandas Named Aggregation
finpop_df.groupby(["shp_province"]).agg(
    mean_pop = ("pop", "mean"),
    prov_total_loc_source = ("tot_local_sources", "sum"),
    prov_ave_tax_revenue = ("tot_tax_revenue", "mean")
).head(5)

| shp_province | mean_pop | prov_total_loc_source | prov_ave_tax_revenue |
|---|---|---|---|
| Abra | 8931.851852 | 101.405983 | 1.917385 |
| Agusan del Norte | 57630.5 | 690.178614 | 34.663475 |
| Agusan del Sur | 50046.642857 | 453.74995 | 14.040303 |
| Aklan | 33813.117647 | 644.480904 | 15.517541 |
| Albay | 73045.888889 | 891.561918 | 29.981841 |


In [31]:
# Pyspark Named Aggregation
(
    finpop_spark_df.groupBy("shp_province")
    .agg(
        mean("pop").alias("mean_pop"),
        sum("tot_local_sources").alias("prov_total_loc_source"),
        mean("tot_tax_revenue").alias("prov_ave_tax_revenue")
    )
    .orderBy("shp_province", ascending=[True])
    .limit(5)
    .show()
)

+----------------+------------------+---------------------+--------------------+
|    shp_province|          mean_pop|prov_total_loc_source|prov_ave_tax_revenue|
+----------------+------------------+---------------------+--------------------+
|            Abra| 8931.851851851852|         101.40598313|  1.9173847725925923|
|Agusan del Norte|           57630.5|         690.17861372|   34.66347479833333|
|  Agusan del Sur|50046.642857142855|         453.74995047|   14.04030341857143|
|           Aklan|33813.117647058825|         644.48090356|  15.517540882352943|
|           Albay| 73045.88888888889|    891.5619181399999|  29.981840775555554|
+----------------+------------------+---------------------+--------------------+



#### Collecting the Output of an action

In [32]:
collection_trial = (
    finpop_spark_df.groupBy("shp_province")
    .agg(
        mean("pop").alias("mean_pop"),
        sum("tot_local_sources").alias("prov_total_loc_source"),
        mean("tot_tax_revenue").alias("prov_ave_tax_revenue")
    )
    .orderBy("shp_province", ascending=[True])
    .collect()
)

In [33]:
# for row in collection_trial:
#     for column in row:
#         print(column)
#     print()

In [34]:
# collection_trial[0]["shp_province"]

In [35]:
collection_df = (
    finpop_spark_df.groupBy("shp_province")
    .agg(
        mean("pop").alias("mean_pop"),
        sum("tot_local_sources").alias("prov_total_loc_source"),
        mean("tot_tax_revenue").alias("prov_ave_tax_revenue")
    )
    .orderBy("shp_province", ascending=[True])
    .toPandas()
)

#### PySpark Exercises:
***

In [36]:
from numpy.testing import assert_equal, assert_array_equal, assert_allclose
from pandas.testing import assert_series_equal, assert_frame_equal
from pyspark.sql.functions import count

#### Coding Task 1

Create a function `top_plus_bottom` that takes in the dataframe from `finpop_spark_df` and returns a numpy array. Sort the dataframe by `shp_province` in alphabetical order, then by `shp_municipality` in reverse alphabetical order. For the columns `pop` and `tot_tax_revenue`, add each of the first 5 rows to the corresponding row of the last 5 rows.

In [37]:
from pyspark.sql.functions import col

def top_plus_bottom(finpop_spark_df):
    # Sort by province (A to Z), then by municipality (Z to A).
    sorted_df = finpop_spark_df.orderBy(
        col("shp_province").asc(),
        col("shp_municipality").desc()
    )

    # The CSV was read as strings, so cast before doing arithmetic.
    columns_to_cast = ["pop", "tot_tax_revenue"]
    for column in columns_to_cast:
        sorted_df = sorted_df.withColumn(column, col(column).cast("float"))

    top_5_rows = (
        sorted_df
        .limit(5)
        .select("pop", "tot_tax_revenue")
        .collect()
    )
    # Reversing both sort directions puts the last rows first.
    bottom_5_rows = (
        sorted_df
        .orderBy(col("shp_province").desc(), col("shp_municipality").asc())
        .limit(5)
        .select("pop", "tot_tax_revenue")
        .collect()
    )

    top_5_np = np.array([[row["pop"], row["tot_tax_revenue"]] for row in top_5_rows])
    bottom_5_np = np.array([[row["pop"], row["tot_tax_revenue"]] for row in bottom_5_rows])
    return top_5_np + bottom_5_np

In [38]:
tb_sum = top_plus_bottom(finpop_spark_df)
assert_equal(isinstance(tb_sum, np.ndarray), True)
assert_equal(tb_sum.shape, (5, 2))
assert_allclose(tb_sum, np.array([[5.61470000e+04, 8.26607199e+00],
       [3.92900000e+04, 5.86770853e+00],
       [3.51780000e+04, 1.81730101e+00],
       [4.99710000e+04, 3.59247471e+00],
       [3.74510000e+04, 4.73484815e+00]]))

#### Coding Task 2
Create a function `pop_stats` that takes in the dataframe from `finpop_spark_df` and returns a `pandas` `DataFrame` that contains the count, mean, standard deviation, minimum, and maximum of the `pop` column.

In [39]:
def pop_stats(finpop_spark_df):
    return finpop_spark_df.describe("pop").toPandas()

In [40]:
ss = pop_stats(finpop_spark_df)
assert_equal(ss['pop'].tolist(),
             ['1627', '60782.0356484327', '130390.63063471451', '0', '99779'])
assert_equal(ss.shape, (5, 2))
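
The expected minimum `'0'` and maximum `'99779'` above are strings, which fits the recap schema where `pop` is a string column. Strings compare character by character, so `'99779'` ranks above `'2936116'`. A plain-Python sketch of that ordering, using a few `pop` values that appear in this notebook:

```python
# A handful of pop values seen in this notebook, kept as text.
pop_as_text = ["48163", "3573", "17115", "0", "99779", "2936116"]

# Text comparison looks at the first differing character.
text_min = min(pop_as_text)
text_max = max(pop_as_text)

# Numeric comparison after converting to float.
numeric_max = max(float(value) for value in pop_as_text)

print(text_min, text_max, numeric_max)
```

If numeric ordering were wanted, casting the column first (for example `col("pop").cast("double")`) would be one option, though the assertion above expects the string results.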

#### Coding Task 3
Create a function `elyu_municips` that takes in the dataframe from `finpop_spark_df` and returns a list that contains all the municipalities of La Union.

In [41]:
def elyu_municips(finpop_spark_df):
    municipalities = (
        finpop_spark_df.filter(col("shp_province") == "La Union")
        .select("shp_municipality")
        .collect()
    )
    municipality_list = [row["shp_municipality"] for row in municipalities]
    
    return municipality_list

In [42]:
elyu = elyu_municips(finpop_spark_df)
assert_equal(elyu, [
    'Agoo', 'Aringay', 'Bacnotan', 'Bagulin', 'Balaoan', 'Bangar', 'Bauang',
    'Burgos', 'Caba', 'San Fernando City', 'Luna', 'Naguilian', 'Pugo',
    'Rosario', 'San Gabriel', 'San Juan', 'Santo Tomas', 'Santol', 'Sudipen', 'Tubao'
])

#### Coding Task 4

Create a function `high_tax` that takes in the dataframe from `finpop_spark_df` and returns a list of all the provinces where the total tax revenue is higher than `150`. The answer should be sorted alphabetically.

In [43]:
def high_tax(finpop_spark_df):
    tax_150 = (
        finpop_spark_df.filter(col("tot_tax_revenue") > 150)
        .select("shp_province")
        .distinct()
        .orderBy("shp_province")
        .toPandas()["shp_province"]
        .tolist() 
    )
    return tax_150

In [44]:
assert_equal(len(high_tax(finpop_spark_df)), 33)
assert_equal(high_tax(finpop_spark_df)[13], "Iloilo")

#### Coding Task 5

Create a function `pop_count` that takes in the dataframe from `finpop_spark_df`, gets the population count per province, and returns a dataframe. The dataframe should be sorted in descending order of population count.

In [45]:
def pop_count(finpop_spark_df):
    pop_count_df = (
        finpop_spark_df.groupBy("shp_province")
        .agg(sum("pop").alias("total_pop"))
        .orderBy(col("total_pop").desc())
    )
    return pop_count_df.toPandas()

In [46]:
assert_equal(pop_count(finpop_spark_df).iloc[0]["shp_province"], 'Metropolitan Manila')
assert_equal(pop_count(finpop_spark_df).shape, (82, 2))

#### Coding Task 6

Create a function `prov_details` that takes in the dataframe from `finpop_spark_df` and returns a dataframe where the index is `shp_province`. The dataframe should contain the average population per province, total capital expenditure per province, and the number of municipalities it governs. The resulting DataFrame should be sorted by ascending count of municipalities and then descending average population.

In [47]:
def prov_details(finpop_spark_df):
    prov_details_df = finpop_spark_df.groupBy('shp_province').agg(
        mean('pop').alias('pop'),
        sum('capital_expenditure').alias('capital_expenditure'),
        count('shp_municipality').alias('shp_municipality')
    )

    sorted_prov_details_df = prov_details_df.orderBy(
        col('shp_municipality').asc(),
        col('pop').desc()
    )

    prov_details_pd_df = sorted_prov_details_df.toPandas().set_index('shp_province')
    
    return prov_details_pd_df

In [48]:
df_provdet = prov_details(finpop_spark_df)
assert_equal(df_provdet.index.name, "shp_province")
assert_equal(df_provdet.reset_index()['capital_expenditure'][24], 595.94820096)
assert_equal(len(df_provdet['shp_municipality'].unique()), 33)

***