# Experimental simple performance testing notebook for Spark
- testing and comparing simple dataframe / sql operations of commong data (pre-)processing tasks 
- various available single-machine Python solutions are to be tested: Pandas, PySpark, Turi Create and Dask.
- execution times, CPU load and maximal memory use should be tracked


## Kiva dataset 
- [Kiva](https://www.kaggle.com/gaborfodor/additional-kiva-snapshot): crowdfunding data with lenders and loans, with additional geographic data
- download the related CSV files and move them to a folder where the kernel can read them


## init spark session

In [1]:
import findspark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
import timeit 

findspark.init()

# use SparkSession instead of SparkContext:
# setting SparkSession config paramters are necesary to use available memory (we can also limit CPUs by eg. 
# .config('spark.default.parallelism', 5), but it uses all the CPUs by default)
spark = SparkSession.builder \
    .appName('PySpark local test') \
    .config("spark.core.connection.ack.wait.timeout", "12000") \
    .config("spark.driver.maxResultSize", "4g") \
    .config('spark.executor.memory', '4G') \
    .config('spark.driver.memory', '5G') \
    .getOrCreate()

## read files to dataframes: loans and lenders

In [2]:
#%%timeit -r 1 -n 1  
# this is a setup line for timeit (not meauserd)

lenders_df = spark.read.csv("../../kiva/lenders.csv", header=True)  # 130 MB file, 2.349.174 lines
lenders_df.createOrReplaceTempView("lenders") 
loans_df = spark.read.csv("../../kiva/loans.csv", header=True)      # 2.1 GB file, 1.419.607 lines
loans_df.createOrReplaceTempView("loans") 

In [4]:
lenders_df.printSchema()
lenders_df.show(5)

root
 |-- permanent_name: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- member_since: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- loan_because: string (nullable = true)
 |-- loan_purchase_num: string (nullable = true)
 |-- invited_by: string (nullable = true)
 |-- num_invited: string (nullable = true)

+--------------+------------+----+-----+------------+------------+----------+------------+-----------------+----------+-----------+
|permanent_name|display_name|city|state|country_code|member_since|occupation|loan_because|loan_purchase_num|invited_by|num_invited|
+--------------+------------+----+-----+------------+------------+----------+------------+-----------------+----------+-----------+
|      qian3013|        Qian|null| null|        null|  1461300457|      null|        null|              1.0|      null|        

In [5]:
loans_df.printSchema()
loans_df.show(5)

root
 |-- loan_id: string (nullable = true)
 |-- loan_name: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- description: string (nullable = true)
 |-- description_translated: string (nullable = true)
 |-- funded_amount: string (nullable = true)
 |-- loan_amount: string (nullable = true)
 |-- status: string (nullable = true)
 |-- activity_name: string (nullable = true)
 |-- sector_name: string (nullable = true)
 |-- loan_use: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- town_name: string (nullable = true)
 |-- currency_policy: string (nullable = true)
 |-- currency_exchange_coverage_rate: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- partner_id: string (nullable = true)
 |-- posted_time: string (nullable = true)
 |-- planned_expiration_time: string (nullable = true)
 |-- disburse_time: string (nullable = true)
 |-- raised_time: string (nullable = true)
 |-- len

## read, transform and count loan_lenders 
string enumeration to rows: split tuple strings to array, then explode the array to rows

In [3]:
start = timeit.default_timer()

# loans_lenders_df10 = spark.sql(" SELECT * FROM loans_lenders LIMIT 10 ") 

lldf = spark.read.csv("../../kiva/loans_lenders.csv", header=True) # .limit(20) 
# full: 339 MB file, 1.387.433 lines --> 27.459.086, 27 sec, 1.2GB (6.3GB max) memory
# 100.000 heading line --> 2.060.259 output lines
# 200.000 heading line --> 4.110.948 output lines, 1.3 GB mem max

loans_lenders_df = lldf.select( \
      lldf.loan_id, explode(split(lldf.lenders, ', ?')).alias('lender') \
).distinct() 

loans_lenders_df.createOrReplaceTempView("loans_lenders") 

print('row count: ', loans_lenders_df.count())
print('ellapsed time: ', timeit.default_timer() - start)
loans_lenders_df.show()

#loans_lenders_df.coalesce(1).write.csv('kiva/pyspark-loans_lenders_df-20.csv', header=True)

row count:  27459086
ellapsed time:  70.239741525
+-------+-----------------+
|loan_id|           lender|
+-------+-----------------+
|1000103|     kirramalatte|
|1000103|     mike48519820|
|1000141|       rachel1813|
|1000193|    gwendolen8929|
|1000200|     ryan18955662|
|1000272|      bradley5620|
|1000293|      william4267|
|1000306|    david13647704|
|1000306|        julie2056|
|1000383|100ofhumanity1199|
|1000400|        libby6754|
|1000744|         hendrikd|
|1000865|         remi5960|
|1001124|         rick7694|
|1001155|         mary8909|
|1001209|         fran9888|
|1001274|      suzanne3391|
|1001378|        linda3440|
|  10014|          sno7221|
|1001431|          cas5618|
+-------+-----------------+
only showing top 20 rows



## join, filter and sort loan and lender data
get distinct joined lines with renamed columns, then write to an output file (for fully materialized results)
- filtering on lenders.country_code: 
  - 'US': 25% of lenders
  - 'CA': 3% of lenders --> 3.5 GB joined, 1.971.548 lines

In [4]:
start = timeit.default_timer()

# remove previous folder if exists: 
import shutil
try: 
    shutil.rmtree('../../kiva/pyspark-result-joined.csv')
except FileNotFoundError: 
    pass

# join and filter using SQL: 
joined_df = spark.sql("""
select distinct 
  le.permanent_name as lender_permanent_name, le.display_name as lender_display_name, 
  le.city as lender_city, le.state as lender_state, le.country_code as lender_country_code, 
  le.member_since as lender_member_since, le.occupation as lender_occupation, 
  le.loan_because as lender_loan_because, le.loan_purchase_num as lender_loan_purchase_num, 
  le.invited_by as lender_invited_by, le.num_invited as lender_num_invited, 
  lo.loan_id, lo.loan_name, lo.original_language, lo.description, lo.description_translated, 
  lo.funded_amount, lo.loan_amount, lo.status, lo.activity_name, lo.sector_name, 
  lo.loan_use, lo.country_code, lo.country_name, lo.town_name, lo.currency_policy, 
  lo.currency_exchange_coverage_rate, lo.currency, lo.partner_id, lo.posted_time, 
  lo.planned_expiration_time, lo.disburse_time, lo.raised_time, lo.lender_term, 
  lo.num_lenders_total, lo.num_journal_entries, lo.num_bulk_entries, lo.tags, 
  lo.borrower_genders, lo.borrower_pictured, lo.repayment_interval, lo.distribution_model
from   loans_lenders as ll
         inner join loans lo ON lo.loan_id = ll.loan_id
         inner join lenders le ON le.permanent_name = ll.lender
where  le.country_code = 'CA'
order by lender_permanent_name, loan_id
""")

# coalesce(1) is for writing into one file
#joined_df.coalesce(1).write.csv('../../kiva/pyspark-result-joined.csv', header=True)

joined_df.createOrReplaceTempView("joined") 

print('ellapsed time: ', timeit.default_timer() - start)
#print('line count: ', joined_df.count() )

ellapsed time:  0.15953717000002143


## group and sort joined data
* group by on the joined ‘CA’ table (3.5 GB): count distinct sector_name by lender, then sort

In [None]:
# TODO

* group by on the exploded loans_lenders table (6 GB): count distinct loan_id by lender

In [6]:
# remove previous results if exists: 
import shutil
try: 
    shutil.rmtree('../../kiva/pyspark-result-groupby.csv')
except FileNotFoundError: 
    pass

start = timeit.default_timer()

lender_loan_count_df = spark.sql("""
select lender_permanent_name, count(distinct loan_id) as loan_ct
from   joined 
group by lender_permanent_name
-- order by count(distinct loan_id) desc
""")

lender_loan_count_df.createOrReplaceTempView("lender_loan_count")

print('ellapsed time: ', timeit.default_timer() - start)

lender_loan_count_df.coalesce(1).write.csv('../../kiva/pyspark-result-groupby.csv', header=True)
print('ellapsed time: ', timeit.default_timer() - start)

lender_loan_count_df.show(5)

ellapsed time:  0.04001515400000244
ellapsed time:  795.8832310860003
+---------------------+-------+
|lender_permanent_name|loan_ct|
+---------------------+-------+
|             aime8260|      1|
|             alex1543|      1|
|             alex1822|      1|
|       alexandria3351|     62|
|            alexx3309|      5|
+---------------------+-------+
only showing top 5 rows



- group by on the loans table (2.1 GB): sum funded_amount by sector_name

In [None]:
# TODO