<h1>SparkSession & HDFS</h1>
    In this demo I use a sample CSV file with data about bank loans.<p> 
    First I create a DataFrame from the CSV file.<p> 
    Then I perform various operations on it. Finally, I save the result to HDFS and read it back.
    To begin, I need to import a few modules.


In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext,SparkSession
from pyspark.sql.functions import lower, col,trim, udf,struct,isnan,when, lit, avg, sum
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType,ArrayType,Row
import gc, time , re, os
import pandas as pd
from collections import defaultdict
import numpy as np  
epochNow = int(time.time())

<h3> Session config </h3>

In [None]:
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()


<h4> Reading the raw source CSV file from a URL </h4>

In [None]:
bank_loanDfPandas = pd.read_csv('https://people.math.sc.edu/Burkardt/datasets/csv/bankloan2.csv')
display(bank_loanDfPandas)

<h2> Reworking the column names. Unwanted characters are removed with the
<blockquote>'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*' regex.</blockquote> <p><p> Uppercase letters in column names are converted to lowercase. </h2>

In [None]:
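# Convert the pandas DataFrame into a Spark DataFrame
bank_loanDfSpark = spark.createDataFrame(bank_loanDfPandas)

# Normalize column names: drop spaces, lowercase, then apply the regex
for each in bank_loanDfSpark.schema.names:
    bank_loanDfSpark = bank_loanDfSpark.withColumnRenamed(each, re.sub(r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*', '', each.replace(' ', '').lower()))
# Trim whitespace from every value; note that trim() returns string columns
for colname in bank_loanDfSpark.columns:
    bank_loanDfSpark = bank_loanDfSpark.withColumn(colname, trim(col(colname)))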
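
The renaming rule can be tried on plain strings before touching the DataFrame. The sketch below applies the same expression to a few made-up column names (the sample names are hypothetical and not taken from the CSV file). It suggests that spaces are removed, letters are lowercased, and any other whitespace such as a tab is dropped together with the identifier that follows it.

```python
import re

# Same pattern as in the renaming loop.
PATTERN = r'\s+([a-zA-Z_][a-zA-Z_0-9]*)\s*'

def normalize_column_name(name):
    """Remove spaces, lowercase, then apply the regex used in the notebook."""
    return re.sub(PATTERN, '', name.replace(' ', '').lower())

# Hypothetical raw column names, for illustration only.
samples = [' Age', 'Loan Amount ', 'OUTCOME', 'id\textra']
cleaned = [normalize_column_name(s) for s in samples]
print(cleaned)
```

Note that the last sample loses its `extra` suffix, because the tab survives `replace(' ', '')` and the regex then deletes the tab plus the word after it.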

<h2> CSV table loaded and prepared for further work as a Spark DataFrame</h2>

In [None]:
bank_loanDfSpark.show()


<h2> Checking the schema of the table </h2>

In [None]:
bank_loanDfSpark.printSchema()

<h2>A sample filtering job. Here we keep only the records whose age equals '44'.</h2>

In [None]:
bank_loanDfSpark.filter(col('age') == '44').show()

<h2>Computing the average salary with the 'avg' aggregation function. </h2>

In [None]:
bank_loanDfSpark.select(avg("salary")).show()

<h2> Saving the table to HDFS as a Parquet table in overwrite mode.</h2>
<h4> Of course, in a normal scenario we wouldn't allow such a small file in the target filesystem. This save operation exists only as a POC.</h4>

In [None]:
# Write the DataFrame to HDFS as Parquet files
# (the "header" option applies to CSV only, so it is not needed for Parquet)
bank_loanDfSpark.write \
        .mode("overwrite") \
        .parquet("hdfs://hadoop-namenode:9000/salaries/{}_{}.parquet".format('bank_loan',epochNow))
print("Bank loan DataFrame stored in Hadoop.")
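
The target path combines a fixed prefix, a table name and the epoch timestamp taken when the imports ran. A minimal sketch of that naming scheme follows, using a hypothetical helper `build_parquet_path` that is not part of the notebook:

```python
import time

def build_parquet_path(table_name, epoch, base="hdfs://hadoop-namenode:9000/salaries"):
    """Hypothetical helper: mirrors the str.format call used in the write cell."""
    return "{}/{}_{}.parquet".format(base, table_name, epoch)

epoch_now = int(time.time())  # computed once, like epochNow in the notebook
path = build_parquet_path("bank_loan", epoch_now)
print(path)
```

Because the timestamp is computed once, rerunning the write cell in the same session targets the same path, which is where overwrite mode matters. Building the path in one place also keeps the write and the later read from drifting apart.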

<h2> Loading the parquet table stored on HDFS file system</h2>


In [None]:
# Read from HDFS to confirm it was successfully stored
df_load = spark.read.parquet("hdfs://hadoop-namenode:9000/salaries/{}_{}.parquet".format('bank_loan',epochNow))
print("Bank loan DataFrame read from Hadoop: ")
df_load.show()

<h2> Summing up all loans in the table with the 'sum' function. </h2>

In [None]:
df_load.select(sum("amount").alias("all_loans_summed_up")).show()

<h2> Now let's sort the records by the 'age' value in descending order</h2>

In [None]:
# Cast to int so the sort is numeric (trim() above turned the columns into strings)
bank_loanDfSpark.orderBy(col("age").cast("int").desc()).show(truncate=False)
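
One thing to watch when sorting: `trim()` returns string columns, so after the clean-up loop `age` holds text rather than numbers. The plain-Python sketch below, with made-up values, shows how text ordering can differ from numeric ordering once the values have different numbers of digits.

```python
# Made-up ages stored as strings, as they would be after trim().
ages = ['44', '9', '100', '27']

as_text = sorted(ages, reverse=True)               # lexicographic, descending
as_numbers = sorted(ages, key=int, reverse=True)   # numeric, descending

print(as_text)
print(as_numbers)
```

In Spark, casting the column before sorting, for example `col("age").cast("int")`, gives the numeric order.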

<h2>A quick example of a union operation. First we filter the DataFrame down to the rows whose property column equals 'farm', then to the rows where it equals 'apartment'. Calling union() on the two results combines them into one DataFrame.</h2>

In [None]:
df_property_farm = bank_loanDfSpark.filter(col('property') == 'farm')
df_property_farm.show()

In [None]:
df_property_apartment= bank_loanDfSpark.filter(col('property') == 'apartment')
df_property_apartment.show()

In [None]:
df_union = df_property_farm.union(df_property_apartment)
df_union.show()
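
`union()` appends the rows of the second DataFrame to those of the first, matching columns by position and keeping duplicates. The plain-Python sketch below uses made-up rows (not the real data) to show that the union of two such filters selects the same records as a single membership filter.

```python
# Made-up rows standing in for the bank loan table (values are illustrative only).
rows = [
    {'id': '1', 'property': 'farm'},
    {'id': '2', 'property': 'apartment'},
    {'id': '3', 'property': 'house'},
    {'id': '4', 'property': 'farm'},
]

farm = [r for r in rows if r['property'] == 'farm']
apartment = [r for r in rows if r['property'] == 'apartment']

# Concatenation plays the role of union(): farm rows first, then apartment rows.
union_ids = [r['id'] for r in farm + apartment]

# A single membership filter selects the same rows in one pass.
single_pass_ids = [r['id'] for r in rows if r['property'] in ('farm', 'apartment')]

print(union_ids)
print(single_pass_ids)
```

The Spark counterpart of the single-pass form would be a filter on `col('property').isin('farm', 'apartment')`; only the row order differs.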

<h2> Next, a join operation. First we build the 1st DataFrame by selecting the 'id', 'amount', 'ratio' and 'age' columns. </h2>

In [None]:
df_select_1 = bank_loanDfSpark.select(col("id"), col("amount"), col("ratio"), col("age"))
df_select_1.show()

<h2> Then we build the 2nd DataFrame by selecting the 'id', 'occupation', 'property' and 'outcome' columns. </h2>

In [None]:
df_select_2 = bank_loanDfSpark.select(col("id"), col("occupation"), col("property"), col('outcome'))
df_select_2.show()

<h2> Now we can see the result of the inner join on the 'id' column. </h2>

In [None]:
# Join on the shared 'id' column; show() returns None, so keep the DataFrame separately
result_of_join_df = df_select_1.join(df_select_2, on="id", how="inner")
result_of_join_df.show(truncate=False)
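
An inner join keeps only the rows whose key appears on both sides. The plain-Python sketch below illustrates this with made-up rows (the ids and values are hypothetical) and assumes that ids are unique on the right-hand side.

```python
# Made-up rows; '3' exists only on the left side and '5' only on the right.
left = [
    {'id': '1', 'amount': '1000'},
    {'id': '2', 'amount': '2500'},
    {'id': '3', 'amount': '400'},
]
right = [
    {'id': '1', 'outcome': 'approved'},
    {'id': '2', 'outcome': 'rejected'},
    {'id': '5', 'outcome': 'approved'},
]

# Index the right side by id (assumes ids are unique there).
right_by_id = {row['id']: row for row in right}

# Keep only left rows whose id also appears on the right, merging the fields.
joined = [{**row, **right_by_id[row['id']]} for row in left if row['id'] in right_by_id]

for row in joined:
    print(row)
```

In the notebook both sides are selected from the same DataFrame, so every id has a match and the join simply brings the two column sets back together.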